linux/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

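/*
 * This file implements the server-side RPC/RDMA receive path:
 * svc_rdma_recvfrom() assembles an incoming RPC call from the pages
 * posted for RDMA RECV and, when the client supplied a Read list,
 * pulls the remaining argument data over with RDMA READ before the
 * request is passed up to the RPC layer.
 */
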
/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct rpcrdma_msg *rmsgp;
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len =
                min_t(size_t, byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;

        /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
        rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
        if (be32_to_cpu(rmsgp->rm_type) == RDMA_NOMSG)
                rqstp->rq_arg.pages = &rqstp->rq_pages[0];
        else
                rqstp->rq_arg.pages = &rqstp->rq_pages[1];

        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}

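/* Return the maximum number of scatter/gather entries to use for a
 * single RDMA READ: one for iWARP transports, otherwise up to the
 * lesser of sge_count and the device's sc_max_sge limit.
 */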
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
        if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
             RDMA_TRANSPORT_IWARP)
                return 1;
        else
                return min_t(int, sge_count, xprt->sc_max_sge);
}

/* Issue an RDMA_READ using the local lkey to map the data sink */
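/* Returns the number of bytes of the chunk segment covered by this
 * READ on success.  On failure the context is unmapped and released
 * and an error code is returned; *page_no and *page_offset are only
 * advanced on success.
 */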
int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
                        struct svc_rqst *rqstp,
                        struct svc_rdma_op_ctxt *head,
                        int *page_no,
                        u32 *page_offset,
                        u32 rs_handle,
                        u32 rs_length,
                        u64 rs_offset,
                        bool last)
{
        struct ib_send_wr read_wr;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->read_hdr = head;
        pages_needed =
                min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

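        /* Map each sink page for DMA and add it to the sge list.  The
         * pages are shared with rq_arg so that the READ payload lands
         * directly in the RPC argument buffer.
         */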
        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                ctxt->sge[pno].addr =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], pg_off,
                                        PAGE_SIZE - pg_off,
                                        DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           ctxt->sge[pno].addr);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* The lkey here is either a local dma lkey or a dma_mr lkey */
                ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
                ctxt->sge[pno].length = len;
                ctxt->count++;

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.wr_id = (unsigned long)ctxt;
        read_wr.opcode = IB_WR_RDMA_READ;
        ctxt->wr_op = read_wr.opcode;
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = pages_needed;

        ret = svc_rdma_send(xprt, &read_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return ret;
}

/* Issue an RDMA_READ using an FRMR to map the data sink */
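/* The sink pages are fast-registered as a single memory region, and a
 * FAST_REG_MR work request is chained ahead of the RDMA READ (with a
 * LOCAL_INV chained behind it when the device lacks READ_W_INV
 * support).  Returns the number of bytes posted for READ on success,
 * -ENOMEM if no FRMR is available, or an error code if DMA mapping or
 * posting the work request chain fails.
 */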
int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
                         struct svc_rqst *rqstp,
                         struct svc_rdma_op_ctxt *head,
                         int *page_no,
                         u32 *page_offset,
                         u32 rs_handle,
                         u32 rs_length,
                         u64 rs_offset,
                         bool last)
{
        struct ib_send_wr read_wr;
        struct ib_send_wr inv_wr;
        struct ib_send_wr fastreg_wr;
        u8 key;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        if (IS_ERR(frmr))
                return -ENOMEM;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->frmr = frmr;
        pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

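        /* Describe the region to be fast-registered: it starts at the
         * kernel virtual address of the first sink page and covers
         * pages_needed pages, mapped with local and remote write access.
         */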
        frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->map_len = pages_needed << PAGE_SHIFT;
        frmr->page_list_len = pages_needed;

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                frmr->page_list->page_list[pno] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           frmr->page_list->page_list[pno]);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        /* Bump the key */
        key = (u8)(frmr->mr->lkey & 0x000000FF);
        ib_update_fast_reg_key(frmr->mr, ++key);

        ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
        ctxt->sge[0].lkey = frmr->mr->lkey;
        ctxt->sge[0].length = read;
        ctxt->count = 1;
        ctxt->read_hdr = head;

        /* Prepare FASTREG WR */
        memset(&fastreg_wr, 0, sizeof(fastreg_wr));
        fastreg_wr.opcode = IB_WR_FAST_REG_MR;
        fastreg_wr.send_flags = IB_SEND_SIGNALED;
        fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
        fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
        fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
        fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        fastreg_wr.wr.fast_reg.length = frmr->map_len;
        fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
        fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
        fastreg_wr.next = &read_wr;

        /* Prepare RDMA_READ */
        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = 1;
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
                read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
                read_wr.wr_id = (unsigned long)ctxt;
                read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
        } else {
                read_wr.opcode = IB_WR_RDMA_READ;
                read_wr.next = &inv_wr;
                /* Prepare invalidate */
                memset(&inv_wr, 0, sizeof(inv_wr));
                inv_wr.wr_id = (unsigned long)ctxt;
                inv_wr.opcode = IB_WR_LOCAL_INV;
                inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
                inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
        }
        ctxt->wr_op = read_wr.opcode;

        /* Post the chain */
        ret = svc_rdma_send(xprt, &fastreg_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        svc_rdma_put_frmr(xprt, frmr);
        return ret;
}

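/* Count the segments in the Read list; the list is terminated by a
 * segment whose rc_discrim field is zero.
 */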
static unsigned int
rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
{
        unsigned int count;

        for (count = 0; ch->rc_discrim != xdr_zero; ch++)
                count++;
        return count;
}

/* If there was additional inline content, append it to the end of arg.pages.
 * Tail copy has to be done after the reader function has determined how many
 * pages are needed for RDMA READ.
 */
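/* Returns 1 and adds the copied byte count to head->arg on success;
 * returns 0 if the remaining inline content is larger than a page.
 */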
static int
rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
               u32 position, u32 byte_count, u32 page_offset, int page_no)
{
        char *srcp, *destp;

        srcp = head->arg.head[0].iov_base + position;
        byte_count = head->arg.head[0].iov_len - position;
        if (byte_count > PAGE_SIZE) {
                dprintk("svcrdma: large tail unsupported\n");
                return 0;
        }

        /* Fit as much of the tail on the current page as possible */
        if (page_offset != PAGE_SIZE) {
                destp = page_address(rqstp->rq_arg.pages[page_no]);
                destp += page_offset;
                while (byte_count--) {
                        *destp++ = *srcp++;
                        page_offset++;
                        if (page_offset == PAGE_SIZE && byte_count)
                                goto more;
                }
                goto done;
        }

more:
        /* Fit the rest on the next page */
        page_no++;
        destp = page_address(rqstp->rq_arg.pages[page_no]);
        while (byte_count--)
                *destp++ = *srcp++;

        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

done:
        byte_count = head->arg.head[0].iov_len - position;
        head->arg.page_len += byte_count;
        head->arg.len += byte_count;
        head->arg.buflen += byte_count;
        return 1;
}

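/* Pull the data described by the client's Read list into head->arg
 * using the transport's reader method (rdma_read_chunk_lcl or
 * rdma_read_chunk_frmr).  Normally returns 1 once the RDMA READs have
 * been posted and the request must wait for them to complete, 0 when
 * the message carries no Read list, or a negative errno if a READ
 * could not be posted.
 */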
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
                            struct rpcrdma_msg *rmsgp,
                            struct svc_rqst *rqstp,
                            struct svc_rdma_op_ctxt *head)
{
        int page_no, ret;
        struct rpcrdma_read_chunk *ch;
        u32 handle, page_offset, byte_count;
        u32 position;
        u64 rs_offset;
        bool last;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
                return -EINVAL;

        /* The request is completed when the RDMA_READs complete. The
         * head context keeps all the pages that comprise the
         * request.
         */
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->hdr_count = head->count;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
        position = be32_to_cpu(ch->rc_position);

        /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
        if (position == 0) {
                head->arg.pages = &head->pages[0];
                page_offset = head->byte_len;
        } else {
                head->arg.pages = &head->pages[head->count];
                page_offset = 0;
        }

        ret = 0;
        page_no = 0;
        for (; ch->rc_discrim != xdr_zero; ch++) {
                if (be32_to_cpu(ch->rc_position) != position)
                        goto err;

                handle = be32_to_cpu(ch->rc_target.rs_handle);
                byte_count = be32_to_cpu(ch->rc_target.rs_length);
                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);

                while (byte_count > 0) {
                        last = (ch + 1)->rc_discrim == xdr_zero;
                        ret = xprt->sc_reader(xprt, rqstp, head,
                                              &page_no, &page_offset,
                                              handle, byte_count,
                                              rs_offset, last);
                        if (ret < 0)
                                goto err;
                        byte_count -= ret;
                        rs_offset += ret;
                        head->arg.buflen += ret;
                }
        }

        /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
        if (page_offset & 3) {
                u32 pad = 4 - (page_offset & 3);

                head->arg.page_len += pad;
                head->arg.len += pad;
                head->arg.buflen += pad;
                page_offset += pad;
        }

        ret = 1;
        if (position && position < head->arg.head[0].iov_len)
                ret = rdma_copy_tail(rqstp, head, position,
                                     byte_count, page_offset, page_no);
        head->arg.head[0].iov_len = position;
        head->position = position;

 err:
        /* Detach arg pages. svc_recv will replenish them */
        for (page_no = 0;
             &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
                rqstp->rq_pages[page_no] = NULL;

        return ret;
}

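/* Called once all RDMA READs for a deferred request have completed:
 * rebuild rqstp->rq_arg from the pages and lengths saved in the head
 * context and return the total length of the RPC call message.
 */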
static int rdma_read_complete(struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head)
{
        int page_no;
        int ret;

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }

        /* Adjustments made for RDMA_NOMSG type requests */
        if (head->position == 0) {
                if (head->arg.len <= head->sge[0].length) {
                        head->arg.head[0].iov_len = head->arg.len -
                                                        head->byte_len;
                        head->arg.page_len = 0;
                } else {
                        head->arg.head[0].iov_len = head->sge[0].length -
                                                                head->byte_len;
                        head->arg.page_len = head->arg.len -
                                                head->sge[0].length;
                }
        }

        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;

        /* Free the context */
        svc_rdma_put_context(head, 0);

        /* XXX: What should this be? */
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
                ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);

        return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
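/*
 * Returns the length of the completed RPC call message when one is
 * ready for processing, or 0 when the caller should try again later:
 * either no receive is queued (the -EAGAIN path), a Read list has been
 * posted and the request is deferred, or the transport is closing.
 */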
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
        int len;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

        spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
        } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will call into
                 * svc_recv again, and we shouldn't be on the active
                 * transport list.
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;

                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
                ctxt, rdma_xprt, rqstp, ctxt->wc_status);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
        rqstp->rq_xprt_hlen = len;

        /* If the request is invalid, reply with an error */
        if (len < 0) {
                if (len == -ENOSYS)
                        svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
                goto close_out;
        }

        /* Read read-list data. */
        ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        } else if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

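        /* The RPC call message is complete in rq_arg; return its
         * total length to the caller.
         */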
        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

 close_out:
        if (ctxt)
                svc_rdma_put_context(ctxt, 1);
        dprintk("svcrdma: transport %p is closing\n", xprt);
        /*
         * Set the close bit and enqueue it. svc_recv will see the
         * close bit and call svc_xprt_delete
         */
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
        return 0;
}