/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, if a reply is expected,
 *      it installs a timer that is run after the packet's timeout has
 *      expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer is removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that 'transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
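
/*
 * Illustrative sketch (added in this revision, not part of the original
 * file): the sequence a synchronous caller effectively runs through this
 * interface. The function names are real entry points defined below; the
 * surrounding control flow and the marshalling step are assumptions for
 * illustration only, since they actually live in the RPC client code.
 *
 *	xprt_reserve(task);          // grab a request slot, or sleep on backlog
 *	// client code marshals the RPC message into task->tk_rqstp
 *	if (xprt_prepare_transmit(task) != 0)
 *		return;              // transport busy, or reply already in
 *	xprt_transmit(task);         // send; sleep on xprt->pending for a reply
 *	// ... data_ready matches the XID and wakes the task ...
 *	xprt_release(task);          // return the slot to xprt->free
 */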

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate of the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full window of requests is outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
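
/*
 * Worked example (added for illustration, not in the original file): with
 * RPC_CWNDSHIFT = 8, one request slot corresponds to RPC_CWNDSCALE = 256
 * units of window, and a transport starts at RPC_INITCWND (one slot). On
 * each qualifying reply, xprt_adjust_cwnd() below grows the window by
 * roughly RPC_CWNDSCALE * RPC_CWNDSCALE / cwnd: at cwnd = 256 that adds
 * 256 (the window doubles to two slots), at cwnd = 512 it adds only 128.
 * On -ETIMEDOUT the window halves, never dropping below one slot.
 */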

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
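
/*
 * Usage sketch (added for illustration; "example_setup", the ident value,
 * and the module boilerplate are assumptions, not part of this file): a
 * loadable transport typically registers itself on module init and
 * unregisters on exit:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= 42,		// hypothetical XPRT_TRANSPORT_* value
 *		.setup	= example_setup,
 *	};
 *
 *	static int __init example_init(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 *
 *	static void __exit example_exit(void)
 *	{
 *		xprt_unregister_transport(&example_transport);
 *	}
 */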

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:           transport successfully loaded
 * -ENOENT:     transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
        struct xprt_class *t;
        char module_name[sizeof t->name + 5];
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (strcmp(t->name, transport_name) == 0) {
                        spin_unlock(&xprt_list_lock);
                        goto out;
                }
        }
        spin_unlock(&xprt_list_lock);
        strcpy(module_name, "xprt");
        strncat(module_name, transport_name, sizeof t->name);
        result = request_module(module_name);
out:
        return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);
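
/*
 * Example (added for illustration): xprt_load_transport("rdma") first looks
 * for a registered class named "rdma"; if none is found, it builds the
 * module name "xprt" + "rdma" and calls request_module("xprtrdma"), whose
 * init routine is then expected to call xprt_register_transport() above.
 */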

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);
out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}
/*
 * Van Jacobson congestion avoidance. Check whether the congestion window
 * has room for another request. If it has overflowed, return zero so the
 * caller can put the task to sleep.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
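
/*
 * Worked example of the increment above (added for illustration): with
 * cwnd = 768 (three slots), the additive step is
 * (256 * 256 + 384) / 768 = 85, i.e. roughly RPC_CWNDSCALE / 3, so about
 * three full windows of replies are needed to grow the window by one
 * slot. The (cwnd >> 1) term rounds the integer division to nearest
 * rather than truncating toward zero.
 */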

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rtt *rtt = clnt->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
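
/*
 * Worked example (added for illustration; the concrete numbers are
 * assumptions): if rpc_calc_rto() estimates an RTO of 100 jiffies for
 * this procedure's timer class, a first transmission uses
 * tk_timeout = 100. If the estimator has recorded one prior timeout for
 * the class and the request has already been retried once, the shift
 * doubles the value twice to 400 jiffies, and to_maxval caps the result.
 */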

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
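
/*
 * Worked example (added for illustration; the timeout values are
 * assumptions): with to_initval = 2 * HZ, to_exponential set,
 * to_retries = 3 and a sufficiently large to_maxval,
 * xprt_reset_majortimeo() arms the major timeout 2 << 3 = 16 seconds
 * out, while each minor timeout doubles rq_timeout through 2, 4 and 8
 * seconds. Once the major timeout passes, the interval resets to
 * to_initval and -ETIMEDOUT is reported to the caller.
 */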

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        xprt->ops->close(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        dprintk("RPC:       disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
                goto out;
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
        queue_work(rpciod_workqueue, &xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status == 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -EAGAIN:
                dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
                                "out\n", task->tk_pid);
                break;
        default:
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
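
/*
 * Usage sketch (added for illustration; the xid and copied variables are
 * assumptions standing in for transport-specific receive code): a
 * transport's data_ready handler typically pairs this lookup with
 * xprt_complete_rqst() below, all under the transport lock:
 *
 *	spin_lock(&xprt->transport_lock);
 *	rovr = xprt_lookup_rqst(xprt, xid);
 *	if (rovr != NULL) {
 *		// copy the reply into rovr->rq_private_buf, then:
 *		xprt_complete_rqst(rovr->rq_task, copied);
 *	}
 *	spin_unlock(&xprt->transport_lock);
 */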

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        xprt->stat.recvs++;
        task->tk_rtt = (long)jiffies - req->rq_xtime;

        list_del_init(&req->rq_list);
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update */
        /* req->rq_reply_bytes_recvd */
        smp_wmb();
        req->rq_reply_bytes_recvd = copied;
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (task->tk_status != -ETIMEDOUT)
                return;
        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (!req->rq_reply_bytes_recvd) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
        } else
                task->tk_status = 0;
        spin_unlock_bh(&xprt->transport_lock);
}

static inline int xprt_has_timer(struct rpc_xprt *xprt)
{
        return xprt->idle_timeout != 0;
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
                err = req->rq_reply_bytes_recvd;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task))
                err = -EAGAIN;
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
        xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        if (!req->rq_reply_bytes_recvd) {
                if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
                        /*
                         * Add to the list only if we're expecting a reply
                         */
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        req->rq_connect_cookie = xprt->connect_cookie;
        req->rq_xtime = jiffies;
        status = xprt->ops->send_request(task);
        if (status != 0) {
                task->tk_status = status;
                return;
        }

        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
        spin_lock_bh(&xprt->transport_lock);

        xprt->ops->set_retrans_timeout(task);

        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;

        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
                /*
                 * Sleep on the pending queue since
                 * we're expecting a reply.
                 */
                rpc_sleep_on(&xprt->pending, task, xprt_timer);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        do_xprt_reserve(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        xprt->xid = net_random();
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_buffer  = NULL;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        xprt_reset_majortimeo(req);
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        int is_bc_request;

        if (!(req = task->tk_rqstp))
                return;

        /* Preallocated backchannel request? */
        is_bc_request = bc_prealloc(req);

        xprt = req->rq_xprt;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        if (!bc_prealloc(req))
                xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);

        /*
         * Early exit if this is a backchannel preallocated request.
         * There is no need to have it added to the RPC slot list.
         */
        if (is_bc_request)
                return;

        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident == args->ident) {
                        spin_unlock(&xprt_list_lock);
                        goto found;
                }
        }
        spin_unlock(&xprt_list_lock);
        printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
        return ERR_PTR(-EIO);

found:
        xprt = t->setup(args);
        if (IS_ERR(xprt)) {
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
                return xprt;
        }

        kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
#if defined(CONFIG_NFS_V4_1)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_NFS_V4_1 */

        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        if (xprt_has_timer(xprt))
                setup_timer(&xprt->timer, xprt_init_autodisconnect,
                            (unsigned long)xprt);
        else
                init_timer(&xprt->timer);
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:       created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);
        return xprt;
}
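
/*
 * Usage sketch (added for illustration; the xprt_create field names shown
 * here (ident, dstaddr, addrlen), the XPRT_TRANSPORT_TCP constant, and
 * server_addr are assumed from the transport switch headers, not defined
 * in this file):
 *
 *	struct xprt_create args = {
 *		.ident   = XPRT_TRANSPORT_TCP,
 *		.dstaddr = (struct sockaddr *)&server_addr,
 *		.addrlen = sizeof(server_addr),
 *	};
 *	struct rpc_xprt *xprt = xprt_create_transport(&args);
 *
 *	if (IS_ERR(xprt))
 *		return PTR_ERR(xprt);
 */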

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

        dprintk("RPC:       destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);

        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->resend);
        rpc_destroy_wait_queue(&xprt->backlog);
        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        kref_get(&xprt->kref);
        return xprt;
}