linux/net/sunrpc/xprtrdma/transport.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * transport.c
  44 *
  45 * This file contains the top-level implementation of an RPC RDMA
  46 * transport.
  47 *
  48 * Naming convention: functions beginning with xprt_ are part of the
  49 * transport switch. All others are RPC RDMA internal.
  50 */
  51
  52#include <linux/module.h>
  53#include <linux/slab.h>
  54#include <linux/seq_file.h>
  55#include <linux/smp.h>
  56
  57#include <linux/sunrpc/addr.h>
  58#include <linux/sunrpc/svc_rdma.h>
  59
  60#include "xprt_rdma.h"
  61#include <trace/events/rpcrdma.h>
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY        RPCDBG_TRANS
  65#endif
  66
  67/*
  68 * tunables
  69 */
  70
  71static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
  72unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
  73static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
  74unsigned int xprt_rdma_memreg_strategy          = RPCRDMA_FRWR;
  75int xprt_rdma_pad_optimize;
  76
  77#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  78
  79static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
  80static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
  81static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
  82static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
  83static unsigned int zero;
  84static unsigned int max_padding = PAGE_SIZE;
  85static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
  86static unsigned int max_memreg = RPCRDMA_LAST - 1;
  87static unsigned int dummy;
  88
  89static struct ctl_table_header *sunrpc_table_header;
  90
  91static struct ctl_table xr_tunables_table[] = {
  92        {
  93                .procname       = "rdma_slot_table_entries",
  94                .data           = &xprt_rdma_slot_table_entries,
  95                .maxlen         = sizeof(unsigned int),
  96                .mode           = 0644,
  97                .proc_handler   = proc_dointvec_minmax,
  98                .extra1         = &min_slot_table_size,
  99                .extra2         = &max_slot_table_size
 100        },
 101        {
 102                .procname       = "rdma_max_inline_read",
 103                .data           = &xprt_rdma_max_inline_read,
 104                .maxlen         = sizeof(unsigned int),
 105                .mode           = 0644,
 106                .proc_handler   = proc_dointvec_minmax,
 107                .extra1         = &min_inline_size,
 108                .extra2         = &max_inline_size,
 109        },
 110        {
 111                .procname       = "rdma_max_inline_write",
 112                .data           = &xprt_rdma_max_inline_write,
 113                .maxlen         = sizeof(unsigned int),
 114                .mode           = 0644,
 115                .proc_handler   = proc_dointvec_minmax,
 116                .extra1         = &min_inline_size,
 117                .extra2         = &max_inline_size,
 118        },
 119        {
 120                .procname       = "rdma_inline_write_padding",
 121                .data           = &dummy,
 122                .maxlen         = sizeof(unsigned int),
 123                .mode           = 0644,
 124                .proc_handler   = proc_dointvec_minmax,
 125                .extra1         = &zero,
 126                .extra2         = &max_padding,
 127        },
 128        {
 129                .procname       = "rdma_memreg_strategy",
 130                .data           = &xprt_rdma_memreg_strategy,
 131                .maxlen         = sizeof(unsigned int),
 132                .mode           = 0644,
 133                .proc_handler   = proc_dointvec_minmax,
 134                .extra1         = &min_memreg,
 135                .extra2         = &max_memreg,
 136        },
 137        {
 138                .procname       = "rdma_pad_optimize",
 139                .data           = &xprt_rdma_pad_optimize,
 140                .maxlen         = sizeof(unsigned int),
 141                .mode           = 0644,
 142                .proc_handler   = proc_dointvec,
 143        },
 144        { },
 145};
 146
 147static struct ctl_table sunrpc_table[] = {
 148        {
 149                .procname       = "sunrpc",
 150                .mode           = 0555,
 151                .child          = xr_tunables_table
 152        },
 153        { },
 154};
 155
 156#endif
 157
 158static const struct rpc_xprt_ops xprt_rdma_procs;
 159
 160static void
 161xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
 162{
 163        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
 164        char buf[20];
 165
 166        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 167        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 168
 169        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
 170}
 171
 172static void
 173xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
 174{
 175        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
 176        char buf[40];
 177
 178        snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 179        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 180
 181        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
 182}
 183
 184void
 185xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
 186{
 187        char buf[128];
 188
 189        switch (sap->sa_family) {
 190        case AF_INET:
 191                xprt_rdma_format_addresses4(xprt, sap);
 192                break;
 193        case AF_INET6:
 194                xprt_rdma_format_addresses6(xprt, sap);
 195                break;
 196        default:
 197                pr_err("rpcrdma: Unrecognized address family\n");
 198                return;
 199        }
 200
 201        (void)rpc_ntop(sap, buf, sizeof(buf));
 202        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
 203
 204        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 205        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 206
 207        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 208        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 209
 210        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
 211}
 212
 213void
 214xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 215{
 216        unsigned int i;
 217
 218        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 219                switch (i) {
 220                case RPC_DISPLAY_PROTO:
 221                case RPC_DISPLAY_NETID:
 222                        continue;
 223                default:
 224                        kfree(xprt->address_strings[i]);
 225                }
 226}
 227
 228/**
 229 * xprt_rdma_connect_worker - establish connection in the background
 230 * @work: worker thread context
 231 *
 232 * Requester holds the xprt's send lock to prevent activity on this
 233 * transport while a fresh connection is being established. RPC tasks
 234 * sleep on the xprt's pending queue waiting for connect to complete.
 235 */
 236static void
 237xprt_rdma_connect_worker(struct work_struct *work)
 238{
 239        struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 240                                                   rx_connect_worker.work);
 241        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 242        int rc;
 243
 244        rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 245        xprt_clear_connecting(xprt);
 246        if (r_xprt->rx_ep.rep_connected > 0) {
 247                if (!xprt_test_and_set_connected(xprt)) {
 248                        xprt->stat.connect_count++;
 249                        xprt->stat.connect_time += (long)jiffies -
 250                                                   xprt->stat.connect_start;
 251                        xprt_wake_pending_tasks(xprt, -EAGAIN);
 252                }
 253        } else {
 254                if (xprt_test_and_clear_connected(xprt))
 255                        xprt_wake_pending_tasks(xprt, rc);
 256        }
 257}
 258
 259/**
 260 * xprt_rdma_inject_disconnect - inject a connection fault
 261 * @xprt: transport context
 262 *
 263 * If @xprt is connected, disconnect it to simulate spurious connection
 264 * loss.
 265 */
 266static void
 267xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 268{
 269        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 270
 271        trace_xprtrdma_op_inject_dsc(r_xprt);
 272        rdma_disconnect(r_xprt->rx_ia.ri_id);
 273}
 274
 275/**
 276 * xprt_rdma_destroy - Full tear down of transport
 277 * @xprt: doomed transport context
 278 *
 279 * Caller guarantees there will be no more calls to us with
 280 * this @xprt.
 281 */
 282static void
 283xprt_rdma_destroy(struct rpc_xprt *xprt)
 284{
 285        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 286
 287        trace_xprtrdma_op_destroy(r_xprt);
 288
 289        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 290
 291        rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
 292        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 293        rpcrdma_ia_close(&r_xprt->rx_ia);
 294
 295        xprt_rdma_free_addresses(xprt);
 296        xprt_free(xprt);
 297
 298        module_put(THIS_MODULE);
 299}
 300
 301static const struct rpc_timeout xprt_rdma_default_timeout = {
 302        .to_initval = 60 * HZ,
 303        .to_maxval = 60 * HZ,
 304};
 305
 306/**
 307 * xprt_setup_rdma - Set up transport to use RDMA
 308 *
 309 * @args: rpc transport arguments
 310 */
 311static struct rpc_xprt *
 312xprt_setup_rdma(struct xprt_create *args)
 313{
 314        struct rpcrdma_create_data_internal cdata;
 315        struct rpc_xprt *xprt;
 316        struct rpcrdma_xprt *new_xprt;
 317        struct rpcrdma_ep *new_ep;
 318        struct sockaddr *sap;
 319        int rc;
 320
 321        if (args->addrlen > sizeof(xprt->addr))
 322                return ERR_PTR(-EBADF);
 323
 324        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
 325        if (!xprt)
 326                return ERR_PTR(-ENOMEM);
 327
 328        /* 60 second timeout, no retries */
 329        xprt->timeout = &xprt_rdma_default_timeout;
 330        xprt->bind_timeout = RPCRDMA_BIND_TO;
 331        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 332        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 333
 334        xprt->resvport = 0;             /* privileged port not needed */
 335        xprt->ops = &xprt_rdma_procs;
 336
 337        /*
 338         * Set up RDMA-specific connect data.
 339         */
 340        sap = args->dstaddr;
 341
 342        /* Ensure xprt->addr holds valid server TCP (not RDMA)
 343         * address, for any side protocols which peek at it */
 344        xprt->prot = IPPROTO_TCP;
 345        xprt->addrlen = args->addrlen;
 346        memcpy(&xprt->addr, sap, xprt->addrlen);
 347
 348        if (rpc_get_port(sap))
 349                xprt_set_bound(xprt);
 350        xprt_rdma_format_addresses(xprt, sap);
 351
 352        cdata.max_requests = xprt_rdma_slot_table_entries;
 353
 354        cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
 355        cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
 356
 357        cdata.inline_wsize = xprt_rdma_max_inline_write;
 358        if (cdata.inline_wsize > cdata.wsize)
 359                cdata.inline_wsize = cdata.wsize;
 360
 361        cdata.inline_rsize = xprt_rdma_max_inline_read;
 362        if (cdata.inline_rsize > cdata.rsize)
 363                cdata.inline_rsize = cdata.rsize;
 364
 365        /*
 366         * Create new transport instance, which includes initialized
 367         *  o ia
 368         *  o endpoint
 369         *  o buffers
 370         */
 371
 372        new_xprt = rpcx_to_rdmax(xprt);
 373
 374        rc = rpcrdma_ia_open(new_xprt);
 375        if (rc)
 376                goto out1;
 377
 378        /*
 379         * initialize and create ep
 380         */
 381        new_xprt->rx_data = cdata;
 382        new_ep = &new_xprt->rx_ep;
 383
 384        rc = rpcrdma_ep_create(&new_xprt->rx_ep,
 385                                &new_xprt->rx_ia, &new_xprt->rx_data);
 386        if (rc)
 387                goto out2;
 388
 389        rc = rpcrdma_buffer_create(new_xprt);
 390        if (rc)
 391                goto out3;
 392
 393        INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 394                          xprt_rdma_connect_worker);
 395
 396        xprt->max_payload = frwr_maxpages(new_xprt);
 397        if (xprt->max_payload == 0)
 398                goto out4;
 399        xprt->max_payload <<= PAGE_SHIFT;
 400        dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
 401                __func__, xprt->max_payload);
 402
 403        if (!try_module_get(THIS_MODULE))
 404                goto out4;
 405
 406        dprintk("RPC:       %s: %s:%s\n", __func__,
 407                xprt->address_strings[RPC_DISPLAY_ADDR],
 408                xprt->address_strings[RPC_DISPLAY_PORT]);
 409        trace_xprtrdma_create(new_xprt);
 410        return xprt;
 411
 412out4:
 413        rpcrdma_buffer_destroy(&new_xprt->rx_buf);
 414        rc = -ENODEV;
 415out3:
 416        rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 417out2:
 418        rpcrdma_ia_close(&new_xprt->rx_ia);
 419out1:
 420        trace_xprtrdma_op_destroy(new_xprt);
 421        xprt_rdma_free_addresses(xprt);
 422        xprt_free(xprt);
 423        return ERR_PTR(rc);
 424}
 425
 426/**
 427 * xprt_rdma_close - close a transport connection
 428 * @xprt: transport context
 429 *
 430 * Called during autoclose or device removal.
 431 *
 432 * Caller holds @xprt's send lock to prevent activity on this
 433 * transport while the connection is torn down.
 434 */
 435void xprt_rdma_close(struct rpc_xprt *xprt)
 436{
 437        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 438        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 439        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 440
 441        might_sleep();
 442
 443        trace_xprtrdma_op_close(r_xprt);
 444
 445        /* Prevent marshaling and sending of new requests */
 446        xprt_clear_connected(xprt);
 447
 448        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 449                rpcrdma_ia_remove(ia);
 450                goto out;
 451        }
 452
 453        if (ep->rep_connected == -ENODEV)
 454                return;
 455        if (ep->rep_connected > 0)
 456                xprt->reestablish_timeout = 0;
 457        rpcrdma_ep_disconnect(ep, ia);
 458
 459        /* Prepare @xprt for the next connection by reinitializing
 460         * its credit grant to one (see RFC 8166, Section 3.3.3).
 461         */
 462        r_xprt->rx_buf.rb_credits = 1;
 463        xprt->cwnd = RPC_CWNDSHIFT;
 464
 465out:
 466        ++xprt->connect_cookie;
 467        xprt_disconnect_done(xprt);
 468}
 469
 470/**
 471 * xprt_rdma_set_port - update server port with rpcbind result
 472 * @xprt: controlling RPC transport
 473 * @port: new port value
 474 *
 475 * Transport connect status is unchanged.
 476 */
 477static void
 478xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 479{
 480        struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
 481        char buf[8];
 482
 483        dprintk("RPC:       %s: setting port for xprt %p (%s:%s) to %u\n",
 484                __func__, xprt,
 485                xprt->address_strings[RPC_DISPLAY_ADDR],
 486                xprt->address_strings[RPC_DISPLAY_PORT],
 487                port);
 488
 489        rpc_set_port(sap, port);
 490
 491        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 492        snprintf(buf, sizeof(buf), "%u", port);
 493        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 494
 495        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 496        snprintf(buf, sizeof(buf), "%4hx", port);
 497        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 498}
 499
 500/**
 501 * xprt_rdma_timer - invoked when an RPC times out
 502 * @xprt: controlling RPC transport
 503 * @task: RPC task that timed out
 504 *
 505 * Invoked when the transport is still connected, but an RPC
 506 * retransmit timeout occurs.
 507 *
 508 * Since RDMA connections don't have a keep-alive, forcibly
 509 * disconnect and retry to connect. This drives full
 510 * detection of the network path, and retransmissions of
 511 * all pending RPCs.
 512 */
 513static void
 514xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 515{
 516        xprt_force_disconnect(xprt);
 517}
 518
 519/**
 520 * xprt_rdma_connect - try to establish a transport connection
 521 * @xprt: transport state
 522 * @task: RPC scheduler context
 523 *
 524 */
 525static void
 526xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 527{
 528        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 529
 530        trace_xprtrdma_op_connect(r_xprt);
 531        if (r_xprt->rx_ep.rep_connected != 0) {
 532                /* Reconnect */
 533                schedule_delayed_work(&r_xprt->rx_connect_worker,
 534                                      xprt->reestablish_timeout);
 535                xprt->reestablish_timeout <<= 1;
 536                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
 537                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
 538                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 539                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 540        } else {
 541                schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
 542                if (!RPC_IS_ASYNC(task))
 543                        flush_delayed_work(&r_xprt->rx_connect_worker);
 544        }
 545}
 546
 547/**
 548 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 549 * @xprt: controlling RPC transport
 550 * @task: RPC task requesting a fresh rpc_rqst
 551 *
 552 * tk_status values:
 553 *      %0 if task->tk_rqstp points to a fresh rpc_rqst
 554 *      %-EAGAIN if no rpc_rqst is available; queued on backlog
 555 */
 556static void
 557xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
 558{
 559        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 560        struct rpcrdma_req *req;
 561
 562        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 563        if (!req)
 564                goto out_sleep;
 565        task->tk_rqstp = &req->rl_slot;
 566        task->tk_status = 0;
 567        return;
 568
 569out_sleep:
 570        rpc_sleep_on(&xprt->backlog, task, NULL);
 571        task->tk_status = -EAGAIN;
 572}
 573
 574/**
 575 * xprt_rdma_free_slot - release an rpc_rqst
 576 * @xprt: controlling RPC transport
 577 * @rqst: rpc_rqst to release
 578 *
 579 */
 580static void
 581xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 582{
 583        memset(rqst, 0, sizeof(*rqst));
 584        rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
 585        rpc_wake_up_next(&xprt->backlog);
 586}
 587
 588static bool
 589rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 590                    size_t size, gfp_t flags)
 591{
 592        struct rpcrdma_regbuf *rb;
 593
 594        if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
 595                return true;
 596
 597        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
 598        if (IS_ERR(rb))
 599                return false;
 600
 601        rpcrdma_free_regbuf(req->rl_sendbuf);
 602        r_xprt->rx_stats.hardway_register_count += size;
 603        req->rl_sendbuf = rb;
 604        return true;
 605}
 606
 607/* The rq_rcv_buf is used only if a Reply chunk is necessary.
 608 * The decision to use a Reply chunk is made later in
 609 * rpcrdma_marshal_req. This buffer is registered at that time.
 610 *
 611 * Otherwise, the associated RPC Reply arrives in a separate
 612 * Receive buffer, arbitrarily chosen by the HCA. The buffer
 613 * allocated here for the RPC Reply is not utilized in that
 614 * case. See rpcrdma_inline_fixup.
 615 *
 616 * A regbuf is used here to remember the buffer size.
 617 */
 618static bool
 619rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 620                    size_t size, gfp_t flags)
 621{
 622        struct rpcrdma_regbuf *rb;
 623
 624        if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
 625                return true;
 626
 627        rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
 628        if (IS_ERR(rb))
 629                return false;
 630
 631        rpcrdma_free_regbuf(req->rl_recvbuf);
 632        r_xprt->rx_stats.hardway_register_count += size;
 633        req->rl_recvbuf = rb;
 634        return true;
 635}
 636
 637/**
 638 * xprt_rdma_allocate - allocate transport resources for an RPC
 639 * @task: RPC task
 640 *
 641 * Return values:
 642 *        0:    Success; rq_buffer points to RPC buffer to use
 643 *   ENOMEM:    Out of memory, call again later
 644 *      EIO:    A permanent error occurred, do not retry
 645 */
 646static int
 647xprt_rdma_allocate(struct rpc_task *task)
 648{
 649        struct rpc_rqst *rqst = task->tk_rqstp;
 650        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 651        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 652        gfp_t flags;
 653
 654        flags = RPCRDMA_DEF_GFP;
 655        if (RPC_IS_SWAPPER(task))
 656                flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 657
 658        if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
 659                goto out_fail;
 660        if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
 661                goto out_fail;
 662
 663        rqst->rq_buffer = req->rl_sendbuf->rg_base;
 664        rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
 665        trace_xprtrdma_op_allocate(task, req);
 666        return 0;
 667
 668out_fail:
 669        trace_xprtrdma_op_allocate(task, NULL);
 670        return -ENOMEM;
 671}
 672
 673/**
 674 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 675 * @task: RPC task
 676 *
 677 * Caller guarantees rqst->rq_buffer is non-NULL.
 678 */
 679static void
 680xprt_rdma_free(struct rpc_task *task)
 681{
 682        struct rpc_rqst *rqst = task->tk_rqstp;
 683        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 684        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 685
 686        if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
 687                rpcrdma_release_rqst(r_xprt, req);
 688        trace_xprtrdma_op_free(task, req);
 689}
 690
 691/**
 692 * xprt_rdma_send_request - marshal and send an RPC request
 693 * @rqst: RPC message in rq_snd_buf
 694 *
 695 * Caller holds the transport's write lock.
 696 *
 697 * Returns:
 698 *      %0 if the RPC message has been sent
 699 *      %-ENOTCONN if the caller should reconnect and call again
 700 *      %-EAGAIN if the caller should call again
 701 *      %-ENOBUFS if the caller should call again after a delay
 702 *      %-EMSGSIZE if encoding ran out of buffer space. The request
 703 *              was not sent. Do not try to send this message again.
 704 *      %-EIO if an I/O error occurred. The request was not sent.
 705 *              Do not try to send this message again.
 706 */
 707static int
 708xprt_rdma_send_request(struct rpc_rqst *rqst)
 709{
 710        struct rpc_xprt *xprt = rqst->rq_xprt;
 711        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 712        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 713        int rc = 0;
 714
 715#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 716        if (unlikely(!rqst->rq_buffer))
 717                return xprt_rdma_bc_send_reply(rqst);
 718#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
 719
 720        if (!xprt_connected(xprt))
 721                return -ENOTCONN;
 722
 723        if (!xprt_request_get_cong(xprt, rqst))
 724                return -EBADSLT;
 725
 726        rc = rpcrdma_marshal_req(r_xprt, rqst);
 727        if (rc < 0)
 728                goto failed_marshal;
 729
 730        /* Must suppress retransmit to maintain credits */
 731        if (rqst->rq_connect_cookie == xprt->connect_cookie)
 732                goto drop_connection;
 733        rqst->rq_xtime = ktime_get();
 734
 735        __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 736        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 737                goto drop_connection;
 738
 739        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
 740
 741        /* An RPC with no reply will throw off credit accounting,
 742         * so drop the connection to reset the credit grant.
 743         */
 744        if (!rpc_reply_expected(rqst->rq_task))
 745                goto drop_connection;
 746        return 0;
 747
 748failed_marshal:
 749        if (rc != -ENOTCONN)
 750                return rc;
 751drop_connection:
 752        xprt_rdma_close(xprt);
 753        return -ENOTCONN;
 754}
 755
 756void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 757{
 758        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 759        long idle_time = 0;
 760
 761        if (xprt_connected(xprt))
 762                idle_time = (long)(jiffies - xprt->last_used) / HZ;
 763
 764        seq_puts(seq, "\txprt:\trdma ");
 765        seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
 766                   0,   /* need a local port? */
 767                   xprt->stat.bind_count,
 768                   xprt->stat.connect_count,
 769                   xprt->stat.connect_time / HZ,
 770                   idle_time,
 771                   xprt->stat.sends,
 772                   xprt->stat.recvs,
 773                   xprt->stat.bad_xids,
 774                   xprt->stat.req_u,
 775                   xprt->stat.bklog_u);
 776        seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 777                   r_xprt->rx_stats.read_chunk_count,
 778                   r_xprt->rx_stats.write_chunk_count,
 779                   r_xprt->rx_stats.reply_chunk_count,
 780                   r_xprt->rx_stats.total_rdma_request,
 781                   r_xprt->rx_stats.total_rdma_reply,
 782                   r_xprt->rx_stats.pullup_copy_count,
 783                   r_xprt->rx_stats.fixup_copy_count,
 784                   r_xprt->rx_stats.hardway_register_count,
 785                   r_xprt->rx_stats.failed_marshal_count,
 786                   r_xprt->rx_stats.bad_reply_count,
 787                   r_xprt->rx_stats.nomsg_call_count);
 788        seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 789                   r_xprt->rx_stats.mrs_recycled,
 790                   r_xprt->rx_stats.mrs_orphaned,
 791                   r_xprt->rx_stats.mrs_allocated,
 792                   r_xprt->rx_stats.local_inv_needed,
 793                   r_xprt->rx_stats.empty_sendctx_q,
 794                   r_xprt->rx_stats.reply_waits_for_send);
 795}
 796
 797static int
 798xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 799{
 800        return 0;
 801}
 802
 803static void
 804xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 805{
 806}
 807
 808/*
 809 * Plumbing for rpc transport switch and kernel module
 810 */
 811
 812static const struct rpc_xprt_ops xprt_rdma_procs = {
 813        .reserve_xprt           = xprt_reserve_xprt_cong,
 814        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
 815        .alloc_slot             = xprt_rdma_alloc_slot,
 816        .free_slot              = xprt_rdma_free_slot,
 817        .release_request        = xprt_release_rqst_cong,       /* ditto */
 818        .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
 819        .timer                  = xprt_rdma_timer,
 820        .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
 821        .set_port               = xprt_rdma_set_port,
 822        .connect                = xprt_rdma_connect,
 823        .buf_alloc              = xprt_rdma_allocate,
 824        .buf_free               = xprt_rdma_free,
 825        .send_request           = xprt_rdma_send_request,
 826        .close                  = xprt_rdma_close,
 827        .destroy                = xprt_rdma_destroy,
 828        .print_stats            = xprt_rdma_print_stats,
 829        .enable_swap            = xprt_rdma_enable_swap,
 830        .disable_swap           = xprt_rdma_disable_swap,
 831        .inject_disconnect      = xprt_rdma_inject_disconnect,
 832#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 833        .bc_setup               = xprt_rdma_bc_setup,
 834        .bc_maxpayload          = xprt_rdma_bc_maxpayload,
 835        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
 836        .bc_destroy             = xprt_rdma_bc_destroy,
 837#endif
 838};
 839
 840static struct xprt_class xprt_rdma = {
 841        .list                   = LIST_HEAD_INIT(xprt_rdma.list),
 842        .name                   = "rdma",
 843        .owner                  = THIS_MODULE,
 844        .ident                  = XPRT_TRANSPORT_RDMA,
 845        .setup                  = xprt_setup_rdma,
 846};
 847
 848void xprt_rdma_cleanup(void)
 849{
 850#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 851        if (sunrpc_table_header) {
 852                unregister_sysctl_table(sunrpc_table_header);
 853                sunrpc_table_header = NULL;
 854        }
 855#endif
 856
 857        xprt_unregister_transport(&xprt_rdma);
 858        xprt_unregister_transport(&xprt_rdma_bc);
 859}
 860
 861int xprt_rdma_init(void)
 862{
 863        int rc;
 864
 865        rc = xprt_register_transport(&xprt_rdma);
 866        if (rc)
 867                return rc;
 868
 869        rc = xprt_register_transport(&xprt_rdma_bc);
 870        if (rc) {
 871                xprt_unregister_transport(&xprt_rdma);
 872                return rc;
 873        }
 874
 875#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 876        if (!sunrpc_table_header)
 877                sunrpc_table_header = register_sysctl_table(sunrpc_table);
 878#endif
 879        return 0;
 880}
 881