1
2
3
4
5
6
7
8
9
10
11
12#include <linux/module.h>
13
14#include <linux/sched.h>
15#include <linux/interrupt.h>
16#include <linux/slab.h>
17#include <linux/mempool.h>
18#include <linux/smp.h>
19#include <linux/spinlock.h>
20#include <linux/mutex.h>
21
22#include <linux/sunrpc/clnt.h>
23
24#include "sunrpc.h"
25
26#ifdef RPC_DEBUG
27#define RPCDBG_FACILITY RPCDBG_SCHED
28#endif
29
30
31
32
33#define RPC_BUFFER_MAXSIZE (2048)
34#define RPC_BUFFER_POOLSIZE (8)
35#define RPC_TASK_POOLSIZE (8)
36static struct kmem_cache *rpc_task_slabp __read_mostly;
37static struct kmem_cache *rpc_buffer_slabp __read_mostly;
38static mempool_t *rpc_task_mempool __read_mostly;
39static mempool_t *rpc_buffer_mempool __read_mostly;
40
41static void rpc_async_schedule(struct work_struct *);
42static void rpc_release_task(struct rpc_task *task);
43static void __rpc_queue_timer_fn(unsigned long ptr);
44
45
46
47
48static struct rpc_wait_queue delay_queue;
49
50
51
52
53struct workqueue_struct *rpciod_workqueue;
54
55
56
57
58
59
60static void
61__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
62{
63 if (task->tk_timeout == 0)
64 return;
65 dprintk("RPC: %5u disabling timer\n", task->tk_pid);
66 task->tk_timeout = 0;
67 list_del(&task->u.tk_wait.timer_list);
68 if (list_empty(&queue->timer_list.list))
69 del_timer(&queue->timer_list.timer);
70}
71
72static void
73rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
74{
75 queue->timer_list.expires = expires;
76 mod_timer(&queue->timer_list.timer, expires);
77}
78
79
80
81
82static void
83__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
84{
85 if (!task->tk_timeout)
86 return;
87
88 dprintk("RPC: %5u setting alarm for %lu ms\n",
89 task->tk_pid, task->tk_timeout * 1000 / HZ);
90
91 task->u.tk_wait.expires = jiffies + task->tk_timeout;
92 if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
93 rpc_set_queue_timer(queue, task->u.tk_wait.expires);
94 list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
95}
96
97
98
99
100static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
101{
102 struct list_head *q;
103 struct rpc_task *t;
104
105 INIT_LIST_HEAD(&task->u.tk_wait.links);
106 q = &queue->tasks[task->tk_priority];
107 if (unlikely(task->tk_priority > queue->maxpriority))
108 q = &queue->tasks[queue->maxpriority];
109 list_for_each_entry(t, q, u.tk_wait.list) {
110 if (t->tk_owner == task->tk_owner) {
111 list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
112 return;
113 }
114 }
115 list_add_tail(&task->u.tk_wait.list, q);
116}
117
118
119
120
121
122
123
124
125
126static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
127{
128 BUG_ON (RPC_IS_QUEUED(task));
129
130 if (RPC_IS_PRIORITY(queue))
131 __rpc_add_wait_queue_priority(queue, task);
132 else if (RPC_IS_SWAPPER(task))
133 list_add(&task->u.tk_wait.list, &queue->tasks[0]);
134 else
135 list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
136 task->tk_waitqueue = queue;
137 queue->qlen++;
138 rpc_set_queued(task);
139
140 dprintk("RPC: %5u added to queue %p \"%s\"\n",
141 task->tk_pid, queue, rpc_qname(queue));
142}
143
144
145
146
147static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
148{
149 struct rpc_task *t;
150
151 if (!list_empty(&task->u.tk_wait.links)) {
152 t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
153 list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
154 list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
155 }
156}
157
158
159
160
161
162static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
163{
164 __rpc_disable_timer(queue, task);
165 if (RPC_IS_PRIORITY(queue))
166 __rpc_remove_wait_queue_priority(task);
167 list_del(&task->u.tk_wait.list);
168 queue->qlen--;
169 dprintk("RPC: %5u removed from queue %p \"%s\"\n",
170 task->tk_pid, queue, rpc_qname(queue));
171}
172
173static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
174{
175 queue->priority = priority;
176 queue->count = 1 << (priority * 2);
177}
178
179static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
180{
181 queue->owner = pid;
182 queue->nr = RPC_BATCH_COUNT;
183}
184
185static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
186{
187 rpc_set_waitqueue_priority(queue, queue->maxpriority);
188 rpc_set_waitqueue_owner(queue, 0);
189}
190
191static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
192{
193 int i;
194
195 spin_lock_init(&queue->lock);
196 for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
197 INIT_LIST_HEAD(&queue->tasks[i]);
198 queue->maxpriority = nr_queues - 1;
199 rpc_reset_waitqueue_priority(queue);
200 queue->qlen = 0;
201 setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
202 INIT_LIST_HEAD(&queue->timer_list.list);
203#ifdef RPC_DEBUG
204 queue->name = qname;
205#endif
206}
207
208void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
209{
210 __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
211}
212EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
213
214void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
215{
216 __rpc_init_priority_wait_queue(queue, qname, 1);
217}
218EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
219
220void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
221{
222 del_timer_sync(&queue->timer_list.timer);
223}
224EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
225
226static int rpc_wait_bit_killable(void *word)
227{
228 if (fatal_signal_pending(current))
229 return -ERESTARTSYS;
230 schedule();
231 return 0;
232}
233
234#ifdef RPC_DEBUG
235static void rpc_task_set_debuginfo(struct rpc_task *task)
236{
237 static atomic_t rpc_pid;
238
239 task->tk_pid = atomic_inc_return(&rpc_pid);
240}
241#else
242static inline void rpc_task_set_debuginfo(struct rpc_task *task)
243{
244}
245#endif
246
247static void rpc_set_active(struct rpc_task *task)
248{
249 rpc_task_set_debuginfo(task);
250 set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
251}
252
253
254
255
256
257static int rpc_complete_task(struct rpc_task *task)
258{
259 void *m = &task->tk_runstate;
260 wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
261 struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
262 unsigned long flags;
263 int ret;
264
265 spin_lock_irqsave(&wq->lock, flags);
266 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
267 ret = atomic_dec_and_test(&task->tk_count);
268 if (waitqueue_active(wq))
269 __wake_up_locked_key(wq, TASK_NORMAL, &k);
270 spin_unlock_irqrestore(&wq->lock, flags);
271 return ret;
272}
273
274
275
276
277
278
279
280
281int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
282{
283 if (action == NULL)
284 action = rpc_wait_bit_killable;
285 return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
286 action, TASK_KILLABLE);
287}
288EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
289
290
291
292
293
294
295
296static void rpc_make_runnable(struct rpc_task *task)
297{
298 rpc_clear_queued(task);
299 if (rpc_test_and_set_running(task))
300 return;
301 if (RPC_IS_ASYNC(task)) {
302 INIT_WORK(&task->u.tk_work, rpc_async_schedule);
303 queue_work(rpciod_workqueue, &task->u.tk_work);
304 } else
305 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
306}
307
308
309
310
311
312
313
314static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
315 rpc_action action)
316{
317 dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
318 task->tk_pid, rpc_qname(q), jiffies);
319
320 __rpc_add_wait_queue(q, task);
321
322 BUG_ON(task->tk_callback != NULL);
323 task->tk_callback = action;
324 __rpc_add_timer(q, task);
325}
326
327void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
328 rpc_action action)
329{
330
331 BUG_ON(!RPC_IS_ACTIVATED(task));
332
333
334
335
336 spin_lock_bh(&q->lock);
337 __rpc_sleep_on(q, task, action);
338 spin_unlock_bh(&q->lock);
339}
340EXPORT_SYMBOL_GPL(rpc_sleep_on);
341
342
343
344
345
346
347
348
349static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
350{
351 dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
352 task->tk_pid, jiffies);
353
354
355 if (!RPC_IS_ACTIVATED(task)) {
356 printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
357 return;
358 }
359
360 __rpc_remove_wait_queue(queue, task);
361
362 rpc_make_runnable(task);
363
364 dprintk("RPC: __rpc_wake_up_task done\n");
365}
366
367
368
369
370static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
371{
372 if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
373 __rpc_do_wake_up_task(queue, task);
374}
375
376
377
378
379int rpc_queue_empty(struct rpc_wait_queue *queue)
380{
381 int res;
382
383 spin_lock_bh(&queue->lock);
384 res = queue->qlen;
385 spin_unlock_bh(&queue->lock);
386 return res == 0;
387}
388EXPORT_SYMBOL_GPL(rpc_queue_empty);
389
390
391
392
393void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
394{
395 spin_lock_bh(&queue->lock);
396 rpc_wake_up_task_queue_locked(queue, task);
397 spin_unlock_bh(&queue->lock);
398}
399EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
400
401
402
403
404static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
405{
406 struct list_head *q;
407 struct rpc_task *task;
408
409
410
411
412 q = &queue->tasks[queue->priority];
413 if (!list_empty(q)) {
414 task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
415 if (queue->owner == task->tk_owner) {
416 if (--queue->nr)
417 goto out;
418 list_move_tail(&task->u.tk_wait.list, q);
419 }
420
421
422
423 if (--queue->count)
424 goto new_owner;
425 }
426
427
428
429
430 do {
431 if (q == &queue->tasks[0])
432 q = &queue->tasks[queue->maxpriority];
433 else
434 q = q - 1;
435 if (!list_empty(q)) {
436 task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
437 goto new_queue;
438 }
439 } while (q != &queue->tasks[queue->priority]);
440
441 rpc_reset_waitqueue_priority(queue);
442 return NULL;
443
444new_queue:
445 rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
446new_owner:
447 rpc_set_waitqueue_owner(queue, task->tk_owner);
448out:
449 rpc_wake_up_task_queue_locked(queue, task);
450 return task;
451}
452
453
454
455
456struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
457{
458 struct rpc_task *task = NULL;
459
460 dprintk("RPC: wake_up_next(%p \"%s\")\n",
461 queue, rpc_qname(queue));
462 spin_lock_bh(&queue->lock);
463 if (RPC_IS_PRIORITY(queue))
464 task = __rpc_wake_up_next_priority(queue);
465 else {
466 task_for_first(task, &queue->tasks[0])
467 rpc_wake_up_task_queue_locked(queue, task);
468 }
469 spin_unlock_bh(&queue->lock);
470
471 return task;
472}
473EXPORT_SYMBOL_GPL(rpc_wake_up_next);
474
475
476
477
478
479
480
481void rpc_wake_up(struct rpc_wait_queue *queue)
482{
483 struct rpc_task *task, *next;
484 struct list_head *head;
485
486 spin_lock_bh(&queue->lock);
487 head = &queue->tasks[queue->maxpriority];
488 for (;;) {
489 list_for_each_entry_safe(task, next, head, u.tk_wait.list)
490 rpc_wake_up_task_queue_locked(queue, task);
491 if (head == &queue->tasks[0])
492 break;
493 head--;
494 }
495 spin_unlock_bh(&queue->lock);
496}
497EXPORT_SYMBOL_GPL(rpc_wake_up);
498
499
500
501
502
503
504
505
506void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
507{
508 struct rpc_task *task, *next;
509 struct list_head *head;
510
511 spin_lock_bh(&queue->lock);
512 head = &queue->tasks[queue->maxpriority];
513 for (;;) {
514 list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
515 task->tk_status = status;
516 rpc_wake_up_task_queue_locked(queue, task);
517 }
518 if (head == &queue->tasks[0])
519 break;
520 head--;
521 }
522 spin_unlock_bh(&queue->lock);
523}
524EXPORT_SYMBOL_GPL(rpc_wake_up_status);
525
526static void __rpc_queue_timer_fn(unsigned long ptr)
527{
528 struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
529 struct rpc_task *task, *n;
530 unsigned long expires, now, timeo;
531
532 spin_lock(&queue->lock);
533 expires = now = jiffies;
534 list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
535 timeo = task->u.tk_wait.expires;
536 if (time_after_eq(now, timeo)) {
537 dprintk("RPC: %5u timeout\n", task->tk_pid);
538 task->tk_status = -ETIMEDOUT;
539 rpc_wake_up_task_queue_locked(queue, task);
540 continue;
541 }
542 if (expires == now || time_after(expires, timeo))
543 expires = timeo;
544 }
545 if (!list_empty(&queue->timer_list.list))
546 rpc_set_queue_timer(queue, expires);
547 spin_unlock(&queue->lock);
548}
549
550static void __rpc_atrun(struct rpc_task *task)
551{
552 task->tk_status = 0;
553}
554
555
556
557
558void rpc_delay(struct rpc_task *task, unsigned long delay)
559{
560 task->tk_timeout = delay;
561 rpc_sleep_on(&delay_queue, task, __rpc_atrun);
562}
563EXPORT_SYMBOL_GPL(rpc_delay);
564
565
566
567
568void rpc_prepare_task(struct rpc_task *task)
569{
570 task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
571}
572
573
574
575
576void rpc_exit_task(struct rpc_task *task)
577{
578 task->tk_action = NULL;
579 if (task->tk_ops->rpc_call_done != NULL) {
580 task->tk_ops->rpc_call_done(task, task->tk_calldata);
581 if (task->tk_action != NULL) {
582 WARN_ON(RPC_ASSASSINATED(task));
583
584 xprt_release(task);
585 }
586 }
587}
588
589void rpc_exit(struct rpc_task *task, int status)
590{
591 task->tk_status = status;
592 task->tk_action = rpc_exit_task;
593 if (RPC_IS_QUEUED(task))
594 rpc_wake_up_queued_task(task->tk_waitqueue, task);
595}
596EXPORT_SYMBOL_GPL(rpc_exit);
597
598void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
599{
600 if (ops->rpc_release != NULL)
601 ops->rpc_release(calldata);
602}
603
604
605
606
607static void __rpc_execute(struct rpc_task *task)
608{
609 struct rpc_wait_queue *queue;
610 int task_is_async = RPC_IS_ASYNC(task);
611 int status = 0;
612
613 dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
614 task->tk_pid, task->tk_flags);
615
616 BUG_ON(RPC_IS_QUEUED(task));
617
618 for (;;) {
619
620
621
622
623 if (task->tk_callback) {
624 void (*save_callback)(struct rpc_task *);
625
626
627
628
629
630 save_callback = task->tk_callback;
631 task->tk_callback = NULL;
632 save_callback(task);
633 } else {
634
635
636
637
638
639 if (task->tk_action == NULL)
640 break;
641 task->tk_action(task);
642 }
643
644
645
646
647 if (!RPC_IS_QUEUED(task))
648 continue;
649
650
651
652
653
654
655
656
657
658 queue = task->tk_waitqueue;
659 spin_lock_bh(&queue->lock);
660 if (!RPC_IS_QUEUED(task)) {
661 spin_unlock_bh(&queue->lock);
662 continue;
663 }
664 rpc_clear_running(task);
665 spin_unlock_bh(&queue->lock);
666 if (task_is_async)
667 return;
668
669
670 dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
671 status = out_of_line_wait_on_bit(&task->tk_runstate,
672 RPC_TASK_QUEUED, rpc_wait_bit_killable,
673 TASK_KILLABLE);
674 if (status == -ERESTARTSYS) {
675
676
677
678
679
680
681 dprintk("RPC: %5u got signal\n", task->tk_pid);
682 task->tk_flags |= RPC_TASK_KILLED;
683 rpc_exit(task, -ERESTARTSYS);
684 }
685 rpc_set_running(task);
686 dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
687 }
688
689 dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
690 task->tk_status);
691
692 rpc_release_task(task);
693}
694
695
696
697
698
699
700
701
702
703
704void rpc_execute(struct rpc_task *task)
705{
706 rpc_set_active(task);
707 rpc_make_runnable(task);
708 if (!RPC_IS_ASYNC(task))
709 __rpc_execute(task);
710}
711
712static void rpc_async_schedule(struct work_struct *work)
713{
714 __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
715}
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733void *rpc_malloc(struct rpc_task *task, size_t size)
734{
735 struct rpc_buffer *buf;
736 gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
737
738 size += sizeof(struct rpc_buffer);
739 if (size <= RPC_BUFFER_MAXSIZE)
740 buf = mempool_alloc(rpc_buffer_mempool, gfp);
741 else
742 buf = kmalloc(size, gfp);
743
744 if (!buf)
745 return NULL;
746
747 buf->len = size;
748 dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
749 task->tk_pid, size, buf);
750 return &buf->data;
751}
752EXPORT_SYMBOL_GPL(rpc_malloc);
753
754
755
756
757
758
759void rpc_free(void *buffer)
760{
761 size_t size;
762 struct rpc_buffer *buf;
763
764 if (!buffer)
765 return;
766
767 buf = container_of(buffer, struct rpc_buffer, data);
768 size = buf->len;
769
770 dprintk("RPC: freeing buffer of size %zu at %p\n",
771 size, buf);
772
773 if (size <= RPC_BUFFER_MAXSIZE)
774 mempool_free(buf, rpc_buffer_mempool);
775 else
776 kfree(buf);
777}
778EXPORT_SYMBOL_GPL(rpc_free);
779
780
781
782
783static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
784{
785 memset(task, 0, sizeof(*task));
786 atomic_set(&task->tk_count, 1);
787 task->tk_flags = task_setup_data->flags;
788 task->tk_ops = task_setup_data->callback_ops;
789 task->tk_calldata = task_setup_data->callback_data;
790 INIT_LIST_HEAD(&task->tk_task);
791
792
793 task->tk_garb_retry = 2;
794 task->tk_cred_retry = 2;
795
796 task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
797 task->tk_owner = current->tgid;
798
799
800 task->tk_workqueue = task_setup_data->workqueue;
801
802 if (task->tk_ops->rpc_call_prepare != NULL)
803 task->tk_action = rpc_prepare_task;
804
805
806 task->tk_start = ktime_get();
807
808 dprintk("RPC: new task initialized, procpid %u\n",
809 task_pid_nr(current));
810}
811
812static struct rpc_task *
813rpc_alloc_task(void)
814{
815 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
816}
817
818
819
820
821struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
822{
823 struct rpc_task *task = setup_data->task;
824 unsigned short flags = 0;
825
826 if (task == NULL) {
827 task = rpc_alloc_task();
828 if (task == NULL) {
829 rpc_release_calldata(setup_data->callback_ops,
830 setup_data->callback_data);
831 return ERR_PTR(-ENOMEM);
832 }
833 flags = RPC_TASK_DYNAMIC;
834 }
835
836 rpc_init_task(task, setup_data);
837 task->tk_flags |= flags;
838 dprintk("RPC: allocated task %p\n", task);
839 return task;
840}
841
842static void rpc_free_task(struct rpc_task *task)
843{
844 const struct rpc_call_ops *tk_ops = task->tk_ops;
845 void *calldata = task->tk_calldata;
846
847 if (task->tk_flags & RPC_TASK_DYNAMIC) {
848 dprintk("RPC: %5u freeing task\n", task->tk_pid);
849 mempool_free(task, rpc_task_mempool);
850 }
851 rpc_release_calldata(tk_ops, calldata);
852}
853
854static void rpc_async_release(struct work_struct *work)
855{
856 rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
857}
858
859static void rpc_release_resources_task(struct rpc_task *task)
860{
861 if (task->tk_rqstp)
862 xprt_release(task);
863 if (task->tk_msg.rpc_cred) {
864 put_rpccred(task->tk_msg.rpc_cred);
865 task->tk_msg.rpc_cred = NULL;
866 }
867 rpc_task_release_client(task);
868}
869
870static void rpc_final_put_task(struct rpc_task *task,
871 struct workqueue_struct *q)
872{
873 if (q != NULL) {
874 INIT_WORK(&task->u.tk_work, rpc_async_release);
875 queue_work(q, &task->u.tk_work);
876 } else
877 rpc_free_task(task);
878}
879
880static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
881{
882 if (atomic_dec_and_test(&task->tk_count)) {
883 rpc_release_resources_task(task);
884 rpc_final_put_task(task, q);
885 }
886}
887
888void rpc_put_task(struct rpc_task *task)
889{
890 rpc_do_put_task(task, NULL);
891}
892EXPORT_SYMBOL_GPL(rpc_put_task);
893
894void rpc_put_task_async(struct rpc_task *task)
895{
896 rpc_do_put_task(task, task->tk_workqueue);
897}
898EXPORT_SYMBOL_GPL(rpc_put_task_async);
899
900static void rpc_release_task(struct rpc_task *task)
901{
902 dprintk("RPC: %5u release task\n", task->tk_pid);
903
904 BUG_ON (RPC_IS_QUEUED(task));
905
906 rpc_release_resources_task(task);
907
908
909
910
911
912
913 if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
914
915 if (!rpc_complete_task(task))
916 return;
917 } else {
918 if (!atomic_dec_and_test(&task->tk_count))
919 return;
920 }
921 rpc_final_put_task(task, task->tk_workqueue);
922}
923
924int rpciod_up(void)
925{
926 return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
927}
928
929void rpciod_down(void)
930{
931 module_put(THIS_MODULE);
932}
933
934
935
936
937static int rpciod_start(void)
938{
939 struct workqueue_struct *wq;
940
941
942
943
944 dprintk("RPC: creating workqueue rpciod\n");
945 wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
946 rpciod_workqueue = wq;
947 return rpciod_workqueue != NULL;
948}
949
950static void rpciod_stop(void)
951{
952 struct workqueue_struct *wq = NULL;
953
954 if (rpciod_workqueue == NULL)
955 return;
956 dprintk("RPC: destroying workqueue rpciod\n");
957
958 wq = rpciod_workqueue;
959 rpciod_workqueue = NULL;
960 destroy_workqueue(wq);
961}
962
963void
964rpc_destroy_mempool(void)
965{
966 rpciod_stop();
967 if (rpc_buffer_mempool)
968 mempool_destroy(rpc_buffer_mempool);
969 if (rpc_task_mempool)
970 mempool_destroy(rpc_task_mempool);
971 if (rpc_task_slabp)
972 kmem_cache_destroy(rpc_task_slabp);
973 if (rpc_buffer_slabp)
974 kmem_cache_destroy(rpc_buffer_slabp);
975 rpc_destroy_wait_queue(&delay_queue);
976}
977
978int
979rpc_init_mempool(void)
980{
981
982
983
984
985 rpc_init_wait_queue(&delay_queue, "delayq");
986 if (!rpciod_start())
987 goto err_nomem;
988
989 rpc_task_slabp = kmem_cache_create("rpc_tasks",
990 sizeof(struct rpc_task),
991 0, SLAB_HWCACHE_ALIGN,
992 NULL);
993 if (!rpc_task_slabp)
994 goto err_nomem;
995 rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
996 RPC_BUFFER_MAXSIZE,
997 0, SLAB_HWCACHE_ALIGN,
998 NULL);
999 if (!rpc_buffer_slabp)
1000 goto err_nomem;
1001 rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1002 rpc_task_slabp);
1003 if (!rpc_task_mempool)
1004 goto err_nomem;
1005 rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1006 rpc_buffer_slabp);
1007 if (!rpc_buffer_mempool)
1008 goto err_nomem;
1009 return 0;
1010err_nomem:
1011 rpc_destroy_mempool();
1012 return -ENOMEM;
1013}
1014