linux/net/sunrpc/xprtrdma/transport.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40/*
  41 * transport.c
  42 *
  43 * This file contains the top-level implementation of an RPC RDMA
  44 * transport.
  45 *
  46 * Naming convention: functions beginning with xprt_ are part of the
  47 * transport switch. All others are RPC RDMA internal.
  48 */
  49
  50#include <linux/module.h>
  51#include <linux/slab.h>
  52#include <linux/seq_file.h>
  53#include <linux/sunrpc/addr.h>
  54
  55#include "xprt_rdma.h"
  56
  57#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  58# define RPCDBG_FACILITY        RPCDBG_TRANS
  59#endif
  60
  61/*
  62 * tunables
  63 */
  64
  65static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
  66unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
  67static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
  68static unsigned int xprt_rdma_inline_write_padding;
  69unsigned int xprt_rdma_memreg_strategy          = RPCRDMA_FRMR;
  70int xprt_rdma_pad_optimize;
  71
  72#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  73
  74static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
  75static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
  76static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
  77static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
  78static unsigned int zero;
  79static unsigned int max_padding = PAGE_SIZE;
  80static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
  81static unsigned int max_memreg = RPCRDMA_LAST - 1;
  82
  83static struct ctl_table_header *sunrpc_table_header;
  84
  85static struct ctl_table xr_tunables_table[] = {
  86        {
  87                .procname       = "rdma_slot_table_entries",
  88                .data           = &xprt_rdma_slot_table_entries,
  89                .maxlen         = sizeof(unsigned int),
  90                .mode           = 0644,
  91                .proc_handler   = proc_dointvec_minmax,
  92                .extra1         = &min_slot_table_size,
  93                .extra2         = &max_slot_table_size
  94        },
  95        {
  96                .procname       = "rdma_max_inline_read",
  97                .data           = &xprt_rdma_max_inline_read,
  98                .maxlen         = sizeof(unsigned int),
  99                .mode           = 0644,
 100                .proc_handler   = proc_dointvec_minmax,
 101                .extra1         = &min_inline_size,
 102                .extra2         = &max_inline_size,
 103        },
 104        {
 105                .procname       = "rdma_max_inline_write",
 106                .data           = &xprt_rdma_max_inline_write,
 107                .maxlen         = sizeof(unsigned int),
 108                .mode           = 0644,
 109                .proc_handler   = proc_dointvec_minmax,
 110                .extra1         = &min_inline_size,
 111                .extra2         = &max_inline_size,
 112        },
 113        {
 114                .procname       = "rdma_inline_write_padding",
 115                .data           = &xprt_rdma_inline_write_padding,
 116                .maxlen         = sizeof(unsigned int),
 117                .mode           = 0644,
 118                .proc_handler   = proc_dointvec_minmax,
 119                .extra1         = &zero,
 120                .extra2         = &max_padding,
 121        },
 122        {
 123                .procname       = "rdma_memreg_strategy",
 124                .data           = &xprt_rdma_memreg_strategy,
 125                .maxlen         = sizeof(unsigned int),
 126                .mode           = 0644,
 127                .proc_handler   = proc_dointvec_minmax,
 128                .extra1         = &min_memreg,
 129                .extra2         = &max_memreg,
 130        },
 131        {
 132                .procname       = "rdma_pad_optimize",
 133                .data           = &xprt_rdma_pad_optimize,
 134                .maxlen         = sizeof(unsigned int),
 135                .mode           = 0644,
 136                .proc_handler   = proc_dointvec,
 137        },
 138        { },
 139};
 140
 141static struct ctl_table sunrpc_table[] = {
 142        {
 143                .procname       = "sunrpc",
 144                .mode           = 0555,
 145                .child          = xr_tunables_table
 146        },
 147        { },
 148};
 149
 150#endif
 151
 152static struct rpc_xprt_ops xprt_rdma_procs;     /*forward reference */
 153
 154static void
 155xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
 156{
 157        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
 158        char buf[20];
 159
 160        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 161        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 162
 163        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
 164}
 165
 166static void
 167xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
 168{
 169        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
 170        char buf[40];
 171
 172        snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 173        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 174
 175        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
 176}
 177
 178void
 179xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
 180{
 181        char buf[128];
 182
 183        switch (sap->sa_family) {
 184        case AF_INET:
 185                xprt_rdma_format_addresses4(xprt, sap);
 186                break;
 187        case AF_INET6:
 188                xprt_rdma_format_addresses6(xprt, sap);
 189                break;
 190        default:
 191                pr_err("rpcrdma: Unrecognized address family\n");
 192                return;
 193        }
 194
 195        (void)rpc_ntop(sap, buf, sizeof(buf));
 196        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
 197
 198        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 199        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 200
 201        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 202        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 203
 204        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
 205}
 206
 207void
 208xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 209{
 210        unsigned int i;
 211
 212        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 213                switch (i) {
 214                case RPC_DISPLAY_PROTO:
 215                case RPC_DISPLAY_NETID:
 216                        continue;
 217                default:
 218                        kfree(xprt->address_strings[i]);
 219                }
 220}
 221
 222void
 223rpcrdma_conn_func(struct rpcrdma_ep *ep)
 224{
 225        schedule_delayed_work(&ep->rep_connect_worker, 0);
 226}
 227
 228void
 229rpcrdma_connect_worker(struct work_struct *work)
 230{
 231        struct rpcrdma_ep *ep =
 232                container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
 233        struct rpcrdma_xprt *r_xprt =
 234                container_of(ep, struct rpcrdma_xprt, rx_ep);
 235        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 236
 237        spin_lock_bh(&xprt->transport_lock);
 238        if (++xprt->connect_cookie == 0)        /* maintain a reserved value */
 239                ++xprt->connect_cookie;
 240        if (ep->rep_connected > 0) {
 241                if (!xprt_test_and_set_connected(xprt))
 242                        xprt_wake_pending_tasks(xprt, 0);
 243        } else {
 244                if (xprt_test_and_clear_connected(xprt))
 245                        xprt_wake_pending_tasks(xprt, -ENOTCONN);
 246        }
 247        spin_unlock_bh(&xprt->transport_lock);
 248}
 249
 250static void
 251xprt_rdma_connect_worker(struct work_struct *work)
 252{
 253        struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 254                                                   rx_connect_worker.work);
 255        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 256        int rc = 0;
 257
 258        xprt_clear_connected(xprt);
 259
 260        dprintk("RPC:       %s: %sconnect\n", __func__,
 261                        r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
 262        rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 263        if (rc)
 264                xprt_wake_pending_tasks(xprt, rc);
 265
 266        dprintk("RPC:       %s: exit\n", __func__);
 267        xprt_clear_connecting(xprt);
 268}
 269
 270static void
 271xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 272{
 273        struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
 274                                                   rx_xprt);
 275
 276        pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
 277        rdma_disconnect(r_xprt->rx_ia.ri_id);
 278}
 279
 280/*
 281 * xprt_rdma_destroy
 282 *
 283 * Destroy the xprt.
 284 * Free all memory associated with the object, including its own.
 285 * NOTE: none of the *destroy methods free memory for their top-level
 286 * objects, even though they may have allocated it (they do free
 287 * private memory). It's up to the caller to handle it. In this
 288 * case (RDMA transport), all structure memory is inlined with the
 289 * struct rpcrdma_xprt.
 290 */
 291static void
 292xprt_rdma_destroy(struct rpc_xprt *xprt)
 293{
 294        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 295
 296        dprintk("RPC:       %s: called\n", __func__);
 297
 298        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 299
 300        xprt_clear_connected(xprt);
 301
 302        rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
 303        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 304        rpcrdma_ia_close(&r_xprt->rx_ia);
 305
 306        xprt_rdma_free_addresses(xprt);
 307
 308        xprt_free(xprt);
 309
 310        dprintk("RPC:       %s: returning\n", __func__);
 311
 312        module_put(THIS_MODULE);
 313}
 314
 315static const struct rpc_timeout xprt_rdma_default_timeout = {
 316        .to_initval = 60 * HZ,
 317        .to_maxval = 60 * HZ,
 318};
 319
 320/**
 321 * xprt_setup_rdma - Set up transport to use RDMA
 322 *
 323 * @args: rpc transport arguments
 324 */
 325static struct rpc_xprt *
 326xprt_setup_rdma(struct xprt_create *args)
 327{
 328        struct rpcrdma_create_data_internal cdata;
 329        struct rpc_xprt *xprt;
 330        struct rpcrdma_xprt *new_xprt;
 331        struct rpcrdma_ep *new_ep;
 332        struct sockaddr *sap;
 333        int rc;
 334
 335        if (args->addrlen > sizeof(xprt->addr)) {
 336                dprintk("RPC:       %s: address too large\n", __func__);
 337                return ERR_PTR(-EBADF);
 338        }
 339
 340        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
 341                        xprt_rdma_slot_table_entries,
 342                        xprt_rdma_slot_table_entries);
 343        if (xprt == NULL) {
 344                dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
 345                        __func__);
 346                return ERR_PTR(-ENOMEM);
 347        }
 348
 349        /* 60 second timeout, no retries */
 350        xprt->timeout = &xprt_rdma_default_timeout;
 351        xprt->bind_timeout = RPCRDMA_BIND_TO;
 352        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 353        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 354
 355        xprt->resvport = 0;             /* privileged port not needed */
 356        xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
 357        xprt->ops = &xprt_rdma_procs;
 358
 359        /*
 360         * Set up RDMA-specific connect data.
 361         */
 362
 363        sap = (struct sockaddr *)&cdata.addr;
 364        memcpy(sap, args->dstaddr, args->addrlen);
 365
 366        /* Ensure xprt->addr holds valid server TCP (not RDMA)
 367         * address, for any side protocols which peek at it */
 368        xprt->prot = IPPROTO_TCP;
 369        xprt->addrlen = args->addrlen;
 370        memcpy(&xprt->addr, sap, xprt->addrlen);
 371
 372        if (rpc_get_port(sap))
 373                xprt_set_bound(xprt);
 374
 375        cdata.max_requests = xprt->max_reqs;
 376
 377        cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
 378        cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
 379
 380        cdata.inline_wsize = xprt_rdma_max_inline_write;
 381        if (cdata.inline_wsize > cdata.wsize)
 382                cdata.inline_wsize = cdata.wsize;
 383
 384        cdata.inline_rsize = xprt_rdma_max_inline_read;
 385        if (cdata.inline_rsize > cdata.rsize)
 386                cdata.inline_rsize = cdata.rsize;
 387
 388        cdata.padding = xprt_rdma_inline_write_padding;
 389
 390        /*
 391         * Create new transport instance, which includes initialized
 392         *  o ia
 393         *  o endpoint
 394         *  o buffers
 395         */
 396
 397        new_xprt = rpcx_to_rdmax(xprt);
 398
 399        rc = rpcrdma_ia_open(new_xprt, sap);
 400        if (rc)
 401                goto out1;
 402
 403        /*
 404         * initialize and create ep
 405         */
 406        new_xprt->rx_data = cdata;
 407        new_ep = &new_xprt->rx_ep;
 408        new_ep->rep_remote_addr = cdata.addr;
 409
 410        rc = rpcrdma_ep_create(&new_xprt->rx_ep,
 411                                &new_xprt->rx_ia, &new_xprt->rx_data);
 412        if (rc)
 413                goto out2;
 414
 415        /*
 416         * Allocate pre-registered send and receive buffers for headers and
 417         * any inline data. Also specify any padding which will be provided
 418         * from a preregistered zero buffer.
 419         */
 420        rc = rpcrdma_buffer_create(new_xprt);
 421        if (rc)
 422                goto out3;
 423
 424        /*
 425         * Register a callback for connection events. This is necessary because
 426         * connection loss notification is async. We also catch connection loss
 427         * when reaping receives.
 428         */
 429        INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 430                          xprt_rdma_connect_worker);
 431
 432        xprt_rdma_format_addresses(xprt, sap);
 433        xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
 434        if (xprt->max_payload == 0)
 435                goto out4;
 436        xprt->max_payload <<= PAGE_SHIFT;
 437        dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
 438                __func__, xprt->max_payload);
 439
 440        if (!try_module_get(THIS_MODULE))
 441                goto out4;
 442
 443        dprintk("RPC:       %s: %s:%s\n", __func__,
 444                xprt->address_strings[RPC_DISPLAY_ADDR],
 445                xprt->address_strings[RPC_DISPLAY_PORT]);
 446        return xprt;
 447
 448out4:
 449        xprt_rdma_free_addresses(xprt);
 450        rc = -EINVAL;
 451out3:
 452        rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 453out2:
 454        rpcrdma_ia_close(&new_xprt->rx_ia);
 455out1:
 456        xprt_free(xprt);
 457        return ERR_PTR(rc);
 458}
 459
 460/**
 461 * xprt_rdma_close - Close down RDMA connection
 462 * @xprt: generic transport to be closed
 463 *
 464 * Called during transport shutdown reconnect, or device
 465 * removal. Caller holds the transport's write lock.
 466 */
 467static void
 468xprt_rdma_close(struct rpc_xprt *xprt)
 469{
 470        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 471        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 472        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 473
 474        dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 475
 476        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 477                xprt_clear_connected(xprt);
 478                rpcrdma_ia_remove(ia);
 479                return;
 480        }
 481        if (ep->rep_connected == -ENODEV)
 482                return;
 483        if (ep->rep_connected > 0)
 484                xprt->reestablish_timeout = 0;
 485        xprt_disconnect_done(xprt);
 486        rpcrdma_ep_disconnect(ep, ia);
 487}
 488
 489static void
 490xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 491{
 492        struct sockaddr_in *sap;
 493
 494        sap = (struct sockaddr_in *)&xprt->addr;
 495        sap->sin_port = htons(port);
 496        sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
 497        sap->sin_port = htons(port);
 498        dprintk("RPC:       %s: %u\n", __func__, port);
 499}
 500
 501/**
 502 * xprt_rdma_timer - invoked when an RPC times out
 503 * @xprt: controlling RPC transport
 504 * @task: RPC task that timed out
 505 *
 506 * Invoked when the transport is still connected, but an RPC
 507 * retransmit timeout occurs.
 508 *
 509 * Since RDMA connections don't have a keep-alive, forcibly
 510 * disconnect and retry to connect. This drives full
 511 * detection of the network path, and retransmissions of
 512 * all pending RPCs.
 513 */
 514static void
 515xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 516{
 517        dprintk("RPC: %5u %s: xprt = %p\n", task->tk_pid, __func__, xprt);
 518
 519        xprt_force_disconnect(xprt);
 520}
 521
 522static void
 523xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 524{
 525        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 526
 527        if (r_xprt->rx_ep.rep_connected != 0) {
 528                /* Reconnect */
 529                schedule_delayed_work(&r_xprt->rx_connect_worker,
 530                                      xprt->reestablish_timeout);
 531                xprt->reestablish_timeout <<= 1;
 532                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
 533                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
 534                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 535                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 536        } else {
 537                schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
 538                if (!RPC_IS_ASYNC(task))
 539                        flush_delayed_work(&r_xprt->rx_connect_worker);
 540        }
 541}
 542
 543/* Allocate a fixed-size buffer in which to construct and send the
 544 * RPC-over-RDMA header for this request.
 545 */
 546static bool
 547rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 548                    gfp_t flags)
 549{
 550        size_t size = RPCRDMA_HDRBUF_SIZE;
 551        struct rpcrdma_regbuf *rb;
 552
 553        if (req->rl_rdmabuf)
 554                return true;
 555
 556        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
 557        if (IS_ERR(rb))
 558                return false;
 559
 560        r_xprt->rx_stats.hardway_register_count += size;
 561        req->rl_rdmabuf = rb;
 562        return true;
 563}
 564
 565static bool
 566rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 567                    size_t size, gfp_t flags)
 568{
 569        struct rpcrdma_regbuf *rb;
 570
 571        if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
 572                return true;
 573
 574        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
 575        if (IS_ERR(rb))
 576                return false;
 577
 578        rpcrdma_free_regbuf(req->rl_sendbuf);
 579        r_xprt->rx_stats.hardway_register_count += size;
 580        req->rl_sendbuf = rb;
 581        return true;
 582}
 583
 584/* The rq_rcv_buf is used only if a Reply chunk is necessary.
 585 * The decision to use a Reply chunk is made later in
 586 * rpcrdma_marshal_req. This buffer is registered at that time.
 587 *
 588 * Otherwise, the associated RPC Reply arrives in a separate
 589 * Receive buffer, arbitrarily chosen by the HCA. The buffer
 590 * allocated here for the RPC Reply is not utilized in that
 591 * case. See rpcrdma_inline_fixup.
 592 *
 593 * A regbuf is used here to remember the buffer size.
 594 */
 595static bool
 596rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 597                    size_t size, gfp_t flags)
 598{
 599        struct rpcrdma_regbuf *rb;
 600
 601        if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
 602                return true;
 603
 604        rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
 605        if (IS_ERR(rb))
 606                return false;
 607
 608        rpcrdma_free_regbuf(req->rl_recvbuf);
 609        r_xprt->rx_stats.hardway_register_count += size;
 610        req->rl_recvbuf = rb;
 611        return true;
 612}
 613
 614/**
 615 * xprt_rdma_allocate - allocate transport resources for an RPC
 616 * @task: RPC task
 617 *
 618 * Return values:
 619 *        0:    Success; rq_buffer points to RPC buffer to use
 620 *   ENOMEM:    Out of memory, call again later
 621 *      EIO:    A permanent error occurred, do not retry
 622 *
 623 * The RDMA allocate/free functions need the task structure as a place
 624 * to hide the struct rpcrdma_req, which is necessary for the actual
 625 * send/recv sequence.
 626 *
 627 * xprt_rdma_allocate provides buffers that are already mapped for
 628 * DMA, and a local DMA lkey is provided for each.
 629 */
 630static int
 631xprt_rdma_allocate(struct rpc_task *task)
 632{
 633        struct rpc_rqst *rqst = task->tk_rqstp;
 634        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 635        struct rpcrdma_req *req;
 636        gfp_t flags;
 637
 638        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 639        if (req == NULL)
 640                return -ENOMEM;
 641
 642        flags = RPCRDMA_DEF_GFP;
 643        if (RPC_IS_SWAPPER(task))
 644                flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 645
 646        if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
 647                goto out_fail;
 648        if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
 649                goto out_fail;
 650        if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
 651                goto out_fail;
 652
 653        dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
 654                task->tk_pid, __func__, rqst->rq_callsize,
 655                rqst->rq_rcvsize, req);
 656
 657        req->rl_connect_cookie = 0;     /* our reserved value */
 658        rpcrdma_set_xprtdata(rqst, req);
 659        rqst->rq_buffer = req->rl_sendbuf->rg_base;
 660        rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
 661        return 0;
 662
 663out_fail:
 664        rpcrdma_buffer_put(req);
 665        return -ENOMEM;
 666}
 667
 668/**
 669 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 670 * @task: RPC task
 671 *
 672 * Caller guarantees rqst->rq_buffer is non-NULL.
 673 */
 674static void
 675xprt_rdma_free(struct rpc_task *task)
 676{
 677        struct rpc_rqst *rqst = task->tk_rqstp;
 678        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 679        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 680        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 681
 682        if (req->rl_backchannel)
 683                return;
 684
 685        dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 686
 687        rpcrdma_remove_req(&r_xprt->rx_buf, req);
 688        if (!list_empty(&req->rl_registered))
 689                ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 690        rpcrdma_unmap_sges(ia, req);
 691        rpcrdma_buffer_put(req);
 692}
 693
 694/**
 695 * xprt_rdma_send_request - marshal and send an RPC request
 696 * @task: RPC task with an RPC message in rq_snd_buf
 697 *
 698 * Caller holds the transport's write lock.
 699 *
 700 * Return values:
 701 *        0:    The request has been sent
 702 * ENOTCONN:    Caller needs to invoke connect logic then call again
 703 *  ENOBUFS:    Call again later to send the request
 704 *      EIO:    A permanent error occurred. The request was not sent,
 705 *              and don't try it again
 706 *
 707 * send_request invokes the meat of RPC RDMA. It must do the following:
 708 *
 709 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 710 *      putting a header in front of data, and creating IOVs for RDMA
 711 *      from those in the request.
 712 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 713 *  3.  Post a recv message to set up asynch completion, then send
 714 *      the request (rpcrdma_ep_post).
 715 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 716 */
 717static int
 718xprt_rdma_send_request(struct rpc_task *task)
 719{
 720        struct rpc_rqst *rqst = task->tk_rqstp;
 721        struct rpc_xprt *xprt = rqst->rq_xprt;
 722        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 723        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 724        int rc = 0;
 725
 726        if (!xprt_connected(xprt))
 727                goto drop_connection;
 728
 729        /* On retransmit, remove any previously registered chunks */
 730        if (unlikely(!list_empty(&req->rl_registered)))
 731                r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 732
 733        rc = rpcrdma_marshal_req(rqst);
 734        if (rc < 0)
 735                goto failed_marshal;
 736
 737        if (req->rl_reply == NULL)              /* e.g. reconnection */
 738                rpcrdma_recv_buffer_get(req);
 739
 740        /* Must suppress retransmit to maintain credits */
 741        if (req->rl_connect_cookie == xprt->connect_cookie)
 742                goto drop_connection;
 743        req->rl_connect_cookie = xprt->connect_cookie;
 744
 745        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 746                goto drop_connection;
 747
 748        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
 749        rqst->rq_bytes_sent = 0;
 750        return 0;
 751
 752failed_marshal:
 753        if (rc != -ENOTCONN)
 754                return rc;
 755drop_connection:
 756        xprt_disconnect_done(xprt);
 757        return -ENOTCONN;       /* implies disconnect */
 758}
 759
 760void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 761{
 762        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 763        long idle_time = 0;
 764
 765        if (xprt_connected(xprt))
 766                idle_time = (long)(jiffies - xprt->last_used) / HZ;
 767
 768        seq_puts(seq, "\txprt:\trdma ");
 769        seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
 770                   0,   /* need a local port? */
 771                   xprt->stat.bind_count,
 772                   xprt->stat.connect_count,
 773                   xprt->stat.connect_time,
 774                   idle_time,
 775                   xprt->stat.sends,
 776                   xprt->stat.recvs,
 777                   xprt->stat.bad_xids,
 778                   xprt->stat.req_u,
 779                   xprt->stat.bklog_u);
 780        seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 781                   r_xprt->rx_stats.read_chunk_count,
 782                   r_xprt->rx_stats.write_chunk_count,
 783                   r_xprt->rx_stats.reply_chunk_count,
 784                   r_xprt->rx_stats.total_rdma_request,
 785                   r_xprt->rx_stats.total_rdma_reply,
 786                   r_xprt->rx_stats.pullup_copy_count,
 787                   r_xprt->rx_stats.fixup_copy_count,
 788                   r_xprt->rx_stats.hardway_register_count,
 789                   r_xprt->rx_stats.failed_marshal_count,
 790                   r_xprt->rx_stats.bad_reply_count,
 791                   r_xprt->rx_stats.nomsg_call_count);
 792        seq_printf(seq, "%lu %lu %lu %lu\n",
 793                   r_xprt->rx_stats.mrs_recovered,
 794                   r_xprt->rx_stats.mrs_orphaned,
 795                   r_xprt->rx_stats.mrs_allocated,
 796                   r_xprt->rx_stats.local_inv_needed);
 797}
 798
 799static int
 800xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 801{
 802        return 0;
 803}
 804
 805static void
 806xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 807{
 808}
 809
 810/*
 811 * Plumbing for rpc transport switch and kernel module
 812 */
 813
 814static struct rpc_xprt_ops xprt_rdma_procs = {
 815        .reserve_xprt           = xprt_reserve_xprt_cong,
 816        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
 817        .alloc_slot             = xprt_alloc_slot,
 818        .release_request        = xprt_release_rqst_cong,       /* ditto */
 819        .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
 820        .timer                  = xprt_rdma_timer,
 821        .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
 822        .set_port               = xprt_rdma_set_port,
 823        .connect                = xprt_rdma_connect,
 824        .buf_alloc              = xprt_rdma_allocate,
 825        .buf_free               = xprt_rdma_free,
 826        .send_request           = xprt_rdma_send_request,
 827        .close                  = xprt_rdma_close,
 828        .destroy                = xprt_rdma_destroy,
 829        .print_stats            = xprt_rdma_print_stats,
 830        .enable_swap            = xprt_rdma_enable_swap,
 831        .disable_swap           = xprt_rdma_disable_swap,
 832        .inject_disconnect      = xprt_rdma_inject_disconnect,
 833#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 834        .bc_setup               = xprt_rdma_bc_setup,
 835        .bc_up                  = xprt_rdma_bc_up,
 836        .bc_maxpayload          = xprt_rdma_bc_maxpayload,
 837        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
 838        .bc_destroy             = xprt_rdma_bc_destroy,
 839#endif
 840};
 841
 842static struct xprt_class xprt_rdma = {
 843        .list                   = LIST_HEAD_INIT(xprt_rdma.list),
 844        .name                   = "rdma",
 845        .owner                  = THIS_MODULE,
 846        .ident                  = XPRT_TRANSPORT_RDMA,
 847        .setup                  = xprt_setup_rdma,
 848};
 849
 850void xprt_rdma_cleanup(void)
 851{
 852        int rc;
 853
 854        dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
 855#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 856        if (sunrpc_table_header) {
 857                unregister_sysctl_table(sunrpc_table_header);
 858                sunrpc_table_header = NULL;
 859        }
 860#endif
 861        rc = xprt_unregister_transport(&xprt_rdma);
 862        if (rc)
 863                dprintk("RPC:       %s: xprt_unregister returned %i\n",
 864                        __func__, rc);
 865
 866        rpcrdma_destroy_wq();
 867
 868        rc = xprt_unregister_transport(&xprt_rdma_bc);
 869        if (rc)
 870                dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
 871                        __func__, rc);
 872}
 873
 874int xprt_rdma_init(void)
 875{
 876        int rc;
 877
 878        rc = rpcrdma_alloc_wq();
 879        if (rc)
 880                return rc;
 881
 882        rc = xprt_register_transport(&xprt_rdma);
 883        if (rc) {
 884                rpcrdma_destroy_wq();
 885                return rc;
 886        }
 887
 888        rc = xprt_register_transport(&xprt_rdma_bc);
 889        if (rc) {
 890                xprt_unregister_transport(&xprt_rdma);
 891                rpcrdma_destroy_wq();
 892                return rc;
 893        }
 894
 895        dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
 896
 897        dprintk("Defaults:\n");
 898        dprintk("\tSlots %d\n"
 899                "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
 900                xprt_rdma_slot_table_entries,
 901                xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
 902        dprintk("\tPadding %d\n\tMemreg %d\n",
 903                xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
 904
 905#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 906        if (!sunrpc_table_header)
 907                sunrpc_table_header = register_sysctl_table(sunrpc_table);
 908#endif
 909        return 0;
 910}
 911