1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52#define DEBUG_SUBSYSTEM S_RPC
53
54#include "../../include/linux/libcfs/libcfs.h"
55
56#include "../include/lustre_net.h"
57#include "../include/lustre_lib.h"
58#include "../include/lustre_ha.h"
59#include "../include/obd_class.h"
60#include "../include/obd_support.h"
61#include "../include/cl_object.h"
62#include "../include/lprocfs_status.h"
63
64#include "ptlrpc_internal.h"
65
66
67struct ptlrpcd {
68 int pd_size;
69 int pd_index;
70 int pd_cpt;
71 int pd_cursor;
72 int pd_nthreads;
73 int pd_groupsize;
74 struct ptlrpcd_ctl pd_threads[0];
75};
76
77
78
79
80
81
82
83static int max_ptlrpcds;
84module_param(max_ptlrpcds, int, 0644);
85MODULE_PARM_DESC(max_ptlrpcds,
86 "Max ptlrpcd thread count to be started (obsolete).");
87
88
89
90
91
92
93
94static int ptlrpcd_bind_policy;
95module_param(ptlrpcd_bind_policy, int, 0644);
96MODULE_PARM_DESC(ptlrpcd_bind_policy,
97 "Ptlrpcd threads binding mode (obsolete).");
98
99
100
101
102
103static int ptlrpcd_per_cpt_max;
104module_param(ptlrpcd_per_cpt_max, int, 0644);
105MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
106 "Max ptlrpcd thread count to be started per CPT.");
107
108
109
110
111
112
113
114static int ptlrpcd_partner_group_size;
115module_param(ptlrpcd_partner_group_size, int, 0644);
116MODULE_PARM_DESC(ptlrpcd_partner_group_size,
117 "Number of ptlrpcd threads in a partner group.");
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135static char *ptlrpcd_cpts;
136module_param(ptlrpcd_cpts, charp, 0644);
137MODULE_PARM_DESC(ptlrpcd_cpts,
138 "CPU partitions ptlrpcd threads should run in");
139
140
141static int *ptlrpcds_cpt_idx;
142
143
144static int ptlrpcds_num;
145static struct ptlrpcd **ptlrpcds;
146
147
148
149
150
151
152
153
154static struct ptlrpcd_ctl ptlrpcd_rcv;
155
156struct mutex ptlrpcd_mutex;
157static int ptlrpcd_users;
158
159void ptlrpcd_wake(struct ptlrpc_request *req)
160{
161 struct ptlrpc_request_set *set = req->rq_set;
162
163 wake_up(&set->set_waitq);
164}
165EXPORT_SYMBOL(ptlrpcd_wake);
166
167static struct ptlrpcd_ctl *
168ptlrpcd_select_pc(struct ptlrpc_request *req)
169{
170 struct ptlrpcd *pd;
171 int cpt;
172 int idx;
173
174 if (req && req->rq_send_state != LUSTRE_IMP_FULL)
175 return &ptlrpcd_rcv;
176
177 cpt = cfs_cpt_current(cfs_cpt_table, 1);
178 if (!ptlrpcds_cpt_idx)
179 idx = cpt;
180 else
181 idx = ptlrpcds_cpt_idx[cpt];
182 pd = ptlrpcds[idx];
183
184
185 idx = pd->pd_cursor;
186 if (++idx == pd->pd_nthreads)
187 idx = 0;
188 pd->pd_cursor = idx;
189
190 return &pd->pd_threads[idx];
191}
192
193
194
195
196static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
197 struct ptlrpc_request_set *src)
198{
199 struct list_head *tmp, *pos;
200 struct ptlrpc_request *req;
201 int rc = 0;
202
203 spin_lock(&src->set_new_req_lock);
204 if (likely(!list_empty(&src->set_new_requests))) {
205 list_for_each_safe(pos, tmp, &src->set_new_requests) {
206 req = list_entry(pos, struct ptlrpc_request,
207 rq_set_chain);
208 req->rq_set = des;
209 }
210 list_splice_init(&src->set_new_requests, &des->set_requests);
211 rc = atomic_read(&src->set_new_count);
212 atomic_add(rc, &des->set_remaining);
213 atomic_set(&src->set_new_count, 0);
214 }
215 spin_unlock(&src->set_new_req_lock);
216 return rc;
217}
218
219
220
221
222
223void ptlrpcd_add_req(struct ptlrpc_request *req)
224{
225 struct ptlrpcd_ctl *pc;
226
227 if (req->rq_reqmsg)
228 lustre_msg_set_jobid(req->rq_reqmsg, NULL);
229
230 spin_lock(&req->rq_lock);
231 if (req->rq_invalid_rqset) {
232 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
233 back_to_sleep, NULL);
234
235 req->rq_invalid_rqset = 0;
236 spin_unlock(&req->rq_lock);
237 l_wait_event(req->rq_set_waitq, !req->rq_set, &lwi);
238 } else if (req->rq_set) {
239
240
241
242 LASSERT(req->rq_phase == RQ_PHASE_NEW);
243 LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
244
245
246 atomic_inc(&req->rq_set->set_remaining);
247 spin_unlock(&req->rq_lock);
248 wake_up(&req->rq_set->set_waitq);
249 return;
250 } else {
251 spin_unlock(&req->rq_lock);
252 }
253
254 pc = ptlrpcd_select_pc(req);
255
256 DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
257 req, pc->pc_name, pc->pc_index);
258
259 ptlrpc_set_add_new_req(pc, req);
260}
261EXPORT_SYMBOL(ptlrpcd_add_req);
262
263static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
264{
265 atomic_inc(&set->set_refcount);
266}
267
268
269
270
271
272static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
273{
274 struct list_head *tmp, *pos;
275 struct ptlrpc_request *req;
276 struct ptlrpc_request_set *set = pc->pc_set;
277 int rc = 0;
278 int rc2;
279
280 if (atomic_read(&set->set_new_count)) {
281 spin_lock(&set->set_new_req_lock);
282 if (likely(!list_empty(&set->set_new_requests))) {
283 list_splice_init(&set->set_new_requests,
284 &set->set_requests);
285 atomic_add(atomic_read(&set->set_new_count),
286 &set->set_remaining);
287 atomic_set(&set->set_new_count, 0);
288
289
290
291 rc = 1;
292 }
293 spin_unlock(&set->set_new_req_lock);
294 }
295
296
297
298
299 rc2 = lu_env_refill(env);
300 if (rc2 != 0) {
301
302
303
304
305
306
307
308
309
310
311 CERROR("Failure to refill session: %d\n", rc2);
312 return rc;
313 }
314
315 if (atomic_read(&set->set_remaining))
316 rc |= ptlrpc_check_set(env, set);
317
318
319
320
321 list_for_each_safe(pos, tmp, &set->set_requests) {
322 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
323 if (req->rq_phase != RQ_PHASE_COMPLETE)
324 break;
325
326 list_del_init(&req->rq_set_chain);
327 req->rq_set = NULL;
328 ptlrpc_req_finished(req);
329 }
330
331 if (rc == 0) {
332
333
334
335 rc = atomic_read(&set->set_new_count);
336
337
338
339
340 if (rc == 0 && pc->pc_npartners > 0) {
341 struct ptlrpcd_ctl *partner;
342 struct ptlrpc_request_set *ps;
343 int first = pc->pc_cursor;
344
345 do {
346 partner = pc->pc_partners[pc->pc_cursor++];
347 if (pc->pc_cursor >= pc->pc_npartners)
348 pc->pc_cursor = 0;
349 if (!partner)
350 continue;
351
352 spin_lock(&partner->pc_lock);
353 ps = partner->pc_set;
354 if (!ps) {
355 spin_unlock(&partner->pc_lock);
356 continue;
357 }
358
359 ptlrpc_reqset_get(ps);
360 spin_unlock(&partner->pc_lock);
361
362 if (atomic_read(&ps->set_new_count)) {
363 rc = ptlrpcd_steal_rqset(set, ps);
364 if (rc > 0)
365 CDEBUG(D_RPCTRACE, "transfer %d async RPCs [%d->%d]\n",
366 rc, partner->pc_index,
367 pc->pc_index);
368 }
369 ptlrpc_reqset_put(ps);
370 } while (rc == 0 && pc->pc_cursor != first);
371 }
372 }
373
374 return rc;
375}
376
377
378
379
380
381
382
383static int ptlrpcd(void *arg)
384{
385 struct ptlrpcd_ctl *pc = arg;
386 struct ptlrpc_request_set *set;
387 struct lu_context ses = { 0 };
388 struct lu_env env = { .le_ses = &ses };
389 int rc = 0;
390 int exit = 0;
391
392 unshare_fs_struct();
393 if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
394 CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
395
396
397
398
399
400
401
402 set = ptlrpc_prep_set();
403 if (!set) {
404 rc = -ENOMEM;
405 goto failed;
406 }
407 spin_lock(&pc->pc_lock);
408 pc->pc_set = set;
409 spin_unlock(&pc->pc_lock);
410
411
412
413
414
415 rc = lu_context_init(&env.le_ctx,
416 LCT_CL_THREAD | LCT_REMEMBER | LCT_NOREF);
417 if (rc == 0) {
418 rc = lu_context_init(env.le_ses,
419 LCT_SESSION | LCT_REMEMBER | LCT_NOREF);
420 if (rc != 0)
421 lu_context_fini(&env.le_ctx);
422 }
423
424 if (rc != 0)
425 goto failed;
426
427 complete(&pc->pc_starting);
428
429
430
431
432
433
434
435 do {
436 struct l_wait_info lwi;
437 int timeout;
438
439 timeout = ptlrpc_set_next_timeout(set);
440 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
441 ptlrpc_expired_set, set);
442
443 lu_context_enter(&env.le_ctx);
444 lu_context_enter(env.le_ses);
445 l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi);
446 lu_context_exit(&env.le_ctx);
447 lu_context_exit(env.le_ses);
448
449
450
451
452 if (test_bit(LIOD_STOP, &pc->pc_flags)) {
453 if (test_bit(LIOD_FORCE, &pc->pc_flags))
454 ptlrpc_abort_set(set);
455 exit++;
456 }
457
458
459
460
461
462 } while (exit < 2);
463
464
465
466
467 if (!list_empty(&set->set_requests))
468 ptlrpc_set_wait(set);
469 lu_context_fini(&env.le_ctx);
470 lu_context_fini(env.le_ses);
471
472 complete(&pc->pc_finishing);
473
474 return 0;
475failed:
476 pc->pc_error = rc;
477 complete(&pc->pc_starting);
478 return rc;
479}
480
481static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
482{
483 pc->pc_index = index;
484 pc->pc_cpt = cpt;
485 init_completion(&pc->pc_starting);
486 init_completion(&pc->pc_finishing);
487 spin_lock_init(&pc->pc_lock);
488
489 if (index < 0) {
490
491 snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
492 } else {
493
494 snprintf(pc->pc_name, sizeof(pc->pc_name),
495 "ptlrpcd_%02d_%02d", cpt, index);
496 }
497}
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
517{
518 struct ptlrpcd_ctl *pc;
519 struct ptlrpcd_ctl **ppc;
520 int first;
521 int i;
522 int rc = 0;
523 int size;
524
525 LASSERT(index >= 0 && index < pd->pd_nthreads);
526 pc = &pd->pd_threads[index];
527 pc->pc_npartners = pd->pd_groupsize - 1;
528
529 if (pc->pc_npartners <= 0)
530 goto out;
531
532 size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners;
533 pc->pc_partners = kzalloc_node(size, GFP_NOFS,
534 cfs_cpt_spread_node(cfs_cpt_table,
535 pc->pc_cpt));
536 if (!pc->pc_partners) {
537 pc->pc_npartners = 0;
538 rc = -ENOMEM;
539 goto out;
540 }
541
542 first = index - index % pd->pd_groupsize;
543 ppc = pc->pc_partners;
544 for (i = first; i < first + pd->pd_groupsize; i++) {
545 if (i != index)
546 *ppc++ = &pd->pd_threads[i];
547 }
548out:
549 return rc;
550}
551
552int ptlrpcd_start(struct ptlrpcd_ctl *pc)
553{
554 struct task_struct *task;
555 int rc = 0;
556
557
558
559
560 if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
561 CWARN("Starting second thread (%s) for same pc %p\n",
562 pc->pc_name, pc);
563 return 0;
564 }
565
566 task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
567 if (IS_ERR(task)) {
568 rc = PTR_ERR(task);
569 goto out_set;
570 }
571
572 wait_for_completion(&pc->pc_starting);
573 rc = pc->pc_error;
574 if (rc != 0)
575 goto out_set;
576
577 return 0;
578
579out_set:
580 if (pc->pc_set) {
581 struct ptlrpc_request_set *set = pc->pc_set;
582
583 spin_lock(&pc->pc_lock);
584 pc->pc_set = NULL;
585 spin_unlock(&pc->pc_lock);
586 ptlrpc_set_destroy(set);
587 }
588 clear_bit(LIOD_START, &pc->pc_flags);
589 return rc;
590}
591
592void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
593{
594 if (!test_bit(LIOD_START, &pc->pc_flags)) {
595 CWARN("Thread for pc %p was not started\n", pc);
596 return;
597 }
598
599 set_bit(LIOD_STOP, &pc->pc_flags);
600 if (force)
601 set_bit(LIOD_FORCE, &pc->pc_flags);
602 wake_up(&pc->pc_set->set_waitq);
603}
604
605void ptlrpcd_free(struct ptlrpcd_ctl *pc)
606{
607 struct ptlrpc_request_set *set = pc->pc_set;
608
609 if (!test_bit(LIOD_START, &pc->pc_flags)) {
610 CWARN("Thread for pc %p was not started\n", pc);
611 goto out;
612 }
613
614 wait_for_completion(&pc->pc_finishing);
615
616 spin_lock(&pc->pc_lock);
617 pc->pc_set = NULL;
618 spin_unlock(&pc->pc_lock);
619 ptlrpc_set_destroy(set);
620
621 clear_bit(LIOD_START, &pc->pc_flags);
622 clear_bit(LIOD_STOP, &pc->pc_flags);
623 clear_bit(LIOD_FORCE, &pc->pc_flags);
624
625out:
626 if (pc->pc_npartners > 0) {
627 LASSERT(pc->pc_partners);
628
629 kfree(pc->pc_partners);
630 pc->pc_partners = NULL;
631 }
632 pc->pc_npartners = 0;
633 pc->pc_error = 0;
634}
635
636static void ptlrpcd_fini(void)
637{
638 int i;
639 int j;
640
641 if (ptlrpcds) {
642 for (i = 0; i < ptlrpcds_num; i++) {
643 if (!ptlrpcds[i])
644 break;
645 for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
646 ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
647 for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
648 ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
649 kfree(ptlrpcds[i]);
650 ptlrpcds[i] = NULL;
651 }
652 kfree(ptlrpcds);
653 }
654 ptlrpcds_num = 0;
655
656 ptlrpcd_stop(&ptlrpcd_rcv, 0);
657 ptlrpcd_free(&ptlrpcd_rcv);
658
659 kfree(ptlrpcds_cpt_idx);
660 ptlrpcds_cpt_idx = NULL;
661}
662
663static int ptlrpcd_init(void)
664{
665 int nthreads;
666 int groupsize;
667 int size;
668 int i;
669 int j;
670 int rc = 0;
671 struct cfs_cpt_table *cptable;
672 __u32 *cpts = NULL;
673 int ncpts;
674 int cpt;
675 struct ptlrpcd *pd;
676
677
678
679
680 cptable = cfs_cpt_table;
681 ncpts = cfs_cpt_number(cptable);
682 if (ptlrpcd_cpts) {
683 struct cfs_expr_list *el;
684
685 size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
686 ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL);
687 if (!ptlrpcds_cpt_idx) {
688 rc = -ENOMEM;
689 goto out;
690 }
691
692 rc = cfs_expr_list_parse(ptlrpcd_cpts,
693 strlen(ptlrpcd_cpts),
694 0, ncpts - 1, &el);
695
696 if (rc != 0) {
697 CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s",
698 ptlrpcd_cpts);
699 rc = -EINVAL;
700 goto out;
701 }
702
703 rc = cfs_expr_list_values(el, ncpts, &cpts);
704 cfs_expr_list_free(el);
705 if (rc <= 0) {
706 CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n",
707 ptlrpcd_cpts, rc);
708 if (rc == 0)
709 rc = -EINVAL;
710 goto out;
711 }
712
713
714
715
716
717
718
719 for (cpt = 0; cpt < ncpts; cpt++) {
720 for (i = 0; i < rc; i++)
721 if (cpts[i] == cpt)
722 break;
723 if (i >= rc)
724 i = cpt % rc;
725 ptlrpcds_cpt_idx[cpt] = i;
726 }
727
728 cfs_expr_list_values_free(cpts, rc);
729 ncpts = rc;
730 }
731 ptlrpcds_num = ncpts;
732
733 size = ncpts * sizeof(ptlrpcds[0]);
734 ptlrpcds = kzalloc(size, GFP_KERNEL);
735 if (!ptlrpcds) {
736 rc = -ENOMEM;
737 goto out;
738 }
739
740
741
742
743
744
745 if (max_ptlrpcds != 0) {
746 CWARN("max_ptlrpcds is obsolete.\n");
747 if (ptlrpcd_per_cpt_max == 0) {
748 ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
749
750 if (max_ptlrpcds % ncpts != 0)
751 ptlrpcd_per_cpt_max++;
752 CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
753 ptlrpcd_per_cpt_max);
754 } else {
755 CWARN("ptlrpd_per_cpt_max is also set!\n");
756 }
757 }
758
759
760
761
762
763
764 if (ptlrpcd_bind_policy != 0) {
765 CWARN("ptlrpcd_bind_policy is obsolete.\n");
766 if (ptlrpcd_partner_group_size == 0) {
767 switch (ptlrpcd_bind_policy) {
768 case 1:
769 case 2:
770 ptlrpcd_partner_group_size = 1;
771 break;
772 case 3:
773 ptlrpcd_partner_group_size = 2;
774 break;
775 case 4:
776#ifdef CONFIG_NUMA
777 ptlrpcd_partner_group_size = -1;
778#else
779 ptlrpcd_partner_group_size = 3;
780#endif
781 break;
782 default:
783 ptlrpcd_partner_group_size = 2;
784 break;
785 }
786 CWARN("Setting ptlrpcd_partner_group_size = %d\n",
787 ptlrpcd_partner_group_size);
788 } else {
789 CWARN("ptlrpcd_partner_group_size is also set!\n");
790 }
791 }
792
793 if (ptlrpcd_partner_group_size == 0)
794 ptlrpcd_partner_group_size = 2;
795 else if (ptlrpcd_partner_group_size < 0)
796 ptlrpcd_partner_group_size = -1;
797 else if (ptlrpcd_per_cpt_max > 0 &&
798 ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
799 ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
800
801
802
803
804 set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
805 ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
806 rc = ptlrpcd_start(&ptlrpcd_rcv);
807 if (rc < 0)
808 goto out;
809
810 for (i = 0; i < ncpts; i++) {
811 if (!cpts)
812 cpt = i;
813 else
814 cpt = cpts[i];
815
816 nthreads = cfs_cpt_weight(cptable, cpt);
817 if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
818 nthreads = ptlrpcd_per_cpt_max;
819 if (nthreads < 2)
820 nthreads = 2;
821
822 if (ptlrpcd_partner_group_size <= 0) {
823 groupsize = nthreads;
824 } else if (nthreads <= ptlrpcd_partner_group_size) {
825 groupsize = nthreads;
826 } else {
827 groupsize = ptlrpcd_partner_group_size;
828 if (nthreads % groupsize != 0)
829 nthreads += groupsize - (nthreads % groupsize);
830 }
831
832 size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
833 pd = kzalloc_node(size, GFP_NOFS,
834 cfs_cpt_spread_node(cfs_cpt_table, cpt));
835 if (!pd) {
836 rc = -ENOMEM;
837 goto out;
838 }
839 pd->pd_size = size;
840 pd->pd_index = i;
841 pd->pd_cpt = cpt;
842 pd->pd_cursor = 0;
843 pd->pd_nthreads = nthreads;
844 pd->pd_groupsize = groupsize;
845 ptlrpcds[i] = pd;
846
847
848
849
850
851
852 for (j = 0; j < nthreads; j++) {
853 ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
854 rc = ptlrpcd_partners(pd, j);
855 if (rc < 0)
856 goto out;
857 }
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878 for (j = 0; j < nthreads; j++) {
879 rc = ptlrpcd_start(&pd->pd_threads[j]);
880 if (rc < 0)
881 goto out;
882 }
883 }
884out:
885 if (rc != 0)
886 ptlrpcd_fini();
887
888 return rc;
889}
890
891int ptlrpcd_addref(void)
892{
893 int rc = 0;
894
895 mutex_lock(&ptlrpcd_mutex);
896 if (++ptlrpcd_users == 1) {
897 rc = ptlrpcd_init();
898 if (rc < 0)
899 ptlrpcd_users--;
900 }
901 mutex_unlock(&ptlrpcd_mutex);
902 return rc;
903}
904EXPORT_SYMBOL(ptlrpcd_addref);
905
906void ptlrpcd_decref(void)
907{
908 mutex_lock(&ptlrpcd_mutex);
909 if (--ptlrpcd_users == 0)
910 ptlrpcd_fini();
911 mutex_unlock(&ptlrpcd_mutex);
912}
913EXPORT_SYMBOL(ptlrpcd_decref);
914
915