linux/net/sunrpc/xprtrdma/transport.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * transport.c
  44 *
  45 * This file contains the top-level implementation of an RPC RDMA
  46 * transport.
  47 *
  48 * Naming convention: functions beginning with xprt_ are part of the
  49 * transport switch. All others are RPC RDMA internal.
  50 */
  51
  52#include <linux/module.h>
  53#include <linux/slab.h>
  54#include <linux/seq_file.h>
  55#include <linux/smp.h>
  56
  57#include <linux/sunrpc/addr.h>
  58#include <linux/sunrpc/svc_rdma.h>
  59
  60#include "xprt_rdma.h"
  61#include <trace/events/rpcrdma.h>
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY        RPCDBG_TRANS
  65#endif
  66
  67/*
  68 * tunables
  69 */
  70
  71unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
  72unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
  73unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
  74unsigned int xprt_rdma_memreg_strategy          = RPCRDMA_FRWR;
  75int xprt_rdma_pad_optimize;
  76
  77#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  78
  79static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
  80static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
  81static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
  82static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
  83static unsigned int zero;
  84static unsigned int max_padding = PAGE_SIZE;
  85static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
  86static unsigned int max_memreg = RPCRDMA_LAST - 1;
  87static unsigned int dummy;
  88
  89static struct ctl_table_header *sunrpc_table_header;
  90
  91static struct ctl_table xr_tunables_table[] = {
  92        {
  93                .procname       = "rdma_slot_table_entries",
  94                .data           = &xprt_rdma_slot_table_entries,
  95                .maxlen         = sizeof(unsigned int),
  96                .mode           = 0644,
  97                .proc_handler   = proc_dointvec_minmax,
  98                .extra1         = &min_slot_table_size,
  99                .extra2         = &max_slot_table_size
 100        },
 101        {
 102                .procname       = "rdma_max_inline_read",
 103                .data           = &xprt_rdma_max_inline_read,
 104                .maxlen         = sizeof(unsigned int),
 105                .mode           = 0644,
 106                .proc_handler   = proc_dointvec_minmax,
 107                .extra1         = &min_inline_size,
 108                .extra2         = &max_inline_size,
 109        },
 110        {
 111                .procname       = "rdma_max_inline_write",
 112                .data           = &xprt_rdma_max_inline_write,
 113                .maxlen         = sizeof(unsigned int),
 114                .mode           = 0644,
 115                .proc_handler   = proc_dointvec_minmax,
 116                .extra1         = &min_inline_size,
 117                .extra2         = &max_inline_size,
 118        },
 119        {
 120                .procname       = "rdma_inline_write_padding",
 121                .data           = &dummy,
 122                .maxlen         = sizeof(unsigned int),
 123                .mode           = 0644,
 124                .proc_handler   = proc_dointvec_minmax,
 125                .extra1         = &zero,
 126                .extra2         = &max_padding,
 127        },
 128        {
 129                .procname       = "rdma_memreg_strategy",
 130                .data           = &xprt_rdma_memreg_strategy,
 131                .maxlen         = sizeof(unsigned int),
 132                .mode           = 0644,
 133                .proc_handler   = proc_dointvec_minmax,
 134                .extra1         = &min_memreg,
 135                .extra2         = &max_memreg,
 136        },
 137        {
 138                .procname       = "rdma_pad_optimize",
 139                .data           = &xprt_rdma_pad_optimize,
 140                .maxlen         = sizeof(unsigned int),
 141                .mode           = 0644,
 142                .proc_handler   = proc_dointvec,
 143        },
 144        { },
 145};
 146
 147static struct ctl_table sunrpc_table[] = {
 148        {
 149                .procname       = "sunrpc",
 150                .mode           = 0555,
 151                .child          = xr_tunables_table
 152        },
 153        { },
 154};
 155
 156#endif
 157
 158static const struct rpc_xprt_ops xprt_rdma_procs;
 159
 160static void
 161xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
 162{
 163        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
 164        char buf[20];
 165
 166        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 167        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 168
 169        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
 170}
 171
 172static void
 173xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
 174{
 175        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
 176        char buf[40];
 177
 178        snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 179        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 180
 181        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
 182}
 183
 184void
 185xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
 186{
 187        char buf[128];
 188
 189        switch (sap->sa_family) {
 190        case AF_INET:
 191                xprt_rdma_format_addresses4(xprt, sap);
 192                break;
 193        case AF_INET6:
 194                xprt_rdma_format_addresses6(xprt, sap);
 195                break;
 196        default:
 197                pr_err("rpcrdma: Unrecognized address family\n");
 198                return;
 199        }
 200
 201        (void)rpc_ntop(sap, buf, sizeof(buf));
 202        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
 203
 204        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 205        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 206
 207        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 208        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 209
 210        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
 211}
 212
 213void
 214xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 215{
 216        unsigned int i;
 217
 218        for (i = 0; i < RPC_DISPLAY_MAX; i++)
 219                switch (i) {
 220                case RPC_DISPLAY_PROTO:
 221                case RPC_DISPLAY_NETID:
 222                        continue;
 223                default:
 224                        kfree(xprt->address_strings[i]);
 225                }
 226}
 227
 228/**
 229 * xprt_rdma_connect_worker - establish connection in the background
 230 * @work: worker thread context
 231 *
 232 * Requester holds the xprt's send lock to prevent activity on this
 233 * transport while a fresh connection is being established. RPC tasks
 234 * sleep on the xprt's pending queue waiting for connect to complete.
 235 */
 236static void
 237xprt_rdma_connect_worker(struct work_struct *work)
 238{
 239        struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 240                                                   rx_connect_worker.work);
 241        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 242        int rc;
 243
 244        rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 245        xprt_clear_connecting(xprt);
 246        if (r_xprt->rx_ep.rep_connected > 0) {
 247                if (!xprt_test_and_set_connected(xprt)) {
 248                        xprt->stat.connect_count++;
 249                        xprt->stat.connect_time += (long)jiffies -
 250                                                   xprt->stat.connect_start;
 251                        xprt_wake_pending_tasks(xprt, -EAGAIN);
 252                }
 253        } else {
 254                if (xprt_test_and_clear_connected(xprt))
 255                        xprt_wake_pending_tasks(xprt, rc);
 256        }
 257}
 258
 259/**
 260 * xprt_rdma_inject_disconnect - inject a connection fault
 261 * @xprt: transport context
 262 *
 263 * If @xprt is connected, disconnect it to simulate spurious connection
 264 * loss.
 265 */
 266static void
 267xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 268{
 269        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 270
 271        trace_xprtrdma_op_inject_dsc(r_xprt);
 272        rdma_disconnect(r_xprt->rx_ia.ri_id);
 273}
 274
 275/**
 276 * xprt_rdma_destroy - Full tear down of transport
 277 * @xprt: doomed transport context
 278 *
 279 * Caller guarantees there will be no more calls to us with
 280 * this @xprt.
 281 */
 282static void
 283xprt_rdma_destroy(struct rpc_xprt *xprt)
 284{
 285        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 286
 287        trace_xprtrdma_op_destroy(r_xprt);
 288
 289        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 290
 291        rpcrdma_ep_destroy(r_xprt);
 292        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 293        rpcrdma_ia_close(&r_xprt->rx_ia);
 294
 295        xprt_rdma_free_addresses(xprt);
 296        xprt_free(xprt);
 297
 298        module_put(THIS_MODULE);
 299}
 300
 301static const struct rpc_timeout xprt_rdma_default_timeout = {
 302        .to_initval = 60 * HZ,
 303        .to_maxval = 60 * HZ,
 304};
 305
 306/**
 307 * xprt_setup_rdma - Set up transport to use RDMA
 308 *
 309 * @args: rpc transport arguments
 310 */
 311static struct rpc_xprt *
 312xprt_setup_rdma(struct xprt_create *args)
 313{
 314        struct rpc_xprt *xprt;
 315        struct rpcrdma_xprt *new_xprt;
 316        struct sockaddr *sap;
 317        int rc;
 318
 319        if (args->addrlen > sizeof(xprt->addr))
 320                return ERR_PTR(-EBADF);
 321
 322        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
 323        if (!xprt)
 324                return ERR_PTR(-ENOMEM);
 325
 326        /* 60 second timeout, no retries */
 327        xprt->timeout = &xprt_rdma_default_timeout;
 328        xprt->bind_timeout = RPCRDMA_BIND_TO;
 329        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 330        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 331
 332        xprt->resvport = 0;             /* privileged port not needed */
 333        xprt->ops = &xprt_rdma_procs;
 334
 335        /*
 336         * Set up RDMA-specific connect data.
 337         */
 338        sap = args->dstaddr;
 339
 340        /* Ensure xprt->addr holds valid server TCP (not RDMA)
 341         * address, for any side protocols which peek at it */
 342        xprt->prot = IPPROTO_TCP;
 343        xprt->addrlen = args->addrlen;
 344        memcpy(&xprt->addr, sap, xprt->addrlen);
 345
 346        if (rpc_get_port(sap))
 347                xprt_set_bound(xprt);
 348        xprt_rdma_format_addresses(xprt, sap);
 349
 350        new_xprt = rpcx_to_rdmax(xprt);
 351        rc = rpcrdma_ia_open(new_xprt);
 352        if (rc)
 353                goto out1;
 354
 355        rc = rpcrdma_ep_create(new_xprt);
 356        if (rc)
 357                goto out2;
 358
 359        rc = rpcrdma_buffer_create(new_xprt);
 360        if (rc)
 361                goto out3;
 362
 363        INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 364                          xprt_rdma_connect_worker);
 365
 366        xprt->max_payload = frwr_maxpages(new_xprt);
 367        if (xprt->max_payload == 0)
 368                goto out4;
 369        xprt->max_payload <<= PAGE_SHIFT;
 370        dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
 371                __func__, xprt->max_payload);
 372
 373        if (!try_module_get(THIS_MODULE))
 374                goto out4;
 375
 376        dprintk("RPC:       %s: %s:%s\n", __func__,
 377                xprt->address_strings[RPC_DISPLAY_ADDR],
 378                xprt->address_strings[RPC_DISPLAY_PORT]);
 379        trace_xprtrdma_create(new_xprt);
 380        return xprt;
 381
 382out4:
 383        rpcrdma_buffer_destroy(&new_xprt->rx_buf);
 384        rc = -ENODEV;
 385out3:
 386        rpcrdma_ep_destroy(new_xprt);
 387out2:
 388        rpcrdma_ia_close(&new_xprt->rx_ia);
 389out1:
 390        trace_xprtrdma_op_destroy(new_xprt);
 391        xprt_rdma_free_addresses(xprt);
 392        xprt_free(xprt);
 393        return ERR_PTR(rc);
 394}
 395
 396/**
 397 * xprt_rdma_close - close a transport connection
 398 * @xprt: transport context
 399 *
 400 * Called during autoclose or device removal.
 401 *
 402 * Caller holds @xprt's send lock to prevent activity on this
 403 * transport while the connection is torn down.
 404 */
 405void xprt_rdma_close(struct rpc_xprt *xprt)
 406{
 407        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 408        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 409        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 410
 411        might_sleep();
 412
 413        trace_xprtrdma_op_close(r_xprt);
 414
 415        /* Prevent marshaling and sending of new requests */
 416        xprt_clear_connected(xprt);
 417
 418        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 419                rpcrdma_ia_remove(ia);
 420                goto out;
 421        }
 422
 423        if (ep->rep_connected == -ENODEV)
 424                return;
 425        if (ep->rep_connected > 0)
 426                xprt->reestablish_timeout = 0;
 427        rpcrdma_ep_disconnect(ep, ia);
 428
 429        /* Prepare @xprt for the next connection by reinitializing
 430         * its credit grant to one (see RFC 8166, Section 3.3.3).
 431         */
 432        r_xprt->rx_buf.rb_credits = 1;
 433        xprt->cwnd = RPC_CWNDSHIFT;
 434
 435out:
 436        ++xprt->connect_cookie;
 437        xprt_disconnect_done(xprt);
 438}
 439
 440/**
 441 * xprt_rdma_set_port - update server port with rpcbind result
 442 * @xprt: controlling RPC transport
 443 * @port: new port value
 444 *
 445 * Transport connect status is unchanged.
 446 */
 447static void
 448xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 449{
 450        struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
 451        char buf[8];
 452
 453        dprintk("RPC:       %s: setting port for xprt %p (%s:%s) to %u\n",
 454                __func__, xprt,
 455                xprt->address_strings[RPC_DISPLAY_ADDR],
 456                xprt->address_strings[RPC_DISPLAY_PORT],
 457                port);
 458
 459        rpc_set_port(sap, port);
 460
 461        kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 462        snprintf(buf, sizeof(buf), "%u", port);
 463        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 464
 465        kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 466        snprintf(buf, sizeof(buf), "%4hx", port);
 467        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 468}
 469
 470/**
 471 * xprt_rdma_timer - invoked when an RPC times out
 472 * @xprt: controlling RPC transport
 473 * @task: RPC task that timed out
 474 *
 475 * Invoked when the transport is still connected, but an RPC
 476 * retransmit timeout occurs.
 477 *
 478 * Since RDMA connections don't have a keep-alive, forcibly
 479 * disconnect and retry to connect. This drives full
 480 * detection of the network path, and retransmissions of
 481 * all pending RPCs.
 482 */
 483static void
 484xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 485{
 486        xprt_force_disconnect(xprt);
 487}
 488
 489/**
 490 * xprt_rdma_connect - try to establish a transport connection
 491 * @xprt: transport state
 492 * @task: RPC scheduler context
 493 *
 494 */
 495static void
 496xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 497{
 498        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 499
 500        trace_xprtrdma_op_connect(r_xprt);
 501        if (r_xprt->rx_ep.rep_connected != 0) {
 502                /* Reconnect */
 503                schedule_delayed_work(&r_xprt->rx_connect_worker,
 504                                      xprt->reestablish_timeout);
 505                xprt->reestablish_timeout <<= 1;
 506                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
 507                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
 508                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 509                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 510        } else {
 511                schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
 512                if (!RPC_IS_ASYNC(task))
 513                        flush_delayed_work(&r_xprt->rx_connect_worker);
 514        }
 515}
 516
 517/**
 518 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 519 * @xprt: controlling RPC transport
 520 * @task: RPC task requesting a fresh rpc_rqst
 521 *
 522 * tk_status values:
 523 *      %0 if task->tk_rqstp points to a fresh rpc_rqst
 524 *      %-EAGAIN if no rpc_rqst is available; queued on backlog
 525 */
 526static void
 527xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
 528{
 529        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 530        struct rpcrdma_req *req;
 531
 532        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 533        if (!req)
 534                goto out_sleep;
 535        task->tk_rqstp = &req->rl_slot;
 536        task->tk_status = 0;
 537        return;
 538
 539out_sleep:
 540        rpc_sleep_on(&xprt->backlog, task, NULL);
 541        task->tk_status = -EAGAIN;
 542}
 543
 544/**
 545 * xprt_rdma_free_slot - release an rpc_rqst
 546 * @xprt: controlling RPC transport
 547 * @rqst: rpc_rqst to release
 548 *
 549 */
 550static void
 551xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 552{
 553        memset(rqst, 0, sizeof(*rqst));
 554        rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
 555        rpc_wake_up_next(&xprt->backlog);
 556}
 557
 558static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
 559                                 struct rpcrdma_regbuf *rb, size_t size,
 560                                 gfp_t flags)
 561{
 562        if (unlikely(rdmab_length(rb) < size)) {
 563                if (!rpcrdma_regbuf_realloc(rb, size, flags))
 564                        return false;
 565                r_xprt->rx_stats.hardway_register_count += size;
 566        }
 567        return true;
 568}
 569
 570/**
 571 * xprt_rdma_allocate - allocate transport resources for an RPC
 572 * @task: RPC task
 573 *
 574 * Return values:
 575 *        0:    Success; rq_buffer points to RPC buffer to use
 576 *   ENOMEM:    Out of memory, call again later
 577 *      EIO:    A permanent error occurred, do not retry
 578 */
 579static int
 580xprt_rdma_allocate(struct rpc_task *task)
 581{
 582        struct rpc_rqst *rqst = task->tk_rqstp;
 583        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 584        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 585        gfp_t flags;
 586
 587        flags = RPCRDMA_DEF_GFP;
 588        if (RPC_IS_SWAPPER(task))
 589                flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 590
 591        if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
 592                                  flags))
 593                goto out_fail;
 594        if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
 595                                  flags))
 596                goto out_fail;
 597
 598        rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
 599        rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
 600        trace_xprtrdma_op_allocate(task, req);
 601        return 0;
 602
 603out_fail:
 604        trace_xprtrdma_op_allocate(task, NULL);
 605        return -ENOMEM;
 606}
 607
 608/**
 609 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 610 * @task: RPC task
 611 *
 612 * Caller guarantees rqst->rq_buffer is non-NULL.
 613 */
 614static void
 615xprt_rdma_free(struct rpc_task *task)
 616{
 617        struct rpc_rqst *rqst = task->tk_rqstp;
 618        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 619        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 620
 621        if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
 622                rpcrdma_release_rqst(r_xprt, req);
 623        trace_xprtrdma_op_free(task, req);
 624}
 625
 626/**
 627 * xprt_rdma_send_request - marshal and send an RPC request
 628 * @rqst: RPC message in rq_snd_buf
 629 *
 630 * Caller holds the transport's write lock.
 631 *
 632 * Returns:
 633 *      %0 if the RPC message has been sent
 634 *      %-ENOTCONN if the caller should reconnect and call again
 635 *      %-EAGAIN if the caller should call again
 636 *      %-ENOBUFS if the caller should call again after a delay
 637 *      %-EMSGSIZE if encoding ran out of buffer space. The request
 638 *              was not sent. Do not try to send this message again.
 639 *      %-EIO if an I/O error occurred. The request was not sent.
 640 *              Do not try to send this message again.
 641 */
 642static int
 643xprt_rdma_send_request(struct rpc_rqst *rqst)
 644{
 645        struct rpc_xprt *xprt = rqst->rq_xprt;
 646        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 647        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 648        int rc = 0;
 649
 650#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 651        if (unlikely(!rqst->rq_buffer))
 652                return xprt_rdma_bc_send_reply(rqst);
 653#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
 654
 655        if (!xprt_connected(xprt))
 656                return -ENOTCONN;
 657
 658        if (!xprt_request_get_cong(xprt, rqst))
 659                return -EBADSLT;
 660
 661        rc = rpcrdma_marshal_req(r_xprt, rqst);
 662        if (rc < 0)
 663                goto failed_marshal;
 664
 665        /* Must suppress retransmit to maintain credits */
 666        if (rqst->rq_connect_cookie == xprt->connect_cookie)
 667                goto drop_connection;
 668        rqst->rq_xtime = ktime_get();
 669
 670        __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 671        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 672                goto drop_connection;
 673
 674        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
 675
 676        /* An RPC with no reply will throw off credit accounting,
 677         * so drop the connection to reset the credit grant.
 678         */
 679        if (!rpc_reply_expected(rqst->rq_task))
 680                goto drop_connection;
 681        return 0;
 682
 683failed_marshal:
 684        if (rc != -ENOTCONN)
 685                return rc;
 686drop_connection:
 687        xprt_rdma_close(xprt);
 688        return -ENOTCONN;
 689}
 690
 691void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 692{
 693        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 694        long idle_time = 0;
 695
 696        if (xprt_connected(xprt))
 697                idle_time = (long)(jiffies - xprt->last_used) / HZ;
 698
 699        seq_puts(seq, "\txprt:\trdma ");
 700        seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
 701                   0,   /* need a local port? */
 702                   xprt->stat.bind_count,
 703                   xprt->stat.connect_count,
 704                   xprt->stat.connect_time / HZ,
 705                   idle_time,
 706                   xprt->stat.sends,
 707                   xprt->stat.recvs,
 708                   xprt->stat.bad_xids,
 709                   xprt->stat.req_u,
 710                   xprt->stat.bklog_u);
 711        seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 712                   r_xprt->rx_stats.read_chunk_count,
 713                   r_xprt->rx_stats.write_chunk_count,
 714                   r_xprt->rx_stats.reply_chunk_count,
 715                   r_xprt->rx_stats.total_rdma_request,
 716                   r_xprt->rx_stats.total_rdma_reply,
 717                   r_xprt->rx_stats.pullup_copy_count,
 718                   r_xprt->rx_stats.fixup_copy_count,
 719                   r_xprt->rx_stats.hardway_register_count,
 720                   r_xprt->rx_stats.failed_marshal_count,
 721                   r_xprt->rx_stats.bad_reply_count,
 722                   r_xprt->rx_stats.nomsg_call_count);
 723        seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 724                   r_xprt->rx_stats.mrs_recycled,
 725                   r_xprt->rx_stats.mrs_orphaned,
 726                   r_xprt->rx_stats.mrs_allocated,
 727                   r_xprt->rx_stats.local_inv_needed,
 728                   r_xprt->rx_stats.empty_sendctx_q,
 729                   r_xprt->rx_stats.reply_waits_for_send);
 730}
 731
 732static int
 733xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 734{
 735        return 0;
 736}
 737
 738static void
 739xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 740{
 741}
 742
 743/*
 744 * Plumbing for rpc transport switch and kernel module
 745 */
 746
 747static const struct rpc_xprt_ops xprt_rdma_procs = {
 748        .reserve_xprt           = xprt_reserve_xprt_cong,
 749        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
 750        .alloc_slot             = xprt_rdma_alloc_slot,
 751        .free_slot              = xprt_rdma_free_slot,
 752        .release_request        = xprt_release_rqst_cong,       /* ditto */
 753        .wait_for_reply_request = xprt_wait_for_reply_request_def, /* ditto */
 754        .timer                  = xprt_rdma_timer,
 755        .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
 756        .set_port               = xprt_rdma_set_port,
 757        .connect                = xprt_rdma_connect,
 758        .buf_alloc              = xprt_rdma_allocate,
 759        .buf_free               = xprt_rdma_free,
 760        .send_request           = xprt_rdma_send_request,
 761        .close                  = xprt_rdma_close,
 762        .destroy                = xprt_rdma_destroy,
 763        .print_stats            = xprt_rdma_print_stats,
 764        .enable_swap            = xprt_rdma_enable_swap,
 765        .disable_swap           = xprt_rdma_disable_swap,
 766        .inject_disconnect      = xprt_rdma_inject_disconnect,
 767#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 768        .bc_setup               = xprt_rdma_bc_setup,
 769        .bc_maxpayload          = xprt_rdma_bc_maxpayload,
 770        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
 771        .bc_destroy             = xprt_rdma_bc_destroy,
 772#endif
 773};
 774
 775static struct xprt_class xprt_rdma = {
 776        .list                   = LIST_HEAD_INIT(xprt_rdma.list),
 777        .name                   = "rdma",
 778        .owner                  = THIS_MODULE,
 779        .ident                  = XPRT_TRANSPORT_RDMA,
 780        .setup                  = xprt_setup_rdma,
 781};
 782
 783void xprt_rdma_cleanup(void)
 784{
 785#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 786        if (sunrpc_table_header) {
 787                unregister_sysctl_table(sunrpc_table_header);
 788                sunrpc_table_header = NULL;
 789        }
 790#endif
 791
 792        xprt_unregister_transport(&xprt_rdma);
 793        xprt_unregister_transport(&xprt_rdma_bc);
 794}
 795
 796int xprt_rdma_init(void)
 797{
 798        int rc;
 799
 800        rc = xprt_register_transport(&xprt_rdma);
 801        if (rc)
 802                return rc;
 803
 804        rc = xprt_register_transport(&xprt_rdma_bc);
 805        if (rc) {
 806                xprt_unregister_transport(&xprt_rdma);
 807                return rc;
 808        }
 809
 810#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 811        if (!sunrpc_table_header)
 812                sunrpc_table_header = register_sysctl_table(sunrpc_table);
 813#endif
 814        return 0;
 815}
 816