linux/net/sunrpc/xprtrdma/svc_rdma_rw.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2016-2018 Oracle.  All rights reserved.
   4 *
   5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
   6 */
   7
   8#include <rdma/rw.h>
   9
  10#include <linux/sunrpc/rpc_rdma.h>
  11#include <linux/sunrpc/svc_rdma.h>
  12#include <linux/sunrpc/debug.h>
  13
  14#include "xprt_rdma.h"
  15#include <trace/events/rpcrdma.h>
  16
  17#define RPCDBG_FACILITY RPCDBG_SVCXPRT
  18
  19static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
  20static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
  21
  22/* Each R/W context contains state for one chain of RDMA Read or
  23 * Write Work Requests.
  24 *
  25 * Each WR chain handles a single contiguous server-side buffer,
  26 * because scatterlist entries after the first have to start on
  27 * page alignment. xdr_buf iovecs cannot guarantee alignment.
  28 *
  29 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
  30 * from a client may contain a unique R_key, so each WR chain moves
  31 * up to one segment at a time.
  32 *
  33 * The scatterlist makes this data structure over 4KB in size. To
  34 * make it less likely to fail, and to handle the allocation for
  35 * smaller I/O requests without disabling bottom-halves, these
  36 * contexts are created on demand, but cached and reused until the
  37 * controlling svcxprt_rdma is destroyed.
  38 */
  39struct svc_rdma_rw_ctxt {
  40        struct list_head        rw_list;
  41        struct rdma_rw_ctx      rw_ctx;
  42        int                     rw_nents;
  43        struct sg_table         rw_sg_table;
  44        struct scatterlist      rw_first_sgl[0];
  45};
  46
  47static inline struct svc_rdma_rw_ctxt *
  48svc_rdma_next_ctxt(struct list_head *list)
  49{
  50        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
  51                                        rw_list);
  52}
  53
  54static struct svc_rdma_rw_ctxt *
  55svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
  56{
  57        struct svc_rdma_rw_ctxt *ctxt;
  58
  59        spin_lock(&rdma->sc_rw_ctxt_lock);
  60
  61        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
  62        if (ctxt) {
  63                list_del(&ctxt->rw_list);
  64                spin_unlock(&rdma->sc_rw_ctxt_lock);
  65        } else {
  66                spin_unlock(&rdma->sc_rw_ctxt_lock);
  67                ctxt = kmalloc(sizeof(*ctxt) +
  68                               SG_CHUNK_SIZE * sizeof(struct scatterlist),
  69                               GFP_KERNEL);
  70                if (!ctxt)
  71                        goto out;
  72                INIT_LIST_HEAD(&ctxt->rw_list);
  73        }
  74
  75        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
  76        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
  77                                   ctxt->rw_sg_table.sgl)) {
  78                kfree(ctxt);
  79                ctxt = NULL;
  80        }
  81out:
  82        return ctxt;
  83}
  84
  85static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
  86                                 struct svc_rdma_rw_ctxt *ctxt)
  87{
  88        sg_free_table_chained(&ctxt->rw_sg_table, true);
  89
  90        spin_lock(&rdma->sc_rw_ctxt_lock);
  91        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
  92        spin_unlock(&rdma->sc_rw_ctxt_lock);
  93}
  94
  95/**
  96 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
  97 * @rdma: transport about to be destroyed
  98 *
  99 */
 100void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
 101{
 102        struct svc_rdma_rw_ctxt *ctxt;
 103
 104        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
 105                list_del(&ctxt->rw_list);
 106                kfree(ctxt);
 107        }
 108}
 109
 110/* A chunk context tracks all I/O for moving one Read or Write
 111 * chunk. This is a a set of rdma_rw's that handle data movement
 112 * for all segments of one chunk.
 113 *
 114 * These are small, acquired with a single allocator call, and
 115 * no more than one is needed per chunk. They are allocated on
 116 * demand, and not cached.
 117 */
 118struct svc_rdma_chunk_ctxt {
 119        struct ib_cqe           cc_cqe;
 120        struct svcxprt_rdma     *cc_rdma;
 121        struct list_head        cc_rwctxts;
 122        int                     cc_sqecount;
 123};
 124
 125static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 126                             struct svc_rdma_chunk_ctxt *cc)
 127{
 128        cc->cc_rdma = rdma;
 129        svc_xprt_get(&rdma->sc_xprt);
 130
 131        INIT_LIST_HEAD(&cc->cc_rwctxts);
 132        cc->cc_sqecount = 0;
 133}
 134
 135static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
 136                                enum dma_data_direction dir)
 137{
 138        struct svcxprt_rdma *rdma = cc->cc_rdma;
 139        struct svc_rdma_rw_ctxt *ctxt;
 140
 141        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
 142                list_del(&ctxt->rw_list);
 143
 144                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
 145                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 146                                    ctxt->rw_nents, dir);
 147                svc_rdma_put_rw_ctxt(rdma, ctxt);
 148        }
 149        svc_xprt_put(&rdma->sc_xprt);
 150}
 151
 152/* State for sending a Write or Reply chunk.
 153 *  - Tracks progress of writing one chunk over all its segments
 154 *  - Stores arguments for the SGL constructor functions
 155 */
 156struct svc_rdma_write_info {
 157        /* write state of this chunk */
 158        unsigned int            wi_seg_off;
 159        unsigned int            wi_seg_no;
 160        unsigned int            wi_nsegs;
 161        __be32                  *wi_segs;
 162
 163        /* SGL constructor arguments */
 164        struct xdr_buf          *wi_xdr;
 165        unsigned char           *wi_base;
 166        unsigned int            wi_next_off;
 167
 168        struct svc_rdma_chunk_ctxt      wi_cc;
 169};
 170
 171static struct svc_rdma_write_info *
 172svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
 173{
 174        struct svc_rdma_write_info *info;
 175
 176        info = kmalloc(sizeof(*info), GFP_KERNEL);
 177        if (!info)
 178                return info;
 179
 180        info->wi_seg_off = 0;
 181        info->wi_seg_no = 0;
 182        info->wi_nsegs = be32_to_cpup(++chunk);
 183        info->wi_segs = ++chunk;
 184        svc_rdma_cc_init(rdma, &info->wi_cc);
 185        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 186        return info;
 187}
 188
 189static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
 190{
 191        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
 192        kfree(info);
 193}
 194
 195/**
 196 * svc_rdma_write_done - Write chunk completion
 197 * @cq: controlling Completion Queue
 198 * @wc: Work Completion
 199 *
 200 * Pages under I/O are freed by a subsequent Send completion.
 201 */
 202static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 203{
 204        struct ib_cqe *cqe = wc->wr_cqe;
 205        struct svc_rdma_chunk_ctxt *cc =
 206                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 207        struct svcxprt_rdma *rdma = cc->cc_rdma;
 208        struct svc_rdma_write_info *info =
 209                        container_of(cc, struct svc_rdma_write_info, wi_cc);
 210
 211        trace_svcrdma_wc_write(wc);
 212
 213        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 214        wake_up(&rdma->sc_send_wait);
 215
 216        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 217                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 218                if (wc->status != IB_WC_WR_FLUSH_ERR)
 219                        pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
 220                               ib_wc_status_msg(wc->status),
 221                               wc->status, wc->vendor_err);
 222        }
 223
 224        svc_rdma_write_info_free(info);
 225}
 226
 227/* State for pulling a Read chunk.
 228 */
 229struct svc_rdma_read_info {
 230        struct svc_rdma_recv_ctxt       *ri_readctxt;
 231        unsigned int                    ri_position;
 232        unsigned int                    ri_pageno;
 233        unsigned int                    ri_pageoff;
 234        unsigned int                    ri_chunklen;
 235
 236        struct svc_rdma_chunk_ctxt      ri_cc;
 237};
 238
 239static struct svc_rdma_read_info *
 240svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
 241{
 242        struct svc_rdma_read_info *info;
 243
 244        info = kmalloc(sizeof(*info), GFP_KERNEL);
 245        if (!info)
 246                return info;
 247
 248        svc_rdma_cc_init(rdma, &info->ri_cc);
 249        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
 250        return info;
 251}
 252
 253static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
 254{
 255        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
 256        kfree(info);
 257}
 258
 259/**
 260 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 261 * @cq: controlling Completion Queue
 262 * @wc: Work Completion
 263 *
 264 */
 265static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 266{
 267        struct ib_cqe *cqe = wc->wr_cqe;
 268        struct svc_rdma_chunk_ctxt *cc =
 269                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 270        struct svcxprt_rdma *rdma = cc->cc_rdma;
 271        struct svc_rdma_read_info *info =
 272                        container_of(cc, struct svc_rdma_read_info, ri_cc);
 273
 274        trace_svcrdma_wc_read(wc);
 275
 276        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 277        wake_up(&rdma->sc_send_wait);
 278
 279        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 280                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 281                if (wc->status != IB_WC_WR_FLUSH_ERR)
 282                        pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
 283                               ib_wc_status_msg(wc->status),
 284                               wc->status, wc->vendor_err);
 285                svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 286        } else {
 287                spin_lock(&rdma->sc_rq_dto_lock);
 288                list_add_tail(&info->ri_readctxt->rc_list,
 289                              &rdma->sc_read_complete_q);
 290                spin_unlock(&rdma->sc_rq_dto_lock);
 291
 292                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 293                svc_xprt_enqueue(&rdma->sc_xprt);
 294        }
 295
 296        svc_rdma_read_info_free(info);
 297}
 298
 299/* This function sleeps when the transport's Send Queue is congested.
 300 *
 301 * Assumptions:
 302 * - If ib_post_send() succeeds, only one completion is expected,
 303 *   even if one or more WRs are flushed. This is true when posting
 304 *   an rdma_rw_ctx or when posting a single signaled WR.
 305 */
 306static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
 307{
 308        struct svcxprt_rdma *rdma = cc->cc_rdma;
 309        struct svc_xprt *xprt = &rdma->sc_xprt;
 310        struct ib_send_wr *first_wr;
 311        const struct ib_send_wr *bad_wr;
 312        struct list_head *tmp;
 313        struct ib_cqe *cqe;
 314        int ret;
 315
 316        if (cc->cc_sqecount > rdma->sc_sq_depth)
 317                return -EINVAL;
 318
 319        first_wr = NULL;
 320        cqe = &cc->cc_cqe;
 321        list_for_each(tmp, &cc->cc_rwctxts) {
 322                struct svc_rdma_rw_ctxt *ctxt;
 323
 324                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
 325                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
 326                                           rdma->sc_port_num, cqe, first_wr);
 327                cqe = NULL;
 328        }
 329
 330        do {
 331                if (atomic_sub_return(cc->cc_sqecount,
 332                                      &rdma->sc_sq_avail) > 0) {
 333                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 334                        trace_svcrdma_post_rw(&cc->cc_cqe,
 335                                              cc->cc_sqecount, ret);
 336                        if (ret)
 337                                break;
 338                        return 0;
 339                }
 340
 341                trace_svcrdma_sq_full(rdma);
 342                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 343                wait_event(rdma->sc_send_wait,
 344                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
 345                trace_svcrdma_sq_retry(rdma);
 346        } while (1);
 347
 348        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 349
 350        /* If even one was posted, there will be a completion. */
 351        if (bad_wr != first_wr)
 352                return 0;
 353
 354        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 355        wake_up(&rdma->sc_send_wait);
 356        return -ENOTCONN;
 357}
 358
 359/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 360 */
 361static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
 362                               unsigned int len,
 363                               struct svc_rdma_rw_ctxt *ctxt)
 364{
 365        struct scatterlist *sg = ctxt->rw_sg_table.sgl;
 366
 367        sg_set_buf(&sg[0], info->wi_base, len);
 368        info->wi_base += len;
 369
 370        ctxt->rw_nents = 1;
 371}
 372
 373/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 374 */
 375static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 376                                    unsigned int remaining,
 377                                    struct svc_rdma_rw_ctxt *ctxt)
 378{
 379        unsigned int sge_no, sge_bytes, page_off, page_no;
 380        struct xdr_buf *xdr = info->wi_xdr;
 381        struct scatterlist *sg;
 382        struct page **page;
 383
 384        page_off = info->wi_next_off + xdr->page_base;
 385        page_no = page_off >> PAGE_SHIFT;
 386        page_off = offset_in_page(page_off);
 387        page = xdr->pages + page_no;
 388        info->wi_next_off += remaining;
 389        sg = ctxt->rw_sg_table.sgl;
 390        sge_no = 0;
 391        do {
 392                sge_bytes = min_t(unsigned int, remaining,
 393                                  PAGE_SIZE - page_off);
 394                sg_set_page(sg, *page, sge_bytes, page_off);
 395
 396                remaining -= sge_bytes;
 397                sg = sg_next(sg);
 398                page_off = 0;
 399                sge_no++;
 400                page++;
 401        } while (remaining);
 402
 403        ctxt->rw_nents = sge_no;
 404}
 405
 406/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 407 * an RPC Reply.
 408 */
 409static int
 410svc_rdma_build_writes(struct svc_rdma_write_info *info,
 411                      void (*constructor)(struct svc_rdma_write_info *info,
 412                                          unsigned int len,
 413                                          struct svc_rdma_rw_ctxt *ctxt),
 414                      unsigned int remaining)
 415{
 416        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
 417        struct svcxprt_rdma *rdma = cc->cc_rdma;
 418        struct svc_rdma_rw_ctxt *ctxt;
 419        __be32 *seg;
 420        int ret;
 421
 422        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
 423        do {
 424                unsigned int write_len;
 425                u32 seg_length, seg_handle;
 426                u64 seg_offset;
 427
 428                if (info->wi_seg_no >= info->wi_nsegs)
 429                        goto out_overflow;
 430
 431                seg_handle = be32_to_cpup(seg);
 432                seg_length = be32_to_cpup(seg + 1);
 433                xdr_decode_hyper(seg + 2, &seg_offset);
 434                seg_offset += info->wi_seg_off;
 435
 436                write_len = min(remaining, seg_length - info->wi_seg_off);
 437                ctxt = svc_rdma_get_rw_ctxt(rdma,
 438                                            (write_len >> PAGE_SHIFT) + 2);
 439                if (!ctxt)
 440                        goto out_noctx;
 441
 442                constructor(info, write_len, ctxt);
 443                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
 444                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 445                                       ctxt->rw_nents, 0, seg_offset,
 446                                       seg_handle, DMA_TO_DEVICE);
 447                if (ret < 0)
 448                        goto out_initerr;
 449
 450                trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset);
 451                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 452                cc->cc_sqecount += ret;
 453                if (write_len == seg_length - info->wi_seg_off) {
 454                        seg += 4;
 455                        info->wi_seg_no++;
 456                        info->wi_seg_off = 0;
 457                } else {
 458                        info->wi_seg_off += write_len;
 459                }
 460                remaining -= write_len;
 461        } while (remaining);
 462
 463        return 0;
 464
 465out_overflow:
 466        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
 467                info->wi_nsegs);
 468        return -E2BIG;
 469
 470out_noctx:
 471        dprintk("svcrdma: no R/W ctxs available\n");
 472        return -ENOMEM;
 473
 474out_initerr:
 475        svc_rdma_put_rw_ctxt(rdma, ctxt);
 476        trace_svcrdma_dma_map_rwctx(rdma, ret);
 477        return -EIO;
 478}
 479
 480/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 481 * chunk, the whole RPC Reply is written back to the client.
 482 * This function writes either the head or tail of the xdr_buf
 483 * containing the Reply.
 484 */
 485static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
 486                                  struct kvec *vec)
 487{
 488        info->wi_base = vec->iov_base;
 489        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
 490                                     vec->iov_len);
 491}
 492
 493/* Send an xdr_buf's page list by itself. A Write chunk is
 494 * just the page list. a Reply chunk is the head, page list,
 495 * and tail. This function is shared between the two types
 496 * of chunk.
 497 */
 498static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
 499                                      struct xdr_buf *xdr)
 500{
 501        info->wi_xdr = xdr;
 502        info->wi_next_off = 0;
 503        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
 504                                     xdr->page_len);
 505}
 506
 507/**
 508 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 509 * @rdma: controlling RDMA transport
 510 * @wr_ch: Write chunk provided by client
 511 * @xdr: xdr_buf containing the data payload
 512 *
 513 * Returns a non-negative number of bytes the chunk consumed, or
 514 *      %-E2BIG if the payload was larger than the Write chunk,
 515 *      %-EINVAL if client provided too many segments,
 516 *      %-ENOMEM if rdma_rw context pool was exhausted,
 517 *      %-ENOTCONN if posting failed (connection is lost),
 518 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 519 */
 520int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 521                              struct xdr_buf *xdr)
 522{
 523        struct svc_rdma_write_info *info;
 524        int ret;
 525
 526        if (!xdr->page_len)
 527                return 0;
 528
 529        info = svc_rdma_write_info_alloc(rdma, wr_ch);
 530        if (!info)
 531                return -ENOMEM;
 532
 533        ret = svc_rdma_send_xdr_pagelist(info, xdr);
 534        if (ret < 0)
 535                goto out_err;
 536
 537        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
 538        if (ret < 0)
 539                goto out_err;
 540
 541        trace_svcrdma_encode_write(xdr->page_len);
 542        return xdr->page_len;
 543
 544out_err:
 545        svc_rdma_write_info_free(info);
 546        return ret;
 547}
 548
 549/**
 550 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 551 * @rdma: controlling RDMA transport
 552 * @rp_ch: Reply chunk provided by client
 553 * @writelist: true if client provided a Write list
 554 * @xdr: xdr_buf containing an RPC Reply
 555 *
 556 * Returns a non-negative number of bytes the chunk consumed, or
 557 *      %-E2BIG if the payload was larger than the Reply chunk,
 558 *      %-EINVAL if client provided too many segments,
 559 *      %-ENOMEM if rdma_rw context pool was exhausted,
 560 *      %-ENOTCONN if posting failed (connection is lost),
 561 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 562 */
 563int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
 564                              bool writelist, struct xdr_buf *xdr)
 565{
 566        struct svc_rdma_write_info *info;
 567        int consumed, ret;
 568
 569        info = svc_rdma_write_info_alloc(rdma, rp_ch);
 570        if (!info)
 571                return -ENOMEM;
 572
 573        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
 574        if (ret < 0)
 575                goto out_err;
 576        consumed = xdr->head[0].iov_len;
 577
 578        /* Send the page list in the Reply chunk only if the
 579         * client did not provide Write chunks.
 580         */
 581        if (!writelist && xdr->page_len) {
 582                ret = svc_rdma_send_xdr_pagelist(info, xdr);
 583                if (ret < 0)
 584                        goto out_err;
 585                consumed += xdr->page_len;
 586        }
 587
 588        if (xdr->tail[0].iov_len) {
 589                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
 590                if (ret < 0)
 591                        goto out_err;
 592                consumed += xdr->tail[0].iov_len;
 593        }
 594
 595        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
 596        if (ret < 0)
 597                goto out_err;
 598
 599        trace_svcrdma_encode_reply(consumed);
 600        return consumed;
 601
 602out_err:
 603        svc_rdma_write_info_free(info);
 604        return ret;
 605}
 606
 607static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 608                                       struct svc_rqst *rqstp,
 609                                       u32 rkey, u32 len, u64 offset)
 610{
 611        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 612        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
 613        struct svc_rdma_rw_ctxt *ctxt;
 614        unsigned int sge_no, seg_len;
 615        struct scatterlist *sg;
 616        int ret;
 617
 618        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 619        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 620        if (!ctxt)
 621                goto out_noctx;
 622        ctxt->rw_nents = sge_no;
 623
 624        sg = ctxt->rw_sg_table.sgl;
 625        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
 626                seg_len = min_t(unsigned int, len,
 627                                PAGE_SIZE - info->ri_pageoff);
 628
 629                head->rc_arg.pages[info->ri_pageno] =
 630                        rqstp->rq_pages[info->ri_pageno];
 631                if (!info->ri_pageoff)
 632                        head->rc_page_count++;
 633
 634                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
 635                            seg_len, info->ri_pageoff);
 636                sg = sg_next(sg);
 637
 638                info->ri_pageoff += seg_len;
 639                if (info->ri_pageoff == PAGE_SIZE) {
 640                        info->ri_pageno++;
 641                        info->ri_pageoff = 0;
 642                }
 643                len -= seg_len;
 644
 645                /* Safety check */
 646                if (len &&
 647                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
 648                        goto out_overrun;
 649        }
 650
 651        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
 652                               cc->cc_rdma->sc_port_num,
 653                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
 654                               0, offset, rkey, DMA_FROM_DEVICE);
 655        if (ret < 0)
 656                goto out_initerr;
 657
 658        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 659        cc->cc_sqecount += ret;
 660        return 0;
 661
 662out_noctx:
 663        dprintk("svcrdma: no R/W ctxs available\n");
 664        return -ENOMEM;
 665
 666out_overrun:
 667        dprintk("svcrdma: request overruns rq_pages\n");
 668        return -EINVAL;
 669
 670out_initerr:
 671        trace_svcrdma_dma_map_rwctx(cc->cc_rdma, ret);
 672        svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
 673        return -EIO;
 674}
 675
 676/* Walk the segments in the Read chunk starting at @p and construct
 677 * RDMA Read operations to pull the chunk to the server.
 678 */
 679static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 680                                     struct svc_rdma_read_info *info,
 681                                     __be32 *p)
 682{
 683        unsigned int i;
 684        int ret;
 685
 686        ret = -EINVAL;
 687        info->ri_chunklen = 0;
 688        while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
 689                u32 rs_handle, rs_length;
 690                u64 rs_offset;
 691
 692                rs_handle = be32_to_cpup(p++);
 693                rs_length = be32_to_cpup(p++);
 694                p = xdr_decode_hyper(p, &rs_offset);
 695
 696                ret = svc_rdma_build_read_segment(info, rqstp,
 697                                                  rs_handle, rs_length,
 698                                                  rs_offset);
 699                if (ret < 0)
 700                        break;
 701
 702                trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset);
 703                info->ri_chunklen += rs_length;
 704        }
 705
 706        /* Pages under I/O have been copied to head->rc_pages.
 707         * Prevent their premature release by svc_xprt_release() .
 708         */
 709        for (i = 0; i < info->ri_readctxt->rc_page_count; i++)
 710                rqstp->rq_pages[i] = NULL;
 711
 712        return ret;
 713}
 714
 715/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 716 * data lands in the page list of head->rc_arg.pages.
 717 *
 718 * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
 719 * Therefore, XDR round-up of the Read chunk and trailing
 720 * inline content must both be added at the end of the pagelist.
 721 */
 722static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 723                                            struct svc_rdma_read_info *info,
 724                                            __be32 *p)
 725{
 726        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 727        int ret;
 728
 729        ret = svc_rdma_build_read_chunk(rqstp, info, p);
 730        if (ret < 0)
 731                goto out;
 732
 733        trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position);
 734
 735        head->rc_hdr_count = 0;
 736
 737        /* Split the Receive buffer between the head and tail
 738         * buffers at Read chunk's position. XDR roundup of the
 739         * chunk is not included in either the pagelist or in
 740         * the tail.
 741         */
 742        head->rc_arg.tail[0].iov_base =
 743                head->rc_arg.head[0].iov_base + info->ri_position;
 744        head->rc_arg.tail[0].iov_len =
 745                head->rc_arg.head[0].iov_len - info->ri_position;
 746        head->rc_arg.head[0].iov_len = info->ri_position;
 747
 748        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 749         *
 750         * If the client already rounded up the chunk length, the
 751         * length does not change. Otherwise, the length of the page
 752         * list is increased to include XDR round-up.
 753         *
 754         * Currently these chunks always start at page offset 0,
 755         * thus the rounded-up length never crosses a page boundary.
 756         */
 757        info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
 758
 759        head->rc_arg.page_len = info->ri_chunklen;
 760        head->rc_arg.len += info->ri_chunklen;
 761        head->rc_arg.buflen += info->ri_chunklen;
 762
 763out:
 764        return ret;
 765}
 766
 767/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 768 * The start of the data lands in the first page just after
 769 * the Transport header, and the rest lands in the page list of
 770 * head->rc_arg.pages.
 771 *
 772 * Assumptions:
 773 *      - A PZRC has an XDR-aligned length (no implicit round-up).
 774 *      - There can be no trailing inline content (IOW, we assume
 775 *        a PZRC is never sent in an RDMA_MSG message, though it's
 776 *        allowed by spec).
 777 */
 778static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 779                                        struct svc_rdma_read_info *info,
 780                                        __be32 *p)
 781{
 782        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 783        int ret;
 784
 785        ret = svc_rdma_build_read_chunk(rqstp, info, p);
 786        if (ret < 0)
 787                goto out;
 788
 789        trace_svcrdma_encode_pzr(info->ri_chunklen);
 790
 791        head->rc_arg.len += info->ri_chunklen;
 792        head->rc_arg.buflen += info->ri_chunklen;
 793
 794        head->rc_hdr_count = 1;
 795        head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]);
 796        head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE,
 797                                             info->ri_chunklen);
 798
 799        head->rc_arg.page_len = info->ri_chunklen -
 800                                head->rc_arg.head[0].iov_len;
 801
 802out:
 803        return ret;
 804}
 805
 806/**
 807 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 808 * @rdma: controlling RDMA transport
 809 * @rqstp: set of pages to use as Read sink buffers
 810 * @head: pages under I/O collect here
 811 * @p: pointer to start of Read chunk
 812 *
 813 * Returns:
 814 *      %0 if all needed RDMA Reads were posted successfully,
 815 *      %-EINVAL if client provided too many segments,
 816 *      %-ENOMEM if rdma_rw context pool was exhausted,
 817 *      %-ENOTCONN if posting failed (connection is lost),
 818 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 819 *
 820 * Assumptions:
 821 * - All Read segments in @p have the same Position value.
 822 */
 823int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 824                             struct svc_rdma_recv_ctxt *head, __be32 *p)
 825{
 826        struct svc_rdma_read_info *info;
 827        int ret;
 828
 829        /* The request (with page list) is constructed in
 830         * head->rc_arg. Pages involved with RDMA Read I/O are
 831         * transferred there.
 832         */
 833        head->rc_arg.head[0] = rqstp->rq_arg.head[0];
 834        head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
 835        head->rc_arg.pages = head->rc_pages;
 836        head->rc_arg.page_base = 0;
 837        head->rc_arg.page_len = 0;
 838        head->rc_arg.len = rqstp->rq_arg.len;
 839        head->rc_arg.buflen = rqstp->rq_arg.buflen;
 840
 841        info = svc_rdma_read_info_alloc(rdma);
 842        if (!info)
 843                return -ENOMEM;
 844        info->ri_readctxt = head;
 845        info->ri_pageno = 0;
 846        info->ri_pageoff = 0;
 847
 848        info->ri_position = be32_to_cpup(p + 1);
 849        if (info->ri_position)
 850                ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
 851        else
 852                ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
 853        if (ret < 0)
 854                goto out_err;
 855
 856        ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
 857        if (ret < 0)
 858                goto out_err;
 859        return 0;
 860
 861out_err:
 862        svc_rdma_read_info_free(info);
 863        return ret;
 864}
 865