linux/net/sunrpc/xprtrdma/svc_rdma_rw.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2016-2018 Oracle.  All rights reserved.
   4 *
   5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
   6 */
   7
   8#include <rdma/rw.h>
   9
  10#include <linux/sunrpc/xdr.h>
  11#include <linux/sunrpc/rpc_rdma.h>
  12#include <linux/sunrpc/svc_rdma.h>
  13
  14#include "xprt_rdma.h"
  15#include <trace/events/rpcrdma.h>
  16
  17static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
  18static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
  19
  20/* Each R/W context contains state for one chain of RDMA Read or
  21 * Write Work Requests.
  22 *
  23 * Each WR chain handles a single contiguous server-side buffer,
  24 * because scatterlist entries after the first have to start on
  25 * page alignment. xdr_buf iovecs cannot guarantee alignment.
  26 *
  27 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
  28 * from a client may contain a unique R_key, so each WR chain moves
  29 * up to one segment at a time.
  30 *
  31 * The scatterlist makes this data structure over 4KB in size. To
  32 * make it less likely to fail, and to handle the allocation for
  33 * smaller I/O requests without disabling bottom-halves, these
  34 * contexts are created on demand, but cached and reused until the
  35 * controlling svcxprt_rdma is destroyed.
  36 */
  37struct svc_rdma_rw_ctxt {
  38        struct list_head        rw_list;
  39        struct rdma_rw_ctx      rw_ctx;
  40        unsigned int            rw_nents;
  41        struct sg_table         rw_sg_table;
  42        struct scatterlist      rw_first_sgl[];
  43};
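
/* Rough lifecycle of one context (all helpers are defined below):
 * svc_rdma_get_rw_ctxt() hands out a cached context or allocates a fresh
 * one, svc_rdma_put_rw_ctxt() returns it to the per-transport cache, and
 * the cached contexts are freed only by svc_rdma_destroy_rw_ctxts() when
 * the transport is torn down.
 */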
  44
  45static inline struct svc_rdma_rw_ctxt *
  46svc_rdma_next_ctxt(struct list_head *list)
  47{
  48        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
  49                                        rw_list);
  50}
  51
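/* Fetch an unused R/W context from the transport's cache, or allocate a
 * new one if the cache is empty. @sges is the number of scatterlist
 * entries the caller needs; on failure a trace event is emitted and NULL
 * is returned.
 */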
  52static struct svc_rdma_rw_ctxt *
  53svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
  54{
  55        struct svc_rdma_rw_ctxt *ctxt;
  56
  57        spin_lock(&rdma->sc_rw_ctxt_lock);
  58
  59        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
  60        if (ctxt) {
  61                list_del(&ctxt->rw_list);
  62                spin_unlock(&rdma->sc_rw_ctxt_lock);
  63        } else {
  64                spin_unlock(&rdma->sc_rw_ctxt_lock);
  65                ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
  66                               GFP_KERNEL);
  67                if (!ctxt)
  68                        goto out_noctx;
  69                INIT_LIST_HEAD(&ctxt->rw_list);
  70        }
  71
  72        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
  73        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
  74                                   ctxt->rw_sg_table.sgl,
  75                                   SG_CHUNK_SIZE))
  76                goto out_free;
  77        return ctxt;
  78
  79out_free:
  80        kfree(ctxt);
  81out_noctx:
  82        trace_svcrdma_no_rwctx_err(rdma, sges);
  83        return NULL;
  84}
  85
  86static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
  87                                 struct svc_rdma_rw_ctxt *ctxt)
  88{
  89        sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
  90
  91        spin_lock(&rdma->sc_rw_ctxt_lock);
  92        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
  93        spin_unlock(&rdma->sc_rw_ctxt_lock);
  94}
  95
  96/**
  97 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
  98 * @rdma: transport about to be destroyed
  99 *
 100 */
 101void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
 102{
 103        struct svc_rdma_rw_ctxt *ctxt;
 104
 105        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
 106                list_del(&ctxt->rw_list);
 107                kfree(ctxt);
 108        }
 109}
 110
 111/**
 112 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 113 * @rdma: controlling transport instance
 114 * @ctxt: R/W context to prepare
 115 * @offset: RDMA offset
 116 * @handle: RDMA tag/handle
 117 * @direction: I/O direction
 118 *
  119 * On success, returns the number of WQEs that will be posted
  120 * to the Send Queue; otherwise returns a negative errno.
 121 */
 122static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
 123                                struct svc_rdma_rw_ctxt *ctxt,
 124                                u64 offset, u32 handle,
 125                                enum dma_data_direction direction)
 126{
 127        int ret;
 128
 129        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
 130                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
 131                               0, offset, handle, direction);
 132        if (unlikely(ret < 0)) {
 133                svc_rdma_put_rw_ctxt(rdma, ctxt);
 134                trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
 135        }
 136        return ret;
 137}
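
/* Note that on failure svc_rdma_rw_ctx_init() has already returned @ctxt
 * to the transport's cache; callers must not put it again, and simply
 * convert the error to -EIO.
 */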
 138
 139/* A chunk context tracks all I/O for moving one Read or Write
 140 * chunk. This is a set of rdma_rw's that handle data movement
 141 * for all segments of one chunk.
 142 *
 143 * These are small, acquired with a single allocator call, and
 144 * no more than one is needed per chunk. They are allocated on
 145 * demand, and not cached.
 146 */
 147struct svc_rdma_chunk_ctxt {
 148        struct rpc_rdma_cid     cc_cid;
 149        struct ib_cqe           cc_cqe;
 150        struct svcxprt_rdma     *cc_rdma;
 151        struct list_head        cc_rwctxts;
 152        int                     cc_sqecount;
 153};
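
/* Sketch of how the paths below drive a chunk context (error handling
 * elided):
 *
 *      svc_rdma_cc_init(rdma, cc);
 *      for each segment to be moved:
 *              ctxt = svc_rdma_get_rw_ctxt(rdma, ...);
 *              ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
 *              list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 *              cc->cc_sqecount += ret;
 *      svc_rdma_post_chunk_ctxt(cc);
 *
 * cc->cc_cqe.done runs once when the whole WR chain completes.
 */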
 154
 155static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
 156                                 struct rpc_rdma_cid *cid)
 157{
 158        cid->ci_queue_id = rdma->sc_sq_cq->res.id;
 159        cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
 160}
 161
 162static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 163                             struct svc_rdma_chunk_ctxt *cc)
 164{
 165        svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
 166        cc->cc_rdma = rdma;
 167
 168        INIT_LIST_HEAD(&cc->cc_rwctxts);
 169        cc->cc_sqecount = 0;
 170}
 171
 172static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
 173                                enum dma_data_direction dir)
 174{
 175        struct svcxprt_rdma *rdma = cc->cc_rdma;
 176        struct svc_rdma_rw_ctxt *ctxt;
 177
 178        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
 179                list_del(&ctxt->rw_list);
 180
 181                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
 182                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 183                                    ctxt->rw_nents, dir);
 184                svc_rdma_put_rw_ctxt(rdma, ctxt);
 185        }
 186}
 187
 188/* State for sending a Write or Reply chunk.
 189 *  - Tracks progress of writing one chunk over all its segments
 190 *  - Stores arguments for the SGL constructor functions
 191 */
 192struct svc_rdma_write_info {
 193        const struct svc_rdma_chunk     *wi_chunk;
 194
 195        /* write state of this chunk */
 196        unsigned int            wi_seg_off;
 197        unsigned int            wi_seg_no;
 198
 199        /* SGL constructor arguments */
 200        const struct xdr_buf    *wi_xdr;
 201        unsigned char           *wi_base;
 202        unsigned int            wi_next_off;
 203
 204        struct svc_rdma_chunk_ctxt      wi_cc;
 205};
 206
 207static struct svc_rdma_write_info *
 208svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
 209                          const struct svc_rdma_chunk *chunk)
 210{
 211        struct svc_rdma_write_info *info;
 212
 213        info = kmalloc(sizeof(*info), GFP_KERNEL);
 214        if (!info)
 215                return info;
 216
 217        info->wi_chunk = chunk;
 218        info->wi_seg_off = 0;
 219        info->wi_seg_no = 0;
 220        svc_rdma_cc_init(rdma, &info->wi_cc);
 221        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 222        return info;
 223}
 224
 225static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
 226{
 227        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
 228        kfree(info);
 229}
 230
 231/**
 232 * svc_rdma_write_done - Write chunk completion
 233 * @cq: controlling Completion Queue
 234 * @wc: Work Completion
 235 *
 236 * Pages under I/O are freed by a subsequent Send completion.
 237 */
 238static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 239{
 240        struct ib_cqe *cqe = wc->wr_cqe;
 241        struct svc_rdma_chunk_ctxt *cc =
 242                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 243        struct svcxprt_rdma *rdma = cc->cc_rdma;
 244        struct svc_rdma_write_info *info =
 245                        container_of(cc, struct svc_rdma_write_info, wi_cc);
 246
 247        trace_svcrdma_wc_write(wc, &cc->cc_cid);
 248
 249        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 250        wake_up(&rdma->sc_send_wait);
 251
 252        if (unlikely(wc->status != IB_WC_SUCCESS))
 253                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 254
 255        svc_rdma_write_info_free(info);
 256}
 257
 258/* State for pulling a Read chunk.
 259 */
 260struct svc_rdma_read_info {
 261        struct svc_rqst                 *ri_rqst;
 262        struct svc_rdma_recv_ctxt       *ri_readctxt;
 263        unsigned int                    ri_pageno;
 264        unsigned int                    ri_pageoff;
 265        unsigned int                    ri_totalbytes;
 266
 267        struct svc_rdma_chunk_ctxt      ri_cc;
 268};
 269
 270static struct svc_rdma_read_info *
 271svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
 272{
 273        struct svc_rdma_read_info *info;
 274
 275        info = kmalloc(sizeof(*info), GFP_KERNEL);
 276        if (!info)
 277                return info;
 278
 279        svc_rdma_cc_init(rdma, &info->ri_cc);
 280        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
 281        return info;
 282}
 283
 284static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
 285{
 286        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
 287        kfree(info);
 288}
 289
 290/**
 291 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 292 * @cq: controlling Completion Queue
 293 * @wc: Work Completion
 294 *
 295 */
 296static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 297{
 298        struct ib_cqe *cqe = wc->wr_cqe;
 299        struct svc_rdma_chunk_ctxt *cc =
 300                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 301        struct svcxprt_rdma *rdma = cc->cc_rdma;
 302        struct svc_rdma_read_info *info =
 303                        container_of(cc, struct svc_rdma_read_info, ri_cc);
 304
 305        trace_svcrdma_wc_read(wc, &cc->cc_cid);
 306
 307        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 308        wake_up(&rdma->sc_send_wait);
 309
 310        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 311                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 312                svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 313        } else {
 314                spin_lock(&rdma->sc_rq_dto_lock);
 315                list_add_tail(&info->ri_readctxt->rc_list,
 316                              &rdma->sc_read_complete_q);
 317                /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
 318                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 319                spin_unlock(&rdma->sc_rq_dto_lock);
 320
 321                svc_xprt_enqueue(&rdma->sc_xprt);
 322        }
 323
 324        svc_rdma_read_info_free(info);
 325}
 326
 327/* This function sleeps when the transport's Send Queue is congested.
 328 *
 329 * Assumptions:
 330 * - If ib_post_send() succeeds, only one completion is expected,
 331 *   even if one or more WRs are flushed. This is true when posting
 332 *   an rdma_rw_ctx or when posting a single signaled WR.
 333 */
 334static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
 335{
 336        struct svcxprt_rdma *rdma = cc->cc_rdma;
 337        struct svc_xprt *xprt = &rdma->sc_xprt;
 338        struct ib_send_wr *first_wr;
 339        const struct ib_send_wr *bad_wr;
 340        struct list_head *tmp;
 341        struct ib_cqe *cqe;
 342        int ret;
 343
 344        if (cc->cc_sqecount > rdma->sc_sq_depth)
 345                return -EINVAL;
 346
 347        first_wr = NULL;
 348        cqe = &cc->cc_cqe;
 349        list_for_each(tmp, &cc->cc_rwctxts) {
 350                struct svc_rdma_rw_ctxt *ctxt;
 351
 352                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
 353                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
 354                                           rdma->sc_port_num, cqe, first_wr);
 355                cqe = NULL;
 356        }
 357
 358        do {
 359                if (atomic_sub_return(cc->cc_sqecount,
 360                                      &rdma->sc_sq_avail) > 0) {
 361                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 362                        if (ret)
 363                                break;
 364                        return 0;
 365                }
 366
 367                trace_svcrdma_sq_full(rdma);
 368                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 369                wait_event(rdma->sc_send_wait,
 370                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
 371                trace_svcrdma_sq_retry(rdma);
 372        } while (1);
 373
 374        trace_svcrdma_sq_post_err(rdma, ret);
 375        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 376
 377        /* If even one was posted, there will be a completion. */
 378        if (bad_wr != first_wr)
 379                return 0;
 380
 381        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 382        wake_up(&rdma->sc_send_wait);
 383        return -ENOTCONN;
 384}
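
/* Send Queue accounting: sc_sq_avail is debited by cc_sqecount before
 * each posting attempt (and briefly credited back while waiting out
 * congestion). Once ib_post_send() succeeds, the credit is returned by
 * svc_rdma_write_done() or svc_rdma_wc_read_done(); if nothing could be
 * posted, it is returned here before failing with -ENOTCONN.
 */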
 385
 386/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 387 */
 388static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
 389                               unsigned int len,
 390                               struct svc_rdma_rw_ctxt *ctxt)
 391{
 392        struct scatterlist *sg = ctxt->rw_sg_table.sgl;
 393
 394        sg_set_buf(&sg[0], info->wi_base, len);
 395        info->wi_base += len;
 396
 397        ctxt->rw_nents = 1;
 398}
 399
 400/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 401 */
 402static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 403                                    unsigned int remaining,
 404                                    struct svc_rdma_rw_ctxt *ctxt)
 405{
 406        unsigned int sge_no, sge_bytes, page_off, page_no;
 407        const struct xdr_buf *xdr = info->wi_xdr;
 408        struct scatterlist *sg;
 409        struct page **page;
 410
 411        page_off = info->wi_next_off + xdr->page_base;
 412        page_no = page_off >> PAGE_SHIFT;
 413        page_off = offset_in_page(page_off);
 414        page = xdr->pages + page_no;
 415        info->wi_next_off += remaining;
 416        sg = ctxt->rw_sg_table.sgl;
 417        sge_no = 0;
 418        do {
 419                sge_bytes = min_t(unsigned int, remaining,
 420                                  PAGE_SIZE - page_off);
 421                sg_set_page(sg, *page, sge_bytes, page_off);
 422
 423                remaining -= sge_bytes;
 424                sg = sg_next(sg);
 425                page_off = 0;
 426                sge_no++;
 427                page++;
 428        } while (remaining);
 429
 430        ctxt->rw_nents = sge_no;
 431}
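
/* Worked example, assuming 4KB pages: with wi_next_off + page_base = 5096
 * and remaining = 9000, the loop starts at page_no 1, page_off 1000 and
 * emits three SGEs of 3096, 4096, and 1808 bytes, so rw_nents is 3.
 */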
 432
 433/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 434 * an RPC Reply.
 435 */
 436static int
 437svc_rdma_build_writes(struct svc_rdma_write_info *info,
 438                      void (*constructor)(struct svc_rdma_write_info *info,
 439                                          unsigned int len,
 440                                          struct svc_rdma_rw_ctxt *ctxt),
 441                      unsigned int remaining)
 442{
 443        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
 444        struct svcxprt_rdma *rdma = cc->cc_rdma;
 445        const struct svc_rdma_segment *seg;
 446        struct svc_rdma_rw_ctxt *ctxt;
 447        int ret;
 448
 449        do {
 450                unsigned int write_len;
 451                u64 offset;
 452
  453                if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
  454                        goto out_overflow;
  455                seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
 456
 457                write_len = min(remaining, seg->rs_length - info->wi_seg_off);
 458                if (!write_len)
 459                        goto out_overflow;
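                /* write_len bytes starting at an arbitrary page offset
                 * can straddle up to (write_len >> PAGE_SHIFT) + 2 pages:
                 * whole pages plus a partial first and last page.
                 */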
 460                ctxt = svc_rdma_get_rw_ctxt(rdma,
 461                                            (write_len >> PAGE_SHIFT) + 2);
 462                if (!ctxt)
 463                        return -ENOMEM;
 464
 465                constructor(info, write_len, ctxt);
 466                offset = seg->rs_offset + info->wi_seg_off;
 467                ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
 468                                           DMA_TO_DEVICE);
 469                if (ret < 0)
 470                        return -EIO;
 471
 472                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 473                cc->cc_sqecount += ret;
 474                if (write_len == seg->rs_length - info->wi_seg_off) {
 475                        info->wi_seg_no++;
 476                        info->wi_seg_off = 0;
 477                } else {
 478                        info->wi_seg_off += write_len;
 479                }
 480                remaining -= write_len;
 481        } while (remaining);
 482
 483        return 0;
 484
 485out_overflow:
 486        trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
 487                                     info->wi_chunk->ch_segcount);
 488        return -E2BIG;
 489}
 490
 491/**
 492 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 493 * @info: pointer to write arguments
 494 * @iov: kvec to write
 495 *
 496 * Returns:
  497 *   On success, returns zero
 498 *   %-E2BIG if the client-provided Write chunk is too small
 499 *   %-ENOMEM if a resource has been exhausted
 500 *   %-EIO if an rdma-rw error occurred
 501 */
 502static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
 503                              const struct kvec *iov)
 504{
 505        info->wi_base = iov->iov_base;
 506        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
 507                                     iov->iov_len);
 508}
 509
 510/**
 511 * svc_rdma_pages_write - Construct RDMA Writes from pages
 512 * @info: pointer to write arguments
 513 * @xdr: xdr_buf with pages to write
 514 * @offset: offset into the content of @xdr
 515 * @length: number of bytes to write
 516 *
 517 * Returns:
  518 *   On success, returns zero
 519 *   %-E2BIG if the client-provided Write chunk is too small
 520 *   %-ENOMEM if a resource has been exhausted
 521 *   %-EIO if an rdma-rw error occurred
 522 */
 523static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
 524                                const struct xdr_buf *xdr,
 525                                unsigned int offset,
 526                                unsigned long length)
 527{
 528        info->wi_xdr = xdr;
 529        info->wi_next_off = offset - xdr->head[0].iov_len;
 530        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
 531                                     length);
 532}
 533
 534/**
 535 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 536 * @xdr: xdr_buf to write
 537 * @data: pointer to write arguments
 538 *
 539 * Returns:
  540 *   On success, returns the number of bytes written (xdr->len)
 541 *   %-E2BIG if the client-provided Write chunk is too small
 542 *   %-ENOMEM if a resource has been exhausted
 543 *   %-EIO if an rdma-rw error occurred
 544 */
 545static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
 546{
 547        struct svc_rdma_write_info *info = data;
 548        int ret;
 549
 550        if (xdr->head[0].iov_len) {
 551                ret = svc_rdma_iov_write(info, &xdr->head[0]);
 552                if (ret < 0)
 553                        return ret;
 554        }
 555
 556        if (xdr->page_len) {
 557                ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
 558                                           xdr->page_len);
 559                if (ret < 0)
 560                        return ret;
 561        }
 562
 563        if (xdr->tail[0].iov_len) {
 564                ret = svc_rdma_iov_write(info, &xdr->tail[0]);
 565                if (ret < 0)
 566                        return ret;
 567        }
 568
 569        return xdr->len;
 570}
 571
 572/**
 573 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 574 * @rdma: controlling RDMA transport
 575 * @chunk: Write chunk provided by the client
 576 * @xdr: xdr_buf containing the data payload
 577 *
 578 * Returns a non-negative number of bytes the chunk consumed, or
 579 *      %-E2BIG if the payload was larger than the Write chunk,
 580 *      %-EINVAL if client provided too many segments,
 581 *      %-ENOMEM if rdma_rw context pool was exhausted,
 582 *      %-ENOTCONN if posting failed (connection is lost),
 583 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 584 */
 585int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 586                              const struct svc_rdma_chunk *chunk,
 587                              const struct xdr_buf *xdr)
 588{
 589        struct svc_rdma_write_info *info;
 590        struct svc_rdma_chunk_ctxt *cc;
 591        int ret;
 592
 593        info = svc_rdma_write_info_alloc(rdma, chunk);
 594        if (!info)
 595                return -ENOMEM;
 596        cc = &info->wi_cc;
 597
 598        ret = svc_rdma_xb_write(xdr, info);
 599        if (ret != xdr->len)
 600                goto out_err;
 601
 602        trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
 603        ret = svc_rdma_post_chunk_ctxt(cc);
 604        if (ret < 0)
 605                goto out_err;
 606        return xdr->len;
 607
 608out_err:
 609        svc_rdma_write_info_free(info);
 610        return ret;
 611}
 612
 613/**
 614 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 615 * @rdma: controlling RDMA transport
 616 * @rctxt: Write and Reply chunks from client
 617 * @xdr: xdr_buf containing an RPC Reply
 618 *
 619 * Returns a non-negative number of bytes the chunk consumed, or
 620 *      %-E2BIG if the payload was larger than the Reply chunk,
 621 *      %-EINVAL if client provided too many segments,
 622 *      %-ENOMEM if rdma_rw context pool was exhausted,
 623 *      %-ENOTCONN if posting failed (connection is lost),
 624 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 625 */
 626int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 627                              const struct svc_rdma_recv_ctxt *rctxt,
 628                              const struct xdr_buf *xdr)
 629{
 630        struct svc_rdma_write_info *info;
 631        struct svc_rdma_chunk_ctxt *cc;
 632        struct svc_rdma_chunk *chunk;
 633        int ret;
 634
 635        if (pcl_is_empty(&rctxt->rc_reply_pcl))
 636                return 0;
 637
 638        chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
 639        info = svc_rdma_write_info_alloc(rdma, chunk);
 640        if (!info)
 641                return -ENOMEM;
 642        cc = &info->wi_cc;
 643
 644        ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
 645                                      svc_rdma_xb_write, info);
 646        if (ret < 0)
 647                goto out_err;
 648
 649        trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
 650        ret = svc_rdma_post_chunk_ctxt(cc);
 651        if (ret < 0)
 652                goto out_err;
 653
 654        return xdr->len;
 655
 656out_err:
 657        svc_rdma_write_info_free(info);
 658        return ret;
 659}
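
/* Both entry points above are driven from the Send path in
 * svc_rdma_sendto.c: each Write chunk conveys one result payload (an
 * xdr_buf covering just that payload), while svc_rdma_send_reply_chunk()
 * moves the portions of the Reply not already conveyed by Write chunks
 * into the client's Reply chunk.
 */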
 660
 661/**
 662 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 663 * @info: context for ongoing I/O
 664 * @segment: co-ordinates of remote memory to be read
 665 *
 666 * Returns:
 667 *   %0: the Read WR chain was constructed successfully
 668 *   %-EINVAL: there were not enough rq_pages to finish
  669 *   %-ENOMEM: allocating local resources failed
 670 *   %-EIO: a DMA mapping error occurred
 671 */
 672static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 673                                       const struct svc_rdma_segment *segment)
 674{
 675        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 676        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
 677        struct svc_rqst *rqstp = info->ri_rqst;
 678        struct svc_rdma_rw_ctxt *ctxt;
 679        unsigned int sge_no, seg_len, len;
 680        struct scatterlist *sg;
 681        int ret;
 682
 683        len = segment->rs_length;
 684        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 685        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 686        if (!ctxt)
 687                return -ENOMEM;
 688        ctxt->rw_nents = sge_no;
 689
 690        sg = ctxt->rw_sg_table.sgl;
 691        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
 692                seg_len = min_t(unsigned int, len,
 693                                PAGE_SIZE - info->ri_pageoff);
 694
 695                head->rc_arg.pages[info->ri_pageno] =
 696                        rqstp->rq_pages[info->ri_pageno];
 697                if (!info->ri_pageoff)
 698                        head->rc_page_count++;
 699
 700                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
 701                            seg_len, info->ri_pageoff);
 702                sg = sg_next(sg);
 703
 704                info->ri_pageoff += seg_len;
 705                if (info->ri_pageoff == PAGE_SIZE) {
 706                        info->ri_pageno++;
 707                        info->ri_pageoff = 0;
 708                }
 709                len -= seg_len;
 710
 711                /* Safety check */
 712                if (len &&
 713                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
 714                        goto out_overrun;
 715        }
 716
 717        ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
 718                                   segment->rs_handle, DMA_FROM_DEVICE);
 719        if (ret < 0)
 720                return -EIO;
 721
 722        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 723        cc->cc_sqecount += ret;
 724        return 0;
 725
 726out_overrun:
 727        trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
 728        return -EINVAL;
 729}
 730
 731/**
 732 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 733 * @info: context for ongoing I/O
 734 * @chunk: Read chunk to pull
 735 *
 736 * Return values:
 737 *   %0: the Read WR chain was constructed successfully
 738 *   %-EINVAL: there were not enough resources to finish
  739 *   %-ENOMEM: allocating local resources failed
 740 *   %-EIO: a DMA mapping error occurred
 741 */
 742static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
 743                                     const struct svc_rdma_chunk *chunk)
 744{
 745        const struct svc_rdma_segment *segment;
 746        int ret;
 747
 748        ret = -EINVAL;
 749        pcl_for_each_segment(segment, chunk) {
 750                ret = svc_rdma_build_read_segment(info, segment);
 751                if (ret < 0)
 752                        break;
 753                info->ri_totalbytes += segment->rs_length;
 754        }
 755        return ret;
 756}
 757
 758/**
 759 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 760 * @info: context for RDMA Reads
 761 * @offset: offset into the Receive buffer of region to copy
 762 * @remaining: length of region to copy
 763 *
 764 * Take a page at a time from rqstp->rq_pages and copy the inline
 765 * content from the Receive buffer into that page. Update
 766 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 767 * result will land contiguously with the copied content.
 768 *
 769 * Return values:
 770 *   %0: Inline content was successfully copied
 771 *   %-EINVAL: offset or length was incorrect
 772 */
 773static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
 774                                      unsigned int offset,
 775                                      unsigned int remaining)
 776{
 777        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 778        unsigned char *dst, *src = head->rc_recv_buf;
 779        struct svc_rqst *rqstp = info->ri_rqst;
 780        unsigned int page_no, numpages;
 781
 782        numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
 783        for (page_no = 0; page_no < numpages; page_no++) {
 784                unsigned int page_len;
 785
 786                page_len = min_t(unsigned int, remaining,
 787                                 PAGE_SIZE - info->ri_pageoff);
 788
 789                head->rc_arg.pages[info->ri_pageno] =
 790                        rqstp->rq_pages[info->ri_pageno];
 791                if (!info->ri_pageoff)
 792                        head->rc_page_count++;
 793
 794                dst = page_address(head->rc_arg.pages[info->ri_pageno]);
  795                memcpy(dst + info->ri_pageoff, src + offset, page_len);
 796
 797                info->ri_totalbytes += page_len;
 798                info->ri_pageoff += page_len;
 799                if (info->ri_pageoff == PAGE_SIZE) {
 800                        info->ri_pageno++;
 801                        info->ri_pageoff = 0;
 802                }
 803                remaining -= page_len;
 804                offset += page_len;
 805        }
 806
  807        return 0;
 808}
 809
 810/**
  811 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull multiple data item Read chunks
 812 * @info: context for RDMA Reads
 813 *
 814 * The chunk data lands in head->rc_arg as a series of contiguous pages,
 815 * like an incoming TCP call.
 816 *
 817 * Return values:
 818 *   %0: RDMA Read WQEs were successfully built
 819 *   %-EINVAL: client provided too many chunks or segments,
 820 *   %-ENOMEM: rdma_rw context pool was exhausted,
 821 *   %-ENOTCONN: posting failed (connection is lost),
 822 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 823 */
 824static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
 825{
 826        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 827        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
 828        struct svc_rdma_chunk *chunk, *next;
 829        struct xdr_buf *buf = &head->rc_arg;
 830        unsigned int start, length;
 831        int ret;
 832
 833        start = 0;
 834        chunk = pcl_first_chunk(pcl);
 835        length = chunk->ch_position;
 836        ret = svc_rdma_copy_inline_range(info, start, length);
 837        if (ret < 0)
 838                return ret;
 839
 840        pcl_for_each_chunk(chunk, pcl) {
 841                ret = svc_rdma_build_read_chunk(info, chunk);
 842                if (ret < 0)
 843                        return ret;
 844
 845                next = pcl_next_chunk(pcl, chunk);
 846                if (!next)
 847                        break;
 848
 849                start += length;
 850                length = next->ch_position - info->ri_totalbytes;
 851                ret = svc_rdma_copy_inline_range(info, start, length);
 852                if (ret < 0)
 853                        return ret;
 854        }
 855
 856        start += length;
 857        length = head->rc_byte_len - start;
 858        ret = svc_rdma_copy_inline_range(info, start, length);
 859        if (ret < 0)
 860                return ret;
 861
 862        buf->len += info->ri_totalbytes;
 863        buf->buflen += info->ri_totalbytes;
 864
 865        head->rc_hdr_count = 1;
 866        buf->head[0].iov_base = page_address(head->rc_pages[0]);
 867        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
 868        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
 869        return 0;
 870}
 871
 872/**
 873 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 874 * @info: context for RDMA Reads
 875 *
 876 * The chunk data lands in the page list of head->rc_arg.pages.
 877 *
 878 * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
 879 * Therefore, XDR round-up of the Read chunk and trailing
 880 * inline content must both be added at the end of the pagelist.
 881 *
 882 * Return values:
 883 *   %0: RDMA Read WQEs were successfully built
 884 *   %-EINVAL: client provided too many chunks or segments,
 885 *   %-ENOMEM: rdma_rw context pool was exhausted,
 886 *   %-ENOTCONN: posting failed (connection is lost),
 887 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 888 */
 889static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
 890{
 891        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 892        struct xdr_buf *buf = &head->rc_arg;
 893        struct svc_rdma_chunk *chunk;
 894        unsigned int length;
 895        int ret;
 896
 897        chunk = pcl_first_chunk(&head->rc_read_pcl);
 898        ret = svc_rdma_build_read_chunk(info, chunk);
 899        if (ret < 0)
 900                goto out;
 901
 902        head->rc_hdr_count = 0;
 903
 904        /* Split the Receive buffer between the head and tail
 905         * buffers at Read chunk's position. XDR roundup of the
 906         * chunk is not included in either the pagelist or in
 907         * the tail.
 908         */
 909        buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
 910        buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
 911        buf->head[0].iov_len = chunk->ch_position;
 912
 913        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 914         *
 915         * If the client already rounded up the chunk length, the
 916         * length does not change. Otherwise, the length of the page
 917         * list is increased to include XDR round-up.
 918         *
 919         * Currently these chunks always start at page offset 0,
 920         * thus the rounded-up length never crosses a page boundary.
 921         */
 922        length = XDR_QUADLEN(info->ri_totalbytes) << 2;
 923        buf->page_len = length;
 924        buf->len += length;
 925        buf->buflen += length;
 926
 927out:
 928        return ret;
 929}
 930
 931/**
 932 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 933 * @info: context for RDMA Reads
 934 * @chunk: parsed Call chunk to pull
 935 * @offset: offset of region to pull
 936 * @length: length of region to pull
 937 *
 938 * Return values:
 939 *   %0: RDMA Read WQEs were successfully built
 940 *   %-EINVAL: there were not enough resources to finish
 941 *   %-ENOMEM: rdma_rw context pool was exhausted,
 942 *   %-ENOTCONN: posting failed (connection is lost),
 943 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 944 */
 945static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
 946                                     const struct svc_rdma_chunk *chunk,
 947                                     unsigned int offset, unsigned int length)
 948{
 949        const struct svc_rdma_segment *segment;
 950        int ret;
 951
 952        ret = -EINVAL;
 953        pcl_for_each_segment(segment, chunk) {
 954                struct svc_rdma_segment dummy;
 955
 956                if (offset > segment->rs_length) {
 957                        offset -= segment->rs_length;
 958                        continue;
 959                }
 960
 961                dummy.rs_handle = segment->rs_handle;
 962                dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
 963                dummy.rs_offset = segment->rs_offset + offset;
 964
 965                ret = svc_rdma_build_read_segment(info, &dummy);
 966                if (ret < 0)
 967                        break;
 968
 969                info->ri_totalbytes += dummy.rs_length;
 970                length -= dummy.rs_length;
 971                offset = 0;
 972        }
 973        return ret;
 974}
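
/* Worked example: for a chunk built from segments of 4000 and 8000 bytes,
 * @offset 1000 and @length 5000 produce two Read WR chains: 3000 bytes
 * starting 1000 bytes into the first segment, then 2000 bytes from the
 * start of the second segment.
 */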
 975
 976/**
 977 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 978 * @info: context for RDMA Reads
 979 *
 980 * Return values:
 981 *   %0: RDMA Read WQEs were successfully built
 982 *   %-EINVAL: there were not enough resources to finish
 983 *   %-ENOMEM: rdma_rw context pool was exhausted,
 984 *   %-ENOTCONN: posting failed (connection is lost),
 985 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 986 */
 987static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
 988{
 989        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 990        const struct svc_rdma_chunk *call_chunk =
 991                        pcl_first_chunk(&head->rc_call_pcl);
 992        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
 993        struct svc_rdma_chunk *chunk, *next;
 994        unsigned int start, length;
 995        int ret;
 996
 997        if (pcl_is_empty(pcl))
 998                return svc_rdma_build_read_chunk(info, call_chunk);
 999
1000        start = 0;
1001        chunk = pcl_first_chunk(pcl);
1002        length = chunk->ch_position;
1003        ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
1004        if (ret < 0)
1005                return ret;
1006
1007        pcl_for_each_chunk(chunk, pcl) {
1008                ret = svc_rdma_build_read_chunk(info, chunk);
1009                if (ret < 0)
1010                        return ret;
1011
1012                next = pcl_next_chunk(pcl, chunk);
1013                if (!next)
1014                        break;
1015
1016                start += length;
1017                length = next->ch_position - info->ri_totalbytes;
1018                ret = svc_rdma_read_chunk_range(info, call_chunk,
1019                                                start, length);
1020                if (ret < 0)
1021                        return ret;
1022        }
1023
1024        start += length;
1025        length = call_chunk->ch_length - start;
1026        return svc_rdma_read_chunk_range(info, call_chunk, start, length);
1027}
1028
1029/**
1030 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1031 * @info: context for RDMA Reads
1032 *
1033 * The start of the data lands in the first page just after the
1034 * Transport header, and the rest lands in the page list of
1035 * head->rc_arg.pages.
1036 *
1037 * Assumptions:
 1038 *      - A PZRC (Position Zero Read Chunk) is never sent in an
 1039 *        RDMA_MSG message, though it's allowed by spec.
1040 *
1041 * Return values:
1042 *   %0: RDMA Read WQEs were successfully built
1043 *   %-EINVAL: client provided too many chunks or segments,
1044 *   %-ENOMEM: rdma_rw context pool was exhausted,
1045 *   %-ENOTCONN: posting failed (connection is lost),
1046 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1047 */
1048static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
1049{
1050        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
1051        struct xdr_buf *buf = &head->rc_arg;
1052        int ret;
1053
1054        ret = svc_rdma_read_call_chunk(info);
1055        if (ret < 0)
1056                goto out;
1057
1058        buf->len += info->ri_totalbytes;
1059        buf->buflen += info->ri_totalbytes;
1060
1061        head->rc_hdr_count = 1;
1062        buf->head[0].iov_base = page_address(head->rc_pages[0]);
1063        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
1064        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
1065
1066out:
1067        return ret;
1068}
1069
1070/* Pages under I/O have been copied to head->rc_pages. Ensure they
1071 * are not released by svc_xprt_release() until the I/O is complete.
1072 *
1073 * This has to be done after all Read WRs are constructed to properly
1074 * handle a page that is part of I/O on behalf of two different RDMA
1075 * segments.
1076 *
1077 * Do this only if I/O has been posted. Otherwise, we do indeed want
1078 * svc_xprt_release() to clean things up properly.
1079 */
1080static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
1081                                   const unsigned int start,
1082                                   const unsigned int num_pages)
1083{
1084        unsigned int i;
1085
1086        for (i = start; i < num_pages + start; i++)
1087                rqstp->rq_pages[i] = NULL;
1088}
1089
1090/**
1091 * svc_rdma_process_read_list - Pull list of Read chunks from the client
1092 * @rdma: controlling RDMA transport
1093 * @rqstp: set of pages to use as Read sink buffers
1094 * @head: pages under I/O collect here
1095 *
1096 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1097 * pull each Read chunk as they decode an incoming RPC message.
1098 *
1099 * On Linux, however, the server needs to have a fully-constructed RPC
1100 * message in rqstp->rq_arg when there is a positive return code from
1101 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1102 * it is received, then here the whole Read list is pulled all at once.
1103 * The ingress RPC message is fully reconstructed once all associated
1104 * RDMA Reads have completed.
1105 *
1106 * Return values:
1107 *   %1: all needed RDMA Reads were posted successfully,
1108 *   %-EINVAL: client provided too many chunks or segments,
1109 *   %-ENOMEM: rdma_rw context pool was exhausted,
1110 *   %-ENOTCONN: posting failed (connection is lost),
1111 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1112 */
1113int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1114                               struct svc_rqst *rqstp,
1115                               struct svc_rdma_recv_ctxt *head)
1116{
1117        struct svc_rdma_read_info *info;
1118        struct svc_rdma_chunk_ctxt *cc;
1119        int ret;
1120
1121        /* The request (with page list) is constructed in
1122         * head->rc_arg. Pages involved with RDMA Read I/O are
1123         * transferred there.
1124         */
1125        head->rc_arg.head[0] = rqstp->rq_arg.head[0];
1126        head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
1127        head->rc_arg.pages = head->rc_pages;
1128        head->rc_arg.page_base = 0;
1129        head->rc_arg.page_len = 0;
1130        head->rc_arg.len = rqstp->rq_arg.len;
1131        head->rc_arg.buflen = rqstp->rq_arg.buflen;
1132
1133        info = svc_rdma_read_info_alloc(rdma);
1134        if (!info)
1135                return -ENOMEM;
1136        cc = &info->ri_cc;
1137        info->ri_rqst = rqstp;
1138        info->ri_readctxt = head;
1139        info->ri_pageno = 0;
1140        info->ri_pageoff = 0;
1141        info->ri_totalbytes = 0;
1142
1143        if (pcl_is_empty(&head->rc_call_pcl)) {
1144                if (head->rc_read_pcl.cl_count == 1)
1145                        ret = svc_rdma_read_data_item(info);
1146                else
1147                        ret = svc_rdma_read_multiple_chunks(info);
1148        } else
1149                ret = svc_rdma_read_special(info);
1150        if (ret < 0)
1151                goto out_err;
1152
1153        trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1154        ret = svc_rdma_post_chunk_ctxt(cc);
1155        if (ret < 0)
1156                goto out_err;
1157        svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
1158        return 1;
1159
1160out_err:
1161        svc_rdma_read_info_free(info);
1162        return ret;
1163}
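
/* Once these Reads complete, svc_rdma_wc_read_done() queues @head on
 * rdma->sc_read_complete_q and re-enqueues the transport; a subsequent
 * ->xpo_recvfrom pass is then expected to finish assembling
 * rqstp->rq_arg from head->rc_arg before the RPC is dispatched.
 */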
1164