linux/net/sunrpc/xprtrdma/frwr_ops.c
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR). Sometimes also referred to as FRMR mode.
 *
 * FRWR features ordered asynchronous registration and deregistration
 * of arbitrarily sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
 * Work Request (frwr_op_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_op_unmap).
 *
 * Typically these Work Requests are not signaled, and neither are RDMA
 * SEND Work Requests (with the exception of signaling occasionally to
 * prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 *
 * As an optimization, frwr_op_unmap marks MRs INVALID before the
 * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
 * rb_mws immediately so that no work (like managing a linked list
 * under a spinlock) is needed in the completion upcall.
 *
 * But this means that frwr_op_map() can occasionally encounter an MR
 * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
 * ordering prevents a subsequent FAST_REG WR from executing against
 * that MR while it is still being invalidated.
 */

/* Transport recovery
 *
 * ->op_map and the transport connect worker cannot run at the same
 * time, but ->op_unmap can fire while the transport connect worker
 * is running. Thus MR recovery is handled in ->op_map, to guarantee
 * that recovered MRs are owned by a sending RPC, and not by one where
 * ->op_unmap could fire at the same time a transport reconnect is
 * being done.
 *
 * When the underlying transport disconnects, MRs are left in one of
 * three states:
 *
 * INVALID:     The MR was not in use before the QP entered ERROR state.
 *              (Or, the LOCAL_INV WR has not completed or flushed yet).
 *
 * STALE:       The MR was being registered or unregistered when the QP
 *              entered ERROR state, and the pending WR was flushed.
 *
 * VALID:       The MR was registered before the QP entered ERROR state.
 *
 * When frwr_op_map encounters STALE or VALID MRs, they are recovered
 * with ib_dereg_mr and then are re-initialized. Because MR recovery
 * allocates fresh resources, it is deferred to a workqueue, and the
 * recovered MRs are placed back on the rb_mws list when recovery is
 * complete. frwr_op_map allocates another MR for the current RPC while
 * the broken MR is reset.
 *
 * To ensure that frwr_op_map doesn't encounter an MR that is marked
 * INVALID but that is about to be flushed due to a previous transport
 * disconnect, the transport connect worker attempts to drain all
 * pending send queue WRs before the transport is reconnected.
 */

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

static struct workqueue_struct *frwr_recovery_wq;

#define FRWR_RECOVERY_WQ_FLAGS          (WQ_UNBOUND | WQ_MEM_RECLAIM)

int
frwr_alloc_recovery_wq(void)
{
        frwr_recovery_wq = alloc_workqueue("frwr_recovery",
                                           FRWR_RECOVERY_WQ_FLAGS, 0);
        return !frwr_recovery_wq ? -ENOMEM : 0;
}

void
frwr_destroy_recovery_wq(void)
{
        struct workqueue_struct *wq;

        if (!frwr_recovery_wq)
                return;

        wq = frwr_recovery_wq;
        frwr_recovery_wq = NULL;
        destroy_workqueue(wq);
}

/* Deferred reset of a single FRMR. Generate a fresh rkey by
 * replacing the MR.
 *
 * There's no recovery if this fails. The FRMR is abandoned, but
 * remains in rb_all. It will be cleaned up when the transport is
 * destroyed.
 */
static void
__frwr_recovery_worker(struct work_struct *work)
{
        struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
                                            frmr.fr_work);
        struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
        unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
        struct ib_pd *pd = r_xprt->rx_ia.ri_pd;

        if (ib_dereg_mr(r->frmr.fr_mr))
                goto out_fail;

        r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
        if (IS_ERR(r->frmr.fr_mr))
                goto out_fail;

        dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
        r->frmr.fr_state = FRMR_IS_INVALID;
        rpcrdma_put_mw(r_xprt, r);
        return;

out_fail:
        pr_warn("RPC:       %s: FRMR %p unrecovered\n",
                __func__, r);
}

/* A broken MR was discovered in a context that can't sleep.
 * Defer recovery to the recovery worker.
 */
static void
__frwr_queue_recovery(struct rpcrdma_mw *r)
{
        INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
        queue_work(frwr_recovery_wq, &r->frmr.fr_work);
}

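/* Allocate the ib_mr and scatterlist behind one rpcrdma_mw. The
 * scatterlist is sized to "depth", the largest number of segments
 * a single FAST_REG WR may cover on this device.
 */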
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
            unsigned int depth)
{
        struct rpcrdma_frmr *f = &r->frmr;
        int rc;

        f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
        if (IS_ERR(f->fr_mr))
                goto out_mr_err;

        f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
        if (!f->sg)
                goto out_list_err;

        sg_init_table(f->sg, depth);

        init_completion(&f->fr_linv_done);

        return 0;

out_mr_err:
        rc = PTR_ERR(f->fr_mr);
        dprintk("RPC:       %s: ib_alloc_mr status %i\n",
                __func__, rc);
        return rc;

out_list_err:
        rc = -ENOMEM;
        dprintk("RPC:       %s: sg allocation failure\n",
                __func__);
        ib_dereg_mr(f->fr_mr);
        return rc;
}

static void
__frwr_release(struct rpcrdma_mw *r)
{
        int rc;

        rc = ib_dereg_mr(r->frmr.fr_mr);
        if (rc)
                dprintk("RPC:       %s: ib_dereg_mr status %i\n",
                        __func__, rc);
        kfree(r->frmr.sg);
}

static int
frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
             struct rpcrdma_create_data_internal *cdata)
{
        int depth, delta;

        ia->ri_max_frmr_depth =
                        min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
                              ia->ri_device->attrs.max_fast_reg_page_list_len);
        dprintk("RPC:       %s: device's max FR page list len = %u\n",
                __func__, ia->ri_max_frmr_depth);

        /* Add room for frmr register and invalidate WRs.
         * 1. FRMR reg WR for head
         * 2. FRMR invalidate WR for head
         * 3. N FRMR reg WRs for pagelist
         * 4. N FRMR invalidate WRs for pagelist
         * 5. FRMR reg WR for tail
         * 6. FRMR invalidate WR for tail
         * 7. The RDMA_SEND WR
         */
        depth = 7;

        /* Calculate N if the device max FRMR depth is smaller than
         * RPCRDMA_MAX_DATA_SEGS.
         */
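        /* For example (hypothetical numbers): if RPCRDMA_MAX_DATA_SEGS
         * were 64 and the device limit were 16 pages per FRMR, delta
         * would start at 48 and the loop below would add three more
         * reg + invalidate pairs, giving a depth of 13.
         */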
        if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
                delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frmr_depth;
                do {
                        depth += 2; /* FRMR reg + invalidate */
                        delta -= ia->ri_max_frmr_depth;
                } while (delta > 0);
        }

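        /* Each RPC slot can consume up to "depth" send queue entries.
         * If the resulting send queue would exceed the device's
         * max_qp_wr, shrink the number of RPC slots until it fits.
         */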
        ep->rep_attr.cap.max_send_wr *= depth;
        if (ep->rep_attr.cap.max_send_wr > ia->ri_device->attrs.max_qp_wr) {
                cdata->max_requests = ia->ri_device->attrs.max_qp_wr / depth;
                if (!cdata->max_requests)
                        return -EINVAL;
                ep->rep_attr.cap.max_send_wr = cdata->max_requests *
                                               depth;
        }

        return 0;
}

/* FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
static size_t
frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;

        return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
                     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
}

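/* Common send completion error handling: mark the MR STALE so that
 * frwr_op_map will recover it, and log anything other than a routine
 * flush caused by transport disconnect.
 */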
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
                            const char *wr)
{
        frmr->fr_state = FRMR_IS_STALE;
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
                       wr, ib_wc_status_msg(wc->status),
                       wc->status, wc->vendor_err);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rpcrdma_frmr *frmr;
        struct ib_cqe *cqe;

        /* WARNING: Only wr_cqe and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS) {
                cqe = wc->wr_cqe;
                frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
                __frwr_sendcompletion_flush(wc, frmr, "fastreg");
        }
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rpcrdma_frmr *frmr;
        struct ib_cqe *cqe;

        /* WARNING: Only wr_cqe and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS) {
                cqe = wc->wr_cqe;
                frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
                __frwr_sendcompletion_flush(wc, frmr, "localinv");
        }
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled LocalInv WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void
frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rpcrdma_frmr *frmr;
        struct ib_cqe *cqe;

        /* WARNING: Only wr_cqe and status are reliable at this point */
        cqe = wc->wr_cqe;
        frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
        if (wc->status != IB_WC_SUCCESS)
                __frwr_sendcompletion_flush(wc, frmr, "localinv");
        complete_all(&frmr->fr_linv_done);
}

static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct ib_device *device = r_xprt->rx_ia.ri_device;
        unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
        struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
        int i;

        spin_lock_init(&buf->rb_mwlock);
        INIT_LIST_HEAD(&buf->rb_mws);
        INIT_LIST_HEAD(&buf->rb_all);

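        /* Each RPC slot needs enough MWs for its head, its tail, and
         * its pagelist, which may be split into multiple chunks of at
         * most "depth" segments each.
         */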
        i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
        i += 2;                         /* head + tail */
        i *= buf->rb_max_requests;      /* one set for each RPC slot */
        dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);

        while (i--) {
                struct rpcrdma_mw *r;
                int rc;

                r = kzalloc(sizeof(*r), GFP_KERNEL);
                if (!r)
                        return -ENOMEM;

                rc = __frwr_init(r, pd, device, depth);
                if (rc) {
                        kfree(r);
                        return rc;
                }

                list_add(&r->mw_list, &buf->rb_mws);
                list_add(&r->mw_all, &buf->rb_all);
                r->frmr.fr_xprt = r_xprt;
        }

        return 0;
}

/* Post a FAST_REG Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 */
static int
frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
            int nsegs, bool writing)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct ib_device *device = ia->ri_device;
        enum dma_data_direction direction = rpcrdma_data_dir(writing);
        struct rpcrdma_mr_seg *seg1 = seg;
        struct rpcrdma_mw *mw;
        struct rpcrdma_frmr *frmr;
        struct ib_mr *mr;
        struct ib_reg_wr *reg_wr;
        struct ib_send_wr *bad_wr;
        int rc, i, n, dma_nents;
        u8 key;

        mw = seg1->rl_mw;
        seg1->rl_mw = NULL;
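        /* Queue the MW taken over from the previous request (if any),
         * and any MW found in a non-INVALID state, for recovery; keep
         * drawing from the free list until an MW that is safe to
         * register is found.
         */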
        do {
                if (mw)
                        __frwr_queue_recovery(mw);
                mw = rpcrdma_get_mw(r_xprt);
                if (!mw)
                        return -ENOMEM;
        } while (mw->frmr.fr_state != FRMR_IS_INVALID);
        frmr = &mw->frmr;
        frmr->fr_state = FRMR_IS_VALID;
        mr = frmr->fr_mr;
        reg_wr = &frmr->fr_regwr;

        if (nsegs > ia->ri_max_frmr_depth)
                nsegs = ia->ri_max_frmr_depth;

        for (i = 0; i < nsegs;) {
                if (seg->mr_page)
                        sg_set_page(&frmr->sg[i],
                                    seg->mr_page,
                                    seg->mr_len,
                                    offset_in_page(seg->mr_offset));
                else
                        sg_set_buf(&frmr->sg[i], seg->mr_offset,
                                   seg->mr_len);

                ++seg;
                ++i;

                /* Check for holes */
                if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
        frmr->sg_nents = i;

        dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
        if (!dma_nents) {
                pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
                       __func__, frmr->sg, frmr->sg_nents);
                return -ENOMEM;
        }

        n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
        if (unlikely(n != frmr->sg_nents)) {
                pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
                       __func__, frmr->fr_mr, n, frmr->sg_nents);
                rc = n < 0 ? n : -EINVAL;
                goto out_senderr;
        }

        dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
                __func__, mw, frmr->sg_nents, mr->length);

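        /* Advance the low-order "key" octet of the rkey so that each
         * registration of this MR presents a distinct rkey on the wire.
         */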
        key = (u8)(mr->rkey & 0x000000FF);
        ib_update_fast_reg_key(mr, ++key);

        reg_wr->wr.next = NULL;
        reg_wr->wr.opcode = IB_WR_REG_MR;
        frmr->fr_cqe.done = frwr_wc_fastreg;
        reg_wr->wr.wr_cqe = &frmr->fr_cqe;
        reg_wr->wr.num_sge = 0;
        reg_wr->wr.send_flags = 0;
        reg_wr->mr = mr;
        reg_wr->key = mr->rkey;
        reg_wr->access = writing ?
                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                         IB_ACCESS_REMOTE_READ;

        DECR_CQCOUNT(&r_xprt->rx_ep);
        rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
        if (rc)
                goto out_senderr;

        seg1->mr_dir = direction;
        seg1->rl_mw = mw;
        seg1->mr_rkey = mr->rkey;
        seg1->mr_base = mr->iova;
        seg1->mr_nsegs = frmr->sg_nents;
        seg1->mr_len = mr->length;

        return frmr->sg_nents;

out_senderr:
        dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
        ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
        __frwr_queue_recovery(mw);
        return rc;
}

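/* Build, but do not post, a LOCAL_INV Work Request for one registered
 * MR. The caller chains these WRs together and posts the whole chain
 * with a single ib_post_send() call.
 */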
static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
        struct rpcrdma_mw *mw = seg->rl_mw;
        struct rpcrdma_frmr *f = &mw->frmr;
        struct ib_send_wr *invalidate_wr;

        f->fr_state = FRMR_IS_INVALID;
        invalidate_wr = &f->fr_invwr;

        memset(invalidate_wr, 0, sizeof(*invalidate_wr));
        f->fr_cqe.done = frwr_wc_localinv;
        invalidate_wr->wr_cqe = &f->fr_cqe;
        invalidate_wr->opcode = IB_WR_LOCAL_INV;
        invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;

        return invalidate_wr;
}

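/* DMA unmap one MR's scatterlist and release the MW: back to the
 * free list if its LOCAL_INV was posted successfully, otherwise to
 * the recovery worker.
 */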
static void
__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                 int rc)
{
        struct ib_device *device = r_xprt->rx_ia.ri_device;
        struct rpcrdma_mw *mw = seg->rl_mw;
        struct rpcrdma_frmr *f = &mw->frmr;

        seg->rl_mw = NULL;

        ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);

        if (!rc)
                rpcrdma_put_mw(r_xprt, mw);
        else
                __frwr_queue_recovery(mw);
}

/* Invalidate all memory regions that were registered for "req".
 *
 * Sleeps until it is safe for the host CPU to access the
 * previously mapped memory regions.
 */
static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
        struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mr_seg *seg;
        unsigned int i, nchunks;
        struct rpcrdma_frmr *f;
        int rc;

        dprintk("RPC:       %s: req %p\n", __func__, req);

        /* ORDER: Invalidate all of the req's MRs first
         *
         * Chain the LOCAL_INV Work Requests and post them with
         * a single ib_post_send() call.
         */
        invalidate_wrs = pos = prev = NULL;
        seg = NULL;
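        /* Each chunk's first segment carries the MW and the count of
         * segments it maps, so the walk advances by seg->mr_nsegs.
         */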
        for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
                seg = &req->rl_segments[i];

                pos = __frwr_prepare_linv_wr(seg);

                if (!invalidate_wrs)
                        invalidate_wrs = pos;
                else
                        prev->next = pos;
                prev = pos;

                i += seg->mr_nsegs;
        }
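        /* "f" is the FRMR behind the last LOCAL_INV WR in the chain;
         * only that WR is signaled, and its completion implies that
         * every earlier WR in the chain has completed as well.
         */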
        f = &seg->rl_mw->frmr;

        /* Strong send queue ordering guarantees that when the
         * last WR in the chain completes, all WRs in the chain
         * are complete.
         */
        f->fr_invwr.send_flags = IB_SEND_SIGNALED;
        f->fr_cqe.done = frwr_wc_localinv_wake;
        reinit_completion(&f->fr_linv_done);
        INIT_CQCOUNT(&r_xprt->rx_ep);

        /* Transport disconnect drains the receive CQ before it
         * replaces the QP. The RPC reply handler won't call us
         * unless ri_id->qp is a valid pointer.
         */
        rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
        if (rc) {
                pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
                rdma_disconnect(ia->ri_id);
                goto unmap;
        }

        wait_for_completion(&f->fr_linv_done);

        /* ORDER: Now DMA unmap all of the req's MRs, and return
         * them to the free MW list.
         */
unmap:
        for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
                seg = &req->rl_segments[i];

                __frwr_dma_unmap(r_xprt, seg, rc);

                i += seg->mr_nsegs;
                seg->mr_nsegs = 0;
        }

        req->rl_nchunks = 0;
}

/* Post a LOCAL_INV Work Request to prevent further remote access
 * via RDMA READ or RDMA WRITE.
 */
static int
frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw = seg1->rl_mw;
        struct rpcrdma_frmr *frmr = &mw->frmr;
        struct ib_send_wr *invalidate_wr, *bad_wr;
        int rc, nsegs = seg->mr_nsegs;

        dprintk("RPC:       %s: FRMR %p\n", __func__, mw);

        seg1->rl_mw = NULL;
        frmr->fr_state = FRMR_IS_INVALID;
        invalidate_wr = &mw->frmr.fr_invwr;

        memset(invalidate_wr, 0, sizeof(*invalidate_wr));
        frmr->fr_cqe.done = frwr_wc_localinv;
        invalidate_wr->wr_cqe = &frmr->fr_cqe;
        invalidate_wr->opcode = IB_WR_LOCAL_INV;
        invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);

        ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
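        /* Hold ri_qplock across the post so that ri_id->qp cannot be
         * replaced underneath us while the WR is being posted.
         */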
        read_lock(&ia->ri_qplock);
        rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
        read_unlock(&ia->ri_qplock);
        if (rc)
                goto out_err;

        rpcrdma_put_mw(r_xprt, mw);
        return nsegs;

out_err:
        dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
        __frwr_queue_recovery(mw);
        return nsegs;
}

static void
frwr_op_destroy(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_mw *r;

        /* Ensure stale MWs for "buf" are no longer in flight */
        flush_workqueue(frwr_recovery_wq);

        while (!list_empty(&buf->rb_all)) {
                r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
                list_del(&r->mw_all);
                __frwr_release(r);
                kfree(r);
        }
}

const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
        .ro_map                         = frwr_op_map,
        .ro_unmap_sync                  = frwr_op_unmap_sync,
        .ro_unmap                       = frwr_op_unmap,
        .ro_open                        = frwr_op_open,
        .ro_maxpages                    = frwr_op_maxpages,
        .ro_init                        = frwr_op_init,
        .ro_destroy                     = frwr_op_destroy,
        .ro_displayname                 = "frwr",
};