linux/net/sunrpc/xprtrdma/svc_rdma_sendto.c
/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

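/* Number of pad bytes needed to round @len up to the next XDR
 * quad (4-byte) boundary; returns 0 when @len is already aligned.
 */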
static u32 xdr_padsize(u32 len)
{
        return (len & 3) ? (4 - (len & 3)) : 0;
}

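/* Build a vector of kvecs describing the reply's xdr_buf: the head,
 * each page of the pagelist, and the tail each get their own entry.
 * sge[0] is left unused; it is reserved for the RPC-over-RDMA header.
 * When a Write chunk is present, any XDR pad for the pagelist is
 * trimmed from the tail. Returns 0 on success, or -EIO if the xdr_buf
 * lengths are inconsistent.
 */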
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
                     struct xdr_buf *xdr,
                     struct svc_rdma_req_map *vec,
                     bool write_chunk_present)
{
        int sge_no;
        u32 sge_bytes;
        u32 page_bytes;
        u32 page_off;
        int page_no;

        if (xdr->len !=
            (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
                pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
                return -EIO;
        }

        /* Skip the first sge; it is reserved for the RPCRDMA header */
        sge_no = 1;

        /* Head SGE */
        vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
        vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
        sge_no++;

        /* pages SGE */
        page_no = 0;
        page_bytes = xdr->page_len;
        page_off = xdr->page_base;
        while (page_bytes) {
                vec->sge[sge_no].iov_base =
                        page_address(xdr->pages[page_no]) + page_off;
                sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
                page_bytes -= sge_bytes;
                vec->sge[sge_no].iov_len = sge_bytes;

                sge_no++;
                page_no++;
                page_off = 0; /* reset for next time through loop */
        }

        /* Tail SGE */
        if (xdr->tail[0].iov_len) {
                unsigned char *base = xdr->tail[0].iov_base;
                size_t len = xdr->tail[0].iov_len;
                u32 xdr_pad = xdr_padsize(xdr->page_len);

                if (write_chunk_present && xdr_pad) {
                        base += xdr_pad;
                        len -= xdr_pad;
                }

                if (len) {
                        vec->sge[sge_no].iov_base = base;
                        vec->sge[sge_no].iov_len = len;
                        sge_no++;
                }
        }

        dprintk("svcrdma: %s: sge_no %d page_no %d "
                "page_base %u page_len %u head_len %zu tail_len %zu\n",
                __func__, sge_no, page_no, xdr->page_base, xdr->page_len,
                xdr->head[0].iov_len, xdr->tail[0].iov_len);

        vec->count = sge_no;
        return 0;
}

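/* DMA-map the portion of the reply xdr_buf that begins at @xdr_off,
 * which may fall in the head, the pagelist, or the tail. At most one
 * page is mapped per call; the caller must check the returned address
 * with ib_dma_mapping_error().
 */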
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
                              struct xdr_buf *xdr,
                              u32 xdr_off, size_t len, int dir)
{
        struct page *page;
        dma_addr_t dma_addr;
        if (xdr_off < xdr->head[0].iov_len) {
                /* This offset is in the head */
                xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
                page = virt_to_page(xdr->head[0].iov_base);
        } else {
                xdr_off -= xdr->head[0].iov_len;
                if (xdr_off < xdr->page_len) {
                        /* This offset is in the page list */
                        xdr_off += xdr->page_base;
                        page = xdr->pages[xdr_off >> PAGE_SHIFT];
                        xdr_off &= ~PAGE_MASK;
                } else {
                        /* This offset is in the tail */
                        xdr_off -= xdr->page_len;
                        xdr_off += (unsigned long)
                                xdr->tail[0].iov_base & ~PAGE_MASK;
                        page = virt_to_page(xdr->tail[0].iov_base);
                }
        }
        dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
                                   min_t(size_t, PAGE_SIZE, len), dir);
        return dma_addr;
}

/* Returns the address of the first read chunk or NULL if no read chunk
 * is present
 */
struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
{
        struct rpcrdma_read_chunk *ch =
                (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];

        if (ch->rc_discrim == xdr_zero)
                return NULL;
        return ch;
}

/* Returns the address of the first write array element or NULL if no
 * write array is present
 */
static struct rpcrdma_write_array *
svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
{
        if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
            rmsgp->rm_body.rm_chunks[1] == xdr_zero)
                return NULL;
        return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1];
}

/* Returns the address of the first reply array element or NULL if no
 * reply array is present
 */
static struct rpcrdma_write_array *
svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
                         struct rpcrdma_write_array *wr_ary)
{
        struct rpcrdma_read_chunk *rch;
        struct rpcrdma_write_array *rp_ary;

        /* XXX: Need to fix when reply chunk may occur with read list
         *      and/or write list.
         */
        if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
            rmsgp->rm_body.rm_chunks[1] != xdr_zero)
                return NULL;

        rch = svc_rdma_get_read_chunk(rmsgp);
        if (rch) {
                while (rch->rc_discrim != xdr_zero)
                        rch++;

                /* The reply chunk follows an empty write array located
                 * at 'rc_position' here. The reply array is at rc_target.
                 */
                rp_ary = (struct rpcrdma_write_array *)&rch->rc_target;
                goto found_it;
        }

        if (wr_ary) {
                int chunk = be32_to_cpu(wr_ary->wc_nchunks);

                rp_ary = (struct rpcrdma_write_array *)
                         &wr_ary->wc_array[chunk].wc_target.rs_length;
                goto found_it;
        }

        /* No read list, no write list */
        rp_ary = (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[2];

 found_it:
        if (rp_ary->wc_discrim == xdr_zero)
                return NULL;
        return rp_ary;
}

/* Post a single RDMA Write WR that transfers part of the reply's
 * xdr_buf to the client memory region described by @rmr and @to.
 * Returns the number of bytes actually posted, or -EIO on failure.
 *
 * Assumptions:
 * - The specified write_len fits within sc_max_sge * PAGE_SIZE
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
                      u32 rmr, u64 to,
                      u32 xdr_off, int write_len,
                      struct svc_rdma_req_map *vec)
{
        struct ib_rdma_wr write_wr;
        struct ib_sge *sge;
        int xdr_sge_no;
        int sge_no;
        int sge_bytes;
        int sge_off;
        int bc;
        struct svc_rdma_op_ctxt *ctxt;

        if (vec->count > RPCSVC_MAXPAGES) {
                pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
                return -EIO;
        }

        dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
                "write_len=%d, vec->sge=%p, vec->count=%lu\n",
                rmr, (unsigned long long)to, xdr_off,
                write_len, vec->sge, vec->count);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        sge = ctxt->sge;

        /* Find the SGE associated with xdr_off */
        for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
             xdr_sge_no++) {
                if (vec->sge[xdr_sge_no].iov_len > bc)
                        break;
                bc -= vec->sge[xdr_sge_no].iov_len;
        }

        sge_off = bc;
        bc = write_len;
        sge_no = 0;

        /* Copy the remaining SGE */
        while (bc != 0) {
                sge_bytes = min_t(size_t,
                          bc, vec->sge[xdr_sge_no].iov_len-sge_off);
                sge[sge_no].length = sge_bytes;
                sge[sge_no].addr =
                        dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         sge[sge_no].addr))
                        goto err;
                atomic_inc(&xprt->sc_dma_used);
                sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->count++;
                sge_off = 0;
                sge_no++;
                xdr_sge_no++;
                if (xdr_sge_no > vec->count) {
                        pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
                        goto err;
                }
                bc -= sge_bytes;
                if (sge_no == xprt->sc_max_sge)
                        break;
        }

        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
        ctxt->cqe.done = svc_rdma_wc_write;
        write_wr.wr.wr_cqe = &ctxt->cqe;
        write_wr.wr.sg_list = &sge[0];
        write_wr.wr.num_sge = sge_no;
        write_wr.wr.opcode = IB_WR_RDMA_WRITE;
        write_wr.wr.send_flags = IB_SEND_SIGNALED;
        write_wr.rkey = rmr;
        write_wr.remote_addr = to;

        /* Post It */
        atomic_inc(&rdma_stat_write);
        if (svc_rdma_send(xprt, &write_wr.wr))
                goto err;
        return write_len - bc;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return -EIO;
}

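/* Walk the Write list from the request and RDMA Write the pagelist
 * portion of the reply into the client-provided chunks, then encode
 * the Write list of the reply header to reflect the segments actually
 * used. Returns the reply's pagelist length on success, or -EIO on
 * error.
 */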
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *wr_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.page_len;
        int write_len;
        u32 xdr_off;
        int chunk_off;
        int chunk_no;
        int nchunks;
        struct rpcrdma_write_array *res_ary;
        int ret;

        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];

        /* Write chunks start at the pagelist */
        nchunks = be32_to_cpu(wr_ary->wc_nchunks);
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                struct rpcrdma_segment *arg_ch;
                u64 rs_offset;

                arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));

                /* Prepare the response chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                arg_ch->rs_handle,
                                                arg_ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(arg_ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the reply with the number of chunks actually used */
        svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

        return rqstp->rq_res.page_len;

out_err:
        pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
        return -EIO;
}

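/* RDMA Write the entire reply message into the Reply chunk provided
 * by the client for large replies, then encode the reply array in the
 * response header. Returns rqstp->rq_res.len on success, or -EIO on
 * error.
 */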
noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *rp_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.len;
        int write_len;
        u32 xdr_off;
        int chunk_no;
        int chunk_off;
        int nchunks;
        struct rpcrdma_segment *ch;
        struct rpcrdma_write_array *res_ary;
        int ret;

        /* XXX: need to fix when reply lists occur with read-list and/or
         * write-list */
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];

        /* xdr offset starts at RPC message */
        nchunks = be32_to_cpu(rp_ary->wc_nchunks);
        for (xdr_off = 0, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                u64 rs_offset;
                ch = &rp_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(ch->rs_length));

                /* Prepare the reply chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                ch->rs_handle, ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the reply with the number of chunks actually used */
        svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

        return rqstp->rq_res.len;

out_err:
        pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
        return -EIO;
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. It is called after any chunk data has
 * already been transmitted via RDMA Write. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA Write.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA Write.
 * In all three cases, this function places the RPCRDMA header in
 * sge[0]; the caller has already encoded the message type in that
 * header, and 'byte_count' indicates how much of the XDR buffer to
 * include in this RDMA_SEND. NB: The offset of the payload to send
 * is zero in the XDR.
 */
static int send_reply(struct svcxprt_rdma *rdma,
                      struct svc_rqst *rqstp,
                      struct page *page,
                      struct rpcrdma_msg *rdma_resp,
                      struct svc_rdma_req_map *vec,
                      int byte_count)
{
        struct svc_rdma_op_ctxt *ctxt;
        struct ib_send_wr send_wr;
        u32 xdr_off;
        int sge_no;
        int sge_bytes;
        int page_no;
        int pages;
        int ret = -EIO;

        /* Prepare the context */
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->pages[0] = page;
        ctxt->count = 1;

        /* Prepare the SGE for the RPCRDMA Header */
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
        ctxt->sge[0].addr =
            ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
                            ctxt->sge[0].length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
                goto err;
        atomic_inc(&rdma->sc_dma_used);

        /* Map the payload indicated by 'byte_count' */
        xdr_off = 0;
        for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
                ctxt->sge[sge_no].addr =
                        dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(rdma->sc_cm_id->device,
                                         ctxt->sge[sge_no].addr))
                        goto err;
                atomic_inc(&rdma->sc_dma_used);
                ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
                ctxt->sge[sge_no].length = sge_bytes;
        }
        if (byte_count != 0) {
                pr_err("svcrdma: Could not map %d bytes\n", byte_count);
                goto err;
        }

        /* Save all respages in the ctxt and remove them from the
         * respages array. They are our pages until the I/O
         * completes.
         */
        pages = rqstp->rq_next_page - rqstp->rq_respages;
        for (page_no = 0; page_no < pages; page_no++) {
                ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
                ctxt->count++;
                rqstp->rq_respages[page_no] = NULL;
                /*
                 * If there are more pages than SGE, terminate SGE
                 * list so that svc_rdma_unmap_dma doesn't attempt to
                 * unmap garbage.
                 */
                if (page_no+1 >= sge_no)
                        ctxt->sge[page_no+1].length = 0;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* The loop above bumps sc_dma_used for each sge. The
         * xdr_buf.tail gets a separate sge, but resides in the
         * same page as xdr_buf.head. Don't count it twice.
         */
        if (sge_no > ctxt->count)
                atomic_dec(&rdma->sc_dma_used);

        if (sge_no > rdma->sc_max_sge) {
                pr_err("svcrdma: Too many sges (%d)\n", sge_no);
                goto err;
        }
        memset(&send_wr, 0, sizeof send_wr);
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = sge_no;
        send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags = IB_SEND_SIGNALED;

        ret = svc_rdma_send(rdma, &send_wr);
        if (ret)
                goto err;

        return 0;

 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
}

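/* The xpo_prep_reply_hdr method is a no-op for RPC-over-RDMA: the
 * transport header is built on its own page in svc_rdma_sendto().
 */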
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

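/* Send the RPC reply for @rqstp: map the reply xdr_buf, build the
 * RPC-over-RDMA reply header, push any Write chunk and Reply chunk
 * data with RDMA Writes, post a replacement Receive, and finally
 * send the remaining inline portion. On any failure the transport
 * is marked for close and -ENOTCONN is returned.
 */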
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct rpcrdma_msg *rdma_argp;
        struct rpcrdma_msg *rdma_resp;
        struct rpcrdma_write_array *wr_ary, *rp_ary;
        enum rpcrdma_proc reply_type;
        int ret;
        int inline_bytes;
        struct page *res_page;
        struct svc_rdma_req_map *vec;

        dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

        /* Get the RDMA request header. The receive logic always
         * places this at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
        wr_ary = svc_rdma_get_write_array(rdma_argp);
        rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);

        /* Build a req vec for the XDR */
        vec = svc_rdma_get_req_map(rdma);
        ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
        if (ret)
                goto err0;
        inline_bytes = rqstp->rq_res.len;

        /* Create the RDMA response header */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);
        if (rp_ary)
                reply_type = RDMA_NOMSG;
        else
                reply_type = RDMA_MSG;
        svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
                                         rdma_resp, reply_type);

        /* Send any write-chunk data and build resp write-list */
        if (wr_ary) {
                ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret + xdr_padsize(ret);
        }

        /* Send any reply-list data and update resp reply-list */
        if (rp_ary) {
                ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret;
        }

        /* Post a fresh Receive buffer _before_ sending the reply */
        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;

        ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
                         inline_bytes);
        if (ret < 0)
                goto err1;

        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;

 err1:
        put_page(res_page);
 err0:
        svc_rdma_put_req_map(rdma, vec);
        pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
               ret);
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        return -ENOTCONN;
}

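/* Post an RDMA_ERROR reply (ERR_VERS for an unsupported protocol
 * version, ERR_CHUNK otherwise) to tell the client this request
 * cannot be processed. Failures here are only logged; the send is
 * best-effort.
 */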
void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
                         int status)
{
        struct ib_send_wr err_wr;
        struct page *p;
        struct svc_rdma_op_ctxt *ctxt;
        enum rpcrdma_errcode err;
        __be32 *va;
        int length;
        int ret;

        ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
        if (ret)
                return;

        p = alloc_page(GFP_KERNEL);
        if (!p)
                return;
        va = page_address(p);

        /* XDR encode an error reply */
        err = ERR_CHUNK;
        if (status == -EPROTONOSUPPORT)
                err = ERR_VERS;
        length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->count = 1;
        ctxt->pages[0] = p;

        /* Prepare SGE for local address */
        ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = length;
        ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
                                            p, 0, length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
                dprintk("svcrdma: Error mapping buffer for protocol error\n");
                svc_rdma_put_context(ctxt, 1);
                return;
        }
        atomic_inc(&xprt->sc_dma_used);

        /* Prepare SEND WR */
        memset(&err_wr, 0, sizeof(err_wr));
        ctxt->cqe.done = svc_rdma_wc_send;
        err_wr.wr_cqe = &ctxt->cqe;
        err_wr.sg_list = ctxt->sge;
        err_wr.num_sge = 1;
        err_wr.opcode = IB_WR_SEND;
        err_wr.send_flags = IB_SEND_SIGNALED;

        /* Post It */
        ret = svc_rdma_send(xprt, &err_wr);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}