linux/net/sunrpc/xprtrdma/verbs.c
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * verbs.c
  44 *
  45 * Encapsulates the major functions managing:
  46 *  o adapters
  47 *  o endpoints
  48 *  o connections
  49 *  o buffer memory
  50 */
  51
  52#include <linux/interrupt.h>
  53#include <linux/slab.h>
  54#include <linux/sunrpc/addr.h>
  55#include <linux/sunrpc/svc_rdma.h>
  56#include <linux/log2.h>
  57
  58#include <asm-generic/barrier.h>
  59#include <asm/bitops.h>
  60
  61#include <rdma/ib_cm.h>
  62
  63#include "xprt_rdma.h"
  64#include <trace/events/rpcrdma.h>
  65
  66/*
  67 * Globals/Macros
  68 */
  69
  70#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  71# define RPCDBG_FACILITY        RPCDBG_TRANS
  72#endif
  73
  74/*
  75 * internal functions
  76 */
  77static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
  78static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
  79static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
  80                                       struct rpcrdma_sendctx *sc);
  81static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
  82static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
  83static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
  84static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
  85static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  86static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
  87static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
  88static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
  89static struct rpcrdma_regbuf *
  90rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
  91                     gfp_t flags);
  92static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
  93static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
  94
  95/* Wait for outstanding transport work to finish. ib_drain_qp
  96 * handles the drains in the wrong order for us, so open code
  97 * them here.
  98 */
  99static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 100{
 101        struct rpcrdma_ep *ep = r_xprt->rx_ep;
 102        struct rdma_cm_id *id = ep->re_id;
 103
 104        /* Flush Receives, then wait for deferred Reply work
 105         * to complete.
 106         */
 107        ib_drain_rq(id->qp);
 108
 109        /* Deferred Reply processing might have scheduled
 110         * local invalidations.
 111         */
 112        ib_drain_sq(id->qp);
 113
 114        rpcrdma_ep_put(ep);
 115}
 116
 117/**
 118 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 119 * @event: details of the event
 120 * @context: ep that owns QP where event occurred
 121 *
 122 * Called from the RDMA provider (device driver) possibly in an interrupt
 123 * context. The QP is always destroyed before the ID, so the ID will be
 124 * reliably available when this handler is invoked.
 125 */
 126static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 127{
 128        struct rpcrdma_ep *ep = context;
 129
 130        trace_xprtrdma_qp_event(ep, event);
 131}
 132
  133/* Ensure xprt_force_disconnect() is invoked exactly once when a
  134 * connection is closed or lost. (The essential requirement is that
  135 * it is invoked at least once).
  136 */
 137static void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
 138{
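             /* atomic_add_unless() increments re_force_disconnect only
              * while the counter is not already 1, so only the first
              * caller after a fresh connection actually invokes
              * xprt_force_disconnect().
              */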
 139        if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
 140                xprt_force_disconnect(ep->re_xprt);
 141}
 142
 143/**
 144 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 145 * @r_xprt: transport to disconnect
 146 * @wc: work completion entry
 147 *
 148 * Must be called in process context.
 149 */
 150void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
 151{
 152        if (wc->status != IB_WC_SUCCESS)
 153                rpcrdma_force_disconnect(r_xprt->rx_ep);
 154}
 155
 156/**
 157 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 158 * @cq: completion queue
 159 * @wc: WCE for a completed Send WR
 160 *
 161 */
 162static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 163{
 164        struct ib_cqe *cqe = wc->wr_cqe;
 165        struct rpcrdma_sendctx *sc =
 166                container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 167        struct rpcrdma_xprt *r_xprt = cq->cq_context;
 168
 169        /* WARNING: Only wr_cqe and status are reliable at this point */
 170        trace_xprtrdma_wc_send(sc, wc);
 171        rpcrdma_sendctx_put_locked(r_xprt, sc);
 172        rpcrdma_flush_disconnect(r_xprt, wc);
 173}
 174
 175/**
 176 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 177 * @cq: completion queue
 178 * @wc: WCE for a completed Receive WR
 179 *
 180 */
 181static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 182{
 183        struct ib_cqe *cqe = wc->wr_cqe;
 184        struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 185                                               rr_cqe);
 186        struct rpcrdma_xprt *r_xprt = cq->cq_context;
 187
 188        /* WARNING: Only wr_cqe and status are reliable at this point */
 189        trace_xprtrdma_wc_receive(wc);
 190        --r_xprt->rx_ep->re_receive_count;
 191        if (wc->status != IB_WC_SUCCESS)
 192                goto out_flushed;
 193
 194        /* status == SUCCESS means all fields in wc are trustworthy */
 195        rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 196        rep->rr_wc_flags = wc->wc_flags;
 197        rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 198
 199        ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 200                                   rdmab_addr(rep->rr_rdmabuf),
 201                                   wc->byte_len, DMA_FROM_DEVICE);
 202
 203        rpcrdma_reply_handler(rep);
 204        return;
 205
 206out_flushed:
 207        rpcrdma_flush_disconnect(r_xprt, wc);
 208        rpcrdma_rep_destroy(rep);
 209}
 210
 211static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
 212                                      struct rdma_conn_param *param)
 213{
 214        const struct rpcrdma_connect_private *pmsg = param->private_data;
 215        unsigned int rsize, wsize;
 216
 217        /* Default settings for RPC-over-RDMA Version One */
 218        ep->re_implicit_roundup = xprt_rdma_pad_optimize;
 219        rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 220        wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 221
 222        if (pmsg &&
 223            pmsg->cp_magic == rpcrdma_cmp_magic &&
 224            pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 225                ep->re_implicit_roundup = true;
 226                rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 227                wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 228        }
 229
 230        if (rsize < ep->re_inline_recv)
 231                ep->re_inline_recv = rsize;
 232        if (wsize < ep->re_inline_send)
 233                ep->re_inline_send = wsize;
 234
 235        rpcrdma_set_max_header_sizes(ep);
 236}
 237
 238/**
 239 * rpcrdma_cm_event_handler - Handle RDMA CM events
 240 * @id: rdma_cm_id on which an event has occurred
 241 * @event: details of the event
 242 *
 243 * Called with @id's mutex held. Returns 1 if caller should
 244 * destroy @id, otherwise 0.
 245 */
 246static int
 247rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 248{
 249        struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
 250        struct rpcrdma_ep *ep = id->context;
 251
 252        might_sleep();
 253
 254        switch (event->event) {
 255        case RDMA_CM_EVENT_ADDR_RESOLVED:
 256        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 257                ep->re_async_rc = 0;
 258                complete(&ep->re_done);
 259                return 0;
 260        case RDMA_CM_EVENT_ADDR_ERROR:
 261                ep->re_async_rc = -EPROTO;
 262                complete(&ep->re_done);
 263                return 0;
 264        case RDMA_CM_EVENT_ROUTE_ERROR:
 265                ep->re_async_rc = -ENETUNREACH;
 266                complete(&ep->re_done);
 267                return 0;
 268        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 269                pr_info("rpcrdma: removing device %s for %pISpc\n",
 270                        ep->re_id->device->name, sap);
 271                fallthrough;
 272        case RDMA_CM_EVENT_ADDR_CHANGE:
 273                ep->re_connect_status = -ENODEV;
 274                goto disconnected;
 275        case RDMA_CM_EVENT_ESTABLISHED:
 276                rpcrdma_ep_get(ep);
 277                ep->re_connect_status = 1;
 278                rpcrdma_update_cm_private(ep, &event->param.conn);
 279                trace_xprtrdma_inline_thresh(ep);
 280                wake_up_all(&ep->re_connect_wait);
 281                break;
 282        case RDMA_CM_EVENT_CONNECT_ERROR:
 283                ep->re_connect_status = -ENOTCONN;
 284                goto wake_connect_worker;
 285        case RDMA_CM_EVENT_UNREACHABLE:
 286                ep->re_connect_status = -ENETUNREACH;
 287                goto wake_connect_worker;
 288        case RDMA_CM_EVENT_REJECTED:
 289                dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
 290                        sap, rdma_reject_msg(id, event->status));
 291                ep->re_connect_status = -ECONNREFUSED;
 292                if (event->status == IB_CM_REJ_STALE_CONN)
 293                        ep->re_connect_status = -ENOTCONN;
 294wake_connect_worker:
 295                wake_up_all(&ep->re_connect_wait);
 296                return 0;
 297        case RDMA_CM_EVENT_DISCONNECTED:
 298                ep->re_connect_status = -ECONNABORTED;
 299disconnected:
 300                rpcrdma_force_disconnect(ep);
 301                return rpcrdma_ep_put(ep);
 302        default:
 303                break;
 304        }
 305
 306        dprintk("RPC:       %s: %pISpc on %s/frwr: %s\n", __func__, sap,
 307                ep->re_id->device->name, rdma_event_msg(event->event));
 308        return 0;
 309}
 310
 311static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
 312                                            struct rpcrdma_ep *ep)
 313{
 314        unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 315        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 316        struct rdma_cm_id *id;
 317        int rc;
 318
 319        init_completion(&ep->re_done);
 320
 321        id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
 322                            RDMA_PS_TCP, IB_QPT_RC);
 323        if (IS_ERR(id))
 324                return id;
 325
 326        ep->re_async_rc = -ETIMEDOUT;
 327        rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
 328                               RDMA_RESOLVE_TIMEOUT);
 329        if (rc)
 330                goto out;
 331        rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
 332        if (rc < 0)
 333                goto out;
 334
 335        rc = ep->re_async_rc;
 336        if (rc)
 337                goto out;
 338
 339        ep->re_async_rc = -ETIMEDOUT;
 340        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 341        if (rc)
 342                goto out;
 343        rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
 344        if (rc < 0)
 345                goto out;
 346        rc = ep->re_async_rc;
 347        if (rc)
 348                goto out;
 349
 350        return id;
 351
 352out:
 353        rdma_destroy_id(id);
 354        return ERR_PTR(rc);
 355}
 356
 357static void rpcrdma_ep_destroy(struct kref *kref)
 358{
 359        struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
 360
 361        if (ep->re_id->qp) {
 362                rdma_destroy_qp(ep->re_id);
 363                ep->re_id->qp = NULL;
 364        }
 365
 366        if (ep->re_attr.recv_cq)
 367                ib_free_cq(ep->re_attr.recv_cq);
 368        ep->re_attr.recv_cq = NULL;
 369        if (ep->re_attr.send_cq)
 370                ib_free_cq(ep->re_attr.send_cq);
 371        ep->re_attr.send_cq = NULL;
 372
 373        if (ep->re_pd)
 374                ib_dealloc_pd(ep->re_pd);
 375        ep->re_pd = NULL;
 376
 377        kfree(ep);
 378        module_put(THIS_MODULE);
 379}
 380
 381static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
 382{
 383        kref_get(&ep->re_kref);
 384}
 385
 386/* Returns:
 387 *     %0 if @ep still has a positive kref count, or
 388 *     %1 if @ep was destroyed successfully.
 389 */
 390static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
 391{
 392        return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
 393}
 394
 395static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 396{
 397        struct rpcrdma_connect_private *pmsg;
 398        struct ib_device *device;
 399        struct rdma_cm_id *id;
 400        struct rpcrdma_ep *ep;
 401        int rc;
 402
 403        ep = kzalloc(sizeof(*ep), GFP_NOFS);
 404        if (!ep)
 405                return -ENOTCONN;
 406        ep->re_xprt = &r_xprt->rx_xprt;
 407        kref_init(&ep->re_kref);
 408
 409        id = rpcrdma_create_id(r_xprt, ep);
 410        if (IS_ERR(id)) {
 411                kfree(ep);
 412                return PTR_ERR(id);
 413        }
 414        __module_get(THIS_MODULE);
 415        device = id->device;
 416        ep->re_id = id;
 417
 418        ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 419        ep->re_inline_send = xprt_rdma_max_inline_write;
 420        ep->re_inline_recv = xprt_rdma_max_inline_read;
 421        rc = frwr_query_device(ep, device);
 422        if (rc)
 423                goto out_destroy;
 424
 425        r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
 426
 427        ep->re_attr.event_handler = rpcrdma_qp_event_handler;
 428        ep->re_attr.qp_context = ep;
 429        ep->re_attr.srq = NULL;
 430        ep->re_attr.cap.max_inline_data = 0;
 431        ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 432        ep->re_attr.qp_type = IB_QPT_RC;
 433        ep->re_attr.port_num = ~0;
 434
 435        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 436                "iovs: send %d recv %d\n",
 437                __func__,
 438                ep->re_attr.cap.max_send_wr,
 439                ep->re_attr.cap.max_recv_wr,
 440                ep->re_attr.cap.max_send_sge,
 441                ep->re_attr.cap.max_recv_sge);
 442
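             /* Signal roughly one Send completion for every eighth
              * credit; rpcrdma_post_sends() refills re_send_count from
              * this batch size.
              */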
 443        ep->re_send_batch = ep->re_max_requests >> 3;
 444        ep->re_send_count = ep->re_send_batch;
 445        init_waitqueue_head(&ep->re_connect_wait);
 446
 447        ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
 448                                              ep->re_attr.cap.max_send_wr,
 449                                              IB_POLL_WORKQUEUE);
 450        if (IS_ERR(ep->re_attr.send_cq)) {
 451                rc = PTR_ERR(ep->re_attr.send_cq);
 452                goto out_destroy;
 453        }
 454
 455        ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
 456                                              ep->re_attr.cap.max_recv_wr,
 457                                              IB_POLL_WORKQUEUE);
 458        if (IS_ERR(ep->re_attr.recv_cq)) {
 459                rc = PTR_ERR(ep->re_attr.recv_cq);
 460                goto out_destroy;
 461        }
 462        ep->re_receive_count = 0;
 463
 464        /* Initialize cma parameters */
 465        memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
 466
 467        /* Prepare RDMA-CM private message */
 468        pmsg = &ep->re_cm_private;
 469        pmsg->cp_magic = rpcrdma_cmp_magic;
 470        pmsg->cp_version = RPCRDMA_CMP_VERSION;
 471        pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
 472        pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
 473        pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
 474        ep->re_remote_cma.private_data = pmsg;
 475        ep->re_remote_cma.private_data_len = sizeof(*pmsg);
 476
 477        /* Client offers RDMA Read but does not initiate */
 478        ep->re_remote_cma.initiator_depth = 0;
 479        ep->re_remote_cma.responder_resources =
 480                min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
 481
 482        /* Limit transport retries so client can detect server
 483         * GID changes quickly. RPC layer handles re-establishing
 484         * transport connection and retransmission.
 485         */
 486        ep->re_remote_cma.retry_count = 6;
 487
 488        /* RPC-over-RDMA handles its own flow control. In addition,
 489         * make all RNR NAKs visible so we know that RPC-over-RDMA
 490         * flow control is working correctly (no NAKs should be seen).
 491         */
 492        ep->re_remote_cma.flow_control = 0;
 493        ep->re_remote_cma.rnr_retry_count = 0;
 494
 495        ep->re_pd = ib_alloc_pd(device, 0);
 496        if (IS_ERR(ep->re_pd)) {
 497                rc = PTR_ERR(ep->re_pd);
 498                goto out_destroy;
 499        }
 500
 501        rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
 502        if (rc)
 503                goto out_destroy;
 504
 505        r_xprt->rx_ep = ep;
 506        return 0;
 507
 508out_destroy:
 509        rpcrdma_ep_put(ep);
 510        rdma_destroy_id(id);
 511        return rc;
 512}
 513
 514/**
 515 * rpcrdma_xprt_connect - Connect an unconnected transport
 516 * @r_xprt: controlling transport instance
 517 *
 518 * Returns 0 on success or a negative errno.
 519 */
 520int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
 521{
 522        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 523        struct rpcrdma_ep *ep;
 524        int rc;
 525
 526        rc = rpcrdma_ep_create(r_xprt);
 527        if (rc)
 528                return rc;
 529        ep = r_xprt->rx_ep;
 530
 531        xprt_clear_connected(xprt);
 532        rpcrdma_reset_cwnd(r_xprt);
 533
 534        /* Bump the ep's reference count while there are
 535         * outstanding Receives.
 536         */
 537        rpcrdma_ep_get(ep);
 538        rpcrdma_post_recvs(r_xprt, true);
 539
 540        rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
 541        if (rc)
 542                goto out;
 543
 544        if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 545                xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
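             /* re_connect_status is set by rpcrdma_cm_event_handler():
              * 1 once the connection is established, or a negative errno
              * if the attempt failed.
              */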
 546        wait_event_interruptible(ep->re_connect_wait,
 547                                 ep->re_connect_status != 0);
 548        if (ep->re_connect_status <= 0) {
 549                rc = ep->re_connect_status;
 550                goto out;
 551        }
 552
 553        rc = rpcrdma_sendctxs_create(r_xprt);
 554        if (rc) {
 555                rc = -ENOTCONN;
 556                goto out;
 557        }
 558
 559        rc = rpcrdma_reqs_setup(r_xprt);
 560        if (rc) {
 561                rc = -ENOTCONN;
 562                goto out;
 563        }
 564        rpcrdma_mrs_create(r_xprt);
 565
 566out:
 567        trace_xprtrdma_connect(r_xprt, rc);
 568        return rc;
 569}
 570
 571/**
 572 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 573 * @r_xprt: controlling transport instance
 574 *
 575 * Caller serializes. Either the transport send lock is held,
 576 * or we're being called to destroy the transport.
 577 *
 578 * On return, @r_xprt is completely divested of all hardware
 579 * resources and prepared for the next ->connect operation.
 580 */
 581void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
 582{
 583        struct rpcrdma_ep *ep = r_xprt->rx_ep;
 584        struct rdma_cm_id *id;
 585        int rc;
 586
 587        if (!ep)
 588                return;
 589
 590        id = ep->re_id;
 591        rc = rdma_disconnect(id);
 592        trace_xprtrdma_disconnect(r_xprt, rc);
 593
 594        rpcrdma_xprt_drain(r_xprt);
 595        rpcrdma_reps_unmap(r_xprt);
 596        rpcrdma_reqs_reset(r_xprt);
 597        rpcrdma_mrs_destroy(r_xprt);
 598        rpcrdma_sendctxs_destroy(r_xprt);
 599
 600        if (rpcrdma_ep_put(ep))
 601                rdma_destroy_id(id);
 602
 603        r_xprt->rx_ep = NULL;
 604}
 605
 606/* Fixed-size circular FIFO queue. This implementation is wait-free and
 607 * lock-free.
 608 *
 609 * Consumer is the code path that posts Sends. This path dequeues a
 610 * sendctx for use by a Send operation. Multiple consumer threads
 611 * are serialized by the RPC transport lock, which allows only one
 612 * ->send_request call at a time.
 613 *
 614 * Producer is the code path that handles Send completions. This path
 615 * enqueues a sendctx that has been completed. Multiple producer
 616 * threads are serialized by the ib_poll_cq() function.
 617 */
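     /* Invariants, as implemented below: rb_sc_head is advanced only by
      * rpcrdma_sendctx_get_locked(), rb_sc_tail only by
      * rpcrdma_sendctx_put_locked(), and the queue is treated as empty
      * whenever advancing the head would make it equal to the tail.
      */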
 618
 619/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 620 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 621 * Send requests.
 622 */
 623static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
 624{
 625        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 626        unsigned long i;
 627
 628        if (!buf->rb_sc_ctxs)
 629                return;
 630        for (i = 0; i <= buf->rb_sc_last; i++)
 631                kfree(buf->rb_sc_ctxs[i]);
 632        kfree(buf->rb_sc_ctxs);
 633        buf->rb_sc_ctxs = NULL;
 634}
 635
 636static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
 637{
 638        struct rpcrdma_sendctx *sc;
 639
 640        sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
 641                     GFP_KERNEL);
 642        if (!sc)
 643                return NULL;
 644
 645        sc->sc_cqe.done = rpcrdma_wc_send;
 646        return sc;
 647}
 648
 649static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 650{
 651        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 652        struct rpcrdma_sendctx *sc;
 653        unsigned long i;
 654
 655        /* Maximum number of concurrent outstanding Send WRs. Capping
 656         * the circular queue size stops Send Queue overflow by causing
 657         * the ->send_request call to fail temporarily before too many
 658         * Sends are posted.
 659         */
 660        i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 661        buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 662        if (!buf->rb_sc_ctxs)
 663                return -ENOMEM;
 664
 665        buf->rb_sc_last = i - 1;
 666        for (i = 0; i <= buf->rb_sc_last; i++) {
 667                sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
 668                if (!sc)
 669                        return -ENOMEM;
 670
 671                buf->rb_sc_ctxs[i] = sc;
 672        }
 673
 674        buf->rb_sc_head = 0;
 675        buf->rb_sc_tail = 0;
 676        return 0;
 677}
 678
 679/* The sendctx queue is not guaranteed to have a size that is a
 680 * power of two, thus the helpers in circ_buf.h cannot be used.
 681 * The other option is to use modulus (%), which can be expensive.
 682 */
 683static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 684                                          unsigned long item)
 685{
 686        return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 687}
 688
 689/**
 690 * rpcrdma_sendctx_get_locked - Acquire a send context
 691 * @r_xprt: controlling transport instance
 692 *
 693 * Returns pointer to a free send completion context; or NULL if
 694 * the queue is empty.
 695 *
 696 * Usage: Called to acquire an SGE array before preparing a Send WR.
 697 *
 698 * The caller serializes calls to this function (per transport), and
 699 * provides an effective memory barrier that flushes the new value
 700 * of rb_sc_head.
 701 */
 702struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 703{
 704        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 705        struct rpcrdma_sendctx *sc;
 706        unsigned long next_head;
 707
 708        next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 709
 710        if (next_head == READ_ONCE(buf->rb_sc_tail))
 711                goto out_emptyq;
 712
 713        /* ORDER: item must be accessed _before_ head is updated */
 714        sc = buf->rb_sc_ctxs[next_head];
 715
 716        /* Releasing the lock in the caller acts as a memory
 717         * barrier that flushes rb_sc_head.
 718         */
 719        buf->rb_sc_head = next_head;
 720
 721        return sc;
 722
 723out_emptyq:
 724        /* The queue is "empty" if there have not been enough Send
 725         * completions recently. This is a sign the Send Queue is
 726         * backing up. Cause the caller to pause and try again.
 727         */
 728        xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 729        r_xprt->rx_stats.empty_sendctx_q++;
 730        return NULL;
 731}
 732
 733/**
 734 * rpcrdma_sendctx_put_locked - Release a send context
 735 * @r_xprt: controlling transport instance
 736 * @sc: send context to release
 737 *
  738 * Usage: Called from Send completion to return a sendctx
 739 * to the queue.
 740 *
 741 * The caller serializes calls to this function (per transport).
 742 */
 743static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
 744                                       struct rpcrdma_sendctx *sc)
 745{
 746        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 747        unsigned long next_tail;
 748
 749        /* Unmap SGEs of previously completed but unsignaled
 750         * Sends by walking up the queue until @sc is found.
 751         */
 752        next_tail = buf->rb_sc_tail;
 753        do {
 754                next_tail = rpcrdma_sendctx_next(buf, next_tail);
 755
 756                /* ORDER: item must be accessed _before_ tail is updated */
 757                rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 758
 759        } while (buf->rb_sc_ctxs[next_tail] != sc);
 760
  761        /* Paired with the READ_ONCE() in rpcrdma_sendctx_get_locked() */
 762        smp_store_release(&buf->rb_sc_tail, next_tail);
 763
 764        xprt_write_space(&r_xprt->rx_xprt);
 765}
 766
 767static void
 768rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 769{
 770        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 771        struct rpcrdma_ep *ep = r_xprt->rx_ep;
 772        unsigned int count;
 773
 774        for (count = 0; count < ep->re_max_rdma_segs; count++) {
 775                struct rpcrdma_mr *mr;
 776                int rc;
 777
 778                mr = kzalloc(sizeof(*mr), GFP_NOFS);
 779                if (!mr)
 780                        break;
 781
 782                rc = frwr_mr_init(r_xprt, mr);
 783                if (rc) {
 784                        kfree(mr);
 785                        break;
 786                }
 787
 788                spin_lock(&buf->rb_lock);
 789                rpcrdma_mr_push(mr, &buf->rb_mrs);
 790                list_add(&mr->mr_all, &buf->rb_all_mrs);
 791                spin_unlock(&buf->rb_lock);
 792        }
 793
 794        r_xprt->rx_stats.mrs_allocated += count;
 795        trace_xprtrdma_createmrs(r_xprt, count);
 796}
 797
 798static void
 799rpcrdma_mr_refresh_worker(struct work_struct *work)
 800{
 801        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 802                                                  rb_refresh_worker);
 803        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 804                                                   rx_buf);
 805
 806        rpcrdma_mrs_create(r_xprt);
 807        xprt_write_space(&r_xprt->rx_xprt);
 808}
 809
 810/**
 811 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 812 * @r_xprt: controlling transport instance
 813 *
 814 */
 815void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
 816{
 817        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 818        struct rpcrdma_ep *ep = r_xprt->rx_ep;
 819
 820        /* If there is no underlying connection, it's no use
 821         * to wake the refresh worker.
 822         */
 823        if (ep->re_connect_status == 1) {
 824                /* The work is scheduled on a WQ_MEM_RECLAIM
 825                 * workqueue in order to prevent MR allocation
 826                 * from recursing into NFS during direct reclaim.
 827                 */
 828                queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
 829        }
 830}
 831
 832/**
 833 * rpcrdma_req_create - Allocate an rpcrdma_req object
 834 * @r_xprt: controlling r_xprt
 835 * @size: initial size, in bytes, of send and receive buffers
 836 * @flags: GFP flags passed to memory allocators
 837 *
 838 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 839 */
 840struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
 841                                       gfp_t flags)
 842{
 843        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 844        struct rpcrdma_req *req;
 845
 846        req = kzalloc(sizeof(*req), flags);
 847        if (req == NULL)
 848                goto out1;
 849
 850        req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
 851        if (!req->rl_sendbuf)
 852                goto out2;
 853
 854        req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
 855        if (!req->rl_recvbuf)
 856                goto out3;
 857
 858        INIT_LIST_HEAD(&req->rl_free_mrs);
 859        INIT_LIST_HEAD(&req->rl_registered);
 860        spin_lock(&buffer->rb_lock);
 861        list_add(&req->rl_all, &buffer->rb_allreqs);
 862        spin_unlock(&buffer->rb_lock);
 863        return req;
 864
 865out3:
 866        kfree(req->rl_sendbuf);
 867out2:
 868        kfree(req);
 869out1:
 870        return NULL;
 871}
 872
 873/**
 874 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 875 * @r_xprt: controlling transport instance
 876 * @req: rpcrdma_req object to set up
 877 *
 878 * Returns zero on success, and a negative errno on failure.
 879 */
 880int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 881{
 882        struct rpcrdma_regbuf *rb;
 883        size_t maxhdrsize;
 884
 885        /* Compute maximum header buffer size in bytes */
 886        maxhdrsize = rpcrdma_fixed_maxsz + 3 +
 887                     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
 888        maxhdrsize *= sizeof(__be32);
 889        rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
 890                                  DMA_TO_DEVICE, GFP_KERNEL);
 891        if (!rb)
 892                goto out;
 893
 894        if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
 895                goto out_free;
 896
 897        req->rl_rdmabuf = rb;
 898        xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
 899        return 0;
 900
 901out_free:
 902        rpcrdma_regbuf_free(rb);
 903out:
 904        return -ENOMEM;
 905}
 906
 907/* ASSUMPTION: the rb_allreqs list is stable for the duration,
  908 * and thus can be walked without holding rb_lock. For example, the
 909 * caller is holding the transport send lock to exclude
 910 * device removal or disconnection.
 911 */
 912static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
 913{
 914        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 915        struct rpcrdma_req *req;
 916        int rc;
 917
 918        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 919                rc = rpcrdma_req_setup(r_xprt, req);
 920                if (rc)
 921                        return rc;
 922        }
 923        return 0;
 924}
 925
 926static void rpcrdma_req_reset(struct rpcrdma_req *req)
 927{
 928        /* Credits are valid for only one connection */
 929        req->rl_slot.rq_cong = 0;
 930
 931        rpcrdma_regbuf_free(req->rl_rdmabuf);
 932        req->rl_rdmabuf = NULL;
 933
 934        rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
 935        rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
 936
 937        frwr_reset(req);
 938}
 939
 940/* ASSUMPTION: the rb_allreqs list is stable for the duration,
  941 * and thus can be walked without holding rb_lock. For example, the
 942 * caller is holding the transport send lock to exclude
 943 * device removal or disconnection.
 944 */
 945static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
 946{
 947        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 948        struct rpcrdma_req *req;
 949
 950        list_for_each_entry(req, &buf->rb_allreqs, rl_all)
 951                rpcrdma_req_reset(req);
 952}
 953
 954/* No locking needed here. This function is called only by the
 955 * Receive completion handler.
 956 */
 957static noinline
 958struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 959                                       bool temp)
 960{
 961        struct rpcrdma_rep *rep;
 962
 963        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
 964        if (rep == NULL)
 965                goto out;
 966
 967        rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
 968                                               DMA_FROM_DEVICE, GFP_KERNEL);
 969        if (!rep->rr_rdmabuf)
 970                goto out_free;
 971
 972        if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
 973                goto out_free_regbuf;
 974
 975        xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
 976                     rdmab_length(rep->rr_rdmabuf));
 977        rep->rr_cqe.done = rpcrdma_wc_receive;
 978        rep->rr_rxprt = r_xprt;
 979        rep->rr_recv_wr.next = NULL;
 980        rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 981        rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 982        rep->rr_recv_wr.num_sge = 1;
 983        rep->rr_temp = temp;
 984        list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
 985        return rep;
 986
 987out_free_regbuf:
 988        rpcrdma_regbuf_free(rep->rr_rdmabuf);
 989out_free:
 990        kfree(rep);
 991out:
 992        return NULL;
 993}
 994
 995/* No locking needed here. This function is invoked only by the
 996 * Receive completion handler, or during transport shutdown.
 997 */
 998static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
 999{
1000        list_del(&rep->rr_all);
1001        rpcrdma_regbuf_free(rep->rr_rdmabuf);
1002        kfree(rep);
1003}
1004
1005static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1006{
1007        struct llist_node *node;
1008
1009        /* Calls to llist_del_first are required to be serialized */
1010        node = llist_del_first(&buf->rb_free_reps);
1011        if (!node)
1012                return NULL;
1013        return llist_entry(node, struct rpcrdma_rep, rr_node);
1014}
1015
1016static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
1017                            struct rpcrdma_rep *rep)
1018{
1019        llist_add(&rep->rr_node, &buf->rb_free_reps);
1020}
1021
1022static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1023{
1024        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1025        struct rpcrdma_rep *rep;
1026
1027        list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
1028                rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1029                rep->rr_temp = true;
1030        }
1031}
1032
1033static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1034{
1035        struct rpcrdma_rep *rep;
1036
1037        while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
1038                rpcrdma_rep_destroy(rep);
1039}
1040
1041/**
1042 * rpcrdma_buffer_create - Create initial set of req/rep objects
1043 * @r_xprt: transport instance to (re)initialize
1044 *
1045 * Returns zero on success, otherwise a negative errno.
1046 */
1047int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1048{
1049        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1050        int i, rc;
1051
1052        buf->rb_bc_srv_max_requests = 0;
1053        spin_lock_init(&buf->rb_lock);
1054        INIT_LIST_HEAD(&buf->rb_mrs);
1055        INIT_LIST_HEAD(&buf->rb_all_mrs);
1056        INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1057
1058        INIT_LIST_HEAD(&buf->rb_send_bufs);
1059        INIT_LIST_HEAD(&buf->rb_allreqs);
1060        INIT_LIST_HEAD(&buf->rb_all_reps);
1061
1062        rc = -ENOMEM;
1063        for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
1064                struct rpcrdma_req *req;
1065
1066                req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
1067                                         GFP_KERNEL);
1068                if (!req)
1069                        goto out;
1070                list_add(&req->rl_list, &buf->rb_send_bufs);
1071        }
1072
1073        init_llist_head(&buf->rb_free_reps);
1074
1075        return 0;
1076out:
1077        rpcrdma_buffer_destroy(buf);
1078        return rc;
1079}
1080
1081/**
1082 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1083 * @req: unused object to be destroyed
1084 *
1085 * Relies on caller holding the transport send lock to protect
1086 * removing req->rl_all from buf->rb_all_reqs safely.
1087 */
1088void rpcrdma_req_destroy(struct rpcrdma_req *req)
1089{
1090        struct rpcrdma_mr *mr;
1091
1092        list_del(&req->rl_all);
1093
1094        while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1095                struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1096
1097                spin_lock(&buf->rb_lock);
1098                list_del(&mr->mr_all);
1099                spin_unlock(&buf->rb_lock);
1100
1101                frwr_release_mr(mr);
1102        }
1103
1104        rpcrdma_regbuf_free(req->rl_recvbuf);
1105        rpcrdma_regbuf_free(req->rl_sendbuf);
1106        rpcrdma_regbuf_free(req->rl_rdmabuf);
1107        kfree(req);
1108}
1109
1110/**
1111 * rpcrdma_mrs_destroy - Release all of a transport's MRs
1112 * @r_xprt: controlling transport instance
1113 *
1114 * Relies on caller holding the transport send lock to protect
1115 * removing mr->mr_list from req->rl_free_mrs safely.
1116 */
1117static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1118{
1119        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1120        struct rpcrdma_mr *mr;
1121
1122        cancel_work_sync(&buf->rb_refresh_worker);
1123
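              /* rb_lock is dropped around each frwr_release_mr() call
               * because releasing an MR can sleep.
               */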
1124        spin_lock(&buf->rb_lock);
1125        while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1126                                              struct rpcrdma_mr,
1127                                              mr_all)) != NULL) {
1128                list_del(&mr->mr_list);
1129                list_del(&mr->mr_all);
1130                spin_unlock(&buf->rb_lock);
1131
1132                frwr_release_mr(mr);
1133
1134                spin_lock(&buf->rb_lock);
1135        }
1136        spin_unlock(&buf->rb_lock);
1137}
1138
1139/**
1140 * rpcrdma_buffer_destroy - Release all hw resources
1141 * @buf: root control block for resources
1142 *
 1143 * ORDERING: relies on a prior rpcrdma_xprt_drain:
1144 * - No more Send or Receive completions can occur
1145 * - All MRs, reps, and reqs are returned to their free lists
1146 */
1147void
1148rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1149{
1150        rpcrdma_reps_destroy(buf);
1151
1152        while (!list_empty(&buf->rb_send_bufs)) {
1153                struct rpcrdma_req *req;
1154
1155                req = list_first_entry(&buf->rb_send_bufs,
1156                                       struct rpcrdma_req, rl_list);
1157                list_del(&req->rl_list);
1158                rpcrdma_req_destroy(req);
1159        }
1160}
1161
1162/**
1163 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1164 * @r_xprt: controlling transport
1165 *
1166 * Returns an initialized rpcrdma_mr or NULL if no free
1167 * rpcrdma_mr objects are available.
1168 */
1169struct rpcrdma_mr *
1170rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1171{
1172        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1173        struct rpcrdma_mr *mr;
1174
1175        spin_lock(&buf->rb_lock);
1176        mr = rpcrdma_mr_pop(&buf->rb_mrs);
1177        spin_unlock(&buf->rb_lock);
1178        return mr;
1179}
1180
1181/**
1182 * rpcrdma_mr_put - DMA unmap an MR and release it
1183 * @mr: MR to release
1184 *
1185 */
1186void rpcrdma_mr_put(struct rpcrdma_mr *mr)
1187{
1188        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1189
1190        if (mr->mr_dir != DMA_NONE) {
1191                trace_xprtrdma_mr_unmap(mr);
1192                ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
1193                                mr->mr_sg, mr->mr_nents, mr->mr_dir);
1194                mr->mr_dir = DMA_NONE;
1195        }
1196
1197        rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
1198}
1199
1200/**
1201 * rpcrdma_buffer_get - Get a request buffer
1202 * @buffers: Buffer pool from which to obtain a buffer
1203 *
1204 * Returns a fresh rpcrdma_req, or NULL if none are available.
1205 */
1206struct rpcrdma_req *
1207rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1208{
1209        struct rpcrdma_req *req;
1210
1211        spin_lock(&buffers->rb_lock);
1212        req = list_first_entry_or_null(&buffers->rb_send_bufs,
1213                                       struct rpcrdma_req, rl_list);
1214        if (req)
1215                list_del_init(&req->rl_list);
1216        spin_unlock(&buffers->rb_lock);
1217        return req;
1218}
1219
1220/**
1221 * rpcrdma_buffer_put - Put request/reply buffers back into pool
1222 * @buffers: buffer pool
1223 * @req: object to return
1224 *
1225 */
1226void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1227{
1228        if (req->rl_reply)
1229                rpcrdma_rep_put(buffers, req->rl_reply);
1230        req->rl_reply = NULL;
1231
1232        spin_lock(&buffers->rb_lock);
1233        list_add(&req->rl_list, &buffers->rb_send_bufs);
1234        spin_unlock(&buffers->rb_lock);
1235}
1236
1237/**
1238 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
1239 * @rep: rep to release
1240 *
1241 * Used after error conditions.
1242 */
1243void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1244{
1245        rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
1246}
1247
1248/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1249 *
1250 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1251 * receiving the payload of RDMA RECV operations. During Long Calls
1252 * or Replies they may be registered externally via frwr_map.
1253 */
1254static struct rpcrdma_regbuf *
1255rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1256                     gfp_t flags)
1257{
1258        struct rpcrdma_regbuf *rb;
1259
1260        rb = kmalloc(sizeof(*rb), flags);
1261        if (!rb)
1262                return NULL;
1263        rb->rg_data = kmalloc(size, flags);
1264        if (!rb->rg_data) {
1265                kfree(rb);
1266                return NULL;
1267        }
1268
1269        rb->rg_device = NULL;
1270        rb->rg_direction = direction;
1271        rb->rg_iov.length = size;
1272        return rb;
1273}
1274
1275/**
1276 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1277 * @rb: regbuf to reallocate
1278 * @size: size of buffer to be allocated, in bytes
1279 * @flags: GFP flags
1280 *
1281 * Returns true if reallocation was successful. If false is
1282 * returned, @rb is left untouched.
1283 */
1284bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1285{
1286        void *buf;
1287
1288        buf = kmalloc(size, flags);
1289        if (!buf)
1290                return false;
1291
1292        rpcrdma_regbuf_dma_unmap(rb);
1293        kfree(rb->rg_data);
1294
1295        rb->rg_data = buf;
1296        rb->rg_iov.length = size;
1297        return true;
1298}
1299
1300/**
1301 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1302 * @r_xprt: controlling transport instance
1303 * @rb: regbuf to be mapped
1304 *
1305 * Returns true if the buffer is now DMA mapped to @r_xprt's device
1306 */
1307bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1308                              struct rpcrdma_regbuf *rb)
1309{
1310        struct ib_device *device = r_xprt->rx_ep->re_id->device;
1311
1312        if (rb->rg_direction == DMA_NONE)
1313                return false;
1314
1315        rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1316                                            rdmab_length(rb), rb->rg_direction);
1317        if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1318                trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1319                return false;
1320        }
1321
1322        rb->rg_device = device;
1323        rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
1324        return true;
1325}
1326
1327static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1328{
1329        if (!rb)
1330                return;
1331
1332        if (!rpcrdma_regbuf_is_mapped(rb))
1333                return;
1334
1335        ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1336                            rb->rg_direction);
1337        rb->rg_device = NULL;
1338}
1339
1340static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1341{
1342        rpcrdma_regbuf_dma_unmap(rb);
1343        if (rb)
1344                kfree(rb->rg_data);
1345        kfree(rb);
1346}
1347
1348/**
1349 * rpcrdma_post_sends - Post WRs to a transport's Send Queue
1350 * @r_xprt: controlling transport instance
1351 * @req: rpcrdma_req containing the Send WR to post
1352 *
1353 * Returns 0 if the post was successful, otherwise -ENOTCONN
1354 * is returned.
1355 */
1356int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1357{
1358        struct ib_send_wr *send_wr = &req->rl_wr;
1359        struct rpcrdma_ep *ep = r_xprt->rx_ep;
1360        int rc;
1361
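              /* Most Sends are posted unsignaled to reduce completion
               * overhead. A Send is signaled once per re_send_batch
               * posts, or when @req is still referenced elsewhere and
               * thus something may be waiting on this Send to complete.
               */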
1362        if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
1363                send_wr->send_flags |= IB_SEND_SIGNALED;
1364                ep->re_send_count = ep->re_send_batch;
1365        } else {
1366                send_wr->send_flags &= ~IB_SEND_SIGNALED;
1367                --ep->re_send_count;
1368        }
1369
1370        trace_xprtrdma_post_send(req);
1371        rc = frwr_send(r_xprt, req);
1372        if (rc)
1373                return -ENOTCONN;
1374        return 0;
1375}
1376
1377/**
1378 * rpcrdma_post_recvs - Refill the Receive Queue
1379 * @r_xprt: controlling transport instance
1380 * @temp: mark Receive buffers to be deleted after use
1381 *
1382 */
1383void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1384{
1385        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1386        struct rpcrdma_ep *ep = r_xprt->rx_ep;
1387        struct ib_recv_wr *wr, *bad_wr;
1388        struct rpcrdma_rep *rep;
1389        int needed, count, rc;
1390
1391        rc = 0;
1392        count = 0;
1393
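              /* Keep enough Receives posted to cover the server's current
               * credit grant plus twice the backchannel request limit.
               * When the Receives are not temporary, add a full batch so
               * the queue is refilled in larger, less frequent batches.
               */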
1394        needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1395        if (likely(ep->re_receive_count > needed))
1396                goto out;
1397        needed -= ep->re_receive_count;
1398        if (!temp)
1399                needed += RPCRDMA_MAX_RECV_BATCH;
1400
1401        /* fast path: all needed reps can be found on the free list */
1402        wr = NULL;
1403        while (needed) {
1404                rep = rpcrdma_rep_get_locked(buf);
1405                if (rep && rep->rr_temp) {
1406                        rpcrdma_rep_destroy(rep);
1407                        continue;
1408                }
1409                if (!rep)
1410                        rep = rpcrdma_rep_create(r_xprt, temp);
1411                if (!rep)
1412                        break;
1413
1414                trace_xprtrdma_post_recv(rep);
1415                rep->rr_recv_wr.next = wr;
1416                wr = &rep->rr_recv_wr;
1417                --needed;
1418                ++count;
1419        }
1420        if (!wr)
1421                goto out;
1422
1423        rc = ib_post_recv(ep->re_id->qp, wr,
1424                          (const struct ib_recv_wr **)&bad_wr);
1425out:
1426        trace_xprtrdma_post_recvs(r_xprt, count, rc);
1427        if (rc) {
1428                for (wr = bad_wr; wr;) {
1429                        struct rpcrdma_rep *rep;
1430
1431                        rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1432                        wr = wr->next;
1433                        rpcrdma_recv_buffer_put(rep);
1434                        --count;
1435                }
1436        }
1437        ep->re_receive_count += count;
1438        return;
1439}
1440