linux/net/sunrpc/xprtrdma/verbs.c
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * verbs.c
  44 *
  45 * Encapsulates the major functions managing:
  46 *  o adapters
  47 *  o endpoints
  48 *  o connections
  49 *  o buffer memory
  50 */
  51
  52#include <linux/interrupt.h>
  53#include <linux/slab.h>
  54#include <linux/sunrpc/addr.h>
  55#include <linux/sunrpc/svc_rdma.h>
  56
  57#include <asm-generic/barrier.h>
  58#include <asm/bitops.h>
  59
  60#include <rdma/ib_cm.h>
  61
  62#include "xprt_rdma.h"
  63#include <trace/events/rpcrdma.h>
  64
  65/*
  66 * Globals/Macros
  67 */
  68
  69#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  70# define RPCDBG_FACILITY        RPCDBG_TRANS
  71#endif
  72
  73/*
  74 * internal functions
  75 */
  76static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
  77static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  78static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
  79static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
  80static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
  81
  82struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
  83
  84int
  85rpcrdma_alloc_wq(void)
  86{
  87        struct workqueue_struct *recv_wq;
  88
  89        recv_wq = alloc_workqueue("xprtrdma_receive",
  90                                  WQ_MEM_RECLAIM | WQ_HIGHPRI,
  91                                  0);
  92        if (!recv_wq)
  93                return -ENOMEM;
  94
  95        rpcrdma_receive_wq = recv_wq;
  96        return 0;
  97}
  98
  99void
 100rpcrdma_destroy_wq(void)
 101{
 102        struct workqueue_struct *wq;
 103
 104        if (rpcrdma_receive_wq) {
 105                wq = rpcrdma_receive_wq;
 106                rpcrdma_receive_wq = NULL;
 107                destroy_workqueue(wq);
 108        }
 109}
 110
 111static void
 112rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 113{
 114        struct rpcrdma_ep *ep = context;
 115        struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 116                                                   rx_ep);
 117
 118        trace_xprtrdma_qp_error(r_xprt, event);
 119        pr_err("rpcrdma: %s on device %s ep %p\n",
 120               ib_event_msg(event->event), event->device->name, context);
 121
 122        if (ep->rep_connected == 1) {
 123                ep->rep_connected = -EIO;
 124                rpcrdma_conn_func(ep);
 125                wake_up_all(&ep->rep_connect_wait);
 126        }
 127}
 128
 129/**
 130 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 131 * @cq: completion queue (ignored)
 132 * @wc: completed WR
 133 *
 134 */
 135static void
 136rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 137{
 138        struct ib_cqe *cqe = wc->wr_cqe;
 139        struct rpcrdma_sendctx *sc =
 140                container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 141
 142        /* WARNING: Only wr_cqe and status are reliable at this point */
 143        trace_xprtrdma_wc_send(sc, wc);
 144        if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 145                pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 146                       ib_wc_status_msg(wc->status),
 147                       wc->status, wc->vendor_err);
 148
 149        rpcrdma_sendctx_put_locked(sc);
 150}
 151
 152/**
 153 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 154 * @cq: completion queue (ignored)
 155 * @wc: completed WR
 156 *
 157 */
 158static void
 159rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 160{
 161        struct ib_cqe *cqe = wc->wr_cqe;
 162        struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 163                                               rr_cqe);
 164
  165        /* WARNING: Only wr_cqe and status are reliable at this point */
 166        trace_xprtrdma_wc_receive(wc);
 167        if (wc->status != IB_WC_SUCCESS)
 168                goto out_fail;
 169
 170        /* status == SUCCESS means all fields in wc are trustworthy */
 171        rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 172        rep->rr_wc_flags = wc->wc_flags;
 173        rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 174
 175        ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 176                                   rdmab_addr(rep->rr_rdmabuf),
 177                                   wc->byte_len, DMA_FROM_DEVICE);
 178
 179out_schedule:
 180        rpcrdma_reply_handler(rep);
 181        return;
 182
 183out_fail:
 184        if (wc->status != IB_WC_WR_FLUSH_ERR)
 185                pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 186                       ib_wc_status_msg(wc->status),
 187                       wc->status, wc->vendor_err);
 188        rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
 189        goto out_schedule;
 190}
 191
 192static void
 193rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 194                               struct rdma_conn_param *param)
 195{
 196        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 197        const struct rpcrdma_connect_private *pmsg = param->private_data;
 198        unsigned int rsize, wsize;
 199
 200        /* Default settings for RPC-over-RDMA Version One */
 201        r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
 202        rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 203        wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 204
 205        if (pmsg &&
 206            pmsg->cp_magic == rpcrdma_cmp_magic &&
 207            pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 208                r_xprt->rx_ia.ri_implicit_roundup = true;
 209                rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 210                wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 211        }
 212
 213        if (rsize < cdata->inline_rsize)
 214                cdata->inline_rsize = rsize;
 215        if (wsize < cdata->inline_wsize)
 216                cdata->inline_wsize = wsize;
 217        dprintk("RPC:       %s: max send %u, max recv %u\n",
 218                __func__, cdata->inline_wsize, cdata->inline_rsize);
 219        rpcrdma_set_max_header_sizes(r_xprt);
 220}
 221
 222static int
 223rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 224{
 225        struct rpcrdma_xprt *xprt = id->context;
 226        struct rpcrdma_ia *ia = &xprt->rx_ia;
 227        struct rpcrdma_ep *ep = &xprt->rx_ep;
 228        int connstate = 0;
 229
 230        trace_xprtrdma_conn_upcall(xprt, event);
 231        switch (event->event) {
 232        case RDMA_CM_EVENT_ADDR_RESOLVED:
 233        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 234                ia->ri_async_rc = 0;
 235                complete(&ia->ri_done);
 236                break;
 237        case RDMA_CM_EVENT_ADDR_ERROR:
 238                ia->ri_async_rc = -EPROTO;
 239                complete(&ia->ri_done);
 240                break;
 241        case RDMA_CM_EVENT_ROUTE_ERROR:
 242                ia->ri_async_rc = -ENETUNREACH;
 243                complete(&ia->ri_done);
 244                break;
 245        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 246#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 247                pr_info("rpcrdma: removing device %s for %s:%s\n",
 248                        ia->ri_device->name,
 249                        rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
 250#endif
 251                set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 252                ep->rep_connected = -ENODEV;
 253                xprt_force_disconnect(&xprt->rx_xprt);
 254                wait_for_completion(&ia->ri_remove_done);
 255
 256                ia->ri_id = NULL;
 257                ia->ri_device = NULL;
 258                /* Return 1 to ensure the core destroys the id. */
 259                return 1;
 260        case RDMA_CM_EVENT_ESTABLISHED:
 261                ++xprt->rx_xprt.connect_cookie;
 262                connstate = 1;
 263                rpcrdma_update_connect_private(xprt, &event->param.conn);
 264                goto connected;
 265        case RDMA_CM_EVENT_CONNECT_ERROR:
 266                connstate = -ENOTCONN;
 267                goto connected;
 268        case RDMA_CM_EVENT_UNREACHABLE:
 269                connstate = -ENETUNREACH;
 270                goto connected;
 271        case RDMA_CM_EVENT_REJECTED:
 272                dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
 273                        rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
 274                        rdma_reject_msg(id, event->status));
 275                connstate = -ECONNREFUSED;
 276                if (event->status == IB_CM_REJ_STALE_CONN)
 277                        connstate = -EAGAIN;
 278                goto connected;
 279        case RDMA_CM_EVENT_DISCONNECTED:
 280                ++xprt->rx_xprt.connect_cookie;
 281                connstate = -ECONNABORTED;
 282connected:
 283                xprt->rx_buf.rb_credits = 1;
 284                ep->rep_connected = connstate;
 285                rpcrdma_conn_func(ep);
 286                wake_up_all(&ep->rep_connect_wait);
 287                /*FALLTHROUGH*/
 288        default:
 289                dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
 290                        __func__,
 291                        rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
 292                        ia->ri_device->name, ia->ri_ops->ro_displayname,
 293                        ep, rdma_event_msg(event->event));
 294                break;
 295        }
 296
 297        return 0;
 298}
 299
 300static struct rdma_cm_id *
 301rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
 302{
 303        unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 304        struct rdma_cm_id *id;
 305        int rc;
 306
 307        trace_xprtrdma_conn_start(xprt);
 308
 309        init_completion(&ia->ri_done);
 310        init_completion(&ia->ri_remove_done);
 311
 312        id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
 313                            xprt, RDMA_PS_TCP, IB_QPT_RC);
 314        if (IS_ERR(id)) {
 315                rc = PTR_ERR(id);
 316                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 317                        __func__, rc);
 318                return id;
 319        }
 320
 321        ia->ri_async_rc = -ETIMEDOUT;
 322        rc = rdma_resolve_addr(id, NULL,
 323                               (struct sockaddr *)&xprt->rx_xprt.addr,
 324                               RDMA_RESOLVE_TIMEOUT);
 325        if (rc) {
 326                dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 327                        __func__, rc);
 328                goto out;
 329        }
 330        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 331        if (rc < 0) {
 332                trace_xprtrdma_conn_tout(xprt);
 333                goto out;
 334        }
 335
 336        rc = ia->ri_async_rc;
 337        if (rc)
 338                goto out;
 339
 340        ia->ri_async_rc = -ETIMEDOUT;
 341        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 342        if (rc) {
 343                dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 344                        __func__, rc);
 345                goto out;
 346        }
 347        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 348        if (rc < 0) {
 349                trace_xprtrdma_conn_tout(xprt);
 350                goto out;
 351        }
 352        rc = ia->ri_async_rc;
 353        if (rc)
 354                goto out;
 355
 356        return id;
 357
 358out:
 359        rdma_destroy_id(id);
 360        return ERR_PTR(rc);
 361}
 362
 363/*
 364 * Exported functions.
 365 */
 366
 367/**
 368 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 369 * @xprt: transport with IA to (re)initialize
 370 *
 371 * Returns 0 on success, negative errno if an appropriate
 372 * Interface Adapter could not be found and opened.
 373 */
 374int
 375rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
 376{
 377        struct rpcrdma_ia *ia = &xprt->rx_ia;
 378        int rc;
 379
 380        ia->ri_id = rpcrdma_create_id(xprt, ia);
 381        if (IS_ERR(ia->ri_id)) {
 382                rc = PTR_ERR(ia->ri_id);
 383                goto out_err;
 384        }
 385        ia->ri_device = ia->ri_id->device;
 386
 387        ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
 388        if (IS_ERR(ia->ri_pd)) {
 389                rc = PTR_ERR(ia->ri_pd);
 390                pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
 391                goto out_err;
 392        }
 393
 394        switch (xprt_rdma_memreg_strategy) {
 395        case RPCRDMA_FRWR:
 396                if (frwr_is_supported(ia)) {
 397                        ia->ri_ops = &rpcrdma_frwr_memreg_ops;
 398                        break;
 399                }
 400                /*FALLTHROUGH*/
 401        case RPCRDMA_MTHCAFMR:
 402                if (fmr_is_supported(ia)) {
 403                        ia->ri_ops = &rpcrdma_fmr_memreg_ops;
 404                        break;
 405                }
 406                /*FALLTHROUGH*/
 407        default:
 408                pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
 409                       ia->ri_device->name, xprt_rdma_memreg_strategy);
 410                rc = -EINVAL;
 411                goto out_err;
 412        }
 413
 414        return 0;
 415
 416out_err:
 417        rpcrdma_ia_close(ia);
 418        return rc;
 419}
 420
 421/**
 422 * rpcrdma_ia_remove - Handle device driver unload
 423 * @ia: interface adapter being removed
 424 *
 425 * Divest transport H/W resources associated with this adapter,
 426 * but allow it to be restored later.
 427 */
 428void
 429rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 430{
 431        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 432                                                   rx_ia);
 433        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 434        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 435        struct rpcrdma_req *req;
 436        struct rpcrdma_rep *rep;
 437
 438        cancel_delayed_work_sync(&buf->rb_refresh_worker);
 439
 440        /* This is similar to rpcrdma_ep_destroy, but:
 441         * - Don't cancel the connect worker.
 442         * - Don't call rpcrdma_ep_disconnect, which waits
 443         *   for another conn upcall, which will deadlock.
 444         * - rdma_disconnect is unneeded, the underlying
 445         *   connection is already gone.
 446         */
 447        if (ia->ri_id->qp) {
 448                ib_drain_qp(ia->ri_id->qp);
 449                rdma_destroy_qp(ia->ri_id);
 450                ia->ri_id->qp = NULL;
 451        }
 452        ib_free_cq(ep->rep_attr.recv_cq);
 453        ep->rep_attr.recv_cq = NULL;
 454        ib_free_cq(ep->rep_attr.send_cq);
 455        ep->rep_attr.send_cq = NULL;
 456
 457        /* The ULP is responsible for ensuring all DMA
 458         * mappings and MRs are gone.
 459         */
 460        list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
 461                rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
 462        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 463                rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
 464                rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
 465                rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
 466        }
 467        rpcrdma_mrs_destroy(buf);
 468        ib_dealloc_pd(ia->ri_pd);
 469        ia->ri_pd = NULL;
 470
 471        /* Allow waiters to continue */
 472        complete(&ia->ri_remove_done);
 473
 474        trace_xprtrdma_remove(r_xprt);
 475}
 476
 477/**
 478 * rpcrdma_ia_close - Clean up/close an IA.
 479 * @ia: interface adapter to close
 480 *
 481 */
 482void
 483rpcrdma_ia_close(struct rpcrdma_ia *ia)
 484{
 485        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 486                if (ia->ri_id->qp)
 487                        rdma_destroy_qp(ia->ri_id);
 488                rdma_destroy_id(ia->ri_id);
 489        }
 490        ia->ri_id = NULL;
 491        ia->ri_device = NULL;
 492
 493        /* If the pd is still busy, xprtrdma missed freeing a resource */
 494        if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 495                ib_dealloc_pd(ia->ri_pd);
 496        ia->ri_pd = NULL;
 497}
 498
 499/*
 500 * Create unconnected endpoint.
 501 */
 502int
 503rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 504                  struct rpcrdma_create_data_internal *cdata)
 505{
 506        struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 507        struct ib_cq *sendcq, *recvcq;
 508        unsigned int max_sge;
 509        int rc;
 510
 511        max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
 512                        RPCRDMA_MAX_SEND_SGES);
 513        if (max_sge < RPCRDMA_MIN_SEND_SGES) {
 514                pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
 515                return -ENOMEM;
 516        }
 517        ia->ri_max_send_sges = max_sge;
 518
 519        rc = ia->ri_ops->ro_open(ia, ep, cdata);
 520        if (rc)
 521                return rc;
 522
 523        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 524        ep->rep_attr.qp_context = ep;
 525        ep->rep_attr.srq = NULL;
 526        ep->rep_attr.cap.max_send_sge = max_sge;
 527        ep->rep_attr.cap.max_recv_sge = 1;
 528        ep->rep_attr.cap.max_inline_data = 0;
 529        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 530        ep->rep_attr.qp_type = IB_QPT_RC;
 531        ep->rep_attr.port_num = ~0;
 532
 533        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 534                "iovs: send %d recv %d\n",
 535                __func__,
 536                ep->rep_attr.cap.max_send_wr,
 537                ep->rep_attr.cap.max_recv_wr,
 538                ep->rep_attr.cap.max_send_sge,
 539                ep->rep_attr.cap.max_recv_sge);
 540
 541        /* set trigger for requesting send completion */
 542        ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
 543                                   cdata->max_requests >> 2);
 544        ep->rep_send_count = ep->rep_send_batch;
 545        init_waitqueue_head(&ep->rep_connect_wait);
 546        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 547
 548        sendcq = ib_alloc_cq(ia->ri_device, NULL,
 549                             ep->rep_attr.cap.max_send_wr + 1,
 550                             1, IB_POLL_WORKQUEUE);
 551        if (IS_ERR(sendcq)) {
 552                rc = PTR_ERR(sendcq);
 553                dprintk("RPC:       %s: failed to create send CQ: %i\n",
 554                        __func__, rc);
 555                goto out1;
 556        }
 557
 558        recvcq = ib_alloc_cq(ia->ri_device, NULL,
 559                             ep->rep_attr.cap.max_recv_wr + 1,
 560                             0, IB_POLL_WORKQUEUE);
 561        if (IS_ERR(recvcq)) {
 562                rc = PTR_ERR(recvcq);
 563                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 564                        __func__, rc);
 565                goto out2;
 566        }
 567
 568        ep->rep_attr.send_cq = sendcq;
 569        ep->rep_attr.recv_cq = recvcq;
 570
 571        /* Initialize cma parameters */
 572        memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 573
 574        /* Prepare RDMA-CM private message */
 575        pmsg->cp_magic = rpcrdma_cmp_magic;
 576        pmsg->cp_version = RPCRDMA_CMP_VERSION;
 577        pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
 578        pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
 579        pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
 580        ep->rep_remote_cma.private_data = pmsg;
 581        ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 582
 583        /* Client offers RDMA Read but does not initiate */
 584        ep->rep_remote_cma.initiator_depth = 0;
 585        ep->rep_remote_cma.responder_resources =
 586                min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
 587
 588        /* Limit transport retries so client can detect server
 589         * GID changes quickly. RPC layer handles re-establishing
 590         * transport connection and retransmission.
 591         */
 592        ep->rep_remote_cma.retry_count = 6;
 593
 594        /* RPC-over-RDMA handles its own flow control. In addition,
 595         * make all RNR NAKs visible so we know that RPC-over-RDMA
 596         * flow control is working correctly (no NAKs should be seen).
 597         */
 598        ep->rep_remote_cma.flow_control = 0;
 599        ep->rep_remote_cma.rnr_retry_count = 0;
 600
 601        return 0;
 602
 603out2:
 604        ib_free_cq(sendcq);
 605out1:
 606        return rc;
 607}
 608
 609/*
 610 * rpcrdma_ep_destroy
 611 *
 612 * Disconnect and destroy endpoint. After this, the only
 613 * valid operations on the ep are to free it (if dynamically
 614 * allocated) or re-create it.
 615 */
 616void
 617rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 618{
 619        cancel_delayed_work_sync(&ep->rep_connect_worker);
 620
 621        if (ia->ri_id && ia->ri_id->qp) {
 622                rpcrdma_ep_disconnect(ep, ia);
 623                rdma_destroy_qp(ia->ri_id);
 624                ia->ri_id->qp = NULL;
 625        }
 626
 627        if (ep->rep_attr.recv_cq)
 628                ib_free_cq(ep->rep_attr.recv_cq);
 629        if (ep->rep_attr.send_cq)
 630                ib_free_cq(ep->rep_attr.send_cq);
 631}
 632
 633/* Re-establish a connection after a device removal event.
 634 * Unlike a normal reconnection, a fresh PD and a new set
 635 * of MRs and buffers is needed.
 636 */
 637static int
 638rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
 639                         struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 640{
 641        int rc, err;
 642
 643        trace_xprtrdma_reinsert(r_xprt);
 644
 645        rc = -EHOSTUNREACH;
 646        if (rpcrdma_ia_open(r_xprt))
 647                goto out1;
 648
 649        rc = -ENOMEM;
 650        err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
 651        if (err) {
 652                pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
 653                goto out2;
 654        }
 655
 656        rc = -ENETUNREACH;
 657        err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 658        if (err) {
 659                pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
 660                goto out3;
 661        }
 662
 663        rpcrdma_mrs_create(r_xprt);
 664        return 0;
 665
 666out3:
 667        rpcrdma_ep_destroy(ep, ia);
 668out2:
 669        rpcrdma_ia_close(ia);
 670out1:
 671        return rc;
 672}
 673
 674static int
 675rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
 676                     struct rpcrdma_ia *ia)
 677{
 678        struct rdma_cm_id *id, *old;
 679        int err, rc;
 680
 681        trace_xprtrdma_reconnect(r_xprt);
 682
 683        rpcrdma_ep_disconnect(ep, ia);
 684
 685        rc = -EHOSTUNREACH;
 686        id = rpcrdma_create_id(r_xprt, ia);
 687        if (IS_ERR(id))
 688                goto out;
 689
 690        /* As long as the new ID points to the same device as the
 691         * old ID, we can reuse the transport's existing PD and all
 692         * previously allocated MRs. Also, the same device means
 693         * the transport's previous DMA mappings are still valid.
 694         *
 695         * This is a sanity check only. There should be no way these
 696         * point to two different devices here.
 697         */
 698        old = id;
 699        rc = -ENETUNREACH;
 700        if (ia->ri_device != id->device) {
 701                pr_err("rpcrdma: can't reconnect on different device!\n");
 702                goto out_destroy;
 703        }
 704
 705        err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 706        if (err) {
 707                dprintk("RPC:       %s: rdma_create_qp returned %d\n",
 708                        __func__, err);
 709                goto out_destroy;
 710        }
 711
 712        /* Atomically replace the transport's ID and QP. */
 713        rc = 0;
 714        old = ia->ri_id;
 715        ia->ri_id = id;
 716        rdma_destroy_qp(old);
 717
 718out_destroy:
 719        rdma_destroy_id(old);
 720out:
 721        return rc;
 722}
 723
 724/*
 725 * Connect unconnected endpoint.
 726 */
 727int
 728rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 729{
 730        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 731                                                   rx_ia);
 732        int rc;
 733
 734retry:
 735        switch (ep->rep_connected) {
 736        case 0:
 737                dprintk("RPC:       %s: connecting...\n", __func__);
 738                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 739                if (rc) {
 740                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 741                                __func__, rc);
 742                        rc = -ENETUNREACH;
 743                        goto out_noupdate;
 744                }
 745                break;
 746        case -ENODEV:
 747                rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
 748                if (rc)
 749                        goto out_noupdate;
 750                break;
 751        default:
 752                rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
 753                if (rc)
 754                        goto out;
 755        }
 756
 757        ep->rep_connected = 0;
 758
 759        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 760        if (rc) {
 761                dprintk("RPC:       %s: rdma_connect() failed with %i\n",
 762                                __func__, rc);
 763                goto out;
 764        }
 765
 766        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 767        if (ep->rep_connected <= 0) {
 768                if (ep->rep_connected == -EAGAIN)
 769                        goto retry;
 770                rc = ep->rep_connected;
 771                goto out;
 772        }
 773
 774        dprintk("RPC:       %s: connected\n", __func__);
 775
 776        rpcrdma_post_recvs(r_xprt, true);
 777
 778out:
 779        if (rc)
 780                ep->rep_connected = rc;
 781
 782out_noupdate:
 783        return rc;
 784}
 785
 786/*
 787 * rpcrdma_ep_disconnect
 788 *
 789 * This is separate from destroy to facilitate the ability
 790 * to reconnect without recreating the endpoint.
 791 *
 792 * This call is not reentrant, and must not be made in parallel
 793 * on the same endpoint.
 794 */
 795void
 796rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 797{
 798        int rc;
 799
 800        rc = rdma_disconnect(ia->ri_id);
 801        if (!rc)
 802                /* returns without wait if not connected */
 803                wait_event_interruptible(ep->rep_connect_wait,
 804                                                        ep->rep_connected != 1);
 805        else
 806                ep->rep_connected = rc;
 807        trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
 808                                               rx_ep), rc);
 809
 810        ib_drain_qp(ia->ri_id->qp);
 811}
 812
 813/* Fixed-size circular FIFO queue. This implementation is wait-free and
 814 * lock-free.
 815 *
 816 * Consumer is the code path that posts Sends. This path dequeues a
 817 * sendctx for use by a Send operation. Multiple consumer threads
 818 * are serialized by the RPC transport lock, which allows only one
 819 * ->send_request call at a time.
 820 *
 821 * Producer is the code path that handles Send completions. This path
 822 * enqueues a sendctx that has been completed. Multiple producer
 823 * threads are serialized by the ib_poll_cq() function.
 824 */
 825
 826/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 827 * queue activity, and ib_drain_qp has flushed all remaining Send
 828 * requests.
 829 */
 830static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
 831{
 832        unsigned long i;
 833
 834        for (i = 0; i <= buf->rb_sc_last; i++)
 835                kfree(buf->rb_sc_ctxs[i]);
 836        kfree(buf->rb_sc_ctxs);
 837}
 838
 839static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
 840{
 841        struct rpcrdma_sendctx *sc;
 842
 843        sc = kzalloc(sizeof(*sc) +
 844                     ia->ri_max_send_sges * sizeof(struct ib_sge),
 845                     GFP_KERNEL);
 846        if (!sc)
 847                return NULL;
 848
 849        sc->sc_wr.wr_cqe = &sc->sc_cqe;
 850        sc->sc_wr.sg_list = sc->sc_sges;
 851        sc->sc_wr.opcode = IB_WR_SEND;
 852        sc->sc_cqe.done = rpcrdma_wc_send;
 853        return sc;
 854}
 855
 856static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 857{
 858        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 859        struct rpcrdma_sendctx *sc;
 860        unsigned long i;
 861
 862        /* Maximum number of concurrent outstanding Send WRs. Capping
 863         * the circular queue size stops Send Queue overflow by causing
 864         * the ->send_request call to fail temporarily before too many
 865         * Sends are posted.
 866         */
 867        i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 868        dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
 869        buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 870        if (!buf->rb_sc_ctxs)
 871                return -ENOMEM;
 872
 873        buf->rb_sc_last = i - 1;
 874        for (i = 0; i <= buf->rb_sc_last; i++) {
 875                sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
 876                if (!sc)
 877                        goto out_destroy;
 878
 879                sc->sc_xprt = r_xprt;
 880                buf->rb_sc_ctxs[i] = sc;
 881        }
 882        buf->rb_flags = 0;
 883
 884        return 0;
 885
 886out_destroy:
 887        rpcrdma_sendctxs_destroy(buf);
 888        return -ENOMEM;
 889}
 890
 891/* The sendctx queue is not guaranteed to have a size that is a
 892 * power of two, thus the helpers in circ_buf.h cannot be used.
 893 * The other option is to use modulus (%), which can be expensive.
 894 */
 895static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 896                                          unsigned long item)
 897{
 898        return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 899}
 900
 901/**
 902 * rpcrdma_sendctx_get_locked - Acquire a send context
 903 * @buf: transport buffers from which to acquire an unused context
 904 *
 905 * Returns pointer to a free send completion context; or NULL if
 906 * the queue is empty.
 907 *
 908 * Usage: Called to acquire an SGE array before preparing a Send WR.
 909 *
 910 * The caller serializes calls to this function (per rpcrdma_buffer),
 911 * and provides an effective memory barrier that flushes the new value
 912 * of rb_sc_head.
 913 */
 914struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
 915{
 916        struct rpcrdma_xprt *r_xprt;
 917        struct rpcrdma_sendctx *sc;
 918        unsigned long next_head;
 919
 920        next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 921
 922        if (next_head == READ_ONCE(buf->rb_sc_tail))
 923                goto out_emptyq;
 924
 925        /* ORDER: item must be accessed _before_ head is updated */
 926        sc = buf->rb_sc_ctxs[next_head];
 927
 928        /* Releasing the lock in the caller acts as a memory
 929         * barrier that flushes rb_sc_head.
 930         */
 931        buf->rb_sc_head = next_head;
 932
 933        return sc;
 934
 935out_emptyq:
 936        /* The queue is "empty" if there have not been enough Send
 937         * completions recently. This is a sign the Send Queue is
 938         * backing up. Cause the caller to pause and try again.
 939         */
 940        set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
 941        r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
 942        r_xprt->rx_stats.empty_sendctx_q++;
 943        return NULL;
 944}
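     /* Usage sketch (editorial annotation, not upstream code): a Send-path
      * caller that already holds the transport write lock might do
      *
      *     req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
      *     if (!req->rl_sendctx)
      *             return -EAGAIN;
      *
      * and retry later; rpcrdma_sendctx_put_locked() below wakes the
      * transport via xprt_write_space() once contexts are available again.
      */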
 945
 946/**
 947 * rpcrdma_sendctx_put_locked - Release a send context
 948 * @sc: send context to release
 949 *
  950 * Usage: Called from Send completion to return a sendctx
 951 * to the queue.
 952 *
 953 * The caller serializes calls to this function (per rpcrdma_buffer).
 954 */
 955static void
 956rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 957{
 958        struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 959        unsigned long next_tail;
 960
  961        /* Unmap SGEs of previously completed but unsignaled
 962         * Sends by walking up the queue until @sc is found.
 963         */
 964        next_tail = buf->rb_sc_tail;
 965        do {
 966                next_tail = rpcrdma_sendctx_next(buf, next_tail);
 967
 968                /* ORDER: item must be accessed _before_ tail is updated */
 969                rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
 970
 971        } while (buf->rb_sc_ctxs[next_tail] != sc);
 972
 973        /* Paired with READ_ONCE */
 974        smp_store_release(&buf->rb_sc_tail, next_tail);
 975
 976        if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
 977                smp_mb__after_atomic();
 978                xprt_write_space(&sc->sc_xprt->rx_xprt);
 979        }
 980}
 981
 982static void
 983rpcrdma_mr_recovery_worker(struct work_struct *work)
 984{
 985        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 986                                                  rb_recovery_worker.work);
 987        struct rpcrdma_mr *mr;
 988
 989        spin_lock(&buf->rb_recovery_lock);
 990        while (!list_empty(&buf->rb_stale_mrs)) {
 991                mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
 992                spin_unlock(&buf->rb_recovery_lock);
 993
 994                trace_xprtrdma_recover_mr(mr);
 995                mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
 996
 997                spin_lock(&buf->rb_recovery_lock);
 998        }
 999        spin_unlock(&buf->rb_recovery_lock);
1000}
1001
1002void
1003rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
1004{
1005        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1006        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1007
1008        spin_lock(&buf->rb_recovery_lock);
1009        rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
1010        spin_unlock(&buf->rb_recovery_lock);
1011
1012        schedule_delayed_work(&buf->rb_recovery_worker, 0);
1013}
1014
1015static void
1016rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
1017{
1018        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1019        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1020        unsigned int count;
1021        LIST_HEAD(free);
1022        LIST_HEAD(all);
1023
1024        for (count = 0; count < 3; count++) {
1025                struct rpcrdma_mr *mr;
1026                int rc;
1027
1028                mr = kzalloc(sizeof(*mr), GFP_KERNEL);
1029                if (!mr)
1030                        break;
1031
1032                rc = ia->ri_ops->ro_init_mr(ia, mr);
1033                if (rc) {
1034                        kfree(mr);
1035                        break;
1036                }
1037
1038                mr->mr_xprt = r_xprt;
1039
1040                list_add(&mr->mr_list, &free);
1041                list_add(&mr->mr_all, &all);
1042        }
1043
1044        spin_lock(&buf->rb_mrlock);
1045        list_splice(&free, &buf->rb_mrs);
1046        list_splice(&all, &buf->rb_all);
1047        r_xprt->rx_stats.mrs_allocated += count;
1048        spin_unlock(&buf->rb_mrlock);
1049        trace_xprtrdma_createmrs(r_xprt, count);
1050
1051        xprt_write_space(&r_xprt->rx_xprt);
1052}
1053
1054static void
1055rpcrdma_mr_refresh_worker(struct work_struct *work)
1056{
1057        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
1058                                                  rb_refresh_worker.work);
1059        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1060                                                   rx_buf);
1061
1062        rpcrdma_mrs_create(r_xprt);
1063}
1064
1065struct rpcrdma_req *
1066rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1067{
1068        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1069        struct rpcrdma_regbuf *rb;
1070        struct rpcrdma_req *req;
1071
1072        req = kzalloc(sizeof(*req), GFP_KERNEL);
1073        if (req == NULL)
1074                return ERR_PTR(-ENOMEM);
1075
1076        rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
1077                                  DMA_TO_DEVICE, GFP_KERNEL);
1078        if (IS_ERR(rb)) {
1079                kfree(req);
1080                return ERR_PTR(-ENOMEM);
1081        }
1082        req->rl_rdmabuf = rb;
1083        xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
1084        req->rl_buffer = buffer;
1085        INIT_LIST_HEAD(&req->rl_registered);
1086
1087        spin_lock(&buffer->rb_reqslock);
1088        list_add(&req->rl_all, &buffer->rb_allreqs);
1089        spin_unlock(&buffer->rb_reqslock);
1090        return req;
1091}
1092
1093static int
1094rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
1095{
1096        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1097        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1098        struct rpcrdma_rep *rep;
1099        int rc;
1100
1101        rc = -ENOMEM;
1102        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1103        if (rep == NULL)
1104                goto out;
1105
1106        rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
1107                                               DMA_FROM_DEVICE, GFP_KERNEL);
1108        if (IS_ERR(rep->rr_rdmabuf)) {
1109                rc = PTR_ERR(rep->rr_rdmabuf);
1110                goto out_free;
1111        }
1112        xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
1113                     rdmab_length(rep->rr_rdmabuf));
1114
1115        rep->rr_cqe.done = rpcrdma_wc_receive;
1116        rep->rr_rxprt = r_xprt;
1117        INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
1118        rep->rr_recv_wr.next = NULL;
1119        rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1120        rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1121        rep->rr_recv_wr.num_sge = 1;
1122        rep->rr_temp = temp;
1123
1124        spin_lock(&buf->rb_lock);
1125        list_add(&rep->rr_list, &buf->rb_recv_bufs);
1126        spin_unlock(&buf->rb_lock);
1127        return 0;
1128
1129out_free:
1130        kfree(rep);
1131out:
1132        dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1133                __func__, rc);
1134        return rc;
1135}
1136
1137int
1138rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1139{
1140        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1141        int i, rc;
1142
1143        buf->rb_max_requests = r_xprt->rx_data.max_requests;
1144        buf->rb_bc_srv_max_requests = 0;
1145        spin_lock_init(&buf->rb_mrlock);
1146        spin_lock_init(&buf->rb_lock);
1147        spin_lock_init(&buf->rb_recovery_lock);
1148        INIT_LIST_HEAD(&buf->rb_mrs);
1149        INIT_LIST_HEAD(&buf->rb_all);
1150        INIT_LIST_HEAD(&buf->rb_stale_mrs);
1151        INIT_DELAYED_WORK(&buf->rb_refresh_worker,
1152                          rpcrdma_mr_refresh_worker);
1153        INIT_DELAYED_WORK(&buf->rb_recovery_worker,
1154                          rpcrdma_mr_recovery_worker);
1155
1156        rpcrdma_mrs_create(r_xprt);
1157
1158        INIT_LIST_HEAD(&buf->rb_send_bufs);
1159        INIT_LIST_HEAD(&buf->rb_allreqs);
1160        spin_lock_init(&buf->rb_reqslock);
1161        for (i = 0; i < buf->rb_max_requests; i++) {
1162                struct rpcrdma_req *req;
1163
1164                req = rpcrdma_create_req(r_xprt);
1165                if (IS_ERR(req)) {
1166                        dprintk("RPC:       %s: request buffer %d alloc"
1167                                " failed\n", __func__, i);
1168                        rc = PTR_ERR(req);
1169                        goto out;
1170                }
1171                list_add(&req->rl_list, &buf->rb_send_bufs);
1172        }
1173
1174        buf->rb_posted_receives = 0;
1175        INIT_LIST_HEAD(&buf->rb_recv_bufs);
1176
1177        rc = rpcrdma_sendctxs_create(r_xprt);
1178        if (rc)
1179                goto out;
1180
1181        return 0;
1182out:
1183        rpcrdma_buffer_destroy(buf);
1184        return rc;
1185}
1186
1187static void
1188rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
1189{
1190        rpcrdma_free_regbuf(rep->rr_rdmabuf);
1191        kfree(rep);
1192}
1193
1194void
1195rpcrdma_destroy_req(struct rpcrdma_req *req)
1196{
1197        rpcrdma_free_regbuf(req->rl_recvbuf);
1198        rpcrdma_free_regbuf(req->rl_sendbuf);
1199        rpcrdma_free_regbuf(req->rl_rdmabuf);
1200        kfree(req);
1201}
1202
1203static void
1204rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1205{
1206        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1207                                                   rx_buf);
1208        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1209        struct rpcrdma_mr *mr;
1210        unsigned int count;
1211
1212        count = 0;
1213        spin_lock(&buf->rb_mrlock);
1214        while (!list_empty(&buf->rb_all)) {
1215                mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
1216                list_del(&mr->mr_all);
1217
1218                spin_unlock(&buf->rb_mrlock);
1219
 1220                /* Ensure MR is not on any rl_registered list */
1221                if (!list_empty(&mr->mr_list))
1222                        list_del(&mr->mr_list);
1223
1224                ia->ri_ops->ro_release_mr(mr);
1225                count++;
1226                spin_lock(&buf->rb_mrlock);
1227        }
1228        spin_unlock(&buf->rb_mrlock);
1229        r_xprt->rx_stats.mrs_allocated = 0;
1230
1231        dprintk("RPC:       %s: released %u MRs\n", __func__, count);
1232}
1233
1234void
1235rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1236{
1237        cancel_delayed_work_sync(&buf->rb_recovery_worker);
1238        cancel_delayed_work_sync(&buf->rb_refresh_worker);
1239
1240        rpcrdma_sendctxs_destroy(buf);
1241
1242        while (!list_empty(&buf->rb_recv_bufs)) {
1243                struct rpcrdma_rep *rep;
1244
1245                rep = list_first_entry(&buf->rb_recv_bufs,
1246                                       struct rpcrdma_rep, rr_list);
1247                list_del(&rep->rr_list);
1248                rpcrdma_destroy_rep(rep);
1249        }
1250
1251        spin_lock(&buf->rb_reqslock);
1252        while (!list_empty(&buf->rb_allreqs)) {
1253                struct rpcrdma_req *req;
1254
1255                req = list_first_entry(&buf->rb_allreqs,
1256                                       struct rpcrdma_req, rl_all);
1257                list_del(&req->rl_all);
1258
1259                spin_unlock(&buf->rb_reqslock);
1260                rpcrdma_destroy_req(req);
1261                spin_lock(&buf->rb_reqslock);
1262        }
1263        spin_unlock(&buf->rb_reqslock);
1264
1265        rpcrdma_mrs_destroy(buf);
1266}
1267
1268/**
1269 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1270 * @r_xprt: controlling transport
1271 *
1272 * Returns an initialized rpcrdma_mr or NULL if no free
1273 * rpcrdma_mr objects are available.
1274 */
1275struct rpcrdma_mr *
1276rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1277{
1278        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1279        struct rpcrdma_mr *mr = NULL;
1280
1281        spin_lock(&buf->rb_mrlock);
1282        if (!list_empty(&buf->rb_mrs))
1283                mr = rpcrdma_mr_pop(&buf->rb_mrs);
1284        spin_unlock(&buf->rb_mrlock);
1285
1286        if (!mr)
1287                goto out_nomrs;
1288        return mr;
1289
1290out_nomrs:
1291        trace_xprtrdma_nomrs(r_xprt);
1292        if (r_xprt->rx_ep.rep_connected != -ENODEV)
1293                schedule_delayed_work(&buf->rb_refresh_worker, 0);
1294
1295        /* Allow the reply handler and refresh worker to run */
1296        cond_resched();
1297
1298        return NULL;
1299}
1300
1301static void
1302__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
1303{
1304        spin_lock(&buf->rb_mrlock);
1305        rpcrdma_mr_push(mr, &buf->rb_mrs);
1306        spin_unlock(&buf->rb_mrlock);
1307}
1308
1309/**
1310 * rpcrdma_mr_put - Release an rpcrdma_mr object
1311 * @mr: object to release
1312 *
1313 */
1314void
1315rpcrdma_mr_put(struct rpcrdma_mr *mr)
1316{
1317        __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
1318}
1319
1320/**
1321 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
1322 * @mr: object to release
1323 *
1324 */
1325void
1326rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
1327{
1328        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1329
1330        trace_xprtrdma_dma_unmap(mr);
1331        ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
1332                        mr->mr_sg, mr->mr_nents, mr->mr_dir);
1333        __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
1334}
1335
1336/**
1337 * rpcrdma_buffer_get - Get a request buffer
1338 * @buffers: Buffer pool from which to obtain a buffer
1339 *
1340 * Returns a fresh rpcrdma_req, or NULL if none are available.
1341 */
1342struct rpcrdma_req *
1343rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1344{
1345        struct rpcrdma_req *req;
1346
1347        spin_lock(&buffers->rb_lock);
1348        req = list_first_entry_or_null(&buffers->rb_send_bufs,
1349                                       struct rpcrdma_req, rl_list);
1350        if (req)
1351                list_del_init(&req->rl_list);
1352        spin_unlock(&buffers->rb_lock);
1353        return req;
1354}
1355
1356/**
1357 * rpcrdma_buffer_put - Put request/reply buffers back into pool
1358 * @req: object to return
1359 *
1360 */
1361void
1362rpcrdma_buffer_put(struct rpcrdma_req *req)
1363{
1364        struct rpcrdma_buffer *buffers = req->rl_buffer;
1365        struct rpcrdma_rep *rep = req->rl_reply;
1366
1367        req->rl_reply = NULL;
1368
1369        spin_lock(&buffers->rb_lock);
1370        list_add(&req->rl_list, &buffers->rb_send_bufs);
1371        if (rep) {
1372                if (!rep->rr_temp) {
1373                        list_add(&rep->rr_list, &buffers->rb_recv_bufs);
1374                        rep = NULL;
1375                }
1376        }
1377        spin_unlock(&buffers->rb_lock);
1378        if (rep)
1379                rpcrdma_destroy_rep(rep);
1380}
1381
1382/*
1383 * Put reply buffers back into pool when not attached to
1384 * request. This happens in error conditions.
1385 */
1386void
1387rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1388{
1389        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1390
1391        if (!rep->rr_temp) {
1392                spin_lock(&buffers->rb_lock);
1393                list_add(&rep->rr_list, &buffers->rb_recv_bufs);
1394                spin_unlock(&buffers->rb_lock);
1395        } else {
1396                rpcrdma_destroy_rep(rep);
1397        }
1398}
1399
1400/**
 1401 * rpcrdma_alloc_regbuf - allocate memory for SEND/RECV buffers
1402 * @size: size of buffer to be allocated, in bytes
1403 * @direction: direction of data movement
1404 * @flags: GFP flags
1405 *
1406 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
1407 * can be persistently DMA-mapped for I/O.
1408 *
1409 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1410 * receiving the payload of RDMA RECV operations. During Long Calls
1411 * or Replies they may be registered externally via ro_map.
1412 */
1413struct rpcrdma_regbuf *
1414rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
1415                     gfp_t flags)
1416{
1417        struct rpcrdma_regbuf *rb;
1418
1419        rb = kmalloc(sizeof(*rb) + size, flags);
1420        if (rb == NULL)
1421                return ERR_PTR(-ENOMEM);
1422
1423        rb->rg_device = NULL;
1424        rb->rg_direction = direction;
1425        rb->rg_iov.length = size;
1426
1427        return rb;
1428}
1429
1430/**
 1431 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
1432 * @ia: controlling rpcrdma_ia
1433 * @rb: regbuf to be mapped
1434 */
1435bool
1436__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1437{
1438        struct ib_device *device = ia->ri_device;
1439
1440        if (rb->rg_direction == DMA_NONE)
1441                return false;
1442
1443        rb->rg_iov.addr = ib_dma_map_single(device,
1444                                            (void *)rb->rg_base,
1445                                            rdmab_length(rb),
1446                                            rb->rg_direction);
1447        if (ib_dma_mapping_error(device, rdmab_addr(rb)))
1448                return false;
1449
1450        rb->rg_device = device;
1451        rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
1452        return true;
1453}
1454
1455static void
1456rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
1457{
1458        if (!rb)
1459                return;
1460
1461        if (!rpcrdma_regbuf_is_mapped(rb))
1462                return;
1463
1464        ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
1465                            rdmab_length(rb), rb->rg_direction);
1466        rb->rg_device = NULL;
1467}
1468
1469/**
1470 * rpcrdma_free_regbuf - deregister and free registered buffer
1471 * @rb: regbuf to be deregistered and freed
1472 */
1473void
1474rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
1475{
1476        rpcrdma_dma_unmap_regbuf(rb);
1477        kfree(rb);
1478}
1479
 1480/*
 1481 * Post a Send WR on the transport's Send Queue.
 1482 *
 1483 * Receive buffers are posted separately via rpcrdma_post_recvs().
 1484 */
1485int
1486rpcrdma_ep_post(struct rpcrdma_ia *ia,
1487                struct rpcrdma_ep *ep,
1488                struct rpcrdma_req *req)
1489{
1490        struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1491        int rc;
1492
1493        if (!ep->rep_send_count ||
1494            test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1495                send_wr->send_flags |= IB_SEND_SIGNALED;
1496                ep->rep_send_count = ep->rep_send_batch;
1497        } else {
1498                send_wr->send_flags &= ~IB_SEND_SIGNALED;
1499                --ep->rep_send_count;
1500        }
1501
1502        rc = ia->ri_ops->ro_send(ia, req);
1503        trace_xprtrdma_post_send(req, rc);
1504        if (rc)
1505                return -ENOTCONN;
1506        return 0;
1507}
1508
1509/**
1510 * rpcrdma_post_recvs - Maybe post some Receive buffers
1511 * @r_xprt: controlling transport
1512 * @temp: when true, allocate temp rpcrdma_rep objects
1513 *
1514 */
1515void
1516rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1517{
1518        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1519        struct ib_recv_wr *wr, *bad_wr;
1520        int needed, count, rc;
1521
1522        needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1523        if (buf->rb_posted_receives > needed)
1524                return;
1525        needed -= buf->rb_posted_receives;
1526
1527        count = 0;
1528        wr = NULL;
1529        while (needed) {
1530                struct rpcrdma_regbuf *rb;
1531                struct rpcrdma_rep *rep;
1532
1533                spin_lock(&buf->rb_lock);
1534                rep = list_first_entry_or_null(&buf->rb_recv_bufs,
1535                                               struct rpcrdma_rep, rr_list);
1536                if (likely(rep))
1537                        list_del(&rep->rr_list);
1538                spin_unlock(&buf->rb_lock);
1539                if (!rep) {
1540                        if (rpcrdma_create_rep(r_xprt, temp))
1541                                break;
1542                        continue;
1543                }
1544
1545                rb = rep->rr_rdmabuf;
1546                if (!rpcrdma_regbuf_is_mapped(rb)) {
1547                        if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
1548                                rpcrdma_recv_buffer_put(rep);
1549                                break;
1550                        }
1551                }
1552
1553                trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
1554                rep->rr_recv_wr.next = wr;
1555                wr = &rep->rr_recv_wr;
1556                ++count;
1557                --needed;
1558        }
1559        if (!count)
1560                return;
1561
1562        rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, &bad_wr);
1563        if (rc) {
1564                for (wr = bad_wr; wr; wr = wr->next) {
1565                        struct rpcrdma_rep *rep;
1566
1567                        rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1568                        rpcrdma_recv_buffer_put(rep);
1569                        --count;
1570                }
1571        }
1572        buf->rb_posted_receives += count;
1573        trace_xprtrdma_post_recvs(r_xprt, count, rc);
1574}
1575