linux/net/sunrpc/xprtrdma/svc_rdma_sendto.c
/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

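/* Returns the number of pad bytes needed to bring @len up to the next
 * XDR 4-byte boundary.
 */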
static u32 xdr_padsize(u32 len)
{
        return (len & 3) ? (4 - (len & 3)) : 0;
}

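/* Build a request vector describing where the RPC reply in @xdr lives.
 * sge[0] is left free for the RPC-over-RDMA transport header; the head,
 * page list, and tail of @xdr fill the remaining entries. When a Write
 * chunk is present, the XDR pad at the start of the tail is skipped.
 * Returns 0, or -EIO if the xdr_buf lengths are inconsistent.
 */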
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
                     struct xdr_buf *xdr,
                     struct svc_rdma_req_map *vec,
                     bool write_chunk_present)
{
        int sge_no;
        u32 sge_bytes;
        u32 page_bytes;
        u32 page_off;
        int page_no;

        if (xdr->len !=
            (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
                pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
                return -EIO;
        }

        /* Skip the first sge, this is for the RPCRDMA header */
        sge_no = 1;

        /* Head SGE */
        vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
        vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
        sge_no++;

        /* pages SGE */
        page_no = 0;
        page_bytes = xdr->page_len;
        page_off = xdr->page_base;
        while (page_bytes) {
                vec->sge[sge_no].iov_base =
                        page_address(xdr->pages[page_no]) + page_off;
                sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
                page_bytes -= sge_bytes;
                vec->sge[sge_no].iov_len = sge_bytes;

                sge_no++;
                page_no++;
                page_off = 0; /* reset for next time through loop */
        }

        /* Tail SGE */
        if (xdr->tail[0].iov_len) {
                unsigned char *base = xdr->tail[0].iov_base;
                size_t len = xdr->tail[0].iov_len;
                u32 xdr_pad = xdr_padsize(xdr->page_len);

                if (write_chunk_present && xdr_pad) {
                        base += xdr_pad;
                        len -= xdr_pad;
                }

                if (len) {
                        vec->sge[sge_no].iov_base = base;
                        vec->sge[sge_no].iov_len = len;
                        sge_no++;
                }
        }

        dprintk("svcrdma: %s: sge_no %d page_no %d "
                "page_base %u page_len %u head_len %zu tail_len %zu\n",
                __func__, sge_no, page_no, xdr->page_base, xdr->page_len,
                xdr->head[0].iov_len, xdr->tail[0].iov_len);

        vec->count = sge_no;
        return 0;
}

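/* DMA-map @len bytes of the reply xdr_buf for sending, starting at
 * offset @xdr_off. The offset may fall in the head, the page list, or
 * the tail; no more than one page is mapped per call.
 */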
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
                              struct xdr_buf *xdr,
                              u32 xdr_off, size_t len, int dir)
{
        struct page *page;
        dma_addr_t dma_addr;
        if (xdr_off < xdr->head[0].iov_len) {
                /* This offset is in the head */
                xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
                page = virt_to_page(xdr->head[0].iov_base);
        } else {
                xdr_off -= xdr->head[0].iov_len;
                if (xdr_off < xdr->page_len) {
                        /* This offset is in the page list */
                        xdr_off += xdr->page_base;
                        page = xdr->pages[xdr_off >> PAGE_SHIFT];
                        xdr_off &= ~PAGE_MASK;
                } else {
                        /* This offset is in the tail */
                        xdr_off -= xdr->page_len;
                        xdr_off += (unsigned long)
                                xdr->tail[0].iov_base & ~PAGE_MASK;
                        page = virt_to_page(xdr->tail[0].iov_base);
                }
        }
        dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
                                   min_t(size_t, PAGE_SIZE, len), dir);
        return dma_addr;
}

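/* In the RPC-over-RDMA transport header, the fixed fields are followed
 * by three chunk lists: the Read list, the Write list, and the Reply
 * chunk, in that order. Each begins with a one-word discriminator that
 * is xdr_zero when that list is absent. The helpers below locate each
 * list in a received header.
 */
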
/* Returns the address of the first read chunk or <nul> if no read chunk
 * is present
 */
struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
{
        struct rpcrdma_read_chunk *ch =
                (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];

        if (ch->rc_discrim == xdr_zero)
                return NULL;
        return ch;
}

/* Returns the address of the first write array element or <nul> if no
 * write list is present
 */
static struct rpcrdma_write_array *
svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
{
        if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
            rmsgp->rm_body.rm_chunks[1] == xdr_zero)
                return NULL;
        return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1];
}

/* Returns the address of the first reply array element or <nul> if no
 * reply array is present
 */
static struct rpcrdma_write_array *
svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
                         struct rpcrdma_write_array *wr_ary)
{
        struct rpcrdma_read_chunk *rch;
        struct rpcrdma_write_array *rp_ary;

        /* XXX: Need to fix when reply chunk may occur with read list
         *      and/or write list.
         */
        if (rmsgp->rm_body.rm_chunks[0] != xdr_zero ||
            rmsgp->rm_body.rm_chunks[1] != xdr_zero)
                return NULL;

        rch = svc_rdma_get_read_chunk(rmsgp);
        if (rch) {
                while (rch->rc_discrim != xdr_zero)
                        rch++;

                /* The reply chunk follows an empty write array located
                 * at 'rc_position' here. The reply array is at rc_target.
                 */
                rp_ary = (struct rpcrdma_write_array *)&rch->rc_target;
                goto found_it;
        }

        if (wr_ary) {
                int chunk = be32_to_cpu(wr_ary->wc_nchunks);

                rp_ary = (struct rpcrdma_write_array *)
                         &wr_ary->wc_array[chunk].wc_target.rs_length;
                goto found_it;
        }

        /* No read list, no write list */
        rp_ary = (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[2];

 found_it:
        if (rp_ary->wc_discrim == xdr_zero)
                return NULL;
        return rp_ary;
}

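/* RDMA Write a span of the reply xdr_buf, starting at @xdr_off, to the
 * client memory region described by @rmr and @to. At most sc_max_sge
 * SGEs are posted in a single Write WR, so callers loop until the whole
 * chunk has been written. Returns the number of bytes posted, or -EIO
 * on error.
 */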
/* Assumptions:
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
                      u32 rmr, u64 to,
                      u32 xdr_off, int write_len,
                      struct svc_rdma_req_map *vec)
{
        struct ib_rdma_wr write_wr;
        struct ib_sge *sge;
        int xdr_sge_no;
        int sge_no;
        int sge_bytes;
        int sge_off;
        int bc;
        struct svc_rdma_op_ctxt *ctxt;

        if (vec->count > RPCSVC_MAXPAGES) {
                pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
                return -EIO;
        }

        dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
                "write_len=%d, vec->sge=%p, vec->count=%lu\n",
                rmr, (unsigned long long)to, xdr_off,
                write_len, vec->sge, vec->count);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        sge = ctxt->sge;

        /* Find the SGE associated with xdr_off */
        for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
             xdr_sge_no++) {
                if (vec->sge[xdr_sge_no].iov_len > bc)
                        break;
                bc -= vec->sge[xdr_sge_no].iov_len;
        }

        sge_off = bc;
        bc = write_len;
        sge_no = 0;

        /* Copy the remaining SGE */
        while (bc != 0) {
                sge_bytes = min_t(size_t,
                          bc, vec->sge[xdr_sge_no].iov_len-sge_off);
                sge[sge_no].length = sge_bytes;
                sge[sge_no].addr =
                        dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         sge[sge_no].addr))
                        goto err;
                atomic_inc(&xprt->sc_dma_used);
                sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->count++;
                sge_off = 0;
                sge_no++;
                xdr_sge_no++;
                if (xdr_sge_no > vec->count) {
                        pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
                        goto err;
                }
                bc -= sge_bytes;
                if (sge_no == xprt->sc_max_sge)
                        break;
        }

        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
        ctxt->cqe.done = svc_rdma_wc_write;
        write_wr.wr.wr_cqe = &ctxt->cqe;
        write_wr.wr.sg_list = &sge[0];
        write_wr.wr.num_sge = sge_no;
        write_wr.wr.opcode = IB_WR_RDMA_WRITE;
        write_wr.wr.send_flags = IB_SEND_SIGNALED;
        write_wr.rkey = rmr;
        write_wr.remote_addr = to;

        /* Post It */
        atomic_inc(&rdma_stat_write);
        if (svc_rdma_send(xprt, &write_wr.wr))
                goto err;
        return write_len - bc;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return -EIO;
}

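/* Push the pagelist portion of the reply into the client-provided Write
 * chunks, starting at the xdr offset just past the head, and encode the
 * resulting Write list in the reply transport header. On success,
 * returns rq_res.page_len so the caller can adjust the remaining inline
 * byte count; on error, returns -EIO.
 */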
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *wr_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.page_len;
        int write_len;
        u32 xdr_off;
        int chunk_off;
        int chunk_no;
        int nchunks;
        struct rpcrdma_write_array *res_ary;
        int ret;

        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];

        /* Write chunks start at the pagelist */
        nchunks = be32_to_cpu(wr_ary->wc_nchunks);
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                struct rpcrdma_segment *arg_ch;
                u64 rs_offset;

                arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));

                /* Prepare the response chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                arg_ch->rs_handle,
                                                arg_ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(arg_ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

        return rqstp->rq_res.page_len;

out_err:
        pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
        return -EIO;
}

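/* Push the whole reply message (rq_res.len bytes, starting at xdr
 * offset zero) into the client-provided Reply chunk, and encode the
 * resulting reply array in the reply transport header. On success,
 * returns rq_res.len; on error, returns -EIO.
 */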
noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *rp_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.len;
        int write_len;
        u32 xdr_off;
        int chunk_no;
        int chunk_off;
        int nchunks;
        struct rpcrdma_segment *ch;
        struct rpcrdma_write_array *res_ary;
        int ret;

        /* XXX: need to fix when reply lists occur with read-list and/or
         * write-list */
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];

        /* xdr offset starts at RPC message */
        nchunks = be32_to_cpu(rp_ary->wc_nchunks);
        for (xdr_off = 0, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                u64 rs_offset;
                ch = &rp_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(ch->rs_length));

                /* Prepare the reply chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                ch->rs_handle, ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

        return rqstp->rq_res.len;

out_err:
        pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
        return -EIO;
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. This function is called after data sent via
 * RDMA has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function maps the previously encoded
 * RPCRDMA header into sge[0], and the 'byte_count' parameter indicates
 * how much of the XDR to include in this RDMA_SEND. NB: The offset of
 * the payload to send is zero in the XDR.
 */
static int send_reply(struct svcxprt_rdma *rdma,
                      struct svc_rqst *rqstp,
                      struct page *page,
                      struct rpcrdma_msg *rdma_resp,
                      struct svc_rdma_op_ctxt *ctxt,
                      struct svc_rdma_req_map *vec,
                      int byte_count)
{
        struct ib_send_wr send_wr;
        u32 xdr_off;
        int sge_no;
        int sge_bytes;
        int page_no;
        int pages;
        int ret;

        ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
        if (ret) {
                svc_rdma_put_context(ctxt, 0);
                return -ENOTCONN;
        }

        /* Prepare the context */
        ctxt->pages[0] = page;
        ctxt->count = 1;

        /* Prepare the SGE for the RPCRDMA Header */
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
        ctxt->sge[0].addr =
            ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
                            ctxt->sge[0].length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
                goto err;
        atomic_inc(&rdma->sc_dma_used);

        ctxt->direction = DMA_TO_DEVICE;

        /* Map the payload indicated by 'byte_count' */
        xdr_off = 0;
        for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
                ctxt->sge[sge_no].addr =
                        dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(rdma->sc_cm_id->device,
                                         ctxt->sge[sge_no].addr))
                        goto err;
                atomic_inc(&rdma->sc_dma_used);
                ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
                ctxt->sge[sge_no].length = sge_bytes;
        }
        if (byte_count != 0) {
                pr_err("svcrdma: Could not map %d bytes\n", byte_count);
                goto err;
        }

        /* Save all respages in the ctxt and remove them from the
         * respages array. They are our pages until the I/O
         * completes.
         */
        pages = rqstp->rq_next_page - rqstp->rq_respages;
        for (page_no = 0; page_no < pages; page_no++) {
                ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
                ctxt->count++;
                rqstp->rq_respages[page_no] = NULL;
                /*
                 * If there are more pages than SGE, terminate SGE
                 * list so that svc_rdma_unmap_dma doesn't attempt to
                 * unmap garbage.
                 */
                if (page_no+1 >= sge_no)
                        ctxt->sge[page_no+1].length = 0;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* The loop above bumps sc_dma_used for each sge. The
         * xdr_buf.tail gets a separate sge, but resides in the
         * same page as xdr_buf.head. Don't count it twice.
         */
        if (sge_no > ctxt->count)
                atomic_dec(&rdma->sc_dma_used);

        if (sge_no > rdma->sc_max_sge) {
                pr_err("svcrdma: Too many sges (%d)\n", sge_no);
                goto err;
        }
        memset(&send_wr, 0, sizeof send_wr);
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = sge_no;
        send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags = IB_SEND_SIGNALED;

        ret = svc_rdma_send(rdma, &send_wr);
        if (ret)
                goto err;

        return 0;

 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        pr_err("svcrdma: failed to send reply, rc=%d\n", ret);
        return -EIO;
}

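/* ->xpo_prep_reply_hdr callback. The RPC-over-RDMA transport header is
 * built later, in svc_rdma_sendto(), so there is nothing to reserve in
 * the reply xdr_buf here.
 */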
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

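/* Entry point for sending an RPC reply on an RPC-over-RDMA transport.
 * Any payload directed to Write chunks or a Reply chunk is pushed to
 * the client with RDMA Writes first, then the transport header (and
 * whatever remains inline) is transmitted with a single RDMA Send.
 * On failure, the connection is marked for closure.
 */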
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct rpcrdma_msg *rdma_argp;
        struct rpcrdma_msg *rdma_resp;
        struct rpcrdma_write_array *wr_ary, *rp_ary;
        enum rpcrdma_proc reply_type;
        int ret;
        int inline_bytes;
        struct page *res_page;
        struct svc_rdma_op_ctxt *ctxt;
        struct svc_rdma_req_map *vec;

        dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

        /* Get the RDMA request header. The receive logic always
         * places this at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
        wr_ary = svc_rdma_get_write_array(rdma_argp);
        rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);

        /* Build a req vec for the XDR */
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        vec = svc_rdma_get_req_map(rdma);
        ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
        if (ret)
                goto err0;
        inline_bytes = rqstp->rq_res.len;

        /* Create the RDMA response header */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);
        if (rp_ary)
                reply_type = RDMA_NOMSG;
        else
                reply_type = RDMA_MSG;
        svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
                                         rdma_resp, reply_type);

        /* Send any write-chunk data and build resp write-list */
        if (wr_ary) {
                ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret + xdr_padsize(ret);
        }

        /* Send any reply-list data and update resp reply-list */
        if (rp_ary) {
                ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret;
        }

        ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
                         inline_bytes);
        if (ret < 0)
                goto err1;

        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;

 err1:
        put_page(res_page);
 err0:
        svc_rdma_put_req_map(rdma, vec);
        svc_rdma_put_context(ctxt, 0);
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        return -ENOTCONN;
}

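/* Send a transport-level error reply (ERR_VERS or ERR_CHUNK) back to
 * the client when an incoming RPC-over-RDMA message could not be
 * processed. The error is built in a freshly allocated page and posted
 * with a single RDMA Send; failures are logged and otherwise dropped.
 */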
void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
                         int status)
{
        struct ib_send_wr err_wr;
        struct page *p;
        struct svc_rdma_op_ctxt *ctxt;
        enum rpcrdma_errcode err;
        __be32 *va;
        int length;
        int ret;

        ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
        if (ret)
                return;

        p = alloc_page(GFP_KERNEL);
        if (!p)
                return;
        va = page_address(p);

        /* XDR encode an error reply */
        err = ERR_CHUNK;
        if (status == -EPROTONOSUPPORT)
                err = ERR_VERS;
        length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->count = 1;
        ctxt->pages[0] = p;

        /* Prepare SGE for local address */
        ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = length;
        ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
                                            p, 0, length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
                dprintk("svcrdma: Error mapping buffer for protocol error\n");
                svc_rdma_put_context(ctxt, 1);
                return;
        }
        atomic_inc(&xprt->sc_dma_used);

        /* Prepare SEND WR */
        memset(&err_wr, 0, sizeof(err_wr));
        ctxt->cqe.done = svc_rdma_wc_send;
        err_wr.wr_cqe = &ctxt->cqe;
        err_wr.sg_list = ctxt->sge;
        err_wr.num_sge = 1;
        err_wr.opcode = IB_WR_SEND;
        err_wr.send_flags = IB_SEND_SIGNALED;

        /* Post It */
        ret = svc_rdma_send(xprt, &err_wr);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}