linux/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = rmsgp->rm_xid;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
	pr_info("%s:      RPC: %*ph\n",
		__func__, (int)len, p);
#endif

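	/* The smallest well-formed RPC reply message (XID, message
	 * type, reply_stat, an empty verifier, and accept_stat) is
	 * 24 octets.
	 */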
	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;

	spin_lock_bh(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;

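	/* Sync the private copy of the receive buffer with
	 * rq_rcv_buf before copying in the reply data, and verify
	 * the reply fits in the head kvec.
	 */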
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);

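	/* Translate the credit grant from the reply header into
	 * the generic transport's congestion window, clamped to
	 * the number of backchannel slots actually provisioned.
	 */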
	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;

	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);

	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));

	goto out_unlock;
}

/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_reply, but takes a struct rpc_rqst
 * instead, does not support chunks, and avoids blocking memory
 * allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst)
{
	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;
	struct ib_send_wr send_wr;
	int ret;

	vec = svc_rdma_get_req_map(rdma);
	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
	if (ret)
		goto out_err;

	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
	if (ret)
		goto out_err;

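	/* The marshaled call occupies a single page (see
	 * xprt_rdma_bc_allocate below), so one SGE covers the
	 * entire send buffer.
	 */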
	ctxt = svc_rdma_get_context(rdma);
	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
	ctxt->count = 1;

	ctxt->direction = DMA_TO_DEVICE;
	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->sge[0].length = sndbuf->len;
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
			    sndbuf->len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
		ret = -EIO;
		goto out_unmap;
	}
	atomic_inc(&rdma->sc_dma_used);

	memset(&send_wr, 0, sizeof(send_wr));
	ctxt->cqe.done = svc_rdma_wc_send;
	send_wr.wr_cqe = &ctxt->cqe;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret) {
		ret = -EIO;
		goto out_unmap;
	}

out_err:
	svc_rdma_put_req_map(rdma, vec);
	dprintk("svcrdma: %s returns %d\n", __func__, ret);
	return ret;

out_unmap:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	goto out_err;
}

/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 *
 * Returns NULL if there was a temporary allocation failure.
 */
static void *
xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	struct page *page;

	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

	/* Prevent an infinite loop: try to make this case work */
	if (size > PAGE_SIZE)
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);

	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return NULL;

	return page_address(page);
}

static void
xprt_rdma_bc_free(void *buffer)
{
	/* No-op: ctxt and page have already been freed. */
}

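/* Fill in the RPC/RDMA header at the front of the send buffer and
 * pass the call to svc_rdma_bc_sendto(). Backchannel calls never
 * carry chunks, so all three chunk lists are marked empty.
 */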
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
	int rc;

	/* Space in the send buffer for an RPC/RDMA header is reserved
	 * via xprt->tsh_size.
	 */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	headerp->rm_type = rdma_msg;
	headerp->rm_body.rm_chunks[0] = xdr_zero;
	headerp->rm_body.rm_chunks[1] = xdr_zero;
	headerp->rm_body.rm_chunks[2] = xdr_zero;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

	rc = svc_rdma_bc_sendto(rdma, rqst);
	if (rc)
		goto drop_connection;
	return rc;

drop_connection:
	dprintk("svcrdma: failed to send bc call\n");
	xprt_disconnect_done(xprt);
	return -ENOTCONN;
}

/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

	dprintk("svcrdma: sending bc call with xid: %08x\n",
		be32_to_cpu(rqst->rq_xid));

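	/* Queue the task on xpt_bc_pending before a second trylock:
	 * if the mutex is still contended, return -EAGAIN and let
	 * the holder wake this task when it releases the transport;
	 * otherwise cancel the sleep and proceed with the send.
	 */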
	if (!mutex_trylock(&sxprt->xpt_mutex)) {
		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
		if (!mutex_trylock(&sxprt->xpt_mutex))
			return -EAGAIN;
		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
	}

	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	if (ret < 0)
		return ret;
	return 0;
}

static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}

static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}

static struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};

static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

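	/* tsh_size below reserves room at the front of each send
	 * buffer for the fixed RPC/RDMA header that
	 * rpcrdma_bc_send_request() fills in.
	 */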
	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
	xprt->resvport = 0;

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	xprt_get(xprt);
	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	xprt_put(xprt);
	xprt_free(xprt);
	return ERR_PTR(-EINVAL);
}

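/* This class is presumably registered with the RPC client via
 * xprt_register_transport() during module initialization, so that
 * an xprt_create_transport() request for XPRT_TRANSPORT_BC_RDMA
 * resolves to xprt_setup_rdma_bc() above.
 */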
struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};