// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
/**
 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
 * @xprt: controlling backchannel transport
 * @rdma_resp: pointer to incoming transport header
 * @rcvbuf: XDR buffer into which to decode the reply
 *
 * Returns:
 *      %0 if @rcvbuf is filled in and xprt_complete_rqst() was called,
 *      %-EAGAIN if the server should call ->recvfrom again.
 */
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
                             struct xdr_buf *rcvbuf)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct kvec *dst, *src = &rcvbuf->head[0];
        struct rpc_rqst *req;
        u32 credits;
        size_t len;
        __be32 xid;
        __be32 *p;
        int ret;

        p = (__be32 *)src->iov_base;
        len = src->iov_len;
        xid = *rdma_resp;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
        pr_info("%s: xid=%08x, length=%zu\n",
                __func__, be32_to_cpu(xid), len);
        pr_info("%s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
        pr_info("%s:      RPC: %*ph\n",
                __func__, (int)len, p);
#endif

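        /* Sanity check: the smallest well-formed RPC Reply (XID,
         * message type, reply_stat, a zero-length verifier, and
         * accept_stat) is 24 bytes (RFC 5531).
         */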
        ret = -EAGAIN;
        if (src->iov_len < 24)
                goto out_shortreply;

        spin_lock(&xprt->queue_lock);
        req = xprt_lookup_rqst(xprt, xid);
        if (!req)
                goto out_notfound;

        /* Refresh rq_private_buf from rq_rcv_buf, then copy the
         * reply message into the request's receive buffer.
         */
        dst = &req->rq_private_buf.head[0];
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
        if (dst->iov_len < len)
                goto out_unlock;
        memcpy(dst->iov_base, p, len);
        xprt_pin_rqst(req);
        spin_unlock(&xprt->queue_lock);

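        /* The third word of the transport header carries the peer's
         * credit grant. Clamp it to a sane range before applying it
         * to the RPC congestion window.
         */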
        credits = be32_to_cpup(rdma_resp + 2);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
                credits = r_xprt->rx_buf.rb_bc_max_requests;

        spin_lock(&xprt->transport_lock);
        xprt->cwnd = credits << RPC_CWNDSHIFT;
        spin_unlock(&xprt->transport_lock);

        spin_lock(&xprt->queue_lock);
        ret = 0;
        xprt_complete_rqst(req->rq_task, rcvbuf->len);
        xprt_unpin_rqst(req);
        rcvbuf->len = 0;

out_unlock:
        spin_unlock(&xprt->queue_lock);
out:
        return ret;

out_shortreply:
        dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
                xprt, src->iov_len);
        goto out;

out_notfound:
        dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
                xprt, be32_to_cpu(xid));
        goto out_unlock;
}

/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_send_reply_msg, but takes a struct
 * rpc_rqst instead, does not support chunks, and avoids blocking
 * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
                              struct rpc_rqst *rqst,
                              struct svc_rdma_send_ctxt *ctxt)
{
        int ret;

        /* No write list: the entire message is sent inline */
        ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
        if (ret < 0)
                return -EIO;

        /* Bump page refcnt so Send completion doesn't release
         * the rq_buffer before all retransmits are complete.
         */
        get_page(virt_to_page(rqst->rq_buffer));
        ctxt->sc_send_wr.opcode = IB_WR_SEND;
        return svc_rdma_send(rdma, &ctxt->sc_send_wr);
}

/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        size_t size = rqst->rq_callsize;
        struct page *page;

        if (size > PAGE_SIZE) {
                WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
                          size);
                return -EINVAL;
        }

        page = alloc_page(RPCRDMA_DEF_GFP);
        if (!page)
                return -ENOMEM;
        rqst->rq_buffer = page_address(page);

        rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
        if (!rqst->rq_rbuffer) {
                put_page(page);
                return -ENOMEM;
        }
        return 0;
}

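/* Release the send and receive buffers allocated by
 * xprt_rdma_bc_allocate().
 */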
static void
xprt_rdma_bc_free(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;

        put_page(virt_to_page(rqst->rq_buffer));
        kfree(rqst->rq_rbuffer);
}

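/* Marshal and send one backchannel RPC call. The RPC/RDMA transport
 * header advertises this server's credit limit, and its three zeroed
 * words mark the read list, write list, and reply chunk as empty:
 * backchannel messages are always sent inline.
 */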
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct svc_rdma_send_ctxt *ctxt;
        __be32 *p;
        int rc;

        ctxt = svc_rdma_send_ctxt_get(rdma);
        if (!ctxt)
                goto drop_connection;

        p = ctxt->sc_xprt_buf;
        *p++ = rqst->rq_xid;
        *p++ = rpcrdma_version;
        *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
        *p++ = rdma_msg;
        *p++ = xdr_zero;        /* empty read list */
        *p++ = xdr_zero;        /* empty write list */
        *p   = xdr_zero;        /* empty reply chunk */
        svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
        pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

        rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
        if (rc) {
                svc_rdma_send_ctxt_put(rdma, ctxt);
                goto drop_connection;
        }
        return 0;

drop_connection:
        dprintk("svcrdma: failed to send bc call\n");
        return -ENOTCONN;
}

/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
{
        struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
        struct svcxprt_rdma *rdma;
        int ret;

        dprintk("svcrdma: sending bc call with xid: %08x\n",
                be32_to_cpu(rqst->rq_xid));

        mutex_lock(&sxprt->xpt_mutex);

        ret = -ENOTCONN;
        rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
        if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
                ret = rpcrdma_bc_send_request(rdma, rqst);
                if (ret == -ENOTCONN)
                        svc_close_xprt(sxprt);
        }

        mutex_unlock(&sxprt->xpt_mutex);

        if (ret < 0)
                return ret;
        return 0;
}

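/* On close, reset the congestion window to a minimal value so that
 * a subsequent connection starts with a conservative credit estimate.
 */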
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
        dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
        xprt->cwnd = RPC_CWNDSHIFT;
}

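/* ->destroy is invoked once the last reference to the rpc_xprt has
 * been dropped; free the transport.
 */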
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
        dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

        xprt_free(xprt);
}

static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong,
        .alloc_slot             = xprt_alloc_slot,
        .free_slot              = xprt_free_slot,
        .release_request        = xprt_release_rqst_cong,
        .buf_alloc              = xprt_rdma_bc_allocate,
        .buf_free               = xprt_rdma_bc_free,
        .send_request           = xprt_rdma_bc_send_request,
        .wait_for_reply_request = xprt_wait_for_reply_request_def,
        .close                  = xprt_rdma_bc_close,
        .destroy                = xprt_rdma_bc_put,
        .print_stats            = xprt_rdma_print_stats,
};

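/* Backchannel calls use a fixed 60-second RPC retransmit timeout:
 * to_initval equals to_maxval, so the timeout does not grow.
 */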
static const struct rpc_timeout xprt_rdma_bc_timeout = {
        .to_initval = 60 * HZ,
        .to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpcrdma_xprt *new_xprt;

        if (args->addrlen > sizeof(xprt->addr)) {
                dprintk("RPC:       %s: address too large\n", __func__);
                return ERR_PTR(-EBADF);
        }

        xprt = xprt_alloc(args->net, sizeof(*new_xprt),
                          RPCRDMA_MAX_BC_REQUESTS,
                          RPCRDMA_MAX_BC_REQUESTS);
        if (!xprt) {
                dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
                        __func__);
                return ERR_PTR(-ENOMEM);
        }

        xprt->timeout = &xprt_rdma_bc_timeout;
        xprt_set_bound(xprt);
        xprt_set_connected(xprt);
        xprt->bind_timeout = RPCRDMA_BIND_TO;
        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

        xprt->prot = XPRT_TRANSPORT_BC_RDMA;
        xprt->ops = &xprt_rdma_bc_procs;

        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
        xprt->addrlen = args->addrlen;
        xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
        xprt->resvport = 0;

        xprt->max_payload = xprt_rdma_max_inline_read;

        new_xprt = rpcx_to_rdmax(xprt);
        new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

        /* The svc_xprt's reference is put in svc_xprt_free */
        xprt_get(xprt);
        args->bc_xprt->xpt_bc_xprt = xprt;
        xprt->bc_xprt = args->bc_xprt;

        /* Final put for backchannel xprt is in __svc_rdma_free */
        xprt_get(xprt);
        return xprt;
}

struct xprt_class xprt_rdma_bc = {
        .list                   = LIST_HEAD_INIT(xprt_rdma_bc.list),
        .name                   = "rdma backchannel",
        .owner                  = THIS_MODULE,
        .ident                  = XPRT_TRANSPORT_BC_RDMA,
        .setup                  = xprt_setup_rdma_bc,
};