   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Copyright (c) 2015-2018 Oracle.  All rights reserved.
   4 *
   5 * Support for reverse-direction RPCs on RPC/RDMA (server-side).
   6 */
   7
   8#include <linux/sunrpc/svc_rdma.h>
   9
  10#include "xprt_rdma.h"
  11#include <trace/events/rpcrdma.h>
  12
/**
 * svc_rdma_handle_bc_reply - Process incoming backchannel Reply
 * @rqstp: resources for handling the Reply
 * @rctxt: Received message
 *
 * Matches the Reply's XID to a forward-channel rpc_rqst, copies the
 * received message into that request's receive buffer, refreshes the
 * connection's congestion window from the credits the peer granted,
 * and completes the RPC. A Reply with no matching request, or one
 * larger than the waiting receive buffer, is dropped silently.
 */
void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *rctxt)
{
	struct svc_xprt *sxprt = rqstp->rq_xprt;
	struct rpc_xprt *xprt = sxprt->xpt_bc_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct xdr_buf *rcvbuf = &rqstp->rq_arg;
	struct kvec *dst, *src = &rcvbuf->head[0];
	/* First word of the transport header is the XID */
	__be32 *rdma_resp = rctxt->rc_recv_buf;
	struct rpc_rqst *req;
	u32 credits;

	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, *rdma_resp);
	if (!req)
		goto out_unlock;

	/* Refresh rq_private_buf from rq_rcv_buf before sizing the
	 * copy: after the memcpy, dst reflects the receive buffer's
	 * head kvec.
	 */
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < src->iov_len)
		goto out_unlock;
	memcpy(dst->iov_base, src->iov_base, src->iov_len);
	/* Pin req so it survives while queue_lock is dropped below */
	xprt_pin_rqst(req);
	spin_unlock(&xprt->queue_lock);

	/* Credits granted by the peer: word 2 of the RPC/RDMA header */
	credits = be32_to_cpup(rdma_resp + 2);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;
	/* cwnd is maintained in RPC_CWNDSCALE units */
	spin_lock(&xprt->transport_lock);
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	spin_unlock(&xprt->transport_lock);

	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	xprt_unpin_rqst(req);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock(&xprt->queue_lock);
}
  61
/* Send a reverse-direction RPC Call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_send_reply_msg, but takes a struct
 * rpc_rqst instead, does not support chunks, and avoids blocking
 * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst,
			      struct svc_rdma_send_ctxt *sctxt)
{
	struct svc_rdma_recv_ctxt *rctxt;
	int ret;

	/* A throwaway rctxt is obtained only so svc_rdma_map_reply_msg()
	 * can be driven; backchannel Calls carry no Reply chunks.
	 */
	rctxt = svc_rdma_recv_ctxt_get(rdma);
	if (!rctxt)
		return -EIO;

	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
	svc_rdma_recv_ctxt_put(rdma, rctxt);
	if (ret < 0)
		return -EIO;

	/* Bump page refcnt so Send completion doesn't release
	 * the rq_buffer before all retransmits are complete.
	 */
	get_page(virt_to_page(rqst->rq_buffer));
	sctxt->sc_send_wr.opcode = IB_WR_SEND;
	ret = svc_rdma_send(rdma, sctxt);
	if (ret < 0)
		return ret;

	/* NOTE(review): if this wait is interrupted by a fatal signal,
	 * sctxt is released here AND a non-zero return sends the caller
	 * (rpcrdma_bc_send_request) to its put_ctxt label, which releases
	 * it again — looks like a possible double put; confirm against
	 * the send_ctxt lifetime rules.
	 */
	ret = wait_for_completion_killable(&sctxt->sc_done);
	svc_rdma_send_ctxt_put(rdma, sctxt);
	return ret;
}
 104
 105/* Server-side transport endpoint wants a whole page for its send
 106 * buffer. The client RPC code constructs the RPC header in this
 107 * buffer before it invokes ->send_request.
 108 */
 109static int
 110xprt_rdma_bc_allocate(struct rpc_task *task)
 111{
 112        struct rpc_rqst *rqst = task->tk_rqstp;
 113        size_t size = rqst->rq_callsize;
 114        struct page *page;
 115
 116        if (size > PAGE_SIZE) {
 117                WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
 118                          size);
 119                return -EINVAL;
 120        }
 121
 122        page = alloc_page(RPCRDMA_DEF_GFP);
 123        if (!page)
 124                return -ENOMEM;
 125        rqst->rq_buffer = page_address(page);
 126
 127        rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
 128        if (!rqst->rq_rbuffer) {
 129                put_page(page);
 130                return -ENOMEM;
 131        }
 132        return 0;
 133}
 134
 135static void
 136xprt_rdma_bc_free(struct rpc_task *task)
 137{
 138        struct rpc_rqst *rqst = task->tk_rqstp;
 139
 140        put_page(virt_to_page(rqst->rq_buffer));
 141        kfree(rqst->rq_rbuffer);
 142}
 143
 144static int
 145rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 146{
 147        struct rpc_xprt *xprt = rqst->rq_xprt;
 148        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 149        struct svc_rdma_send_ctxt *ctxt;
 150        __be32 *p;
 151        int rc;
 152
 153        ctxt = svc_rdma_send_ctxt_get(rdma);
 154        if (!ctxt)
 155                goto drop_connection;
 156
 157        p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
 158        if (!p)
 159                goto put_ctxt;
 160        *p++ = rqst->rq_xid;
 161        *p++ = rpcrdma_version;
 162        *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
 163        *p++ = rdma_msg;
 164        *p++ = xdr_zero;
 165        *p++ = xdr_zero;
 166        *p   = xdr_zero;
 167
 168        rqst->rq_xtime = ktime_get();
 169        rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
 170        if (rc)
 171                goto put_ctxt;
 172        return 0;
 173
 174put_ctxt:
 175        svc_rdma_send_ctxt_put(rdma, ctxt);
 176
 177drop_connection:
 178        return -ENOTCONN;
 179}
 180
 181/**
 182 * xprt_rdma_bc_send_request - Send a reverse-direction Call
 183 * @rqst: rpc_rqst containing Call message to be sent
 184 *
 185 * Return values:
 186 *   %0 if the message was sent successfully
 187 *   %ENOTCONN if the message was not sent
 188 */
 189static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
 190{
 191        struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
 192        struct svcxprt_rdma *rdma =
 193                container_of(sxprt, struct svcxprt_rdma, sc_xprt);
 194        int ret;
 195
 196        if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
 197                return -ENOTCONN;
 198
 199        ret = rpcrdma_bc_send_request(rdma, rqst);
 200        if (ret == -ENOTCONN)
 201                svc_close_xprt(sxprt);
 202        return ret;
 203}
 204
/* ->close: mark the backchannel disconnected and reset its
 * congestion window.
 */
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	xprt_disconnect_done(xprt);
	/* NOTE(review): cwnd is elsewhere maintained in RPC_CWNDSCALE
	 * units (credits << RPC_CWNDSHIFT); assigning RPC_CWNDSHIFT
	 * itself yields less than one credit's worth of window —
	 * confirm this isn't a typo for RPC_CWNDSCALE.
	 */
	xprt->cwnd = RPC_CWNDSHIFT;
}
 211
/* ->destroy: release the address strings and the rpc_xprt itself
 * once the last reference is dropped.
 */
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);
}
 218
/* rpc_xprt method table for the RPC/RDMA backchannel. The
 * congestion-control variants of reserve/release are used because
 * the number of in-flight Calls is bounded by the credits granted
 * by the peer (see svc_rdma_handle_bc_reply).
 */
static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};
 233
/* Backchannel Calls time out after 60 seconds, with no backoff
 * (initval == maxval).
 */
static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};
 238
/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr))
		return ERR_PTR(-EBADF);

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt)
		return ERR_PTR(-ENOMEM);

	xprt->timeout = &xprt_rdma_bc_timeout;
	/* The forward channel already established the connection;
	 * the backchannel rides on it, so it is born bound and
	 * connected and never re-establishes on its own.
	 */
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = 0;
	xprt->reestablish_timeout = 0;
	xprt->idle_timeout = 0;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
	xprt->resvport = 0;

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	/* Reference held by the svc_xprt's xpt_bc_xprt pointer */
	xprt_get(xprt);
	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	/* NOTE(review): two xprt_get() calls here must be balanced by
	 * exactly two puts elsewhere (xpt_bc_xprt teardown plus
	 * __svc_rdma_free) — confirm neither reference is leaked.
	 */
	xprt_get(xprt);
	return xprt;
}
 286
/* Transport class registered under XPRT_TRANSPORT_BC_RDMA; selected
 * when a forward-channel RPC/RDMA transport creates its backchannel.
 */
struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};
 294