linux/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC
#include "../include/obd_support.h"
#include "../include/lustre_net.h"
#include "../include/lustre_lib.h"
#include "../include/obd.h"
#include "../include/obd_class.h"
#include "ptlrpc_internal.h"

/**
 * Helper function. Sends \a len bytes from \a base at offset \a offset
 * over \a conn connection to portal \a portal.
 * Returns -ENOMEM if the MD cannot be bound, otherwise 0. Note that a
 * failed LNetPut() still returns 0: the MD is unlinked and the resulting
 * UNLINK event completes the send like any other failed send.
 */
static int ptl_send_buf(lnet_handle_md_t *mdh, void *base, int len,
                        lnet_ack_req_t ack, struct ptlrpc_cb_id *cbid,
                        struct ptlrpc_connection *conn, int portal, __u64 xid,
                        unsigned int offset)
{
        int rc;
        lnet_md_t md;

        LASSERT(portal != 0);
        CDEBUG(D_INFO, "conn=%p id %s\n", conn, libcfs_id2str(conn->c_peer));
        md.start = base;
        md.length = len;
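        /* an acked send generates two events on this MD (SEND, then ACK),
         * an unacked send only SEND, hence the threshold below
         */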
        md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
        md.options = PTLRPC_MD_OPTIONS;
        md.user_ptr = cbid;
        md.eq_handle = ptlrpc_eq_h;

        if (unlikely(ack == LNET_ACK_REQ &&
                     OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_ACK,
                                          OBD_FAIL_ONCE))) {
                /* don't ask for the ack to simulate failing client */
                ack = LNET_NOACK_REQ;
        }

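        /* LNET_UNLINK: the MD is unlinked automatically once its event
         * threshold has been consumed
         */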
        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (unlikely(rc != 0)) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %lld, offset %u\n",
               len, portal, xid, offset);

        rc = LNetPut(conn->c_self, *mdh, ack,
                     conn->c_peer, portal, xid, offset, 0);
        if (unlikely(rc != 0)) {
                int rc2;
                /* We're going to get an UNLINK event when we unlink below,
                 * which will complete just like any other failed send, so
                 * we fall through and return success here!
                 */
                CERROR("LNetPut(%s, %d, %lld) failed: %d\n",
                       libcfs_id2str(conn->c_peer), portal, xid, rc);
                rc2 = LNetMDUnlink(*mdh);
                LASSERTF(rc2 == 0, "rc2 = %d\n", rc2);
        }

        return 0;
}

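/**
 * Helper to unlink each of the \a count bulk MD handles in \a bd_mds.
 * LNetMDUnlink() on a handle that was never attached, or that has already
 * been unlinked, simply fails with -ENOENT, so the whole array can be
 * walked unconditionally.
 */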
static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count)
{
        int i;

        for (i = 0; i < count; i++)
                LNetMDUnlink(bd_mds[i]);
}

/**
 * Register bulk at the sender for later transfer.
 * Returns 0 on success or error code.
 */
static int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        lnet_process_id_t peer;
        int rc = 0;
        int rc2;
        int posted_md;
        int total_md;
        __u64 xid;
        lnet_handle_me_t me_h;
        lnet_md_t md;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_GET_NET))
                return 0;

        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_nob > 0);
        LASSERT(desc->bd_md_count == 0);
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req);
        LASSERT(desc->bd_type == BULK_PUT_SINK ||
                desc->bd_type == BULK_GET_SOURCE);

        /* clean up the bulk state since the descriptor will be reused */
        if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
                desc->bd_nob_transferred = 0;
        else
                LASSERT(desc->bd_nob_transferred == 0);

        desc->bd_failure = 0;

        peer = desc->bd_import->imp_connection->c_peer;

        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);

        /* An XID is only used for a single request from the client.
         * For retried bulk transfers, a new XID will be allocated in
         * ptlrpc_check_set() if it needs to be resent, so the same RDMA
         * match bits are not reused after an error.
         *
         * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
         * first bulk XID is power-of-two aligned before rq_xid. LU-1431
         */
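        /*
         * Illustration (values are hypothetical, not from the source): with
         * bd_md_max_brw = 4 and rq_xid = 0x1007, the mask below gives a
         * first bulk xid of 0x1004, and the loop further down posts MDs
         * matching xids 0x1004, 0x1005, ... in order.
         */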
        xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
        LASSERTF(!(desc->bd_registered &&
                   req->rq_send_state != LUSTRE_IMP_REPLAY) ||
                 xid != desc->bd_last_xid,
                 "registered: %d  rq_xid: %llu bd_last_xid: %llu\n",
                 desc->bd_registered, xid, desc->bd_last_xid);

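        /* one MD is needed for every LNET_MAX_IOV pages, rounding up */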
        total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
        desc->bd_registered = 1;
        desc->bd_last_xid = xid;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                      /* PUT or GET */

        for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
                md.options = PTLRPC_MD_OPTIONS |
                             ((desc->bd_type == BULK_GET_SOURCE) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);

                rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &me_h);
                if (rc != 0) {
                        CERROR("%s: LNetMEAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, xid,
                               posted_md, rc);
                        break;
                }

                /* About to let the network at it... */
                rc = LNetMDAttach(me_h, md, LNET_UNLINK,
                                  &desc->bd_mds[posted_md]);
                if (rc != 0) {
                        CERROR("%s: LNetMDAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, xid,
                               posted_md, rc);
                        rc2 = LNetMEUnlink(me_h);
                        LASSERT(rc2 == 0);
                        break;
                }
        }

        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                spin_lock(&desc->bd_lock);
                desc->bd_md_count -= total_md - posted_md;
                spin_unlock(&desc->bd_lock);
                LASSERT(desc->bd_md_count >= 0);
                mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
                req->rq_status = -ENOMEM;
                return -ENOMEM;
        }

        /* Set rq_xid to the match bits of the final bulk so that the server
         * can infer the number of bulks that were prepared
         */
        req->rq_xid = --xid;
        LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
                 "bd_last_xid = x%llu, rq_xid = x%llu\n",
                 desc->bd_last_xid, req->rq_xid);

        spin_lock(&desc->bd_lock);
        /* Holler if the peer manages to touch buffers before it knows the xid */
        if (desc->bd_md_count != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
                      total_md - desc->bd_md_count);
        spin_unlock(&desc->bd_lock);

        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, xid x%#llx-%#llx, portal %u\n",
               desc->bd_md_count,
               desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
               desc->bd_last_xid, req->rq_xid, desc->bd_portal);

        return 0;
}

/**
 * Disconnect a bulk desc from the network. Idempotent. Not
 * thread-safe (i.e. only interlocks with completion callback).
 * Returns 1 on success or 0 if network unregistration failed for whatever
 * reason.
 */
int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;
        int rc;

        LASSERT(!in_interrupt());     /* might sleep */

        /* Set up the deadline for the bulk unlink. */
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
            async && req->rq_bulk_deadline == 0)
                req->rq_bulk_deadline = ktime_get_real_seconds() + LONG_UNLINK;

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        LASSERT(desc->bd_req == req);  /* bd_req NULL until registered */

        /* the unlink ensures the callback happens ASAP and is the last
         * one.  If it fails, it must be because completion just happened,
         * but we must still l_wait_event() in this case to give liblustre
         * a chance to run client_bulk_callback()
         */
        mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
        ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);

        /* Do not wait for unlink to finish. */
        if (async)
                return 0;

        if (req->rq_set)
                wq = &req->rq_set->set_waitq;
        else
                wq = &req->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish LNDs
                 */
                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                                           cfs_time_seconds(1), NULL, NULL);
                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
                if (rc == 0) {
                        ptlrpc_rqphase_move(req, req->rq_next_phase);
                        return 1;
                }

                LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                          desc);
        }
        return 0;
}
EXPORT_SYMBOL(ptlrpc_unregister_bulk);

static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
        struct ptlrpc_service *svc = svcpt->scp_service;
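        /* elapsed wall-clock seconds since the request arrived, at least 1 */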
        int service_time = max_t(int, ktime_get_real_seconds() -
                                 req->rq_arrival_time.tv_sec, 1);

        if (!(flags & PTLRPC_REPLY_EARLY) &&
            (req->rq_type != PTL_RPC_MSG_ERR) && req->rq_reqmsg &&
            !(lustre_msg_get_flags(req->rq_reqmsg) &
              (MSG_RESENT | MSG_REPLAY |
               MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE))) {
                /* early replies, errors and recovery requests don't count
                 * toward our service time estimate
                 */
                int oldse = at_measured(&svcpt->scp_at_estimate, service_time);

                if (oldse != 0) {
                        DEBUG_REQ(D_ADAPTTO, req,
                                  "svc %s changed estimate from %d to %d",
                                  svc->srv_name, oldse,
                                  at_get(&svcpt->scp_at_estimate));
                }
        }
        /* Report actual service time for client latency calc */
        lustre_msg_set_service_time(req->rq_repmsg, service_time);
        /* Report the service time estimate for future client reqs, but
         * report 0 (to be ignored by the client) if this is an error reply
         * during recovery. (bz15815)
         */
        if (req->rq_type == PTL_RPC_MSG_ERR && !req->rq_export)
                lustre_msg_set_timeout(req->rq_repmsg, 0);
        else
                lustre_msg_set_timeout(req->rq_repmsg,
                                       at_get(&svcpt->scp_at_estimate));

        if (req->rq_reqmsg &&
            !(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%x/%x len=%d\n",
                       flags, lustre_msg_get_flags(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);
        }
}

/**
 * Send the reply for request \a req from its reply buffer.
 * \a flags defines the reply type (e.g. PTLRPC_REPLY_EARLY or
 * PTLRPC_REPLY_MAYBE_DIFFICULT).
 * Returns 0 on success or an error code.
 */
int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_reply_state *rs = req->rq_reply_state;
        struct ptlrpc_connection *conn;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one). The reply generated by the sptlrpc layer
         * (e.g. error notify, etc.) might have a NULL req->rq_reqmsg;
         * otherwise we must have a request buffer which is either the
         * actual (swabbed) incoming request, or a saved copy if this is
         * a req saved in target_queue_final_reply().
         */
        LASSERT(req->rq_no_reply == 0);
        LASSERT(req->rq_reqbuf);
        LASSERT(rs);
        LASSERT((flags & PTLRPC_REPLY_MAYBE_DIFFICULT) || !rs->rs_difficult);
        LASSERT(req->rq_repmsg);
        LASSERT(req->rq_repmsg == rs->rs_msg);
        LASSERT(rs->rs_cb_id.cbid_fn == reply_out_callback);
        LASSERT(rs->rs_cb_id.cbid_arg == rs);

        /* There may be no rq_export during failover */

        if (unlikely(req->rq_export && req->rq_export->exp_obd &&
                     req->rq_export->exp_obd->obd_fail)) {
                /* Failed obd's only send ENODEV */
                req->rq_type = PTL_RPC_MSG_ERR;
                req->rq_status = -ENODEV;
                CDEBUG(D_HA, "sending ENODEV from failed obd %d\n",
                       req->rq_export->exp_obd->obd_minor);
        }

        /* To stay interoperable with clients (< 2.3) that don't have
         * pb_jobid in ptlrpc_body, we have to shrink the ptlrpc_body in
         * the reply buffer to ptlrpc_body_v2; otherwise the reply buffer
         * on the client would overflow.
         *
         * XXX Remove this whenever we drop interoperability with such
         * clients.
         */
        req->rq_replen = lustre_shrink_msg(req->rq_repmsg, 0,
                                           sizeof(struct ptlrpc_body_v2), 1);

        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        lustre_msg_set_type(req->rq_repmsg, req->rq_type);
        lustre_msg_set_status(req->rq_repmsg,
                              ptlrpc_status_hton(req->rq_status));
        lustre_msg_set_opc(req->rq_repmsg,
                req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : 0);

        target_pack_pool_reply(req);

        ptlrpc_at_set_reply(req, flags);

        if (!req->rq_export || !req->rq_export->exp_connection)
                conn = ptlrpc_connection_get(req->rq_peer, req->rq_self, NULL);
        else
                conn = ptlrpc_connection_addref(req->rq_export->exp_connection);

        if (unlikely(!conn)) {
                CERROR("not replying on NULL connection\n"); /* bug 9635 */
                return -ENOTCONN;
        }
        ptlrpc_rs_addref(rs);              /* +1 ref for the network */

        rc = sptlrpc_svc_wrap_reply(req);
        if (unlikely(rc))
                goto out;

        req->rq_sent = ktime_get_real_seconds();

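        /* A "difficult" reply must be kept until the client has seen it,
         * so ask LNet for an ACK (consumed in reply_out_callback) unless
         * acks are explicitly disabled for this reply state.
         */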
        rc = ptl_send_buf(&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
                          (rs->rs_difficult && !rs->rs_no_ack) ?
                          LNET_ACK_REQ : LNET_NOACK_REQ,
                          &rs->rs_cb_id, conn,
                          ptlrpc_req2svc(req)->srv_rep_portal,
                          req->rq_xid, req->rq_reply_off);
out:
        if (unlikely(rc != 0))
                ptlrpc_req_drop_rs(req);
        ptlrpc_connection_put(conn);
        return rc;
}
EXPORT_SYMBOL(ptlrpc_send_reply);

int ptlrpc_reply(struct ptlrpc_request *req)
{
        if (req->rq_no_reply)
                return 0;
        return ptlrpc_send_reply(req, 0);
}
EXPORT_SYMBOL(ptlrpc_reply);

/**
 * For request \a req send an error reply back. Create empty
 * reply buffers if necessary.
 */
int ptlrpc_send_error(struct ptlrpc_request *req, int may_be_difficult)
{
        int rc;

        if (req->rq_no_reply)
                return 0;

        if (!req->rq_repmsg) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }

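        /* Common, expected failures (no space, permission denied, missing
         * object, operation in progress, quota exceeded) travel back as an
         * ordinary reply status; anything else becomes a real RPC error
         * message.
         */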
        if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
            req->rq_status != -EPERM && req->rq_status != -ENOENT &&
            req->rq_status != -EINPROGRESS && req->rq_status != -EDQUOT)
                req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_send_reply(req, may_be_difficult);
        return rc;
}
EXPORT_SYMBOL(ptlrpc_send_error);

int ptlrpc_error(struct ptlrpc_request *req)
{
        return ptlrpc_send_error(req, 0);
}
EXPORT_SYMBOL(ptlrpc_error);

/**
 * Send request \a request.
 * If \a noreply is set, don't expect any reply back and don't set up
 * reply buffers.
 * Returns 0 on success or an error code.
 */
int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
{
        int rc;
        int rc2;
        int mpflag = 0;
        struct ptlrpc_connection *connection;
        lnet_handle_me_t reply_me_h;
        lnet_md_t reply_md;
        struct obd_device *obd = request->rq_import->imp_obd;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_RPC))
                return 0;

        LASSERT(request->rq_type == PTL_RPC_MSG_REQUEST);
        LASSERT(request->rq_wait_ctx == 0);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt
         */
        LASSERT(!request->rq_receiving_reply);
        LASSERT(!((lustre_msg_get_flags(request->rq_reqmsg) & MSG_REPLAY) &&
                  (request->rq_import->imp_state == LUSTRE_IMP_FULL)));

        if (unlikely(obd && obd->obd_fail)) {
                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
                       obd->obd_name);
                /* this prevents us from waiting in ptlrpc_queue_wait */
                spin_lock(&request->rq_lock);
                request->rq_err = 1;
                spin_unlock(&request->rq_lock);
                request->rq_status = -ENODEV;
                return -ENODEV;
        }

        connection = request->rq_import->imp_connection;

        lustre_msg_set_handle(request->rq_reqmsg,
                              &request->rq_import->imp_remote_handle);
        lustre_msg_set_type(request->rq_reqmsg, PTL_RPC_MSG_REQUEST);
        lustre_msg_set_conn_cnt(request->rq_reqmsg,
                                request->rq_import->imp_conn_cnt);
        lustre_msghdr_set_flags(request->rq_reqmsg,
                                request->rq_import->imp_msghdr_flags);

        if (request->rq_resend)
                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);

        if (request->rq_memalloc)
                mpflag = cfs_memory_pressure_get_and_set();

        rc = sptlrpc_cli_wrap_request(request);
        if (rc)
                goto out;

        /* bulk registration should be done after wrap_request() */
        if (request->rq_bulk) {
                rc = ptlrpc_register_bulk(request);
                if (rc != 0)
                        goto out;
        }

        if (!noreply) {
                LASSERT(request->rq_replen != 0);
                if (!request->rq_repbuf) {
                        LASSERT(!request->rq_repdata);
                        LASSERT(!request->rq_repmsg);
                        rc = sptlrpc_cli_alloc_repbuf(request,
                                                      request->rq_replen);
                        if (rc) {
                                /* this prevents us from looping in
                                 * ptlrpc_queue_wait
                                 */
                                spin_lock(&request->rq_lock);
                                request->rq_err = 1;
                                spin_unlock(&request->rq_lock);
                                request->rq_status = rc;
                                goto cleanup_bulk;
                        }
                } else {
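                        /* the reply buffer from a previous send is kept;
                         * reset the parsed pointers into it so the new
                         * reply gets unpacked afresh
                         */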
                        request->rq_repdata = NULL;
                        request->rq_repmsg = NULL;
                }

                rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/
                                  connection->c_peer, request->rq_xid, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &reply_me_h);
                if (rc != 0) {
                        CERROR("LNetMEAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        rc = -ENOMEM;
                        goto cleanup_bulk;
                }
        }

        spin_lock(&request->rq_lock);
        /* If the MD attach succeeds, there _will_ be a reply_in callback */
        request->rq_receiving_reply = !noreply;
        request->rq_req_unlink = 1;
        /* We are responsible for unlinking the reply buffer */
        request->rq_reply_unlink = !noreply;
        /* Clear any flags that may be present from previous sends. */
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_net_err = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        request->rq_reply_truncate = 0;
        spin_unlock(&request->rq_lock);

        if (!noreply) {
                reply_md.start = request->rq_repbuf;
                reply_md.length = request->rq_repbuf_len;
                /* Allow multiple early replies */
                reply_md.threshold = LNET_MD_THRESH_INF;
                /* Manage remote for early replies */
                reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
                        LNET_MD_MANAGE_REMOTE |
                        LNET_MD_TRUNCATE; /* a truncated reply produces -EOVERFLOW */
                reply_md.user_ptr = &request->rq_reply_cbid;
                reply_md.eq_handle = ptlrpc_eq_h;

                /* We must see the unlink callback to unset rq_reply_unlink,
                 * so we can't auto-unlink
                 */
                rc = LNetMDAttach(reply_me_h, reply_md, LNET_RETAIN,
                                  &request->rq_reply_md_h);
                if (rc != 0) {
                        CERROR("LNetMDAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        spin_lock(&request->rq_lock);
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
                        rc = -ENOMEM;
                        goto cleanup_me;
                }

                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %llu, portal %u\n",
                       request->rq_repbuf_len, request->rq_xid,
                       request->rq_reply_portal);
        }

        /* add references on request for request_out_callback */
        ptlrpc_request_addref(request);
        if (obd && obd->obd_svc_stats)
                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,
                        atomic_read(&request->rq_import->imp_inflight));

        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);

        ktime_get_real_ts64(&request->rq_arrival_time);
        request->rq_sent = ktime_get_real_seconds();
        /* We give the server rq_timeout secs to process the req, and
         * add the network latency for our local timeout.
         */
        request->rq_deadline = request->rq_sent + request->rq_timeout +
                ptlrpc_at_get_net_latency(request);

        ptlrpc_pinger_sending_on_import(request->rq_import);

        DEBUG_REQ(D_INFO, request, "send flg=%x",
                  lustre_msg_get_flags(request->rq_reqmsg));
        rc = ptl_send_buf(&request->rq_req_md_h,
                          request->rq_reqbuf, request->rq_reqdata_len,
                          LNET_NOACK_REQ, &request->rq_req_cbid,
                          connection,
                          request->rq_request_portal,
                          request->rq_xid, 0);
        if (rc == 0)
                goto out;

        ptlrpc_req_finished(request);
        if (noreply)
                goto out;

 cleanup_me:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer.
         */
        rc2 = LNetMEUnlink(reply_me_h);
        LASSERT(rc2 == 0);
        /* UNLINKED callback called synchronously */
        LASSERT(!request->rq_receiving_reply);

 cleanup_bulk:
        /* We do a synchronous unlink here: no real transfer has started,
         * so the chance of a long unlink due to a sluggish net is small.
         */
        ptlrpc_unregister_bulk(request, 0);
 out:
        if (request->rq_memalloc)
                cfs_memory_pressure_restore(mpflag);
        return rc;
}
EXPORT_SYMBOL(ptl_send_rpc);

/**
 * Register request buffer descriptor for request receiving.
 */
int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_service *service = rqbd->rqbd_svcpt->scp_service;
        static lnet_process_id_t match_id = {LNET_NID_ANY, LNET_PID_ANY};
        int rc;
        lnet_md_t md;
        lnet_handle_me_t me_h;

        CDEBUG(D_NET, "LNetMEAttach: portal %d\n",
               service->srv_req_portal);

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_RQBD))
                return -ENOMEM;

        /* NB: CPT affinity service should use new LNet flag LNET_INS_LOCAL,
         * which means buffer can only be attached on local CPT, and LND
         * threads can find it by grabbing a local lock
         */
        rc = LNetMEAttach(service->srv_req_portal,
                          match_id, 0, ~0, LNET_UNLINK,
                          rqbd->rqbd_svcpt->scp_cpt >= 0 ?
                          LNET_INS_LOCAL : LNET_INS_AFTER, &me_h);
        if (rc != 0) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                return -ENOMEM;
        }

        LASSERT(rqbd->rqbd_refcount == 0);
        rqbd->rqbd_refcount = 1;

        md.start = rqbd->rqbd_buffer;
        md.length = service->srv_buf_size;
        md.max_size = service->srv_max_req_size;
        md.threshold = LNET_MD_THRESH_INF;
        md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
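        /* with LNET_MD_MAX_SIZE set, LNet unlinks the buffer once less
         * than srv_max_req_size bytes remain unused, so an incoming
         * request never has to split across two buffers
         */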
        md.user_ptr = &rqbd->rqbd_cbid;
        md.eq_handle = ptlrpc_eq_h;

        rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h);
        if (rc == 0)
                return 0;

        CERROR("LNetMDAttach failed: %d\n", rc);
        LASSERT(rc == -ENOMEM);
        rc = LNetMEUnlink(me_h);
        LASSERT(rc == 0);
        rqbd->rqbd_refcount = 0;

        return -ENOMEM;
}
