/*
 * linux/net/sunrpc/xprtrdma/backchannel.c
 *
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

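/* Release one preallocated backchannel rpc_rqst: unhook the associated
 * rpcrdma_req from the transport's list of requests, destroy it, and
 * free the rqst itself.
 */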
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(req);

        kfree(rqst);
}

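/* Set up one preallocated backchannel rpc_rqst: allocate an rpcrdma_req
 * plus the RPC/RDMA header and inline send buffers that back it.
 * Returns zero on success, or a negative errno on allocation failure.
 */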
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->rl_backchannel = true;

        rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
                                  DMA_TO_DEVICE, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        size = r_xprt->rx_data.inline_rsize;
        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_sendbuf = rb;
        xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
        rpcrdma_set_xprtdata(rqst, req);
        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpcrdma_rep *rep;
        int rc = 0;

        while (count--) {
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        pr_err("RPC:       %s: reply buffer alloc failed\n",
                               __func__);
                        rc = PTR_ERR(rep);
                        break;
                }

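                /* Stash the new rep on the transport's list of free
                 * receive buffers.
                 */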
                rpcrdma_recv_buffer_put(rep);
        }

        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }
                dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }

        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
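        /* Ensure the svcrdma module is loaded: it provides the "rdma-bc"
         * transport class that xprt_rdma_bc_up() asks for when the
         * backchannel service endpoint is created.
         */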
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
        int ret;

        ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
        if (ret < 0)
                return ret;
        return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        size_t maxmsg;

        maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
        return maxmsg - RPCRDMA_HDRLEN_MIN;
}

/**
 * rpcrdma_bc_marshal_reply - Send backwards direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success; -EIO if the reply could not be marshaled.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;

        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
                                       &rqst->rq_snd_buf, rpcrdma_noch))
                return -EIO;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;

        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;

        dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
                __func__, rqst, rpcr_to_rdmar(rqst));

        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (e.g., NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
        dprintk("RPC:       %s: using rqst %p\n", __func__, rqst);

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;

        rqst->rq_private_buf.len = size;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
        dprintk("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
        req->rl_reply = rep;

        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC:       %s: reposting rep %p\n",
                        __func__, rep);
}