/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        struct list_head worklist;
        wait_queue_head_t more_work;
        struct work_struct *current_work;

        struct workqueue_struct *wq;
        struct task_struct *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        struct list_head list;
        const char *name;
        int singlethread;
        int freezeable;         /* Freeze threads during suspend */
#ifdef CONFIG_LOCKDEP
        struct lockdep_map lockdep_map;
#endif
};

/*
 * All the per-cpu workqueues on the system, for hotplug cpu to add/remove
 * threads to each one as cpus come/go.
 */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static cpumask_t cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
 * which come in between can't use for_each_online_cpu(). We could
 * use cpu_possible_map; the cpumask below is more documentation
 * than an optimization.
 */
static cpumask_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return wq->singlethread;
}

static const cpumask_t *wq_cpu_map(struct workqueue_struct *wq)
{
        return is_single_threaded(wq)
                ? &cpu_singlethread_map : &cpu_populated_map;
}

static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;
        return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
                                struct cpu_workqueue_struct *cwq)
{
        unsigned long new;

        BUG_ON(!work_pending(work));

        new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
        new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
        atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
        return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
                                struct work_struct *work, int tail)
{
        set_wq_data(work, cwq);
        /*
         * Ensure that we get the right work->data if we see the
         * result of list_add() below, see try_to_grab_pending().
         */
        smp_wmb();
        if (tail)
                list_add_tail(&work->entry, &cwq->worklist);
        else
                list_add(&work->entry, &cwq->worklist);
        wake_up(&cwq->more_work);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        insert_work(cwq, work, 1);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is
 * no guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(!list_empty(&work->entry));
                __queue_work(wq_per_cpu(wq, get_cpu()), work);
                put_cpu();
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
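
/*
 * Illustrative sketch (not part of the original file): a minimal user of
 * queue_work().  The names example_wq, example_work and example_work_func()
 * are hypothetical; a real caller would obtain its workqueue from
 * create_workqueue() and typically embed the work_struct in its own object.
 */
static void example_work_func(struct work_struct *work)
{
        /* Runs in process context on the workqueue's kernel thread. */
        pr_debug("example_work_func ran\n");
}

static DECLARE_WORK(example_work, example_work_func);

static void example_submit(struct workqueue_struct *example_wq)
{
        /* Returns non-zero if newly queued, 0 if the work was already pending. */
        if (!queue_work(example_wq, &example_work))
                pr_debug("example_work was already pending\n");
}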

void delayed_work_timer_fn(unsigned long __data)
{
        struct delayed_work *dwork = (struct delayed_work *)__data;
        struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
        struct workqueue_struct *wq = cwq->wq;

        __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        timer_stats_timer_set_start_info(&dwork->timer);
        if (delay == 0)
                return queue_work(wq, &dwork->work);

        return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
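
/*
 * Illustrative sketch (not part of the original file): queueing the
 * hypothetical example_dwork so that it runs roughly one second from now.
 */
static void example_dwork_func(struct work_struct *work)
{
        /* The containing delayed_work can be recovered if needed. */
        struct delayed_work *dwork =
                container_of(work, struct delayed_work, work);

        pr_debug("example_dwork_func ran, dwork=%p\n", dwork);
}

static DECLARE_DELAYED_WORK(example_dwork, example_dwork_func);

static void example_submit_delayed(struct workqueue_struct *example_wq)
{
        /* The delay is given in jiffies; HZ jiffies is about one second. */
        queue_delayed_work(example_wq, &example_dwork, HZ);
}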

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores cwq for the moment, for the timer_fn */
                set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;

                if (unlikely(cpu >= 0))
                        add_timer_on(timer, cpu);
                else
                        add_timer(timer);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
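
/*
 * Illustrative sketch (not part of the original file): pinning the timer,
 * and hence the eventual work execution, to CPU 0.  example_wq and
 * example_pinned_dwork are hypothetical.
 */
static void example_submit_on_cpu0(struct workqueue_struct *example_wq,
                                   struct delayed_work *example_pinned_dwork)
{
        /* Fire on CPU 0 after ten ticks instead of on the submitting CPU. */
        queue_delayed_work_on(0, example_wq, example_pinned_dwork, 10);
}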

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        spin_lock_irq(&cwq->lock);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
                /*
                 * It is permissible to free the struct work_struct from
                 * inside the function that is called from it; we need to
                 * take that into account for lockdep too.  To avoid bogus
                 * "held lock freed" warnings as well as problems when
                 * looking into work->lockdep_map, make a copy and use
                 * that here.
                 */
                struct lockdep_map lockdep_map = work->lockdep_map;
#endif

                cwq->current_work = work;
                list_del_init(cwq->worklist.next);
                spin_unlock_irq(&cwq->lock);

                BUG_ON(get_wq_data(work) != cwq);
                work_clear_pending(work);
                lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
                lock_acquire(&lockdep_map, 0, 0, 0, 2, _THIS_IP_);
                f(work);
                lock_release(&lockdep_map, 1, _THIS_IP_);
                lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);

                if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                        printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                                        "%s/0x%08x/%d\n",
                                        current->comm, preempt_count(),
                                        task_pid_nr(current));
                        printk(KERN_ERR "    last function: ");
                        print_symbol("%s\n", (unsigned long)f);
                        debug_show_held_locks(current);
                        dump_stack();
                }

                spin_lock_irq(&cwq->lock);
                cwq->current_work = NULL;
        }
        cwq->run_depth--;
        spin_unlock_irq(&cwq->lock);
}

static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DEFINE_WAIT(wait);

        if (cwq->wq->freezeable)
                set_freezable();

        set_user_nice(current, -5);

        for (;;) {
                prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
                if (!freezing(current) &&
                    !kthread_should_stop() &&
                    list_empty(&cwq->worklist))
                        schedule();
                finish_wait(&cwq->more_work, &wait);

                try_to_freeze();

                if (kthread_should_stop())
                        break;

                run_workqueue(cwq);
        }

        return 0;
}

struct wq_barrier {
        struct work_struct      work;
        struct completion       done;
};

static void wq_barrier_func(struct work_struct *work)
{
        struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
        complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
                                        struct wq_barrier *barr, int tail)
{
        INIT_WORK(&barr->work, wq_barrier_func);
        __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

        init_completion(&barr->done);

        insert_work(cwq, &barr->work, tail);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        int active;

        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
                active = 1;
        } else {
                struct wq_barrier barr;

                active = 0;
                spin_lock_irq(&cwq->lock);
                if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
                        insert_wq_barrier(cwq, &barr, 1);
                        active = 1;
                }
                spin_unlock_irq(&cwq->lock);

                if (active)
                        wait_for_completion(&barr.done);
        }

        return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all work items which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        const cpumask_t *cpu_map = wq_cpu_map(wq);
        int cpu;

        might_sleep();
        lock_acquire(&wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
        lock_release(&wq->lockdep_map, 1, _THIS_IP_);
        for_each_cpu_mask(cpu, *cpu_map)
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
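
/*
 * Illustrative sketch (not part of the original file): the common shutdown
 * pattern of draining a queue before tearing down the data its work items
 * touch.  example_wq and example_device_state are hypothetical.
 */
static void example_shutdown(struct workqueue_struct *example_wq,
                             void *example_device_state)
{
        /*
         * After flush_workqueue() returns, every work item that was queued
         * before the call has finished, so previously queued work can no
         * longer reference example_device_state.
         */
        flush_workqueue(example_wq);
        kfree(example_device_state);
}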

/*
 * Upon a successful return (>= 0), the caller "owns" the WORK_STRUCT_PENDING
 * bit, so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
        struct cpu_workqueue_struct *cwq;
        int ret = -1;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
                return 0;

        /*
         * The queueing is in progress, or it is already queued. Try to
         * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
         */

        cwq = get_wq_data(work);
        if (!cwq)
                return ret;

        spin_lock_irq(&cwq->lock);
        if (!list_empty(&work->entry)) {
                /*
                 * This work is queued, but perhaps we locked the wrong cwq.
                 * In that case we must see the new value after rmb(), see
                 * insert_work()->wmb().
                 */
                smp_rmb();
                if (cwq == get_wq_data(work)) {
                        list_del_init(&work->entry);
                        ret = 1;
                }
        }
        spin_unlock_irq(&cwq->lock);

        return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
                                struct work_struct *work)
{
        struct wq_barrier barr;
        int running = 0;

        spin_lock_irq(&cwq->lock);
        if (unlikely(cwq->current_work == work)) {
                insert_wq_barrier(cwq, &barr, 0);
                running = 1;
        }
        spin_unlock_irq(&cwq->lock);

        if (unlikely(running))
                wait_for_completion(&barr.done);
}

static void wait_on_work(struct work_struct *work)
{
        struct cpu_workqueue_struct *cwq;
        struct workqueue_struct *wq;
        const cpumask_t *cpu_map;
        int cpu;

        might_sleep();

        lock_acquire(&work->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
        lock_release(&work->lockdep_map, 1, _THIS_IP_);

        cwq = get_wq_data(work);
        if (!cwq)
                return;

        wq = cwq->wq;
        cpu_map = wq_cpu_map(wq);

        for_each_cpu_mask(cpu, *cpu_map)
                wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
                                struct timer_list *timer)
{
        int ret;

        do {
                ret = (timer && likely(del_timer(timer)));
                if (!ret)
                        ret = try_to_grab_pending(work);
                wait_on_work(work);
        } while (unlikely(ret < 0));

        work_clear_pending(work);
        return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue; however, in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that the workqueue_struct on which this work was
 * last queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
        return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
        return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
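
/*
 * Illustrative sketch (not part of the original file): a teardown path that
 * makes sure neither the plain nor the delayed work item can still be
 * pending or running afterwards.  The example_* names are hypothetical.
 */
static void example_teardown(struct work_struct *example_plain_work,
                             struct delayed_work *example_delayed_item)
{
        /* Waits for a running callback and clears any pending state. */
        cancel_work_sync(example_plain_work);

        /* Also stops the timer, even if the work keeps rearming itself. */
        cancel_delayed_work_sync(example_delayed_item);
}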

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
                                        unsigned long delay)
{
        timer_stats_timer_set_start_info(&dwork->timer);
        return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
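
/*
 * Illustrative sketch (not part of the original file): the same submissions
 * as in the earlier sketches, but on the shared "events" workqueue, so no
 * private workqueue is needed.  example_work and example_dwork are the
 * hypothetical items declared above.
 */
static void example_submit_global(void)
{
        schedule_work(&example_work);                   /* run soon */
        schedule_delayed_work(&example_dwork, 5 * HZ);  /* run in ~5 seconds */
}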

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
                        struct delayed_work *dwork, unsigned long delay)
{
        return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns a negative errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
        int cpu;
        struct work_struct *works;

        works = alloc_percpu(struct work_struct);
        if (!works)
                return -ENOMEM;

        preempt_disable();              /* CPU hotplug */
        for_each_online_cpu(cpu) {
                struct work_struct *work = per_cpu_ptr(works, cpu);

                INIT_WORK(work, func);
                set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
        }
        preempt_enable();
        flush_workqueue(keventd_wq);
        free_percpu(works);
        return 0;
}
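
/*
 * Illustrative sketch (not part of the original file): running a function on
 * every online CPU and waiting for all of them to finish.  The names are
 * hypothetical.
 */
static void example_per_cpu_func(struct work_struct *work)
{
        pr_debug("example_per_cpu_func on CPU %d\n", raw_smp_processor_id());
}

static int example_run_everywhere(void)
{
        /* Blocks until example_per_cpu_func() has run on each online CPU. */
        return schedule_on_each_cpu(example_per_cpu_func);
}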

void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:         the function to execute
 * @ew:         guaranteed storage for the execute work structure (must
 *              be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:     0 - function was executed
 *              1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
        if (!in_interrupt()) {
                fn(&ew->work);
                return 0;
        }

        INIT_WORK(&ew->work, fn);
        schedule_work(&ew->work);

        return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
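
/*
 * Illustrative sketch (not part of the original file): a caller that may be
 * invoked from either process or interrupt context and wants example_cleanup()
 * to run in process context either way.  struct example_object is hypothetical
 * and must stay allocated until the callback has run.
 */
struct example_object {
        struct execute_work     ew;
        /* ... caller's own fields ... */
};

static void example_cleanup(struct work_struct *work)
{
        struct example_object *obj =
                container_of(work, struct example_object, ew.work);

        kfree(obj);
}

static void example_release(struct example_object *obj)
{
        /* Runs now if possible, otherwise later via the "events" workqueue. */
        execute_in_process_context(example_cleanup, &obj->ew);
}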

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;
}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

        cwq->wq = wq;
        spin_lock_init(&cwq->lock);
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);

        return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
        struct workqueue_struct *wq = cwq->wq;
        const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
        struct task_struct *p;

        p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
        /*
         * Nobody can add the work_struct to this cwq,
         *      if (caller is __create_workqueue)
         *              nobody should see this wq
         *      else // caller is CPU_UP_PREPARE
         *              cpu is not on cpu_online_map
         * so we can abort safely.
         */
        if (IS_ERR(p))
                return PTR_ERR(p);

        cwq->thread = p;

        return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
        struct task_struct *p = cwq->thread;

        if (p != NULL) {
                if (cpu >= 0)
                        kthread_bind(p, cpu);
                wake_up_process(p);
        }
}

struct workqueue_struct *__create_workqueue_key(const char *name,
                                                int singlethread,
                                                int freezeable,
                                                struct lock_class_key *key,
                                                const char *lock_name)
{
        struct workqueue_struct *wq;
        struct cpu_workqueue_struct *cwq;
        int err = 0, cpu;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
        wq->singlethread = singlethread;
        wq->freezeable = freezeable;
        INIT_LIST_HEAD(&wq->list);

        if (singlethread) {
                cwq = init_cpu_workqueue(wq, singlethread_cpu);
                err = create_workqueue_thread(cwq, singlethread_cpu);
                start_workqueue_thread(cwq, -1);
        } else {
                mutex_lock(&workqueue_mutex);
                list_add(&wq->list, &workqueues);

                for_each_possible_cpu(cpu) {
                        cwq = init_cpu_workqueue(wq, cpu);
                        if (err || !cpu_online(cpu))
                                continue;
                        err = create_workqueue_thread(cwq, cpu);
                        start_workqueue_thread(cwq, cpu);
                }
                mutex_unlock(&workqueue_mutex);
        }

        if (err) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
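
/*
 * Illustrative sketch (not part of the original file): the usual lifecycle as
 * seen by a driver, which goes through the create_workqueue() family of
 * wrappers in linux/workqueue.h rather than calling __create_workqueue_key()
 * directly.  example_init()/example_exit() and example_wq_global are
 * hypothetical.
 */
static struct workqueue_struct *example_wq_global;

static int example_init(void)
{
        /* One worker thread per CPU; see also create_singlethread_workqueue(). */
        example_wq_global = create_workqueue("example");
        if (!example_wq_global)
                return -ENOMEM;
        return 0;
}

static void example_exit(void)
{
        /* Runs all pending work, stops the threads and frees the structure. */
        destroy_workqueue(example_wq_global);
}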

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
        /*
         * Our caller is either destroy_workqueue() or CPU_DEAD,
         * workqueue_mutex protects cwq->thread.
         */
        if (cwq->thread == NULL)
                return;

        lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_);
        lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_);

        flush_cpu_workqueue(cwq);
        /*
         * If the caller is CPU_DEAD and cwq->worklist was not empty,
         * a concurrent flush_workqueue() can insert a barrier after us.
         * However, in that case run_workqueue() won't return and check
         * kthread_should_stop() until it flushes all work_struct's.
         * When ->worklist becomes empty it is safe to exit because no
         * more work_structs can be queued on this cwq: flush_workqueue
         * checks list_empty(), and a "normal" queue_work() can't use
         * a dead CPU.
         */
        kthread_stop(cwq->thread);
        cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
        const cpumask_t *cpu_map = wq_cpu_map(wq);
        struct cpu_workqueue_struct *cwq;
        int cpu;

        mutex_lock(&workqueue_mutex);
        list_del(&wq->list);
        mutex_unlock(&workqueue_mutex);

        for_each_cpu_mask(cpu, *cpu_map) {
                cwq = per_cpu_ptr(wq->cpu_wq, cpu);
                cleanup_workqueue_thread(cwq, cpu);
        }

        free_percpu(wq->cpu_wq);
        kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                                unsigned long action,
                                                void *hcpu)
{
        unsigned int cpu = (unsigned long)hcpu;
        struct cpu_workqueue_struct *cwq;
        struct workqueue_struct *wq;

        action &= ~CPU_TASKS_FROZEN;

        switch (action) {
        case CPU_LOCK_ACQUIRE:
                mutex_lock(&workqueue_mutex);
                return NOTIFY_OK;

        case CPU_LOCK_RELEASE:
                mutex_unlock(&workqueue_mutex);
                return NOTIFY_OK;

        case CPU_UP_PREPARE:
                cpu_set(cpu, cpu_populated_map);
        }

        list_for_each_entry(wq, &workqueues, list) {
                cwq = per_cpu_ptr(wq->cpu_wq, cpu);

                switch (action) {
                case CPU_UP_PREPARE:
                        if (!create_workqueue_thread(cwq, cpu))
                                break;
                        printk(KERN_ERR "workqueue for %i failed\n", cpu);
                        return NOTIFY_BAD;

                case CPU_ONLINE:
                        start_workqueue_thread(cwq, cpu);
                        break;

                case CPU_UP_CANCELED:
                        start_workqueue_thread(cwq, -1);
                        /* fall through: the thread must be cleaned up, too */
                case CPU_DEAD:
                        cleanup_workqueue_thread(cwq, cpu);
                        break;
                }
        }

        return NOTIFY_OK;
}

void __init init_workqueues(void)
{
        cpu_populated_map = cpu_online_map;
        singlethread_cpu = first_cpu(cpu_possible_map);
        cpu_singlethread_map = cpumask_of_cpu(singlethread_cpu);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}