/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue;
/*
 * Disable the timer for a given RPC task. Must be called with
 * queue->lock held and bottom halves disabled, to avoid races with
 * the queue's timer callback, __rpc_queue_timer_fn().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON(RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}
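
/*
 * Editor's note (not in the original source): these helpers implement the
 * fairness policy used by __rpc_wake_up_next_priority() below.
 * queue->count = 1 << (priority * 2) bounds how many tasks are woken from
 * the current priority level before the queue rotates to the next
 * non-empty level (wrapping from the lowest index back to the highest),
 * and queue->nr = RPC_BATCH_COUNT bounds how many consecutive wake-ups a
 * single owner (tk_owner) gets within that level.  Worked example,
 * assuming three priority levels: the highest level is allowed up to
 * 1 << 4 = 16 wake-ups, the middle level 1 << 2 = 4, and the lowest a
 * single wake-up before the scheduler moves on.
 */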

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
	INIT_LIST_HEAD(&queue->timer_list.list);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
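
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * a subsystem that wants its own wait queue embeds a struct
 * rpc_wait_queue, initialises it once and tears it down before the
 * containing object goes away.  The demo_* names are hypothetical.
 *
 *	static struct rpc_wait_queue demo_backlog;
 *
 *	static void demo_init(void)
 *	{
 *		rpc_init_wait_queue(&demo_backlog, "demo_backlog");
 *	}
 *
 *	static void demo_exit(void)
 *	{
 *		rpc_destroy_wait_queue(&demo_backlog);
 *	}
 *
 * rpc_destroy_wait_queue() only stops the queue timer; the caller is
 * responsible for making sure no tasks are still sleeping on the queue.
 */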

static int rpc_wait_bit_killable(void *word)
{
	if (fatal_signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	struct rpc_clnt *clnt;
	if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
		return;
	rpc_task_set_debuginfo(task);
	/* Add to global list of all tasks */
	clnt = task->tk_client;
	if (clnt != NULL) {
		spin_lock(&clnt->cl_lock);
		list_add_tail(&task->tk_task, &clnt->cl_tasks);
		spin_unlock(&clnt->cl_lock);
	}
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	smp_mb__after_clear_bit();
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		status = queue_work(rpciod_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);
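
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * from within a task's state-machine action, put the task to sleep on a
 * queue until some other context wakes it, optionally with a timeout.
 * The demo_* names are hypothetical and reuse the demo_backlog queue
 * from the sketch above.
 *
 *	static void demo_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_timeout = 5 * HZ;
 *		rpc_sleep_on(&demo_backlog, task, NULL);
 *		task->tk_action = demo_slot_ready;
 *	}
 *
 * With tk_timeout set, the queue timer wakes the task with -ETIMEDOUT
 * after five seconds if nobody else does.  The third argument is an
 * optional rpc_action callback that __rpc_execute() runs when the task
 * is woken, before the next tk_action step.
 */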

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(task);

	dprintk("RPC:       __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
		__rpc_do_wake_up_task(queue, task);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
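
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * the waking side pairs with the rpc_sleep_on() sketch above.  A context
 * that knows which queue the task sleeps on can hand it a status and
 * make it runnable again:
 *
 *	static void demo_slot_available(struct rpc_task *task)
 *	{
 *		task->tk_status = 0;
 *		rpc_wake_up_queued_task(&demo_backlog, task);
 *	}
 *
 * Waking a task that is not (or no longer) queued on that queue is a
 * no-op, which keeps this safe against races with timeouts.
 */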

/*
 * Wake up the specified task
 */
static void rpc_wake_up_task(struct rpc_task *task)
{
	rpc_wake_up_queued_task(task->tk_waitqueue, task);
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	rpc_wake_up_task_queue_locked(queue, task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			rpc_wake_up_task_queue_locked(queue, task);
	}
	spin_unlock_bh(&queue->lock);

	return task;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);
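
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * a resource owner that can admit one more waiter drains its queue one
 * task at a time, e.g. whenever a slot frees up:
 *
 *	static void demo_slot_freed(void)
 *	{
 *		if (rpc_wake_up_next(&demo_backlog) == NULL)
 *			dprintk("demo: no tasks were waiting\n");
 *	}
 *
 * On a priority queue this also advances the fairness state described
 * in the editor's note above (per-owner batch, then per-level count).
 */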

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			rpc_wake_up_task_queue_locked(queue, task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);
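
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * when the resource everyone is waiting for goes away, flush the whole
 * queue.  Each task is woken with the given error in tk_status and
 * decides in its next state-machine step whether to retry or fail:
 *
 *	static void demo_connection_lost(void)
 *	{
 *		rpc_wake_up_status(&demo_backlog, -EAGAIN);
 *	}
 *
 * rpc_wake_up() is the same operation without touching tk_status.
 */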

static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);
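
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * a state-machine action that hits a transient error can back off by
 * sleeping on the global delay_queue.  The queue timer above wakes the
 * task with -ETIMEDOUT, which __rpc_atrun() immediately clears, so the
 * task simply resumes at its next tk_action after the delay:
 *
 *	static void demo_retry_later(struct rpc_task *task)
 *	{
 *		rpc_delay(task, HZ >> 2);
 *		task->tk_action = demo_resend;
 *	}
 *
 * The delay argument is in jiffies; HZ >> 2 is roughly 250 ms.  The
 * demo_resend action is hypothetical.
 */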

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}
EXPORT_SYMBOL_GPL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {

		/*
		 * Execute any pending callback.
		 */
		if (task->tk_callback) {
			void (*save_callback)(struct rpc_task *);

			/*
			 * We set tk_callback to NULL before calling it,
			 * in case it sets the tk_callback field itself:
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			task->tk_action(task);
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that rpc_release_task() will have
 *       been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_buffer *buf;
	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return NULL;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
	size_t size;
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC:       freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);
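
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * rpc_malloc() returns a pointer to the data area inside a struct
 * rpc_buffer, and rpc_free() takes that same data pointer back, so the
 * pair behaves like an ordinary allocator that never sleeps:
 *
 *	static int demo_alloc_scratch(struct rpc_task *task)
 *	{
 *		void *p = rpc_malloc(task, 512);
 *
 *		if (p == NULL)
 *			return -ENOMEM;
 *		memset(p, 0, 512);
 *		rpc_free(p);
 *		return 0;
 *	}
 *
 * Callers in the RPC client typically react to a NULL return by delaying
 * the task (see rpc_delay() above) and retrying, rather than failing the
 * request outright.
 */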

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_client = task_setup_data->rpc_client;
	if (task->tk_client != NULL) {
		kref_get(&task->tk_client->cl_kref);
		if (task->tk_client->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
	}

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	if (task_setup_data->rpc_message != NULL) {
		task->tk_msg.rpc_proc = task_setup_data->rpc_message->rpc_proc;
		task->tk_msg.rpc_argp = task_setup_data->rpc_message->rpc_argp;
		task->tk_msg.rpc_resp = task_setup_data->rpc_message->rpc_resp;
		/* Bind the user cred */
		rpcauth_bindcred(task, task_setup_data->rpc_message->rpc_cred, task_setup_data->flags);
		if (task->tk_action == NULL)
			rpc_call_start(task);
	}

	/* starting timestamp */
	task->tk_start = jiffies;

	dprintk("RPC:       new task initialized, procpid %u\n",
				task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task *task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL)
			goto out;
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);

	task->tk_flags |= flags;
	dprintk("RPC:       allocated task %p\n", task);
out:
	return task;
}

static void rpc_free_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

	if (task->tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
	rpc_release_calldata(tk_ops, calldata);
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

void rpc_put_task(struct rpc_task *task)
{
	if (!atomic_dec_and_test(&task->tk_count))
		return;
	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}
	if (task->tk_workqueue != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(task->tk_workqueue, &task->u.tk_work);
	} else
		rpc_free_task(task);
}
EXPORT_SYMBOL_GPL(rpc_put_task);
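
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * one way a caller can drive the task life cycle directly with the
 * primitives in this file: fill in a struct rpc_task_setup, create and
 * run the task, then wait for completion and drop the reference.  The
 * demo_* names are hypothetical; most callers go through higher-level
 * helpers in the RPC client instead.
 *
 *	static const struct rpc_call_ops demo_ops = {
 *		.rpc_call_done = demo_done,
 *		.rpc_release = demo_release,
 *	};
 *
 *	static int demo_run(struct rpc_clnt *clnt, struct rpc_message *msg)
 *	{
 *		struct rpc_task_setup setup = {
 *			.rpc_client = clnt,
 *			.rpc_message = msg,
 *			.callback_ops = &demo_ops,
 *			.flags = RPC_TASK_ASYNC,
 *		};
 *		struct rpc_task *task;
 *		int status;
 *
 *		task = rpc_new_task(&setup);
 *		if (task == NULL)
 *			return -ENOMEM;
 *		atomic_inc(&task->tk_count);
 *		rpc_execute(task);
 *		__rpc_wait_for_completion_task(task, NULL);
 *		status = task->tk_status;
 *		rpc_put_task(task);
 *		return status;
 *	}
 *
 * The extra tk_count reference keeps the task around after
 * rpc_release_task() drops the reference taken at creation time, so the
 * caller may still read tk_status before rpc_put_task().
 */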

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	dprintk("RPC: %5u release task\n", task->tk_pid);

	if (!list_empty(&task->tk_task)) {
		struct rpc_clnt *clnt = task->tk_client;
		/* Remove from client task list */
		spin_lock(&clnt->cl_lock);
		list_del(&task->tk_task);
		spin_unlock(&clnt->cl_lock);
	}
	BUG_ON(RPC_IS_QUEUED(task));

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	/* Wake up anyone who is waiting for task completion */
	rpc_mark_complete_task(task);

	rpc_put_task(task);
}

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task *rovr;

	if (list_empty(&clnt->cl_tasks))
		return;
	dprintk("RPC:       killing all tasks for client %p\n", clnt);
	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&clnt->cl_lock);
	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	dprintk("RPC:       creating workqueue rpciod\n");
	wq = create_workqueue("rpciod");
	rpciod_workqueue = wq;
	return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC:       destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
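
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * rpc_init_mempool() and rpc_destroy_mempool() are paired from the
 * sunrpc module init/exit path, so everything above (delay_queue, the
 * rpciod workqueue, the task and buffer pools) comes and goes with the
 * module:
 *
 *	static int __init demo_sunrpc_init(void)
 *	{
 *		return rpc_init_mempool();
 *	}
 *
 *	static void __exit demo_sunrpc_exit(void)
 *	{
 *		rpc_destroy_mempool();
 *	}
 *
 * rpc_init_mempool() cleans up after itself on failure (via the
 * err_nomem path), so the caller only needs to propagate the error.
 */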