linux/net/sunrpc/xprtrdma/svc_rdma_rw.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2016-2018 Oracle.  All rights reserved.
   4 *
   5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
   6 */
   7
   8#include <rdma/rw.h>
   9
  10#include <linux/sunrpc/xdr.h>
  11#include <linux/sunrpc/rpc_rdma.h>
  12#include <linux/sunrpc/svc_rdma.h>
  13
  14#include "xprt_rdma.h"
  15#include <trace/events/rpcrdma.h>
  16
  17static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
  18static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
  19
  20/* Each R/W context contains state for one chain of RDMA Read or
  21 * Write Work Requests.
  22 *
  23 * Each WR chain handles a single contiguous server-side buffer,
  24 * because scatterlist entries after the first have to start on
  25 * page alignment. xdr_buf iovecs cannot guarantee alignment.
  26 *
  27 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
  28 * from a client may contain a unique R_key, so each WR chain moves
  29 * up to one segment at a time.
  30 *
  31 * The scatterlist makes this data structure over 4KB in size. To
  32 * make it less likely to fail, and to handle the allocation for
  33 * smaller I/O requests without disabling bottom-halves, these
  34 * contexts are created on demand, but cached and reused until the
  35 * controlling svcxprt_rdma is destroyed.
  36 */
  37struct svc_rdma_rw_ctxt {
  38        struct llist_node       rw_node;
  39        struct list_head        rw_list;
  40        struct rdma_rw_ctx      rw_ctx;
  41        unsigned int            rw_nents;
  42        struct sg_table         rw_sg_table;
  43        struct scatterlist      rw_first_sgl[];
  44};
  45
  46static inline struct svc_rdma_rw_ctxt *
  47svc_rdma_next_ctxt(struct list_head *list)
  48{
  49        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
  50                                        rw_list);
  51}
  52
  53static struct svc_rdma_rw_ctxt *
  54svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
  55{
  56        struct svc_rdma_rw_ctxt *ctxt;
  57        struct llist_node *node;
  58
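             /* Note: llist_del_first() must be serialized against other
              * consumers of sc_rw_ctxts, hence the spin lock. Producers
              * that only llist_add() do not need it.
              */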
  59        spin_lock(&rdma->sc_rw_ctxt_lock);
  60        node = llist_del_first(&rdma->sc_rw_ctxts);
  61        spin_unlock(&rdma->sc_rw_ctxt_lock);
  62        if (node) {
  63                ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
  64        } else {
  65                ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
  66                               GFP_KERNEL);
  67                if (!ctxt)
  68                        goto out_noctx;
  69
  70                INIT_LIST_HEAD(&ctxt->rw_list);
  71        }
  72
  73        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
  74        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
  75                                   ctxt->rw_sg_table.sgl,
  76                                   SG_CHUNK_SIZE))
  77                goto out_free;
  78        return ctxt;
  79
  80out_free:
  81        kfree(ctxt);
  82out_noctx:
  83        trace_svcrdma_no_rwctx_err(rdma, sges);
  84        return NULL;
  85}
  86
  87static void __svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
  88                                   struct svc_rdma_rw_ctxt *ctxt,
  89                                   struct llist_head *list)
  90{
  91        sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
  92        llist_add(&ctxt->rw_node, list);
  93}
  94
  95static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
  96                                 struct svc_rdma_rw_ctxt *ctxt)
  97{
  98        __svc_rdma_put_rw_ctxt(rdma, ctxt, &rdma->sc_rw_ctxts);
  99}
 100
 101/**
 102 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 103 * @rdma: transport about to be destroyed
 104 *
 105 */
 106void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
 107{
 108        struct svc_rdma_rw_ctxt *ctxt;
 109        struct llist_node *node;
 110
 111        while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
 112                ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
 113                kfree(ctxt);
 114        }
 115}
 116
 117/**
 118 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 119 * @rdma: controlling transport instance
 120 * @ctxt: R/W context to prepare
 121 * @offset: RDMA offset
 122 * @handle: RDMA tag/handle
 123 * @direction: I/O direction
 124 *
  125 * On success, returns the number of WQEs that will be needed on
  126 * the work queue; on failure, returns a negative errno.
 127 */
 128static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
 129                                struct svc_rdma_rw_ctxt *ctxt,
 130                                u64 offset, u32 handle,
 131                                enum dma_data_direction direction)
 132{
 133        int ret;
 134
 135        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
 136                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
 137                               0, offset, handle, direction);
 138        if (unlikely(ret < 0)) {
 139                svc_rdma_put_rw_ctxt(rdma, ctxt);
 140                trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
 141        }
 142        return ret;
 143}
 144
 145/* A chunk context tracks all I/O for moving one Read or Write
 146 * chunk. This is a set of rdma_rw's that handle data movement
 147 * for all segments of one chunk.
 148 *
 149 * These are small, acquired with a single allocator call, and
 150 * no more than one is needed per chunk. They are allocated on
 151 * demand, and not cached.
 152 */
 153struct svc_rdma_chunk_ctxt {
 154        struct rpc_rdma_cid     cc_cid;
 155        struct ib_cqe           cc_cqe;
 156        struct svcxprt_rdma     *cc_rdma;
 157        struct list_head        cc_rwctxts;
 158        int                     cc_sqecount;
 159        enum ib_wc_status       cc_status;
 160        struct completion       cc_done;
 161};
 162
 163static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
 164                                 struct rpc_rdma_cid *cid)
 165{
 166        cid->ci_queue_id = rdma->sc_sq_cq->res.id;
 167        cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
 168}
 169
 170static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 171                             struct svc_rdma_chunk_ctxt *cc)
 172{
 173        svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
 174        cc->cc_rdma = rdma;
 175
 176        INIT_LIST_HEAD(&cc->cc_rwctxts);
 177        cc->cc_sqecount = 0;
 178}
 179
 180/*
  181 * The consumed rw_ctxts are cleaned up and chained together locally
  182 * so that a single atomic llist operation can return them all to
  183 * the transport's free list.
 184 */
 185static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
 186                                enum dma_data_direction dir)
 187{
 188        struct svcxprt_rdma *rdma = cc->cc_rdma;
 189        struct llist_node *first, *last;
 190        struct svc_rdma_rw_ctxt *ctxt;
 191        LLIST_HEAD(free);
 192
 193        first = last = NULL;
 194        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
 195                list_del(&ctxt->rw_list);
 196
 197                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
 198                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
 199                                    ctxt->rw_nents, dir);
 200                __svc_rdma_put_rw_ctxt(rdma, ctxt, &free);
 201
 202                ctxt->rw_node.next = first;
 203                first = &ctxt->rw_node;
 204                if (!last)
 205                        last = first;
 206        }
 207        if (first)
 208                llist_add_batch(first, last, &rdma->sc_rw_ctxts);
 209}
 210
 211/* State for sending a Write or Reply chunk.
 212 *  - Tracks progress of writing one chunk over all its segments
 213 *  - Stores arguments for the SGL constructor functions
 214 */
 215struct svc_rdma_write_info {
 216        const struct svc_rdma_chunk     *wi_chunk;
 217
 218        /* write state of this chunk */
 219        unsigned int            wi_seg_off;
 220        unsigned int            wi_seg_no;
 221
 222        /* SGL constructor arguments */
 223        const struct xdr_buf    *wi_xdr;
 224        unsigned char           *wi_base;
 225        unsigned int            wi_next_off;
 226
 227        struct svc_rdma_chunk_ctxt      wi_cc;
 228};
 229
 230static struct svc_rdma_write_info *
 231svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
 232                          const struct svc_rdma_chunk *chunk)
 233{
 234        struct svc_rdma_write_info *info;
 235
 236        info = kmalloc(sizeof(*info), GFP_KERNEL);
 237        if (!info)
 238                return info;
 239
 240        info->wi_chunk = chunk;
 241        info->wi_seg_off = 0;
 242        info->wi_seg_no = 0;
 243        svc_rdma_cc_init(rdma, &info->wi_cc);
 244        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 245        return info;
 246}
 247
 248static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
 249{
 250        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
 251        kfree(info);
 252}
 253
 254/**
 255 * svc_rdma_write_done - Write chunk completion
 256 * @cq: controlling Completion Queue
 257 * @wc: Work Completion
 258 *
 259 * Pages under I/O are freed by a subsequent Send completion.
 260 */
 261static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 262{
 263        struct ib_cqe *cqe = wc->wr_cqe;
 264        struct svc_rdma_chunk_ctxt *cc =
 265                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 266        struct svcxprt_rdma *rdma = cc->cc_rdma;
 267        struct svc_rdma_write_info *info =
 268                        container_of(cc, struct svc_rdma_write_info, wi_cc);
 269
 270        trace_svcrdma_wc_write(wc, &cc->cc_cid);
 271
 272        svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
 273
 274        if (unlikely(wc->status != IB_WC_SUCCESS))
 275                svc_xprt_deferred_close(&rdma->sc_xprt);
 276
 277        svc_rdma_write_info_free(info);
 278}
 279
 280/* State for pulling a Read chunk.
 281 */
 282struct svc_rdma_read_info {
 283        struct svc_rqst                 *ri_rqst;
 284        struct svc_rdma_recv_ctxt       *ri_readctxt;
 285        unsigned int                    ri_pageno;
 286        unsigned int                    ri_pageoff;
 287        unsigned int                    ri_totalbytes;
 288
 289        struct svc_rdma_chunk_ctxt      ri_cc;
 290};
 291
 292static struct svc_rdma_read_info *
 293svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
 294{
 295        struct svc_rdma_read_info *info;
 296
 297        info = kmalloc(sizeof(*info), GFP_KERNEL);
 298        if (!info)
 299                return info;
 300
 301        svc_rdma_cc_init(rdma, &info->ri_cc);
 302        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
 303        return info;
 304}
 305
 306static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
 307{
 308        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
 309        kfree(info);
 310}
 311
 312/**
 313 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 314 * @cq: controlling Completion Queue
 315 * @wc: Work Completion
 316 *
 317 */
 318static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 319{
 320        struct ib_cqe *cqe = wc->wr_cqe;
 321        struct svc_rdma_chunk_ctxt *cc =
 322                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 323        struct svcxprt_rdma *rdma = cc->cc_rdma;
 324
 325        trace_svcrdma_wc_read(wc, &cc->cc_cid);
 326
 327        svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
 328        cc->cc_status = wc->status;
 329        complete(&cc->cc_done);
 330        return;
 331}
 332
 333/* This function sleeps when the transport's Send Queue is congested.
 334 *
 335 * Assumptions:
 336 * - If ib_post_send() succeeds, only one completion is expected,
 337 *   even if one or more WRs are flushed. This is true when posting
 338 *   an rdma_rw_ctx or when posting a single signaled WR.
 339 */
 340static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
 341{
 342        struct svcxprt_rdma *rdma = cc->cc_rdma;
 343        struct ib_send_wr *first_wr;
 344        const struct ib_send_wr *bad_wr;
 345        struct list_head *tmp;
 346        struct ib_cqe *cqe;
 347        int ret;
 348
 349        if (cc->cc_sqecount > rdma->sc_sq_depth)
 350                return -EINVAL;
 351
 352        first_wr = NULL;
 353        cqe = &cc->cc_cqe;
 354        list_for_each(tmp, &cc->cc_rwctxts) {
 355                struct svc_rdma_rw_ctxt *ctxt;
 356
 357                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
 358                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
 359                                           rdma->sc_port_num, cqe, first_wr);
 360                cqe = NULL;
 361        }
 362
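             /* Reserve enough SQEs for the whole WR chain before posting.
              * If the Send Queue is too congested, return the reservation,
              * sleep until completions have freed enough SQEs, then retry.
              */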
 363        do {
 364                if (atomic_sub_return(cc->cc_sqecount,
 365                                      &rdma->sc_sq_avail) > 0) {
 366                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 367                        if (ret)
 368                                break;
 369                        return 0;
 370                }
 371
 372                percpu_counter_inc(&svcrdma_stat_sq_starve);
 373                trace_svcrdma_sq_full(rdma);
 374                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 375                wait_event(rdma->sc_send_wait,
 376                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
 377                trace_svcrdma_sq_retry(rdma);
 378        } while (1);
 379
 380        trace_svcrdma_sq_post_err(rdma, ret);
 381        svc_xprt_deferred_close(&rdma->sc_xprt);
 382
 383        /* If even one was posted, there will be a completion. */
 384        if (bad_wr != first_wr)
 385                return 0;
 386
 387        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 388        wake_up(&rdma->sc_send_wait);
 389        return -ENOTCONN;
 390}
 391
 392/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 393 */
 394static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
 395                               unsigned int len,
 396                               struct svc_rdma_rw_ctxt *ctxt)
 397{
 398        struct scatterlist *sg = ctxt->rw_sg_table.sgl;
 399
 400        sg_set_buf(&sg[0], info->wi_base, len);
 401        info->wi_base += len;
 402
 403        ctxt->rw_nents = 1;
 404}
 405
 406/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 407 */
 408static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 409                                    unsigned int remaining,
 410                                    struct svc_rdma_rw_ctxt *ctxt)
 411{
 412        unsigned int sge_no, sge_bytes, page_off, page_no;
 413        const struct xdr_buf *xdr = info->wi_xdr;
 414        struct scatterlist *sg;
 415        struct page **page;
 416
 417        page_off = info->wi_next_off + xdr->page_base;
 418        page_no = page_off >> PAGE_SHIFT;
 419        page_off = offset_in_page(page_off);
 420        page = xdr->pages + page_no;
 421        info->wi_next_off += remaining;
 422        sg = ctxt->rw_sg_table.sgl;
 423        sge_no = 0;
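             /* Only the first SGE may start at a non-zero page offset;
              * every subsequent SGE begins at page offset zero.
              */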
 424        do {
 425                sge_bytes = min_t(unsigned int, remaining,
 426                                  PAGE_SIZE - page_off);
 427                sg_set_page(sg, *page, sge_bytes, page_off);
 428
 429                remaining -= sge_bytes;
 430                sg = sg_next(sg);
 431                page_off = 0;
 432                sge_no++;
 433                page++;
 434        } while (remaining);
 435
 436        ctxt->rw_nents = sge_no;
 437}
 438
 439/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 440 * an RPC Reply.
 441 */
 442static int
 443svc_rdma_build_writes(struct svc_rdma_write_info *info,
 444                      void (*constructor)(struct svc_rdma_write_info *info,
 445                                          unsigned int len,
 446                                          struct svc_rdma_rw_ctxt *ctxt),
 447                      unsigned int remaining)
 448{
 449        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
 450        struct svcxprt_rdma *rdma = cc->cc_rdma;
 451        const struct svc_rdma_segment *seg;
 452        struct svc_rdma_rw_ctxt *ctxt;
 453        int ret;
 454
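             /* Each pass through this loop builds one rdma_rw_ctx covering
              * at most the remainder of a single chunk segment, since each
              * segment may carry its own R_key.
              */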
 455        do {
 456                unsigned int write_len;
 457                u64 offset;
 458
  459        if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
  460                goto out_overflow;
  461        seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
 462
 463                write_len = min(remaining, seg->rs_length - info->wi_seg_off);
 464                if (!write_len)
 465                        goto out_overflow;
 466                ctxt = svc_rdma_get_rw_ctxt(rdma,
 467                                            (write_len >> PAGE_SHIFT) + 2);
 468                if (!ctxt)
 469                        return -ENOMEM;
 470
 471                constructor(info, write_len, ctxt);
 472                offset = seg->rs_offset + info->wi_seg_off;
 473                ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
 474                                           DMA_TO_DEVICE);
 475                if (ret < 0)
 476                        return -EIO;
 477                percpu_counter_inc(&svcrdma_stat_write);
 478
 479                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 480                cc->cc_sqecount += ret;
 481                if (write_len == seg->rs_length - info->wi_seg_off) {
 482                        info->wi_seg_no++;
 483                        info->wi_seg_off = 0;
 484                } else {
 485                        info->wi_seg_off += write_len;
 486                }
 487                remaining -= write_len;
 488        } while (remaining);
 489
 490        return 0;
 491
 492out_overflow:
 493        trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
 494                                     info->wi_chunk->ch_segcount);
 495        return -E2BIG;
 496}
 497
 498/**
 499 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 500 * @info: pointer to write arguments
 501 * @iov: kvec to write
 502 *
 503 * Returns:
 504 *   On success, returns zero
 505 *   %-E2BIG if the client-provided Write chunk is too small
 506 *   %-ENOMEM if a resource has been exhausted
 507 *   %-EIO if an rdma-rw error occurred
 508 */
 509static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
 510                              const struct kvec *iov)
 511{
 512        info->wi_base = iov->iov_base;
 513        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
 514                                     iov->iov_len);
 515}
 516
 517/**
 518 * svc_rdma_pages_write - Construct RDMA Writes from pages
 519 * @info: pointer to write arguments
 520 * @xdr: xdr_buf with pages to write
 521 * @offset: offset into the content of @xdr
 522 * @length: number of bytes to write
 523 *
 524 * Returns:
 525 *   On success, returns zero
 526 *   %-E2BIG if the client-provided Write chunk is too small
 527 *   %-ENOMEM if a resource has been exhausted
 528 *   %-EIO if an rdma-rw error occurred
 529 */
 530static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
 531                                const struct xdr_buf *xdr,
 532                                unsigned int offset,
 533                                unsigned long length)
 534{
 535        info->wi_xdr = xdr;
 536        info->wi_next_off = offset - xdr->head[0].iov_len;
 537        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
 538                                     length);
 539}
 540
 541/**
 542 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 543 * @xdr: xdr_buf to write
 544 * @data: pointer to write arguments
 545 *
 546 * Returns:
 547 *   On success, returns zero
 548 *   %-E2BIG if the client-provided Write chunk is too small
 549 *   %-ENOMEM if a resource has been exhausted
 550 *   %-EIO if an rdma-rw error occurred
 551 */
 552static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
 553{
 554        struct svc_rdma_write_info *info = data;
 555        int ret;
 556
 557        if (xdr->head[0].iov_len) {
 558                ret = svc_rdma_iov_write(info, &xdr->head[0]);
 559                if (ret < 0)
 560                        return ret;
 561        }
 562
 563        if (xdr->page_len) {
 564                ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
 565                                           xdr->page_len);
 566                if (ret < 0)
 567                        return ret;
 568        }
 569
 570        if (xdr->tail[0].iov_len) {
 571                ret = svc_rdma_iov_write(info, &xdr->tail[0]);
 572                if (ret < 0)
 573                        return ret;
 574        }
 575
 576        return xdr->len;
 577}
 578
 579/**
 580 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 581 * @rdma: controlling RDMA transport
 582 * @chunk: Write chunk provided by the client
 583 * @xdr: xdr_buf containing the data payload
 584 *
 585 * Returns a non-negative number of bytes the chunk consumed, or
 586 *      %-E2BIG if the payload was larger than the Write chunk,
 587 *      %-EINVAL if client provided too many segments,
 588 *      %-ENOMEM if rdma_rw context pool was exhausted,
 589 *      %-ENOTCONN if posting failed (connection is lost),
 590 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 591 */
 592int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 593                              const struct svc_rdma_chunk *chunk,
 594                              const struct xdr_buf *xdr)
 595{
 596        struct svc_rdma_write_info *info;
 597        struct svc_rdma_chunk_ctxt *cc;
 598        int ret;
 599
 600        info = svc_rdma_write_info_alloc(rdma, chunk);
 601        if (!info)
 602                return -ENOMEM;
 603        cc = &info->wi_cc;
 604
 605        ret = svc_rdma_xb_write(xdr, info);
 606        if (ret != xdr->len)
 607                goto out_err;
 608
 609        trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
 610        ret = svc_rdma_post_chunk_ctxt(cc);
 611        if (ret < 0)
 612                goto out_err;
 613        return xdr->len;
 614
 615out_err:
 616        svc_rdma_write_info_free(info);
 617        return ret;
 618}
 619
 620/**
 621 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 622 * @rdma: controlling RDMA transport
 623 * @rctxt: Write and Reply chunks from client
 624 * @xdr: xdr_buf containing an RPC Reply
 625 *
 626 * Returns a non-negative number of bytes the chunk consumed, or
 627 *      %-E2BIG if the payload was larger than the Reply chunk,
 628 *      %-EINVAL if client provided too many segments,
 629 *      %-ENOMEM if rdma_rw context pool was exhausted,
 630 *      %-ENOTCONN if posting failed (connection is lost),
 631 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 632 */
 633int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 634                              const struct svc_rdma_recv_ctxt *rctxt,
 635                              const struct xdr_buf *xdr)
 636{
 637        struct svc_rdma_write_info *info;
 638        struct svc_rdma_chunk_ctxt *cc;
 639        struct svc_rdma_chunk *chunk;
 640        int ret;
 641
 642        if (pcl_is_empty(&rctxt->rc_reply_pcl))
 643                return 0;
 644
 645        chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
 646        info = svc_rdma_write_info_alloc(rdma, chunk);
 647        if (!info)
 648                return -ENOMEM;
 649        cc = &info->wi_cc;
 650
 651        ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
 652                                      svc_rdma_xb_write, info);
 653        if (ret < 0)
 654                goto out_err;
 655
 656        trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
 657        ret = svc_rdma_post_chunk_ctxt(cc);
 658        if (ret < 0)
 659                goto out_err;
 660
 661        return xdr->len;
 662
 663out_err:
 664        svc_rdma_write_info_free(info);
 665        return ret;
 666}
 667
 668/**
 669 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 670 * @info: context for ongoing I/O
 671 * @segment: co-ordinates of remote memory to be read
 672 *
 673 * Returns:
 674 *   %0: the Read WR chain was constructed successfully
 675 *   %-EINVAL: there were not enough rq_pages to finish
  676 *   %-ENOMEM: allocating local resources failed
 677 *   %-EIO: a DMA mapping error occurred
 678 */
 679static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 680                                       const struct svc_rdma_segment *segment)
 681{
 682        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 683        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
 684        struct svc_rqst *rqstp = info->ri_rqst;
 685        unsigned int sge_no, seg_len, len;
 686        struct svc_rdma_rw_ctxt *ctxt;
 687        struct scatterlist *sg;
 688        int ret;
 689
 690        len = segment->rs_length;
 691        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 692        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 693        if (!ctxt)
 694                return -ENOMEM;
 695        ctxt->rw_nents = sge_no;
 696
 697        sg = ctxt->rw_sg_table.sgl;
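             /* Consume pages from rqstp->rq_pages as the Read sink buffer;
              * head->rc_page_count records how many pages are under I/O.
              */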
 698        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
 699                seg_len = min_t(unsigned int, len,
 700                                PAGE_SIZE - info->ri_pageoff);
 701
 702                if (!info->ri_pageoff)
 703                        head->rc_page_count++;
 704
 705                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
 706                            seg_len, info->ri_pageoff);
 707                sg = sg_next(sg);
 708
 709                info->ri_pageoff += seg_len;
 710                if (info->ri_pageoff == PAGE_SIZE) {
 711                        info->ri_pageno++;
 712                        info->ri_pageoff = 0;
 713                }
 714                len -= seg_len;
 715
 716                /* Safety check */
 717                if (len &&
 718                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
 719                        goto out_overrun;
 720        }
 721
 722        ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
 723                                   segment->rs_handle, DMA_FROM_DEVICE);
 724        if (ret < 0)
 725                return -EIO;
 726        percpu_counter_inc(&svcrdma_stat_read);
 727
 728        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 729        cc->cc_sqecount += ret;
 730        return 0;
 731
 732out_overrun:
 733        trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
 734        return -EINVAL;
 735}
 736
 737/**
 738 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 739 * @info: context for ongoing I/O
 740 * @chunk: Read chunk to pull
 741 *
 742 * Return values:
 743 *   %0: the Read WR chain was constructed successfully
 744 *   %-EINVAL: there were not enough resources to finish
  745 *   %-ENOMEM: allocating local resources failed
 746 *   %-EIO: a DMA mapping error occurred
 747 */
 748static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
 749                                     const struct svc_rdma_chunk *chunk)
 750{
 751        const struct svc_rdma_segment *segment;
 752        int ret;
 753
 754        ret = -EINVAL;
 755        pcl_for_each_segment(segment, chunk) {
 756                ret = svc_rdma_build_read_segment(info, segment);
 757                if (ret < 0)
 758                        break;
 759                info->ri_totalbytes += segment->rs_length;
 760        }
 761        return ret;
 762}
 763
 764/**
 765 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 766 * @info: context for RDMA Reads
 767 * @offset: offset into the Receive buffer of region to copy
 768 * @remaining: length of region to copy
 769 *
 770 * Take a page at a time from rqstp->rq_pages and copy the inline
 771 * content from the Receive buffer into that page. Update
 772 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 773 * result will land contiguously with the copied content.
 774 *
 775 * Return values:
 776 *   %0: Inline content was successfully copied
 777 *   %-EINVAL: offset or length was incorrect
 778 */
 779static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
 780                                      unsigned int offset,
 781                                      unsigned int remaining)
 782{
 783        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 784        unsigned char *dst, *src = head->rc_recv_buf;
 785        struct svc_rqst *rqstp = info->ri_rqst;
 786        unsigned int page_no, numpages;
 787
 788        numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
 789        for (page_no = 0; page_no < numpages; page_no++) {
 790                unsigned int page_len;
 791
 792                page_len = min_t(unsigned int, remaining,
 793                                 PAGE_SIZE - info->ri_pageoff);
 794
 795                if (!info->ri_pageoff)
 796                        head->rc_page_count++;
 797
 798                dst = page_address(rqstp->rq_pages[info->ri_pageno]);
  799                memcpy(dst + info->ri_pageoff, src + offset, page_len);
 800
 801                info->ri_totalbytes += page_len;
 802                info->ri_pageoff += page_len;
 803                if (info->ri_pageoff == PAGE_SIZE) {
 804                        info->ri_pageno++;
 805                        info->ri_pageoff = 0;
 806                }
 807                remaining -= page_len;
 808                offset += page_len;
 809        }
 810
  811        return 0;
 812}
 813
 814/**
 815 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 816 * @info: context for RDMA Reads
 817 *
 818 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 819 * like an incoming TCP call.
 820 *
 821 * Return values:
 822 *   %0: RDMA Read WQEs were successfully built
 823 *   %-EINVAL: client provided too many chunks or segments,
 824 *   %-ENOMEM: rdma_rw context pool was exhausted,
 825 *   %-ENOTCONN: posting failed (connection is lost),
 826 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 827 */
 828static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
 829{
 830        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 831        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
 832        struct xdr_buf *buf = &info->ri_rqst->rq_arg;
 833        struct svc_rdma_chunk *chunk, *next;
 834        unsigned int start, length;
 835        int ret;
 836
 837        start = 0;
 838        chunk = pcl_first_chunk(pcl);
 839        length = chunk->ch_position;
 840        ret = svc_rdma_copy_inline_range(info, start, length);
 841        if (ret < 0)
 842                return ret;
 843
 844        pcl_for_each_chunk(chunk, pcl) {
 845                ret = svc_rdma_build_read_chunk(info, chunk);
 846                if (ret < 0)
 847                        return ret;
 848
 849                next = pcl_next_chunk(pcl, chunk);
 850                if (!next)
 851                        break;
 852
 853                start += length;
 854                length = next->ch_position - info->ri_totalbytes;
 855                ret = svc_rdma_copy_inline_range(info, start, length);
 856                if (ret < 0)
 857                        return ret;
 858        }
 859
 860        start += length;
 861        length = head->rc_byte_len - start;
 862        ret = svc_rdma_copy_inline_range(info, start, length);
 863        if (ret < 0)
 864                return ret;
 865
 866        buf->len += info->ri_totalbytes;
 867        buf->buflen += info->ri_totalbytes;
 868
 869        buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
 870        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
 871        buf->pages = &info->ri_rqst->rq_pages[1];
 872        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
 873        return 0;
 874}
 875
 876/**
 877 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 878 * @info: context for RDMA Reads
 879 *
 880 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 881 *
 882 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 883 * Therefore, XDR round-up of the Read chunk and trailing
 884 * inline content must both be added at the end of the pagelist.
 885 *
 886 * Return values:
 887 *   %0: RDMA Read WQEs were successfully built
 888 *   %-EINVAL: client provided too many chunks or segments,
 889 *   %-ENOMEM: rdma_rw context pool was exhausted,
 890 *   %-ENOTCONN: posting failed (connection is lost),
 891 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 892 */
 893static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
 894{
 895        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 896        struct xdr_buf *buf = &info->ri_rqst->rq_arg;
 897        struct svc_rdma_chunk *chunk;
 898        unsigned int length;
 899        int ret;
 900
 901        chunk = pcl_first_chunk(&head->rc_read_pcl);
 902        ret = svc_rdma_build_read_chunk(info, chunk);
 903        if (ret < 0)
 904                goto out;
 905
 906        /* Split the Receive buffer between the head and tail
 907         * buffers at Read chunk's position. XDR roundup of the
 908         * chunk is not included in either the pagelist or in
 909         * the tail.
 910         */
 911        buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
 912        buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
 913        buf->head[0].iov_len = chunk->ch_position;
 914
 915        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 916         *
 917         * If the client already rounded up the chunk length, the
 918         * length does not change. Otherwise, the length of the page
 919         * list is increased to include XDR round-up.
 920         *
 921         * Currently these chunks always start at page offset 0,
 922         * thus the rounded-up length never crosses a page boundary.
 923         */
 924        buf->pages = &info->ri_rqst->rq_pages[0];
 925        length = xdr_align_size(chunk->ch_length);
 926        buf->page_len = length;
 927        buf->len += length;
 928        buf->buflen += length;
 929
 930out:
 931        return ret;
 932}
 933
 934/**
 935 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 936 * @info: context for RDMA Reads
 937 * @chunk: parsed Call chunk to pull
 938 * @offset: offset of region to pull
 939 * @length: length of region to pull
 940 *
 941 * Return values:
 942 *   %0: RDMA Read WQEs were successfully built
 943 *   %-EINVAL: there were not enough resources to finish
 944 *   %-ENOMEM: rdma_rw context pool was exhausted,
 945 *   %-ENOTCONN: posting failed (connection is lost),
 946 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 947 */
 948static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
 949                                     const struct svc_rdma_chunk *chunk,
 950                                     unsigned int offset, unsigned int length)
 951{
 952        const struct svc_rdma_segment *segment;
 953        int ret;
 954
 955        ret = -EINVAL;
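             /* Skip segments that lie entirely before @offset, then build
              * Reads for the portions of the remaining segments that fall
              * within the requested range.
              */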
 956        pcl_for_each_segment(segment, chunk) {
 957                struct svc_rdma_segment dummy;
 958
 959                if (offset > segment->rs_length) {
 960                        offset -= segment->rs_length;
 961                        continue;
 962                }
 963
 964                dummy.rs_handle = segment->rs_handle;
 965                dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
 966                dummy.rs_offset = segment->rs_offset + offset;
 967
 968                ret = svc_rdma_build_read_segment(info, &dummy);
 969                if (ret < 0)
 970                        break;
 971
 972                info->ri_totalbytes += dummy.rs_length;
 973                length -= dummy.rs_length;
 974                offset = 0;
 975        }
 976        return ret;
 977}
 978
 979/**
 980 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 981 * @info: context for RDMA Reads
 982 *
 983 * Return values:
 984 *   %0: RDMA Read WQEs were successfully built
 985 *   %-EINVAL: there were not enough resources to finish
 986 *   %-ENOMEM: rdma_rw context pool was exhausted,
 987 *   %-ENOTCONN: posting failed (connection is lost),
 988 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 989 */
 990static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
 991{
 992        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 993        const struct svc_rdma_chunk *call_chunk =
 994                        pcl_first_chunk(&head->rc_call_pcl);
 995        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
 996        struct svc_rdma_chunk *chunk, *next;
 997        unsigned int start, length;
 998        int ret;
 999
1000        if (pcl_is_empty(pcl))
1001                return svc_rdma_build_read_chunk(info, call_chunk);
1002
1003        start = 0;
1004        chunk = pcl_first_chunk(pcl);
1005        length = chunk->ch_position;
1006        ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
1007        if (ret < 0)
1008                return ret;
1009
1010        pcl_for_each_chunk(chunk, pcl) {
1011                ret = svc_rdma_build_read_chunk(info, chunk);
1012                if (ret < 0)
1013                        return ret;
1014
1015                next = pcl_next_chunk(pcl, chunk);
1016                if (!next)
1017                        break;
1018
1019                start += length;
1020                length = next->ch_position - info->ri_totalbytes;
1021                ret = svc_rdma_read_chunk_range(info, call_chunk,
1022                                                start, length);
1023                if (ret < 0)
1024                        return ret;
1025        }
1026
1027        start += length;
1028        length = call_chunk->ch_length - start;
1029        return svc_rdma_read_chunk_range(info, call_chunk, start, length);
1030}
1031
1032/**
1033 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1034 * @info: context for RDMA Reads
1035 *
1036 * The start of the data lands in the first page just after the
1037 * Transport header, and the rest lands in rqstp->rq_arg.pages.
1038 *
1039 * Assumptions:
 1040 *      - A PZRC (Position Zero Read Chunk) is never sent in an
 1041 *        RDMA_MSG message, though it's allowed by spec.
1042 *
1043 * Return values:
1044 *   %0: RDMA Read WQEs were successfully built
1045 *   %-EINVAL: client provided too many chunks or segments,
1046 *   %-ENOMEM: rdma_rw context pool was exhausted,
1047 *   %-ENOTCONN: posting failed (connection is lost),
1048 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1049 */
1050static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
1051{
1052        struct xdr_buf *buf = &info->ri_rqst->rq_arg;
1053        int ret;
1054
1055        ret = svc_rdma_read_call_chunk(info);
1056        if (ret < 0)
1057                goto out;
1058
1059        buf->len += info->ri_totalbytes;
1060        buf->buflen += info->ri_totalbytes;
1061
1062        buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
1063        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
1064        buf->pages = &info->ri_rqst->rq_pages[1];
1065        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
1066
1067out:
1068        return ret;
1069}
1070
1071/**
1072 * svc_rdma_process_read_list - Pull list of Read chunks from the client
1073 * @rdma: controlling RDMA transport
1074 * @rqstp: set of pages to use as Read sink buffers
1075 * @head: pages under I/O collect here
1076 *
1077 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1078 * pull each Read chunk as they decode an incoming RPC message.
1079 *
1080 * On Linux, however, the server needs to have a fully-constructed RPC
1081 * message in rqstp->rq_arg when there is a positive return code from
1082 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1083 * it is received, then here the whole Read list is pulled all at once.
1084 * The ingress RPC message is fully reconstructed once all associated
1085 * RDMA Reads have completed.
1086 *
1087 * Return values:
1088 *   %1: all needed RDMA Reads were posted successfully,
1089 *   %-EINVAL: client provided too many chunks or segments,
1090 *   %-ENOMEM: rdma_rw context pool was exhausted,
1091 *   %-ENOTCONN: posting failed (connection is lost),
1092 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1093 */
1094int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1095                               struct svc_rqst *rqstp,
1096                               struct svc_rdma_recv_ctxt *head)
1097{
1098        struct svc_rdma_read_info *info;
1099        struct svc_rdma_chunk_ctxt *cc;
1100        int ret;
1101
1102        info = svc_rdma_read_info_alloc(rdma);
1103        if (!info)
1104                return -ENOMEM;
1105        cc = &info->ri_cc;
1106        info->ri_rqst = rqstp;
1107        info->ri_readctxt = head;
1108        info->ri_pageno = 0;
1109        info->ri_pageoff = 0;
1110        info->ri_totalbytes = 0;
1111
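             /* An empty Call chunk list means the Read list carries only
              * data item Read chunks; otherwise the client sent a Long
              * Message that must be pulled via the Call chunk.
              */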
1112        if (pcl_is_empty(&head->rc_call_pcl)) {
1113                if (head->rc_read_pcl.cl_count == 1)
1114                        ret = svc_rdma_read_data_item(info);
1115                else
1116                        ret = svc_rdma_read_multiple_chunks(info);
1117        } else
1118                ret = svc_rdma_read_special(info);
1119        if (ret < 0)
1120                goto out_err;
1121
1122        trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1123        init_completion(&cc->cc_done);
1124        ret = svc_rdma_post_chunk_ctxt(cc);
1125        if (ret < 0)
1126                goto out_err;
1127
1128        ret = 1;
1129        wait_for_completion(&cc->cc_done);
1130        if (cc->cc_status != IB_WC_SUCCESS)
1131                ret = -EIO;
1132
1133        /* rq_respages starts after the last arg page */
1134        rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
1135        rqstp->rq_next_page = rqstp->rq_respages + 1;
1136
1137        /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
1138        head->rc_page_count = 0;
1139
1140out_err:
1141        svc_rdma_read_info_free(info);
1142        return ret;
1143}
1144