linux/net/sunrpc/xprtrdma/verbs.c
   1/*
   2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40/*
  41 * verbs.c
  42 *
  43 * Encapsulates the major functions managing:
  44 *  o adapters
  45 *  o endpoints
  46 *  o connections
  47 *  o buffer memory
  48 */
  49
  50#include <linux/interrupt.h>
  51#include <linux/slab.h>
  52#include <linux/prefetch.h>
  53#include <asm/bitops.h>
  54
  55#include "xprt_rdma.h"
  56
  57/*
  58 * Globals/Macros
  59 */
  60
  61#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  62# define RPCDBG_FACILITY        RPCDBG_TRANS
  63#endif
  64
  65static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
  66static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
  67
  68/*
  69 * internal functions
  70 */
  71
  72/*
  73 * handle replies in tasklet context, using a single, global list
  74 * rdma tasklet function -- just turn around and call the func
  75 * for all replies on the list
  76 */
  77
  78static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
  79static LIST_HEAD(rpcrdma_tasklets_g);
  80
  81static void
  82rpcrdma_run_tasklet(unsigned long data)
  83{
  84        struct rpcrdma_rep *rep;
  85        void (*func)(struct rpcrdma_rep *);
  86        unsigned long flags;
  87
   88        data = data;    /* the tasklet data argument is unused */
  89        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
  90        while (!list_empty(&rpcrdma_tasklets_g)) {
  91                rep = list_entry(rpcrdma_tasklets_g.next,
  92                                 struct rpcrdma_rep, rr_list);
  93                list_del(&rep->rr_list);
  94                func = rep->rr_func;
  95                rep->rr_func = NULL;
  96                spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
  97
  98                if (func)
  99                        func(rep);
 100                else
 101                        rpcrdma_recv_buffer_put(rep);
 102
 103                spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
 104        }
 105        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
 106}
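/* Note: rpcrdma_tk_lock_g is dropped around each rr_func callback (and
 * around rpcrdma_recv_buffer_put), so reply handling never runs with the
 * global list lock held; the list is re-checked under the lock each pass.
 */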
 107
 108static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
 109
 110static const char * const async_event[] = {
 111        "CQ error",
 112        "QP fatal error",
 113        "QP request error",
 114        "QP access error",
 115        "communication established",
 116        "send queue drained",
 117        "path migration successful",
 118        "path mig error",
 119        "device fatal error",
 120        "port active",
 121        "port error",
 122        "LID change",
 123        "P_key change",
 124        "SM change",
 125        "SRQ error",
 126        "SRQ limit reached",
 127        "last WQE reached",
 128        "client reregister",
 129        "GID change",
 130};
 131
 132#define ASYNC_MSG(status)                                       \
 133        ((status) < ARRAY_SIZE(async_event) ?                   \
 134                async_event[(status)] : "unknown async error")
 135
 136static void
 137rpcrdma_schedule_tasklet(struct list_head *sched_list)
 138{
 139        unsigned long flags;
 140
 141        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
 142        list_splice_tail(sched_list, &rpcrdma_tasklets_g);
 143        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
 144        tasklet_schedule(&rpcrdma_tasklet_g);
 145}
 146
 147static void
 148rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 149{
 150        struct rpcrdma_ep *ep = context;
 151
 152        pr_err("RPC:       %s: %s on device %s ep %p\n",
 153               __func__, ASYNC_MSG(event->event),
 154                event->device->name, context);
 155        if (ep->rep_connected == 1) {
 156                ep->rep_connected = -EIO;
 157                rpcrdma_conn_func(ep);
 158                wake_up_all(&ep->rep_connect_wait);
 159        }
 160}
 161
 162static void
 163rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
 164{
 165        struct rpcrdma_ep *ep = context;
 166
 167        pr_err("RPC:       %s: %s on device %s ep %p\n",
 168               __func__, ASYNC_MSG(event->event),
 169                event->device->name, context);
 170        if (ep->rep_connected == 1) {
 171                ep->rep_connected = -EIO;
 172                rpcrdma_conn_func(ep);
 173                wake_up_all(&ep->rep_connect_wait);
 174        }
 175}
 176
 177static const char * const wc_status[] = {
 178        "success",
 179        "local length error",
 180        "local QP operation error",
 181        "local EE context operation error",
 182        "local protection error",
 183        "WR flushed",
 184        "memory management operation error",
 185        "bad response error",
 186        "local access error",
 187        "remote invalid request error",
 188        "remote access error",
 189        "remote operation error",
 190        "transport retry counter exceeded",
  191        "RNR retry counter exceeded",
 192        "local RDD violation error",
  193        "remote invalid RD request",
 194        "operation aborted",
 195        "invalid EE context number",
 196        "invalid EE context state",
 197        "fatal error",
 198        "response timeout error",
 199        "general error",
 200};
 201
 202#define COMPLETION_MSG(status)                                  \
 203        ((status) < ARRAY_SIZE(wc_status) ?                     \
 204                wc_status[(status)] : "unexpected completion error")
 205
 206static void
 207rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 208{
 209        if (likely(wc->status == IB_WC_SUCCESS))
 210                return;
 211
 212        /* WARNING: Only wr_id and status are reliable at this point */
 213        if (wc->wr_id == 0ULL) {
 214                if (wc->status != IB_WC_WR_FLUSH_ERR)
 215                        pr_err("RPC:       %s: SEND: %s\n",
 216                               __func__, COMPLETION_MSG(wc->status));
 217        } else {
 218                struct rpcrdma_mw *r;
 219
 220                r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
 221                r->r.frmr.fr_state = FRMR_IS_STALE;
 222                pr_err("RPC:       %s: frmr %p (stale): %s\n",
 223                       __func__, r, COMPLETION_MSG(wc->status));
 224        }
 225}
 226
 227static int
 228rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 229{
 230        struct ib_wc *wcs;
 231        int budget, count, rc;
 232
 233        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 234        do {
 235                wcs = ep->rep_send_wcs;
 236
 237                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
 238                if (rc <= 0)
 239                        return rc;
 240
 241                count = rc;
 242                while (count-- > 0)
 243                        rpcrdma_sendcq_process_wc(wcs++);
 244        } while (rc == RPCRDMA_POLLSIZE && --budget);
 245        return 0;
 246}
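/* Polling note: each ib_poll_cq() call above pulls at most RPCRDMA_POLLSIZE
 * completions into ep->rep_send_wcs.  The loop continues only while full
 * batches are returned and the RPCRDMA_WC_BUDGET-derived budget remains,
 * which bounds the work done per upcall.
 */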
 247
 248/*
 249 * Handle send, fast_reg_mr, and local_inv completions.
 250 *
 251 * Send events are typically suppressed and thus do not result
 252 * in an upcall. Occasionally one is signaled, however. This
 253 * prevents the provider's completion queue from wrapping and
 254 * losing a completion.
 255 */
 256static void
 257rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 258{
 259        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
 260        int rc;
 261
 262        rc = rpcrdma_sendcq_poll(cq, ep);
 263        if (rc) {
 264                dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
 265                        __func__, rc);
 266                return;
 267        }
 268
 269        rc = ib_req_notify_cq(cq,
 270                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
 271        if (rc == 0)
 272                return;
 273        if (rc < 0) {
 274                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 275                        __func__, rc);
 276                return;
 277        }
 278
 279        rpcrdma_sendcq_poll(cq, ep);
 280}
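/* Re-arm race: ib_req_notify_cq() with IB_CQ_REPORT_MISSED_EVENTS returns a
 * positive value if completions may have arrived between the poll and the
 * re-arm; in that case the CQ is polled once more so nothing is left behind
 * until the next interrupt.
 */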
 281
 282static void
 283rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 284{
 285        struct rpcrdma_rep *rep =
 286                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
 287
 288        /* WARNING: Only wr_id and status are reliable at this point */
 289        if (wc->status != IB_WC_SUCCESS)
 290                goto out_fail;
 291
 292        /* status == SUCCESS means all fields in wc are trustworthy */
 293        if (wc->opcode != IB_WC_RECV)
 294                return;
 295
 296        dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
 297                __func__, rep, wc->byte_len);
 298
 299        rep->rr_len = wc->byte_len;
 300        ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
 301                                   rdmab_addr(rep->rr_rdmabuf),
 302                                   rep->rr_len, DMA_FROM_DEVICE);
 303        prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 304
 305out_schedule:
 306        list_add_tail(&rep->rr_list, sched_list);
 307        return;
 308out_fail:
 309        if (wc->status != IB_WC_WR_FLUSH_ERR)
 310                pr_err("RPC:       %s: rep %p: %s\n",
 311                       __func__, rep, COMPLETION_MSG(wc->status));
 312        rep->rr_len = ~0U;
 313        goto out_schedule;
 314}
 315
 316static int
 317rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 318{
 319        struct list_head sched_list;
 320        struct ib_wc *wcs;
 321        int budget, count, rc;
 322
 323        INIT_LIST_HEAD(&sched_list);
 324        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 325        do {
 326                wcs = ep->rep_recv_wcs;
 327
 328                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
 329                if (rc <= 0)
 330                        goto out_schedule;
 331
 332                count = rc;
 333                while (count-- > 0)
 334                        rpcrdma_recvcq_process_wc(wcs++, &sched_list);
 335        } while (rc == RPCRDMA_POLLSIZE && --budget);
 336        rc = 0;
 337
 338out_schedule:
 339        rpcrdma_schedule_tasklet(&sched_list);
 340        return rc;
 341}
 342
 343/*
 344 * Handle receive completions.
 345 *
  346 * It is reentrant but processes single events in order to preserve
  347 * the ordering of receives, which maintains the server credit accounting.
 348 *
 349 * It is the responsibility of the scheduled tasklet to return
 350 * recv buffers to the pool. NOTE: this affects synchronization of
 351 * connection shutdown. That is, the structures required for
 352 * the completion of the reply handler must remain intact until
 353 * all memory has been reclaimed.
 354 */
 355static void
 356rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 357{
 358        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
 359        int rc;
 360
 361        rc = rpcrdma_recvcq_poll(cq, ep);
 362        if (rc) {
 363                dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
 364                        __func__, rc);
 365                return;
 366        }
 367
 368        rc = ib_req_notify_cq(cq,
 369                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
 370        if (rc == 0)
 371                return;
 372        if (rc < 0) {
 373                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 374                        __func__, rc);
 375                return;
 376        }
 377
 378        rpcrdma_recvcq_poll(cq, ep);
 379}
 380
 381static void
 382rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 383{
 384        struct ib_wc wc;
 385        LIST_HEAD(sched_list);
 386
 387        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
 388                rpcrdma_recvcq_process_wc(&wc, &sched_list);
 389        if (!list_empty(&sched_list))
 390                rpcrdma_schedule_tasklet(&sched_list);
 391        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 392                rpcrdma_sendcq_process_wc(&wc);
 393}
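/* rpcrdma_flush_cqs() drains both CQs one completion at a time: flushed
 * receives are still queued to the reply tasklet (which returns their
 * buffers), and flushed sends mark their FRMRs stale via
 * rpcrdma_sendcq_process_wc().
 */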
 394
 395#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 396static const char * const conn[] = {
 397        "address resolved",
 398        "address error",
 399        "route resolved",
 400        "route error",
 401        "connect request",
 402        "connect response",
 403        "connect error",
 404        "unreachable",
 405        "rejected",
 406        "established",
 407        "disconnected",
 408        "device removal",
 409        "multicast join",
 410        "multicast error",
 411        "address change",
 412        "timewait exit",
 413};
 414
 415#define CONNECTION_MSG(status)                                          \
 416        ((status) < ARRAY_SIZE(conn) ?                                  \
 417                conn[(status)] : "unrecognized connection error")
 418#endif
 419
 420static int
 421rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 422{
 423        struct rpcrdma_xprt *xprt = id->context;
 424        struct rpcrdma_ia *ia = &xprt->rx_ia;
 425        struct rpcrdma_ep *ep = &xprt->rx_ep;
 426#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 427        struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
 428#endif
 429        struct ib_qp_attr *attr = &ia->ri_qp_attr;
 430        struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 431        int connstate = 0;
 432
 433        switch (event->event) {
 434        case RDMA_CM_EVENT_ADDR_RESOLVED:
 435        case RDMA_CM_EVENT_ROUTE_RESOLVED:
 436                ia->ri_async_rc = 0;
 437                complete(&ia->ri_done);
 438                break;
 439        case RDMA_CM_EVENT_ADDR_ERROR:
 440                ia->ri_async_rc = -EHOSTUNREACH;
 441                dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
 442                        __func__, ep);
 443                complete(&ia->ri_done);
 444                break;
 445        case RDMA_CM_EVENT_ROUTE_ERROR:
 446                ia->ri_async_rc = -ENETUNREACH;
 447                dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
 448                        __func__, ep);
 449                complete(&ia->ri_done);
 450                break;
 451        case RDMA_CM_EVENT_ESTABLISHED:
 452                connstate = 1;
 453                ib_query_qp(ia->ri_id->qp, attr,
 454                            IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
 455                            iattr);
 456                dprintk("RPC:       %s: %d responder resources"
 457                        " (%d initiator)\n",
 458                        __func__, attr->max_dest_rd_atomic,
 459                        attr->max_rd_atomic);
 460                goto connected;
 461        case RDMA_CM_EVENT_CONNECT_ERROR:
 462                connstate = -ENOTCONN;
 463                goto connected;
 464        case RDMA_CM_EVENT_UNREACHABLE:
 465                connstate = -ENETDOWN;
 466                goto connected;
 467        case RDMA_CM_EVENT_REJECTED:
 468                connstate = -ECONNREFUSED;
 469                goto connected;
 470        case RDMA_CM_EVENT_DISCONNECTED:
 471                connstate = -ECONNABORTED;
 472                goto connected;
 473        case RDMA_CM_EVENT_DEVICE_REMOVAL:
 474                connstate = -ENODEV;
 475connected:
 476                dprintk("RPC:       %s: %sconnected\n",
 477                                        __func__, connstate > 0 ? "" : "dis");
 478                ep->rep_connected = connstate;
 479                rpcrdma_conn_func(ep);
 480                wake_up_all(&ep->rep_connect_wait);
 481                /*FALLTHROUGH*/
 482        default:
 483                dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
 484                        __func__, &addr->sin_addr.s_addr,
 485                        ntohs(addr->sin_port), ep,
 486                        CONNECTION_MSG(event->event));
 487                break;
 488        }
 489
 490#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 491        if (connstate == 1) {
 492                int ird = attr->max_dest_rd_atomic;
 493                int tird = ep->rep_remote_cma.responder_resources;
 494                printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
 495                        "on %s, memreg %d slots %d ird %d%s\n",
 496                        &addr->sin_addr.s_addr,
 497                        ntohs(addr->sin_port),
 498                        ia->ri_id->device->name,
 499                        ia->ri_memreg_strategy,
 500                        xprt->rx_buf.rb_max_requests,
 501                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
 502        } else if (connstate < 0) {
 503                printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
 504                        &addr->sin_addr.s_addr,
 505                        ntohs(addr->sin_port),
 506                        connstate);
 507        }
 508#endif
 509
 510        return 0;
 511}
 512
 513static struct rdma_cm_id *
 514rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 515                        struct rpcrdma_ia *ia, struct sockaddr *addr)
 516{
 517        struct rdma_cm_id *id;
 518        int rc;
 519
 520        init_completion(&ia->ri_done);
 521
 522        id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
 523        if (IS_ERR(id)) {
 524                rc = PTR_ERR(id);
 525                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 526                        __func__, rc);
 527                return id;
 528        }
 529
 530        ia->ri_async_rc = -ETIMEDOUT;
 531        rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
 532        if (rc) {
 533                dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 534                        __func__, rc);
 535                goto out;
 536        }
 537        wait_for_completion_interruptible_timeout(&ia->ri_done,
 538                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 539        rc = ia->ri_async_rc;
 540        if (rc)
 541                goto out;
 542
 543        ia->ri_async_rc = -ETIMEDOUT;
 544        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 545        if (rc) {
 546                dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 547                        __func__, rc);
 548                goto out;
 549        }
 550        wait_for_completion_interruptible_timeout(&ia->ri_done,
 551                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 552        rc = ia->ri_async_rc;
 553        if (rc)
 554                goto out;
 555
 556        return id;
 557
 558out:
 559        rdma_destroy_id(id);
 560        return ERR_PTR(rc);
 561}
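/* Each resolution step above primes ia->ri_async_rc with -ETIMEDOUT before
 * issuing the rdma_cm request; the CM upcall overwrites it and completes
 * ri_done, so a wait that expires without an upcall is reported as a
 * timeout.
 */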
 562
 563/*
 564 * Drain any cq, prior to teardown.
 565 */
 566static void
 567rpcrdma_clean_cq(struct ib_cq *cq)
 568{
 569        struct ib_wc wc;
 570        int count = 0;
 571
 572        while (1 == ib_poll_cq(cq, 1, &wc))
 573                ++count;
 574
 575        if (count)
 576                dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
 577                        __func__, count, wc.opcode);
 578}
 579
 580/*
 581 * Exported functions.
 582 */
 583
 584/*
 585 * Open and initialize an Interface Adapter.
 586 *  o initializes fields of struct rpcrdma_ia, including
 587 *    interface and provider attributes and protection zone.
 588 */
 589int
 590rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 591{
 592        int rc, mem_priv;
 593        struct rpcrdma_ia *ia = &xprt->rx_ia;
 594        struct ib_device_attr *devattr = &ia->ri_devattr;
 595
 596        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 597        if (IS_ERR(ia->ri_id)) {
 598                rc = PTR_ERR(ia->ri_id);
 599                goto out1;
 600        }
 601
 602        ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
 603        if (IS_ERR(ia->ri_pd)) {
 604                rc = PTR_ERR(ia->ri_pd);
 605                dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
 606                        __func__, rc);
 607                goto out2;
 608        }
 609
 610        rc = ib_query_device(ia->ri_id->device, devattr);
 611        if (rc) {
 612                dprintk("RPC:       %s: ib_query_device failed %d\n",
 613                        __func__, rc);
 614                goto out3;
 615        }
 616
 617        if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
 618                ia->ri_have_dma_lkey = 1;
 619                ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
 620        }
 621
 622        if (memreg == RPCRDMA_FRMR) {
 623                /* Requires both frmr reg and local dma lkey */
 624                if ((devattr->device_cap_flags &
 625                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
 626                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
 627                        dprintk("RPC:       %s: FRMR registration "
 628                                "not supported by HCA\n", __func__);
 629                        memreg = RPCRDMA_MTHCAFMR;
 630                } else {
 631                        /* Mind the ia limit on FRMR page list depth */
 632                        ia->ri_max_frmr_depth = min_t(unsigned int,
 633                                RPCRDMA_MAX_DATA_SEGS,
 634                                devattr->max_fast_reg_page_list_len);
 635                }
 636        }
 637        if (memreg == RPCRDMA_MTHCAFMR) {
 638                if (!ia->ri_id->device->alloc_fmr) {
 639                        dprintk("RPC:       %s: MTHCAFMR registration "
 640                                "not supported by HCA\n", __func__);
 641                        memreg = RPCRDMA_ALLPHYSICAL;
 642                }
 643        }
 644
 645        /*
 646         * Optionally obtain an underlying physical identity mapping in
 647         * order to do a memory window-based bind. This base registration
 648         * is protected from remote access - that is enabled only by binding
 649         * for the specific bytes targeted during each RPC operation, and
 650         * revoked after the corresponding completion similar to a storage
 651         * adapter.
 652         */
 653        switch (memreg) {
 654        case RPCRDMA_FRMR:
 655                break;
 656        case RPCRDMA_ALLPHYSICAL:
 657                mem_priv = IB_ACCESS_LOCAL_WRITE |
 658                                IB_ACCESS_REMOTE_WRITE |
 659                                IB_ACCESS_REMOTE_READ;
 660                goto register_setup;
 661        case RPCRDMA_MTHCAFMR:
 662                if (ia->ri_have_dma_lkey)
 663                        break;
 664                mem_priv = IB_ACCESS_LOCAL_WRITE;
 665        register_setup:
 666                ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
 667                if (IS_ERR(ia->ri_bind_mem)) {
 668                        printk(KERN_ALERT "%s: ib_get_dma_mr for "
 669                                "phys register failed with %lX\n",
 670                                __func__, PTR_ERR(ia->ri_bind_mem));
 671                        rc = -ENOMEM;
 672                        goto out3;
 673                }
 674                break;
 675        default:
 676                printk(KERN_ERR "RPC: Unsupported memory "
 677                                "registration mode: %d\n", memreg);
 678                rc = -ENOMEM;
 679                goto out3;
 680        }
 681        dprintk("RPC:       %s: memory registration strategy is %d\n",
 682                __func__, memreg);
 683
 684        /* Else will do memory reg/dereg for each chunk */
 685        ia->ri_memreg_strategy = memreg;
 686
 687        rwlock_init(&ia->ri_qplock);
 688        return 0;
 689
 690out3:
 691        ib_dealloc_pd(ia->ri_pd);
 692        ia->ri_pd = NULL;
 693out2:
 694        rdma_destroy_id(ia->ri_id);
 695        ia->ri_id = NULL;
 696out1:
 697        return rc;
 698}
 699
 700/*
 701 * Clean up/close an IA.
 702 *   o if event handles and PD have been initialized, free them.
 703 *   o close the IA
 704 */
 705void
 706rpcrdma_ia_close(struct rpcrdma_ia *ia)
 707{
 708        int rc;
 709
 710        dprintk("RPC:       %s: entering\n", __func__);
 711        if (ia->ri_bind_mem != NULL) {
 712                rc = ib_dereg_mr(ia->ri_bind_mem);
 713                dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 714                        __func__, rc);
 715        }
 716        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 717                if (ia->ri_id->qp)
 718                        rdma_destroy_qp(ia->ri_id);
 719                rdma_destroy_id(ia->ri_id);
 720                ia->ri_id = NULL;
 721        }
 722        if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
 723                rc = ib_dealloc_pd(ia->ri_pd);
 724                dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
 725                        __func__, rc);
 726        }
 727}
 728
 729/*
 730 * Create unconnected endpoint.
 731 */
 732int
 733rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 734                                struct rpcrdma_create_data_internal *cdata)
 735{
 736        struct ib_device_attr *devattr = &ia->ri_devattr;
 737        struct ib_cq *sendcq, *recvcq;
 738        int rc, err;
 739
 740        /* check provider's send/recv wr limits */
 741        if (cdata->max_requests > devattr->max_qp_wr)
 742                cdata->max_requests = devattr->max_qp_wr;
 743
 744        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 745        ep->rep_attr.qp_context = ep;
 746        /* send_cq and recv_cq initialized below */
 747        ep->rep_attr.srq = NULL;
 748        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 749        switch (ia->ri_memreg_strategy) {
 750        case RPCRDMA_FRMR: {
 751                int depth = 7;
 752
 753                /* Add room for frmr register and invalidate WRs.
 754                 * 1. FRMR reg WR for head
 755                 * 2. FRMR invalidate WR for head
 756                 * 3. N FRMR reg WRs for pagelist
 757                 * 4. N FRMR invalidate WRs for pagelist
 758                 * 5. FRMR reg WR for tail
 759                 * 6. FRMR invalidate WR for tail
 760                 * 7. The RDMA_SEND WR
 761                 */
 762
 763                /* Calculate N if the device max FRMR depth is smaller than
 764                 * RPCRDMA_MAX_DATA_SEGS.
 765                 */
 766                if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
 767                        int delta = RPCRDMA_MAX_DATA_SEGS -
 768                                    ia->ri_max_frmr_depth;
 769
 770                        do {
 771                                depth += 2; /* FRMR reg + invalidate */
 772                                delta -= ia->ri_max_frmr_depth;
 773                        } while (delta > 0);
 774
 775                }
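                /* Worked example (illustrative values): if
                 * RPCRDMA_MAX_DATA_SEGS were 64 and the device reported a
                 * fast_reg page list limit of 16, delta would start at 48
                 * and the loop would add three reg/invalidate pairs, so
                 * depth becomes 13 and max_send_wr is scaled by 13 below.
                 */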
 776                ep->rep_attr.cap.max_send_wr *= depth;
 777                if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
 778                        cdata->max_requests = devattr->max_qp_wr / depth;
 779                        if (!cdata->max_requests)
 780                                return -EINVAL;
 781                        ep->rep_attr.cap.max_send_wr = cdata->max_requests *
 782                                                       depth;
 783                }
 784                break;
 785        }
 786        default:
 787                break;
 788        }
 789        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 790        ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
 791        ep->rep_attr.cap.max_recv_sge = 1;
 792        ep->rep_attr.cap.max_inline_data = 0;
 793        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 794        ep->rep_attr.qp_type = IB_QPT_RC;
 795        ep->rep_attr.port_num = ~0;
 796
 797        if (cdata->padding) {
 798                ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
 799                                                      GFP_KERNEL);
 800                if (IS_ERR(ep->rep_padbuf))
 801                        return PTR_ERR(ep->rep_padbuf);
 802        } else
 803                ep->rep_padbuf = NULL;
 804
 805        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 806                "iovs: send %d recv %d\n",
 807                __func__,
 808                ep->rep_attr.cap.max_send_wr,
 809                ep->rep_attr.cap.max_recv_wr,
 810                ep->rep_attr.cap.max_send_sge,
 811                ep->rep_attr.cap.max_recv_sge);
 812
 813        /* set trigger for requesting send completion */
 814        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 815        if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
 816                ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
 817        else if (ep->rep_cqinit <= 2)
 818                ep->rep_cqinit = 0;
 819        INIT_CQCOUNT(ep);
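        /* Example (illustrative): if cap.max_send_wr ended up at 64,
         * rep_cqinit would be 31, subject to the RPCRDMA_MAX_UNSIGNALED_SENDS
         * cap; DECR_CQCOUNT() then lets roughly every rep_cqinit-th send WR
         * request a completion, so the send CQ cannot silently overflow.
         */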
 820        init_waitqueue_head(&ep->rep_connect_wait);
 821        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 822
 823        sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
 824                                  rpcrdma_cq_async_error_upcall, ep,
 825                                  ep->rep_attr.cap.max_send_wr + 1, 0);
 826        if (IS_ERR(sendcq)) {
 827                rc = PTR_ERR(sendcq);
 828                dprintk("RPC:       %s: failed to create send CQ: %i\n",
 829                        __func__, rc);
 830                goto out1;
 831        }
 832
 833        rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
 834        if (rc) {
 835                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 836                        __func__, rc);
 837                goto out2;
 838        }
 839
 840        recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
 841                                  rpcrdma_cq_async_error_upcall, ep,
 842                                  ep->rep_attr.cap.max_recv_wr + 1, 0);
 843        if (IS_ERR(recvcq)) {
 844                rc = PTR_ERR(recvcq);
 845                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 846                        __func__, rc);
 847                goto out2;
 848        }
 849
 850        rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
 851        if (rc) {
 852                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 853                        __func__, rc);
 854                ib_destroy_cq(recvcq);
 855                goto out2;
 856        }
 857
 858        ep->rep_attr.send_cq = sendcq;
 859        ep->rep_attr.recv_cq = recvcq;
 860
 861        /* Initialize cma parameters */
 862
 863        /* RPC/RDMA does not use private data */
 864        ep->rep_remote_cma.private_data = NULL;
 865        ep->rep_remote_cma.private_data_len = 0;
 866
 867        /* Client offers RDMA Read but does not initiate */
 868        ep->rep_remote_cma.initiator_depth = 0;
 869        if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
 870                ep->rep_remote_cma.responder_resources = 32;
 871        else
 872                ep->rep_remote_cma.responder_resources =
 873                                                devattr->max_qp_rd_atom;
 874
 875        ep->rep_remote_cma.retry_count = 7;
 876        ep->rep_remote_cma.flow_control = 0;
 877        ep->rep_remote_cma.rnr_retry_count = 0;
 878
 879        return 0;
 880
 881out2:
 882        err = ib_destroy_cq(sendcq);
 883        if (err)
 884                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 885                        __func__, err);
 886out1:
 887        rpcrdma_free_regbuf(ia, ep->rep_padbuf);
 888        return rc;
 889}
 890
 891/*
 892 * rpcrdma_ep_destroy
 893 *
 894 * Disconnect and destroy endpoint. After this, the only
 895 * valid operations on the ep are to free it (if dynamically
 896 * allocated) or re-create it.
 897 */
 898void
 899rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 900{
 901        int rc;
 902
 903        dprintk("RPC:       %s: entering, connected is %d\n",
 904                __func__, ep->rep_connected);
 905
 906        cancel_delayed_work_sync(&ep->rep_connect_worker);
 907
 908        if (ia->ri_id->qp) {
 909                rpcrdma_ep_disconnect(ep, ia);
 910                rdma_destroy_qp(ia->ri_id);
 911                ia->ri_id->qp = NULL;
 912        }
 913
 914        rpcrdma_free_regbuf(ia, ep->rep_padbuf);
 915
 916        rpcrdma_clean_cq(ep->rep_attr.recv_cq);
 917        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
 918        if (rc)
 919                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 920                        __func__, rc);
 921
 922        rpcrdma_clean_cq(ep->rep_attr.send_cq);
 923        rc = ib_destroy_cq(ep->rep_attr.send_cq);
 924        if (rc)
 925                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 926                        __func__, rc);
 927}
 928
 929/*
 930 * Connect unconnected endpoint.
 931 */
 932int
 933rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 934{
 935        struct rdma_cm_id *id, *old;
 936        int rc = 0;
 937        int retry_count = 0;
 938
 939        if (ep->rep_connected != 0) {
 940                struct rpcrdma_xprt *xprt;
 941retry:
 942                dprintk("RPC:       %s: reconnecting...\n", __func__);
 943
 944                rpcrdma_ep_disconnect(ep, ia);
 945                rpcrdma_flush_cqs(ep);
 946
 947                switch (ia->ri_memreg_strategy) {
 948                case RPCRDMA_FRMR:
 949                        rpcrdma_reset_frmrs(ia);
 950                        break;
 951                case RPCRDMA_MTHCAFMR:
 952                        rpcrdma_reset_fmrs(ia);
 953                        break;
 954                case RPCRDMA_ALLPHYSICAL:
 955                        break;
 956                default:
 957                        rc = -EIO;
 958                        goto out;
 959                }
 960
 961                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 962                id = rpcrdma_create_id(xprt, ia,
 963                                (struct sockaddr *)&xprt->rx_data.addr);
 964                if (IS_ERR(id)) {
 965                        rc = -EHOSTUNREACH;
 966                        goto out;
 967                }
 968                /* TEMP TEMP TEMP - fail if new device:
 969                 * Deregister/remarshal *all* requests!
 970                 * Close and recreate adapter, pd, etc!
 971                 * Re-determine all attributes still sane!
 972                 * More stuff I haven't thought of!
 973                 * Rrrgh!
 974                 */
 975                if (ia->ri_id->device != id->device) {
 976                        printk("RPC:       %s: can't reconnect on "
 977                                "different device!\n", __func__);
 978                        rdma_destroy_id(id);
 979                        rc = -ENETUNREACH;
 980                        goto out;
 981                }
 982                /* END TEMP */
 983                rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 984                if (rc) {
 985                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 986                                __func__, rc);
 987                        rdma_destroy_id(id);
 988                        rc = -ENETUNREACH;
 989                        goto out;
 990                }
 991
 992                write_lock(&ia->ri_qplock);
 993                old = ia->ri_id;
 994                ia->ri_id = id;
 995                write_unlock(&ia->ri_qplock);
 996
 997                rdma_destroy_qp(old);
 998                rdma_destroy_id(old);
 999        } else {
1000                dprintk("RPC:       %s: connecting...\n", __func__);
1001                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
1002                if (rc) {
1003                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
1004                                __func__, rc);
1005                        /* do not update ep->rep_connected */
1006                        return -ENETUNREACH;
1007                }
1008        }
1009
1010        ep->rep_connected = 0;
1011
1012        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
1013        if (rc) {
1014                dprintk("RPC:       %s: rdma_connect() failed with %i\n",
1015                                __func__, rc);
1016                goto out;
1017        }
1018
1019        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
1020
1021        /*
1022         * Check state. A non-peer reject indicates no listener
1023         * (ECONNREFUSED), which may be a transient state. All
1024         * others indicate a transport condition which has already
 1025         * received best-effort handling.
1026         */
1027        if (ep->rep_connected == -ECONNREFUSED &&
1028            ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
1029                dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
1030                goto retry;
1031        }
1032        if (ep->rep_connected <= 0) {
1033                /* Sometimes, the only way to reliably connect to remote
 1034                 * CMs is to use the same nonzero values for ORD and IRD. */
1035                if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1036                    (ep->rep_remote_cma.responder_resources == 0 ||
1037                     ep->rep_remote_cma.initiator_depth !=
1038                                ep->rep_remote_cma.responder_resources)) {
1039                        if (ep->rep_remote_cma.responder_resources == 0)
1040                                ep->rep_remote_cma.responder_resources = 1;
1041                        ep->rep_remote_cma.initiator_depth =
1042                                ep->rep_remote_cma.responder_resources;
1043                        goto retry;
1044                }
1045                rc = ep->rep_connected;
1046        } else {
1047                dprintk("RPC:       %s: connected\n", __func__);
1048        }
1049
1050out:
1051        if (rc)
1052                ep->rep_connected = rc;
1053        return rc;
1054}
1055
1056/*
1057 * rpcrdma_ep_disconnect
1058 *
1059 * This is separate from destroy to facilitate the ability
1060 * to reconnect without recreating the endpoint.
1061 *
1062 * This call is not reentrant, and must not be made in parallel
1063 * on the same endpoint.
1064 */
1065void
1066rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1067{
1068        int rc;
1069
1070        rpcrdma_flush_cqs(ep);
1071        rc = rdma_disconnect(ia->ri_id);
1072        if (!rc) {
1073                /* returns without wait if not connected */
1074                wait_event_interruptible(ep->rep_connect_wait,
1075                                                        ep->rep_connected != 1);
1076                dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1077                        (ep->rep_connected == 1) ? "still " : "dis");
1078        } else {
1079                dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1080                ep->rep_connected = rc;
1081        }
1082}
1083
1084static struct rpcrdma_req *
1085rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1086{
1087        struct rpcrdma_req *req;
1088
1089        req = kzalloc(sizeof(*req), GFP_KERNEL);
1090        if (req == NULL)
1091                return ERR_PTR(-ENOMEM);
1092
1093        req->rl_buffer = &r_xprt->rx_buf;
1094        return req;
1095}
1096
1097static struct rpcrdma_rep *
1098rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1099{
1100        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1101        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1102        struct rpcrdma_rep *rep;
1103        int rc;
1104
1105        rc = -ENOMEM;
1106        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1107        if (rep == NULL)
1108                goto out;
1109
1110        rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1111                                               GFP_KERNEL);
1112        if (IS_ERR(rep->rr_rdmabuf)) {
1113                rc = PTR_ERR(rep->rr_rdmabuf);
1114                goto out_free;
1115        }
1116
1117        rep->rr_buffer = &r_xprt->rx_buf;
1118        return rep;
1119
1120out_free:
1121        kfree(rep);
1122out:
1123        return ERR_PTR(rc);
1124}
1125
1126static int
1127rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1128{
1129        int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1130        struct ib_fmr_attr fmr_attr = {
1131                .max_pages      = RPCRDMA_MAX_DATA_SEGS,
1132                .max_maps       = 1,
1133                .page_shift     = PAGE_SHIFT
1134        };
1135        struct rpcrdma_mw *r;
1136        int i, rc;
1137
1138        i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
 1139        dprintk("RPC:       %s: initializing %d FMRs\n", __func__, i);
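        /* Pool sizing: i is (rb_max_requests + 1) * RPCRDMA_MAX_SEGS, i.e.
         * enough MWs for every request to map RPCRDMA_MAX_SEGS segments,
         * plus one spare request's worth; with rb_max_requests of 32
         * (illustrative) that is 33 * RPCRDMA_MAX_SEGS FMRs.
         */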
1140
1141        while (i--) {
1142                r = kzalloc(sizeof(*r), GFP_KERNEL);
1143                if (r == NULL)
1144                        return -ENOMEM;
1145
1146                r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1147                if (IS_ERR(r->r.fmr)) {
1148                        rc = PTR_ERR(r->r.fmr);
1149                        dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1150                                __func__, rc);
1151                        goto out_free;
1152                }
1153
1154                list_add(&r->mw_list, &buf->rb_mws);
1155                list_add(&r->mw_all, &buf->rb_all);
1156        }
1157        return 0;
1158
1159out_free:
1160        kfree(r);
1161        return rc;
1162}
1163
1164static int
1165rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1166{
1167        struct rpcrdma_frmr *f;
1168        struct rpcrdma_mw *r;
1169        int i, rc;
1170
1171        i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
 1172        dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);
1173
1174        while (i--) {
1175                r = kzalloc(sizeof(*r), GFP_KERNEL);
1176                if (r == NULL)
1177                        return -ENOMEM;
1178                f = &r->r.frmr;
1179
1180                f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1181                                                ia->ri_max_frmr_depth);
1182                if (IS_ERR(f->fr_mr)) {
1183                        rc = PTR_ERR(f->fr_mr);
1184                        dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1185                                "failed %i\n", __func__, rc);
1186                        goto out_free;
1187                }
1188
1189                f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1190                                                        ia->ri_max_frmr_depth);
1191                if (IS_ERR(f->fr_pgl)) {
1192                        rc = PTR_ERR(f->fr_pgl);
1193                        dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1194                                "failed %i\n", __func__, rc);
1195
1196                        ib_dereg_mr(f->fr_mr);
1197                        goto out_free;
1198                }
1199
1200                list_add(&r->mw_list, &buf->rb_mws);
1201                list_add(&r->mw_all, &buf->rb_all);
1202        }
1203
1204        return 0;
1205
1206out_free:
1207        kfree(r);
1208        return rc;
1209}
1210
1211int
1212rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1213{
1214        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1215        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1216        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1217        char *p;
1218        size_t len;
1219        int i, rc;
1220
1221        buf->rb_max_requests = cdata->max_requests;
1222        spin_lock_init(&buf->rb_lock);
1223
1224        /* Need to allocate:
1225         *   1.  arrays for send and recv pointers
1226         *   2.  arrays of struct rpcrdma_req to fill in pointers
1227         *   3.  array of struct rpcrdma_rep for replies
1228         * Send/recv buffers in req/rep need to be registered
1229         */
1230        len = buf->rb_max_requests *
1231                (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1232
1233        p = kzalloc(len, GFP_KERNEL);
1234        if (p == NULL) {
1235                dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1236                        __func__, len);
1237                rc = -ENOMEM;
1238                goto out;
1239        }
1240        buf->rb_pool = p;       /* for freeing it later */
1241
1242        buf->rb_send_bufs = (struct rpcrdma_req **) p;
1243        p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1244        buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1245        p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1246
1247        INIT_LIST_HEAD(&buf->rb_mws);
1248        INIT_LIST_HEAD(&buf->rb_all);
1249        switch (ia->ri_memreg_strategy) {
1250        case RPCRDMA_FRMR:
1251                rc = rpcrdma_init_frmrs(ia, buf);
1252                if (rc)
1253                        goto out;
1254                break;
1255        case RPCRDMA_MTHCAFMR:
1256                rc = rpcrdma_init_fmrs(ia, buf);
1257                if (rc)
1258                        goto out;
1259                break;
1260        default:
1261                break;
1262        }
1263
1264        for (i = 0; i < buf->rb_max_requests; i++) {
1265                struct rpcrdma_req *req;
1266                struct rpcrdma_rep *rep;
1267
1268                req = rpcrdma_create_req(r_xprt);
1269                if (IS_ERR(req)) {
1270                        dprintk("RPC:       %s: request buffer %d alloc"
1271                                " failed\n", __func__, i);
1272                        rc = PTR_ERR(req);
1273                        goto out;
1274                }
1275                buf->rb_send_bufs[i] = req;
1276
1277                rep = rpcrdma_create_rep(r_xprt);
1278                if (IS_ERR(rep)) {
1279                        dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1280                                __func__, i);
1281                        rc = PTR_ERR(rep);
1282                        goto out;
1283                }
1284                buf->rb_recv_bufs[i] = rep;
1285        }
1286
1287        return 0;
1288out:
1289        rpcrdma_buffer_destroy(buf);
1290        return rc;
1291}
1292
1293static void
1294rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1295{
1296        if (!rep)
1297                return;
1298
1299        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1300        kfree(rep);
1301}
1302
1303static void
1304rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1305{
1306        if (!req)
1307                return;
1308
1309        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1310        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1311        kfree(req);
1312}
1313
1314static void
1315rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1316{
1317        struct rpcrdma_mw *r;
1318        int rc;
1319
1320        while (!list_empty(&buf->rb_all)) {
1321                r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1322                list_del(&r->mw_all);
1323                list_del(&r->mw_list);
1324
1325                rc = ib_dealloc_fmr(r->r.fmr);
1326                if (rc)
1327                        dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1328                                __func__, rc);
1329
1330                kfree(r);
1331        }
1332}
1333
1334static void
1335rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1336{
1337        struct rpcrdma_mw *r;
1338        int rc;
1339
1340        while (!list_empty(&buf->rb_all)) {
1341                r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1342                list_del(&r->mw_all);
1343                list_del(&r->mw_list);
1344
1345                rc = ib_dereg_mr(r->r.frmr.fr_mr);
1346                if (rc)
1347                        dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1348                                __func__, rc);
1349                ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1350
1351                kfree(r);
1352        }
1353}
1354
1355void
1356rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1357{
1358        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1359        int i;
1360
1361        /* clean up in reverse order from create
1362         *   1.  recv mr memory (mr free, then kfree)
1363         *   2.  send mr memory (mr free, then kfree)
1364         *   3.  MWs
1365         */
1366        dprintk("RPC:       %s: entering\n", __func__);
1367
1368        for (i = 0; i < buf->rb_max_requests; i++) {
1369                if (buf->rb_recv_bufs)
1370                        rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1371                if (buf->rb_send_bufs)
1372                        rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1373        }
1374
1375        switch (ia->ri_memreg_strategy) {
1376        case RPCRDMA_FRMR:
1377                rpcrdma_destroy_frmrs(buf);
1378                break;
1379        case RPCRDMA_MTHCAFMR:
1380                rpcrdma_destroy_fmrs(buf);
1381                break;
1382        default:
1383                break;
1384        }
1385
1386        kfree(buf->rb_pool);
1387}
1388
1389/* After a disconnect, unmap all FMRs.
1390 *
1391 * This is invoked only in the transport connect worker in order
1392 * to serialize with rpcrdma_register_fmr_external().
1393 */
1394static void
1395rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1396{
1397        struct rpcrdma_xprt *r_xprt =
1398                                container_of(ia, struct rpcrdma_xprt, rx_ia);
1399        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1400        struct list_head *pos;
1401        struct rpcrdma_mw *r;
1402        LIST_HEAD(l);
1403        int rc;
1404
1405        list_for_each(pos, &buf->rb_all) {
1406                r = list_entry(pos, struct rpcrdma_mw, mw_all);
1407
1408                INIT_LIST_HEAD(&l);
1409                list_add(&r->r.fmr->list, &l);
1410                rc = ib_unmap_fmr(&l);
1411                if (rc)
1412                        dprintk("RPC:       %s: ib_unmap_fmr failed %i\n",
1413                                __func__, rc);
1414        }
1415}
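/* ib_unmap_fmr() operates on a list of FMRs; the loop above builds a
 * throwaway one-entry list per MW so each unmap failure is reported
 * individually.
 */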
1416
1417/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1418 * an unusable state. Find FRMRs in this state and dereg / reg
1419 * each.  FRMRs that are VALID and attached to an rpcrdma_req are
1420 * also torn down.
1421 *
1422 * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1423 *
1424 * This is invoked only in the transport connect worker in order
1425 * to serialize with rpcrdma_register_frmr_external().
1426 */
1427static void
1428rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1429{
1430        struct rpcrdma_xprt *r_xprt =
1431                                container_of(ia, struct rpcrdma_xprt, rx_ia);
1432        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1433        struct list_head *pos;
1434        struct rpcrdma_mw *r;
1435        int rc;
1436
1437        list_for_each(pos, &buf->rb_all) {
1438                r = list_entry(pos, struct rpcrdma_mw, mw_all);
1439
1440                if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1441                        continue;
1442
1443                rc = ib_dereg_mr(r->r.frmr.fr_mr);
1444                if (rc)
1445                        dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1446                                __func__, rc);
1447                ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1448
1449                r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1450                                        ia->ri_max_frmr_depth);
1451                if (IS_ERR(r->r.frmr.fr_mr)) {
1452                        rc = PTR_ERR(r->r.frmr.fr_mr);
1453                        dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1454                                " failed %i\n", __func__, rc);
1455                        continue;
1456                }
1457                r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1458                                        ia->ri_id->device,
1459                                        ia->ri_max_frmr_depth);
1460                if (IS_ERR(r->r.frmr.fr_pgl)) {
1461                        rc = PTR_ERR(r->r.frmr.fr_pgl);
1462                        dprintk("RPC:       %s: "
1463                                "ib_alloc_fast_reg_page_list "
1464                                "failed %i\n", __func__, rc);
1465
1466                        ib_dereg_mr(r->r.frmr.fr_mr);
1467                        continue;
1468                }
1469                r->r.frmr.fr_state = FRMR_IS_INVALID;
1470        }
1471}
1472
1473/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1474 * some req segments uninitialized.
1475 */
1476static void
1477rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1478{
1479        if (*mw) {
1480                list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1481                *mw = NULL;
1482        }
1483}
1484
1485/* Cycle mw's back in reverse order, and "spin" them.
1486 * This delays and scrambles reuse as much as possible.
1487 */
1488static void
1489rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1490{
1491        struct rpcrdma_mr_seg *seg = req->rl_segments;
1492        struct rpcrdma_mr_seg *seg1 = seg;
1493        int i;
1494
1495        for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1496                rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1497        rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1498}
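/* rpcrdma_buffer_get_frmrs/fmrs() pull MWs from the head of rb_mws and fill
 * rl_segments from the highest index down; returning them here with
 * list_add_tail, segment 0 last, puts just-used MWs at the back of the free
 * list so they are reused as late as possible.
 */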
1499
1500static void
1501rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1502{
1503        buf->rb_send_bufs[--buf->rb_send_index] = req;
1504        req->rl_niovs = 0;
1505        if (req->rl_reply) {
1506                buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1507                req->rl_reply->rr_func = NULL;
1508                req->rl_reply = NULL;
1509        }
1510}
1511
1512/* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1513 * Redo only the ib_post_send().
1514 */
1515static void
1516rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1517{
1518        struct rpcrdma_xprt *r_xprt =
1519                                container_of(ia, struct rpcrdma_xprt, rx_ia);
1520        struct ib_send_wr invalidate_wr, *bad_wr;
1521        int rc;
1522
1523        dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1524
1525        /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1526        r->r.frmr.fr_state = FRMR_IS_INVALID;
1527
1528        memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1529        invalidate_wr.wr_id = (unsigned long)(void *)r;
1530        invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531        invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1532        DECR_CQCOUNT(&r_xprt->rx_ep);
1533
1534        dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1535                __func__, r, r->r.frmr.fr_mr->rkey);
1536
1537        read_lock(&ia->ri_qplock);
1538        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1539        read_unlock(&ia->ri_qplock);
1540        if (rc) {
1541                /* Force rpcrdma_buffer_get() to retry */
1542                r->r.frmr.fr_state = FRMR_IS_STALE;
1543                dprintk("RPC:       %s: ib_post_send failed, %i\n",
1544                        __func__, rc);
1545        }
1546}
1547
1548static void
1549rpcrdma_retry_flushed_linv(struct list_head *stale,
1550                           struct rpcrdma_buffer *buf)
1551{
1552        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1553        struct list_head *pos;
1554        struct rpcrdma_mw *r;
1555        unsigned long flags;
1556
1557        list_for_each(pos, stale) {
1558                r = list_entry(pos, struct rpcrdma_mw, mw_list);
1559                rpcrdma_retry_local_inv(r, ia);
1560        }
1561
1562        spin_lock_irqsave(&buf->rb_lock, flags);
1563        list_splice_tail(stale, &buf->rb_mws);
1564        spin_unlock_irqrestore(&buf->rb_lock, flags);
1565}
1566
1567static struct rpcrdma_req *
1568rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1569                         struct list_head *stale)
1570{
1571        struct rpcrdma_mw *r;
1572        int i;
1573
1574        i = RPCRDMA_MAX_SEGS - 1;
1575        while (!list_empty(&buf->rb_mws)) {
1576                r = list_entry(buf->rb_mws.next,
1577                               struct rpcrdma_mw, mw_list);
1578                list_del(&r->mw_list);
1579                if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1580                        list_add(&r->mw_list, stale);
1581                        continue;
1582                }
1583                req->rl_segments[i].rl_mw = r;
1584                if (unlikely(i-- == 0))
1585                        return req;     /* Success */
1586        }
1587
1588        /* Not enough entries on rb_mws for this req */
1589        rpcrdma_buffer_put_sendbuf(req, buf);
1590        rpcrdma_buffer_put_mrs(req, buf);
1591        return NULL;
1592}
1593
1594static struct rpcrdma_req *
1595rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1596{
1597        struct rpcrdma_mw *r;
1598        int i;
1599
1600        i = RPCRDMA_MAX_SEGS - 1;
1601        while (!list_empty(&buf->rb_mws)) {
1602                r = list_entry(buf->rb_mws.next,
1603                               struct rpcrdma_mw, mw_list);
1604                list_del(&r->mw_list);
1605                req->rl_segments[i].rl_mw = r;
1606                if (unlikely(i-- == 0))
1607                        return req;     /* Success */
1608        }
1609
1610        /* Not enough entries on rb_mws for this req */
1611        rpcrdma_buffer_put_sendbuf(req, buf);
1612        rpcrdma_buffer_put_mrs(req, buf);
1613        return NULL;
1614}
1615
1616/*
1617 * Get a set of request/reply buffers.
1618 *
1619 * Reply buffer (if needed) is attached to send buffer upon return.
1620 * Rule:
1621 *    rb_send_index and rb_recv_index MUST always be pointing to the
1622 *    *next* available buffer (non-NULL). They are incremented after
1623 *    removing buffers, and decremented *before* returning them.
1624 */
1625struct rpcrdma_req *
1626rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1627{
1628        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1629        struct list_head stale;
1630        struct rpcrdma_req *req;
1631        unsigned long flags;
1632
1633        spin_lock_irqsave(&buffers->rb_lock, flags);
1634        if (buffers->rb_send_index == buffers->rb_max_requests) {
1635                spin_unlock_irqrestore(&buffers->rb_lock, flags);
1636                dprintk("RPC:       %s: out of request buffers\n", __func__);
1637                return NULL;
1638        }
1639
1640        req = buffers->rb_send_bufs[buffers->rb_send_index];
1641        if (buffers->rb_send_index < buffers->rb_recv_index) {
1642                dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1643                        __func__,
1644                        buffers->rb_recv_index - buffers->rb_send_index);
1645                req->rl_reply = NULL;
1646        } else {
1647                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1648                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1649        }
1650        buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1651
1652        INIT_LIST_HEAD(&stale);
1653        switch (ia->ri_memreg_strategy) {
1654        case RPCRDMA_FRMR:
1655                req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1656                break;
1657        case RPCRDMA_MTHCAFMR:
1658                req = rpcrdma_buffer_get_fmrs(req, buffers);
1659                break;
1660        default:
1661                break;
1662        }
1663        spin_unlock_irqrestore(&buffers->rb_lock, flags);
1664        if (!list_empty(&stale))
1665                rpcrdma_retry_flushed_linv(&stale, buffers);
1666        return req;
1667}
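/* Illustrative sketch (editorial, not part of the transport): a typical
 * send-side pairing of rpcrdma_buffer_get() and rpcrdma_buffer_put().
 * The r_xprt variable and the error label are assumptions made only for
 * this example.
 *
 *	struct rpcrdma_req *req;
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (req == NULL)
 *		goto out_backoff;	pool exhausted; retry later
 *
 *	... marshal the RPC into req and post it with rpcrdma_ep_post() ...
 *
 *	if (rc)				posting failed: return req, its MWs,
 *		rpcrdma_buffer_put(req);  and any attached reply to the pool
 */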
1668
1669/*
1670 * Put request/reply buffers back into pool.
1671 * Pre-decrement counter/array index.
1672 */
1673void
1674rpcrdma_buffer_put(struct rpcrdma_req *req)
1675{
1676        struct rpcrdma_buffer *buffers = req->rl_buffer;
1677        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1678        unsigned long flags;
1679
1680        spin_lock_irqsave(&buffers->rb_lock, flags);
1681        rpcrdma_buffer_put_sendbuf(req, buffers);
1682        switch (ia->ri_memreg_strategy) {
1683        case RPCRDMA_FRMR:
1684        case RPCRDMA_MTHCAFMR:
1685                rpcrdma_buffer_put_mrs(req, buffers);
1686                break;
1687        default:
1688                break;
1689        }
1690        spin_unlock_irqrestore(&buffers->rb_lock, flags);
1691}
1692
1693/*
1694 * Recover reply buffers from pool.
1695 * This happens when recovering from error conditions.
1696 * Post-increment counter/array index.
1697 */
1698void
1699rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1700{
1701        struct rpcrdma_buffer *buffers = req->rl_buffer;
1702        unsigned long flags;
1703
1704        spin_lock_irqsave(&buffers->rb_lock, flags);
1705        if (buffers->rb_recv_index < buffers->rb_max_requests) {
1706                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1707                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1708        }
1709        spin_unlock_irqrestore(&buffers->rb_lock, flags);
1710}
1711
1712/*
1713 * Put reply buffers back into pool when not attached to
1714 * request. This happens in error conditions.
1715 */
1716void
1717rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1718{
1719        struct rpcrdma_buffer *buffers = rep->rr_buffer;
1720        unsigned long flags;
1721
1722        rep->rr_func = NULL;
1723        spin_lock_irqsave(&buffers->rb_lock, flags);
1724        buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1725        spin_unlock_irqrestore(&buffers->rb_lock, flags);
1726}
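/* Editorial note: the two reply-buffer helpers above are error-recovery
 * paths. A caller that finds req->rl_reply == NULL (e.g. while recovering
 * from a connection loss) can reattach a reply with
 * rpcrdma_recv_buffer_get(req); a reply that cannot be matched to any
 * request is recycled with rpcrdma_recv_buffer_put(rep). The exact call
 * sites live outside this file.
 */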
1727
1728/*
1729 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1730 */
1731
1732static int
1733rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1734                                struct ib_mr **mrp, struct ib_sge *iov)
1735{
1736        struct ib_phys_buf ipb;
1737        struct ib_mr *mr;
1738        int rc;
1739
1740        /*
1741         * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1742         */
1743        iov->addr = ib_dma_map_single(ia->ri_id->device,
1744                        va, len, DMA_BIDIRECTIONAL);
1745        if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1746                return -ENOMEM;
1747
1748        iov->length = len;
1749
1750        if (ia->ri_have_dma_lkey) {
1751                *mrp = NULL;
1752                iov->lkey = ia->ri_dma_lkey;
1753                return 0;
1754        } else if (ia->ri_bind_mem != NULL) {
1755                *mrp = NULL;
1756                iov->lkey = ia->ri_bind_mem->lkey;
1757                return 0;
1758        }
1759
1760        ipb.addr = iov->addr;
1761        ipb.size = iov->length;
1762        mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1763                        IB_ACCESS_LOCAL_WRITE, &iov->addr);
1764
1765        dprintk("RPC:       %s: phys convert: 0x%llx "
1766                        "registered 0x%llx length %d\n",
1767                        __func__, (unsigned long long)ipb.addr,
1768                        (unsigned long long)iov->addr, len);
1769
1770        if (IS_ERR(mr)) {
1771                *mrp = NULL;
1772                rc = PTR_ERR(mr);
1773                dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1774        } else {
1775                *mrp = mr;
1776                iov->lkey = mr->lkey;
1777                rc = 0;
1778        }
1779
1780        return rc;
1781}
1782
1783static int
1784rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1785                                struct ib_mr *mr, struct ib_sge *iov)
1786{
1787        int rc;
1788
1789        ib_dma_unmap_single(ia->ri_id->device,
1790                        iov->addr, iov->length, DMA_BIDIRECTIONAL);
1791
1792        if (NULL == mr)
1793                return 0;
1794
1795        rc = ib_dereg_mr(mr);
1796        if (rc)
1797                dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1798        return rc;
1799}
1800
1801/**
1802 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1803 * @ia: controlling rpcrdma_ia
1804 * @size: size of buffer to be allocated, in bytes
1805 * @flags: GFP flags
1806 *
1807 * Returns pointer to private header of an area of internally
1808 * registered memory, or an ERR_PTR. The registered buffer follows
1809 * the end of the private header.
1810 *
1811 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1812 * receiving the payload of RDMA RECV operations. regbufs are not
1813 * used for RDMA READ/WRITE operations, thus are registered only for
1814 * LOCAL access.
1815 */
1816struct rpcrdma_regbuf *
1817rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1818{
1819        struct rpcrdma_regbuf *rb;
1820        int rc;
1821
1822        rc = -ENOMEM;
1823        rb = kmalloc(sizeof(*rb) + size, flags);
1824        if (rb == NULL)
1825                goto out;
1826
1827        rb->rg_size = size;
1828        rb->rg_owner = NULL;
1829        rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1830                                       &rb->rg_mr, &rb->rg_iov);
1831        if (rc)
1832                goto out_free;
1833
1834        return rb;
1835
1836out_free:
1837        kfree(rb);
1838out:
1839        return ERR_PTR(rc);
1840}
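/* Illustrative sketch (editorial): allocating a regbuf and pointing a
 * local SGE at it. rdmab_addr()/rdmab_length() are the same accessors
 * used by rpcrdma_ep_post_recv() below; the lkey chosen by
 * rpcrdma_register_internal() is carried in rb->rg_iov. "size" is a
 * placeholder for the caller's buffer length.
 *
 *	struct rpcrdma_regbuf *rb;
 *	struct ib_sge sge;
 *
 *	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *
 *	sge.addr   = rdmab_addr(rb);	DMA address of rb->rg_base
 *	sge.length = rdmab_length(rb);
 *	sge.lkey   = rb->rg_iov.lkey;
 *
 *	rpcrdma_free_regbuf(ia, rb);	unmaps/deregisters, then kfree
 */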
1841
1842/**
1843 * rpcrdma_free_regbuf - deregister and free registered buffer
1844 * @ia: controlling rpcrdma_ia
1845 * @rb: regbuf to be deregistered and freed
1846 */
1847void
1848rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1849{
1850        if (rb) {
1851                rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1852                kfree(rb);
1853        }
1854}
1855
1856/*
1857 * Wrappers for chunk registration, shared by read/write chunk code.
1858 */
1859
1860static void
1861rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1862{
1863        seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1864        seg->mr_dmalen = seg->mr_len;
1865        if (seg->mr_page)
1866                seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1867                                seg->mr_page, offset_in_page(seg->mr_offset),
1868                                seg->mr_dmalen, seg->mr_dir);
1869        else
1870                seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1871                                seg->mr_offset,
1872                                seg->mr_dmalen, seg->mr_dir);
1873        if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1874                dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1875                        __func__,
1876                        (unsigned long long)seg->mr_dma,
1877                        seg->mr_offset, seg->mr_dmalen);
1878        }
1879}
1880
1881static void
1882rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1883{
1884        if (seg->mr_page)
1885                ib_dma_unmap_page(ia->ri_id->device,
1886                                seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1887        else
1888                ib_dma_unmap_single(ia->ri_id->device,
1889                                seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1890}
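/* Editorial note: every successful rpcrdma_map_one() is balanced by an
 * rpcrdma_unmap_one(). The FRMR registration error path below unwinds its
 * segments in reverse, and both deregister helpers unmap each segment of
 * a chunk when the chunk is invalidated.
 */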
1891
1892static int
1893rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1894                        int *nsegs, int writing, struct rpcrdma_ia *ia,
1895                        struct rpcrdma_xprt *r_xprt)
1896{
1897        struct rpcrdma_mr_seg *seg1 = seg;
1898        struct rpcrdma_mw *mw = seg1->rl_mw;
1899        struct rpcrdma_frmr *frmr = &mw->r.frmr;
1900        struct ib_mr *mr = frmr->fr_mr;
1901        struct ib_send_wr fastreg_wr, *bad_wr;
1902        u8 key;
1903        int len, pageoff;
1904        int i, rc;
1905        int seg_len;
1906        u64 pa;
1907        int page_no;
1908
1909        pageoff = offset_in_page(seg1->mr_offset);
1910        seg1->mr_offset -= pageoff;     /* start of page */
1911        seg1->mr_len += pageoff;
1912        len = -pageoff;
1913        if (*nsegs > ia->ri_max_frmr_depth)
1914                *nsegs = ia->ri_max_frmr_depth;
1915        for (page_no = i = 0; i < *nsegs;) {
1916                rpcrdma_map_one(ia, seg, writing);
1917                pa = seg->mr_dma;
1918                for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1919                        frmr->fr_pgl->page_list[page_no++] = pa;
1920                        pa += PAGE_SIZE;
1921                }
1922                len += seg->mr_len;
1923                ++seg;
1924                ++i;
1925                /* Check for holes */
1926                if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1927                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1928                        break;
1929        }
1930        dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1931                __func__, mw, i);
1932
1933        frmr->fr_state = FRMR_IS_VALID;
1934
1935        memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1936        fastreg_wr.wr_id = (unsigned long)(void *)mw;
1937        fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1938        fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1939        fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1940        fastreg_wr.wr.fast_reg.page_list_len = page_no;
1941        fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1942        fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1943        if (fastreg_wr.wr.fast_reg.length < len) {
1944                rc = -EIO;
1945                goto out_err;
1946        }
1947
1948        /* Bump the key */
1949        key = (u8)(mr->rkey & 0x000000FF);
1950        ib_update_fast_reg_key(mr, ++key);
1951
1952        fastreg_wr.wr.fast_reg.access_flags = (writing ?
1953                                IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1954                                IB_ACCESS_REMOTE_READ);
1955        fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1956        DECR_CQCOUNT(&r_xprt->rx_ep);
1957
1958        rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1959        if (rc) {
1960                dprintk("RPC:       %s: failed ib_post_send for register,"
1961                        " status %i\n", __func__, rc);
1962                ib_update_fast_reg_key(mr, --key);
1963                goto out_err;
1964        } else {
1965                seg1->mr_rkey = mr->rkey;
1966                seg1->mr_base = seg1->mr_dma + pageoff;
1967                seg1->mr_nsegs = i;
1968                seg1->mr_len = len;
1969        }
1970        *nsegs = i;
1971        return 0;
1972out_err:
1973        frmr->fr_state = FRMR_IS_INVALID;
1974        while (i--)
1975                rpcrdma_unmap_one(ia, --seg);
1976        return rc;
1977}
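/* Editorial worked example for the "Bump the key" step above: the
 * low-order byte of an rkey is the variant key that
 * ib_update_fast_reg_key() replaces. If mr->rkey is 0x1234ab07, the bump
 * installs 0x08 as the key byte, so the registration posted here
 * advertises rkey 0x1234ab08, and a peer still holding the previous rkey
 * can no longer use it.
 */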
1978
1979static int
1980rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1981                        struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1982{
1983        struct rpcrdma_mr_seg *seg1 = seg;
1984        struct ib_send_wr invalidate_wr, *bad_wr;
1985        int rc;
1986
1987        seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1988
1989        memset(&invalidate_wr, 0, sizeof invalidate_wr);
1990        invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
1991        invalidate_wr.opcode = IB_WR_LOCAL_INV;
1992        invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
1993        DECR_CQCOUNT(&r_xprt->rx_ep);
1994
1995        read_lock(&ia->ri_qplock);
1996        while (seg1->mr_nsegs--)
1997                rpcrdma_unmap_one(ia, seg++);
1998        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1999        read_unlock(&ia->ri_qplock);
2000        if (rc) {
2001                /* Force rpcrdma_buffer_get() to retry */
2002                seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
2003                dprintk("RPC:       %s: failed ib_post_send for invalidate,"
2004                        " status %i\n", __func__, rc);
2005        }
2006        return rc;
2007}
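/* Editorial summary of the fr_state transitions used in this file:
 *
 *	FRMR_IS_INVALID --(FAST_REG_MR posted)--> FRMR_IS_VALID
 *	FRMR_IS_VALID   --(LOCAL_INV posted)----> FRMR_IS_INVALID
 *	any state       --(LOCAL_INV cannot be posted)--> FRMR_IS_STALE
 *
 * Stale MWs are parked on a "stale" list by rpcrdma_buffer_get_frmrs()
 * and recovered by rpcrdma_retry_flushed_linv(), which re-posts the
 * LOCAL_INV and marks them FRMR_IS_INVALID again.
 */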
2008
2009static int
2010rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
2011                        int *nsegs, int writing, struct rpcrdma_ia *ia)
2012{
2013        struct rpcrdma_mr_seg *seg1 = seg;
2014        u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
2015        int len, pageoff, i, rc;
2016
2017        pageoff = offset_in_page(seg1->mr_offset);
2018        seg1->mr_offset -= pageoff;     /* start of page */
2019        seg1->mr_len += pageoff;
2020        len = -pageoff;
2021        if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
2022                *nsegs = RPCRDMA_MAX_DATA_SEGS;
2023        for (i = 0; i < *nsegs;) {
2024                rpcrdma_map_one(ia, seg, writing);
2025                physaddrs[i] = seg->mr_dma;
2026                len += seg->mr_len;
2027                ++seg;
2028                ++i;
2029                /* Check for holes */
2030                if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
2031                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
2032                        break;
2033        }
2034        rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
2035        if (rc) {
2036                dprintk("RPC:       %s: failed ib_map_phys_fmr "
2037                        "%u@0x%llx+%i (%d)... status %i\n", __func__,
2038                        len, (unsigned long long)seg1->mr_dma,
2039                        pageoff, i, rc);
2040                while (i--)
2041                        rpcrdma_unmap_one(ia, --seg);
2042        } else {
2043                seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
2044                seg1->mr_base = seg1->mr_dma + pageoff;
2045                seg1->mr_nsegs = i;
2046                seg1->mr_len = len;
2047        }
2048        *nsegs = i;
2049        return rc;
2050}
2051
2052static int
2053rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
2054                        struct rpcrdma_ia *ia)
2055{
2056        struct rpcrdma_mr_seg *seg1 = seg;
2057        LIST_HEAD(l);
2058        int rc;
2059
2060        list_add(&seg1->rl_mw->r.fmr->list, &l);
2061        rc = ib_unmap_fmr(&l);
2062        read_lock(&ia->ri_qplock);
2063        while (seg1->mr_nsegs--)
2064                rpcrdma_unmap_one(ia, seg++);
2065        read_unlock(&ia->ri_qplock);
2066        if (rc)
2067                dprintk("RPC:       %s: failed ib_unmap_fmr,"
2068                        " status %i\n", __func__, rc);
2069        return rc;
2070}
2071
2072int
2073rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
2074                        int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
2075{
2076        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2077        int rc = 0;
2078
2079        switch (ia->ri_memreg_strategy) {
2080
2081        case RPCRDMA_ALLPHYSICAL:
2082                rpcrdma_map_one(ia, seg, writing);
2083                seg->mr_rkey = ia->ri_bind_mem->rkey;
2084                seg->mr_base = seg->mr_dma;
2085                seg->mr_nsegs = 1;
2086                nsegs = 1;
2087                break;
2088
2089        /* Registration using frmr registration */
2090        case RPCRDMA_FRMR:
2091                rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
2092                break;
2093
2094        /* Registration using fmr memory registration */
2095        case RPCRDMA_MTHCAFMR:
2096                rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2097                break;
2098
2099        default:
2100                return -EIO;
2101        }
2102        if (rc)
2103                return rc;
2104
2105        return nsegs;
2106}
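/* Illustrative sketch (editorial): a chunk-building caller consumes the
 * return value as the number of segments coalesced into one chunk. The
 * loop shape below is an assumption for this example, not a quote of the
 * marshaling code.
 *
 *	while (nsegs > 0) {
 *		n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *		if (n <= 0)
 *			goto out_unmap;		registration failed
 *		... emit one chunk entry from seg[0]:
 *		    mr_rkey, mr_base, mr_len ...
 *		seg += n;
 *		nsegs -= n;
 *	}
 */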
2107
2108int
2109rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2110                struct rpcrdma_xprt *r_xprt)
2111{
2112        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2113        int nsegs = seg->mr_nsegs, rc;
2114
2115        switch (ia->ri_memreg_strategy) {
2116
2117        case RPCRDMA_ALLPHYSICAL:
2118                read_lock(&ia->ri_qplock);
2119                rpcrdma_unmap_one(ia, seg);
2120                read_unlock(&ia->ri_qplock);
2121                break;
2122
2123        case RPCRDMA_FRMR:
2124                rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2125                break;
2126
2127        case RPCRDMA_MTHCAFMR:
2128                rc = rpcrdma_deregister_fmr_external(seg, ia);
2129                break;
2130
2131        default:
2132                break;
2133        }
2134        return nsegs;
2135}
2136
2137/*
2138 * Prepost any receive buffer, then post send.
2139 *
2140 * Receive buffer is donated to hardware, reclaimed upon recv completion.
2141 */
2142int
2143rpcrdma_ep_post(struct rpcrdma_ia *ia,
2144                struct rpcrdma_ep *ep,
2145                struct rpcrdma_req *req)
2146{
2147        struct ib_send_wr send_wr, *send_wr_fail;
2148        struct rpcrdma_rep *rep = req->rl_reply;
2149        int rc;
2150
2151        if (rep) {
2152                rc = rpcrdma_ep_post_recv(ia, ep, rep);
2153                if (rc)
2154                        goto out;
2155                req->rl_reply = NULL;
2156        }
2157
2158        send_wr.next = NULL;
2159        send_wr.wr_id = 0ULL;   /* no send cookie */
2160        send_wr.sg_list = req->rl_send_iov;
2161        send_wr.num_sge = req->rl_niovs;
2162        send_wr.opcode = IB_WR_SEND;
2163        if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
2164                ib_dma_sync_single_for_device(ia->ri_id->device,
2165                        req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2166                        DMA_TO_DEVICE);
2167        ib_dma_sync_single_for_device(ia->ri_id->device,
2168                req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2169                DMA_TO_DEVICE);
2170        ib_dma_sync_single_for_device(ia->ri_id->device,
2171                req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2172                DMA_TO_DEVICE);
2173
2174        if (DECR_CQCOUNT(ep) > 0)
2175                send_wr.send_flags = 0;
2176        else { /* Provider must take a send completion every now and then */
2177                INIT_CQCOUNT(ep);
2178                send_wr.send_flags = IB_SEND_SIGNALED;
2179        }
2180
2181        rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2182        if (rc)
2183                dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2184                        rc);
2185out:
2186        return rc;
2187}
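/* Illustrative sketch (editorial): the transport's send path hands a
 * fully-marshaled req to the connected endpoint; the field names follow
 * the r_xprt container layout used throughout this file.
 *
 *	rc = rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req);
 *	if (rc)
 *		... drop the connection so the RPC is retransmitted ...
 */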
2188
2189/*
2190 * (Re)post a receive buffer.
2191 */
2192int
2193rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2194                     struct rpcrdma_ep *ep,
2195                     struct rpcrdma_rep *rep)
2196{
2197        struct ib_recv_wr recv_wr, *recv_wr_fail;
2198        int rc;
2199
2200        recv_wr.next = NULL;
2201        recv_wr.wr_id = (u64) (unsigned long) rep;
2202        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
2203        recv_wr.num_sge = 1;
2204
2205        ib_dma_sync_single_for_cpu(ia->ri_id->device,
2206                                   rdmab_addr(rep->rr_rdmabuf),
2207                                   rdmab_length(rep->rr_rdmabuf),
2208                                   DMA_BIDIRECTIONAL);
2209
2210        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2211
2212        if (rc)
2213                dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2214                        rc);
2215        return rc;
2216}
2217
2218/* Physical mapping means one Read/Write list entry per page.
2219 * All list entries must fit within an inline buffer.
2220 *
2221 * NB: The server must return a Write list for NFS READ,
2222 *     which has the same constraint. Factor in the inline
2223 *     rsize as well.
2224 */
2225static size_t
2226rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2227{
2228        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2229        unsigned int inline_size, pages;
2230
2231        inline_size = min_t(unsigned int,
2232                            cdata->inline_wsize, cdata->inline_rsize);
2233        inline_size -= RPCRDMA_HDRLEN_MIN;
2234        pages = inline_size / sizeof(struct rpcrdma_segment);
2235        return pages << PAGE_SHIFT;
2236}
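/* Editorial worked example (assumed typical values: 1024-byte inline
 * buffers, a 28-byte minimal RPC-over-RDMA header, 16-byte segment
 * entries, 4KB pages):
 *
 *	inline_size = 1024 - 28 = 996
 *	pages       = 996 / 16  = 62
 *	max payload = 62 << 12  = 253952 bytes (~248KB)
 */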
2237
2238static size_t
2239rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2240{
2241        return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2242}
2243
2244size_t
2245rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2246{
2247        size_t result;
2248
2249        switch (r_xprt->rx_ia.ri_memreg_strategy) {
2250        case RPCRDMA_ALLPHYSICAL:
2251                result = rpcrdma_physical_max_payload(r_xprt);
2252                break;
2253        default:
2254                result = rpcrdma_mr_max_payload(r_xprt);
2255        }
2256        return result;
2257}
2258