/*
 * kernel/workqueue.c - generic mechanism for defining kernel helper
 * threads for running arbitrary tasks in process context.
 *
 * Work items are queued on per-CPU worker threads and their callbacks
 * run with interrupts enabled, one item at a time per CPU.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).  ->lock protects ->worklist and ->current_work;
 * ->thread is the worker that drains ->worklist.
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes the CPU from cpu_online_mask and only later
 * does the hotplug callback drain cwq->worklist.  flush_workqueue() and
 * wait_on_work() can run in between and therefore can't rely on
 * for_each_online_cpu(); the cpumask below tracks the CPUs that actually
 * have a worker thread.
 */
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}

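/*
 * Map a (workqueue, cpu) pair to its cpu_workqueue_struct.  Single
 * threaded workqueues ignore @cpu and always use singlethread_cpu.
 */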
static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run.
 * - Must *only* be called if the pending flag is set.
 */
static inline void set_wq_data(struct work_struct *work,
				struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	trace_workqueue_insertion(cwq->thread, work);

	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU
 * dies it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on a specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

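/*
 * Timer callback for queue_delayed_work_on(): the delay has expired, so
 * move the work item from the timer onto the worklist of the local CPU.
 */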
static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
	struct workqueue_struct *wq = cwq->wq;

	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on a specific CPU after delay
 * @cpu: CPU number to execute work on; a negative value means no CPU
 *	preference
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

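/*
 * Pop and run work items from cwq->worklist until it is empty.  The lock
 * is dropped around each callback so that work functions may sleep and
 * queue further work; cwq->current_work marks the item being executed.
 */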
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct from
		 * inside the function that is called from it.  To avoid
		 * bogus "held lock freed" warnings as well as problems
		 * when looking into work->lockdep_map after the callback
		 * has run, take a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif
		trace_workqueue_execution(cwq->thread, work);
		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	spin_unlock_irq(&cwq->lock);
}

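/*
 * Main loop of a per-CPU worker thread: sleep on more_work until there is
 * something on the worklist (or the thread is asked to freeze or stop),
 * then drain the list via run_workqueue().
 */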
static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

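/*
 * A barrier work item: by the time it runs, everything queued before it
 * on the same cwq has been executed, so completing ->done tells a
 * flusher that the queue has drained up to the insertion point.
 */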
struct wq_barrier {
	struct work_struct work;
	struct completion done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, head);
}

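/*
 * Flush one cpu_workqueue_struct: if it has pending or running work,
 * queue a barrier at the tail of its worklist and wait for it to
 * complete.  Returns non-zero if there was anything to wait for.
 */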
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active = 0;
	struct wq_barrier barr;

	WARN_ON(cwq->thread == current);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
		insert_wq_barrier(cwq, &barr, &cwq->worklist);
		active = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (active)
		wait_for_completion(&barr.done);

	return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu(cpu, cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	prev = NULL;
	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto out;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto out;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);
out:
	spin_unlock_irq(&cwq->lock);
	if (!prev)
		return 0;

	wait_for_completion(&barr.done);
	return 1;
}
EXPORT_SYMBOL_GPL(flush_work);

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued.  Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */
	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after the rmb(),
		 * see insert_work()->smp_wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running))
		wait_for_completion(&barr.done);
}

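/*
 * Wait for a currently running instance of @work to finish on every CPU
 * of the workqueue it was last queued on.  The pending bit is left
 * alone; callers use try_to_grab_pending() for that.
 */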
static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu(cpu, cpu_map)
		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

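/*
 * Common helper for cancel_work_sync() and cancel_delayed_work_sync():
 * keep trying to delete the timer and/or steal the pending bit until
 * try_to_grab_pending() no longer reports a queueing in flight (ret < 0),
 * waiting each round for a running callback to finish.
 */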
static int __cancel_work_timer(struct work_struct *work,
				struct timer_list *timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	work_clear_pending(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be cancelled
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued.  If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself.  It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is
 * not pending, use cancel_delayed_work_sync() in that case.
 *
 * The caller must ensure that the workqueue on which @work was last queued
 * can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via
 * queue_work() or queue_delayed_work().  See also the comment for
 * cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns zero if @work was already on the kernel-global workqueue and
 * non-zero otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/**
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu in the kernel-global workqueue.
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * flush_delayed_work - block until a delayed work's callback has terminated
 * @dwork: the delayed work which is to be flushed
 *
 * Any timeout is cancelled, and any pending work is run immediately.
 */
void flush_delayed_work(struct delayed_work *dwork)
{
	if (del_timer_sync(&dwork->timer)) {
		struct cpu_workqueue_struct *cwq;
		cwq = wq_per_cpu(keventd_wq, get_cpu());
		__queue_work(cwq, &dwork->work);
		put_cpu();
	}
	flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	int orig = -1;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();

	/*
	 * If we are running from a keventd thread, scheduling a work item
	 * on the local CPU and then flushing it would mean waiting for
	 * ourselves, so call @func directly for this CPU instead.
	 */
	if (current_is_keventd())
		orig = raw_smp_processor_id();

	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		if (cpu != orig)
			schedule_work_on(cpu, work);
	}
	if (orig >= 0)
		func(per_cpu_ptr(works, orig));

	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));

	put_online_cpus();
	free_percpu(works);
	return 0;
}

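/*
 * Flush keventd_wq, the kernel-global workqueue used by schedule_work().
 */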
void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id();
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

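/*
 * Initialize the fields of one per-CPU cpu_workqueue_struct.  The worker
 * thread itself is created later by create_workqueue_thread().
 */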
static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

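/*
 * Create the worker thread for @cwq.  The thread is created stopped and
 * unbound; start_workqueue_thread() binds it to its CPU and wakes it up.
 */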
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add a work_struct to this cwq yet:
	 *	if the caller is __create_workqueue_key(), nobody can see
	 *	this cwq;
	 *	else the caller is CPU_UP_PREPARE and the cpu is not yet
	 *	online;
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (cwq->wq->rt)
		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
	cwq->thread = p;

	trace_workqueue_creation(cwq->thread, cpu);

	return 0;
}

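/*
 * Bind the freshly created worker thread to its CPU and let it run.
 * A negative @cpu means the thread is left unbound (single-threaded
 * workqueues and the CPU_UP_CANCELED path).
 */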
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

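/*
 * Backend for the create_workqueue()/create_singlethread_workqueue()
 * family of macros: allocates the workqueue and its per-CPU structures
 * and spawns one worker thread per online CPU (or a single thread).
 */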
struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on the list even if the code below
		 * fails: cpu_down() can remove a cpu from cpu_populated_map
		 * before destroy_workqueue() takes workqueue_lock, and we
		 * would leak cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize the cwqs for each possible cpu even if
		 * we end up calling destroy_workqueue().  Otherwise cpu_up()
		 * could hit an uninitialized cwq once we drop the lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD;
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * In that case run_workqueue() won't return until it has flushed
	 * all work_structs, and once ->worklist is empty it is safe to
	 * stop the thread: no more work can be queued on this cwq because
	 * flush_workqueue() checks list_empty() and a "normal"
	 * queue_work() can't use a dead CPU.
	 */
	trace_workqueue_destruction(cwq->thread);
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue.  All work currently pending will be done
 * first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu(cpu, cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

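/*
 * CPU hotplug callback: create, start and clean up the per-CPU worker
 * threads of every workqueue on the list as CPUs come and go.  On a
 * failed CPU_UP_PREPARE the "undo" pass tears down whatever was created.
 */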
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int ret = NOTIFY_OK;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			ret = NOTIFY_BAD;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
			/* fall through */
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return ret;
}

#ifdef CONFIG_SMP

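/*
 * Argument block for work_on_cpu(): the function to run, its argument,
 * its return value and a completion the caller waits on.
 */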
struct work_for_cpu {
	struct completion completion;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static int do_work_for_cpu(void *_wfc)
{
	struct work_for_cpu *wfc = _wfc;
	wfc->ret = wfc->fn(wfc->arg);
	complete(&wfc->completion);
	return 0;
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 * The caller must not hold any locks which would prevent @fn from
 * completing.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct task_struct *sub_thread;
	struct work_for_cpu wfc = {
		.completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
		.fn = fn,
		.arg = arg,
	};

	sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
	if (IS_ERR(sub_thread))
		return PTR_ERR(sub_thread);
	kthread_bind(sub_thread, cpu);
	wake_up_process(sub_thread);
	wait_for_completion(&wfc.completion);
	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

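/*
 * Called early at boot: set up the CPU masks, register the hotplug
 * notifier and create the kernel-global "events" workqueue used by
 * schedule_work().
 */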
void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}