linux/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_RPC
#include "../include/obd_support.h"
#include "../include/lustre_net.h"
#include "../include/lustre_lib.h"
#include "../include/obd.h"
#include "../include/obd_class.h"
#include "ptlrpc_internal.h"

/**
 * Helper function. Sends \a len bytes from \a base at offset \a offset
 * over \a conn connection to portal \a portal.
 * Returns 0 on success or error code.
 */
static int ptl_send_buf(struct lnet_handle_md *mdh, void *base, int len,
                        enum lnet_ack_req ack, struct ptlrpc_cb_id *cbid,
                        struct ptlrpc_connection *conn, int portal, __u64 xid,
                        unsigned int offset)
{
        int rc;
        struct lnet_md md;

        LASSERT(portal != 0);
        CDEBUG(D_INFO, "conn=%p id %s\n", conn, libcfs_id2str(conn->c_peer));
        md.start = base;
        md.length = len;
        md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
        md.options = PTLRPC_MD_OPTIONS;
        md.user_ptr = cbid;
        md.eq_handle = ptlrpc_eq_h;

        if (unlikely(ack == LNET_ACK_REQ &&
                     OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_ACK,
                                          OBD_FAIL_ONCE))) {
                /* don't ask for the ack to simulate failing client */
                ack = LNET_NOACK_REQ;
        }

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (unlikely(rc != 0)) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %lld, offset %u\n",
               len, portal, xid, offset);

        rc = LNetPut(conn->c_self, *mdh, ack,
                     conn->c_peer, portal, xid, offset, 0);
        if (unlikely(rc != 0)) {
                int rc2;
                /* We're going to get an UNLINK event when I unlink below,
                 * which will complete just like any other failed send, so
                 * I fall through and return success here!
                 */
                CERROR("LNetPut(%s, %d, %lld) failed: %d\n",
                       libcfs_id2str(conn->c_peer), portal, xid, rc);
                rc2 = LNetMDUnlink(*mdh);
                LASSERTF(rc2 == 0, "rc2 = %d\n", rc2);
        }

        return 0;
}

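/* Unlink each of the \a count bulk MDs in \a bd_mds from the network. */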
static void mdunlink_iterate_helper(struct lnet_handle_md *bd_mds, int count)
{
        int i;

        for (i = 0; i < count; i++)
                LNetMDUnlink(bd_mds[i]);
}

/**
 * Register bulk at the sender for later transfer.
 * Returns 0 on success or error code.
 */
static int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        struct lnet_process_id peer;
        int rc = 0;
        int rc2;
        int posted_md;
        int total_md;
        u64 mbits;
        struct lnet_handle_me me_h;
        struct lnet_md md;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_GET_NET))
                return 0;

        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_nob > 0);
        LASSERT(desc->bd_md_count == 0);
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req);
        LASSERT(ptlrpc_is_bulk_op_passive(desc->bd_type));

        /* clean up the state of the bulk, as it will be reused */
        if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
                desc->bd_nob_transferred = 0;
        else
                LASSERT(desc->bd_nob_transferred == 0);

        desc->bd_failure = 0;

        peer = desc->bd_import->imp_connection->c_peer;

        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);

        total_md = DIV_ROUND_UP(desc->bd_iov_count, LNET_MAX_IOV);
        /* rq_mbits is matchbits of the final bulk */
        mbits = req->rq_mbits - total_md + 1;

        LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
                 "first mbits = x%llu, last mbits = x%llu\n",
                 mbits, req->rq_mbits);
        LASSERTF(!(desc->bd_registered &&
                   req->rq_send_state != LUSTRE_IMP_REPLAY) ||
                 mbits != desc->bd_last_mbits,
                 "registered: %d  rq_mbits: %llu bd_last_mbits: %llu\n",
                 desc->bd_registered, mbits, desc->bd_last_mbits);

        desc->bd_registered = 1;
        desc->bd_last_mbits = mbits;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                      /* PUT or GET */

        for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
                md.options = PTLRPC_MD_OPTIONS |
                             (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);

                rc = LNetMEAttach(desc->bd_portal, peer, mbits, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &me_h);
                if (rc != 0) {
                        CERROR("%s: LNetMEAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        break;
                }

                /* About to let the network at it... */
                rc = LNetMDAttach(me_h, md, LNET_UNLINK,
                                  &desc->bd_mds[posted_md]);
                if (rc != 0) {
                        CERROR("%s: LNetMDAttach failed x%llu/%d: rc = %d\n",
                               desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        rc2 = LNetMEUnlink(me_h);
                        LASSERT(rc2 == 0);
                        break;
                }
        }

        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                spin_lock(&desc->bd_lock);
                desc->bd_md_count -= total_md - posted_md;
                spin_unlock(&desc->bd_lock);
                LASSERT(desc->bd_md_count >= 0);
                mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
                req->rq_status = -ENOMEM;
                return -ENOMEM;
        }

        spin_lock(&desc->bd_lock);
        /* Holler if peer manages to touch buffers before he knows the mbits */
        if (desc->bd_md_count != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
                      total_md - desc->bd_md_count);
        spin_unlock(&desc->bd_lock);

        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, mbits x%#llx-%#llx, portal %u\n",
               desc->bd_md_count,
               ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
               desc->bd_last_mbits, req->rq_mbits, desc->bd_portal);

        return 0;
}

/**
 * Disconnect a bulk desc from the network. Idempotent. Not
 * thread-safe (i.e. only interlocks with completion callback).
 * Returns 1 on success or 0 if network unregistration failed for whatever
 * reason.
 */
int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;
        int rc;

        LASSERT(!in_interrupt());     /* might sleep */

        /* Let's set up a deadline for the bulk unlink. */
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
            async && req->rq_bulk_deadline == 0 && cfs_fail_val == 0)
                req->rq_bulk_deadline = ktime_get_real_seconds() + LONG_UNLINK;

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        LASSERT(desc->bd_req == req);  /* bd_req NULL until registered */

        /* the unlink ensures the callback happens ASAP and is the last
         * one.  If it fails, it must be because completion just happened,
         * but we must still l_wait_event() in this case to give liblustre
         * a chance to run client_bulk_callback()
         */
        mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);

        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                return 1;                               /* never registered */

        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
        ptlrpc_rqphase_move(req, RQ_PHASE_UNREG_BULK);

        /* Do not wait for unlink to finish. */
        if (async)
                return 0;

        if (req->rq_set)
                wq = &req->rq_set->set_waitq;
        else
                wq = &req->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish LNDs
                 */
                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                                           cfs_time_seconds(1), NULL, NULL);
                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
                if (rc == 0) {
                        ptlrpc_rqphase_move(req, req->rq_next_phase);
                        return 1;
                }

                LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                          desc);
        }
        return 0;
}

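/*
 * Fill in the adaptive timeout (AT) fields of the reply: update the service
 * time estimate where appropriate, and report both the measured service time
 * and the current estimate back to the client.
 */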
static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
        struct ptlrpc_service *svc = svcpt->scp_service;
        int service_time = max_t(int, ktime_get_real_seconds() -
                                 req->rq_arrival_time.tv_sec, 1);

        if (!(flags & PTLRPC_REPLY_EARLY) &&
            (req->rq_type != PTL_RPC_MSG_ERR) && req->rq_reqmsg &&
            !(lustre_msg_get_flags(req->rq_reqmsg) &
              (MSG_RESENT | MSG_REPLAY |
               MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE))) {
                /* early replies, errors and recovery requests don't count
                 * toward our service time estimate
                 */
                int oldse = at_measured(&svcpt->scp_at_estimate, service_time);

                if (oldse != 0) {
                        DEBUG_REQ(D_ADAPTTO, req,
                                  "svc %s changed estimate from %d to %d",
                                  svc->srv_name, oldse,
                                  at_get(&svcpt->scp_at_estimate));
                }
        }
        /* Report actual service time for client latency calc */
        lustre_msg_set_service_time(req->rq_repmsg, service_time);
        /* Report service time estimate for future client reqs, but report 0
         * (to be ignored by client) if it's an error reply during recovery.
         * (bz15815)
         */
        if (req->rq_type == PTL_RPC_MSG_ERR && !req->rq_export)
                lustre_msg_set_timeout(req->rq_repmsg, 0);
        else
                lustre_msg_set_timeout(req->rq_repmsg,
                                       at_get(&svcpt->scp_at_estimate));

        if (req->rq_reqmsg &&
            !(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%x/%x len=%d\n",
                       flags, lustre_msg_get_flags(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_reqmsg),
                       lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);
        }
}

/**
 * Send request reply from request \a req reply buffer.
 * \a flags defines reply types
 * Returns 0 on success or error code
 */
int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
{
        struct ptlrpc_reply_state *rs = req->rq_reply_state;
        struct ptlrpc_connection *conn;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one). The reply generated by the sptlrpc layer (e.g.
         * error notify, etc.) might have a NULL req->rq_reqmsg; otherwise we
         * must have a request buffer which is either the actual (swabbed)
         * incoming request, or a saved copy if this is a req saved in
         * target_queue_final_reply().
         */
        LASSERT(req->rq_no_reply == 0);
        LASSERT(req->rq_reqbuf);
        LASSERT(rs);
        LASSERT((flags & PTLRPC_REPLY_MAYBE_DIFFICULT) || !rs->rs_difficult);
        LASSERT(req->rq_repmsg);
        LASSERT(req->rq_repmsg == rs->rs_msg);
        LASSERT(rs->rs_cb_id.cbid_fn == reply_out_callback);
        LASSERT(rs->rs_cb_id.cbid_arg == rs);

        /* There may be no rq_export during failover */

        if (unlikely(req->rq_export && req->rq_export->exp_obd &&
                     req->rq_export->exp_obd->obd_fail)) {
                /* Failed obd's only send ENODEV */
                req->rq_type = PTL_RPC_MSG_ERR;
                req->rq_status = -ENODEV;
                CDEBUG(D_HA, "sending ENODEV from failed obd %d\n",
                       req->rq_export->exp_obd->obd_minor);
        }

        /* To keep interoperability with clients (< 2.3) that do not have
         * pb_jobid in ptlrpc_body, we have to shrink the ptlrpc_body in the
         * reply buffer to ptlrpc_body_v2; otherwise the reply buffer on the
         * client will overflow.
         *
         * XXX Remove this whenever we drop interoperability with such
         * clients.
         */
        req->rq_replen = lustre_shrink_msg(req->rq_repmsg, 0,
                                           sizeof(struct ptlrpc_body_v2), 1);

        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        lustre_msg_set_type(req->rq_repmsg, req->rq_type);
        lustre_msg_set_status(req->rq_repmsg,
                              ptlrpc_status_hton(req->rq_status));
        lustre_msg_set_opc(req->rq_repmsg,
                           req->rq_reqmsg ?
                           lustre_msg_get_opc(req->rq_reqmsg) : 0);

        target_pack_pool_reply(req);

        ptlrpc_at_set_reply(req, flags);

        if (!req->rq_export || !req->rq_export->exp_connection)
                conn = ptlrpc_connection_get(req->rq_peer, req->rq_self, NULL);
        else
                conn = ptlrpc_connection_addref(req->rq_export->exp_connection);

        if (unlikely(!conn)) {
                CERROR("not replying on NULL connection\n"); /* bug 9635 */
                return -ENOTCONN;
        }
        ptlrpc_rs_addref(rs);              /* +1 ref for the network */

        rc = sptlrpc_svc_wrap_reply(req);
        if (unlikely(rc))
                goto out;

        req->rq_sent = ktime_get_real_seconds();

        rc = ptl_send_buf(&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
                          (rs->rs_difficult && !rs->rs_no_ack) ?
                          LNET_ACK_REQ : LNET_NOACK_REQ,
                          &rs->rs_cb_id, conn,
                          ptlrpc_req2svc(req)->srv_rep_portal,
                          req->rq_xid, req->rq_reply_off);
out:
        if (unlikely(rc != 0))
                ptlrpc_req_drop_rs(req);
        ptlrpc_connection_put(conn);
        return rc;
}

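/**
 * Send a regular (non-error) reply for request \a req, unless the request
 * is marked as requiring no reply.
 */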
int ptlrpc_reply(struct ptlrpc_request *req)
{
        if (req->rq_no_reply)
                return 0;
        return ptlrpc_send_reply(req, 0);
}

/**
 * For request \a req send an error reply back. Create empty
 * reply buffers if necessary.
 */
int ptlrpc_send_error(struct ptlrpc_request *req, int may_be_difficult)
{
        int rc;

        if (req->rq_no_reply)
                return 0;

        if (!req->rq_repmsg) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }

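        /* A few common, expected failures keep the regular reply type;
         * any other status is converted into an error reply.
         */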
        if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
            req->rq_status != -EPERM && req->rq_status != -ENOENT &&
            req->rq_status != -EINPROGRESS && req->rq_status != -EDQUOT)
                req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_send_reply(req, may_be_difficult);
        return rc;
}

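/**
 * Convenience wrapper: send an error reply for request \a req with
 * \a may_be_difficult set to 0.
 */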
int ptlrpc_error(struct ptlrpc_request *req)
{
        return ptlrpc_send_error(req, 0);
}

/**
 * Send request \a request.
 * If \a noreply is set, don't expect any reply back and don't set up
 * reply buffers.
 * Returns 0 on success or error code.
 */
int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
{
        int rc;
        int rc2;
        int mpflag = 0;
        struct ptlrpc_connection *connection;
        struct lnet_handle_me reply_me_h;
        struct lnet_md reply_md;
        struct obd_import *imp = request->rq_import;
        struct obd_device *obd = imp->imp_obd;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_RPC))
                return 0;

        LASSERT(request->rq_type == PTL_RPC_MSG_REQUEST);
        LASSERT(request->rq_wait_ctx == 0);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt
         */
        LASSERT(!request->rq_receiving_reply);
        LASSERT(!((lustre_msg_get_flags(request->rq_reqmsg) & MSG_REPLAY) &&
                  (imp->imp_state == LUSTRE_IMP_FULL)));

        if (unlikely(obd && obd->obd_fail)) {
                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
                       obd->obd_name);
                /* this prevents us from waiting in ptlrpc_queue_wait */
                spin_lock(&request->rq_lock);
                request->rq_err = 1;
                spin_unlock(&request->rq_lock);
                request->rq_status = -ENODEV;
                return -ENODEV;
        }

        connection = imp->imp_connection;

        lustre_msg_set_handle(request->rq_reqmsg,
                              &imp->imp_remote_handle);
        lustre_msg_set_type(request->rq_reqmsg, PTL_RPC_MSG_REQUEST);
        lustre_msg_set_conn_cnt(request->rq_reqmsg, imp->imp_conn_cnt);
        lustre_msghdr_set_flags(request->rq_reqmsg, imp->imp_msghdr_flags);

        /*
         * If this is the first resend of the request for EINPROGRESS, we need
         * to allocate a new XID (see after_reply()); this is different from a
         * resend due to a reply timeout.
         */
        if (request->rq_nr_resend && list_empty(&request->rq_unreplied_list)) {
                __u64 min_xid = 0;
                /*
                 * resend for EINPROGRESS, allocate new xid to avoid reply
                 * reconstruction
                 */
                spin_lock(&imp->imp_lock);
                ptlrpc_assign_next_xid_nolock(request);
                min_xid = ptlrpc_known_replied_xid(imp);
                spin_unlock(&imp->imp_lock);

                lustre_msg_set_last_xid(request->rq_reqmsg, min_xid);
                DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for resend on EINPROGRESS");
        }

        if (request->rq_bulk) {
                ptlrpc_set_bulk_mbits(request);
                lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
        }

        if (list_empty(&request->rq_unreplied_list) ||
            request->rq_xid <= imp->imp_known_replied_xid) {
                DEBUG_REQ(D_ERROR, request,
                          "xid: %llu, replied: %llu, list_empty:%d\n",
                          request->rq_xid, imp->imp_known_replied_xid,
                          list_empty(&request->rq_unreplied_list));
                LBUG();
        }

        /**
         * With AT enabled, all requests should have AT_SUPPORT in the
         * FULL import state when OBD_CONNECT_AT is set
         */
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
                (imp->imp_msghdr_flags & MSGHDR_AT_SUPPORT) ||
                !(imp->imp_connect_data.ocd_connect_flags &
                OBD_CONNECT_AT));

        if (request->rq_resend)
                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);

        if (request->rq_memalloc)
                mpflag = cfs_memory_pressure_get_and_set();

        rc = sptlrpc_cli_wrap_request(request);
        if (rc) {
                /*
                 * set rq_sent so that this request is treated
                 * as a delayed send in the upper layers
                 */
                if (rc == -ENOMEM)
                        request->rq_sent = ktime_get_seconds();
                goto out;
        }

        /* bulk register should be done after wrap_request() */
        if (request->rq_bulk) {
                rc = ptlrpc_register_bulk(request);
                if (rc != 0)
                        goto out;
        }

        if (!noreply) {
                LASSERT(request->rq_replen != 0);
                if (!request->rq_repbuf) {
                        LASSERT(!request->rq_repdata);
                        LASSERT(!request->rq_repmsg);
                        rc = sptlrpc_cli_alloc_repbuf(request,
                                                      request->rq_replen);
                        if (rc) {
                                /* this prevents us from looping in
                                 * ptlrpc_queue_wait
                                 */
                                spin_lock(&request->rq_lock);
                                request->rq_err = 1;
                                spin_unlock(&request->rq_lock);
                                request->rq_status = rc;
                                goto cleanup_bulk;
                        }
                } else {
                        request->rq_repdata = NULL;
                        request->rq_repmsg = NULL;
                }

                rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/
                                  connection->c_peer, request->rq_xid, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &reply_me_h);
                if (rc != 0) {
                        CERROR("LNetMEAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        rc = -ENOMEM;
                        goto cleanup_bulk;
                }
        }

        spin_lock(&request->rq_lock);
        /* We are responsible for unlinking the reply buffer */
        request->rq_reply_unlinked = noreply;
        request->rq_receiving_reply = !noreply;
        /* Clear any flags that may be present from previous sends. */
        request->rq_req_unlinked = 0;
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_net_err = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        request->rq_reply_truncated = 0;
        spin_unlock(&request->rq_lock);

        if (!noreply) {
                reply_md.start = request->rq_repbuf;
                reply_md.length = request->rq_repbuf_len;
                /* Allow multiple early replies */
                reply_md.threshold = LNET_MD_THRESH_INF;
                /* Manage remote for early replies */
                reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
                        LNET_MD_MANAGE_REMOTE |
                        LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */
                reply_md.user_ptr = &request->rq_reply_cbid;
                reply_md.eq_handle = ptlrpc_eq_h;

                /* We must see the unlink callback to set rq_reply_unlinked,
                 * so we can't auto-unlink
                 */
                rc = LNetMDAttach(reply_me_h, reply_md, LNET_RETAIN,
                                  &request->rq_reply_md_h);
                if (rc != 0) {
                        CERROR("LNetMDAttach failed: %d\n", rc);
                        LASSERT(rc == -ENOMEM);
                        spin_lock(&request->rq_lock);
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
                        rc = -ENOMEM;
                        goto cleanup_me;
                }

                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %llu, portal %u\n",
                       request->rq_repbuf_len, request->rq_xid,
                       request->rq_reply_portal);
        }

        /* add references on request for request_out_callback */
        ptlrpc_request_addref(request);
        if (obd && obd->obd_svc_stats)
                lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQACTIVE_CNTR,
                        atomic_read(&imp->imp_inflight));

        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);

        ktime_get_real_ts64(&request->rq_sent_tv);
        request->rq_sent = ktime_get_real_seconds();
        /* We give the server rq_timeout secs to process the req, and
         * add the network latency for our local timeout.
         */
        request->rq_deadline = request->rq_sent + request->rq_timeout +
                ptlrpc_at_get_net_latency(request);

        ptlrpc_pinger_sending_on_import(imp);

        DEBUG_REQ(D_INFO, request, "send flg=%x",
                  lustre_msg_get_flags(request->rq_reqmsg));
        rc = ptl_send_buf(&request->rq_req_md_h,
                          request->rq_reqbuf, request->rq_reqdata_len,
                          LNET_NOACK_REQ, &request->rq_req_cbid,
                          connection,
                          request->rq_request_portal,
                          request->rq_xid, 0);
        if (likely(rc == 0))
                goto out;

        request->rq_req_unlinked = 1;
        ptlrpc_req_finished(request);
        if (noreply)
                goto out;

 cleanup_me:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer.
         */
        rc2 = LNetMEUnlink(reply_me_h);
        LASSERT(rc2 == 0);
        /* UNLINKED callback called synchronously */
        LASSERT(!request->rq_receiving_reply);

 cleanup_bulk:
        /* We do a sync unlink here as there was no real transfer, so the
         * chance of a long unlink due to a sluggish net is smaller.
         */
        ptlrpc_unregister_bulk(request, 0);
 out:
        if (request->rq_memalloc)
                cfs_memory_pressure_restore(mpflag);
        return rc;
}
EXPORT_SYMBOL(ptl_send_rpc);

/**
 * Register request buffer descriptor for request receiving.
 */
int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_service *service = rqbd->rqbd_svcpt->scp_service;
        static struct lnet_process_id match_id = {LNET_NID_ANY, LNET_PID_ANY};
        int rc;
        struct lnet_md md;
        struct lnet_handle_me me_h;

        CDEBUG(D_NET, "LNetMEAttach: portal %d\n",
               service->srv_req_portal);

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_RQBD))
                return -ENOMEM;

        /* NB: CPT affinity service should use new LNet flag LNET_INS_LOCAL,
         * which means buffer can only be attached on local CPT, and LND
         * threads can find it by grabbing a local lock
         */
        rc = LNetMEAttach(service->srv_req_portal,
                          match_id, 0, ~0, LNET_UNLINK,
                          rqbd->rqbd_svcpt->scp_cpt >= 0 ?
                          LNET_INS_LOCAL : LNET_INS_AFTER, &me_h);
        if (rc != 0) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                return -ENOMEM;
        }

        LASSERT(rqbd->rqbd_refcount == 0);
        rqbd->rqbd_refcount = 1;

        md.start = rqbd->rqbd_buffer;
        md.length = service->srv_buf_size;
        md.max_size = service->srv_max_req_size;
        md.threshold = LNET_MD_THRESH_INF;
        md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
        md.user_ptr = &rqbd->rqbd_cbid;
        md.eq_handle = ptlrpc_eq_h;

        rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h);
        if (rc == 0)
                return 0;

        CERROR("LNetMDAttach failed: %d\n", rc);
        LASSERT(rc == -ENOMEM);
        rc = LNetMEUnlink(me_h);
        LASSERT(rc == 0);
        rqbd->rqbd_refcount = 0;

        return -ENOMEM;
}
