linux/net/sunrpc/xprtrdma/transport.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   4 *
   5 * This software is available to you under a choice of one of two
   6 * licenses.  You may choose to be licensed under the terms of the GNU
   7 * General Public License (GPL) Version 2, available from the file
   8 * COPYING in the main directory of this source tree, or the BSD-type
   9 * license below:
  10 *
  11 * Redistribution and use in source and binary forms, with or without
  12 * modification, are permitted provided that the following conditions
  13 * are met:
  14 *
  15 *      Redistributions of source code must retain the above copyright
  16 *      notice, this list of conditions and the following disclaimer.
  17 *
  18 *      Redistributions in binary form must reproduce the above
  19 *      copyright notice, this list of conditions and the following
  20 *      disclaimer in the documentation and/or other materials provided
  21 *      with the distribution.
  22 *
  23 *      Neither the name of the Network Appliance, Inc. nor the names of
  24 *      its contributors may be used to endorse or promote products
  25 *      derived from this software without specific prior written
  26 *      permission.
  27 *
  28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  39 */
  40
  41/*
  42 * transport.c
  43 *
  44 * This file contains the top-level implementation of an RPC RDMA
  45 * transport.
  46 *
  47 * Naming convention: functions beginning with xprt_ are part of the
  48 * transport switch. All others are RPC RDMA internal.
  49 */
  50
  51#include <linux/module.h>
  52#include <linux/slab.h>
  53#include <linux/seq_file.h>
  54#include <linux/sunrpc/addr.h>
  55
  56#include "xprt_rdma.h"
  57
  58#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  59# define RPCDBG_FACILITY        RPCDBG_TRANS
  60#endif
  61
  62/*
  63 * tunables
  64 */
  65
  66static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
  67unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
  68static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
  69unsigned int xprt_rdma_memreg_strategy          = RPCRDMA_FRWR;
  70int xprt_rdma_pad_optimize;
  71
  72#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  73
  74static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
  75static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
  76static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
  77static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
  78static unsigned int zero;
  79static unsigned int max_padding = PAGE_SIZE;
  80static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
  81static unsigned int max_memreg = RPCRDMA_LAST - 1;
  82static unsigned int dummy;
  83
  84static struct ctl_table_header *sunrpc_table_header;
  85
  86static struct ctl_table xr_tunables_table[] = {
  87        {
  88                .procname       = "rdma_slot_table_entries",
  89                .data           = &xprt_rdma_slot_table_entries,
  90                .maxlen         = sizeof(unsigned int),
  91                .mode           = 0644,
  92                .proc_handler   = proc_dointvec_minmax,
  93                .extra1         = &min_slot_table_size,
  94                .extra2         = &max_slot_table_size
  95        },
  96        {
  97                .procname       = "rdma_max_inline_read",
  98                .data           = &xprt_rdma_max_inline_read,
  99                .maxlen         = sizeof(unsigned int),
 100                .mode           = 0644,
 101                .proc_handler   = proc_dointvec_minmax,
 102                .extra1         = &min_inline_size,
 103                .extra2         = &max_inline_size,
 104        },
 105        {
 106                .procname       = "rdma_max_inline_write",
 107                .data           = &xprt_rdma_max_inline_write,
 108                .maxlen         = sizeof(unsigned int),
 109                .mode           = 0644,
 110                .proc_handler   = proc_dointvec_minmax,
 111                .extra1         = &min_inline_size,
 112                .extra2         = &max_inline_size,
 113        },
 114        {
 115                .procname       = "rdma_inline_write_padding",
 116                .data           = &dummy,
 117                .maxlen         = sizeof(unsigned int),
 118                .mode           = 0644,
 119                .proc_handler   = proc_dointvec_minmax,
 120                .extra1         = &zero,
 121                .extra2         = &max_padding,
 122        },
 123        {
 124                .procname       = "rdma_memreg_strategy",
 125                .data           = &xprt_rdma_memreg_strategy,
 126                .maxlen         = sizeof(unsigned int),
 127                .mode           = 0644,
 128                .proc_handler   = proc_dointvec_minmax,
 129                .extra1         = &min_memreg,
 130                .extra2         = &max_memreg,
 131        },
 132        {
 133                .procname       = "rdma_pad_optimize",
 134                .data           = &xprt_rdma_pad_optimize,
 135                .maxlen         = sizeof(unsigned int),
 136                .mode           = 0644,
 137                .proc_handler   = proc_dointvec,
 138        },
 139        { },
 140};
 141
 142static struct ctl_table sunrpc_table[] = {
 143        {
 144                .procname       = "sunrpc",
 145                .mode           = 0555,
 146                .child          = xr_tunables_table
 147        },
 148        { },
 149};
 150
 151#endif
 152
/* Forward declaration: xprt_setup_rdma() installs this ops table, which
 * is defined further down in this file.
 */
static const struct rpc_xprt_ops xprt_rdma_procs;
 154
 155static void
 156xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
 157{
 158        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
 159        char buf[20];
 160
 161        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 162        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 163
 164        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
 165}
 166
 167static void
 168xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
 169{
 170        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
 171        char buf[40];
 172
 173        snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 174        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 175
 176        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
 177}
 178
 179void
 180xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
 181{
 182        char buf[128];
 183
 184        switch (sap->sa_family) {
 185        case AF_INET:
 186                xprt_rdma_format_addresses4(xprt, sap);
 187                break;
 188        case AF_INET6:
 189                xprt_rdma_format_addresses6(xprt, sap);
 190                break;
 191        default:
 192                pr_err("rpcrdma: Unrecognized address family\n");
 193                return;
 194        }
 195
 196        (void)rpc_ntop(sap, buf, sizeof(buf));
 197        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
 198
 199        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 200        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 201
 202        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 203        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 204
 205        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
 206}
 207
 208void
 209xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 210{
 211        unsigned int i;
 212
 213        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 214                switch (i) {
 215                case RPC_DISPLAY_PROTO:
 216                case RPC_DISPLAY_NETID:
 217                        continue;
 218                default:
 219                        kfree(xprt->address_strings[i]);
 220                }
 221}
 222
 223void
 224rpcrdma_conn_func(struct rpcrdma_ep *ep)
 225{
 226        schedule_delayed_work(&ep->rep_connect_worker, 0);
 227}
 228
 229void
 230rpcrdma_connect_worker(struct work_struct *work)
 231{
 232        struct rpcrdma_ep *ep =
 233                container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
 234        struct rpcrdma_xprt *r_xprt =
 235                container_of(ep, struct rpcrdma_xprt, rx_ep);
 236        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 237
 238        spin_lock_bh(&xprt->transport_lock);
 239        if (ep->rep_connected > 0) {
 240                if (!xprt_test_and_set_connected(xprt))
 241                        xprt_wake_pending_tasks(xprt, 0);
 242        } else {
 243                if (xprt_test_and_clear_connected(xprt))
 244                        xprt_wake_pending_tasks(xprt, -ENOTCONN);
 245        }
 246        spin_unlock_bh(&xprt->transport_lock);
 247}
 248
 249static void
 250xprt_rdma_connect_worker(struct work_struct *work)
 251{
 252        struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 253                                                   rx_connect_worker.work);
 254        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 255        int rc = 0;
 256
 257        xprt_clear_connected(xprt);
 258
 259        rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 260        if (rc)
 261                xprt_wake_pending_tasks(xprt, rc);
 262
 263        xprt_clear_connecting(xprt);
 264}
 265
 266static void
 267xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 268{
 269        struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
 270                                                   rx_xprt);
 271
 272        trace_xprtrdma_inject_dsc(r_xprt);
 273        rdma_disconnect(r_xprt->rx_ia.ri_id);
 274}
 275
 276/*
 277 * xprt_rdma_destroy
 278 *
 279 * Destroy the xprt.
 280 * Free all memory associated with the object, including its own.
 281 * NOTE: none of the *destroy methods free memory for their top-level
 282 * objects, even though they may have allocated it (they do free
 283 * private memory). It's up to the caller to handle it. In this
 284 * case (RDMA transport), all structure memory is inlined with the
 285 * struct rpcrdma_xprt.
 286 */
 287static void
 288xprt_rdma_destroy(struct rpc_xprt *xprt)
 289{
 290        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 291
 292        trace_xprtrdma_destroy(r_xprt);
 293
 294        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 295
 296        xprt_clear_connected(xprt);
 297
 298        rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
 299        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 300        rpcrdma_ia_close(&r_xprt->rx_ia);
 301
 302        xprt_rdma_free_addresses(xprt);
 303        xprt_free(xprt);
 304
 305        module_put(THIS_MODULE);
 306}
 307
 308static const struct rpc_timeout xprt_rdma_default_timeout = {
 309        .to_initval = 60 * HZ,
 310        .to_maxval = 60 * HZ,
 311};
 312
 313/**
 314 * xprt_setup_rdma - Set up transport to use RDMA
 315 *
 316 * @args: rpc transport arguments
 317 */
 318static struct rpc_xprt *
 319xprt_setup_rdma(struct xprt_create *args)
 320{
 321        struct rpcrdma_create_data_internal cdata;
 322        struct rpc_xprt *xprt;
 323        struct rpcrdma_xprt *new_xprt;
 324        struct rpcrdma_ep *new_ep;
 325        struct sockaddr *sap;
 326        int rc;
 327
 328        if (args->addrlen > sizeof(xprt->addr)) {
 329                dprintk("RPC:       %s: address too large\n", __func__);
 330                return ERR_PTR(-EBADF);
 331        }
 332
 333        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
 334                        xprt_rdma_slot_table_entries,
 335                        xprt_rdma_slot_table_entries);
 336        if (xprt == NULL) {
 337                dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
 338                        __func__);
 339                return ERR_PTR(-ENOMEM);
 340        }
 341
 342        /* 60 second timeout, no retries */
 343        xprt->timeout = &xprt_rdma_default_timeout;
 344        xprt->bind_timeout = RPCRDMA_BIND_TO;
 345        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 346        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 347
 348        xprt->resvport = 0;             /* privileged port not needed */
 349        xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
 350        xprt->ops = &xprt_rdma_procs;
 351
 352        /*
 353         * Set up RDMA-specific connect data.
 354         */
 355        sap = args->dstaddr;
 356
 357        /* Ensure xprt->addr holds valid server TCP (not RDMA)
 358         * address, for any side protocols which peek at it */
 359        xprt->prot = IPPROTO_TCP;
 360        xprt->addrlen = args->addrlen;
 361        memcpy(&xprt->addr, sap, xprt->addrlen);
 362
 363        if (rpc_get_port(sap))
 364                xprt_set_bound(xprt);
 365        xprt_rdma_format_addresses(xprt, sap);
 366
 367        cdata.max_requests = xprt->max_reqs;
 368
 369        cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
 370        cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
 371
 372        cdata.inline_wsize = xprt_rdma_max_inline_write;
 373        if (cdata.inline_wsize > cdata.wsize)
 374                cdata.inline_wsize = cdata.wsize;
 375
 376        cdata.inline_rsize = xprt_rdma_max_inline_read;
 377        if (cdata.inline_rsize > cdata.rsize)
 378                cdata.inline_rsize = cdata.rsize;
 379
 380        /*
 381         * Create new transport instance, which includes initialized
 382         *  o ia
 383         *  o endpoint
 384         *  o buffers
 385         */
 386
 387        new_xprt = rpcx_to_rdmax(xprt);
 388
 389        rc = rpcrdma_ia_open(new_xprt);
 390        if (rc)
 391                goto out1;
 392
 393        /*
 394         * initialize and create ep
 395         */
 396        new_xprt->rx_data = cdata;
 397        new_ep = &new_xprt->rx_ep;
 398
 399        rc = rpcrdma_ep_create(&new_xprt->rx_ep,
 400                                &new_xprt->rx_ia, &new_xprt->rx_data);
 401        if (rc)
 402                goto out2;
 403
 404        rc = rpcrdma_buffer_create(new_xprt);
 405        if (rc)
 406                goto out3;
 407
 408        INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 409                          xprt_rdma_connect_worker);
 410
 411        xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
 412        if (xprt->max_payload == 0)
 413                goto out4;
 414        xprt->max_payload <<= PAGE_SHIFT;
 415        dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
 416                __func__, xprt->max_payload);
 417
 418        if (!try_module_get(THIS_MODULE))
 419                goto out4;
 420
 421        dprintk("RPC:       %s: %s:%s\n", __func__,
 422                xprt->address_strings[RPC_DISPLAY_ADDR],
 423                xprt->address_strings[RPC_DISPLAY_PORT]);
 424        trace_xprtrdma_create(new_xprt);
 425        return xprt;
 426
 427out4:
 428        rpcrdma_buffer_destroy(&new_xprt->rx_buf);
 429        rc = -ENODEV;
 430out3:
 431        rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 432out2:
 433        rpcrdma_ia_close(&new_xprt->rx_ia);
 434out1:
 435        trace_xprtrdma_destroy(new_xprt);
 436        xprt_rdma_free_addresses(xprt);
 437        xprt_free(xprt);
 438        return ERR_PTR(rc);
 439}
 440
 441/**
 442 * xprt_rdma_close - Close down RDMA connection
 443 * @xprt: generic transport to be closed
 444 *
 445 * Called during transport shutdown reconnect, or device
 446 * removal. Caller holds the transport's write lock.
 447 */
 448static void
 449xprt_rdma_close(struct rpc_xprt *xprt)
 450{
 451        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 452        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 453        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 454
 455        dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 456
 457        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 458                xprt_clear_connected(xprt);
 459                rpcrdma_ia_remove(ia);
 460                return;
 461        }
 462        if (ep->rep_connected == -ENODEV)
 463                return;
 464        if (ep->rep_connected > 0)
 465                xprt->reestablish_timeout = 0;
 466        xprt_disconnect_done(xprt);
 467        rpcrdma_ep_disconnect(ep, ia);
 468}
 469
 470/**
 471 * xprt_rdma_set_port - update server port with rpcbind result
 472 * @xprt: controlling RPC transport
 473 * @port: new port value
 474 *
 475 * Transport connect status is unchanged.
 476 */
 477static void
 478xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 479{
 480        struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
 481        char buf[8];
 482
 483        dprintk("RPC:       %s: setting port for xprt %p (%s:%s) to %u\n",
 484                __func__, xprt,
 485                xprt->address_strings[RPC_DISPLAY_ADDR],
 486                xprt->address_strings[RPC_DISPLAY_PORT],
 487                port);
 488
 489        rpc_set_port(sap, port);
 490
 491        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 492        snprintf(buf, sizeof(buf), "%u", port);
 493        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 494
 495        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 496        snprintf(buf, sizeof(buf), "%4hx", port);
 497        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 498}
 499
 500/**
 501 * xprt_rdma_timer - invoked when an RPC times out
 502 * @xprt: controlling RPC transport
 503 * @task: RPC task that timed out
 504 *
 505 * Invoked when the transport is still connected, but an RPC
 506 * retransmit timeout occurs.
 507 *
 508 * Since RDMA connections don't have a keep-alive, forcibly
 509 * disconnect and retry to connect. This drives full
 510 * detection of the network path, and retransmissions of
 511 * all pending RPCs.
 512 */
 513static void
 514xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 515{
 516        xprt_force_disconnect(xprt);
 517}
 518
 519static void
 520xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 521{
 522        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 523
 524        if (r_xprt->rx_ep.rep_connected != 0) {
 525                /* Reconnect */
 526                schedule_delayed_work(&r_xprt->rx_connect_worker,
 527                                      xprt->reestablish_timeout);
 528                xprt->reestablish_timeout <<= 1;
 529                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
 530                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
 531                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 532                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 533        } else {
 534                schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
 535                if (!RPC_IS_ASYNC(task))
 536                        flush_delayed_work(&r_xprt->rx_connect_worker);
 537        }
 538}
 539
 540static bool
 541rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 542                    size_t size, gfp_t flags)
 543{
 544        struct rpcrdma_regbuf *rb;
 545
 546        if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
 547                return true;
 548
 549        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
 550        if (IS_ERR(rb))
 551                return false;
 552
 553        rpcrdma_free_regbuf(req->rl_sendbuf);
 554        r_xprt->rx_stats.hardway_register_count += size;
 555        req->rl_sendbuf = rb;
 556        return true;
 557}
 558
 559/* The rq_rcv_buf is used only if a Reply chunk is necessary.
 560 * The decision to use a Reply chunk is made later in
 561 * rpcrdma_marshal_req. This buffer is registered at that time.
 562 *
 563 * Otherwise, the associated RPC Reply arrives in a separate
 564 * Receive buffer, arbitrarily chosen by the HCA. The buffer
 565 * allocated here for the RPC Reply is not utilized in that
 566 * case. See rpcrdma_inline_fixup.
 567 *
 568 * A regbuf is used here to remember the buffer size.
 569 */
 570static bool
 571rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 572                    size_t size, gfp_t flags)
 573{
 574        struct rpcrdma_regbuf *rb;
 575
 576        if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
 577                return true;
 578
 579        rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
 580        if (IS_ERR(rb))
 581                return false;
 582
 583        rpcrdma_free_regbuf(req->rl_recvbuf);
 584        r_xprt->rx_stats.hardway_register_count += size;
 585        req->rl_recvbuf = rb;
 586        return true;
 587}
 588
 589/**
 590 * xprt_rdma_allocate - allocate transport resources for an RPC
 591 * @task: RPC task
 592 *
 593 * Return values:
 594 *        0:    Success; rq_buffer points to RPC buffer to use
 595 *   ENOMEM:    Out of memory, call again later
 596 *      EIO:    A permanent error occurred, do not retry
 597 *
 598 * The RDMA allocate/free functions need the task structure as a place
 599 * to hide the struct rpcrdma_req, which is necessary for the actual
 600 * send/recv sequence.
 601 *
 602 * xprt_rdma_allocate provides buffers that are already mapped for
 603 * DMA, and a local DMA lkey is provided for each.
 604 */
 605static int
 606xprt_rdma_allocate(struct rpc_task *task)
 607{
 608        struct rpc_rqst *rqst = task->tk_rqstp;
 609        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 610        struct rpcrdma_req *req;
 611        gfp_t flags;
 612
 613        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 614        if (req == NULL)
 615                goto out_get;
 616
 617        flags = RPCRDMA_DEF_GFP;
 618        if (RPC_IS_SWAPPER(task))
 619                flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 620
 621        if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
 622                goto out_fail;
 623        if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
 624                goto out_fail;
 625
 626        rpcrdma_set_xprtdata(rqst, req);
 627        rqst->rq_buffer = req->rl_sendbuf->rg_base;
 628        rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
 629        trace_xprtrdma_allocate(task, req);
 630        return 0;
 631
 632out_fail:
 633        rpcrdma_buffer_put(req);
 634out_get:
 635        trace_xprtrdma_allocate(task, NULL);
 636        return -ENOMEM;
 637}
 638
 639/**
 640 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 641 * @task: RPC task
 642 *
 643 * Caller guarantees rqst->rq_buffer is non-NULL.
 644 */
 645static void
 646xprt_rdma_free(struct rpc_task *task)
 647{
 648        struct rpc_rqst *rqst = task->tk_rqstp;
 649        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 650        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 651
 652        if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
 653                rpcrdma_release_rqst(r_xprt, req);
 654        trace_xprtrdma_rpc_done(task, req);
 655        rpcrdma_buffer_put(req);
 656}
 657
 658/**
 659 * xprt_rdma_send_request - marshal and send an RPC request
 660 * @task: RPC task with an RPC message in rq_snd_buf
 661 *
 662 * Caller holds the transport's write lock.
 663 *
 664 * Returns:
 665 *      %0 if the RPC message has been sent
 666 *      %-ENOTCONN if the caller should reconnect and call again
 667 *      %-EAGAIN if the caller should call again
 668 *      %-ENOBUFS if the caller should call again after a delay
 669 *      %-EIO if a permanent error occurred and the request was not
 670 *              sent. Do not try to send this message again.
 671 */
 672static int
 673xprt_rdma_send_request(struct rpc_task *task)
 674{
 675        struct rpc_rqst *rqst = task->tk_rqstp;
 676        struct rpc_xprt *xprt = rqst->rq_xprt;
 677        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 678        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 679        int rc = 0;
 680
 681#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 682        if (unlikely(!rqst->rq_buffer))
 683                return xprt_rdma_bc_send_reply(rqst);
 684#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
 685
 686        if (!xprt_connected(xprt))
 687                goto drop_connection;
 688
 689        rc = rpcrdma_marshal_req(r_xprt, rqst);
 690        if (rc < 0)
 691                goto failed_marshal;
 692
 693        if (req->rl_reply == NULL)              /* e.g. reconnection */
 694                rpcrdma_recv_buffer_get(req);
 695
 696        /* Must suppress retransmit to maintain credits */
 697        if (rqst->rq_connect_cookie == xprt->connect_cookie)
 698                goto drop_connection;
 699        rqst->rq_xtime = ktime_get();
 700
 701        __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 702        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 703                goto drop_connection;
 704
 705        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
 706        rqst->rq_bytes_sent = 0;
 707
 708        /* An RPC with no reply will throw off credit accounting,
 709         * so drop the connection to reset the credit grant.
 710         */
 711        if (!rpc_reply_expected(task))
 712                goto drop_connection;
 713        return 0;
 714
 715failed_marshal:
 716        if (rc != -ENOTCONN)
 717                return rc;
 718drop_connection:
 719        xprt_disconnect_done(xprt);
 720        return -ENOTCONN;       /* implies disconnect */
 721}
 722
 723void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 724{
 725        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 726        long idle_time = 0;
 727
 728        if (xprt_connected(xprt))
 729                idle_time = (long)(jiffies - xprt->last_used) / HZ;
 730
 731        seq_puts(seq, "\txprt:\trdma ");
 732        seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
 733                   0,   /* need a local port? */
 734                   xprt->stat.bind_count,
 735                   xprt->stat.connect_count,
 736                   xprt->stat.connect_time,
 737                   idle_time,
 738                   xprt->stat.sends,
 739                   xprt->stat.recvs,
 740                   xprt->stat.bad_xids,
 741                   xprt->stat.req_u,
 742                   xprt->stat.bklog_u);
 743        seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 744                   r_xprt->rx_stats.read_chunk_count,
 745                   r_xprt->rx_stats.write_chunk_count,
 746                   r_xprt->rx_stats.reply_chunk_count,
 747                   r_xprt->rx_stats.total_rdma_request,
 748                   r_xprt->rx_stats.total_rdma_reply,
 749                   r_xprt->rx_stats.pullup_copy_count,
 750                   r_xprt->rx_stats.fixup_copy_count,
 751                   r_xprt->rx_stats.hardway_register_count,
 752                   r_xprt->rx_stats.failed_marshal_count,
 753                   r_xprt->rx_stats.bad_reply_count,
 754                   r_xprt->rx_stats.nomsg_call_count);
 755        seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 756                   r_xprt->rx_stats.mrs_recovered,
 757                   r_xprt->rx_stats.mrs_orphaned,
 758                   r_xprt->rx_stats.mrs_allocated,
 759                   r_xprt->rx_stats.local_inv_needed,
 760                   r_xprt->rx_stats.empty_sendctx_q,
 761                   r_xprt->rx_stats.reply_waits_for_send);
 762}
 763
 764static int
 765xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 766{
 767        return 0;
 768}
 769
 770static void
 771xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 772{
 773}
 774
 775/*
 776 * Plumbing for rpc transport switch and kernel module
 777 */
 778
 779static const struct rpc_xprt_ops xprt_rdma_procs = {
 780        .reserve_xprt           = xprt_reserve_xprt_cong,
 781        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
 782        .alloc_slot             = xprt_alloc_slot,
 783        .release_request        = xprt_release_rqst_cong,       /* ditto */
 784        .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
 785        .timer                  = xprt_rdma_timer,
 786        .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
 787        .set_port               = xprt_rdma_set_port,
 788        .connect                = xprt_rdma_connect,
 789        .buf_alloc              = xprt_rdma_allocate,
 790        .buf_free               = xprt_rdma_free,
 791        .send_request           = xprt_rdma_send_request,
 792        .close                  = xprt_rdma_close,
 793        .destroy                = xprt_rdma_destroy,
 794        .print_stats            = xprt_rdma_print_stats,
 795        .enable_swap            = xprt_rdma_enable_swap,
 796        .disable_swap           = xprt_rdma_disable_swap,
 797        .inject_disconnect      = xprt_rdma_inject_disconnect,
 798#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 799        .bc_setup               = xprt_rdma_bc_setup,
 800        .bc_up                  = xprt_rdma_bc_up,
 801        .bc_maxpayload          = xprt_rdma_bc_maxpayload,
 802        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
 803        .bc_destroy             = xprt_rdma_bc_destroy,
 804#endif
 805};
 806
 807static struct xprt_class xprt_rdma = {
 808        .list                   = LIST_HEAD_INIT(xprt_rdma.list),
 809        .name                   = "rdma",
 810        .owner                  = THIS_MODULE,
 811        .ident                  = XPRT_TRANSPORT_RDMA,
 812        .setup                  = xprt_setup_rdma,
 813};
 814
 815void xprt_rdma_cleanup(void)
 816{
 817        int rc;
 818
 819        dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
 820#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 821        if (sunrpc_table_header) {
 822                unregister_sysctl_table(sunrpc_table_header);
 823                sunrpc_table_header = NULL;
 824        }
 825#endif
 826        rc = xprt_unregister_transport(&xprt_rdma);
 827        if (rc)
 828                dprintk("RPC:       %s: xprt_unregister returned %i\n",
 829                        __func__, rc);
 830
 831        rpcrdma_destroy_wq();
 832
 833        rc = xprt_unregister_transport(&xprt_rdma_bc);
 834        if (rc)
 835                dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
 836                        __func__, rc);
 837}
 838
 839int xprt_rdma_init(void)
 840{
 841        int rc;
 842
 843        rc = rpcrdma_alloc_wq();
 844        if (rc)
 845                return rc;
 846
 847        rc = xprt_register_transport(&xprt_rdma);
 848        if (rc) {
 849                rpcrdma_destroy_wq();
 850                return rc;
 851        }
 852
 853        rc = xprt_register_transport(&xprt_rdma_bc);
 854        if (rc) {
 855                xprt_unregister_transport(&xprt_rdma);
 856                rpcrdma_destroy_wq();
 857                return rc;
 858        }
 859
 860        dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
 861
 862        dprintk("Defaults:\n");
 863        dprintk("\tSlots %d\n"
 864                "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
 865                xprt_rdma_slot_table_entries,
 866                xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
 867        dprintk("\tPadding 0\n\tMemreg %d\n", xprt_rdma_memreg_strategy);
 868
 869#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 870        if (!sunrpc_table_header)
 871                sunrpc_table_header = register_sysctl_table(sunrpc_table);
 872#endif
 873        return 0;
 874}
 875