linux/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC
#include "../include/obd_support.h"
#include "../include/lustre_net.h"
#include "../include/lustre_lib.h"
#include "../include/obd.h"
#include "../include/obd_class.h"
#include "ptlrpc_internal.h"

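/*
 * Overview: this file implements ptlrpc's network I/O buffer handling on
 * top of LNet: sending request and reply buffers (ptl_send_buf(),
 * ptl_send_rpc(), ptlrpc_send_reply()), registering and unregistering
 * bulk descriptors for RDMA-style transfers (ptlrpc_register_bulk(),
 * ptlrpc_unregister_bulk()), and posting request buffers for incoming
 * RPCs (ptlrpc_register_rqbd()).
 */
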
/**
 * Helper function: send \a len bytes from \a base at offset \a offset
 * over connection \a conn to portal \a portal with match bits \a xid.
 * Returns 0 on success or an error code.
 */
static int ptl_send_buf(lnet_handle_md_t *mdh, void *base, int len,
                        lnet_ack_req_t ack, struct ptlrpc_cb_id *cbid,
                        struct ptlrpc_connection *conn, int portal, __u64 xid,
                        unsigned int offset)
{
        int rc;
        lnet_md_t md;

        LASSERT(portal != 0);
        LASSERT(conn != NULL);
        CDEBUG(D_INFO, "conn=%p id %s\n", conn, libcfs_id2str(conn->c_peer));
        md.start     = base;
        md.length    = len;
        md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
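        /* with LNET_ACK_REQ the MD must absorb two events (SEND and ACK),
         * otherwise only the SEND event */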
        md.options   = PTLRPC_MD_OPTIONS;
        md.user_ptr  = cbid;
        md.eq_handle = ptlrpc_eq_h;

        if (unlikely(ack == LNET_ACK_REQ &&
                     OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_ACK,
                                          OBD_FAIL_ONCE))) {
                /* don't ask for the ack to simulate failing client */
                ack = LNET_NOACK_REQ;
        }

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (unlikely(rc != 0)) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %lld, offset %u\n",
               len, portal, xid, offset);

        rc = LNetPut(conn->c_self, *mdh, ack,
                     conn->c_peer, portal, xid, offset, 0);
        if (unlikely(rc != 0)) {
                int rc2;

                /* We're going to get an UNLINK event when I unlink below,
                 * which will complete just like any other failed send, so
                 * I fall through and return success here! */
                CERROR("LNetPut(%s, %d, %lld) failed: %d\n",
                       libcfs_id2str(conn->c_peer), portal, xid, rc);
                rc2 = LNetMDUnlink(*mdh);
                LASSERTF(rc2 == 0, "rc2 = %d\n", rc2);
        }

        return 0;
}

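/* Unlink every bulk MD handle in \a bd_mds.  NB: LNetMDUnlink() on a
 * handle that was never attached, or is already unlinked, just returns
 * -ENOENT, which is deliberately ignored here. */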
static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count)
{
        int i;

        for (i = 0; i < count; i++)
                LNetMDUnlink(bd_mds[i]);
}

/**
 * Register the bulk descriptor of \a req at the sender for a later transfer.
 * Returns 0 on success or an error code.
 */
int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        lnet_process_id_t peer;
        int rc = 0;
        int rc2;
        int posted_md;
        int total_md;
        __u64 xid;
        lnet_handle_me_t me_h;
        lnet_md_t md;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_GET_NET))
                return 0;

        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_nob > 0);
        LASSERT(desc->bd_md_count == 0);
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req != NULL);
        LASSERT(desc->bd_type == BULK_PUT_SINK ||
                desc->bd_type == BULK_GET_SOURCE);
        /* clean up the bulk state, since the descriptor may be reused */
        if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
                desc->bd_nob_transferred = 0;
        else
                LASSERT(desc->bd_nob_transferred == 0);

        desc->bd_failure = 0;

        peer = desc->bd_import->imp_connection->c_peer;

        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);

        /* An XID is only used for a single request from the client.
         * For retried bulk transfers, a new XID will be allocated in
         * ptlrpc_check_set() if the request needs to be resent, so the
         * same RDMA match bits are never reused after an error.
         *
         * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
         * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */
        xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
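        /* e.g. (illustrative) with bd_md_max_brw == 4 and rq_xid == 0x4a7,
         * the mask clears the low two bits, so the MDs posted below use
         * match bits 0x4a4, 0x4a5, 0x4a6 and 0x4a7 */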
        LASSERTF(!(desc->bd_registered &&
                   req->rq_send_state != LUSTRE_IMP_REPLAY) ||
                 xid != desc->bd_last_xid,
                 "registered: %d  rq_xid: %llu bd_last_xid: %llu\n",
                 desc->bd_registered, xid, desc->bd_last_xid);

        total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
        desc->bd_registered = 1;
        desc->bd_last_xid = xid;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                      /* PUT or GET */

        for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
                md.options = PTLRPC_MD_OPTIONS |
                             ((desc->bd_type == BULK_GET_SOURCE) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);

                rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &me_h);
                if (rc != 0) {
                        CERROR("%s: LNetMEAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, xid,
                               posted_md, rc);
                        break;
                }

                /* About to let the network at it... */
                rc = LNetMDAttach(me_h, md, LNET_UNLINK,
                                  &desc->bd_mds[posted_md]);
                if (rc != 0) {
                        CERROR("%s: LNetMDAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, xid,
                               posted_md, rc);
                        rc2 = LNetMEUnlink(me_h);
                        LASSERT(rc2 == 0);
                        break;
                }
        }

        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                spin_lock(&desc->bd_lock);
                desc->bd_md_count -= total_md - posted_md;
                spin_unlock(&desc->bd_lock);
                LASSERT(desc->bd_md_count >= 0);
                mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
                req->rq_status = -ENOMEM;
                return -ENOMEM;
        }

        /* Set rq_xid to the match bits of the final bulk so that the server
         * can infer how many bulks were prepared */
        req->rq_xid = --xid;
        LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
                 "bd_last_xid = x%llu, rq_xid = x%llu\n",
                 desc->bd_last_xid, req->rq_xid);

        spin_lock(&desc->bd_lock);
        /* Holler if peer manages to touch buffers before he knows the xid */
        if (desc->bd_md_count != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
                      total_md - desc->bd_md_count);
        spin_unlock(&desc->bd_lock);

        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, xid x%#llx-%#llx, portal %u\n",
               desc->bd_md_count,
               desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
               desc->bd_last_xid, req->rq_xid, desc->bd_portal);

        return 0;
}
EXPORT_SYMBOL(ptlrpc_register_bulk);

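/*
 * Typical life cycle (see ptl_send_rpc() below): the client registers the
 * bulk MDs, sends the request, and the server then GETs or PUTs the bulk
 * data directly against those MDs.  On completion, error or resend the
 * client tears the MDs down again with ptlrpc_unregister_bulk().
 */
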
/**
 * Disconnect a bulk desc from the network. Idempotent. Not
 * thread-safe (i.e. only interlocks with completion callback).
 * Returns 1 on success or 0 if network unregistration failed for whatever
 * reason.
 */
int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;
        int rc;

        LASSERT(!in_interrupt());     /* might sleep */

        /* Let's set up a deadline for the bulk unlink. */
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
            async && req->rq_bulk_deadline == 0)
                req->rq_bulk_deadline = get_seconds() + LONG_UNLINK;

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        LASSERT(desc->bd_req == req);  /* bd_req NULL until registered */

        /* the unlink ensures the callback happens ASAP and is the last
         * one.  If it fails, it must be because completion just happened,
         * but we must still l_wait_event() in this case to give liblustre
         * a chance to run client_bulk_callback() */
        mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
        ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);

        /* Do not wait for unlink to finish. */
        if (async)
                return 0;

        if (req->rq_set != NULL)
                wq = &req->rq_set->set_waitq;
        else
                wq = &req->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs */
                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                                           cfs_time_seconds(1), NULL, NULL);
                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
                if (rc == 0) {
                        ptlrpc_rqphase_move(req, req->rq_next_phase);
                        return 1;
                }

                LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                          desc);
        }
}
EXPORT_SYMBOL(ptlrpc_unregister_bulk);

static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
        struct ptlrpc_service *svc = svcpt->scp_service;
        int service_time = max_t(int, get_seconds() -
                                 req->rq_arrival_time.tv_sec, 1);
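        /* NB: clamped to at least 1s, so a zero-length service interval
         * still feeds a positive sample into the estimate below */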

        if (!(flags & PTLRPC_REPLY_EARLY) &&
            (req->rq_type != PTL_RPC_MSG_ERR) &&
            (req->rq_reqmsg != NULL) &&
            !(lustre_msg_get_flags(req->rq_reqmsg) &
              (MSG_RESENT | MSG_REPLAY |
               MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE))) {
                /* early replies, errors and recovery requests don't count
                 * toward our service time estimate */
                int oldse = at_measured(&svcpt->scp_at_estimate, service_time);

                if (oldse != 0) {
                        DEBUG_REQ(D_ADAPTTO, req,
                                  "svc %s changed estimate from %d to %d",
                                  svc->srv_name, oldse,
                                  at_get(&svcpt->scp_at_estimate));
                }
        }
        /* Report the actual service time for the client's latency
         * calculations */
        lustre_msg_set_service_time(req->rq_repmsg, service_time);
        /* Report the service time estimate for future client reqs, but
         * report 0 (to be ignored by the client) if it's an error reply
         * during recovery. (bz15815) */
        if (req->rq_type == PTL_RPC_MSG_ERR &&
            (req->rq_export == NULL || req->rq_export->exp_obd->obd_recovering))
                lustre_msg_set_timeout(req->rq_repmsg, 0);
        else
                lustre_msg_set_timeout(req->rq_repmsg,
                                       at_get(&svcpt->scp_at_estimate));

        if (req->rq_reqmsg &&
            !(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%d:%x/%x len=%d\n",
                       flags, lustre_msg_get_flags(req->rq_reqmsg),
                       lustre_msg_is_v1(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);
        }
}
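
/*
 * NB on adaptive timeouts: an "early reply" is a provisional reply the
 * server sends before the real one so the client can extend its timeout;
 * the estimate maintained above is carried back in the reply's timeout
 * field and seeds the client's future request timeouts.
 */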

/**
 * Send the reply for request \a req from its reply buffer.
 * \a flags defines the reply type.
 * Returns 0 on success or an error code.
 */
int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_reply_state *rs = req->rq_reply_state;
        struct ptlrpc_connection *conn;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one). The reply generated by the sptlrpc layer
         * (e.g. error notify, etc.) might have a NULL req->rq_reqmsg;
         * otherwise we must have a request buffer which is either the
         * actual (swabbed) incoming request, or a saved copy if this is
         * a req saved in target_queue_final_reply().
         */
        LASSERT(req->rq_no_reply == 0);
        LASSERT(req->rq_reqbuf != NULL);
        LASSERT(rs != NULL);
        LASSERT((flags & PTLRPC_REPLY_MAYBE_DIFFICULT) || !rs->rs_difficult);
        LASSERT(req->rq_repmsg != NULL);
        LASSERT(req->rq_repmsg == rs->rs_msg);
        LASSERT(rs->rs_cb_id.cbid_fn == reply_out_callback);
        LASSERT(rs->rs_cb_id.cbid_arg == rs);

        /* There may be no rq_export during failover */

        if (unlikely(req->rq_export && req->rq_export->exp_obd &&
                     req->rq_export->exp_obd->obd_fail)) {
                /* Failed obd's only send ENODEV */
                req->rq_type = PTL_RPC_MSG_ERR;
                req->rq_status = -ENODEV;
                CDEBUG(D_HA, "sending ENODEV from failed obd %d\n",
                       req->rq_export->exp_obd->obd_minor);
        }

        /* In order to keep interoperability with clients (< 2.3) that do
         * not have pb_jobid in ptlrpc_body, we have to shrink the
         * ptlrpc_body in the reply buffer to ptlrpc_body_v2; otherwise the
         * reply buffer on the client will overflow.
         *
         * XXX Remove this whenever we drop interoperability with such
         * clients.
         */
        req->rq_replen = lustre_shrink_msg(req->rq_repmsg, 0,
                                           sizeof(struct ptlrpc_body_v2), 1);

        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        lustre_msg_set_type(req->rq_repmsg, req->rq_type);
        lustre_msg_set_status(req->rq_repmsg,
                              ptlrpc_status_hton(req->rq_status));
        lustre_msg_set_opc(req->rq_repmsg,
                req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : 0);

        target_pack_pool_reply(req);

        ptlrpc_at_set_reply(req, flags);

        if (req->rq_export == NULL || req->rq_export->exp_connection == NULL)
                conn = ptlrpc_connection_get(req->rq_peer, req->rq_self, NULL);
        else
                conn = ptlrpc_connection_addref(req->rq_export->exp_connection);

        if (unlikely(conn == NULL)) {
                CERROR("not replying on NULL connection\n"); /* bug 9635 */
                return -ENOTCONN;
        }
        ptlrpc_rs_addref(rs);              /* +1 ref for the network */

        rc = sptlrpc_svc_wrap_reply(req);
        if (unlikely(rc))
                goto out;

        req->rq_sent = get_seconds();

        rc = ptl_send_buf(&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
                          (rs->rs_difficult && !rs->rs_no_ack) ?
                          LNET_ACK_REQ : LNET_NOACK_REQ,
                          &rs->rs_cb_id, conn,
                          ptlrpc_req2svc(req)->srv_rep_portal,
                          req->rq_xid, req->rq_reply_off);
out:
        if (unlikely(rc != 0))
                ptlrpc_req_drop_rs(req);
        ptlrpc_connection_put(conn);
        return rc;
}
EXPORT_SYMBOL(ptlrpc_send_reply);

int ptlrpc_reply(struct ptlrpc_request *req)
{
        if (req->rq_no_reply)
                return 0;
        return ptlrpc_send_reply(req, 0);
}
EXPORT_SYMBOL(ptlrpc_reply);

/**
 * For request \a req send an error reply back. Create an empty
 * reply buffer if necessary.
 */
int ptlrpc_send_error(struct ptlrpc_request *req, int may_be_difficult)
{
        int rc;

        if (req->rq_no_reply)
                return 0;

        if (!req->rq_repmsg) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }

        if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
            req->rq_status != -EPERM && req->rq_status != -ENOENT &&
            req->rq_status != -EINPROGRESS && req->rq_status != -EDQUOT)
                req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_send_reply(req, may_be_difficult);
        return rc;
}
EXPORT_SYMBOL(ptlrpc_send_error);

int ptlrpc_error(struct ptlrpc_request *req)
{
        return ptlrpc_send_error(req, 0);
}
EXPORT_SYMBOL(ptlrpc_error);

/**
 * Send request \a request.
 * If \a noreply is set, don't expect any reply back and don't set up
 * reply buffers.
 * Returns 0 on success or an error code.
 */
int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
{
        int rc;
        int rc2;
        int mpflag = 0;
        struct ptlrpc_connection *connection;
        lnet_handle_me_t reply_me_h;
        lnet_md_t reply_md;
        struct obd_device *obd = request->rq_import->imp_obd;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_RPC))
                return 0;

        LASSERT(request->rq_type == PTL_RPC_MSG_REQUEST);
        LASSERT(request->rq_wait_ctx == 0);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt */
        LASSERT(!request->rq_receiving_reply);
        LASSERT(!((lustre_msg_get_flags(request->rq_reqmsg) & MSG_REPLAY) &&
                (request->rq_import->imp_state == LUSTRE_IMP_FULL)));

        if (unlikely(obd != NULL && obd->obd_fail)) {
                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
                       obd->obd_name);
                /* this prevents us from waiting in ptlrpc_queue_wait */
                spin_lock(&request->rq_lock);
                request->rq_err = 1;
                spin_unlock(&request->rq_lock);
                request->rq_status = -ENODEV;
                return -ENODEV;
        }

        connection = request->rq_import->imp_connection;

        lustre_msg_set_handle(request->rq_reqmsg,
                              &request->rq_import->imp_remote_handle);
        lustre_msg_set_type(request->rq_reqmsg, PTL_RPC_MSG_REQUEST);
        lustre_msg_set_conn_cnt(request->rq_reqmsg,
                                request->rq_import->imp_conn_cnt);
        lustre_msghdr_set_flags(request->rq_reqmsg,
                                request->rq_import->imp_msghdr_flags);

        if (request->rq_resend)
                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);

        if (request->rq_memalloc)
                mpflag = cfs_memory_pressure_get_and_set();

        rc = sptlrpc_cli_wrap_request(request);
        if (rc)
                GOTO(out, rc);

        /* bulk register should be done after wrap_request() */
        if (request->rq_bulk != NULL) {
                rc = ptlrpc_register_bulk(request);
                if (rc != 0)
                        GOTO(out, rc);
        }

        if (!noreply) {
                LASSERT(request->rq_replen != 0);
                if (request->rq_repbuf == NULL) {
                        LASSERT(request->rq_repdata == NULL);
                        LASSERT(request->rq_repmsg == NULL);
                        rc = sptlrpc_cli_alloc_repbuf(request,
                                                      request->rq_replen);
                        if (rc) {
                                /* this prevents us from looping in
                                 * ptlrpc_queue_wait */
                                spin_lock(&request->rq_lock);
                                request->rq_err = 1;
                                spin_unlock(&request->rq_lock);
                                request->rq_status = rc;
                                GOTO(cleanup_bulk, rc);
                        }
                } else {
                        request->rq_repdata = NULL;
                        request->rq_repmsg = NULL;
                }

                rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/
                                  connection->c_peer, request->rq_xid, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &reply_me_h);
                if (rc != 0) {
                        CERROR("LNetMEAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        GOTO(cleanup_bulk, rc = -ENOMEM);
                }
        }

        spin_lock(&request->rq_lock);
        /* If the MD attach succeeds, there _will_ be a reply_in callback */
        request->rq_receiving_reply = !noreply;
        request->rq_req_unlink = 1;
        /* We are responsible for unlinking the reply buffer */
        request->rq_reply_unlink = !noreply;
        /* Clear any flags that may be present from previous sends. */
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_net_err = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        request->rq_reply_truncate = 0;
        spin_unlock(&request->rq_lock);

        if (!noreply) {
                reply_md.start     = request->rq_repbuf;
                reply_md.length    = request->rq_repbuf_len;
                /* Allow multiple early replies */
                reply_md.threshold = LNET_MD_THRESH_INF;
                /* Manage remote for early replies */
                reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
                        LNET_MD_MANAGE_REMOTE |
                        LNET_MD_TRUNCATE; /* allow truncation, surfaced as EOVERFLOW */
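                /* NB LNET_MD_MANAGE_REMOTE lets the sending peer choose the
                 * offset, so the server can overwrite this buffer with each
                 * successive early reply */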
                reply_md.user_ptr  = &request->rq_reply_cbid;
                reply_md.eq_handle = ptlrpc_eq_h;

                /* We must see the unlink callback to unset rq_reply_unlink,
                 * so we can't auto-unlink */
                rc = LNetMDAttach(reply_me_h, reply_md, LNET_RETAIN,
                                  &request->rq_reply_md_h);
                if (rc != 0) {
                        CERROR("LNetMDAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        spin_lock(&request->rq_lock);
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
                        GOTO(cleanup_me, rc = -ENOMEM);
                }

                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %llu, portal %u\n",
                       request->rq_repbuf_len, request->rq_xid,
                       request->rq_reply_portal);
        }

        /* add references on request for request_out_callback */
        ptlrpc_request_addref(request);
        if (obd != NULL && obd->obd_svc_stats != NULL)
                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,
                                    atomic_read(&request->rq_import->imp_inflight));

        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);

        do_gettimeofday(&request->rq_arrival_time);
        request->rq_sent = get_seconds();
        /* We give the server rq_timeout secs to process the req, and
         * add the network latency for our local timeout. */
        request->rq_deadline = request->rq_sent + request->rq_timeout +
                ptlrpc_at_get_net_latency(request);
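        /* e.g. (illustrative) rq_timeout == 30s with 5s of measured network
         * latency yields a client-side deadline 35s after rq_sent */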

        ptlrpc_pinger_sending_on_import(request->rq_import);

        DEBUG_REQ(D_INFO, request, "send flg=%x",
                  lustre_msg_get_flags(request->rq_reqmsg));
        rc = ptl_send_buf(&request->rq_req_md_h,
                          request->rq_reqbuf, request->rq_reqdata_len,
                          LNET_NOACK_REQ, &request->rq_req_cbid,
                          connection,
                          request->rq_request_portal,
                          request->rq_xid, 0);
        if (rc == 0)
                GOTO(out, rc);

        ptlrpc_req_finished(request);
        if (noreply)
                GOTO(out, rc);

 cleanup_me:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer. */
        rc2 = LNetMEUnlink(reply_me_h);
        LASSERT(rc2 == 0);
        /* UNLINKED callback called synchronously */
        LASSERT(!request->rq_receiving_reply);

 cleanup_bulk:
        /* Unlink synchronously: no real transfer happened, so the chance
         * of a long unlink due to a sluggish network is smaller. */
        ptlrpc_unregister_bulk(request, 0);
 out:
        if (request->rq_memalloc)
                cfs_memory_pressure_restore(mpflag);
        return rc;
}
EXPORT_SYMBOL(ptl_send_rpc);

/**
 * Register request buffer descriptor for request receiving.
 */
int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_service *service = rqbd->rqbd_svcpt->scp_service;
        static lnet_process_id_t match_id = {LNET_NID_ANY, LNET_PID_ANY};
        int rc;
        lnet_md_t md;
        lnet_handle_me_t me_h;

        CDEBUG(D_NET, "LNetMEAttach: portal %d\n",
               service->srv_req_portal);

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_RQBD))
                return -ENOMEM;

        /* NB: CPT affinity service should use new LNet flag LNET_INS_LOCAL,
         * which means the buffer can only be attached on the local CPT, and
         * LND threads can find it by grabbing a local lock */
        rc = LNetMEAttach(service->srv_req_portal,
                          match_id, 0, ~0, LNET_UNLINK,
                          rqbd->rqbd_svcpt->scp_cpt >= 0 ?
                          LNET_INS_LOCAL : LNET_INS_AFTER, &me_h);
        if (rc != 0) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                return -ENOMEM;
        }

        LASSERT(rqbd->rqbd_refcount == 0);
        rqbd->rqbd_refcount = 1;

        md.start     = rqbd->rqbd_buffer;
        md.length    = service->srv_buf_size;
        md.max_size  = service->srv_max_req_size;
        md.threshold = LNET_MD_THRESH_INF;
        md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
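        /* NB with LNET_MD_MAX_SIZE set, many small requests can land in this
         * one large buffer; LNet unlinks the MD once less than max_size
         * bytes remain, so the service can repost a fresh buffer */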
        md.user_ptr  = &rqbd->rqbd_cbid;
        md.eq_handle = ptlrpc_eq_h;

        rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h);
        if (rc == 0)
                return 0;

        CERROR("LNetMDAttach failed: %d\n", rc);
        LASSERT(rc == -ENOMEM);
        rc = LNetMEUnlink(me_h);
        LASSERT(rc == 0);
        rqbd->rqbd_refcount = 0;

        return -ENOMEM;
}