linux/net/sunrpc/xprtrdma/rpc_rdma.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * rpc_rdma.c
  44 *
  45 * This file contains the guts of the RPC RDMA protocol, and
  46 * does marshaling/unmarshaling, etc. It is also where interfacing
  47 * to the Linux RPC framework lives.
  48 */
  49
  50#include <linux/highmem.h>
  51
  52#include <linux/sunrpc/svc_rdma.h>
  53
  54#include "xprt_rdma.h"
  55#include <trace/events/rpcrdma.h>
  56
  57#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  58# define RPCDBG_FACILITY        RPCDBG_TRANS
  59#endif
  60
  61/* Returns size of largest RPC-over-RDMA header in a Call message
  62 *
  63 * The largest Call header contains a full-size Read list and a
  64 * minimal Reply chunk.
  65 */
  66static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
  67{
  68        unsigned int size;
  69
  70        /* Fixed header fields and list discriminators */
  71        size = RPCRDMA_HDRLEN_MIN;
  72
  73        /* Maximum Read list size */
  74        size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
  75
  76        /* Minimal Read chunk size */
  77        size += sizeof(__be32); /* segment count */
  78        size += rpcrdma_segment_maxsz * sizeof(__be32);
  79        size += sizeof(__be32); /* list discriminator */
  80
  81        dprintk("RPC:       %s: max call header size = %u\n",
  82                __func__, size);
  83        return size;
  84}
  85
  86/* Returns size of largest RPC-over-RDMA header in a Reply message
  87 *
  88 * There is only one Write list or one Reply chunk per Reply
  89 * message.  The larger list is the Write list.
  90 */
  91static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
  92{
  93        unsigned int size;
  94
  95        /* Fixed header fields and list discriminators */
  96        size = RPCRDMA_HDRLEN_MIN;
  97
  98        /* Maximum Write list size */
  99        size = sizeof(__be32);          /* segment count */
 100        size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 101        size += sizeof(__be32); /* list discriminator */
 102
 103        dprintk("RPC:       %s: max reply header size = %u\n",
 104                __func__, size);
 105        return size;
 106}
 107
 108/**
 109 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 110 * @r_xprt: transport instance to initialize
 111 *
 112 * The max_inline fields contain the maximum size of an RPC message
 113 * so the marshaling code doesn't have to repeat this calculation
 114 * for every RPC.
 115 */
 116void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 117{
 118        unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
 119        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 120
 121        ep->rep_max_inline_send =
 122                ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
 123        ep->rep_max_inline_recv =
 124                ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 125}
 126
 127/* The client can send a request inline as long as the RPCRDMA header
 128 * plus the RPC call fit under the transport's inline limit. If the
 129 * combined call message size exceeds that limit, the client must use
 130 * a Read chunk for this operation.
 131 *
 132 * A Read chunk is also required if sending the RPC call inline would
 133 * exceed this device's max_sge limit.
 134 */
 135static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 136                                struct rpc_rqst *rqst)
 137{
 138        struct xdr_buf *xdr = &rqst->rq_snd_buf;
 139        unsigned int count, remaining, offset;
 140
 141        if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
 142                return false;
 143
 144        if (xdr->page_len) {
 145                remaining = xdr->page_len;
 146                offset = offset_in_page(xdr->page_base);
 147                count = RPCRDMA_MIN_SEND_SGES;
 148                while (remaining) {
 149                        remaining -= min_t(unsigned int,
 150                                           PAGE_SIZE - offset, remaining);
 151                        offset = 0;
 152                        if (++count > r_xprt->rx_ia.ri_max_send_sges)
 153                                return false;
 154                }
 155        }
 156
 157        return true;
 158}
 159
 160/* The client can't know how large the actual reply will be. Thus it
 161 * plans for the largest possible reply for that particular ULP
 162 * operation. If the maximum combined reply message size exceeds that
 163 * limit, the client must provide a write list or a reply chunk for
 164 * this request.
 165 */
 166static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 167                                   struct rpc_rqst *rqst)
 168{
 169        return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
 170}
 171
 172/* The client is required to provide a Reply chunk if the maximum
 173 * size of the non-payload part of the RPC Reply is larger than
 174 * the inline threshold.
 175 */
 176static bool
 177rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
 178                          const struct rpc_rqst *rqst)
 179{
 180        const struct xdr_buf *buf = &rqst->rq_rcv_buf;
 181
 182        return (buf->head[0].iov_len + buf->tail[0].iov_len) <
 183                r_xprt->rx_ep.rep_max_inline_recv;
 184}
 185
 186/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 187 * a byte range. Other modes coalesce these SGEs into a single MR
 188 * when they can.
 189 *
 190 * Returns pointer to next available SGE, and bumps the total number
 191 * of SGEs consumed.
 192 */
 193static struct rpcrdma_mr_seg *
 194rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 195                     unsigned int *n)
 196{
 197        u32 remaining, page_offset;
 198        char *base;
 199
 200        base = vec->iov_base;
 201        page_offset = offset_in_page(base);
 202        remaining = vec->iov_len;
 203        while (remaining) {
 204                seg->mr_page = NULL;
 205                seg->mr_offset = base;
 206                seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
 207                remaining -= seg->mr_len;
 208                base += seg->mr_len;
 209                ++seg;
 210                ++(*n);
 211                page_offset = 0;
 212        }
 213        return seg;
 214}
 215
 216/* Convert @xdrbuf into SGEs no larger than a page each. As they
 217 * are registered, these SGEs are then coalesced into RDMA segments
 218 * when the selected memreg mode supports it.
 219 *
 220 * Returns positive number of SGEs consumed, or a negative errno.
 221 */
 222
 223static int
 224rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 225                     unsigned int pos, enum rpcrdma_chunktype type,
 226                     struct rpcrdma_mr_seg *seg)
 227{
 228        unsigned long page_base;
 229        unsigned int len, n;
 230        struct page **ppages;
 231
 232        n = 0;
 233        if (pos == 0)
 234                seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 235
 236        len = xdrbuf->page_len;
 237        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 238        page_base = offset_in_page(xdrbuf->page_base);
 239        while (len) {
 240                /* ACL likes to be lazy in allocating pages - ACLs
 241                 * are small by default but can get huge.
 242                 */
 243                if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
 244                        if (!*ppages)
 245                                *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
 246                        if (!*ppages)
 247                                return -ENOBUFS;
 248                }
 249                seg->mr_page = *ppages;
 250                seg->mr_offset = (char *)page_base;
 251                seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
 252                len -= seg->mr_len;
 253                ++ppages;
 254                ++seg;
 255                ++n;
 256                page_base = 0;
 257        }
 258
 259        /* When encoding a Read chunk, the tail iovec contains an
 260         * XDR pad and may be omitted.
 261         */
 262        if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
 263                goto out;
 264
 265        /* When encoding a Write chunk, some servers need to see an
 266         * extra segment for non-XDR-aligned Write chunks. The upper
 267         * layer provides space in the tail iovec that may be used
 268         * for this purpose.
 269         */
 270        if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
 271                goto out;
 272
 273        if (xdrbuf->tail[0].iov_len)
 274                seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 275
 276out:
 277        if (unlikely(n > RPCRDMA_MAX_SEGS))
 278                return -EIO;
 279        return n;
 280}
 281
 282static inline int
 283encode_item_present(struct xdr_stream *xdr)
 284{
 285        __be32 *p;
 286
 287        p = xdr_reserve_space(xdr, sizeof(*p));
 288        if (unlikely(!p))
 289                return -EMSGSIZE;
 290
 291        *p = xdr_one;
 292        return 0;
 293}
 294
 295static inline int
 296encode_item_not_present(struct xdr_stream *xdr)
 297{
 298        __be32 *p;
 299
 300        p = xdr_reserve_space(xdr, sizeof(*p));
 301        if (unlikely(!p))
 302                return -EMSGSIZE;
 303
 304        *p = xdr_zero;
 305        return 0;
 306}
 307
 308static void
 309xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
 310{
 311        *iptr++ = cpu_to_be32(mr->mr_handle);
 312        *iptr++ = cpu_to_be32(mr->mr_length);
 313        xdr_encode_hyper(iptr, mr->mr_offset);
 314}
 315
 316static int
 317encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
 318{
 319        __be32 *p;
 320
 321        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 322        if (unlikely(!p))
 323                return -EMSGSIZE;
 324
 325        xdr_encode_rdma_segment(p, mr);
 326        return 0;
 327}
 328
 329static int
 330encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
 331                    u32 position)
 332{
 333        __be32 *p;
 334
 335        p = xdr_reserve_space(xdr, 6 * sizeof(*p));
 336        if (unlikely(!p))
 337                return -EMSGSIZE;
 338
 339        *p++ = xdr_one;                 /* Item present */
 340        *p++ = cpu_to_be32(position);
 341        xdr_encode_rdma_segment(p, mr);
 342        return 0;
 343}
 344
 345/* Register and XDR encode the Read list. Supports encoding a list of read
 346 * segments that belong to a single read chunk.
 347 *
 348 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 349 *
 350 *  Read chunklist (a linked list):
 351 *   N elements, position P (same P for all chunks of same arg!):
 352 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 353 *
 354 * Returns zero on success, or a negative errno if a failure occurred.
 355 * @xdr is advanced to the next position in the stream.
 356 *
 357 * Only a single @pos value is currently supported.
 358 */
 359static noinline int
 360rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 361                         struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
 362{
 363        struct xdr_stream *xdr = &req->rl_stream;
 364        struct rpcrdma_mr_seg *seg;
 365        struct rpcrdma_mr *mr;
 366        unsigned int pos;
 367        int nsegs;
 368
 369        pos = rqst->rq_snd_buf.head[0].iov_len;
 370        if (rtype == rpcrdma_areadch)
 371                pos = 0;
 372        seg = req->rl_segments;
 373        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
 374                                     rtype, seg);
 375        if (nsegs < 0)
 376                return nsegs;
 377
 378        do {
 379                seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
 380                if (IS_ERR(seg))
 381                        return PTR_ERR(seg);
 382                rpcrdma_mr_push(mr, &req->rl_registered);
 383
 384                if (encode_read_segment(xdr, mr, pos) < 0)
 385                        return -EMSGSIZE;
 386
 387                trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 388                r_xprt->rx_stats.read_chunk_count++;
 389                nsegs -= mr->mr_nents;
 390        } while (nsegs);
 391
 392        return 0;
 393}
 394
 395/* Register and XDR encode the Write list. Supports encoding a list
 396 * containing one array of plain segments that belong to a single
 397 * write chunk.
 398 *
 399 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 400 *
 401 *  Write chunklist (a list of (one) counted array):
 402 *   N elements:
 403 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 404 *
 405 * Returns zero on success, or a negative errno if a failure occurred.
 406 * @xdr is advanced to the next position in the stream.
 407 *
 408 * Only a single Write chunk is currently supported.
 409 */
 410static noinline int
 411rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 412                          struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 413{
 414        struct xdr_stream *xdr = &req->rl_stream;
 415        struct rpcrdma_mr_seg *seg;
 416        struct rpcrdma_mr *mr;
 417        int nsegs, nchunks;
 418        __be32 *segcount;
 419
 420        seg = req->rl_segments;
 421        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 422                                     rqst->rq_rcv_buf.head[0].iov_len,
 423                                     wtype, seg);
 424        if (nsegs < 0)
 425                return nsegs;
 426
 427        if (encode_item_present(xdr) < 0)
 428                return -EMSGSIZE;
 429        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 430        if (unlikely(!segcount))
 431                return -EMSGSIZE;
 432        /* Actual value encoded below */
 433
 434        nchunks = 0;
 435        do {
 436                seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
 437                if (IS_ERR(seg))
 438                        return PTR_ERR(seg);
 439                rpcrdma_mr_push(mr, &req->rl_registered);
 440
 441                if (encode_rdma_segment(xdr, mr) < 0)
 442                        return -EMSGSIZE;
 443
 444                trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 445                r_xprt->rx_stats.write_chunk_count++;
 446                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 447                nchunks++;
 448                nsegs -= mr->mr_nents;
 449        } while (nsegs);
 450
 451        /* Update count of segments in this Write chunk */
 452        *segcount = cpu_to_be32(nchunks);
 453
 454        return 0;
 455}
 456
 457/* Register and XDR encode the Reply chunk. Supports encoding an array
 458 * of plain segments that belong to a single write (reply) chunk.
 459 *
 460 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 461 *
 462 *  Reply chunk (a counted array):
 463 *   N elements:
 464 *    1 - N - HLOO - HLOO - ... - HLOO
 465 *
 466 * Returns zero on success, or a negative errno if a failure occurred.
 467 * @xdr is advanced to the next position in the stream.
 468 */
 469static noinline int
 470rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 471                           struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 472{
 473        struct xdr_stream *xdr = &req->rl_stream;
 474        struct rpcrdma_mr_seg *seg;
 475        struct rpcrdma_mr *mr;
 476        int nsegs, nchunks;
 477        __be32 *segcount;
 478
 479        seg = req->rl_segments;
 480        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 481        if (nsegs < 0)
 482                return nsegs;
 483
 484        if (encode_item_present(xdr) < 0)
 485                return -EMSGSIZE;
 486        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 487        if (unlikely(!segcount))
 488                return -EMSGSIZE;
 489        /* Actual value encoded below */
 490
 491        nchunks = 0;
 492        do {
 493                seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
 494                if (IS_ERR(seg))
 495                        return PTR_ERR(seg);
 496                rpcrdma_mr_push(mr, &req->rl_registered);
 497
 498                if (encode_rdma_segment(xdr, mr) < 0)
 499                        return -EMSGSIZE;
 500
 501                trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 502                r_xprt->rx_stats.reply_chunk_count++;
 503                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 504                nchunks++;
 505                nsegs -= mr->mr_nents;
 506        } while (nsegs);
 507
 508        /* Update count of segments in the Reply chunk */
 509        *segcount = cpu_to_be32(nchunks);
 510
 511        return 0;
 512}
 513
 514/**
 515 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 516 * @sc: sendctx containing SGEs to unmap
 517 *
 518 */
 519void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 520{
 521        struct ib_sge *sge;
 522
 523        /* The first two SGEs contain the transport header and
 524         * the inline buffer. These are always left mapped so
 525         * they can be cheaply re-used.
 526         */
 527        for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
 528             ++sge, --sc->sc_unmap_count)
 529                ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
 530                                  DMA_TO_DEVICE);
 531
 532        if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
 533                               &sc->sc_req->rl_flags))
 534                wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
 535}
 536
 537/* Prepare an SGE for the RPC-over-RDMA transport header.
 538 */
 539static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
 540                                    struct rpcrdma_req *req, u32 len)
 541{
 542        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 543        struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
 544        struct ib_sge *sge = sc->sc_sges;
 545
 546        if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 547                goto out_regbuf;
 548        sge->addr = rdmab_addr(rb);
 549        sge->length = len;
 550        sge->lkey = rdmab_lkey(rb);
 551
 552        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
 553                                      DMA_TO_DEVICE);
 554        sc->sc_wr.num_sge++;
 555        return true;
 556
 557out_regbuf:
 558        pr_err("rpcrdma: failed to DMA map a Send buffer\n");
 559        return false;
 560}
 561
 562/* Prepare the Send SGEs. The head and tail iovec, and each entry
 563 * in the page list, gets its own SGE.
 564 */
 565static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 566                                     struct rpcrdma_req *req,
 567                                     struct xdr_buf *xdr,
 568                                     enum rpcrdma_chunktype rtype)
 569{
 570        struct rpcrdma_sendctx *sc = req->rl_sendctx;
 571        unsigned int sge_no, page_base, len, remaining;
 572        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 573        struct ib_sge *sge = sc->sc_sges;
 574        struct page *page, **ppages;
 575
 576        /* The head iovec is straightforward, as it is already
 577         * DMA-mapped. Sync the content that has changed.
 578         */
 579        if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 580                goto out_regbuf;
 581        sc->sc_device = rdmab_device(rb);
 582        sge_no = 1;
 583        sge[sge_no].addr = rdmab_addr(rb);
 584        sge[sge_no].length = xdr->head[0].iov_len;
 585        sge[sge_no].lkey = rdmab_lkey(rb);
 586        ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
 587                                      sge[sge_no].length, DMA_TO_DEVICE);
 588
 589        /* If there is a Read chunk, the page list is being handled
 590         * via explicit RDMA, and thus is skipped here. However, the
 591         * tail iovec may include an XDR pad for the page list, as
 592         * well as additional content, and may not reside in the
 593         * same page as the head iovec.
 594         */
 595        if (rtype == rpcrdma_readch) {
 596                len = xdr->tail[0].iov_len;
 597
 598                /* Do not include the tail if it is only an XDR pad */
 599                if (len < 4)
 600                        goto out;
 601
 602                page = virt_to_page(xdr->tail[0].iov_base);
 603                page_base = offset_in_page(xdr->tail[0].iov_base);
 604
 605                /* If the content in the page list is an odd length,
 606                 * xdr_write_pages() has added a pad at the beginning
 607                 * of the tail iovec. Force the tail's non-pad content
 608                 * to land at the next XDR position in the Send message.
 609                 */
 610                page_base += len & 3;
 611                len -= len & 3;
 612                goto map_tail;
 613        }
 614
 615        /* If there is a page list present, temporarily DMA map
 616         * and prepare an SGE for each page to be sent.
 617         */
 618        if (xdr->page_len) {
 619                ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
 620                page_base = offset_in_page(xdr->page_base);
 621                remaining = xdr->page_len;
 622                while (remaining) {
 623                        sge_no++;
 624                        if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
 625                                goto out_mapping_overflow;
 626
 627                        len = min_t(u32, PAGE_SIZE - page_base, remaining);
 628                        sge[sge_no].addr =
 629                                ib_dma_map_page(rdmab_device(rb), *ppages,
 630                                                page_base, len, DMA_TO_DEVICE);
 631                        if (ib_dma_mapping_error(rdmab_device(rb),
 632                                                 sge[sge_no].addr))
 633                                goto out_mapping_err;
 634                        sge[sge_no].length = len;
 635                        sge[sge_no].lkey = rdmab_lkey(rb);
 636
 637                        sc->sc_unmap_count++;
 638                        ppages++;
 639                        remaining -= len;
 640                        page_base = 0;
 641                }
 642        }
 643
 644        /* The tail iovec is not always constructed in the same
 645         * page where the head iovec resides (see, for example,
 646         * gss_wrap_req_priv). To neatly accommodate that case,
 647         * DMA map it separately.
 648         */
 649        if (xdr->tail[0].iov_len) {
 650                page = virt_to_page(xdr->tail[0].iov_base);
 651                page_base = offset_in_page(xdr->tail[0].iov_base);
 652                len = xdr->tail[0].iov_len;
 653
 654map_tail:
 655                sge_no++;
 656                sge[sge_no].addr =
 657                        ib_dma_map_page(rdmab_device(rb), page, page_base, len,
 658                                        DMA_TO_DEVICE);
 659                if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
 660                        goto out_mapping_err;
 661                sge[sge_no].length = len;
 662                sge[sge_no].lkey = rdmab_lkey(rb);
 663                sc->sc_unmap_count++;
 664        }
 665
 666out:
 667        sc->sc_wr.num_sge += sge_no;
 668        if (sc->sc_unmap_count)
 669                __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 670        return true;
 671
 672out_regbuf:
 673        pr_err("rpcrdma: failed to DMA map a Send buffer\n");
 674        return false;
 675
 676out_mapping_overflow:
 677        rpcrdma_sendctx_unmap(sc);
 678        pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 679        return false;
 680
 681out_mapping_err:
 682        rpcrdma_sendctx_unmap(sc);
 683        trace_xprtrdma_dma_maperr(sge[sge_no].addr);
 684        return false;
 685}
 686
 687/**
 688 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 689 * @r_xprt: controlling transport
 690 * @req: context of RPC Call being marshalled
 691 * @hdrlen: size of transport header, in bytes
 692 * @xdr: xdr_buf containing RPC Call
 693 * @rtype: chunk type being encoded
 694 *
 695 * Returns 0 on success; otherwise a negative errno is returned.
 696 */
 697int
 698rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 699                          struct rpcrdma_req *req, u32 hdrlen,
 700                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 701{
 702        req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 703        if (!req->rl_sendctx)
 704                return -EAGAIN;
 705        req->rl_sendctx->sc_wr.num_sge = 0;
 706        req->rl_sendctx->sc_unmap_count = 0;
 707        req->rl_sendctx->sc_req = req;
 708        __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 709
 710        if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
 711                return -EIO;
 712
 713        if (rtype != rpcrdma_areadch)
 714                if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
 715                        return -EIO;
 716
 717        return 0;
 718}
 719
 720/**
 721 * rpcrdma_marshal_req - Marshal and send one RPC request
 722 * @r_xprt: controlling transport
 723 * @rqst: RPC request to be marshaled
 724 *
 725 * For the RPC in "rqst", this function:
 726 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 727 *  - Registers Read, Write, and Reply chunks
 728 *  - Constructs the transport header
 729 *  - Posts a Send WR to send the transport header and request
 730 *
 731 * Returns:
 732 *      %0 if the RPC was sent successfully,
 733 *      %-ENOTCONN if the connection was lost,
 734 *      %-EAGAIN if the caller should call again with the same arguments,
 735 *      %-ENOBUFS if the caller should call again after a delay,
 736 *      %-EMSGSIZE if the transport header is too small,
 737 *      %-EIO if a permanent problem occurred while marshaling.
 738 */
 739int
 740rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 741{
 742        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 743        struct xdr_stream *xdr = &req->rl_stream;
 744        enum rpcrdma_chunktype rtype, wtype;
 745        bool ddp_allowed;
 746        __be32 *p;
 747        int ret;
 748
 749        rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 750        xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
 751                        rqst);
 752
 753        /* Fixed header fields */
 754        ret = -EMSGSIZE;
 755        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 756        if (!p)
 757                goto out_err;
 758        *p++ = rqst->rq_xid;
 759        *p++ = rpcrdma_version;
 760        *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 761
 762        /* When the ULP employs a GSS flavor that guarantees integrity
 763         * or privacy, direct data placement of individual data items
 764         * is not allowed.
 765         */
 766        ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
 767                                                RPCAUTH_AUTH_DATATOUCH);
 768
 769        /*
 770         * Chunks needed for results?
 771         *
 772         * o If the expected result is under the inline threshold, all ops
 773         *   return as inline.
 774         * o Large read ops return data as write chunk(s), header as
 775         *   inline.
 776         * o Large non-read ops return as a single reply chunk.
 777         */
 778        if (rpcrdma_results_inline(r_xprt, rqst))
 779                wtype = rpcrdma_noch;
 780        else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
 781                 rpcrdma_nonpayload_inline(r_xprt, rqst))
 782                wtype = rpcrdma_writech;
 783        else
 784                wtype = rpcrdma_replych;
 785
 786        /*
 787         * Chunks needed for arguments?
 788         *
 789         * o If the total request is under the inline threshold, all ops
 790         *   are sent as inline.
 791         * o Large write ops transmit data as read chunk(s), header as
 792         *   inline.
 793         * o Large non-write ops are sent with the entire message as a
 794         *   single read chunk (protocol 0-position special case).
 795         *
 796         * This assumes that the upper layer does not present a request
 797         * that both has a data payload, and whose non-data arguments
 798         * by themselves are larger than the inline threshold.
 799         */
 800        if (rpcrdma_args_inline(r_xprt, rqst)) {
 801                *p++ = rdma_msg;
 802                rtype = rpcrdma_noch;
 803        } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 804                *p++ = rdma_msg;
 805                rtype = rpcrdma_readch;
 806        } else {
 807                r_xprt->rx_stats.nomsg_call_count++;
 808                *p++ = rdma_nomsg;
 809                rtype = rpcrdma_areadch;
 810        }
 811
 812        /* If this is a retransmit, discard previously registered
 813         * chunks. Very likely the connection has been replaced,
 814         * so these registrations are invalid and unusable.
 815         */
 816        while (unlikely(!list_empty(&req->rl_registered))) {
 817                struct rpcrdma_mr *mr;
 818
 819                mr = rpcrdma_mr_pop(&req->rl_registered);
 820                rpcrdma_mr_recycle(mr);
 821        }
 822
 823        /* This implementation supports the following combinations
 824         * of chunk lists in one RPC-over-RDMA Call message:
 825         *
 826         *   - Read list
 827         *   - Write list
 828         *   - Reply chunk
 829         *   - Read list + Reply chunk
 830         *
 831         * It might not yet support the following combinations:
 832         *
 833         *   - Read list + Write list
 834         *
 835         * It does not support the following combinations:
 836         *
 837         *   - Write list + Reply chunk
 838         *   - Read list + Write list + Reply chunk
 839         *
 840         * This implementation supports only a single chunk in each
 841         * Read or Write list. Thus for example the client cannot
 842         * send a Call message with a Position Zero Read chunk and a
 843         * regular Read chunk at the same time.
 844         */
 845        if (rtype != rpcrdma_noch) {
 846                ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
 847                if (ret)
 848                        goto out_err;
 849        }
 850        ret = encode_item_not_present(xdr);
 851        if (ret)
 852                goto out_err;
 853
 854        if (wtype == rpcrdma_writech) {
 855                ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
 856                if (ret)
 857                        goto out_err;
 858        }
 859        ret = encode_item_not_present(xdr);
 860        if (ret)
 861                goto out_err;
 862
 863        if (wtype != rpcrdma_replych)
 864                ret = encode_item_not_present(xdr);
 865        else
 866                ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 867        if (ret)
 868                goto out_err;
 869
 870        trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
 871
 872        ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
 873                                        &rqst->rq_snd_buf, rtype);
 874        if (ret)
 875                goto out_err;
 876        return 0;
 877
 878out_err:
 879        trace_xprtrdma_marshal_failed(rqst, ret);
 880        switch (ret) {
 881        case -EAGAIN:
 882                xprt_wait_for_buffer_space(rqst->rq_xprt);
 883                break;
 884        case -ENOBUFS:
 885                break;
 886        default:
 887                r_xprt->rx_stats.failed_marshal_count++;
 888        }
 889        return ret;
 890}
 891
 892/**
 893 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 894 * @rqst: controlling RPC request
 895 * @srcp: points to RPC message payload in receive buffer
 896 * @copy_len: remaining length of receive buffer content
 897 * @pad: Write chunk pad bytes needed (zero for pure inline)
 898 *
 899 * The upper layer has set the maximum number of bytes it can
 900 * receive in each component of rq_rcv_buf. These values are set in
 901 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 902 *
 903 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 904 * many cases this function simply updates iov_base pointers in
 905 * rq_rcv_buf to point directly to the received reply data, to
 906 * avoid copying reply data.
 907 *
 908 * Returns the count of bytes which had to be memcopied.
 909 */
 910static unsigned long
 911rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 912{
 913        unsigned long fixup_copy_count;
 914        int i, npages, curlen;
 915        char *destp;
 916        struct page **ppages;
 917        int page_base;
 918
 919        /* The head iovec is redirected to the RPC reply message
 920         * in the receive buffer, to avoid a memcopy.
 921         */
 922        rqst->rq_rcv_buf.head[0].iov_base = srcp;
 923        rqst->rq_private_buf.head[0].iov_base = srcp;
 924
 925        /* The contents of the receive buffer that follow
 926         * head.iov_len bytes are copied into the page list.
 927         */
 928        curlen = rqst->rq_rcv_buf.head[0].iov_len;
 929        if (curlen > copy_len)
 930                curlen = copy_len;
 931        trace_xprtrdma_fixup(rqst, copy_len, curlen);
 932        srcp += curlen;
 933        copy_len -= curlen;
 934
 935        ppages = rqst->rq_rcv_buf.pages +
 936                (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
 937        page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
 938        fixup_copy_count = 0;
 939        if (copy_len && rqst->rq_rcv_buf.page_len) {
 940                int pagelist_len;
 941
 942                pagelist_len = rqst->rq_rcv_buf.page_len;
 943                if (pagelist_len > copy_len)
 944                        pagelist_len = copy_len;
 945                npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
 946                for (i = 0; i < npages; i++) {
 947                        curlen = PAGE_SIZE - page_base;
 948                        if (curlen > pagelist_len)
 949                                curlen = pagelist_len;
 950
 951                        trace_xprtrdma_fixup_pg(rqst, i, srcp,
 952                                                copy_len, curlen);
 953                        destp = kmap_atomic(ppages[i]);
 954                        memcpy(destp + page_base, srcp, curlen);
 955                        flush_dcache_page(ppages[i]);
 956                        kunmap_atomic(destp);
 957                        srcp += curlen;
 958                        copy_len -= curlen;
 959                        fixup_copy_count += curlen;
 960                        pagelist_len -= curlen;
 961                        if (!pagelist_len)
 962                                break;
 963                        page_base = 0;
 964                }
 965
 966                /* Implicit padding for the last segment in a Write
 967                 * chunk is inserted inline at the front of the tail
 968                 * iovec. The upper layer ignores the content of
 969                 * the pad. Simply ensure inline content in the tail
 970                 * that follows the Write chunk is properly aligned.
 971                 */
 972                if (pad)
 973                        srcp -= pad;
 974        }
 975
 976        /* The tail iovec is redirected to the remaining data
 977         * in the receive buffer, to avoid a memcopy.
 978         */
 979        if (copy_len || pad) {
 980                rqst->rq_rcv_buf.tail[0].iov_base = srcp;
 981                rqst->rq_private_buf.tail[0].iov_base = srcp;
 982        }
 983
 984        return fixup_copy_count;
 985}
 986
 987/* By convention, backchannel calls arrive via rdma_msg type
 988 * messages, and never populate the chunk lists. This makes
 989 * the RPC/RDMA header small and fixed in size, so it is
 990 * straightforward to check the RPC header's direction field.
 991 */
 992static bool
 993rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
 994#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 995{
 996        struct xdr_stream *xdr = &rep->rr_stream;
 997        __be32 *p;
 998
 999        if (rep->rr_proc != rdma_msg)
1000                return false;
1001
1002        /* Peek at stream contents without advancing. */
1003        p = xdr_inline_decode(xdr, 0);
1004
1005        /* Chunk lists */
1006        if (*p++ != xdr_zero)
1007                return false;
1008        if (*p++ != xdr_zero)
1009                return false;
1010        if (*p++ != xdr_zero)
1011                return false;
1012
1013        /* RPC header */
1014        if (*p++ != rep->rr_xid)
1015                return false;
1016        if (*p != cpu_to_be32(RPC_CALL))
1017                return false;
1018
1019        /* Now that we are sure this is a backchannel call,
1020         * advance to the RPC header.
1021         */
1022        p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1023        if (unlikely(!p))
1024                goto out_short;
1025
1026        rpcrdma_bc_receive_call(r_xprt, rep);
1027        return true;
1028
1029out_short:
1030        pr_warn("RPC/RDMA short backward direction call\n");
1031        return true;
1032}
1033#else   /* CONFIG_SUNRPC_BACKCHANNEL */
1034{
1035        return false;
1036}
1037#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
1038
1039static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1040{
1041        u32 handle;
1042        u64 offset;
1043        __be32 *p;
1044
1045        p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1046        if (unlikely(!p))
1047                return -EIO;
1048
1049        handle = be32_to_cpup(p++);
1050        *length = be32_to_cpup(p++);
1051        xdr_decode_hyper(p, &offset);
1052
1053        trace_xprtrdma_decode_seg(handle, *length, offset);
1054        return 0;
1055}
1056
1057static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1058{
1059        u32 segcount, seglength;
1060        __be32 *p;
1061
1062        p = xdr_inline_decode(xdr, sizeof(*p));
1063        if (unlikely(!p))
1064                return -EIO;
1065
1066        *length = 0;
1067        segcount = be32_to_cpup(p);
1068        while (segcount--) {
1069                if (decode_rdma_segment(xdr, &seglength))
1070                        return -EIO;
1071                *length += seglength;
1072        }
1073
1074        return 0;
1075}
1076
1077/* In RPC-over-RDMA Version One replies, a Read list is never
1078 * expected. This decoder is a stub that returns an error if
1079 * a Read list is present.
1080 */
1081static int decode_read_list(struct xdr_stream *xdr)
1082{
1083        __be32 *p;
1084
1085        p = xdr_inline_decode(xdr, sizeof(*p));
1086        if (unlikely(!p))
1087                return -EIO;
1088        if (unlikely(*p != xdr_zero))
1089                return -EIO;
1090        return 0;
1091}
1092
1093/* Supports only one Write chunk in the Write list
1094 */
1095static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1096{
1097        u32 chunklen;
1098        bool first;
1099        __be32 *p;
1100
1101        *length = 0;
1102        first = true;
1103        do {
1104                p = xdr_inline_decode(xdr, sizeof(*p));
1105                if (unlikely(!p))
1106                        return -EIO;
1107                if (*p == xdr_zero)
1108                        break;
1109                if (!first)
1110                        return -EIO;
1111
1112                if (decode_write_chunk(xdr, &chunklen))
1113                        return -EIO;
1114                *length += chunklen;
1115                first = false;
1116        } while (true);
1117        return 0;
1118}
1119
1120static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1121{
1122        __be32 *p;
1123
1124        p = xdr_inline_decode(xdr, sizeof(*p));
1125        if (unlikely(!p))
1126                return -EIO;
1127
1128        *length = 0;
1129        if (*p != xdr_zero)
1130                if (decode_write_chunk(xdr, length))
1131                        return -EIO;
1132        return 0;
1133}
1134
1135static int
1136rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1137                   struct rpc_rqst *rqst)
1138{
1139        struct xdr_stream *xdr = &rep->rr_stream;
1140        u32 writelist, replychunk, rpclen;
1141        char *base;
1142
1143        /* Decode the chunk lists */
1144        if (decode_read_list(xdr))
1145                return -EIO;
1146        if (decode_write_list(xdr, &writelist))
1147                return -EIO;
1148        if (decode_reply_chunk(xdr, &replychunk))
1149                return -EIO;
1150
1151        /* RDMA_MSG sanity checks */
1152        if (unlikely(replychunk))
1153                return -EIO;
1154
1155        /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1156        base = (char *)xdr_inline_decode(xdr, 0);
1157        rpclen = xdr_stream_remaining(xdr);
1158        r_xprt->rx_stats.fixup_copy_count +=
1159                rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1160
1161        r_xprt->rx_stats.total_rdma_reply += writelist;
1162        return rpclen + xdr_align_size(writelist);
1163}
1164
1165static noinline int
1166rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1167{
1168        struct xdr_stream *xdr = &rep->rr_stream;
1169        u32 writelist, replychunk;
1170
1171        /* Decode the chunk lists */
1172        if (decode_read_list(xdr))
1173                return -EIO;
1174        if (decode_write_list(xdr, &writelist))
1175                return -EIO;
1176        if (decode_reply_chunk(xdr, &replychunk))
1177                return -EIO;
1178
1179        /* RDMA_NOMSG sanity checks */
1180        if (unlikely(writelist))
1181                return -EIO;
1182        if (unlikely(!replychunk))
1183                return -EIO;
1184
1185        /* Reply chunk buffer already is the reply vector */
1186        r_xprt->rx_stats.total_rdma_reply += replychunk;
1187        return replychunk;
1188}
1189
1190static noinline int
1191rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1192                     struct rpc_rqst *rqst)
1193{
1194        struct xdr_stream *xdr = &rep->rr_stream;
1195        __be32 *p;
1196
1197        p = xdr_inline_decode(xdr, sizeof(*p));
1198        if (unlikely(!p))
1199                return -EIO;
1200
1201        switch (*p) {
1202        case err_vers:
1203                p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1204                if (!p)
1205                        break;
1206                dprintk("RPC:       %s: server reports "
1207                        "version error (%u-%u), xid %08x\n", __func__,
1208                        be32_to_cpup(p), be32_to_cpu(*(p + 1)),
1209                        be32_to_cpu(rep->rr_xid));
1210                break;
1211        case err_chunk:
1212                dprintk("RPC:       %s: server reports "
1213                        "header decoding error, xid %08x\n", __func__,
1214                        be32_to_cpu(rep->rr_xid));
1215                break;
1216        default:
1217                dprintk("RPC:       %s: server reports "
1218                        "unrecognized error %d, xid %08x\n", __func__,
1219                        be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
1220        }
1221
1222        r_xprt->rx_stats.bad_reply_count++;
1223        return -EREMOTEIO;
1224}
1225
1226/* Perform XID lookup, reconstruction of the RPC reply, and
1227 * RPC completion while holding the transport lock to ensure
1228 * the rep, rqst, and rq_task pointers remain stable.
1229 */
1230void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1231{
1232        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1233        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1234        struct rpc_rqst *rqst = rep->rr_rqst;
1235        int status;
1236
1237        xprt->reestablish_timeout = 0;
1238
1239        switch (rep->rr_proc) {
1240        case rdma_msg:
1241                status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1242                break;
1243        case rdma_nomsg:
1244                status = rpcrdma_decode_nomsg(r_xprt, rep);
1245                break;
1246        case rdma_error:
1247                status = rpcrdma_decode_error(r_xprt, rep, rqst);
1248                break;
1249        default:
1250                status = -EIO;
1251        }
1252        if (status < 0)
1253                goto out_badheader;
1254
1255out:
1256        spin_lock(&xprt->queue_lock);
1257        xprt_complete_rqst(rqst->rq_task, status);
1258        xprt_unpin_rqst(rqst);
1259        spin_unlock(&xprt->queue_lock);
1260        return;
1261
1262/* If the incoming reply terminated a pending RPC, the next
1263 * RPC call will post a replacement receive buffer as it is
1264 * being marshaled.
1265 */
1266out_badheader:
1267        trace_xprtrdma_reply_hdr(rep);
1268        r_xprt->rx_stats.bad_reply_count++;
1269        goto out;
1270}
1271
1272void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1273{
1274        /* Invalidate and unmap the data payloads before waking
1275         * the waiting application. This guarantees the memory
1276         * regions are properly fenced from the server before the
1277         * application accesses the data. It also ensures proper
1278         * send flow control: waking the next RPC waits until this
1279         * RPC has relinquished all its Send Queue entries.
1280         */
1281        if (!list_empty(&req->rl_registered))
1282                frwr_unmap_sync(r_xprt, &req->rl_registered);
1283
1284        /* Ensure that any DMA mapped pages associated with
1285         * the Send of the RPC Call have been unmapped before
1286         * allowing the RPC to complete. This protects argument
1287         * memory not controlled by the RPC client from being
1288         * re-used before we're done with it.
1289         */
1290        if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1291                r_xprt->rx_stats.reply_waits_for_send++;
1292                out_of_line_wait_on_bit(&req->rl_flags,
1293                                        RPCRDMA_REQ_F_TX_RESOURCES,
1294                                        bit_wait,
1295                                        TASK_UNINTERRUPTIBLE);
1296        }
1297}
1298
1299/* Reply handling runs in the poll worker thread. Anything that
1300 * might wait is deferred to a separate workqueue.
1301 */
1302void rpcrdma_deferred_completion(struct work_struct *work)
1303{
1304        struct rpcrdma_rep *rep =
1305                        container_of(work, struct rpcrdma_rep, rr_work);
1306        struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1307        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1308
1309        trace_xprtrdma_defer_cmp(rep);
1310        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1311                frwr_reminv(rep, &req->rl_registered);
1312        rpcrdma_release_rqst(r_xprt, req);
1313        rpcrdma_complete_rqst(rep);
1314}
1315
1316/* Process received RPC/RDMA messages.
1317 *
1318 * Errors must result in the RPC task either being awakened, or
1319 * allowed to timeout, to discover the errors at that time.
1320 */
1321void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1322{
1323        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1324        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1325        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1326        struct rpcrdma_req *req;
1327        struct rpc_rqst *rqst;
1328        u32 credits;
1329        __be32 *p;
1330
1331        /* Fixed transport header fields */
1332        xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1333                        rep->rr_hdrbuf.head[0].iov_base, NULL);
1334        p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1335        if (unlikely(!p))
1336                goto out_shortreply;
1337        rep->rr_xid = *p++;
1338        rep->rr_vers = *p++;
1339        credits = be32_to_cpu(*p++);
1340        rep->rr_proc = *p++;
1341
1342        if (rep->rr_vers != rpcrdma_version)
1343                goto out_badversion;
1344
1345        if (rpcrdma_is_bcall(r_xprt, rep))
1346                return;
1347
1348        /* Match incoming rpcrdma_rep to an rpcrdma_req to
1349         * get context for handling any incoming chunks.
1350         */
1351        spin_lock(&xprt->queue_lock);
1352        rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1353        if (!rqst)
1354                goto out_norqst;
1355        xprt_pin_rqst(rqst);
1356        spin_unlock(&xprt->queue_lock);
1357
1358        if (credits == 0)
1359                credits = 1;    /* don't deadlock */
1360        else if (credits > buf->rb_max_requests)
1361                credits = buf->rb_max_requests;
1362        if (buf->rb_credits != credits) {
1363                spin_lock_bh(&xprt->transport_lock);
1364                buf->rb_credits = credits;
1365                xprt->cwnd = credits << RPC_CWNDSHIFT;
1366                spin_unlock_bh(&xprt->transport_lock);
1367        }
1368
1369        req = rpcr_to_rdmar(rqst);
1370        if (req->rl_reply) {
1371                trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
1372                rpcrdma_recv_buffer_put(req->rl_reply);
1373        }
1374        req->rl_reply = rep;
1375        rep->rr_rqst = rqst;
1376        clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1377
1378        trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1379        queue_work(buf->rb_completion_wq, &rep->rr_work);
1380        return;
1381
1382out_badversion:
1383        trace_xprtrdma_reply_vers(rep);
1384        goto out;
1385
1386out_norqst:
1387        spin_unlock(&xprt->queue_lock);
1388        trace_xprtrdma_reply_rqst(rep);
1389        goto out;
1390
1391out_shortreply:
1392        trace_xprtrdma_reply_short(rep);
1393
1394out:
1395        rpcrdma_recv_buffer_put(rep);
1396}
1397