linux/net/sunrpc/xprtrdma/svc_rdma_rw.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2016-2018 Oracle.  All rights reserved.
   4 *
   5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
   6 */
   7
   8#include <rdma/rw.h>
   9
  10#include <linux/sunrpc/rpc_rdma.h>
  11#include <linux/sunrpc/svc_rdma.h>
  12#include <linux/sunrpc/debug.h>
  13
  14#include "xprt_rdma.h"
  15#include <trace/events/rpcrdma.h>
  16
  17#define RPCDBG_FACILITY RPCDBG_SVCXPRT
  18
  19static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
  20static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
  21
  22/* Each R/W context contains state for one chain of RDMA Read or
  23 * Write Work Requests.
  24 *
  25 * Each WR chain handles a single contiguous server-side buffer,
  26 * because scatterlist entries after the first have to start on
  27 * page alignment. xdr_buf iovecs cannot guarantee alignment.
  28 *
  29 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
  30 * from a client may contain a unique R_key, so each WR chain moves
  31 * up to one segment at a time.
  32 *
  33 * The scatterlist makes this data structure over 4KB in size. To
  34 * make it less likely to fail, and to handle the allocation for
  35 * smaller I/O requests without disabling bottom-halves, these
  36 * contexts are created on demand, but cached and reused until the
  37 * controlling svcxprt_rdma is destroyed.
  38 */
  39struct svc_rdma_rw_ctxt {
  40        struct list_head        rw_list;
  41        struct rdma_rw_ctx      rw_ctx;
  42        int                     rw_nents;
  43        struct sg_table         rw_sg_table;
  44        struct scatterlist      rw_first_sgl[0];
  45};
  46
  47static inline struct svc_rdma_rw_ctxt *
  48svc_rdma_next_ctxt(struct list_head *list)
  49{
  50        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
  51                                        rw_list);
  52}
  53
  54static struct svc_rdma_rw_ctxt *
  55svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
  56{
  57        struct svc_rdma_rw_ctxt *ctxt;
  58
  59        spin_lock(&rdma->sc_rw_ctxt_lock);
  60
  61        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
  62        if (ctxt) {
  63                list_del(&ctxt->rw_list);
  64                spin_unlock(&rdma->sc_rw_ctxt_lock);
  65        } else {
  66                spin_unlock(&rdma->sc_rw_ctxt_lock);
  67                ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
  68                               GFP_KERNEL);
  69                if (!ctxt)
  70                        goto out;
  71                INIT_LIST_HEAD(&ctxt->rw_list);
  72        }
  73
  74        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
  75        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
  76                                   ctxt->rw_sg_table.sgl,
  77                                   SG_CHUNK_SIZE)) {
  78                kfree(ctxt);
  79                ctxt = NULL;
  80        }
  81out:
  82        return ctxt;
  83}
  84
  85static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
  86                                 struct svc_rdma_rw_ctxt *ctxt)
  87{
  88        sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
  89
  90        spin_lock(&rdma->sc_rw_ctxt_lock);
  91        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
  92        spin_unlock(&rdma->sc_rw_ctxt_lock);
  93}
  94
  95/**
  96 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
  97 * @rdma: transport about to be destroyed
  98 *
  99 */
 100void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
 101{
 102        struct svc_rdma_rw_ctxt *ctxt;
 103
 104        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
 105                list_del(&ctxt->rw_list);
 106                kfree(ctxt);
 107        }
 108}
 109
 110/* A chunk context tracks all I/O for moving one Read or Write
 111 * chunk. This is a a set of rdma_rw's that handle data movement
 112 * for all segments of one chunk.
 113 *
 114 * These are small, acquired with a single allocator call, and
 115 * no more than one is needed per chunk. They are allocated on
 116 * demand, and not cached.
 117 */
 118struct svc_rdma_chunk_ctxt {
 119        struct ib_cqe           cc_cqe;
 120        struct svcxprt_rdma     *cc_rdma;
 121        struct list_head        cc_rwctxts;
 122        int                     cc_sqecount;
 123};
 124
 125static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 126                             struct svc_rdma_chunk_ctxt *cc)
 127{
 128        cc->cc_rdma = rdma;
 129        svc_xprt_get(&rdma->sc_xprt);
 130
 131        INIT_LIST_HEAD(&cc->cc_rwctxts);
 132        cc->cc_sqecount = 0;
 133}
 134
 135static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
 136                                enum dma_data_direction dir)
 137{
 138        struct svcxprt_rdma *rdma = cc->cc_rdma;
 139        struct svc_rdma_rw_ctxt *ctxt;
 140
 141        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
 142                list_del(&ctxt->rw_list);
 143
 144                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
 145                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 146                                    ctxt->rw_nents, dir);
 147                svc_rdma_put_rw_ctxt(rdma, ctxt);
 148        }
 149        svc_xprt_put(&rdma->sc_xprt);
 150}
 151
 152/* State for sending a Write or Reply chunk.
 153 *  - Tracks progress of writing one chunk over all its segments
 154 *  - Stores arguments for the SGL constructor functions
 155 */
 156struct svc_rdma_write_info {
 157        /* write state of this chunk */
 158        unsigned int            wi_seg_off;
 159        unsigned int            wi_seg_no;
 160        unsigned int            wi_nsegs;
 161        __be32                  *wi_segs;
 162
 163        /* SGL constructor arguments */
 164        struct xdr_buf          *wi_xdr;
 165        unsigned char           *wi_base;
 166        unsigned int            wi_next_off;
 167
 168        struct svc_rdma_chunk_ctxt      wi_cc;
 169};
 170
 171static struct svc_rdma_write_info *
 172svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
 173{
 174        struct svc_rdma_write_info *info;
 175
 176        info = kmalloc(sizeof(*info), GFP_KERNEL);
 177        if (!info)
 178                return info;
 179
 180        info->wi_seg_off = 0;
 181        info->wi_seg_no = 0;
 182        info->wi_nsegs = be32_to_cpup(++chunk);
 183        info->wi_segs = ++chunk;
 184        svc_rdma_cc_init(rdma, &info->wi_cc);
 185        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 186        return info;
 187}
 188
 189static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
 190{
 191        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
 192        kfree(info);
 193}
 194
 195/**
 196 * svc_rdma_write_done - Write chunk completion
 197 * @cq: controlling Completion Queue
 198 * @wc: Work Completion
 199 *
 200 * Pages under I/O are freed by a subsequent Send completion.
 201 */
 202static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 203{
 204        struct ib_cqe *cqe = wc->wr_cqe;
 205        struct svc_rdma_chunk_ctxt *cc =
 206                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 207        struct svcxprt_rdma *rdma = cc->cc_rdma;
 208        struct svc_rdma_write_info *info =
 209                        container_of(cc, struct svc_rdma_write_info, wi_cc);
 210
 211        trace_svcrdma_wc_write(wc);
 212
 213        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 214        wake_up(&rdma->sc_send_wait);
 215
 216        if (unlikely(wc->status != IB_WC_SUCCESS))
 217                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 218
 219        svc_rdma_write_info_free(info);
 220}
 221
 222/* State for pulling a Read chunk.
 223 */
 224struct svc_rdma_read_info {
 225        struct svc_rdma_recv_ctxt       *ri_readctxt;
 226        unsigned int                    ri_position;
 227        unsigned int                    ri_pageno;
 228        unsigned int                    ri_pageoff;
 229        unsigned int                    ri_chunklen;
 230
 231        struct svc_rdma_chunk_ctxt      ri_cc;
 232};
 233
 234static struct svc_rdma_read_info *
 235svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
 236{
 237        struct svc_rdma_read_info *info;
 238
 239        info = kmalloc(sizeof(*info), GFP_KERNEL);
 240        if (!info)
 241                return info;
 242
 243        svc_rdma_cc_init(rdma, &info->ri_cc);
 244        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
 245        return info;
 246}
 247
 248static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
 249{
 250        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
 251        kfree(info);
 252}
 253
 254/**
 255 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 256 * @cq: controlling Completion Queue
 257 * @wc: Work Completion
 258 *
 259 */
 260static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 261{
 262        struct ib_cqe *cqe = wc->wr_cqe;
 263        struct svc_rdma_chunk_ctxt *cc =
 264                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 265        struct svcxprt_rdma *rdma = cc->cc_rdma;
 266        struct svc_rdma_read_info *info =
 267                        container_of(cc, struct svc_rdma_read_info, ri_cc);
 268
 269        trace_svcrdma_wc_read(wc);
 270
 271        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 272        wake_up(&rdma->sc_send_wait);
 273
 274        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 275                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 276                svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 277        } else {
 278                spin_lock(&rdma->sc_rq_dto_lock);
 279                list_add_tail(&info->ri_readctxt->rc_list,
 280                              &rdma->sc_read_complete_q);
 281                /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
 282                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 283                spin_unlock(&rdma->sc_rq_dto_lock);
 284
 285                svc_xprt_enqueue(&rdma->sc_xprt);
 286        }
 287
 288        svc_rdma_read_info_free(info);
 289}
 290
 291/* This function sleeps when the transport's Send Queue is congested.
 292 *
 293 * Assumptions:
 294 * - If ib_post_send() succeeds, only one completion is expected,
 295 *   even if one or more WRs are flushed. This is true when posting
 296 *   an rdma_rw_ctx or when posting a single signaled WR.
 297 */
 298static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
 299{
 300        struct svcxprt_rdma *rdma = cc->cc_rdma;
 301        struct svc_xprt *xprt = &rdma->sc_xprt;
 302        struct ib_send_wr *first_wr;
 303        const struct ib_send_wr *bad_wr;
 304        struct list_head *tmp;
 305        struct ib_cqe *cqe;
 306        int ret;
 307
 308        if (cc->cc_sqecount > rdma->sc_sq_depth)
 309                return -EINVAL;
 310
 311        first_wr = NULL;
 312        cqe = &cc->cc_cqe;
 313        list_for_each(tmp, &cc->cc_rwctxts) {
 314                struct svc_rdma_rw_ctxt *ctxt;
 315
 316                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
 317                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
 318                                           rdma->sc_port_num, cqe, first_wr);
 319                cqe = NULL;
 320        }
 321
 322        do {
 323                if (atomic_sub_return(cc->cc_sqecount,
 324                                      &rdma->sc_sq_avail) > 0) {
 325                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 326                        trace_svcrdma_post_rw(&cc->cc_cqe,
 327                                              cc->cc_sqecount, ret);
 328                        if (ret)
 329                                break;
 330                        return 0;
 331                }
 332
 333                trace_svcrdma_sq_full(rdma);
 334                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 335                wait_event(rdma->sc_send_wait,
 336                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
 337                trace_svcrdma_sq_retry(rdma);
 338        } while (1);
 339
 340        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 341
 342        /* If even one was posted, there will be a completion. */
 343        if (bad_wr != first_wr)
 344                return 0;
 345
 346        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 347        wake_up(&rdma->sc_send_wait);
 348        return -ENOTCONN;
 349}
 350
 351/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 352 */
 353static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
 354                               unsigned int len,
 355                               struct svc_rdma_rw_ctxt *ctxt)
 356{
 357        struct scatterlist *sg = ctxt->rw_sg_table.sgl;
 358
 359        sg_set_buf(&sg[0], info->wi_base, len);
 360        info->wi_base += len;
 361
 362        ctxt->rw_nents = 1;
 363}
 364
 365/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 366 */
 367static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 368                                    unsigned int remaining,
 369                                    struct svc_rdma_rw_ctxt *ctxt)
 370{
 371        unsigned int sge_no, sge_bytes, page_off, page_no;
 372        struct xdr_buf *xdr = info->wi_xdr;
 373        struct scatterlist *sg;
 374        struct page **page;
 375
 376        page_off = info->wi_next_off + xdr->page_base;
 377        page_no = page_off >> PAGE_SHIFT;
 378        page_off = offset_in_page(page_off);
 379        page = xdr->pages + page_no;
 380        info->wi_next_off += remaining;
 381        sg = ctxt->rw_sg_table.sgl;
 382        sge_no = 0;
 383        do {
 384                sge_bytes = min_t(unsigned int, remaining,
 385                                  PAGE_SIZE - page_off);
 386                sg_set_page(sg, *page, sge_bytes, page_off);
 387
 388                remaining -= sge_bytes;
 389                sg = sg_next(sg);
 390                page_off = 0;
 391                sge_no++;
 392                page++;
 393        } while (remaining);
 394
 395        ctxt->rw_nents = sge_no;
 396}
 397
 398/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 399 * an RPC Reply.
 400 */
 401static int
 402svc_rdma_build_writes(struct svc_rdma_write_info *info,
 403                      void (*constructor)(struct svc_rdma_write_info *info,
 404                                          unsigned int len,
 405                                          struct svc_rdma_rw_ctxt *ctxt),
 406                      unsigned int remaining)
 407{
 408        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
 409        struct svcxprt_rdma *rdma = cc->cc_rdma;
 410        struct svc_rdma_rw_ctxt *ctxt;
 411        __be32 *seg;
 412        int ret;
 413
 414        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
 415        do {
 416                unsigned int write_len;
 417                u32 seg_length, seg_handle;
 418                u64 seg_offset;
 419
 420                if (info->wi_seg_no >= info->wi_nsegs)
 421                        goto out_overflow;
 422
 423                seg_handle = be32_to_cpup(seg);
 424                seg_length = be32_to_cpup(seg + 1);
 425                xdr_decode_hyper(seg + 2, &seg_offset);
 426                seg_offset += info->wi_seg_off;
 427
 428                write_len = min(remaining, seg_length - info->wi_seg_off);
 429                ctxt = svc_rdma_get_rw_ctxt(rdma,
 430                                            (write_len >> PAGE_SHIFT) + 2);
 431                if (!ctxt)
 432                        goto out_noctx;
 433
 434                constructor(info, write_len, ctxt);
 435                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
 436                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 437                                       ctxt->rw_nents, 0, seg_offset,
 438                                       seg_handle, DMA_TO_DEVICE);
 439                if (ret < 0)
 440                        goto out_initerr;
 441
 442                trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset);
 443                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 444                cc->cc_sqecount += ret;
 445                if (write_len == seg_length - info->wi_seg_off) {
 446                        seg += 4;
 447                        info->wi_seg_no++;
 448                        info->wi_seg_off = 0;
 449                } else {
 450                        info->wi_seg_off += write_len;
 451                }
 452                remaining -= write_len;
 453        } while (remaining);
 454
 455        return 0;
 456
 457out_overflow:
 458        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
 459                info->wi_nsegs);
 460        return -E2BIG;
 461
 462out_noctx:
 463        dprintk("svcrdma: no R/W ctxs available\n");
 464        return -ENOMEM;
 465
 466out_initerr:
 467        svc_rdma_put_rw_ctxt(rdma, ctxt);
 468        trace_svcrdma_dma_map_rwctx(rdma, ret);
 469        return -EIO;
 470}
 471
 472/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 473 * chunk, the whole RPC Reply is written back to the client.
 474 * This function writes either the head or tail of the xdr_buf
 475 * containing the Reply.
 476 */
 477static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
 478                                  struct kvec *vec)
 479{
 480        info->wi_base = vec->iov_base;
 481        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
 482                                     vec->iov_len);
 483}
 484
 485/* Send an xdr_buf's page list by itself. A Write chunk is
 486 * just the page list. a Reply chunk is the head, page list,
 487 * and tail. This function is shared between the two types
 488 * of chunk.
 489 */
 490static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
 491                                      struct xdr_buf *xdr)
 492{
 493        info->wi_xdr = xdr;
 494        info->wi_next_off = 0;
 495        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
 496                                     xdr->page_len);
 497}
 498
 499/**
 500 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 501 * @rdma: controlling RDMA transport
 502 * @wr_ch: Write chunk provided by client
 503 * @xdr: xdr_buf containing the data payload
 504 *
 505 * Returns a non-negative number of bytes the chunk consumed, or
 506 *      %-E2BIG if the payload was larger than the Write chunk,
 507 *      %-EINVAL if client provided too many segments,
 508 *      %-ENOMEM if rdma_rw context pool was exhausted,
 509 *      %-ENOTCONN if posting failed (connection is lost),
 510 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 511 */
 512int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
 513                              struct xdr_buf *xdr)
 514{
 515        struct svc_rdma_write_info *info;
 516        int ret;
 517
 518        if (!xdr->page_len)
 519                return 0;
 520
 521        info = svc_rdma_write_info_alloc(rdma, wr_ch);
 522        if (!info)
 523                return -ENOMEM;
 524
 525        ret = svc_rdma_send_xdr_pagelist(info, xdr);
 526        if (ret < 0)
 527                goto out_err;
 528
 529        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
 530        if (ret < 0)
 531                goto out_err;
 532
 533        trace_svcrdma_encode_write(xdr->page_len);
 534        return xdr->page_len;
 535
 536out_err:
 537        svc_rdma_write_info_free(info);
 538        return ret;
 539}
 540
 541/**
 542 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 543 * @rdma: controlling RDMA transport
 544 * @rp_ch: Reply chunk provided by client
 545 * @writelist: true if client provided a Write list
 546 * @xdr: xdr_buf containing an RPC Reply
 547 *
 548 * Returns a non-negative number of bytes the chunk consumed, or
 549 *      %-E2BIG if the payload was larger than the Reply chunk,
 550 *      %-EINVAL if client provided too many segments,
 551 *      %-ENOMEM if rdma_rw context pool was exhausted,
 552 *      %-ENOTCONN if posting failed (connection is lost),
 553 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 554 */
 555int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
 556                              bool writelist, struct xdr_buf *xdr)
 557{
 558        struct svc_rdma_write_info *info;
 559        int consumed, ret;
 560
 561        info = svc_rdma_write_info_alloc(rdma, rp_ch);
 562        if (!info)
 563                return -ENOMEM;
 564
 565        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
 566        if (ret < 0)
 567                goto out_err;
 568        consumed = xdr->head[0].iov_len;
 569
 570        /* Send the page list in the Reply chunk only if the
 571         * client did not provide Write chunks.
 572         */
 573        if (!writelist && xdr->page_len) {
 574                ret = svc_rdma_send_xdr_pagelist(info, xdr);
 575                if (ret < 0)
 576                        goto out_err;
 577                consumed += xdr->page_len;
 578        }
 579
 580        if (xdr->tail[0].iov_len) {
 581                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
 582                if (ret < 0)
 583                        goto out_err;
 584                consumed += xdr->tail[0].iov_len;
 585        }
 586
 587        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
 588        if (ret < 0)
 589                goto out_err;
 590
 591        trace_svcrdma_encode_reply(consumed);
 592        return consumed;
 593
 594out_err:
 595        svc_rdma_write_info_free(info);
 596        return ret;
 597}
 598
 599static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 600                                       struct svc_rqst *rqstp,
 601                                       u32 rkey, u32 len, u64 offset)
 602{
 603        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 604        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
 605        struct svc_rdma_rw_ctxt *ctxt;
 606        unsigned int sge_no, seg_len;
 607        struct scatterlist *sg;
 608        int ret;
 609
 610        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 611        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 612        if (!ctxt)
 613                goto out_noctx;
 614        ctxt->rw_nents = sge_no;
 615
 616        sg = ctxt->rw_sg_table.sgl;
 617        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
 618                seg_len = min_t(unsigned int, len,
 619                                PAGE_SIZE - info->ri_pageoff);
 620
 621                head->rc_arg.pages[info->ri_pageno] =
 622                        rqstp->rq_pages[info->ri_pageno];
 623                if (!info->ri_pageoff)
 624                        head->rc_page_count++;
 625
 626                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
 627                            seg_len, info->ri_pageoff);
 628                sg = sg_next(sg);
 629
 630                info->ri_pageoff += seg_len;
 631                if (info->ri_pageoff == PAGE_SIZE) {
 632                        info->ri_pageno++;
 633                        info->ri_pageoff = 0;
 634                }
 635                len -= seg_len;
 636
 637                /* Safety check */
 638                if (len &&
 639                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
 640                        goto out_overrun;
 641        }
 642
 643        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
 644                               cc->cc_rdma->sc_port_num,
 645                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
 646                               0, offset, rkey, DMA_FROM_DEVICE);
 647        if (ret < 0)
 648                goto out_initerr;
 649
 650        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 651        cc->cc_sqecount += ret;
 652        return 0;
 653
 654out_noctx:
 655        dprintk("svcrdma: no R/W ctxs available\n");
 656        return -ENOMEM;
 657
 658out_overrun:
 659        dprintk("svcrdma: request overruns rq_pages\n");
 660        return -EINVAL;
 661
 662out_initerr:
 663        trace_svcrdma_dma_map_rwctx(cc->cc_rdma, ret);
 664        svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
 665        return -EIO;
 666}
 667
 668/* Walk the segments in the Read chunk starting at @p and construct
 669 * RDMA Read operations to pull the chunk to the server.
 670 */
 671static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 672                                     struct svc_rdma_read_info *info,
 673                                     __be32 *p)
 674{
 675        unsigned int i;
 676        int ret;
 677
 678        ret = -EINVAL;
 679        info->ri_chunklen = 0;
 680        while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
 681                u32 rs_handle, rs_length;
 682                u64 rs_offset;
 683
 684                rs_handle = be32_to_cpup(p++);
 685                rs_length = be32_to_cpup(p++);
 686                p = xdr_decode_hyper(p, &rs_offset);
 687
 688                ret = svc_rdma_build_read_segment(info, rqstp,
 689                                                  rs_handle, rs_length,
 690                                                  rs_offset);
 691                if (ret < 0)
 692                        break;
 693
 694                trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset);
 695                info->ri_chunklen += rs_length;
 696        }
 697
 698        /* Pages under I/O have been copied to head->rc_pages.
 699         * Prevent their premature release by svc_xprt_release() .
 700         */
 701        for (i = 0; i < info->ri_readctxt->rc_page_count; i++)
 702                rqstp->rq_pages[i] = NULL;
 703
 704        return ret;
 705}
 706
 707/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 708 * data lands in the page list of head->rc_arg.pages.
 709 *
 710 * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
 711 * Therefore, XDR round-up of the Read chunk and trailing
 712 * inline content must both be added at the end of the pagelist.
 713 */
 714static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 715                                            struct svc_rdma_read_info *info,
 716                                            __be32 *p)
 717{
 718        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 719        int ret;
 720
 721        ret = svc_rdma_build_read_chunk(rqstp, info, p);
 722        if (ret < 0)
 723                goto out;
 724
 725        trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position);
 726
 727        head->rc_hdr_count = 0;
 728
 729        /* Split the Receive buffer between the head and tail
 730         * buffers at Read chunk's position. XDR roundup of the
 731         * chunk is not included in either the pagelist or in
 732         * the tail.
 733         */
 734        head->rc_arg.tail[0].iov_base =
 735                head->rc_arg.head[0].iov_base + info->ri_position;
 736        head->rc_arg.tail[0].iov_len =
 737                head->rc_arg.head[0].iov_len - info->ri_position;
 738        head->rc_arg.head[0].iov_len = info->ri_position;
 739
 740        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 741         *
 742         * If the client already rounded up the chunk length, the
 743         * length does not change. Otherwise, the length of the page
 744         * list is increased to include XDR round-up.
 745         *
 746         * Currently these chunks always start at page offset 0,
 747         * thus the rounded-up length never crosses a page boundary.
 748         */
 749        info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
 750
 751        head->rc_arg.page_len = info->ri_chunklen;
 752        head->rc_arg.len += info->ri_chunklen;
 753        head->rc_arg.buflen += info->ri_chunklen;
 754
 755out:
 756        return ret;
 757}
 758
 759/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 760 * The start of the data lands in the first page just after
 761 * the Transport header, and the rest lands in the page list of
 762 * head->rc_arg.pages.
 763 *
 764 * Assumptions:
 765 *      - A PZRC has an XDR-aligned length (no implicit round-up).
 766 *      - There can be no trailing inline content (IOW, we assume
 767 *        a PZRC is never sent in an RDMA_MSG message, though it's
 768 *        allowed by spec).
 769 */
 770static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 771                                        struct svc_rdma_read_info *info,
 772                                        __be32 *p)
 773{
 774        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 775        int ret;
 776
 777        ret = svc_rdma_build_read_chunk(rqstp, info, p);
 778        if (ret < 0)
 779                goto out;
 780
 781        trace_svcrdma_encode_pzr(info->ri_chunklen);
 782
 783        head->rc_arg.len += info->ri_chunklen;
 784        head->rc_arg.buflen += info->ri_chunklen;
 785
 786        head->rc_hdr_count = 1;
 787        head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]);
 788        head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE,
 789                                             info->ri_chunklen);
 790
 791        head->rc_arg.page_len = info->ri_chunklen -
 792                                head->rc_arg.head[0].iov_len;
 793
 794out:
 795        return ret;
 796}
 797
 798/**
 799 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 800 * @rdma: controlling RDMA transport
 801 * @rqstp: set of pages to use as Read sink buffers
 802 * @head: pages under I/O collect here
 803 * @p: pointer to start of Read chunk
 804 *
 805 * Returns:
 806 *      %0 if all needed RDMA Reads were posted successfully,
 807 *      %-EINVAL if client provided too many segments,
 808 *      %-ENOMEM if rdma_rw context pool was exhausted,
 809 *      %-ENOTCONN if posting failed (connection is lost),
 810 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 811 *
 812 * Assumptions:
 813 * - All Read segments in @p have the same Position value.
 814 */
 815int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 816                             struct svc_rdma_recv_ctxt *head, __be32 *p)
 817{
 818        struct svc_rdma_read_info *info;
 819        int ret;
 820
 821        /* The request (with page list) is constructed in
 822         * head->rc_arg. Pages involved with RDMA Read I/O are
 823         * transferred there.
 824         */
 825        head->rc_arg.head[0] = rqstp->rq_arg.head[0];
 826        head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
 827        head->rc_arg.pages = head->rc_pages;
 828        head->rc_arg.page_base = 0;
 829        head->rc_arg.page_len = 0;
 830        head->rc_arg.len = rqstp->rq_arg.len;
 831        head->rc_arg.buflen = rqstp->rq_arg.buflen;
 832
 833        info = svc_rdma_read_info_alloc(rdma);
 834        if (!info)
 835                return -ENOMEM;
 836        info->ri_readctxt = head;
 837        info->ri_pageno = 0;
 838        info->ri_pageoff = 0;
 839
 840        info->ri_position = be32_to_cpup(p + 1);
 841        if (info->ri_position)
 842                ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
 843        else
 844                ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
 845        if (ret < 0)
 846                goto out_err;
 847
 848        ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
 849        if (ret < 0)
 850                goto out_err;
 851        return 0;
 852
 853out_err:
 854        svc_rdma_read_info_free(info);
 855        return ret;
 856}
 857