// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, if a reply is expected,
 *      it installs a timer that is run after the packet's timeout has
 *      expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
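
/*
 * Illustrative sketch of the flow described above: roughly the sequence
 * of entry points in this file that the RPC client state machine in
 * clnt.c drives for a single request. Simplified; error handling and
 * retransmission are omitted.
 *
 *      xprt_reserve(task);                     // get a slot, or sleep on the backlog
 *      xprt_connect(task);                     // ensure the transport is connected
 *      if (xprt_prepare_transmit(task))        // take the transport write lock
 *              xprt_transmit(task);            // send, queued to match the reply
 *      xprt_request_wait_receive(task);        // sleep until reply or timeout
 */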

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void      xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32   xprt_alloc_xid(struct rpc_xprt *xprt);
static void      xprt_destroy(struct rpc_xprt *xprt);
static void      xprt_request_init(struct rpc_task *task);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
        unsigned long timeout = jiffies + req->rq_timeout;

        if (time_before(timeout, req->rq_majortimeo))
                return timeout;
        return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
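
/*
 * Usage sketch: a hypothetical transport module registering its
 * xprt_class at init time and unregistering it on exit, modeled on the
 * socket transport. "example_transport", "init_example_xprt" and
 * "cleanup_example_xprt" are illustrative names only; a real class
 * supplies its own unique ident, netid strings and setup method.
 *
 *      static struct xprt_class example_transport = {
 *              .list   = LIST_HEAD_INIT(example_transport.list),
 *              .name   = "example",
 *              .owner  = THIS_MODULE,
 *              .ident  = XPRT_TRANSPORT_TCP,   // normally a new ident
 *              ...
 *      };
 *
 *      static int __init init_example_xprt(void)
 *      {
 *              return xprt_register_transport(&example_transport);
 *      }
 *
 *      static void __exit cleanup_example_xprt(void)
 *      {
 *              xprt_unregister_transport(&example_transport);
 *      }
 */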

static void
xprt_class_release(const struct xprt_class *t)
{
        module_put(t->owner);
}

static const struct xprt_class *
xprt_class_find_by_ident_locked(int ident)
{
        const struct xprt_class *t;

        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident != ident)
                        continue;
                if (!try_module_get(t->owner))
                        continue;
                return t;
        }
        return NULL;
}

static const struct xprt_class *
xprt_class_find_by_ident(int ident)
{
        const struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        t = xprt_class_find_by_ident_locked(ident);
        spin_unlock(&xprt_list_lock);
        return t;
}

static const struct xprt_class *
xprt_class_find_by_netid_locked(const char *netid)
{
        const struct xprt_class *t;
        unsigned int i;

        list_for_each_entry(t, &xprt_list, list) {
                for (i = 0; t->netid[i][0] != '\0'; i++) {
                        if (strcmp(t->netid[i], netid) != 0)
                                continue;
                        if (!try_module_get(t->owner))
                                continue;
                        return t;
                }
        }
        return NULL;
}

static const struct xprt_class *
xprt_class_find_by_netid(const char *netid)
{
        const struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        t = xprt_class_find_by_netid_locked(netid);
        if (!t) {
                spin_unlock(&xprt_list_lock);
                request_module("rpc%s", netid);
                spin_lock(&xprt_list_lock);
                t = xprt_class_find_by_netid_locked(netid);
        }
        spin_unlock(&xprt_list_lock);
        return t;
}

/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: transport to load
 *
 * Returns:
 * > 0:         transport identifier
 * -ENOENT:     transport module not available
 */
int xprt_find_transport_ident(const char *netid)
{
        const struct xprt_class *t;
        int ret;

        t = xprt_class_find_by_netid(netid);
        if (!t)
                return -ENOENT;
        ret = t->ident;
        xprt_class_release(t);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
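
/*
 * Usage sketch: callers handed a netid string (for example from a
 * mount option) can convert it into the transport ident that transport
 * setup code expects, loading the transport module on demand:
 *
 *      int ident = xprt_find_transport_ident("tcp");
 *      if (ident < 0)
 *              return ident;   // -ENOENT: no such transport module
 */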

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
                smp_mb__before_atomic();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_atomic();
        } else
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        goto out_locked;
                goto out_sleep;
        }
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        xprt->snd_task = task;

out_locked:
        trace_xprt_reserve_xprt(xprt, task);
        return 1;

out_unlock:
        xprt_clear_locked(xprt);
out_sleep:
        task->tk_status = -EAGAIN;
        if (RPC_IS_SOFT(task))
                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
                                xprt_request_timeout(req));
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
        return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!list_empty(&xprt->xmit_queue)) {
                /* Peek at head of queue to see if it can make progress */
                if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
                                        rq_xmit)->rq_cong)
                        return;
        }
        set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!RPCXPRT_CONGESTED(xprt))
                clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        goto out_locked;
                goto out_sleep;
        }
        if (req == NULL) {
                xprt->snd_task = task;
                goto out_locked;
        }
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (!xprt_need_congestion_window_wait(xprt)) {
                xprt->snd_task = task;
                goto out_locked;
        }
out_unlock:
        xprt_clear_locked(xprt);
out_sleep:
        task->tk_status = -EAGAIN;
        if (RPC_IS_SOFT(task))
                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
                                xprt_request_timeout(req));
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
out_locked:
        trace_xprt_reserve_cong(xprt, task);
        return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
                return 1;
        spin_lock(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(xprt, task);
        spin_unlock(&xprt->transport_lock);
        return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
        struct rpc_xprt *xprt = data;

        xprt->snd_task = task;
        return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (xprt_need_congestion_window_wait(xprt))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
        trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
        trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task != task)
                return;
        spin_lock(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (req->rq_cong)
                return 1;
        trace_xprt_get_cong(xprt, req->rq_task);
        if (RPCXPRT_CONGESTED(xprt)) {
                xprt_set_congestion_window_wait(xprt);
                return 0;
        }
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        xprt_test_and_clear_congestion_window_wait(xprt);
        trace_xprt_put_cong(xprt, req->rq_task);
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        bool ret = false;

        if (req->rq_cong)
                return true;
        spin_lock(&xprt->transport_lock);
        ret = __xprt_get_cong(xprt, req) != 0;
        spin_unlock(&xprt->transport_lock);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        __xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
                __xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
                spin_lock(&xprt->transport_lock);
                __xprt_lock_write_next_cong(xprt);
                spin_unlock(&xprt->transport_lock);
        }
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
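
/*
 * Worked example for the update above, assuming RPC_CWNDSCALE is 256
 * (1 << RPC_CWNDSHIFT, per xprt.h): with cwnd = 4 * RPC_CWNDSCALE = 1024
 * and a successful reply, the increment is
 *
 *      (256 * 256 + 512) / 1024 = 64
 *
 * so four such replies grow the window by one full request slot, the
 * classic additive increase of 1/cwnd. A result of -ETIMEDOUT instead
 * halves cwnd to 512 (two slots); the floor is one slot (RPC_CWNDSCALE)
 * and the ceiling is RPC_MAXCWND(xprt).
 */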

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
        set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
                __xprt_lock_write_next(xprt);
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                return true;
        }
        return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
        bool ret;

        if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
                return false;
        spin_lock(&xprt->transport_lock);
        ret = xprt_clear_write_space_locked(xprt);
        spin_unlock(&xprt->transport_lock);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
        s64 delta = ktime_to_ns(ktime_get() - abstime);
        return likely(delta >= 0) ?
                jiffies - nsecs_to_jiffies(delta) :
                jiffies + nsecs_to_jiffies(-delta);
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        unsigned long majortimeo = req->rq_timeout;

        if (to->to_exponential)
                majortimeo <<= to->to_retries;
        else
                majortimeo += to->to_increment * to->to_retries;
        if (majortimeo > to->to_maxval || majortimeo == 0)
                majortimeo = to->to_maxval;
        return majortimeo;
}
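
/*
 * Worked example with illustrative values: for to_initval = 10 * HZ and
 * to_retries = 3, an exponential policy gives a major timeout of
 * 10s << 3 = 80s, while a linear policy with to_increment = 5 * HZ
 * gives 10s + 3 * 5s = 25s. Either result is clamped to to_maxval.
 */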

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        req->rq_majortimeo += xprt_calc_majortimeo(req);
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
        req->rq_minortimeo += req->rq_timeout;
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
        unsigned long time_init;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (likely(xprt && xprt_connected(xprt)))
                time_init = jiffies;
        else
                time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
        req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (time_before(jiffies, req->rq_minortimeo))
                        return status;
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }
        xprt_reset_minortimeo(req);

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
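
/*
 * Worked example with illustrative values: with to_initval = 10 * HZ,
 * to_exponential set and to_maxval = 60 * HZ, successive minor timeouts
 * stretch rq_timeout through 20s, 40s, then 60s (clamped) while
 * returning 0, so the caller silently retransmits; once rq_majortimeo
 * is reached, the timeouts reset to 10s and -ETIMEDOUT is returned so
 * the caller can report the major timeout.
 */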

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);
        unsigned int pflags = memalloc_nofs_save();

        trace_xprt_disconnect_auto(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
        wake_up_bit(&xprt->state, XPRT_LOCKED);
        memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_done(xprt);
        spin_lock(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_clear_write_space_locked(xprt);
        xprt_clear_congestion_window_wait_locked(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
                rpc_wake_up_queued_task_set_status(&xprt->pending,
                                                   xprt->snd_task, -ENOTCONN);
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_force(xprt);

        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock(&xprt->transport_lock);
        xprt_schedule_autoclose_locked(xprt);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
        return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
                !xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state))
                goto out;
        xprt_schedule_autoclose_locked(xprt);
out:
        spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
        return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
        __must_hold(&xprt->transport_lock)
{
        xprt->last_used = jiffies;
        if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
        struct rpc_xprt *xprt = from_timer(xprt, t, timer);

        if (!RB_EMPTY_ROOT(&xprt->recv_queue))
                return;
        /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
        xprt->last_used = jiffies;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
        if (!fail_sunrpc.ignore_client_disconnect &&
            should_fail(&fail_sunrpc.attr, 1))
                xprt->ops->inject_disconnect(xprt);
}
#else
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

bool xprt_lock_connect(struct rpc_xprt *xprt,
                struct rpc_task *task,
                void *cookie)
{
        bool ret = false;

        spin_lock(&xprt->transport_lock);
        if (!test_bit(XPRT_LOCKED, &xprt->state))
                goto out;
        if (xprt->snd_task != task)
                goto out;
        set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
        xprt->snd_task = cookie;
        ret = true;
out:
        spin_unlock(&xprt->transport_lock);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_lock_connect);

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
        spin_lock(&xprt->transport_lock);
        if (xprt->snd_task != cookie)
                goto out;
        if (!test_bit(XPRT_LOCKED, &xprt->state))
                goto out;
        xprt->snd_task = NULL;
        clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
        xprt->ops->release_xprt(xprt, NULL);
        xprt_schedule_autodisconnect(xprt);
out:
        spin_unlock(&xprt->transport_lock);
        wake_up_bit(&xprt->state, XPRT_LOCKED);
}
EXPORT_SYMBOL_GPL(xprt_unlock_connect);

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

        trace_xprt_connect(xprt);

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;

        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
                trace_xprt_disconnect_cleanup(xprt);
                xprt->ops->close(xprt);
        }

        if (!xprt_connected(xprt)) {
                task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
                rpc_sleep_on_timeout(&xprt->pending, task, NULL,
                                xprt_request_timeout(task->tk_rqstp));

                if (test_bit(XPRT_CLOSING, &xprt->state))
                        return;
                if (xprt_test_and_set_connecting(xprt))
                        return;
                /* Race breaker */
                if (!xprt_connected(xprt)) {
                        xprt->stat.connect_start = jiffies;
                        xprt->ops->connect(xprt, task);
                } else {
                        xprt_clear_connecting(xprt);
                        task->tk_status = 0;
                        rpc_wake_up_queued_task(&xprt->pending, task);
                }
        }
        xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
        unsigned long start, now = jiffies;

        start = xprt->stat.connect_start + xprt->reestablish_timeout;
        if (time_after(start, now))
                return start - now;
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
        xprt->reestablish_timeout <<= 1;
        if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
                xprt->reestablish_timeout = xprt->max_reconnect_timeout;
        if (xprt->reestablish_timeout < init_to)
                xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
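
/*
 * Worked example with illustrative values: with init_to = 3 * HZ and
 * max_reconnect_timeout = 30 * HZ, repeated backoff calls double
 * reestablish_timeout through 6s, 12s, 24s and then pin it at 30s;
 * xprt_reconnect_delay() reports how much of that interval remains
 * since the last connect attempt started:
 *
 *      xprt_reconnect_backoff(xprt, 3 * HZ);
 *      delay = xprt_reconnect_delay(xprt);     // jiffies left, or 0
 */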

enum xprt_xid_rb_cmp {
        XID_RB_EQUAL,
        XID_RB_LEFT,
        XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
        if (xid1 == xid2)
                return XID_RB_EQUAL;
        if ((__force u32)xid1 < (__force u32)xid2)
                return XID_RB_LEFT;
        return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
        struct rb_node *n = xprt->recv_queue.rb_node;
        struct rpc_rqst *req;

        while (n != NULL) {
                req = rb_entry(n, struct rpc_rqst, rq_recv);
                switch (xprt_xid_cmp(xid, req->rq_xid)) {
                case XID_RB_LEFT:
                        n = n->rb_left;
                        break;
                case XID_RB_RIGHT:
                        n = n->rb_right;
                        break;
                case XID_RB_EQUAL:
                        return req;
                }
        }
        return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
        struct rb_node **p = &xprt->recv_queue.rb_node;
        struct rb_node *n = NULL;
        struct rpc_rqst *req;

        while (*p != NULL) {
                n = *p;
                req = rb_entry(n, struct rpc_rqst, rq_recv);
                switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
                case XID_RB_LEFT:
                        p = &n->rb_left;
                        break;
                case XID_RB_RIGHT:
                        p = &n->rb_right;
                        break;
                case XID_RB_EQUAL:
                        WARN_ON_ONCE(new != req);
                        return;
                }
        }
        rb_link_node(&new->rq_recv, n, p);
        rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *entry;

        entry = xprt_request_rb_find(xprt, xid);
        if (entry != NULL) {
                trace_xprt_lookup_rqst(xprt, xid, 0);
                entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
                return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
        return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
        atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
        if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
                atomic_dec(&req->rq_pin);
                return;
        }
        if (atomic_dec_and_test(&req->rq_pin))
                wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
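
/*
 * Receive-side usage sketch, modeled on the socket transport's
 * data_ready path (simplified; error handling omitted, and "copied"
 * stands in for transport-specific reply copying): the request is
 * pinned so xprt->queue_lock can be dropped while the reply data is
 * copied, then completed and unpinned back under the lock.
 *
 *      spin_lock(&xprt->queue_lock);
 *      req = xprt_lookup_rqst(xprt, xid);
 *      if (!req)
 *              goto out_unlock;
 *      xprt_pin_rqst(req);
 *      spin_unlock(&xprt->queue_lock);
 *
 *      copied = ...;           // copy reply into req->rq_private_buf
 *
 *      spin_lock(&xprt->queue_lock);
 *      xprt_complete_rqst(req->rq_task, copied);
 *      xprt_unpin_rqst(req);
 * out_unlock:
 *      spin_unlock(&xprt->queue_lock);
 */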

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
        wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
1145void
1146xprt_request_enqueue_receive(struct rpc_task *task)
1147{
1148        struct rpc_rqst *req = task->tk_rqstp;
1149        struct rpc_xprt *xprt = req->rq_xprt;
1150
1151        if (!xprt_request_need_enqueue_receive(task, req))
1152                return;
1153
1154        xprt_request_prepare(task->tk_rqstp);
1155        spin_lock(&xprt->queue_lock);
1156
1157        /* Update the softirq receive buffer */
1158        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1159                        sizeof(req->rq_private_buf));
1160
1161        /* Add request to the receive list */
1162        xprt_request_rb_insert(xprt, req);
1163        set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1164        spin_unlock(&xprt->queue_lock);
1165
1166        /* Turn off autodisconnect */
1167        del_singleshot_timer_sync(&xprt->timer);
1168}
1169
1170/**
1171 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1172 * @task: RPC task
1173 *
1174 * Caller must hold xprt->queue_lock.
1175 */
1176static void
1177xprt_request_dequeue_receive_locked(struct rpc_task *task)
1178{
1179        struct rpc_rqst *req = task->tk_rqstp;
1180
1181        if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1182                xprt_request_rb_remove(req->rq_xprt, req);
1183}
1184
1185/**
1186 * xprt_update_rtt - Update RPC RTT statistics
1187 * @task: RPC request that recently completed
1188 *
1189 * Caller holds xprt->queue_lock.
1190 */
1191void xprt_update_rtt(struct rpc_task *task)
1192{
1193        struct rpc_rqst *req = task->tk_rqstp;
1194        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1195        unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1196        long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1197
1198        if (timer) {
1199                if (req->rq_ntrans == 1)
1200                        rpc_update_rtt(rtt, timer, m);
1201                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1202        }
1203}
1204EXPORT_SYMBOL_GPL(xprt_update_rtt);
1205
1206/**
1207 * xprt_complete_rqst - called when reply processing is complete
1208 * @task: RPC request that recently completed
1209 * @copied: actual number of bytes received from the transport
1210 *
1211 * Caller holds xprt->queue_lock.
1212 */
1213void xprt_complete_rqst(struct rpc_task *task, int copied)
1214{
1215        struct rpc_rqst *req = task->tk_rqstp;
1216        struct rpc_xprt *xprt = req->rq_xprt;
1217
1218        xprt->stat.recvs++;
1219
1220        req->rq_private_buf.len = copied;
1221        /* Ensure all writes are done before we update */
1222        /* req->rq_reply_bytes_recvd */
1223        smp_wmb();
1224        req->rq_reply_bytes_recvd = copied;
1225        xprt_request_dequeue_receive_locked(task);
1226        rpc_wake_up_queued_task(&xprt->pending, task);
1227}
1228EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1229
1230static void xprt_timer(struct rpc_task *task)
1231{
1232        struct rpc_rqst *req = task->tk_rqstp;
1233        struct rpc_xprt *xprt = req->rq_xprt;
1234
1235        if (task->tk_status != -ETIMEDOUT)
1236                return;
1237
1238        trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1239        if (!req->rq_reply_bytes_recvd) {
1240                if (xprt->ops->timer)
1241                        xprt->ops->timer(xprt, task);
1242        } else
1243                task->tk_status = 0;
1244}
1245
1246/**
1247 * xprt_wait_for_reply_request_def - wait for reply
1248 * @task: pointer to rpc_task
1249 *
1250 * Set a request's retransmit timeout based on the transport's
1251 * default timeout parameters.  Used by transports that don't adjust
1252 * the retransmit timeout based on round-trip time estimation,
1253 * and put the task to sleep on the pending queue.
1254 */
1255void xprt_wait_for_reply_request_def(struct rpc_task *task)
1256{
1257        struct rpc_rqst *req = task->tk_rqstp;
1258
1259        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1260                        xprt_request_timeout(req));
1261}
1262EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1263
1264/**
1265 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1266 * @task: pointer to rpc_task
1267 *
1268 * Set a request's retransmit timeout using the RTT estimator,
1269 * and put the task to sleep on the pending queue.
1270 */
1271void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1272{
1273        int timer = task->tk_msg.rpc_proc->p_timer;
1274        struct rpc_clnt *clnt = task->tk_client;
1275        struct rpc_rtt *rtt = clnt->cl_rtt;
1276        struct rpc_rqst *req = task->tk_rqstp;
1277        unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1278        unsigned long timeout;
1279
1280        timeout = rpc_calc_rto(rtt, timer);
1281        timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1282        if (timeout > max_timeout || timeout == 0)
1283                timeout = max_timeout;
1284        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1285                        jiffies + timeout);
1286}
1287EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1288
1289/**
1290 * xprt_request_wait_receive - wait for the reply to an RPC request
1291 * @task: RPC task about to send a request
1292 *
1293 */
1294void xprt_request_wait_receive(struct rpc_task *task)
1295{
1296        struct rpc_rqst *req = task->tk_rqstp;
1297        struct rpc_xprt *xprt = req->rq_xprt;
1298
1299        if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1300                return;
1301        /*
1302         * Sleep on the pending queue if we're expecting a reply.
1303         * The spinlock ensures atomicity between the test of
1304         * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1305         */
1306        spin_lock(&xprt->queue_lock);
1307        if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1308                xprt->ops->wait_for_reply_request(task);
1309                /*
1310                 * Send an extra queue wakeup call if the
1311                 * connection was dropped in case the call to
1312                 * rpc_sleep_on() raced.
1313                 */
1314                if (xprt_request_retransmit_after_disconnect(task))
1315                        rpc_wake_up_queued_task_set_status(&xprt->pending,
1316                                        task, -ENOTCONN);
1317        }
1318        spin_unlock(&xprt->queue_lock);
1319}
1320
1321static bool
1322xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1323{
1324        return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1325}
1326
1327/**
1328 * xprt_request_enqueue_transmit - queue a task for transmission
1329 * @task: pointer to rpc_task
1330 *
1331 * Add a task to the transmission queue.
1332 */
1333void
1334xprt_request_enqueue_transmit(struct rpc_task *task)
1335{
1336        struct rpc_rqst *pos, *req = task->tk_rqstp;
1337        struct rpc_xprt *xprt = req->rq_xprt;
1338
1339        if (xprt_request_need_enqueue_transmit(task, req)) {
1340                req->rq_bytes_sent = 0;
1341                spin_lock(&xprt->queue_lock);
1342                /*
1343                 * Requests that carry congestion control credits are added
1344                 * to the head of the list to avoid starvation issues.
1345                 */
1346                if (req->rq_cong) {
1347                        xprt_clear_congestion_window_wait(xprt);
1348                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1349                                if (pos->rq_cong)
1350                                        continue;
1351                                /* Note: req is added _before_ pos */
1352                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1353                                INIT_LIST_HEAD(&req->rq_xmit2);
1354                                goto out;
1355                        }
1356                } else if (RPC_IS_SWAPPER(task)) {
1357                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1358                                if (pos->rq_cong || pos->rq_bytes_sent)
1359                                        continue;
1360                                if (RPC_IS_SWAPPER(pos->rq_task))
1361                                        continue;
1362                                /* Note: req is added _before_ pos */
1363                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1364                                INIT_LIST_HEAD(&req->rq_xmit2);
1365                                goto out;
1366                        }
1367                } else if (!req->rq_seqno) {
1368                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1369                                if (pos->rq_task->tk_owner != task->tk_owner)
1370                                        continue;
1371                                list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1372                                INIT_LIST_HEAD(&req->rq_xmit);
1373                                goto out;
1374                        }
1375                }
1376                list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1377                INIT_LIST_HEAD(&req->rq_xmit2);
1378out:
1379                atomic_long_inc(&xprt->xmit_queuelen);
1380                set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1381                spin_unlock(&xprt->queue_lock);
1382        }
1383}
1384
1385/**
1386 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1387 * @task: pointer to rpc_task
1388 *
1389 * Remove a task from the transmission queue
1390 * Caller must hold xprt->queue_lock
1391 */
1392static void
1393xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1394{
1395        struct rpc_rqst *req = task->tk_rqstp;
1396
1397        if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1398                return;
1399        if (!list_empty(&req->rq_xmit)) {
1400                list_del(&req->rq_xmit);
1401                if (!list_empty(&req->rq_xmit2)) {
1402                        struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1403                                        struct rpc_rqst, rq_xmit2);
1404                        list_del(&req->rq_xmit2);
1405                        list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1406                }
1407        } else
1408                list_del(&req->rq_xmit2);
1409        atomic_long_dec(&req->rq_xprt->xmit_queuelen);
1410}
1411
1412/**
1413 * xprt_request_dequeue_transmit - remove a task from the transmission queue
1414 * @task: pointer to rpc_task
1415 *
1416 * Remove a task from the transmission queue
1417 */
1418static void
1419xprt_request_dequeue_transmit(struct rpc_task *task)
1420{
1421        struct rpc_rqst *req = task->tk_rqstp;
1422        struct rpc_xprt *xprt = req->rq_xprt;
1423
1424        spin_lock(&xprt->queue_lock);
1425        xprt_request_dequeue_transmit_locked(task);
1426        spin_unlock(&xprt->queue_lock);
1427}
1428
1429/**
1430 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
1431 * @task: pointer to rpc_task
1432 *
1433 * Remove a task from the transmit and receive queues, and ensure that
1434 * it is not pinned by the receive work item.
1435 */
1436void
1437xprt_request_dequeue_xprt(struct rpc_task *task)
1438{
1439        struct rpc_rqst *req = task->tk_rqstp;
1440        struct rpc_xprt *xprt = req->rq_xprt;
1441
1442        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1443            test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1444            xprt_is_pinned_rqst(req)) {
1445                spin_lock(&xprt->queue_lock);
1446                xprt_request_dequeue_transmit_locked(task);
1447                xprt_request_dequeue_receive_locked(task);
1448                while (xprt_is_pinned_rqst(req)) {
1449                        set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1450                        spin_unlock(&xprt->queue_lock);
1451                        xprt_wait_on_pinned_rqst(req);
1452                        spin_lock(&xprt->queue_lock);
1453                        clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1454                }
1455                spin_unlock(&xprt->queue_lock);
1456        }
1457}
1458
1459/**
1460 * xprt_request_prepare - prepare an encoded request for transport
1461 * @req: pointer to rpc_rqst
1462 *
1463 * Calls into the transport layer to do whatever is needed to prepare
1464 * the request for transmission or receive.
1465 */
1466void
1467xprt_request_prepare(struct rpc_rqst *req)
1468{
1469        struct rpc_xprt *xprt = req->rq_xprt;
1470
1471        if (xprt->ops->prepare_request)
1472                xprt->ops->prepare_request(req);
1473}
1474
1475/**
1476 * xprt_request_need_retransmit - Test if a task needs retransmission
1477 * @task: pointer to rpc_task
1478 *
1479 * Test for whether a connection breakage requires the task to retransmit
1480 */
1481bool
1482xprt_request_need_retransmit(struct rpc_task *task)
1483{
1484        return xprt_request_retransmit_after_disconnect(task);
1485}
1486
1487/**
1488 * xprt_prepare_transmit - reserve the transport before sending a request
1489 * @task: RPC task about to send a request
1490 *
1491 */
1492bool xprt_prepare_transmit(struct rpc_task *task)
1493{
1494        struct rpc_rqst *req = task->tk_rqstp;
1495        struct rpc_xprt *xprt = req->rq_xprt;
1496
1497        if (!xprt_lock_write(xprt, task)) {
1498                /* Race breaker: someone may have transmitted us */
1499                if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1500                        rpc_wake_up_queued_task_set_status(&xprt->sending,
1501                                        task, 0);
1502                return false;
1503
1504        }
1505        return true;
1506}
1507
1508void xprt_end_transmit(struct rpc_task *task)
1509{
1510        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1511
1512        xprt_inject_disconnect(xprt);
1513        xprt_release_write(xprt, task);
1514}
1515
1516/**
1517 * xprt_request_transmit - send an RPC request on a transport
1518 * @req: pointer to request to transmit
1519 * @snd_task: RPC task that owns the transport lock
1520 *
1521 * This performs the transmission of a single request.
1522 * Note that if the request is not the same as snd_task, then it
1523 * does need to be pinned.
1524 * Returns '0' on success.
1525 */
1526static int
1527xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1528{
1529        struct rpc_xprt *xprt = req->rq_xprt;
1530        struct rpc_task *task = req->rq_task;
1531        unsigned int connect_cookie;
1532        int is_retrans = RPC_WAS_SENT(task);
1533        int status;
1534
1535        if (!req->rq_bytes_sent) {
1536                if (xprt_request_data_received(task)) {
1537                        status = 0;
1538                        goto out_dequeue;
1539                }
1540                /* Verify that our message lies in the RPCSEC_GSS window */
1541                if (rpcauth_xmit_need_reencode(task)) {
1542                        status = -EBADMSG;
1543                        goto out_dequeue;
1544                }
1545                if (RPC_SIGNALLED(task)) {
1546                        status = -ERESTARTSYS;
1547                        goto out_dequeue;
1548                }
1549        }
1550
1551        /*
1552         * Update req->rq_ntrans before transmitting to avoid races with
1553         * xprt_update_rtt(), which needs to know that it is recording a
1554         * reply to the first transmission.
1555         */
1556        req->rq_ntrans++;
1557
1558        trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1559        connect_cookie = xprt->connect_cookie;
1560        status = xprt->ops->send_request(req);
1561        if (status != 0) {
1562                req->rq_ntrans--;
1563                trace_xprt_transmit(req, status);
1564                return status;
1565        }
1566
1567        if (is_retrans) {
1568                task->tk_client->cl_stats->rpcretrans++;
1569                trace_xprt_retransmit(req);
1570        }
1571
1572        xprt_inject_disconnect(xprt);
1573
1574        task->tk_flags |= RPC_TASK_SENT;
1575        spin_lock(&xprt->transport_lock);
1576
1577        xprt->stat.sends++;
1578        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1579        xprt->stat.bklog_u += xprt->backlog.qlen;
1580        xprt->stat.sending_u += xprt->sending.qlen;
1581        xprt->stat.pending_u += xprt->pending.qlen;
1582        spin_unlock(&xprt->transport_lock);
1583
1584        req->rq_connect_cookie = connect_cookie;
1585out_dequeue:
1586        trace_xprt_transmit(req, status);
1587        xprt_request_dequeue_transmit(task);
1588        rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1589        return status;
1590}
1591
/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that must be handled before transmission can
 * resume, or @task has finished transmitting and has already received
 * its reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int counter, status;

	spin_lock(&xprt->queue_lock);
	counter = 0;
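	/* Cap the number of requests drained per call so that a single
	 * task cannot monopolize the transport write lock. */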
	while (!list_empty(&xprt->xmit_queue)) {
		if (++counter == 20)
			break;
		next = list_first_entry(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status == 0) {
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			task->tk_status = status;
		break;
	}
	spin_unlock(&xprt->queue_lock);
}
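
/*
 * Illustrative sketch, not part of this file: roughly how the RPC client
 * state machine (net/sunrpc/clnt.c) drives the three entry points above.
 * The helper name my_transmit_step() is hypothetical.
 */
#if 0
static void my_transmit_step(struct rpc_task *task)
{
	/* Take the transport write lock; if that fails, the task either
	 * waits on the sending queue or was already transmitted. */
	if (!xprt_prepare_transmit(task))
		return;
	/* Drain the transmit queue, sending our own request in turn. */
	xprt_transmit(task);
	/* Drop the write lock so other senders can make progress. */
	xprt_end_transmit(task);
}
#endif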

static void xprt_complete_request_init(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_request_init(task);
}

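/**
 * xprt_add_backlog - put a task to sleep on the transport's backlog queue
 * @xprt: transport that is out of request slots
 * @task: RPC task waiting for a slot
 *
 * Marks the transport congested and puts @task to sleep; once a recycled
 * slot is handed to it, the task re-initializes the slot via
 * xprt_request_init().
 */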
void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
	struct rpc_rqst *req = data;

	if (task->tk_rqstp == NULL) {
		memset(req, 0, sizeof(*req));	/* zero the slot for reuse */
		task->tk_rqstp = req;
		return true;
	}
	return false;
}

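/**
 * xprt_wake_up_backlog - hand a freed request slot to a backlogged task
 * @xprt: transport whose backlog queue to wake
 * @req: request slot being recycled
 *
 * Returns true if a waiting task took ownership of @req, or false (and
 * clears XPRT_CONGESTED) if the backlog queue was empty.
 */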
bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
		clear_bit(XPRT_CONGESTED, &xprt->state);
		return false;
	}
	return true;
}
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
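	/* Re-check under reserve_lock: the flag may have been cleared
	 * by xprt_wake_up_backlog() since the lockless test above. */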
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		xprt_add_backlog(xprt, task);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
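	/* Drop reserve_lock across the allocation: kzalloc() may sleep,
	 * and the bumped num_reqs above already reserves our slot. */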
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

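/**
 * xprt_alloc_slot - attach a request slot to a task
 * @xprt: transport to allocate the slot from
 * @task: RPC task that will own the slot
 *
 * Takes a slot from the free list, or dynamically allocates one up to
 * @xprt->max_reqs. On failure, sets task->tk_status to -ENOMEM or
 * -EAGAIN and, in the -EAGAIN case, queues the task on the backlog.
 */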
void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC:       dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC:       waiting for request slot\n");
		fallthrough;
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
				     xprt->num_reqs);
	spin_unlock(&xprt->reserve_lock);

	task->tk_status = 0;
	task->tk_rqstp = req;
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

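/**
 * xprt_free_slot - recycle a request slot
 * @xprt: transport the slot belongs to
 * @req: request slot to recycle
 *
 * Hands the slot to a backlogged task if one is waiting; otherwise frees
 * it (if above min_reqs) or returns it, zeroed, to the free list.
 */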
void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_wake_up_backlog(xprt, req) &&
	    !xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_free_slot);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;

	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

static DEFINE_IDA(rpc_xprt_ids);

void xprt_cleanup_ids(void)
{
	ida_destroy(&rpc_xprt_ids);
}

static int xprt_alloc_id(struct rpc_xprt *xprt)
{
	int id;

	id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL);
	if (id < 0)
		return id;

	xprt->id = id;
	return 0;
}

static void xprt_free_id(struct rpc_xprt *xprt)
{
	ida_simple_remove(&rpc_xprt_ids, xprt->id);
}

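/**
 * xprt_alloc - allocate an rpc_xprt with a preallocated slot table
 * @net: network namespace the transport will belong to
 * @size: size of the structure to allocate (at least sizeof(struct rpc_xprt))
 * @num_prealloc: number of request slots to preallocate
 * @max_alloc: upper bound on the slot table size
 *
 * Returns the initialized transport, or NULL on allocation failure.
 */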
struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	/* Do not leave xprt->id uninitialized if the ida allocation fails */
	if (xprt_alloc_id(xprt) < 0)
		goto out_free_xprt;
	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
	goto out;
out_free_xprt:
	kfree(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);
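
/*
 * Illustrative sketch, not part of this file: a transport's ->setup()
 * method typically embeds struct rpc_xprt in a larger private structure
 * and sizes the slot table via xprt_alloc(). "struct my_xprt" and
 * my_setup() are hypothetical; see xs_setup_xprt() in xprtsock.c for a
 * real in-tree example.
 */
#if 0
struct my_xprt {
	struct rpc_xprt	xprt;	/* recovered via container_of() */
	/* ... transport-private state ... */
};

static struct rpc_xprt *my_setup(struct xprt_create *args)
{
	struct rpc_xprt *xprt;

	xprt = xprt_alloc(args->net, sizeof(struct my_xprt),
			  RPC_DEF_SLOT_TABLE, RPC_MAX_SLOT_TABLE);
	if (!xprt)
		return ERR_PTR(-ENOMEM);
	/* ... fill in xprt->ops, addresses, timeouts ... */
	return xprt;
}
#endif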

void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	/* Tear down the sysfs entry before its id can be reused */
	rpc_sysfs_xprt_destroy(xprt);
	xprt_free_id(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}

static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_task	= task;
	req->rq_xprt	= xprt;
	req->rq_buffer	= NULL;
	req->rq_xid	= xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_init_majortimeo(task, req);

	trace_xprt_reserve(req);
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference from xprt_reserve() is that this
 * function ignores the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_xprt(task);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xdr_free_bvec(&req->rq_rcv_buf);
	xdr_free_bvec(&req->rq_snd_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	task->tk_rqstp = NULL;
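	/* Slots preallocated for the backchannel go back to its pool,
	 * not to the regular slot table. */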
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	const struct xprt_class *t;

	t = xprt_class_find_by_ident(args->ident);
	if (!t) {
		dprintk("RPC: transport (%d) not supported\n", args->ident);
		return ERR_PTR(-EIO);
	}

	xprt = t->setup(args);
	xprt_class_release(t);

	if (IS_ERR(xprt))
		goto out;
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	trace_xprt_create(xprt);
out:
	return xprt;
}
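
/*
 * Illustrative sketch, not part of this file: a minimal caller building
 * the creation arguments for a TCP transport. The function name and the
 * server name string are hypothetical; real callers live in clnt.c.
 */
#if 0
static struct rpc_xprt *my_create_tcp_transport(struct net *net,
						struct sockaddr *sap,
						size_t salen)
{
	struct xprt_create args = {
		.ident		= XPRT_TRANSPORT_TCP,
		.net		= net,
		.dstaddr	= sap,
		.addrlen	= salen,
		.servername	= "nfs.example.com",	/* hypothetical */
	};

	return xprt_create_transport(&args);	/* ERR_PTR() on failure */
}
#endif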

static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	trace_xprt_destroy(xprt);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Destroy any existing back channel
	 */
	xprt_destroy_backchannel(xprt, UINT_MAX);

	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
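
/*
 * Illustrative sketch, not part of this file: code that looks up a
 * transport which may be destroyed concurrently should pin it first.
 * The helper name is hypothetical.
 */
#if 0
static void my_inspect_xprt(struct rpc_xprt *candidate)
{
	struct rpc_xprt *xprt = xprt_get(candidate);

	if (!xprt)
		return;	/* refcount already hit zero; being destroyed */
	/* ... xprt is safe to dereference here ... */
	xprt_put(xprt);
}
#endif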