linux/net/sunrpc/backchannel_rqst.c
/******************************************************************************

(c) 2007 Network Appliance, Inc.  All Rights Reserved.
(c) 2009 NetApp.  All Rights Reserved.

NetApp provides this source code under the GPL v2 License.
The GPL v2 license is available at
http://opensource.org/licenses/gpl-license.php.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY RPCDBG_TRANS
#endif

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
        return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
}

static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
{
        atomic_add(n, &xprt->bc_free_slots);
        xprt->bc_alloc_count += n;
}

static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
{
        atomic_sub(n, &xprt->bc_free_slots);
        return xprt->bc_alloc_count -= n;
}
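
/*
 * As used in this file: bc_alloc_count follows the number of
 * preallocated rpc_rqst entries currently sitting on xprt->bc_pa_list,
 * while bc_free_slots is the atomic counter that xprt_alloc_bc_request()
 * checks before handing out a slot.  xprt_need_to_requeue() compares the
 * two so that xprt_free_bc_rqst() can decide whether a completed request
 * should go back on the list or be freed outright.
 */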

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
        struct xdr_buf *xbufp;

        dprintk("RPC:        free allocations for req= %p\n", req);
        WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
        xbufp = &req->rq_rcv_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        xbufp = &req->rq_snd_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        kfree(req);
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
        struct page *page;
        /* Preallocate one page to back the XDR buffer */
        page = alloc_page(gfp_flags);
        if (page == NULL)
                return -ENOMEM;
        xdr_buf_init(buf, page_address(page), PAGE_SIZE);
        return 0;
}
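
/*
 * For reference, and stated here as an assumption about a helper that is
 * defined outside this file: xdr_buf_init() is expected to zero the
 * xdr_buf and point its head kvec at the page allocated above, roughly
 *
 *      memset(buf, 0, sizeof(*buf));
 *      buf->head[0].iov_base = page_address(page);
 *      buf->head[0].iov_len = PAGE_SIZE;
 *      buf->buflen = PAGE_SIZE;
 *
 * so each preallocated send and receive buffer starts out as a single
 * page-sized kvec with no page list or tail.
 */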

static
struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
        struct rpc_rqst *req;

        /* Pre-allocate one backchannel rpc_rqst */
        req = kzalloc(sizeof(*req), gfp_flags);
        if (req == NULL)
                return NULL;

        req->rq_xprt = xprt;
        INIT_LIST_HEAD(&req->rq_list);
        INIT_LIST_HEAD(&req->rq_bc_list);

        /* Preallocate one XDR receive buffer */
        if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc receive xbuf\n");
                goto out_free;
        }
        req->rq_rcv_buf.len = PAGE_SIZE;

        /* Preallocate one XDR send buffer */
        if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc snd xbuf\n");
                goto out_free;
        }
        return req;
out_free:
        xprt_free_allocation(req);
        return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large.  For example, a pNFS server
 * may return a list of deviceids.  The list can be unbounded, but the
 * client has the ability to tell the server the maximum size of the
 * callback requests.  Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs.  The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        if (!xprt->ops->bc_setup)
                return 0;
        return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
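
/*
 * Illustrative sketch only, not part of the original file: a
 * session-based caller such as the NFSv4.1 client is expected to pair
 * the setup and teardown entry points around the lifetime of a session,
 * roughly like this (n_cb_slots is an assumed name for the negotiated
 * number of callback slots, used purely for illustration):
 *
 *      rc = xprt_setup_backchannel(xprt, n_cb_slots);
 *      if (rc < 0)
 *              return rc;
 *      ...
 *      xprt_destroy_backchannel(xprt, n_cb_slots);
 */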

int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        struct rpc_rqst *req;
        struct list_head tmp_list;
        int i;

        dprintk("RPC:       setup backchannel transport\n");

        /*
         * We use a temporary list to keep track of the preallocated
         * buffers.  Once we're done building the list we splice it
         * into the backchannel preallocation list off of the rpc_xprt
         * struct.  This helps minimize the amount of time the list
         * lock is held on the rpc_xprt struct.  It also makes cleanup
         * easier in case of memory allocation errors.
         */
        INIT_LIST_HEAD(&tmp_list);
        for (i = 0; i < min_reqs; i++) {
                /* Pre-allocate one backchannel rpc_rqst */
                req = xprt_alloc_bc_req(xprt, GFP_KERNEL);
                if (req == NULL) {
                        printk(KERN_ERR "Failed to create bc rpc_rqst\n");
                        goto out_free;
                }

                /* Add the allocated buffer to the tmp list */
                dprintk("RPC:       adding req= %p\n", req);
                list_add(&req->rq_bc_pa_list, &tmp_list);
        }

        /*
         * Add the temporary list to the backchannel preallocation list
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt_inc_alloc_count(xprt, min_reqs);
        spin_unlock_bh(&xprt->bc_pa_lock);

        dprintk("RPC:       setup backchannel transport done\n");
        return 0;

out_free:
        /*
         * Memory allocation failed, free the temporary list
         */
        while (!list_empty(&tmp_list)) {
                req = list_first_entry(&tmp_list,
                                struct rpc_rqst,
                                rq_bc_pa_list);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
        }

        dprintk("RPC:       setup backchannel transport failed\n");
        return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:      the transport holding the preallocated structures
 * @max_reqs:  the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        if (xprt->ops->bc_destroy)
                xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);

void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        struct rpc_rqst *req = NULL, *tmp = NULL;

        dprintk("RPC:        destroy backchannel transport\n");

        if (max_reqs == 0)
                goto out;

        spin_lock_bh(&xprt->bc_pa_lock);
        xprt_dec_alloc_count(xprt, max_reqs);
        list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                dprintk("RPC:        req=%p\n", req);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
                if (--max_reqs == 0)
                        break;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);

out:
        dprintk("RPC:        backchannel list empty= %s\n",
                list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
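
/*
 * Illustrative sketch only, not part of the original file: a transport
 * opts in to the generic preallocation code above by pointing its
 * rpc_xprt_ops at these helpers.  The socket transport is expected to
 * wire them up roughly as follows (other transports, e.g. RPC/RDMA,
 * supply their own bc_* handlers):
 *
 *      static struct rpc_xprt_ops xs_tcp_ops = {
 *              ...
 *              .bc_setup       = xprt_setup_bc,
 *              .bc_free_rqst   = xprt_free_bc_rqst,
 *              .bc_destroy     = xprt_destroy_bc,
 *      };
 */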

static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req = NULL;

        dprintk("RPC:       allocate a backchannel request\n");
        if (atomic_read(&xprt->bc_free_slots) <= 0)
                goto not_found;
        if (list_empty(&xprt->bc_pa_list)) {
                req = xprt_alloc_bc_req(xprt, GFP_ATOMIC);
                if (!req)
                        goto not_found;
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
        }
        req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                                rq_bc_pa_list);
        req->rq_reply_bytes_recvd = 0;
        req->rq_bytes_sent = 0;
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                        sizeof(req->rq_private_buf));
        req->rq_xid = xid;
        req->rq_connect_cookie = xprt->connect_cookie;
not_found:
        dprintk("RPC:       backchannel req=%p\n", req);
        return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->ops->bc_free_rqst(req);
}

void xprt_free_bc_rqst(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC:       free backchannel req=%p\n", req);

        req->rq_connect_cookie = xprt->connect_cookie - 1;
        smp_mb__before_atomic();
        clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
        smp_mb__after_atomic();

        /*
         * Return it to the list of preallocations so that it
         * may be reused by a new callback request.
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        if (xprt_need_to_requeue(xprt)) {
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                req = NULL;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
        if (req != NULL) {
                /*
                 * The last remaining session was destroyed while this
                 * entry was in use.  Free the entry and don't attempt
                 * to add back to the list because there is no need to
                 * have any more preallocated entries.
                 */
                dprintk("RPC:       Last session removed req=%p\n", req);
                xprt_free_allocation(req);
                return;
        }
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_alloc_bc_request to allocate
 * one of these structures for an incoming call, and xprt_free_bc_request
 * to return it.
 *
 * We know that we're called in soft interrupt context, so take the plain
 * spin_lock; there is no need to use the bottom-half variant.
 *
 * Return an available rpc_rqst, otherwise NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req;

        spin_lock(&xprt->bc_pa_lock);
        list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
                if (req->rq_connect_cookie != xprt->connect_cookie)
                        continue;
                if (req->rq_xid == xid)
                        goto found;
        }
        req = xprt_alloc_bc_request(xprt, xid);
found:
        spin_unlock(&xprt->bc_pa_lock);
        return req;
}

/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct svc_serv *bc_serv = xprt->bc_serv;

        spin_lock(&xprt->bc_pa_lock);
        list_del(&req->rq_bc_pa_list);
        xprt_dec_alloc_count(xprt, 1);
        spin_unlock(&xprt->bc_pa_lock);

        req->rq_private_buf.len = copied;
        set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

        dprintk("RPC:       add callback request to list\n");
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
        wake_up(&bc_serv->sv_cb_waitq);
        spin_unlock(&bc_serv->sv_cb_lock);
}
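
/*
 * Illustrative sketch only, not part of the original file: a transport's
 * receive path is expected to combine the two entry points above roughly
 * as follows, dropping the call when no slot is available
 * (copy_call_data is an assumed helper name, used purely for
 * illustration):
 *
 *      req = xprt_lookup_bc_request(xprt, xid);
 *      if (req == NULL)
 *              return;
 *      copied = copy_call_data(&req->rq_rcv_buf, data, len);
 *      xprt_complete_bc_request(req, copied);
 */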