linux/net/sunrpc/xprtrdma/rpc_rdma.c
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2020, Oracle and/or its affiliates.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * rpc_rdma.c
  44 *
  45 * This file contains the guts of the RPC RDMA protocol, and
  46 * does marshaling/unmarshaling, etc. It is also where interfacing
  47 * to the Linux RPC framework lives.
  48 */
  49
  50#include <linux/highmem.h>
  51
  52#include <linux/sunrpc/svc_rdma.h>
  53
  54#include "xprt_rdma.h"
  55#include <trace/events/rpcrdma.h>
  56
  57#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  58# define RPCDBG_FACILITY        RPCDBG_TRANS
  59#endif
  60
  61/* Returns size of largest RPC-over-RDMA header in a Call message
  62 *
  63 * The largest Call header contains a full-size Read list and a
  64 * minimal Reply chunk.
  65 */
  66static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
  67{
  68        unsigned int size;
  69
  70        /* Fixed header fields and list discriminators */
  71        size = RPCRDMA_HDRLEN_MIN;
  72
  73        /* Maximum Read list size */
  74        size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
  75
   76        /* Minimal Reply chunk size */
  77        size += sizeof(__be32); /* segment count */
  78        size += rpcrdma_segment_maxsz * sizeof(__be32);
  79        size += sizeof(__be32); /* list discriminator */
  80
  81        return size;
  82}
  83
  84/* Returns size of largest RPC-over-RDMA header in a Reply message
  85 *
  86 * There is only one Write list or one Reply chunk per Reply
  87 * message.  The larger list is the Write list.
  88 */
  89static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
  90{
  91        unsigned int size;
  92
  93        /* Fixed header fields and list discriminators */
  94        size = RPCRDMA_HDRLEN_MIN;
  95
  96        /* Maximum Write list size */
  97        size += sizeof(__be32);         /* segment count */
  98        size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
  99        size += sizeof(__be32); /* list discriminator */
 100
 101        return size;
 102}
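
/* Worked example of the two size bounds above, for illustration only.
 * The word counts are assumptions based on the usual RPC-over-RDMA XDR
 * definitions (RPCRDMA_HDRLEN_MIN of 7 words, 6 words per Read list
 * entry, 4 words per plain segment), not values taken from this file:
 *
 *	unsigned int word = 4;			// sizeof(__be32)
 *	unsigned int maxsegs = 8;		// sample segment count
 *
 *	// Call header: fixed fields + full Read list + minimal Reply chunk
 *	unsigned int call = 7 * word
 *			  + maxsegs * 6 * word
 *			  + (1 + 4 + 1) * word;	// 28 + 192 + 24 = 244 bytes
 *
 *	// Reply header: fixed fields + one full-size Write list
 *	unsigned int reply = 7 * word
 *			   + (1 + maxsegs * 4 + 1) * word;	// 28 + 136 = 164 bytes
 *
 * Both results are subtracted from the inline thresholds by
 * rpcrdma_set_max_header_sizes() below to obtain re_max_inline_send
 * and re_max_inline_recv.
 */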
 103
 104/**
 105 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 106 * @ep: endpoint to initialize
 107 *
 108 * The max_inline fields contain the maximum size of an RPC message
 109 * so the marshaling code doesn't have to repeat this calculation
 110 * for every RPC.
 111 */
 112void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
 113{
 114        unsigned int maxsegs = ep->re_max_rdma_segs;
 115
 116        ep->re_max_inline_send =
 117                ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
 118        ep->re_max_inline_recv =
 119                ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 120}
 121
 122/* The client can send a request inline as long as the RPCRDMA header
 123 * plus the RPC call fit under the transport's inline limit. If the
 124 * combined call message size exceeds that limit, the client must use
 125 * a Read chunk for this operation.
 126 *
 127 * A Read chunk is also required if sending the RPC call inline would
 128 * exceed this device's max_sge limit.
 129 */
 130static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 131                                struct rpc_rqst *rqst)
 132{
 133        struct xdr_buf *xdr = &rqst->rq_snd_buf;
 134        struct rpcrdma_ep *ep = r_xprt->rx_ep;
 135        unsigned int count, remaining, offset;
 136
 137        if (xdr->len > ep->re_max_inline_send)
 138                return false;
 139
 140        if (xdr->page_len) {
 141                remaining = xdr->page_len;
 142                offset = offset_in_page(xdr->page_base);
 143                count = RPCRDMA_MIN_SEND_SGES;
 144                while (remaining) {
 145                        remaining -= min_t(unsigned int,
 146                                           PAGE_SIZE - offset, remaining);
 147                        offset = 0;
 148                        if (++count > ep->re_attr.cap.max_send_sge)
 149                                return false;
 150                }
 151        }
 152
 153        return true;
 154}
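
/* A minimal stand-alone sketch of the SGE-counting loop above, for
 * illustration (count_page_sges() is a hypothetical helper, not part
 * of this file). It shows why a payload that starts part-way into a
 * page can need one more Send SGE than its length alone suggests:
 *
 *	static unsigned int count_page_sges(unsigned int page_len,
 *					    unsigned int page_base)
 *	{
 *		unsigned int count = 0;
 *
 *		while (page_len) {
 *			unsigned int len = min_t(unsigned int,
 *						 PAGE_SIZE - page_base,
 *						 page_len);
 *			page_len -= len;
 *			page_base = 0;
 *			count++;
 *		}
 *		return count;
 *	}
 *
 * For example, 8192 bytes starting at offset 256 span three pages
 * (3840 + 4096 + 256), so three SGEs are needed in addition to the
 * RPCRDMA_MIN_SEND_SGES reserved for the header and head iovec.
 */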
 155
 156/* The client can't know how large the actual reply will be. Thus it
 157 * plans for the largest possible reply for that particular ULP
 158 * operation. If the maximum combined reply message size exceeds that
 159 * limit, the client must provide a write list or a reply chunk for
 160 * this request.
 161 */
 162static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 163                                   struct rpc_rqst *rqst)
 164{
 165        return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
 166}
 167
 168/* The client is required to provide a Reply chunk if the maximum
 169 * size of the non-payload part of the RPC Reply is larger than
 170 * the inline threshold.
 171 */
 172static bool
 173rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
 174                          const struct rpc_rqst *rqst)
 175{
 176        const struct xdr_buf *buf = &rqst->rq_rcv_buf;
 177
 178        return (buf->head[0].iov_len + buf->tail[0].iov_len) <
 179                r_xprt->rx_ep->re_max_inline_recv;
 180}
 181
 182/* ACL likes to be lazy in allocating pages. For TCP, these
 183 * pages can be allocated during receive processing. Not true
 184 * for RDMA, which must always provision receive buffers
 185 * up front.
 186 */
 187static noinline int
 188rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
 189{
 190        struct page **ppages;
 191        int len;
 192
 193        len = buf->page_len;
 194        ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
 195        while (len > 0) {
 196                if (!*ppages)
 197                        *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
 198                if (!*ppages)
 199                        return -ENOBUFS;
 200                ppages++;
 201                len -= PAGE_SIZE;
 202        }
 203
 204        return 0;
 205}
 206
 207/* Convert @vec to a single SGL element.
 208 *
 209 * Returns pointer to next available SGE, and bumps the total number
 210 * of SGEs consumed.
 211 */
 212static struct rpcrdma_mr_seg *
 213rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 214                     unsigned int *n)
 215{
 216        seg->mr_page = virt_to_page(vec->iov_base);
 217        seg->mr_offset = offset_in_page(vec->iov_base);
 218        seg->mr_len = vec->iov_len;
 219        ++seg;
 220        ++(*n);
 221        return seg;
 222}
 223
 224/* Convert @xdrbuf into SGEs no larger than a page each. As they
 225 * are registered, these SGEs are then coalesced into RDMA segments
 226 * when the selected memreg mode supports it.
 227 *
 228 * Returns positive number of SGEs consumed, or a negative errno.
 229 */
 230
 231static int
 232rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 233                     unsigned int pos, enum rpcrdma_chunktype type,
 234                     struct rpcrdma_mr_seg *seg)
 235{
 236        unsigned long page_base;
 237        unsigned int len, n;
 238        struct page **ppages;
 239
 240        n = 0;
 241        if (pos == 0)
 242                seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 243
 244        len = xdrbuf->page_len;
 245        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 246        page_base = offset_in_page(xdrbuf->page_base);
 247        while (len) {
 248                seg->mr_page = *ppages;
 249                seg->mr_offset = page_base;
 250                seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
 251                len -= seg->mr_len;
 252                ++ppages;
 253                ++seg;
 254                ++n;
 255                page_base = 0;
 256        }
 257
 258        if (type == rpcrdma_readch)
 259                goto out;
 260
 261        /* When encoding a Write chunk, some servers need to see an
 262         * extra segment for non-XDR-aligned Write chunks. The upper
 263         * layer provides space in the tail iovec that may be used
 264         * for this purpose.
 265         */
 266        if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
 267                goto out;
 268
 269        if (xdrbuf->tail[0].iov_len)
 270                rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 271
 272out:
 273        if (unlikely(n > RPCRDMA_MAX_SEGS))
 274                return -EIO;
 275        return n;
 276}
 277
 278static int
 279encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
 280{
 281        __be32 *p;
 282
 283        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 284        if (unlikely(!p))
 285                return -EMSGSIZE;
 286
 287        xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
 288        return 0;
 289}
 290
 291static int
 292encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
 293                    u32 position)
 294{
 295        __be32 *p;
 296
 297        p = xdr_reserve_space(xdr, 6 * sizeof(*p));
 298        if (unlikely(!p))
 299                return -EMSGSIZE;
 300
 301        *p++ = xdr_one;                 /* Item present */
 302        xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
 303                                mr->mr_offset);
 304        return 0;
 305}
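
/* Illustrative wire image of one Read list entry as produced above
 * (six XDR words; all values are made up):
 *
 *	__be32 entry[6] = {
 *		cpu_to_be32(1),		// item present
 *		cpu_to_be32(140),	// position: byte offset into the Call
 *		cpu_to_be32(0x1234),	// mr_handle (STag / R_key)
 *		cpu_to_be32(8192),	// mr_length in bytes
 *		cpu_to_be32(0),		// mr_offset, upper 32 bits
 *		cpu_to_be32(0x9000),	// mr_offset, lower 32 bits
 *	};
 *
 * A Read list is a chain of such entries terminated by a single
 * "not present" word, as shown in the encoding key before
 * rpcrdma_encode_read_list() below.
 */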
 306
 307static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
 308                                                 struct rpcrdma_req *req,
 309                                                 struct rpcrdma_mr_seg *seg,
 310                                                 int nsegs, bool writing,
 311                                                 struct rpcrdma_mr **mr)
 312{
 313        *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
 314        if (!*mr) {
 315                *mr = rpcrdma_mr_get(r_xprt);
 316                if (!*mr)
 317                        goto out_getmr_err;
 318                (*mr)->mr_req = req;
 319        }
 320
 321        rpcrdma_mr_push(*mr, &req->rl_registered);
 322        return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
 323
 324out_getmr_err:
 325        trace_xprtrdma_nomrs_err(r_xprt, req);
 326        xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 327        rpcrdma_mrs_refresh(r_xprt);
 328        return ERR_PTR(-EAGAIN);
 329}
 330
 331/* Register and XDR encode the Read list. Supports encoding a list of read
 332 * segments that belong to a single read chunk.
 333 *
 334 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 335 *
 336 *  Read chunklist (a linked list):
 337 *   N elements, position P (same P for all chunks of same arg!):
 338 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 339 *
 340 * Returns zero on success, or a negative errno if a failure occurred.
 341 * @xdr is advanced to the next position in the stream.
 342 *
 343 * Only a single @pos value is currently supported.
 344 */
 345static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 346                                    struct rpcrdma_req *req,
 347                                    struct rpc_rqst *rqst,
 348                                    enum rpcrdma_chunktype rtype)
 349{
 350        struct xdr_stream *xdr = &req->rl_stream;
 351        struct rpcrdma_mr_seg *seg;
 352        struct rpcrdma_mr *mr;
 353        unsigned int pos;
 354        int nsegs;
 355
 356        if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
 357                goto done;
 358
 359        pos = rqst->rq_snd_buf.head[0].iov_len;
 360        if (rtype == rpcrdma_areadch)
 361                pos = 0;
 362        seg = req->rl_segments;
 363        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
 364                                     rtype, seg);
 365        if (nsegs < 0)
 366                return nsegs;
 367
 368        do {
 369                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
 370                if (IS_ERR(seg))
 371                        return PTR_ERR(seg);
 372
 373                if (encode_read_segment(xdr, mr, pos) < 0)
 374                        return -EMSGSIZE;
 375
 376                trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 377                r_xprt->rx_stats.read_chunk_count++;
 378                nsegs -= mr->mr_nents;
 379        } while (nsegs);
 380
 381done:
 382        if (xdr_stream_encode_item_absent(xdr) < 0)
 383                return -EMSGSIZE;
 384        return 0;
 385}
 386
 387/* Register and XDR encode the Write list. Supports encoding a list
 388 * containing one array of plain segments that belong to a single
 389 * write chunk.
 390 *
 391 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 392 *
 393 *  Write chunklist (a list of (one) counted array):
 394 *   N elements:
 395 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 396 *
 397 * Returns zero on success, or a negative errno if a failure occurred.
 398 * @xdr is advanced to the next position in the stream.
 399 *
 400 * Only a single Write chunk is currently supported.
 401 */
 402static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
 403                                     struct rpcrdma_req *req,
 404                                     struct rpc_rqst *rqst,
 405                                     enum rpcrdma_chunktype wtype)
 406{
 407        struct xdr_stream *xdr = &req->rl_stream;
 408        struct rpcrdma_mr_seg *seg;
 409        struct rpcrdma_mr *mr;
 410        int nsegs, nchunks;
 411        __be32 *segcount;
 412
 413        if (wtype != rpcrdma_writech)
 414                goto done;
 415
 416        seg = req->rl_segments;
 417        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 418                                     rqst->rq_rcv_buf.head[0].iov_len,
 419                                     wtype, seg);
 420        if (nsegs < 0)
 421                return nsegs;
 422
 423        if (xdr_stream_encode_item_present(xdr) < 0)
 424                return -EMSGSIZE;
 425        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 426        if (unlikely(!segcount))
 427                return -EMSGSIZE;
 428        /* Actual value encoded below */
 429
 430        nchunks = 0;
 431        do {
 432                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 433                if (IS_ERR(seg))
 434                        return PTR_ERR(seg);
 435
 436                if (encode_rdma_segment(xdr, mr) < 0)
 437                        return -EMSGSIZE;
 438
 439                trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 440                r_xprt->rx_stats.write_chunk_count++;
 441                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 442                nchunks++;
 443                nsegs -= mr->mr_nents;
 444        } while (nsegs);
 445
 446        /* Update count of segments in this Write chunk */
 447        *segcount = cpu_to_be32(nchunks);
 448
 449done:
 450        if (xdr_stream_encode_item_absent(xdr) < 0)
 451                return -EMSGSIZE;
 452        return 0;
 453}
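
/* Illustrative wire image of a Write list carrying one chunk of two
 * segments, matching the encoding key above (values are made up):
 *
 *	__be32 wire[] = {
 *		cpu_to_be32(1),		// Write list: item present
 *		cpu_to_be32(2),		// segment count, backfilled via *segcount
 *		cpu_to_be32(0x11), cpu_to_be32(4096),	// H1, L1
 *		cpu_to_be32(0), cpu_to_be32(0x1000),	// O1 (64 bits)
 *		cpu_to_be32(0x22), cpu_to_be32(8192),	// H2, L2
 *		cpu_to_be32(0), cpu_to_be32(0x3000),	// O2 (64 bits)
 *		cpu_to_be32(0),		// no further Write chunks
 *	};
 *
 * The count slot is reserved with xdr_reserve_space() before the MRs
 * are registered because the number of segments is known only after
 * frwr_map() has carved up the payload.
 */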
 454
 455/* Register and XDR encode the Reply chunk. Supports encoding an array
 456 * of plain segments that belong to a single write (reply) chunk.
 457 *
 458 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 459 *
 460 *  Reply chunk (a counted array):
 461 *   N elements:
 462 *    1 - N - HLOO - HLOO - ... - HLOO
 463 *
 464 * Returns zero on success, or a negative errno if a failure occurred.
 465 * @xdr is advanced to the next position in the stream.
 466 */
 467static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 468                                      struct rpcrdma_req *req,
 469                                      struct rpc_rqst *rqst,
 470                                      enum rpcrdma_chunktype wtype)
 471{
 472        struct xdr_stream *xdr = &req->rl_stream;
 473        struct rpcrdma_mr_seg *seg;
 474        struct rpcrdma_mr *mr;
 475        int nsegs, nchunks;
 476        __be32 *segcount;
 477
 478        if (wtype != rpcrdma_replych) {
 479                if (xdr_stream_encode_item_absent(xdr) < 0)
 480                        return -EMSGSIZE;
 481                return 0;
 482        }
 483
 484        seg = req->rl_segments;
 485        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 486        if (nsegs < 0)
 487                return nsegs;
 488
 489        if (xdr_stream_encode_item_present(xdr) < 0)
 490                return -EMSGSIZE;
 491        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 492        if (unlikely(!segcount))
 493                return -EMSGSIZE;
 494        /* Actual value encoded below */
 495
 496        nchunks = 0;
 497        do {
 498                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 499                if (IS_ERR(seg))
 500                        return PTR_ERR(seg);
 501
 502                if (encode_rdma_segment(xdr, mr) < 0)
 503                        return -EMSGSIZE;
 504
 505                trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 506                r_xprt->rx_stats.reply_chunk_count++;
 507                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 508                nchunks++;
 509                nsegs -= mr->mr_nents;
 510        } while (nsegs);
 511
 512        /* Update count of segments in the Reply chunk */
 513        *segcount = cpu_to_be32(nchunks);
 514
 515        return 0;
 516}
 517
 518static void rpcrdma_sendctx_done(struct kref *kref)
 519{
 520        struct rpcrdma_req *req =
 521                container_of(kref, struct rpcrdma_req, rl_kref);
 522        struct rpcrdma_rep *rep = req->rl_reply;
 523
 524        rpcrdma_complete_rqst(rep);
 525        rep->rr_rxprt->rx_stats.reply_waits_for_send++;
 526}
 527
 528/**
 529 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 530 * @sc: sendctx containing SGEs to unmap
 531 *
 532 */
 533void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 534{
 535        struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
 536        struct ib_sge *sge;
 537
 538        if (!sc->sc_unmap_count)
 539                return;
 540
 541        /* The first two SGEs contain the transport header and
 542         * the inline buffer. These are always left mapped so
 543         * they can be cheaply re-used.
 544         */
 545        for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
 546             ++sge, --sc->sc_unmap_count)
 547                ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
 548                                  DMA_TO_DEVICE);
 549
 550        kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 551}
 552
 553/* Prepare an SGE for the RPC-over-RDMA transport header.
 554 */
 555static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
 556                                    struct rpcrdma_req *req, u32 len)
 557{
 558        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 559        struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
 560        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 561
 562        sge->addr = rdmab_addr(rb);
 563        sge->length = len;
 564        sge->lkey = rdmab_lkey(rb);
 565
 566        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
 567                                      DMA_TO_DEVICE);
 568}
 569
 570/* The head iovec is straightforward, as it is usually already
 571 * DMA-mapped. Sync the content that has changed.
 572 */
 573static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
 574                                     struct rpcrdma_req *req, unsigned int len)
 575{
 576        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 577        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 578        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 579
 580        if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 581                return false;
 582
 583        sge->addr = rdmab_addr(rb);
 584        sge->length = len;
 585        sge->lkey = rdmab_lkey(rb);
 586
 587        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
 588                                      DMA_TO_DEVICE);
 589        return true;
 590}
 591
 592/* If there is a page list present, DMA map and prepare an
 593 * SGE for each page to be sent.
 594 */
 595static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
 596                                     struct xdr_buf *xdr)
 597{
 598        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 599        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 600        unsigned int page_base, len, remaining;
 601        struct page **ppages;
 602        struct ib_sge *sge;
 603
 604        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
 605        page_base = offset_in_page(xdr->page_base);
 606        remaining = xdr->page_len;
 607        while (remaining) {
 608                sge = &sc->sc_sges[req->rl_wr.num_sge++];
 609                len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
 610                sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
 611                                            page_base, len, DMA_TO_DEVICE);
 612                if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
 613                        goto out_mapping_err;
 614
 615                sge->length = len;
 616                sge->lkey = rdmab_lkey(rb);
 617
 618                sc->sc_unmap_count++;
 619                ppages++;
 620                remaining -= len;
 621                page_base = 0;
 622        }
 623
 624        return true;
 625
 626out_mapping_err:
 627        trace_xprtrdma_dma_maperr(sge->addr);
 628        return false;
 629}
 630
 631/* The tail iovec may include an XDR pad for the page list,
 632 * as well as additional content, and may not reside in the
 633 * same page as the head iovec.
 634 */
 635static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
 636                                     struct xdr_buf *xdr,
 637                                     unsigned int page_base, unsigned int len)
 638{
 639        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 640        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 641        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 642        struct page *page = virt_to_page(xdr->tail[0].iov_base);
 643
 644        sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
 645                                    DMA_TO_DEVICE);
 646        if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
 647                goto out_mapping_err;
 648
 649        sge->length = len;
 650        sge->lkey = rdmab_lkey(rb);
 651        ++sc->sc_unmap_count;
 652        return true;
 653
 654out_mapping_err:
 655        trace_xprtrdma_dma_maperr(sge->addr);
 656        return false;
 657}
 658
 659/* Copy the tail to the end of the head buffer.
 660 */
 661static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
 662                                    struct rpcrdma_req *req,
 663                                    struct xdr_buf *xdr)
 664{
 665        unsigned char *dst;
 666
 667        dst = (unsigned char *)xdr->head[0].iov_base;
 668        dst += xdr->head[0].iov_len + xdr->page_len;
 669        memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
 670        r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
 671}
 672
 673/* Copy pagelist content into the head buffer.
 674 */
 675static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
 676                                    struct rpcrdma_req *req,
 677                                    struct xdr_buf *xdr)
 678{
 679        unsigned int len, page_base, remaining;
 680        struct page **ppages;
 681        unsigned char *src, *dst;
 682
 683        dst = (unsigned char *)xdr->head[0].iov_base;
 684        dst += xdr->head[0].iov_len;
 685        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
 686        page_base = offset_in_page(xdr->page_base);
 687        remaining = xdr->page_len;
 688        while (remaining) {
 689                src = page_address(*ppages);
 690                src += page_base;
 691                len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
 692                memcpy(dst, src, len);
 693                r_xprt->rx_stats.pullup_copy_count += len;
 694
 695                ppages++;
 696                dst += len;
 697                remaining -= len;
 698                page_base = 0;
 699        }
 700}
 701
 702/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 703 * When the head, pagelist, and tail are small, a pull-up copy
 704 * is considerably less costly than DMA mapping the components
 705 * of @xdr.
 706 *
 707 * Assumptions:
 708 *  - the caller has already verified that the total length
 709 *    of the RPC Call body will fit into @rl_sendbuf.
 710 */
 711static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
 712                                        struct rpcrdma_req *req,
 713                                        struct xdr_buf *xdr)
 714{
 715        if (unlikely(xdr->tail[0].iov_len))
 716                rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
 717
 718        if (unlikely(xdr->page_len))
 719                rpcrdma_pullup_pagelist(r_xprt, req, xdr);
 720
 721        /* The whole RPC message resides in the head iovec now */
 722        return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
 723}
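
/* Minimal sketch of the pull-up idea in plain C, for illustration
 * only (pullup_copy() is hypothetical; the real code above works on
 * an xdr_buf and walks the page list page by page):
 *
 *	static void pullup_copy(char *head, size_t head_len,
 *				const char *pages, size_t page_len,
 *				const char *tail, size_t tail_len)
 *	{
 *		// Tail first, with memmove, since its source may sit
 *		// inside the region the page-list copy will overwrite.
 *		memmove(head + head_len + page_len, tail, tail_len);
 *		memcpy(head + head_len, pages, page_len);
 *	}
 *
 * After the copy the entire Call body is contiguous in the head
 * buffer, so rpcrdma_prepare_head_iov() can cover it with a single
 * Send SGE.
 */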
 724
 725static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
 726                                        struct rpcrdma_req *req,
 727                                        struct xdr_buf *xdr)
 728{
 729        struct kvec *tail = &xdr->tail[0];
 730
 731        if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
 732                return false;
 733        if (xdr->page_len)
 734                if (!rpcrdma_prepare_pagelist(req, xdr))
 735                        return false;
 736        if (tail->iov_len)
 737                if (!rpcrdma_prepare_tail_iov(req, xdr,
 738                                              offset_in_page(tail->iov_base),
 739                                              tail->iov_len))
 740                        return false;
 741
 742        if (req->rl_sendctx->sc_unmap_count)
 743                kref_get(&req->rl_kref);
 744        return true;
 745}
 746
 747static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
 748                                   struct rpcrdma_req *req,
 749                                   struct xdr_buf *xdr)
 750{
 751        if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
 752                return false;
 753
 754        /* If there is a Read chunk, the page list is being handled
 755         * via explicit RDMA, and thus is skipped here.
 756         */
 757
 758        /* Do not include the tail if it is only an XDR pad */
 759        if (xdr->tail[0].iov_len > 3) {
 760                unsigned int page_base, len;
 761
 762                /* If the content in the page list is an odd length,
 763                 * xdr_write_pages() adds a pad at the beginning of
 764                 * the tail iovec. Force the tail's non-pad content to
 765                 * land at the next XDR position in the Send message.
 766                 */
 767                page_base = offset_in_page(xdr->tail[0].iov_base);
 768                len = xdr->tail[0].iov_len;
 769                page_base += len & 3;
 770                len -= len & 3;
 771                if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
 772                        return false;
 773                kref_get(&req->rl_kref);
 774        }
 775
 776        return true;
 777}
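
/* Worked example of the pad-skipping arithmetic above, with
 * hypothetical sizes: the page list carried 4093 bytes, so the tail
 * begins with a 3-byte XDR pad followed by 8 bytes of trailing
 * inline content (iov_len == 11):
 *
 *	len = 11;
 *	page_base += len & 3;	// advance past the 3 pad bytes
 *	len -= len & 3;		// map only the 8 content bytes
 *
 * Because the trailing content is itself XDR-encoded, iov_len modulo
 * 4 is exactly the size of the pad, and the remaining bytes land at
 * the next XDR position in the Send message.
 */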
 778
 779/**
 780 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 781 * @r_xprt: controlling transport
 782 * @req: context of RPC Call being marshalled
 783 * @hdrlen: size of transport header, in bytes
 784 * @xdr: xdr_buf containing RPC Call
 785 * @rtype: chunk type being encoded
 786 *
 787 * Returns 0 on success; otherwise a negative errno is returned.
 788 */
 789inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 790                                     struct rpcrdma_req *req, u32 hdrlen,
 791                                     struct xdr_buf *xdr,
 792                                     enum rpcrdma_chunktype rtype)
 793{
 794        int ret;
 795
 796        ret = -EAGAIN;
 797        req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 798        if (!req->rl_sendctx)
 799                goto out_nosc;
 800        req->rl_sendctx->sc_unmap_count = 0;
 801        req->rl_sendctx->sc_req = req;
 802        kref_init(&req->rl_kref);
 803        req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
 804        req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
 805        req->rl_wr.num_sge = 0;
 806        req->rl_wr.opcode = IB_WR_SEND;
 807
 808        rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
 809
 810        ret = -EIO;
 811        switch (rtype) {
 812        case rpcrdma_noch_pullup:
 813                if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
 814                        goto out_unmap;
 815                break;
 816        case rpcrdma_noch_mapped:
 817                if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
 818                        goto out_unmap;
 819                break;
 820        case rpcrdma_readch:
 821                if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
 822                        goto out_unmap;
 823                break;
 824        case rpcrdma_areadch:
 825                break;
 826        default:
 827                goto out_unmap;
 828        }
 829
 830        return 0;
 831
 832out_unmap:
 833        rpcrdma_sendctx_unmap(req->rl_sendctx);
 834out_nosc:
 835        trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
 836        return ret;
 837}
 838
 839/**
 840 * rpcrdma_marshal_req - Marshal and send one RPC request
 841 * @r_xprt: controlling transport
 842 * @rqst: RPC request to be marshaled
 843 *
 844 * For the RPC in "rqst", this function:
  845 *  - Chooses the transfer mode (e.g., RDMA_MSG or RDMA_NOMSG)
 846 *  - Registers Read, Write, and Reply chunks
 847 *  - Constructs the transport header
 848 *  - Posts a Send WR to send the transport header and request
 849 *
 850 * Returns:
 851 *      %0 if the RPC was sent successfully,
 852 *      %-ENOTCONN if the connection was lost,
 853 *      %-EAGAIN if the caller should call again with the same arguments,
 854 *      %-ENOBUFS if the caller should call again after a delay,
  855 *      %-EMSGSIZE if the transport header does not fit in the header buffer,
 856 *      %-EIO if a permanent problem occurred while marshaling.
 857 */
 858int
 859rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 860{
 861        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 862        struct xdr_stream *xdr = &req->rl_stream;
 863        enum rpcrdma_chunktype rtype, wtype;
 864        struct xdr_buf *buf = &rqst->rq_snd_buf;
 865        bool ddp_allowed;
 866        __be32 *p;
 867        int ret;
 868
 869        if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
 870                ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
 871                if (ret)
 872                        return ret;
 873        }
 874
 875        rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 876        xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
 877                        rqst);
 878
 879        /* Fixed header fields */
 880        ret = -EMSGSIZE;
 881        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 882        if (!p)
 883                goto out_err;
 884        *p++ = rqst->rq_xid;
 885        *p++ = rpcrdma_version;
 886        *p++ = r_xprt->rx_buf.rb_max_requests;
 887
 888        /* When the ULP employs a GSS flavor that guarantees integrity
 889         * or privacy, direct data placement of individual data items
 890         * is not allowed.
 891         */
 892        ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
 893                                &rqst->rq_cred->cr_auth->au_flags);
 894
 895        /*
 896         * Chunks needed for results?
 897         *
 898         * o If the expected result is under the inline threshold, all ops
 899         *   return as inline.
 900         * o Large read ops return data as write chunk(s), header as
 901         *   inline.
 902         * o Large non-read ops return as a single reply chunk.
 903         */
 904        if (rpcrdma_results_inline(r_xprt, rqst))
 905                wtype = rpcrdma_noch;
 906        else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
 907                 rpcrdma_nonpayload_inline(r_xprt, rqst))
 908                wtype = rpcrdma_writech;
 909        else
 910                wtype = rpcrdma_replych;
 911
 912        /*
 913         * Chunks needed for arguments?
 914         *
 915         * o If the total request is under the inline threshold, all ops
 916         *   are sent as inline.
 917         * o Large write ops transmit data as read chunk(s), header as
 918         *   inline.
 919         * o Large non-write ops are sent with the entire message as a
 920         *   single read chunk (protocol 0-position special case).
 921         *
 922         * This assumes that the upper layer does not present a request
 923         * that both has a data payload, and whose non-data arguments
 924         * by themselves are larger than the inline threshold.
 925         */
 926        if (rpcrdma_args_inline(r_xprt, rqst)) {
 927                *p++ = rdma_msg;
 928                rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
 929                        rpcrdma_noch_pullup : rpcrdma_noch_mapped;
 930        } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
 931                *p++ = rdma_msg;
 932                rtype = rpcrdma_readch;
 933        } else {
 934                r_xprt->rx_stats.nomsg_call_count++;
 935                *p++ = rdma_nomsg;
 936                rtype = rpcrdma_areadch;
 937        }
 938
 939        /* This implementation supports the following combinations
 940         * of chunk lists in one RPC-over-RDMA Call message:
 941         *
 942         *   - Read list
 943         *   - Write list
 944         *   - Reply chunk
 945         *   - Read list + Reply chunk
 946         *
 947         * It might not yet support the following combinations:
 948         *
 949         *   - Read list + Write list
 950         *
 951         * It does not support the following combinations:
 952         *
 953         *   - Write list + Reply chunk
 954         *   - Read list + Write list + Reply chunk
 955         *
 956         * This implementation supports only a single chunk in each
 957         * Read or Write list. Thus for example the client cannot
 958         * send a Call message with a Position Zero Read chunk and a
 959         * regular Read chunk at the same time.
 960         */
 961        ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
 962        if (ret)
 963                goto out_err;
 964        ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
 965        if (ret)
 966                goto out_err;
 967        ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 968        if (ret)
 969                goto out_err;
 970
 971        ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
 972                                        buf, rtype);
 973        if (ret)
 974                goto out_err;
 975
 976        trace_xprtrdma_marshal(req, rtype, wtype);
 977        return 0;
 978
 979out_err:
 980        trace_xprtrdma_marshal_failed(rqst, ret);
 981        r_xprt->rx_stats.failed_marshal_count++;
 982        frwr_reset(req);
 983        return ret;
 984}
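
/* Condensed sketch of the reply-chunk decision made above, written
 * as a stand-alone helper for illustration (choose_wtype() is
 * hypothetical and does not exist in this file):
 *
 *	static enum rpcrdma_chunktype
 *	choose_wtype(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst,
 *		     bool ddp_allowed)
 *	{
 *		if (rpcrdma_results_inline(r_xprt, rqst))
 *			return rpcrdma_noch;	// whole Reply fits inline
 *		if (ddp_allowed &&
 *		    (rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
 *		    rpcrdma_nonpayload_inline(r_xprt, rqst))
 *			return rpcrdma_writech;	// payload via Write chunk
 *		return rpcrdma_replych;		// whole Reply via Reply chunk
 *	}
 *
 * The Read-side decision (rtype) follows the same shape, using
 * rpcrdma_args_inline() and XDRBUF_WRITE instead.
 */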
 985
 986static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
 987                                         struct rpcrdma_buffer *buf,
 988                                         u32 grant)
 989{
 990        buf->rb_credits = grant;
 991        xprt->cwnd = grant << RPC_CWNDSHIFT;
 992}
 993
 994static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
 995{
 996        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 997
 998        spin_lock(&xprt->transport_lock);
 999        __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
1000        spin_unlock(&xprt->transport_lock);
1001}
1002
1003/**
1004 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
1005 * @r_xprt: controlling transport instance
1006 *
1007 * Prepare @r_xprt for the next connection by reinitializing
1008 * its credit grant to one (see RFC 8166, Section 3.3.3).
1009 */
1010void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
1011{
1012        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1013
1014        spin_lock(&xprt->transport_lock);
1015        xprt->cong = 0;
1016        __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
1017        spin_unlock(&xprt->transport_lock);
1018}
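
/* Worked example of the congestion-window arithmetic above, assuming
 * RPC_CWNDSHIFT is 8 (its value in include/linux/sunrpc/xprt.h at
 * the time of writing): a credit grant of 128 gives
 *
 *	xprt->cwnd = 128 << 8 = 32768 = 128 * RPC_CWNDSCALE
 *
 * which lets the RPC layer keep up to 128 requests in flight until
 * the next grant arrives. After a reconnect the grant is reset to 1,
 * so only a single request is sent until the server advertises new
 * credits in its first Reply.
 */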
1019
1020/**
1021 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
1022 * @rqst: controlling RPC request
1023 * @srcp: points to RPC message payload in receive buffer
1024 * @copy_len: remaining length of receive buffer content
1025 * @pad: Write chunk pad bytes needed (zero for pure inline)
1026 *
1027 * The upper layer has set the maximum number of bytes it can
1028 * receive in each component of rq_rcv_buf. These values are set in
1029 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
1030 *
1031 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
1032 * many cases this function simply updates iov_base pointers in
1033 * rq_rcv_buf to point directly to the received reply data, to
1034 * avoid copying reply data.
1035 *
1036 * Returns the count of bytes which had to be memcopied.
1037 */
1038static unsigned long
1039rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
1040{
1041        unsigned long fixup_copy_count;
1042        int i, npages, curlen;
1043        char *destp;
1044        struct page **ppages;
1045        int page_base;
1046
1047        /* The head iovec is redirected to the RPC reply message
1048         * in the receive buffer, to avoid a memcopy.
1049         */
1050        rqst->rq_rcv_buf.head[0].iov_base = srcp;
1051        rqst->rq_private_buf.head[0].iov_base = srcp;
1052
1053        /* The contents of the receive buffer that follow
1054         * head.iov_len bytes are copied into the page list.
1055         */
1056        curlen = rqst->rq_rcv_buf.head[0].iov_len;
1057        if (curlen > copy_len)
1058                curlen = copy_len;
1059        srcp += curlen;
1060        copy_len -= curlen;
1061
1062        ppages = rqst->rq_rcv_buf.pages +
1063                (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
1064        page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
1065        fixup_copy_count = 0;
1066        if (copy_len && rqst->rq_rcv_buf.page_len) {
1067                int pagelist_len;
1068
1069                pagelist_len = rqst->rq_rcv_buf.page_len;
1070                if (pagelist_len > copy_len)
1071                        pagelist_len = copy_len;
1072                npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
1073                for (i = 0; i < npages; i++) {
1074                        curlen = PAGE_SIZE - page_base;
1075                        if (curlen > pagelist_len)
1076                                curlen = pagelist_len;
1077
1078                        destp = kmap_atomic(ppages[i]);
1079                        memcpy(destp + page_base, srcp, curlen);
1080                        flush_dcache_page(ppages[i]);
1081                        kunmap_atomic(destp);
1082                        srcp += curlen;
1083                        copy_len -= curlen;
1084                        fixup_copy_count += curlen;
1085                        pagelist_len -= curlen;
1086                        if (!pagelist_len)
1087                                break;
1088                        page_base = 0;
1089                }
1090
1091                /* Implicit padding for the last segment in a Write
1092                 * chunk is inserted inline at the front of the tail
1093                 * iovec. The upper layer ignores the content of
1094                 * the pad. Simply ensure inline content in the tail
1095                 * that follows the Write chunk is properly aligned.
1096                 */
1097                if (pad)
1098                        srcp -= pad;
1099        }
1100
1101        /* The tail iovec is redirected to the remaining data
1102         * in the receive buffer, to avoid a memcopy.
1103         */
1104        if (copy_len || pad) {
1105                rqst->rq_rcv_buf.tail[0].iov_base = srcp;
1106                rqst->rq_private_buf.tail[0].iov_base = srcp;
1107        }
1108
1109        if (fixup_copy_count)
1110                trace_xprtrdma_fixup(rqst, fixup_copy_count);
1111        return fixup_copy_count;
1112}
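
/* Worked example of the fixup above with hypothetical sizes: the ULP
 * set head.iov_len = 128 and page_len = 4096, and the Reply carried
 * 5000 inline bytes.
 *
 *	head  : iov_base redirected to srcp, no copy (first 128 bytes)
 *	pages : the next 4096 bytes are memcopied into the page list
 *	tail  : iov_base redirected to the remaining 776 bytes
 *
 * Only the page-list portion contributes to the returned
 * fixup_copy_count.
 */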
1113
1114/* By convention, backchannel calls arrive via rdma_msg type
1115 * messages, and never populate the chunk lists. This makes
1116 * the RPC/RDMA header small and fixed in size, so it is
1117 * straightforward to check the RPC header's direction field.
1118 */
1119static bool
1120rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1121#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1122{
1123        struct xdr_stream *xdr = &rep->rr_stream;
1124        __be32 *p;
1125
1126        if (rep->rr_proc != rdma_msg)
1127                return false;
1128
1129        /* Peek at stream contents without advancing. */
1130        p = xdr_inline_decode(xdr, 0);
1131
1132        /* Chunk lists */
1133        if (xdr_item_is_present(p++))
1134                return false;
1135        if (xdr_item_is_present(p++))
1136                return false;
1137        if (xdr_item_is_present(p++))
1138                return false;
1139
1140        /* RPC header */
1141        if (*p++ != rep->rr_xid)
1142                return false;
1143        if (*p != cpu_to_be32(RPC_CALL))
1144                return false;
1145
1146        /* Now that we are sure this is a backchannel call,
1147         * advance to the RPC header.
1148         */
1149        p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1150        if (unlikely(!p))
1151                return true;
1152
1153        rpcrdma_bc_receive_call(r_xprt, rep);
1154        return true;
1155}
1156#else   /* CONFIG_SUNRPC_BACKCHANNEL */
1157{
1158        return false;
1159}
1160#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
1161
1162static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1163{
1164        u32 handle;
1165        u64 offset;
1166        __be32 *p;
1167
1168        p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1169        if (unlikely(!p))
1170                return -EIO;
1171
1172        xdr_decode_rdma_segment(p, &handle, length, &offset);
1173        trace_xprtrdma_decode_seg(handle, *length, offset);
1174        return 0;
1175}
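
/* Illustrative sketch of what xdr_decode_rdma_segment() unpacks from
 * the four words read above (a plain RDMA segment):
 *
 *	u32 handle = be32_to_cpup(p);		// RDMA handle (STag / R_key)
 *	u32 length = be32_to_cpup(p + 1);	// segment length in bytes
 *	u64 offset = ((u64)be32_to_cpup(p + 2) << 32) |
 *		     be32_to_cpup(p + 3);	// 64-bit remote offset
 */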
1176
1177static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1178{
1179        u32 segcount, seglength;
1180        __be32 *p;
1181
1182        p = xdr_inline_decode(xdr, sizeof(*p));
1183        if (unlikely(!p))
1184                return -EIO;
1185
1186        *length = 0;
1187        segcount = be32_to_cpup(p);
1188        while (segcount--) {
1189                if (decode_rdma_segment(xdr, &seglength))
1190                        return -EIO;
1191                *length += seglength;
1192        }
1193
1194        return 0;
1195}
1196
1197/* In RPC-over-RDMA Version One replies, a Read list is never
1198 * expected. This decoder is a stub that returns an error if
1199 * a Read list is present.
1200 */
1201static int decode_read_list(struct xdr_stream *xdr)
1202{
1203        __be32 *p;
1204
1205        p = xdr_inline_decode(xdr, sizeof(*p));
1206        if (unlikely(!p))
1207                return -EIO;
1208        if (unlikely(xdr_item_is_present(p)))
1209                return -EIO;
1210        return 0;
1211}
1212
1213/* Supports only one Write chunk in the Write list
1214 */
1215static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1216{
1217        u32 chunklen;
1218        bool first;
1219        __be32 *p;
1220
1221        *length = 0;
1222        first = true;
1223        do {
1224                p = xdr_inline_decode(xdr, sizeof(*p));
1225                if (unlikely(!p))
1226                        return -EIO;
1227                if (xdr_item_is_absent(p))
1228                        break;
1229                if (!first)
1230                        return -EIO;
1231
1232                if (decode_write_chunk(xdr, &chunklen))
1233                        return -EIO;
1234                *length += chunklen;
1235                first = false;
1236        } while (true);
1237        return 0;
1238}
1239
1240static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1241{
1242        __be32 *p;
1243
1244        p = xdr_inline_decode(xdr, sizeof(*p));
1245        if (unlikely(!p))
1246                return -EIO;
1247
1248        *length = 0;
1249        if (xdr_item_is_present(p))
1250                if (decode_write_chunk(xdr, length))
1251                        return -EIO;
1252        return 0;
1253}
1254
1255static int
1256rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1257                   struct rpc_rqst *rqst)
1258{
1259        struct xdr_stream *xdr = &rep->rr_stream;
1260        u32 writelist, replychunk, rpclen;
1261        char *base;
1262
1263        /* Decode the chunk lists */
1264        if (decode_read_list(xdr))
1265                return -EIO;
1266        if (decode_write_list(xdr, &writelist))
1267                return -EIO;
1268        if (decode_reply_chunk(xdr, &replychunk))
1269                return -EIO;
1270
1271        /* RDMA_MSG sanity checks */
1272        if (unlikely(replychunk))
1273                return -EIO;
1274
1275        /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1276        base = (char *)xdr_inline_decode(xdr, 0);
1277        rpclen = xdr_stream_remaining(xdr);
1278        r_xprt->rx_stats.fixup_copy_count +=
1279                rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1280
1281        r_xprt->rx_stats.total_rdma_reply += writelist;
1282        return rpclen + xdr_align_size(writelist);
1283}
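
/* Worked example of the return value above: with 200 inline RPC
 * bytes remaining in the Payload stream (rpclen) and a Write chunk
 * that carried 4093 payload bytes, xdr_align_size() rounds the chunk
 * up to the next XDR word:
 *
 *	(4093 + 3) & ~3 = 4096
 *
 * so the function reports 200 + 4096 = 4296 bytes, which is the
 * reply length eventually handed to xprt_complete_rqst().
 */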
1284
1285static noinline int
1286rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1287{
1288        struct xdr_stream *xdr = &rep->rr_stream;
1289        u32 writelist, replychunk;
1290
1291        /* Decode the chunk lists */
1292        if (decode_read_list(xdr))
1293                return -EIO;
1294        if (decode_write_list(xdr, &writelist))
1295                return -EIO;
1296        if (decode_reply_chunk(xdr, &replychunk))
1297                return -EIO;
1298
1299        /* RDMA_NOMSG sanity checks */
1300        if (unlikely(writelist))
1301                return -EIO;
1302        if (unlikely(!replychunk))
1303                return -EIO;
1304
1305        /* Reply chunk buffer already is the reply vector */
1306        r_xprt->rx_stats.total_rdma_reply += replychunk;
1307        return replychunk;
1308}
1309
1310static noinline int
1311rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1312                     struct rpc_rqst *rqst)
1313{
1314        struct xdr_stream *xdr = &rep->rr_stream;
1315        __be32 *p;
1316
1317        p = xdr_inline_decode(xdr, sizeof(*p));
1318        if (unlikely(!p))
1319                return -EIO;
1320
1321        switch (*p) {
1322        case err_vers:
1323                p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1324                if (!p)
1325                        break;
1326                trace_xprtrdma_err_vers(rqst, p, p + 1);
1327                break;
1328        case err_chunk:
1329                trace_xprtrdma_err_chunk(rqst);
1330                break;
1331        default:
1332                trace_xprtrdma_err_unrecognized(rqst, p);
1333        }
1334
1335        return -EIO;
1336}
1337
1338/**
1339 * rpcrdma_unpin_rqst - Release rqst without completing it
1340 * @rep: RPC/RDMA Receive context
1341 *
1342 * This is done when a connection is lost so that a Reply
1343 * can be dropped and its matching Call can be subsequently
1344 * retransmitted on a new connection.
1345 */
1346void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
1347{
1348        struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
1349        struct rpc_rqst *rqst = rep->rr_rqst;
1350        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
1351
1352        req->rl_reply = NULL;
1353        rep->rr_rqst = NULL;
1354
1355        spin_lock(&xprt->queue_lock);
1356        xprt_unpin_rqst(rqst);
1357        spin_unlock(&xprt->queue_lock);
1358}
1359
1360/**
1361 * rpcrdma_complete_rqst - Pass completed rqst back to RPC
1362 * @rep: RPC/RDMA Receive context
1363 *
1364 * Reconstruct the RPC reply and complete the transaction
1365 * while @rqst is still pinned to ensure the rep, rqst, and
1366 * rq_task pointers remain stable.
1367 */
1368void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1369{
1370        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1371        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1372        struct rpc_rqst *rqst = rep->rr_rqst;
1373        int status;
1374
1375        switch (rep->rr_proc) {
1376        case rdma_msg:
1377                status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1378                break;
1379        case rdma_nomsg:
1380                status = rpcrdma_decode_nomsg(r_xprt, rep);
1381                break;
1382        case rdma_error:
1383                status = rpcrdma_decode_error(r_xprt, rep, rqst);
1384                break;
1385        default:
1386                status = -EIO;
1387        }
1388        if (status < 0)
1389                goto out_badheader;
1390
1391out:
1392        spin_lock(&xprt->queue_lock);
1393        xprt_complete_rqst(rqst->rq_task, status);
1394        xprt_unpin_rqst(rqst);
1395        spin_unlock(&xprt->queue_lock);
1396        return;
1397
1398out_badheader:
1399        trace_xprtrdma_reply_hdr_err(rep);
1400        r_xprt->rx_stats.bad_reply_count++;
1401        rqst->rq_task->tk_status = status;
1402        status = 0;
1403        goto out;
1404}
1405
1406static void rpcrdma_reply_done(struct kref *kref)
1407{
1408        struct rpcrdma_req *req =
1409                container_of(kref, struct rpcrdma_req, rl_kref);
1410
1411        rpcrdma_complete_rqst(req->rl_reply);
1412}
1413
1414/**
1415 * rpcrdma_reply_handler - Process received RPC/RDMA messages
1416 * @rep: Incoming rpcrdma_rep object to process
1417 *
1418 * Errors must result in the RPC task either being awakened, or
1419 * allowed to timeout, to discover the errors at that time.
1420 */
1421void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1422{
1423        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1424        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1425        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1426        struct rpcrdma_req *req;
1427        struct rpc_rqst *rqst;
1428        u32 credits;
1429        __be32 *p;
1430
 1431        /* Any data means we had a useful conversation, so
 1432         * we don't need to delay the next reconnect.
1433         */
1434        if (xprt->reestablish_timeout)
1435                xprt->reestablish_timeout = 0;
1436
1437        /* Fixed transport header fields */
1438        xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1439                        rep->rr_hdrbuf.head[0].iov_base, NULL);
1440        p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1441        if (unlikely(!p))
1442                goto out_shortreply;
1443        rep->rr_xid = *p++;
1444        rep->rr_vers = *p++;
1445        credits = be32_to_cpu(*p++);
1446        rep->rr_proc = *p++;
1447
1448        if (rep->rr_vers != rpcrdma_version)
1449                goto out_badversion;
1450
1451        if (rpcrdma_is_bcall(r_xprt, rep))
1452                return;
1453
1454        /* Match incoming rpcrdma_rep to an rpcrdma_req to
1455         * get context for handling any incoming chunks.
1456         */
1457        spin_lock(&xprt->queue_lock);
1458        rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1459        if (!rqst)
1460                goto out_norqst;
1461        xprt_pin_rqst(rqst);
1462        spin_unlock(&xprt->queue_lock);
1463
1464        if (credits == 0)
1465                credits = 1;    /* don't deadlock */
1466        else if (credits > r_xprt->rx_ep->re_max_requests)
1467                credits = r_xprt->rx_ep->re_max_requests;
1468        rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
1469                           false);
1470        if (buf->rb_credits != credits)
1471                rpcrdma_update_cwnd(r_xprt, credits);
1472
1473        req = rpcr_to_rdmar(rqst);
1474        if (unlikely(req->rl_reply))
1475                rpcrdma_rep_put(buf, req->rl_reply);
1476        req->rl_reply = rep;
1477        rep->rr_rqst = rqst;
1478
1479        trace_xprtrdma_reply(rqst->rq_task, rep, credits);
1480
1481        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1482                frwr_reminv(rep, &req->rl_registered);
1483        if (!list_empty(&req->rl_registered))
1484                frwr_unmap_async(r_xprt, req);
1485                /* LocalInv completion will complete the RPC */
1486        else
1487                kref_put(&req->rl_kref, rpcrdma_reply_done);
1488        return;
1489
1490out_badversion:
1491        trace_xprtrdma_reply_vers_err(rep);
1492        goto out;
1493
1494out_norqst:
1495        spin_unlock(&xprt->queue_lock);
1496        trace_xprtrdma_reply_rqst_err(rep);
1497        goto out;
1498
1499out_shortreply:
1500        trace_xprtrdma_reply_short_err(rep);
1501
1502out:
1503        rpcrdma_rep_put(buf, rep);
1504}
1505