   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 *  linux/net/sunrpc/xprt.c
   4 *
   5 *  This is a generic RPC call interface supporting congestion avoidance,
   6 *  and asynchronous calls.
   7 *
   8 *  The interface works like this:
   9 *
  10 *  -   When a process places a call, it allocates a request slot if
  11 *      one is available. Otherwise, it sleeps on the backlog queue
  12 *      (xprt_reserve).
  13 *  -   Next, the caller puts together the RPC message, stuffs it into
  14 *      the request struct, and calls xprt_transmit().
  15 *  -   xprt_transmit sends the message and installs the caller on the
  16 *      transport's wait list. At the same time, if a reply is expected,
  17 *      it installs a timer that is run after the packet's timeout has
  18 *      expired.
  19 *  -   When a packet arrives, the data_ready handler walks the list of
  20 *      pending requests for that transport. If a matching XID is found, the
  21 *      caller is woken up, and the timer removed.
  22 *  -   When no reply arrives within the timeout interval, the timer is
  23 *      fired by the kernel and runs xprt_timer(). It either adjusts the
  24 *      timeout values (minor timeout) or wakes up the caller with a status
  25 *      of -ETIMEDOUT.
  26 *  -   When the caller receives a notification from RPC that a reply arrived,
  27 *      it should release the RPC slot, and process the reply.
  28 *      If the call timed out, it may choose to retry the operation by
  29 *      adjusting the initial timeout value, and simply calling rpc_call
  30 *      again.
  31 *
  32 *  Support for async RPC is done through a set of RPC-specific scheduling
  33 *  primitives that `transparently' work for processes as well as async
  34 *  tasks that rely on callbacks.
  35 *
  36 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
  37 *
  38 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
  39 */
  40
  41#include <linux/module.h>
  42
  43#include <linux/types.h>
  44#include <linux/interrupt.h>
  45#include <linux/workqueue.h>
  46#include <linux/net.h>
  47#include <linux/ktime.h>
  48
  49#include <linux/sunrpc/clnt.h>
  50#include <linux/sunrpc/metrics.h>
  51#include <linux/sunrpc/bc_xprt.h>
  52#include <linux/rcupdate.h>
  53#include <linux/sched/mm.h>
  54
  55#include <trace/events/sunrpc.h>
  56
  57#include "sunrpc.h"
  58
  59/*
  60 * Local variables
  61 */
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY        RPCDBG_XPRT
  65#endif
  66
  67/*
  68 * Local functions
  69 */
  70static void      xprt_init(struct rpc_xprt *xprt, struct net *net);
  71static __be32   xprt_alloc_xid(struct rpc_xprt *xprt);
  72static void      xprt_destroy(struct rpc_xprt *xprt);
  73
  74static DEFINE_SPINLOCK(xprt_list_lock);
  75static LIST_HEAD(xprt_list);
  76
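     /*
      * Absolute deadline to use when putting a request to sleep: the
      * next retransmit timeout, capped at the request's major (total)
      * timeout.
      */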
  77static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
  78{
  79        unsigned long timeout = jiffies + req->rq_timeout;
  80
  81        if (time_before(timeout, req->rq_majortimeo))
  82                return timeout;
  83        return req->rq_majortimeo;
  84}
  85
  86/**
  87 * xprt_register_transport - register a transport implementation
  88 * @transport: transport to register
  89 *
  90 * If a transport implementation is loaded as a kernel module, it can
  91 * call this interface to make itself known to the RPC client.
  92 *
  93 * Returns:
  94 * 0:           transport successfully registered
  95 * -EEXIST:     transport already registered
  96 * -EINVAL:     transport module being unloaded
  97 */
  98int xprt_register_transport(struct xprt_class *transport)
  99{
 100        struct xprt_class *t;
 101        int result;
 102
 103        result = -EEXIST;
 104        spin_lock(&xprt_list_lock);
 105        list_for_each_entry(t, &xprt_list, list) {
 106                /* don't register the same transport class twice */
 107                if (t->ident == transport->ident)
 108                        goto out;
 109        }
 110
 111        list_add_tail(&transport->list, &xprt_list);
 112        printk(KERN_INFO "RPC: Registered %s transport module.\n",
 113               transport->name);
 114        result = 0;
 115
 116out:
 117        spin_unlock(&xprt_list_lock);
 118        return result;
 119}
 120EXPORT_SYMBOL_GPL(xprt_register_transport);
 121
 122/**
 123 * xprt_unregister_transport - unregister a transport implementation
 124 * @transport: transport to unregister
 125 *
 126 * Returns:
 127 * 0:           transport successfully unregistered
 128 * -ENOENT:     transport never registered
 129 */
 130int xprt_unregister_transport(struct xprt_class *transport)
 131{
 132        struct xprt_class *t;
 133        int result;
 134
 135        result = 0;
 136        spin_lock(&xprt_list_lock);
 137        list_for_each_entry(t, &xprt_list, list) {
 138                if (t == transport) {
 139                        printk(KERN_INFO
 140                                "RPC: Unregistered %s transport module.\n",
 141                                transport->name);
 142                        list_del_init(&transport->list);
 143                        goto out;
 144                }
 145        }
 146        result = -ENOENT;
 147
 148out:
 149        spin_unlock(&xprt_list_lock);
 150        return result;
 151}
 152EXPORT_SYMBOL_GPL(xprt_unregister_transport);
 153
 154/**
 155 * xprt_load_transport - load a transport implementation
 156 * @transport_name: transport to load
 157 *
 158 * Returns:
 159 * 0:           transport successfully loaded
 160 * -ENOENT:     transport module not available
 161 */
 162int xprt_load_transport(const char *transport_name)
 163{
 164        struct xprt_class *t;
 165        int result;
 166
 167        result = 0;
 168        spin_lock(&xprt_list_lock);
 169        list_for_each_entry(t, &xprt_list, list) {
 170                if (strcmp(t->name, transport_name) == 0) {
 171                        spin_unlock(&xprt_list_lock);
 172                        goto out;
 173                }
 174        }
 175        spin_unlock(&xprt_list_lock);
 176        result = request_module("xprt%s", transport_name);
 177out:
 178        return result;
 179}
 180EXPORT_SYMBOL_GPL(xprt_load_transport);
 181
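     /*
      * Release the transport send lock. If a close was requested while
      * the lock was held, schedule the autoclose worker instead of just
      * clearing XPRT_LOCKED.
      */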
 182static void xprt_clear_locked(struct rpc_xprt *xprt)
 183{
 184        xprt->snd_task = NULL;
 185        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
 186                smp_mb__before_atomic();
 187                clear_bit(XPRT_LOCKED, &xprt->state);
 188                smp_mb__after_atomic();
 189        } else
 190                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 191}
 192
 193/**
 194 * xprt_reserve_xprt - serialize write access to transports
 195 * @task: task that is requesting access to the transport
 196 * @xprt: pointer to the target transport
 197 *
 198 * This prevents mixing the payload of separate requests, and prevents
 199 * transport connects from colliding with writes.  No congestion control
 200 * is provided.
 201 */
 202int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 203{
 204        struct rpc_rqst *req = task->tk_rqstp;
 205
 206        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 207                if (task == xprt->snd_task)
 208                        goto out_locked;
 209                goto out_sleep;
 210        }
 211        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
 212                goto out_unlock;
 213        xprt->snd_task = task;
 214
 215out_locked:
 216        trace_xprt_reserve_xprt(xprt, task);
 217        return 1;
 218
 219out_unlock:
 220        xprt_clear_locked(xprt);
 221out_sleep:
 222        task->tk_status = -EAGAIN;
  223        if (RPC_IS_SOFT(task))
 224                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
 225                                xprt_request_timeout(req));
 226        else
 227                rpc_sleep_on(&xprt->sending, task, NULL);
 228        return 0;
 229}
 230EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
 231
 232static bool
 233xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
 234{
 235        return test_bit(XPRT_CWND_WAIT, &xprt->state);
 236}
 237
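     /*
      * Make new requests wait for congestion window space, unless the
      * request at the head of the transmit queue already holds credits
      * and can still make progress.
      */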
 238static void
 239xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
 240{
 241        if (!list_empty(&xprt->xmit_queue)) {
 242                /* Peek at head of queue to see if it can make progress */
 243                if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
 244                                        rq_xmit)->rq_cong)
 245                        return;
 246        }
 247        set_bit(XPRT_CWND_WAIT, &xprt->state);
 248}
 249
 250static void
 251xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
 252{
 253        if (!RPCXPRT_CONGESTED(xprt))
 254                clear_bit(XPRT_CWND_WAIT, &xprt->state);
 255}
 256
  257/**
  258 * xprt_reserve_xprt_cong - serialize write access to transports
      * @xprt: pointer to the target transport
  259 * @task: task that is requesting access to the transport
 260 *
 261 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 262 * integrated into the decision of whether a request is allowed to be
 263 * woken up and given access to the transport.
 264 * Note that the lock is only granted if we know there are free slots.
 265 */
 266int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 267{
 268        struct rpc_rqst *req = task->tk_rqstp;
 269
 270        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 271                if (task == xprt->snd_task)
 272                        goto out_locked;
 273                goto out_sleep;
 274        }
 275        if (req == NULL) {
 276                xprt->snd_task = task;
 277                goto out_locked;
 278        }
 279        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
 280                goto out_unlock;
 281        if (!xprt_need_congestion_window_wait(xprt)) {
 282                xprt->snd_task = task;
 283                goto out_locked;
 284        }
 285out_unlock:
 286        xprt_clear_locked(xprt);
 287out_sleep:
 288        task->tk_status = -EAGAIN;
 289        if (RPC_IS_SOFT(task))
 290                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
 291                                xprt_request_timeout(req));
 292        else
 293                rpc_sleep_on(&xprt->sending, task, NULL);
 294        return 0;
 295out_locked:
 296        trace_xprt_reserve_cong(xprt, task);
 297        return 1;
 298}
 299EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
 300
 301static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 302{
 303        int retval;
 304
 305        if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
 306                return 1;
 307        spin_lock(&xprt->transport_lock);
 308        retval = xprt->ops->reserve_xprt(xprt, task);
 309        spin_unlock(&xprt->transport_lock);
 310        return retval;
 311}
 312
 313static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
 314{
 315        struct rpc_xprt *xprt = data;
 316
 317        xprt->snd_task = task;
 318        return true;
 319}
 320
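     /*
      * Hand the send lock to the next task waiting on xprt->sending,
      * unless the transport is waiting for output buffer space.
      */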
 321static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 322{
 323        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 324                return;
 325        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
 326                goto out_unlock;
 327        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
 328                                __xprt_lock_write_func, xprt))
 329                return;
 330out_unlock:
 331        xprt_clear_locked(xprt);
 332}
 333
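     /*
      * As above, but also refuse to wake a sender while the transport
      * is waiting for congestion window space.
      */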
 334static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 335{
 336        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 337                return;
 338        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
 339                goto out_unlock;
 340        if (xprt_need_congestion_window_wait(xprt))
 341                goto out_unlock;
 342        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
 343                                __xprt_lock_write_func, xprt))
 344                return;
 345out_unlock:
 346        xprt_clear_locked(xprt);
 347}
 348
 349/**
 350 * xprt_release_xprt - allow other requests to use a transport
 351 * @xprt: transport with other tasks potentially waiting
 352 * @task: task that is releasing access to the transport
 353 *
 354 * Note that "task" can be NULL.  No congestion control is provided.
 355 */
 356void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 357{
 358        if (xprt->snd_task == task) {
 359                xprt_clear_locked(xprt);
 360                __xprt_lock_write_next(xprt);
 361        }
 362        trace_xprt_release_xprt(xprt, task);
 363}
 364EXPORT_SYMBOL_GPL(xprt_release_xprt);
 365
 366/**
 367 * xprt_release_xprt_cong - allow other requests to use a transport
 368 * @xprt: transport with other tasks potentially waiting
 369 * @task: task that is releasing access to the transport
 370 *
 371 * Note that "task" can be NULL.  Another task is awoken to use the
 372 * transport if the transport's congestion window allows it.
 373 */
 374void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 375{
 376        if (xprt->snd_task == task) {
 377                xprt_clear_locked(xprt);
 378                __xprt_lock_write_next_cong(xprt);
 379        }
 380        trace_xprt_release_cong(xprt, task);
 381}
 382EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
 383
 384static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 385{
 386        if (xprt->snd_task != task)
 387                return;
 388        spin_lock(&xprt->transport_lock);
 389        xprt->ops->release_xprt(xprt, task);
 390        spin_unlock(&xprt->transport_lock);
 391}
 392
 393/*
 394 * Van Jacobson congestion avoidance. Check if the congestion window
 395 * overflowed. Put the task to sleep if this is the case.
 396 */
 397static int
 398__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 399{
 400        if (req->rq_cong)
 401                return 1;
 402        trace_xprt_get_cong(xprt, req->rq_task);
 403        if (RPCXPRT_CONGESTED(xprt)) {
 404                xprt_set_congestion_window_wait(xprt);
 405                return 0;
 406        }
 407        req->rq_cong = 1;
 408        xprt->cong += RPC_CWNDSCALE;
 409        return 1;
 410}
 411
 412/*
 413 * Adjust the congestion window, and wake up the next task
 414 * that has been sleeping due to congestion
 415 */
 416static void
 417__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 418{
 419        if (!req->rq_cong)
 420                return;
 421        req->rq_cong = 0;
 422        xprt->cong -= RPC_CWNDSCALE;
 423        xprt_test_and_clear_congestion_window_wait(xprt);
 424        trace_xprt_put_cong(xprt, req->rq_task);
 425        __xprt_lock_write_next_cong(xprt);
 426}
 427
 428/**
 429 * xprt_request_get_cong - Request congestion control credits
 430 * @xprt: pointer to transport
 431 * @req: pointer to RPC request
 432 *
 433 * Useful for transports that require congestion control.
 434 */
 435bool
 436xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 437{
 438        bool ret = false;
 439
 440        if (req->rq_cong)
 441                return true;
 442        spin_lock(&xprt->transport_lock);
 443        ret = __xprt_get_cong(xprt, req) != 0;
 444        spin_unlock(&xprt->transport_lock);
 445        return ret;
 446}
 447EXPORT_SYMBOL_GPL(xprt_request_get_cong);
 448
 449/**
 450 * xprt_release_rqst_cong - housekeeping when request is complete
  451 * @task: RPC task whose request recently completed
 452 *
 453 * Useful for transports that require congestion control.
 454 */
 455void xprt_release_rqst_cong(struct rpc_task *task)
 456{
 457        struct rpc_rqst *req = task->tk_rqstp;
 458
 459        __xprt_put_cong(req->rq_xprt, req);
 460}
 461EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
 462
 463static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
 464{
 465        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
 466                __xprt_lock_write_next_cong(xprt);
 467}
 468
 469/*
 470 * Clear the congestion window wait flag and wake up the next
 471 * entry on xprt->sending
 472 */
 473static void
 474xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
 475{
 476        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
 477                spin_lock(&xprt->transport_lock);
 478                __xprt_lock_write_next_cong(xprt);
 479                spin_unlock(&xprt->transport_lock);
 480        }
 481}
 482
 483/**
 484 * xprt_adjust_cwnd - adjust transport congestion window
 485 * @xprt: pointer to xprt
 486 * @task: recently completed RPC request used to adjust window
 487 * @result: result code of completed RPC request
 488 *
  489 * The transport code maintains an estimate of the maximum number of out-
 490 * standing RPC requests, using a smoothed version of the congestion
 491 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 492 * congestion algorithm: If a retransmit occurs, the congestion window is
 493 * halved; otherwise, it is incremented by 1/cwnd when
 494 *
 495 *      -       a reply is received and
 496 *      -       a full number of requests are outstanding and
 497 *      -       the congestion window hasn't been updated recently.
 498 */
 499void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
 500{
 501        struct rpc_rqst *req = task->tk_rqstp;
 502        unsigned long cwnd = xprt->cwnd;
 503
 504        if (result >= 0 && cwnd <= xprt->cong) {
 505                /* The (cwnd >> 1) term makes sure
 506                 * the result gets rounded properly. */
 507                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
 508                if (cwnd > RPC_MAXCWND(xprt))
 509                        cwnd = RPC_MAXCWND(xprt);
 510                __xprt_lock_write_next_cong(xprt);
 511        } else if (result == -ETIMEDOUT) {
 512                cwnd >>= 1;
 513                if (cwnd < RPC_CWNDSCALE)
 514                        cwnd = RPC_CWNDSCALE;
 515        }
 516        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
 517                        xprt->cong, xprt->cwnd, cwnd);
 518        xprt->cwnd = cwnd;
 519        __xprt_put_cong(xprt, req);
 520}
 521EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
 522
 523/**
 524 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 525 * @xprt: transport with waiting tasks
 526 * @status: result code to plant in each task before waking it
 527 *
 528 */
 529void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
 530{
 531        if (status < 0)
 532                rpc_wake_up_status(&xprt->pending, status);
 533        else
 534                rpc_wake_up(&xprt->pending);
 535}
 536EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
 537
 538/**
 539 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 540 * @xprt: transport
 541 *
 542 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 543 * we don't in general want to force a socket disconnection due to
 544 * an incomplete RPC call transmission.
 545 */
 546void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
 547{
 548        set_bit(XPRT_WRITE_SPACE, &xprt->state);
 549}
 550EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 551
 552static bool
 553xprt_clear_write_space_locked(struct rpc_xprt *xprt)
 554{
 555        if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
 556                __xprt_lock_write_next(xprt);
 557                dprintk("RPC:       write space: waking waiting task on "
 558                                "xprt %p\n", xprt);
 559                return true;
 560        }
 561        return false;
 562}
 563
 564/**
 565 * xprt_write_space - wake the task waiting for transport output buffer space
 566 * @xprt: transport with waiting tasks
 567 *
 568 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 569 */
 570bool xprt_write_space(struct rpc_xprt *xprt)
 571{
 572        bool ret;
 573
 574        if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
 575                return false;
 576        spin_lock(&xprt->transport_lock);
 577        ret = xprt_clear_write_space_locked(xprt);
 578        spin_unlock(&xprt->transport_lock);
 579        return ret;
 580}
 581EXPORT_SYMBOL_GPL(xprt_write_space);
 582
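     /*
      * Convert an absolute ktime_t timestamp into the equivalent
      * absolute time in jiffies.
      */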
 583static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
 584{
 585        s64 delta = ktime_to_ns(ktime_get() - abstime);
 586        return likely(delta >= 0) ?
 587                jiffies - nsecs_to_jiffies(delta) :
 588                jiffies + nsecs_to_jiffies(-delta);
 589}
 590
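     /*
      * Compute the length of a request's major timeout: the total time
      * allowed for all retransmissions, derived from the client's
      * rpc_timeout parameters and capped at to_maxval.
      */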
 591static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
 592{
 593        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
 594        unsigned long majortimeo = req->rq_timeout;
 595
 596        if (to->to_exponential)
 597                majortimeo <<= to->to_retries;
 598        else
 599                majortimeo += to->to_increment * to->to_retries;
 600        if (majortimeo > to->to_maxval || majortimeo == 0)
 601                majortimeo = to->to_maxval;
 602        return majortimeo;
 603}
 604
 605static void xprt_reset_majortimeo(struct rpc_rqst *req)
 606{
 607        req->rq_majortimeo += xprt_calc_majortimeo(req);
 608}
 609
 610static void xprt_reset_minortimeo(struct rpc_rqst *req)
 611{
 612        req->rq_minortimeo += req->rq_timeout;
 613}
 614
 615static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
 616{
 617        unsigned long time_init;
 618        struct rpc_xprt *xprt = req->rq_xprt;
 619
 620        if (likely(xprt && xprt_connected(xprt)))
 621                time_init = jiffies;
 622        else
 623                time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
 624        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 625        req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
 626        req->rq_minortimeo = time_init + req->rq_timeout;
 627}
 628
 629/**
 630 * xprt_adjust_timeout - adjust timeout values for next retransmit
 631 * @req: RPC request containing parameters to use for the adjustment
 632 *
 633 */
 634int xprt_adjust_timeout(struct rpc_rqst *req)
 635{
 636        struct rpc_xprt *xprt = req->rq_xprt;
 637        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
 638        int status = 0;
 639
 640        if (time_before(jiffies, req->rq_minortimeo))
 641                return status;
 642        if (time_before(jiffies, req->rq_majortimeo)) {
 643                if (to->to_exponential)
 644                        req->rq_timeout <<= 1;
 645                else
 646                        req->rq_timeout += to->to_increment;
 647                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
 648                        req->rq_timeout = to->to_maxval;
 649                req->rq_retries++;
 650        } else {
 651                req->rq_timeout = to->to_initval;
 652                req->rq_retries = 0;
 653                xprt_reset_majortimeo(req);
 654                /* Reset the RTT counters == "slow start" */
 655                spin_lock(&xprt->transport_lock);
 656                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
 657                spin_unlock(&xprt->transport_lock);
 658                status = -ETIMEDOUT;
 659        }
 660        xprt_reset_minortimeo(req);
 661
 662        if (req->rq_timeout == 0) {
 663                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
 664                req->rq_timeout = 5 * HZ;
 665        }
 666        return status;
 667}
 668
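     /*
      * Worker scheduled when a close is requested while the transport is
      * locked: close the connection, then drop the send lock and wake
      * anyone waiting on XPRT_LOCKED.
      */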
 669static void xprt_autoclose(struct work_struct *work)
 670{
 671        struct rpc_xprt *xprt =
 672                container_of(work, struct rpc_xprt, task_cleanup);
 673        unsigned int pflags = memalloc_nofs_save();
 674
 675        trace_xprt_disconnect_auto(xprt);
 676        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 677        xprt->ops->close(xprt);
 678        xprt_release_write(xprt, NULL);
 679        wake_up_bit(&xprt->state, XPRT_LOCKED);
 680        memalloc_nofs_restore(pflags);
 681}
 682
 683/**
 684 * xprt_disconnect_done - mark a transport as disconnected
 685 * @xprt: transport to flag for disconnect
 686 *
 687 */
 688void xprt_disconnect_done(struct rpc_xprt *xprt)
 689{
 690        trace_xprt_disconnect_done(xprt);
 691        spin_lock(&xprt->transport_lock);
 692        xprt_clear_connected(xprt);
 693        xprt_clear_write_space_locked(xprt);
 694        xprt_clear_congestion_window_wait_locked(xprt);
 695        xprt_wake_pending_tasks(xprt, -ENOTCONN);
 696        spin_unlock(&xprt->transport_lock);
 697}
 698EXPORT_SYMBOL_GPL(xprt_disconnect_done);
 699
 700/**
 701 * xprt_force_disconnect - force a transport to disconnect
 702 * @xprt: transport to disconnect
 703 *
 704 */
 705void xprt_force_disconnect(struct rpc_xprt *xprt)
 706{
 707        trace_xprt_disconnect_force(xprt);
 708
 709        /* Don't race with the test_bit() in xprt_clear_locked() */
 710        spin_lock(&xprt->transport_lock);
 711        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 712        /* Try to schedule an autoclose RPC call */
 713        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 714                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 715        else if (xprt->snd_task)
 716                rpc_wake_up_queued_task_set_status(&xprt->pending,
 717                                xprt->snd_task, -ENOTCONN);
 718        spin_unlock(&xprt->transport_lock);
 719}
 720EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 721
 722static unsigned int
 723xprt_connect_cookie(struct rpc_xprt *xprt)
 724{
 725        return READ_ONCE(xprt->connect_cookie);
 726}
 727
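     /*
      * Return true if the connection has been dropped or re-established
      * since this request was last transmitted, meaning the request will
      * need to be retransmitted.
      */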
 728static bool
 729xprt_request_retransmit_after_disconnect(struct rpc_task *task)
 730{
 731        struct rpc_rqst *req = task->tk_rqstp;
 732        struct rpc_xprt *xprt = req->rq_xprt;
 733
 734        return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
 735                !xprt_connected(xprt);
 736}
 737
 738/**
 739 * xprt_conditional_disconnect - force a transport to disconnect
 740 * @xprt: transport to disconnect
 741 * @cookie: 'connection cookie'
 742 *
 743 * This attempts to break the connection if and only if 'cookie' matches
 744 * the current transport 'connection cookie'. It ensures that we don't
 745 * try to break the connection more than once when we need to retransmit
 746 * a batch of RPC requests.
 747 *
 748 */
 749void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 750{
 751        /* Don't race with the test_bit() in xprt_clear_locked() */
 752        spin_lock(&xprt->transport_lock);
 753        if (cookie != xprt->connect_cookie)
 754                goto out;
 755        if (test_bit(XPRT_CLOSING, &xprt->state))
 756                goto out;
 757        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 758        /* Try to schedule an autoclose RPC call */
 759        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 760                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 761        xprt_wake_pending_tasks(xprt, -EAGAIN);
 762out:
 763        spin_unlock(&xprt->transport_lock);
 764}
 765
 766static bool
 767xprt_has_timer(const struct rpc_xprt *xprt)
 768{
 769        return xprt->idle_timeout != 0;
 770}
 771
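     /*
      * Re-arm the autodisconnect timer if no requests are waiting for
      * replies and an idle timeout is configured.
      */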
 772static void
 773xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
 774        __must_hold(&xprt->transport_lock)
 775{
 776        xprt->last_used = jiffies;
 777        if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
 778                mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 779}
 780
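     /*
      * Idle timer callback: queue an autoclose if the transport is still
      * idle and the send lock is available.
      */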
 781static void
 782xprt_init_autodisconnect(struct timer_list *t)
 783{
 784        struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 785
 786        if (!RB_EMPTY_ROOT(&xprt->recv_queue))
 787                return;
 788        /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 789        xprt->last_used = jiffies;
 790        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 791                return;
 792        queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 793}
 794
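     /*
      * Transfer ownership of the send lock from @task to an opaque
      * @cookie, so that a connect worker can keep the transport locked
      * across the connection attempt.
      */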
 795bool xprt_lock_connect(struct rpc_xprt *xprt,
 796                struct rpc_task *task,
 797                void *cookie)
 798{
 799        bool ret = false;
 800
 801        spin_lock(&xprt->transport_lock);
 802        if (!test_bit(XPRT_LOCKED, &xprt->state))
 803                goto out;
 804        if (xprt->snd_task != task)
 805                goto out;
 806        xprt->snd_task = cookie;
 807        ret = true;
 808out:
 809        spin_unlock(&xprt->transport_lock);
 810        return ret;
 811}
 812
 813void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
 814{
 815        spin_lock(&xprt->transport_lock);
 816        if (xprt->snd_task != cookie)
 817                goto out;
 818        if (!test_bit(XPRT_LOCKED, &xprt->state))
 819                goto out;
  820        xprt->snd_task = NULL;
 821        xprt->ops->release_xprt(xprt, NULL);
 822        xprt_schedule_autodisconnect(xprt);
 823out:
 824        spin_unlock(&xprt->transport_lock);
 825        wake_up_bit(&xprt->state, XPRT_LOCKED);
 826}
 827
 828/**
 829 * xprt_connect - schedule a transport connect operation
 830 * @task: RPC task that is requesting the connect
 831 *
 832 */
 833void xprt_connect(struct rpc_task *task)
 834{
 835        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 836
 837        trace_xprt_connect(xprt);
 838
 839        if (!xprt_bound(xprt)) {
 840                task->tk_status = -EAGAIN;
 841                return;
 842        }
 843        if (!xprt_lock_write(xprt, task))
 844                return;
 845
 846        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
 847                trace_xprt_disconnect_cleanup(xprt);
 848                xprt->ops->close(xprt);
 849        }
 850
 851        if (!xprt_connected(xprt)) {
 852                task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
 853                rpc_sleep_on_timeout(&xprt->pending, task, NULL,
 854                                xprt_request_timeout(task->tk_rqstp));
 855
 856                if (test_bit(XPRT_CLOSING, &xprt->state))
 857                        return;
 858                if (xprt_test_and_set_connecting(xprt))
 859                        return;
 860                /* Race breaker */
 861                if (!xprt_connected(xprt)) {
 862                        xprt->stat.connect_start = jiffies;
 863                        xprt->ops->connect(xprt, task);
 864                } else {
 865                        xprt_clear_connecting(xprt);
 866                        task->tk_status = 0;
 867                        rpc_wake_up_queued_task(&xprt->pending, task);
 868                }
 869        }
 870        xprt_release_write(xprt, task);
 871}
 872
 873/**
 874 * xprt_reconnect_delay - compute the wait before scheduling a connect
 875 * @xprt: transport instance
 876 *
 877 */
 878unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
 879{
 880        unsigned long start, now = jiffies;
 881
 882        start = xprt->stat.connect_start + xprt->reestablish_timeout;
 883        if (time_after(start, now))
 884                return start - now;
 885        return 0;
 886}
 887EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
 888
 889/**
 890 * xprt_reconnect_backoff - compute the new re-establish timeout
 891 * @xprt: transport instance
 892 * @init_to: initial reestablish timeout
 893 *
 894 */
 895void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
 896{
 897        xprt->reestablish_timeout <<= 1;
 898        if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
 899                xprt->reestablish_timeout = xprt->max_reconnect_timeout;
 900        if (xprt->reestablish_timeout < init_to)
 901                xprt->reestablish_timeout = init_to;
 902}
 903EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
 904
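     /*
      * Requests waiting for a reply are kept in an rb-tree
      * (xprt->recv_queue) ordered by XID, so that the reply handler can
      * locate the matching request quickly.
      */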
 905enum xprt_xid_rb_cmp {
 906        XID_RB_EQUAL,
 907        XID_RB_LEFT,
 908        XID_RB_RIGHT,
 909};
 910static enum xprt_xid_rb_cmp
 911xprt_xid_cmp(__be32 xid1, __be32 xid2)
 912{
 913        if (xid1 == xid2)
 914                return XID_RB_EQUAL;
 915        if ((__force u32)xid1 < (__force u32)xid2)
 916                return XID_RB_LEFT;
 917        return XID_RB_RIGHT;
 918}
 919
 920static struct rpc_rqst *
 921xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
 922{
 923        struct rb_node *n = xprt->recv_queue.rb_node;
 924        struct rpc_rqst *req;
 925
 926        while (n != NULL) {
 927                req = rb_entry(n, struct rpc_rqst, rq_recv);
 928                switch (xprt_xid_cmp(xid, req->rq_xid)) {
 929                case XID_RB_LEFT:
 930                        n = n->rb_left;
 931                        break;
 932                case XID_RB_RIGHT:
 933                        n = n->rb_right;
 934                        break;
 935                case XID_RB_EQUAL:
 936                        return req;
 937                }
 938        }
 939        return NULL;
 940}
 941
 942static void
 943xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
 944{
 945        struct rb_node **p = &xprt->recv_queue.rb_node;
 946        struct rb_node *n = NULL;
 947        struct rpc_rqst *req;
 948
 949        while (*p != NULL) {
 950                n = *p;
 951                req = rb_entry(n, struct rpc_rqst, rq_recv);
  952                switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
 953                case XID_RB_LEFT:
 954                        p = &n->rb_left;
 955                        break;
 956                case XID_RB_RIGHT:
 957                        p = &n->rb_right;
 958                        break;
 959                case XID_RB_EQUAL:
 960                        WARN_ON_ONCE(new != req);
 961                        return;
 962                }
 963        }
 964        rb_link_node(&new->rq_recv, n, p);
 965        rb_insert_color(&new->rq_recv, &xprt->recv_queue);
 966}
 967
 968static void
 969xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
 970{
 971        rb_erase(&req->rq_recv, &xprt->recv_queue);
 972}
 973
 974/**
 975 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 976 * @xprt: transport on which the original request was transmitted
 977 * @xid: RPC XID of incoming reply
 978 *
 979 * Caller holds xprt->queue_lock.
 980 */
 981struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 982{
 983        struct rpc_rqst *entry;
 984
 985        entry = xprt_request_rb_find(xprt, xid);
 986        if (entry != NULL) {
 987                trace_xprt_lookup_rqst(xprt, xid, 0);
 988                entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
 989                return entry;
 990        }
 991
 992        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
 993                        ntohl(xid));
 994        trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
 995        xprt->stat.bad_xids++;
 996        return NULL;
 997}
 998EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 999
1000static bool
1001xprt_is_pinned_rqst(struct rpc_rqst *req)
1002{
1003        return atomic_read(&req->rq_pin) != 0;
1004}
1005
1006/**
1007 * xprt_pin_rqst - Pin a request on the transport receive list
1008 * @req: Request to pin
1009 *
1010 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
1011 * so should be holding xprt->queue_lock.
1012 */
1013void xprt_pin_rqst(struct rpc_rqst *req)
1014{
1015        atomic_inc(&req->rq_pin);
1016}
1017EXPORT_SYMBOL_GPL(xprt_pin_rqst);
1018
1019/**
1020 * xprt_unpin_rqst - Unpin a request on the transport receive list
 1021 * @req: Request to unpin
1022 *
1023 * Caller should be holding xprt->queue_lock.
1024 */
1025void xprt_unpin_rqst(struct rpc_rqst *req)
1026{
1027        if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
1028                atomic_dec(&req->rq_pin);
1029                return;
1030        }
1031        if (atomic_dec_and_test(&req->rq_pin))
1032                wake_up_var(&req->rq_pin);
1033}
1034EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
1035
1036static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
1037{
1038        wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
1039}
1040
1041static bool
1042xprt_request_data_received(struct rpc_task *task)
1043{
1044        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1045                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
1046}
1047
1048static bool
1049xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
1050{
1051        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1052                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
1053}
1054
1055/**
 1056 * xprt_request_enqueue_receive - Add a request to the receive queue
1057 * @task: RPC task
1058 *
1059 */
1060void
1061xprt_request_enqueue_receive(struct rpc_task *task)
1062{
1063        struct rpc_rqst *req = task->tk_rqstp;
1064        struct rpc_xprt *xprt = req->rq_xprt;
1065
1066        if (!xprt_request_need_enqueue_receive(task, req))
1067                return;
1068
1069        xprt_request_prepare(task->tk_rqstp);
1070        spin_lock(&xprt->queue_lock);
1071
1072        /* Update the softirq receive buffer */
1073        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1074                        sizeof(req->rq_private_buf));
1075
1076        /* Add request to the receive list */
1077        xprt_request_rb_insert(xprt, req);
1078        set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1079        spin_unlock(&xprt->queue_lock);
1080
1081        /* Turn off autodisconnect */
1082        del_singleshot_timer_sync(&xprt->timer);
1083}
1084
1085/**
1086 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1087 * @task: RPC task
1088 *
1089 * Caller must hold xprt->queue_lock.
1090 */
1091static void
1092xprt_request_dequeue_receive_locked(struct rpc_task *task)
1093{
1094        struct rpc_rqst *req = task->tk_rqstp;
1095
1096        if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1097                xprt_request_rb_remove(req->rq_xprt, req);
1098}
1099
1100/**
1101 * xprt_update_rtt - Update RPC RTT statistics
 1102 * @task: RPC task whose request recently completed
1103 *
1104 * Caller holds xprt->queue_lock.
1105 */
1106void xprt_update_rtt(struct rpc_task *task)
1107{
1108        struct rpc_rqst *req = task->tk_rqstp;
1109        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1110        unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1111        long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1112
1113        if (timer) {
1114                if (req->rq_ntrans == 1)
1115                        rpc_update_rtt(rtt, timer, m);
1116                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1117        }
1118}
1119EXPORT_SYMBOL_GPL(xprt_update_rtt);
1120
1121/**
1122 * xprt_complete_rqst - called when reply processing is complete
 1123 * @task: RPC task whose request recently completed
1124 * @copied: actual number of bytes received from the transport
1125 *
1126 * Caller holds xprt->queue_lock.
1127 */
1128void xprt_complete_rqst(struct rpc_task *task, int copied)
1129{
1130        struct rpc_rqst *req = task->tk_rqstp;
1131        struct rpc_xprt *xprt = req->rq_xprt;
1132
1133        xprt->stat.recvs++;
1134
1135        req->rq_private_buf.len = copied;
1136        /* Ensure all writes are done before we update */
1137        /* req->rq_reply_bytes_recvd */
1138        smp_wmb();
1139        req->rq_reply_bytes_recvd = copied;
1140        xprt_request_dequeue_receive_locked(task);
1141        rpc_wake_up_queued_task(&xprt->pending, task);
1142}
1143EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1144
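     /*
      * Callback run when a task sleeping on xprt->pending times out: let
      * the transport adjust its timeout state, unless part of the reply
      * has already arrived, in which case the timeout is cleared.
      */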
1145static void xprt_timer(struct rpc_task *task)
1146{
1147        struct rpc_rqst *req = task->tk_rqstp;
1148        struct rpc_xprt *xprt = req->rq_xprt;
1149
1150        if (task->tk_status != -ETIMEDOUT)
1151                return;
1152
1153        trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1154        if (!req->rq_reply_bytes_recvd) {
1155                if (xprt->ops->timer)
1156                        xprt->ops->timer(xprt, task);
1157        } else
1158                task->tk_status = 0;
1159}
1160
1161/**
1162 * xprt_wait_for_reply_request_def - wait for reply
1163 * @task: pointer to rpc_task
1164 *
1165 * Set a request's retransmit timeout based on the transport's
1166 * default timeout parameters.  Used by transports that don't adjust
1167 * the retransmit timeout based on round-trip time estimation,
1168 * and put the task to sleep on the pending queue.
1169 */
1170void xprt_wait_for_reply_request_def(struct rpc_task *task)
1171{
1172        struct rpc_rqst *req = task->tk_rqstp;
1173
1174        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1175                        xprt_request_timeout(req));
1176}
1177EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1178
1179/**
1180 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1181 * @task: pointer to rpc_task
1182 *
1183 * Set a request's retransmit timeout using the RTT estimator,
1184 * and put the task to sleep on the pending queue.
1185 */
1186void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1187{
1188        int timer = task->tk_msg.rpc_proc->p_timer;
1189        struct rpc_clnt *clnt = task->tk_client;
1190        struct rpc_rtt *rtt = clnt->cl_rtt;
1191        struct rpc_rqst *req = task->tk_rqstp;
1192        unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1193        unsigned long timeout;
1194
1195        timeout = rpc_calc_rto(rtt, timer);
1196        timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1197        if (timeout > max_timeout || timeout == 0)
1198                timeout = max_timeout;
1199        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1200                        jiffies + timeout);
1201}
1202EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1203
1204/**
1205 * xprt_request_wait_receive - wait for the reply to an RPC request
1206 * @task: RPC task about to send a request
1207 *
1208 */
1209void xprt_request_wait_receive(struct rpc_task *task)
1210{
1211        struct rpc_rqst *req = task->tk_rqstp;
1212        struct rpc_xprt *xprt = req->rq_xprt;
1213
1214        if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1215                return;
1216        /*
1217         * Sleep on the pending queue if we're expecting a reply.
1218         * The spinlock ensures atomicity between the test of
1219         * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1220         */
1221        spin_lock(&xprt->queue_lock);
1222        if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1223                xprt->ops->wait_for_reply_request(task);
1224                /*
1225                 * Send an extra queue wakeup call if the
1226                 * connection was dropped in case the call to
1227                 * rpc_sleep_on() raced.
1228                 */
1229                if (xprt_request_retransmit_after_disconnect(task))
1230                        rpc_wake_up_queued_task_set_status(&xprt->pending,
1231                                        task, -ENOTCONN);
1232        }
1233        spin_unlock(&xprt->queue_lock);
1234}
1235
1236static bool
1237xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1238{
1239        return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1240}
1241
1242/**
1243 * xprt_request_enqueue_transmit - queue a task for transmission
1244 * @task: pointer to rpc_task
1245 *
1246 * Add a task to the transmission queue.
1247 */
1248void
1249xprt_request_enqueue_transmit(struct rpc_task *task)
1250{
1251        struct rpc_rqst *pos, *req = task->tk_rqstp;
1252        struct rpc_xprt *xprt = req->rq_xprt;
1253
1254        if (xprt_request_need_enqueue_transmit(task, req)) {
1255                req->rq_bytes_sent = 0;
1256                spin_lock(&xprt->queue_lock);
1257                /*
1258                 * Requests that carry congestion control credits are added
1259                 * to the head of the list to avoid starvation issues.
1260                 */
1261                if (req->rq_cong) {
1262                        xprt_clear_congestion_window_wait(xprt);
1263                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1264                                if (pos->rq_cong)
1265                                        continue;
1266                                /* Note: req is added _before_ pos */
1267                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1268                                INIT_LIST_HEAD(&req->rq_xmit2);
1269                                goto out;
1270                        }
1271                } else if (RPC_IS_SWAPPER(task)) {
1272                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1273                                if (pos->rq_cong || pos->rq_bytes_sent)
1274                                        continue;
1275                                if (RPC_IS_SWAPPER(pos->rq_task))
1276                                        continue;
1277                                /* Note: req is added _before_ pos */
1278                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1279                                INIT_LIST_HEAD(&req->rq_xmit2);
1280                                goto out;
1281                        }
1282                } else if (!req->rq_seqno) {
1283                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1284                                if (pos->rq_task->tk_owner != task->tk_owner)
1285                                        continue;
1286                                list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1287                                INIT_LIST_HEAD(&req->rq_xmit);
1288                                goto out;
1289                        }
1290                }
1291                list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1292                INIT_LIST_HEAD(&req->rq_xmit2);
1293out:
1294                set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1295                spin_unlock(&xprt->queue_lock);
1296        }
1297}
1298
1299/**
1300 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1301 * @task: pointer to rpc_task
1302 *
1303 * Remove a task from the transmission queue
1304 * Caller must hold xprt->queue_lock
1305 */
1306static void
1307xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1308{
1309        struct rpc_rqst *req = task->tk_rqstp;
1310
1311        if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1312                return;
1313        if (!list_empty(&req->rq_xmit)) {
1314                list_del(&req->rq_xmit);
1315                if (!list_empty(&req->rq_xmit2)) {
1316                        struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1317                                        struct rpc_rqst, rq_xmit2);
1318                        list_del(&req->rq_xmit2);
1319                        list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1320                }
1321        } else
1322                list_del(&req->rq_xmit2);
1323}
1324
1325/**
1326 * xprt_request_dequeue_transmit - remove a task from the transmission queue
1327 * @task: pointer to rpc_task
1328 *
1329 * Remove a task from the transmission queue
1330 */
1331static void
1332xprt_request_dequeue_transmit(struct rpc_task *task)
1333{
1334        struct rpc_rqst *req = task->tk_rqstp;
1335        struct rpc_xprt *xprt = req->rq_xprt;
1336
1337        spin_lock(&xprt->queue_lock);
1338        xprt_request_dequeue_transmit_locked(task);
1339        spin_unlock(&xprt->queue_lock);
1340}
1341
1342/**
1343 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
1344 * @task: pointer to rpc_task
1345 *
1346 * Remove a task from the transmit and receive queues, and ensure that
1347 * it is not pinned by the receive work item.
1348 */
1349void
1350xprt_request_dequeue_xprt(struct rpc_task *task)
1351{
1352        struct rpc_rqst *req = task->tk_rqstp;
1353        struct rpc_xprt *xprt = req->rq_xprt;
1354
1355        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1356            test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1357            xprt_is_pinned_rqst(req)) {
1358                spin_lock(&xprt->queue_lock);
1359                xprt_request_dequeue_transmit_locked(task);
1360                xprt_request_dequeue_receive_locked(task);
1361                while (xprt_is_pinned_rqst(req)) {
1362                        set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1363                        spin_unlock(&xprt->queue_lock);
1364                        xprt_wait_on_pinned_rqst(req);
1365                        spin_lock(&xprt->queue_lock);
1366                        clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1367                }
1368                spin_unlock(&xprt->queue_lock);
1369        }
1370}
1371
1372/**
1373 * xprt_request_prepare - prepare an encoded request for transport
1374 * @req: pointer to rpc_rqst
1375 *
1376 * Calls into the transport layer to do whatever is needed to prepare
1377 * the request for transmission or receive.
1378 */
1379void
1380xprt_request_prepare(struct rpc_rqst *req)
1381{
1382        struct rpc_xprt *xprt = req->rq_xprt;
1383
1384        if (xprt->ops->prepare_request)
1385                xprt->ops->prepare_request(req);
1386}
1387
1388/**
1389 * xprt_request_need_retransmit - Test if a task needs retransmission
1390 * @task: pointer to rpc_task
1391 *
1392 * Test for whether a connection breakage requires the task to retransmit
1393 */
1394bool
1395xprt_request_need_retransmit(struct rpc_task *task)
1396{
1397        return xprt_request_retransmit_after_disconnect(task);
1398}
1399
1400/**
1401 * xprt_prepare_transmit - reserve the transport before sending a request
1402 * @task: RPC task about to send a request
1403 *
1404 */
1405bool xprt_prepare_transmit(struct rpc_task *task)
1406{
1407        struct rpc_rqst *req = task->tk_rqstp;
1408        struct rpc_xprt *xprt = req->rq_xprt;
1409
1410        if (!xprt_lock_write(xprt, task)) {
1411                trace_xprt_transmit_queued(xprt, task);
1412
1413                /* Race breaker: someone may have transmitted us */
1414                if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1415                        rpc_wake_up_queued_task_set_status(&xprt->sending,
1416                                        task, 0);
1417                return false;
1418
1419        }
1420        return true;
1421}
1422
1423void xprt_end_transmit(struct rpc_task *task)
1424{
1425        xprt_release_write(task->tk_rqstp->rq_xprt, task);
1426}
1427
1428/**
1429 * xprt_request_transmit - send an RPC request on a transport
1430 * @req: pointer to request to transmit
1431 * @snd_task: RPC task that owns the transport lock
1432 *
1433 * This performs the transmission of a single request.
1434 * Note that if the request is not the same as snd_task, then it
1435 * does need to be pinned.
1436 * Returns '0' on success.
1437 */
1438static int
1439xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1440{
1441        struct rpc_xprt *xprt = req->rq_xprt;
1442        struct rpc_task *task = req->rq_task;
1443        unsigned int connect_cookie;
1444        int is_retrans = RPC_WAS_SENT(task);
1445        int status;
1446
1447        if (!req->rq_bytes_sent) {
1448                if (xprt_request_data_received(task)) {
1449                        status = 0;
1450                        goto out_dequeue;
1451                }
1452                /* Verify that our message lies in the RPCSEC_GSS window */
1453                if (rpcauth_xmit_need_reencode(task)) {
1454                        status = -EBADMSG;
1455                        goto out_dequeue;
1456                }
1457                if (RPC_SIGNALLED(task)) {
1458                        status = -ERESTARTSYS;
1459                        goto out_dequeue;
1460                }
1461        }
1462
1463        /*
1464         * Update req->rq_ntrans before transmitting to avoid races with
1465         * xprt_update_rtt(), which needs to know that it is recording a
1466         * reply to the first transmission.
1467         */
1468        req->rq_ntrans++;
1469
1470        trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1471        connect_cookie = xprt->connect_cookie;
1472        status = xprt->ops->send_request(req);
1473        if (status != 0) {
1474                req->rq_ntrans--;
1475                trace_xprt_transmit(req, status);
1476                return status;
1477        }
1478
1479        if (is_retrans)
1480                task->tk_client->cl_stats->rpcretrans++;
1481
1482        xprt_inject_disconnect(xprt);
1483
1484        task->tk_flags |= RPC_TASK_SENT;
1485        spin_lock(&xprt->transport_lock);
1486
1487        xprt->stat.sends++;
1488        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1489        xprt->stat.bklog_u += xprt->backlog.qlen;
1490        xprt->stat.sending_u += xprt->sending.qlen;
1491        xprt->stat.pending_u += xprt->pending.qlen;
1492        spin_unlock(&xprt->transport_lock);
1493
1494        req->rq_connect_cookie = connect_cookie;
1495out_dequeue:
1496        trace_xprt_transmit(req, status);
1497        xprt_request_dequeue_transmit(task);
1498        rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1499        return status;
1500}
1501
1502/**
1503 * xprt_transmit - send an RPC request on a transport
1504 * @task: controlling RPC task
1505 *
1506 * Attempts to drain the transmit queue. On exit, either the transport
1507 * signalled an error that needs to be handled before transmission can
1508 * resume, or @task finished transmitting, and detected that it already
1509 * received a reply.
1510 */
1511void
1512xprt_transmit(struct rpc_task *task)
1513{
1514        struct rpc_rqst *next, *req = task->tk_rqstp;
1515        struct rpc_xprt *xprt = req->rq_xprt;
1516        int counter, status;
1517
1518        spin_lock(&xprt->queue_lock);
1519        counter = 0;
1520        while (!list_empty(&xprt->xmit_queue)) {
1521                if (++counter == 20)
1522                        break;
1523                next = list_first_entry(&xprt->xmit_queue,
1524                                struct rpc_rqst, rq_xmit);
1525                xprt_pin_rqst(next);
1526                spin_unlock(&xprt->queue_lock);
1527                status = xprt_request_transmit(next, task);
1528                if (status == -EBADMSG && next != req)
1529                        status = 0;
1530                spin_lock(&xprt->queue_lock);
1531                xprt_unpin_rqst(next);
1532                if (status == 0) {
1533                        if (!xprt_request_data_received(task) ||
1534                            test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1535                                continue;
1536                } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1537                        task->tk_status = status;
1538                break;
1539        }
1540        spin_unlock(&xprt->queue_lock);
1541}
1542
1543static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1544{
1545        set_bit(XPRT_CONGESTED, &xprt->state);
1546        rpc_sleep_on(&xprt->backlog, task, NULL);
1547}
1548
1549static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
1550{
1551        if (rpc_wake_up_next(&xprt->backlog) == NULL)
1552                clear_bit(XPRT_CONGESTED, &xprt->state);
1553}
1554
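     /*
      * If other requests are already waiting for a slot (XPRT_CONGESTED),
      * put this task to sleep on the backlog queue as well. Returns true
      * if the task was queued.
      */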
1555static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1556{
1557        bool ret = false;
1558
1559        if (!test_bit(XPRT_CONGESTED, &xprt->state))
1560                goto out;
1561        spin_lock(&xprt->reserve_lock);
1562        if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1563                rpc_sleep_on(&xprt->backlog, task, NULL);
1564                ret = true;
1565        }
1566        spin_unlock(&xprt->reserve_lock);
1567out:
1568        return ret;
1569}
1570
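     /*
      * Try to grow the slot table by one entry, dropping the reserve lock
      * around the allocation. Returns ERR_PTR(-EAGAIN) once max_reqs has
      * been reached, or ERR_PTR(-ENOMEM) if the allocation fails.
      */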
1571static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1572{
1573        struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1574
1575        if (xprt->num_reqs >= xprt->max_reqs)
1576                goto out;
1577        ++xprt->num_reqs;
1578        spin_unlock(&xprt->reserve_lock);
1579        req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
1580        spin_lock(&xprt->reserve_lock);
1581        if (req != NULL)
1582                goto out;
1583        --xprt->num_reqs;
1584        req = ERR_PTR(-ENOMEM);
1585out:
1586        return req;
1587}
1588
1589static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1590{
1591        if (xprt->num_reqs > xprt->min_reqs) {
1592                --xprt->num_reqs;
1593                kfree(req);
1594                return true;
1595        }
1596        return false;
1597}
1598
1599void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1600{
1601        struct rpc_rqst *req;
1602
1603        spin_lock(&xprt->reserve_lock);
1604        if (!list_empty(&xprt->free)) {
1605                req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1606                list_del(&req->rq_list);
1607                goto out_init_req;
1608        }
1609        req = xprt_dynamic_alloc_slot(xprt);
1610        if (!IS_ERR(req))
1611                goto out_init_req;
1612        switch (PTR_ERR(req)) {
1613        case -ENOMEM:
1614                dprintk("RPC:       dynamic allocation of request slot "
1615                                "failed! Retrying\n");
1616                task->tk_status = -ENOMEM;
1617                break;
1618        case -EAGAIN:
1619                xprt_add_backlog(xprt, task);
1620                dprintk("RPC:       waiting for request slot\n");
1621                fallthrough;
1622        default:
1623                task->tk_status = -EAGAIN;
1624        }
1625        spin_unlock(&xprt->reserve_lock);
1626        return;
1627out_init_req:
1628        xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1629                                     xprt->num_reqs);
1630        spin_unlock(&xprt->reserve_lock);
1631
1632        task->tk_status = 0;
1633        task->tk_rqstp = req;
1634}
1635EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1636
1637void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1638{
1639        spin_lock(&xprt->reserve_lock);
1640        if (!xprt_dynamic_free_slot(xprt, req)) {
1641                memset(req, 0, sizeof(*req));   /* mark unused */
1642                list_add(&req->rq_list, &xprt->free);
1643        }
1644        xprt_wake_up_backlog(xprt);
1645        spin_unlock(&xprt->reserve_lock);
1646}
1647EXPORT_SYMBOL_GPL(xprt_free_slot);
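
/*
 * Both helpers above are exported so that transport implementations can
 * plug them straight into their ops table.  A sketch of that wiring (the
 * other callbacks are omitted and "example_slot_ops" is hypothetical, not
 * one of the in-tree transports):
 */
static const struct rpc_xprt_ops example_slot_ops = {
        .alloc_slot     = xprt_alloc_slot,
        .free_slot      = xprt_free_slot,
        /* .reserve_xprt, .release_xprt, .send_request, ... */
};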
1648
1649static void xprt_free_all_slots(struct rpc_xprt *xprt)
1650{
1651        struct rpc_rqst *req;
1652        while (!list_empty(&xprt->free)) {
1653                req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1654                list_del(&req->rq_list);
1655                kfree(req);
1656        }
1657}
1658
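/**
 * xprt_alloc - allocate and initialise a transport plus its request slots
 * @net: network namespace the transport will operate in
 * @size: size of the structure to allocate; transport implementations
 *        embed struct rpc_xprt in a larger private structure, so this is
 *        the size of that containing structure
 * @num_prealloc: number of request slots to allocate up front
 * @max_alloc: maximum number of request slots the transport may grow to
 *
 * Returns the new transport, or NULL on allocation failure.
 */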
1659struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1660                unsigned int num_prealloc,
1661                unsigned int max_alloc)
1662{
1663        struct rpc_xprt *xprt;
1664        struct rpc_rqst *req;
1665        int i;
1666
1667        xprt = kzalloc(size, GFP_KERNEL);
1668        if (xprt == NULL)
1669                goto out;
1670
1671        xprt_init(xprt, net);
1672
1673        for (i = 0; i < num_prealloc; i++) {
1674                req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1675                if (!req)
1676                        goto out_free;
1677                list_add(&req->rq_list, &xprt->free);
1678        }
1679        if (max_alloc > num_prealloc)
1680                xprt->max_reqs = max_alloc;
1681        else
1682                xprt->max_reqs = num_prealloc;
1683        xprt->min_reqs = num_prealloc;
1684        xprt->num_reqs = num_prealloc;
1685
1686        return xprt;
1687
1688out_free:
1689        xprt_free(xprt);
1690out:
1691        return NULL;
1692}
1693EXPORT_SYMBOL_GPL(xprt_alloc);
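
/*
 * A minimal sketch of how a transport setup routine might size the
 * allocation.  "struct example_xprt" and the slot counts are hypothetical;
 * in-tree transports pass their own container type and tunables here.
 */
struct example_xprt {
        struct rpc_xprt         xprt;   /* embedded generic transport */
        /* transport-private state ... */
};

static struct rpc_xprt *example_xprt_alloc(struct net *net)
{
        struct rpc_xprt *xprt;

        xprt = xprt_alloc(net, sizeof(struct example_xprt),
                          16,           /* preallocated request slots */
                          128);         /* upper bound on dynamic slots */
        if (xprt == NULL)
                return ERR_PTR(-ENOMEM);
        return xprt;
}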
1694
1695void xprt_free(struct rpc_xprt *xprt)
1696{
1697        put_net(xprt->xprt_net);
1698        xprt_free_all_slots(xprt);
1699        kfree_rcu(xprt, rcu);
1700}
1701EXPORT_SYMBOL_GPL(xprt_free);
1702
1703static void
1704xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1705{
1706        req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1707}
1708
1709static __be32
1710xprt_alloc_xid(struct rpc_xprt *xprt)
1711{
1712        __be32 xid;
1713
1714        spin_lock(&xprt->reserve_lock);
1715        xid = (__force __be32)xprt->xid++;
1716        spin_unlock(&xprt->reserve_lock);
1717        return xid;
1718}
1719
1720static void
1721xprt_init_xid(struct rpc_xprt *xprt)
1722{
1723        xprt->xid = prandom_u32();
1724}
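
/*
 * XIDs start at a random value and then increase by one per request, all
 * under ->reserve_lock, so concurrent requests on the same transport get
 * distinct identifiers.  Reply matching is then a straight comparison of
 * the XID from the wire against the rq_xid stored in each pending request.
 */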
1725
1726static void
1727xprt_request_init(struct rpc_task *task)
1728{
1729        struct rpc_xprt *xprt = task->tk_xprt;
1730        struct rpc_rqst *req = task->tk_rqstp;
1731
1732        req->rq_task    = task;
1733        req->rq_xprt    = xprt;
1734        req->rq_buffer  = NULL;
1735        req->rq_xid     = xprt_alloc_xid(xprt);
1736        xprt_init_connect_cookie(req, xprt);
1737        req->rq_snd_buf.len = 0;
1738        req->rq_snd_buf.buflen = 0;
1739        req->rq_rcv_buf.len = 0;
1740        req->rq_rcv_buf.buflen = 0;
1741        req->rq_snd_buf.bvec = NULL;
1742        req->rq_rcv_buf.bvec = NULL;
1743        req->rq_release_snd_buf = NULL;
1744        xprt_init_majortimeo(task, req);
1745
1746        trace_xprt_reserve(req);
1747}
1748
1749static void
1750xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1751{
1752        xprt->ops->alloc_slot(xprt, task);
1753        if (task->tk_rqstp != NULL)
1754                xprt_request_init(task);
1755}
1756
1757/**
1758 * xprt_reserve - allocate an RPC request slot
1759 * @task: RPC task requesting a slot allocation
1760 *
1761 * If the transport is marked as being congested, or if no more
1762 * slots are available, place the task on the transport's
1763 * backlog queue.
1764 */
1765void xprt_reserve(struct rpc_task *task)
1766{
1767        struct rpc_xprt *xprt = task->tk_xprt;
1768
1769        task->tk_status = 0;
1770        if (task->tk_rqstp != NULL)
1771                return;
1772
1773        task->tk_status = -EAGAIN;
1774        if (!xprt_throttle_congested(xprt, task))
1775                xprt_do_reserve(xprt, task);
1776}
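
/*
 * A sketch of one plausible caller (the function name is hypothetical; the
 * real caller is the RPC client state machine in clnt.c, which differs in
 * detail):
 */
static void example_reserve_step(struct rpc_task *task)
{
        xprt_reserve(task);
        if (task->tk_rqstp != NULL)
                return;         /* slot attached; move on to encoding */
        /*
         * No slot: tk_status is -EAGAIN if the task was queued on the
         * backlog (or throttled by XPRT_CONGESTED), or -ENOMEM if a
         * dynamic slot allocation failed.  A task woken from the backlog
         * would normally retry via xprt_retry_reserve(), which skips the
         * congestion check so that it cannot be re-queued immediately.
         */
}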
1777
1778/**
1779 * xprt_retry_reserve - allocate an RPC request slot
1780 * @task: RPC task requesting a slot allocation
1781 *
1782 * If no more slots are available, place the task on the transport's
1783 * backlog queue.
1784 * Note that the only difference from xprt_reserve() is that this
1785 * function ignores the value of the XPRT_CONGESTED flag.
1786 */
1787void xprt_retry_reserve(struct rpc_task *task)
1788{
1789        struct rpc_xprt *xprt = task->tk_xprt;
1790
1791        task->tk_status = 0;
1792        if (task->tk_rqstp != NULL)
1793                return;
1794
1795        task->tk_status = -EAGAIN;
1796        xprt_do_reserve(xprt, task);
1797}
1798
1799/**
1800 * xprt_release - release an RPC request slot
1801 * @task: task which is finished with the slot
1802 *
1803 */
1804void xprt_release(struct rpc_task *task)
1805{
1806        struct rpc_xprt *xprt;
1807        struct rpc_rqst *req = task->tk_rqstp;
1808
1809        if (req == NULL) {
1810                if (task->tk_client) {
1811                        xprt = task->tk_xprt;
1812                        xprt_release_write(xprt, task);
1813                }
1814                return;
1815        }
1816
1817        xprt = req->rq_xprt;
1818        xprt_request_dequeue_xprt(task);
1819        spin_lock(&xprt->transport_lock);
1820        xprt->ops->release_xprt(xprt, task);
1821        if (xprt->ops->release_request)
1822                xprt->ops->release_request(task);
1823        xprt_schedule_autodisconnect(xprt);
1824        spin_unlock(&xprt->transport_lock);
1825        if (req->rq_buffer)
1826                xprt->ops->buf_free(task);
1827        xprt_inject_disconnect(xprt);
1828        xdr_free_bvec(&req->rq_rcv_buf);
1829        xdr_free_bvec(&req->rq_snd_buf);
1830        if (req->rq_cred != NULL)
1831                put_rpccred(req->rq_cred);
1832        task->tk_rqstp = NULL;
1833        if (req->rq_release_snd_buf)
1834                req->rq_release_snd_buf(req);
1835
1836        if (likely(!bc_prealloc(req)))
1837                xprt->ops->free_slot(xprt, req);
1838        else
1839                xprt_free_bc_request(req);
1840}
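
/*
 * Note the teardown order in xprt_release(): the request is unhooked from
 * the receive and transmit queues first, the transport write lock and any
 * transport-specific per-request state are released under ->transport_lock,
 * buffers and credentials are freed, and only then is the slot returned to
 * the free list -- or, for a preallocated backchannel request, handed back
 * via xprt_free_bc_request().
 */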
1841
1842#ifdef CONFIG_SUNRPC_BACKCHANNEL
1843void
1844xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1845{
1846        struct xdr_buf *xbufp = &req->rq_snd_buf;
1847
1848        task->tk_rqstp = req;
1849        req->rq_task = task;
1850        xprt_init_connect_cookie(req, req->rq_xprt);
1851        /*
1852         * Set up the xdr_buf length.
1853         * This also indicates that the buffer is already XDR encoded.
1854         */
1855        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1856                xbufp->tail[0].iov_len;
1857}
1858#endif
1859
1860static void xprt_init(struct rpc_xprt *xprt, struct net *net)
1861{
1862        kref_init(&xprt->kref);
1863
1864        spin_lock_init(&xprt->transport_lock);
1865        spin_lock_init(&xprt->reserve_lock);
1866        spin_lock_init(&xprt->queue_lock);
1867
1868        INIT_LIST_HEAD(&xprt->free);
1869        xprt->recv_queue = RB_ROOT;
1870        INIT_LIST_HEAD(&xprt->xmit_queue);
1871#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1872        spin_lock_init(&xprt->bc_pa_lock);
1873        INIT_LIST_HEAD(&xprt->bc_pa_list);
1874#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1875        INIT_LIST_HEAD(&xprt->xprt_switch);
1876
1877        xprt->last_used = jiffies;
1878        xprt->cwnd = RPC_INITCWND;
1879        xprt->bind_index = 0;
1880
1881        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
1882        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1883        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
1884        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1885
1886        xprt_init_xid(xprt);
1887
1888        xprt->xprt_net = get_net(net);
1889}
1890
1891/**
1892 * xprt_create_transport - create an RPC transport
1893 * @args: rpc transport creation arguments
1894 *
1895 */
1896struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
1897{
1898        struct rpc_xprt *xprt;
1899        struct xprt_class *t;
1900
1901        spin_lock(&xprt_list_lock);
1902        list_for_each_entry(t, &xprt_list, list) {
1903                if (t->ident == args->ident) {
1904                        spin_unlock(&xprt_list_lock);
1905                        goto found;
1906                }
1907        }
1908        spin_unlock(&xprt_list_lock);
1909        dprintk("RPC: transport (%d) not supported\n", args->ident);
1910        return ERR_PTR(-EIO);
1911
1912found:
1913        xprt = t->setup(args);
1914        if (IS_ERR(xprt))
1915                goto out;
1916        if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
1917                xprt->idle_timeout = 0;
1918        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
1919        if (xprt_has_timer(xprt))
1920                timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
1921        else
1922                timer_setup(&xprt->timer, NULL, 0);
1923
1924        if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
1925                xprt_destroy(xprt);
1926                return ERR_PTR(-EINVAL);
1927        }
1928        xprt->servername = kstrdup(args->servername, GFP_KERNEL);
1929        if (xprt->servername == NULL) {
1930                xprt_destroy(xprt);
1931                return ERR_PTR(-ENOMEM);
1932        }
1933
1934        rpc_xprt_debugfs_register(xprt);
1935
1936        trace_xprt_create(xprt);
1937out:
1938        return xprt;
1939}
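
/*
 * A sketch of the caller side, using only the xprt_create fields that this
 * function itself consumes.  The rpc_create() path in clnt.c fills in more
 * state, notably the destination address, so this is illustrative rather
 * than a working transport setup; XPRT_TRANSPORT_TCP is one of the idents
 * registered by the socket transports.
 */
static struct rpc_xprt *example_tcp_transport(char *servername)
{
        struct xprt_create args = {
                .ident          = XPRT_TRANSPORT_TCP,
                .servername     = servername,
                .flags          = XPRT_CREATE_NO_IDLE_TIMEOUT,
        };

        return xprt_create_transport(&args);   /* ERR_PTR() on failure */
}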
1940
1941static void xprt_destroy_cb(struct work_struct *work)
1942{
1943        struct rpc_xprt *xprt =
1944                container_of(work, struct rpc_xprt, task_cleanup);
1945
1946        trace_xprt_destroy(xprt);
1947
1948        rpc_xprt_debugfs_unregister(xprt);
1949        rpc_destroy_wait_queue(&xprt->binding);
1950        rpc_destroy_wait_queue(&xprt->pending);
1951        rpc_destroy_wait_queue(&xprt->sending);
1952        rpc_destroy_wait_queue(&xprt->backlog);
1953        kfree(xprt->servername);
1954        /*
1955         * Destroy any existing back channel
1956         */
1957        xprt_destroy_backchannel(xprt, UINT_MAX);
1958
1959        /*
1960         * Tear down transport state and free the rpc_xprt
1961         */
1962        xprt->ops->destroy(xprt);
1963}
1964
1965/**
1966 * xprt_destroy - destroy an RPC transport, killing off all requests.
1967 * @xprt: transport to destroy
1968 *
1969 */
1970static void xprt_destroy(struct rpc_xprt *xprt)
1971{
1972        /*
1973         * Exclude transport connect/disconnect handlers and autoclose
1974         */
1975        wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
1976
1977        del_timer_sync(&xprt->timer);
1978
1979        /*
1980         * Destroy sockets etc from the system workqueue so they can
1981         * safely flush receive work running on rpciod.
1982         */
1983        INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
1984        schedule_work(&xprt->task_cleanup);
1985}
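
/*
 * Destruction is deferred to the system workqueue so the final teardown can
 * sleep and can flush work still running on rpciod.  A self-contained
 * sketch of that kref-plus-workqueue pattern (hypothetical type and names):
 */
struct example_obj {
        struct kref             kref;
        struct work_struct      free_work;
};

static void example_free_work(struct work_struct *work)
{
        struct example_obj *obj =
                container_of(work, struct example_obj, free_work);

        /* teardown that may sleep is safe here */
        kfree(obj);
}

static void example_release(struct kref *kref)
{
        struct example_obj *obj = container_of(kref, struct example_obj, kref);

        INIT_WORK(&obj->free_work, example_free_work);
        schedule_work(&obj->free_work);
}

static void example_obj_put(struct example_obj *obj)
{
        kref_put(&obj->kref, example_release);
}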
1986
1987static void xprt_destroy_kref(struct kref *kref)
1988{
1989        xprt_destroy(container_of(kref, struct rpc_xprt, kref));
1990}
1991
1992/**
1993 * xprt_get - return a reference to an RPC transport.
1994 * @xprt: pointer to the transport
1995 *
1996 */
1997struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
1998{
1999        if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
2000                return xprt;
2001        return NULL;
2002}
2003EXPORT_SYMBOL_GPL(xprt_get);
2004
2005/**
2006 * xprt_put - release a reference to an RPC transport.
2007 * @xprt: pointer to the transport
2008 *
2009 */
2010void xprt_put(struct rpc_xprt *xprt)
2011{
2012        if (xprt != NULL)
2013                kref_put(&xprt->kref, xprt_destroy_kref);
2014}
2015EXPORT_SYMBOL_GPL(xprt_put);
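
/*
 * xprt_get() follows the kref_get_unless_zero() convention: it only
 * succeeds while the reference count is still non-zero, so it is safe to
 * call on a pointer that another thread may be about to drop.  A usage
 * sketch (the helper name and error code are illustrative):
 */
static int example_use_xprt(struct rpc_xprt *candidate)
{
        struct rpc_xprt *xprt = xprt_get(candidate);

        if (xprt == NULL)
                return -ENXIO;  /* transport is already being torn down */
        /* ... use xprt ... */
        xprt_put(xprt);
        return 0;
}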
2016