linux/net/sunrpc/xprtrdma/backchannel.c
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

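/* Unlink the rpcrdma_req from the transport's list of reqs,
 * release its buffers, then free the preallocated rpc_rqst.
 */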
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(&r_xprt->rx_ia, req);

        kfree(rqst);
}

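/* Allocate the RPC/RDMA header buffer and the RPC send buffer
 * for one preallocated backchannel rpc_rqst, and point the
 * rqst's rq_snd_buf at the send buffer.
 */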
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        struct xdr_buf *buf;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->rl_backchannel = true;

        size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        rb->rg_owner = req;
        req->rl_sendbuf = rb;
        /* so that rpcr_to_rdmar works when receiving a request */
        rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

        buf = &rqst->rq_snd_buf;
        buf->head[0].iov_base = rqst->rq_buffer;
        buf->head[0].iov_len = 0;
        buf->tail[0].iov_base = NULL;
        buf->tail[0].iov_len = 0;
        buf->page_len = 0;
        buf->len = 0;
        buf->buflen = size;

        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpcrdma_rep *rep;
        int rc = 0;

        while (count--) {
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        pr_err("RPC:       %s: reply buffer alloc failed\n",
                               __func__);
                        rc = PTR_ERR(rep);
                        break;
                }

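                /* Add the new rep to the buffer's free list */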
                rpcrdma_recv_buffer_put(rep);
        }

        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
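        /* For example, if a ULP expects up to 8 concurrent
         * backchannel requests, 16 rpc_rqsts are preallocated,
         * provided 8 does not exceed RPCRDMA_BACKWARD_WRS / 2.
         */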
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }
                dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }

        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
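        /* Load the server-side RPC/RDMA transport module now,
         * presumably so the "rdma-bc" svc transport class used
         * by xprt_rdma_bc_up is available when the ULP brings
         * up its callback service.
         */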
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
        int ret;

        ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
        if (ret < 0)
                return ret;
        return 0;
}

/**
 * rpcrdma_bc_marshal_reply - Send backward direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;
        size_t rpclen;

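        /* Build the RPC/RDMA header: a plain rdma_msg that carries
         * the backchannel credit limit and no chunk lists.
         */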
        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        rpclen = rqst->rq_svec[0].iov_len;

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
                __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
        pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
        pr_info("RPC:       %s:      RPC: %*ph\n",
                __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
#endif

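        /* The reply is sent as two elements: the RPC/RDMA header
         * from rl_rdmabuf, followed by the RPC reply message from
         * rl_sendbuf.
         */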
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
        req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

        req->rl_niovs = 2;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;

        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;

        dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
                __func__, rqst, rpcr_to_rdmar(rqst));

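        /* Clear RPC_BC_PA_IN_USE, with full memory ordering around
         * the flag update, before returning the rqst to the
         * bc_pa_list.
         */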
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (e.g., NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
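        /* p points to the first byte of the backward direction
         * call message; size is the length of that message.
         */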
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
        dprintk("RPC:       %s: using rqst %p\n", __func__, rqst);

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;

        rqst->rq_private_buf.len = size;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
        dprintk("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
        req->rl_reply = rep;

        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC:       %s: reposting rep %p\n",
                        __func__, rep);
}