1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38#define DEBUG_SUBSYSTEM S_RPC
39#include "../include/obd_support.h"
40#include "../include/obd_class.h"
41#include "../include/lustre_net.h"
42#include "../include/lprocfs_status.h"
43#include "../../include/linux/libcfs/libcfs.h"
44#include "ptlrpc_internal.h"
45
46
47
48
/*
 * File-wide NRS core state. Its nrs_mutex and nrs_policies list are
 * initialised by ptlrpc_nrs_init().
 */
struct nrs_core nrs_core;
50
51static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
52{
53 return policy->pol_desc->pd_ops->op_policy_init ?
54 policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
55}
56
57static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
58{
59 LASSERT(policy->pol_ref == 0);
60 LASSERT(policy->pol_req_queued == 0);
61
62 if (policy->pol_desc->pd_ops->op_policy_fini)
63 policy->pol_desc->pd_ops->op_policy_fini(policy);
64}
65
66static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
67 enum ptlrpc_nrs_ctl opc, void *arg)
68{
69
70
71
72
73
74
75 if (policy->pol_state == NRS_POL_STATE_STOPPED)
76 return -ENODEV;
77
78 return policy->pol_desc->pd_ops->op_policy_ctl ?
79 policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
80 -ENOSYS;
81}
82
83static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
84{
85 struct ptlrpc_nrs *nrs = policy->pol_nrs;
86
87 if (policy->pol_desc->pd_ops->op_policy_stop) {
88 spin_unlock(&nrs->nrs_lock);
89
90 policy->pol_desc->pd_ops->op_policy_stop(policy);
91
92 spin_lock(&nrs->nrs_lock);
93 }
94
95 LASSERT(list_empty(&policy->pol_list_queued));
96 LASSERT(policy->pol_req_queued == 0 &&
97 policy->pol_req_started == 0);
98
99 policy->pol_private = NULL;
100
101 policy->pol_state = NRS_POL_STATE_STOPPED;
102
103 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
104 module_put(policy->pol_desc->pd_owner);
105}
106
107static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
108{
109 struct ptlrpc_nrs *nrs = policy->pol_nrs;
110
111 if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
112 return -EPERM;
113
114 if (policy->pol_state == NRS_POL_STATE_STARTING)
115 return -EAGAIN;
116
117
118 if (policy->pol_state != NRS_POL_STATE_STARTED)
119 return 0;
120
121 policy->pol_state = NRS_POL_STATE_STOPPING;
122
123
124 if (nrs->nrs_policy_primary == policy) {
125 nrs->nrs_policy_primary = NULL;
126
127 } else {
128 LASSERT(nrs->nrs_policy_fallback == policy);
129 nrs->nrs_policy_fallback = NULL;
130 }
131
132
133 if (policy->pol_ref == 1)
134 nrs_policy_stop0(policy);
135
136 return 0;
137}
138
139
140
141
142
143
144
145
146static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
147{
148 struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
149
150 if (!tmp)
151 return;
152
153 nrs->nrs_policy_primary = NULL;
154
155 LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
156 tmp->pol_state = NRS_POL_STATE_STOPPING;
157
158 if (tmp->pol_ref == 0)
159 nrs_policy_stop0(tmp);
160}
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
182{
183 struct ptlrpc_nrs *nrs = policy->pol_nrs;
184 int rc = 0;
185
186
187
188
189
190 if (nrs->nrs_policy_starting)
191 return -EAGAIN;
192
193 LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
194
195 if (policy->pol_state == NRS_POL_STATE_STOPPING)
196 return -EAGAIN;
197
198 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
199
200
201
202
203
204
205 if (policy == nrs->nrs_policy_fallback) {
206 nrs_policy_stop_primary(nrs);
207 return 0;
208 }
209
210
211
212
213
214
215
216 LASSERT(!nrs->nrs_policy_fallback);
217 } else {
218
219
220
221 if (!nrs->nrs_policy_fallback)
222 return -EPERM;
223
224 if (policy->pol_state == NRS_POL_STATE_STARTED)
225 return 0;
226 }
227
228
229
230
231
232 if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
233 !try_module_get(policy->pol_desc->pd_owner)) {
234 atomic_dec(&policy->pol_desc->pd_refs);
235 CERROR("NRS: cannot get module for policy %s; is it alive?\n",
236 policy->pol_desc->pd_name);
237 return -ENODEV;
238 }
239
240
241
242
243 nrs->nrs_policy_starting = 1;
244
245 policy->pol_state = NRS_POL_STATE_STARTING;
246
247 if (policy->pol_desc->pd_ops->op_policy_start) {
248 spin_unlock(&nrs->nrs_lock);
249
250 rc = policy->pol_desc->pd_ops->op_policy_start(policy);
251
252 spin_lock(&nrs->nrs_lock);
253 if (rc != 0) {
254 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
255 module_put(policy->pol_desc->pd_owner);
256
257 policy->pol_state = NRS_POL_STATE_STOPPED;
258 goto out;
259 }
260 }
261
262 policy->pol_state = NRS_POL_STATE_STARTED;
263
264 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
265
266
267
268 nrs->nrs_policy_fallback = policy;
269 } else {
270
271
272
273 nrs_policy_stop_primary(nrs);
274
275
276
277
278 nrs->nrs_policy_primary = policy;
279 }
280
281out:
282 nrs->nrs_policy_starting = 0;
283
284 return rc;
285}
286
287
288
289
290static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
291{
292 policy->pol_ref++;
293}
294
295
296
297
298
299
300
301static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
302{
303 LASSERT(policy->pol_ref > 0);
304
305 policy->pol_ref--;
306 if (unlikely(policy->pol_ref == 0 &&
307 policy->pol_state == NRS_POL_STATE_STOPPING))
308 nrs_policy_stop0(policy);
309}
310
311static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
312{
313 spin_lock(&policy->pol_nrs->nrs_lock);
314 nrs_policy_put_locked(policy);
315 spin_unlock(&policy->pol_nrs->nrs_lock);
316}
317
318
319
320
321static struct ptlrpc_nrs_policy *nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
322 char *name)
323{
324 struct ptlrpc_nrs_policy *tmp;
325
326 list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
327 if (strncmp(tmp->pol_desc->pd_name, name,
328 NRS_POL_NAME_MAX) == 0) {
329 nrs_policy_get_locked(tmp);
330 return tmp;
331 }
332 }
333 return NULL;
334}
335
336
337
338
339
340static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
341{
342 struct ptlrpc_nrs_policy *policy = res->res_policy;
343
344 if (policy->pol_desc->pd_ops->op_res_put) {
345 struct ptlrpc_nrs_resource *parent;
346
347 for (; res; res = parent) {
348 parent = res->res_parent;
349 policy->pol_desc->pd_ops->op_res_put(policy, res);
350 }
351 }
352}
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370static
371struct ptlrpc_nrs_resource *nrs_resource_get(struct ptlrpc_nrs_policy *policy,
372 struct ptlrpc_nrs_request *nrq,
373 bool moving_req)
374{
375
376
377
378 struct ptlrpc_nrs_resource *res = NULL;
379 struct ptlrpc_nrs_resource *tmp = NULL;
380 int rc;
381
382 while (1) {
383 rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
384 &tmp, moving_req);
385 if (rc < 0) {
386 if (res)
387 nrs_resource_put(res);
388 return NULL;
389 }
390
391 tmp->res_parent = res;
392 tmp->res_policy = policy;
393 res = tmp;
394 tmp = NULL;
395
396
397
398
399 if (rc > 0)
400 return res;
401 }
402}
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
422 struct ptlrpc_nrs_request *nrq,
423 struct ptlrpc_nrs_resource **resp,
424 bool moving_req)
425{
426 struct ptlrpc_nrs_policy *primary = NULL;
427 struct ptlrpc_nrs_policy *fallback = NULL;
428
429 memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
430
431
432
433
434 spin_lock(&nrs->nrs_lock);
435
436 fallback = nrs->nrs_policy_fallback;
437 nrs_policy_get_locked(fallback);
438
439 primary = nrs->nrs_policy_primary;
440 if (primary)
441 nrs_policy_get_locked(primary);
442
443 spin_unlock(&nrs->nrs_lock);
444
445
446
447
448 resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
449 LASSERT(resp[NRS_RES_FALLBACK]);
450
451 if (primary) {
452 resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
453 moving_req);
454
455
456
457
458
459
460 if (!resp[NRS_RES_PRIMARY])
461 nrs_policy_put(primary);
462 }
463}
464
465
466
467
468
469
470
471
472
473
474static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
475{
476 struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
477 int i;
478
479 for (i = 0; i < NRS_RES_MAX; i++) {
480 if (resp[i]) {
481 pols[i] = resp[i]->res_policy;
482 nrs_resource_put(resp[i]);
483 resp[i] = NULL;
484 } else {
485 pols[i] = NULL;
486 }
487 }
488
489 for (i = 0; i < NRS_RES_MAX; i++) {
490 if (pols[i])
491 nrs_policy_put(pols[i]);
492 }
493}
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511static inline
512struct ptlrpc_nrs_request *nrs_request_get(struct ptlrpc_nrs_policy *policy,
513 bool peek, bool force)
514{
515 struct ptlrpc_nrs_request *nrq;
516
517 LASSERT(policy->pol_req_queued > 0);
518
519 nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
520
521 LASSERT(ergo(nrq, nrs_request_policy(nrq) == policy));
522
523 return nrq;
524}
525
526
527
528
529
530
531
532
533
534
535
536static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
537{
538 struct ptlrpc_nrs_policy *policy;
539 int rc;
540 int i;
541
542
543
544
545
546 for (i = NRS_RES_MAX - 1; i >= 0; i--) {
547 if (!nrq->nr_res_ptrs[i])
548 continue;
549
550 nrq->nr_res_idx = i;
551 policy = nrq->nr_res_ptrs[i]->res_policy;
552
553 rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
554 if (rc == 0) {
555 policy->pol_nrs->nrs_req_queued++;
556 policy->pol_req_queued++;
557 return;
558 }
559 }
560
561
562
563
564
565 LBUG();
566}
567
568
569
570
571
572
573
574
575
576static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
577{
578 struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
579
580 if (policy->pol_desc->pd_ops->op_req_stop)
581 policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
582
583 LASSERT(policy->pol_nrs->nrs_req_started > 0);
584 LASSERT(policy->pol_req_started > 0);
585
586 policy->pol_nrs->nrs_req_started--;
587 policy->pol_req_started--;
588}
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
609 enum ptlrpc_nrs_ctl opc, void *arg)
610{
611 struct ptlrpc_nrs_policy *policy;
612 int rc = 0;
613
614 spin_lock(&nrs->nrs_lock);
615
616 policy = nrs_policy_find_locked(nrs, name);
617 if (!policy) {
618 rc = -ENOENT;
619 goto out;
620 }
621
622 switch (opc) {
623
624
625
626
627 default:
628 rc = nrs_policy_ctl_locked(policy, opc, arg);
629 break;
630
631
632
633
634 case PTLRPC_NRS_CTL_START:
635 rc = nrs_policy_start_locked(policy);
636 break;
637 }
638out:
639 if (policy)
640 nrs_policy_put_locked(policy);
641
642 spin_unlock(&nrs->nrs_lock);
643
644 return rc;
645}
646
647
648
649
650
651
652
653
654
655
656
657static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
658{
659 struct ptlrpc_nrs_policy *policy = NULL;
660
661 spin_lock(&nrs->nrs_lock);
662
663 policy = nrs_policy_find_locked(nrs, name);
664 if (!policy) {
665 spin_unlock(&nrs->nrs_lock);
666
667 CERROR("Can't find NRS policy %s\n", name);
668 return -ENOENT;
669 }
670
671 if (policy->pol_ref > 1) {
672 CERROR("Policy %s is busy with %d references\n", name,
673 (int)policy->pol_ref);
674 nrs_policy_put_locked(policy);
675
676 spin_unlock(&nrs->nrs_lock);
677 return -EBUSY;
678 }
679
680 LASSERT(policy->pol_req_queued == 0);
681 LASSERT(policy->pol_req_started == 0);
682
683 if (policy->pol_state != NRS_POL_STATE_STOPPED) {
684 nrs_policy_stop_locked(policy);
685 LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
686 }
687
688 list_del(&policy->pol_list);
689 nrs->nrs_num_pols--;
690
691 nrs_policy_put_locked(policy);
692
693 spin_unlock(&nrs->nrs_lock);
694
695 nrs_policy_fini(policy);
696
697 LASSERT(!policy->pol_private);
698 kfree(policy);
699
700 return 0;
701}
702
703
704
705
706
707
708
709
710
711
712
713static int nrs_policy_register(struct ptlrpc_nrs *nrs,
714 struct ptlrpc_nrs_pol_desc *desc)
715{
716 struct ptlrpc_nrs_policy *policy;
717 struct ptlrpc_nrs_policy *tmp;
718 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
719 int rc;
720
721 LASSERT(desc->pd_ops->op_res_get);
722 LASSERT(desc->pd_ops->op_req_get);
723 LASSERT(desc->pd_ops->op_req_enqueue);
724 LASSERT(desc->pd_ops->op_req_dequeue);
725 LASSERT(desc->pd_compat);
726
727 policy = kzalloc_node(sizeof(*policy), GFP_NOFS,
728 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
729 svcpt->scp_cpt));
730 if (!policy)
731 return -ENOMEM;
732
733 policy->pol_nrs = nrs;
734 policy->pol_desc = desc;
735 policy->pol_state = NRS_POL_STATE_STOPPED;
736 policy->pol_flags = desc->pd_flags;
737
738 INIT_LIST_HEAD(&policy->pol_list);
739 INIT_LIST_HEAD(&policy->pol_list_queued);
740
741 rc = nrs_policy_init(policy);
742 if (rc != 0) {
743 kfree(policy);
744 return rc;
745 }
746
747 spin_lock(&nrs->nrs_lock);
748
749 tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
750 if (tmp) {
751 CERROR("NRS policy %s has been registered, can't register it for %s\n",
752 policy->pol_desc->pd_name,
753 svcpt->scp_service->srv_name);
754 nrs_policy_put_locked(tmp);
755
756 spin_unlock(&nrs->nrs_lock);
757 nrs_policy_fini(policy);
758 kfree(policy);
759
760 return -EEXIST;
761 }
762
763 list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
764 nrs->nrs_num_pols++;
765
766 if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
767 rc = nrs_policy_start_locked(policy);
768
769 spin_unlock(&nrs->nrs_lock);
770
771 if (rc != 0)
772 (void) nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
773
774 return rc;
775}
776
777
778
779
780
781
782
783static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
784{
785 struct ptlrpc_nrs_policy *policy;
786
787 LASSERT(req->rq_nrq.nr_initialized);
788 LASSERT(!req->rq_nrq.nr_enqueued);
789
790 nrs_request_enqueue(&req->rq_nrq);
791 req->rq_nrq.nr_enqueued = 1;
792
793 policy = nrs_request_policy(&req->rq_nrq);
794
795
796
797
798 if (unlikely(list_empty(&policy->pol_list_queued)))
799 list_add_tail(&policy->pol_list_queued,
800 &policy->pol_nrs->nrs_policy_queued);
801}
802
803
804
805
806
807
808static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
809{
810 int opc = lustre_msg_get_opc(req->rq_reqmsg);
811
812 spin_lock(&req->rq_lock);
813 req->rq_hp = 1;
814 ptlrpc_nrs_req_add_nolock(req);
815 if (opc != OBD_PING)
816 DEBUG_REQ(D_NET, req, "high priority req");
817 spin_unlock(&req->rq_lock);
818}
819
820
821
822
823
824
825
826
827
828
829
830static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
831 const struct ptlrpc_nrs_pol_desc *desc)
832{
833 return desc->pd_compat(svc, desc);
834}
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
850{
851 struct ptlrpc_nrs_pol_desc *desc;
852
853 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
854 struct ptlrpc_service *svc = svcpt->scp_service;
855 int rc = -EINVAL;
856
857 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
858
859 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
860 if (nrs_policy_compatible(svc, desc)) {
861 rc = nrs_policy_register(nrs, desc);
862 if (rc != 0) {
863 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
864 desc->pd_name, svcpt->scp_cpt,
865 svc->srv_name, rc);
866
867
868
869
870 break;
871 }
872 }
873 }
874
875 return rc;
876}
877
878
879
880
881
882
883
884
885
886
887
888
889
890static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
891 struct ptlrpc_service_part *svcpt)
892{
893 enum ptlrpc_nrs_queue_type queue;
894
895 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
896
897 if (nrs == &svcpt->scp_nrs_reg)
898 queue = PTLRPC_NRS_QUEUE_REG;
899 else if (nrs == svcpt->scp_nrs_hp)
900 queue = PTLRPC_NRS_QUEUE_HP;
901 else
902 LBUG();
903
904 nrs->nrs_svcpt = svcpt;
905 nrs->nrs_queue_type = queue;
906 spin_lock_init(&nrs->nrs_lock);
907 INIT_LIST_HEAD(&nrs->nrs_policy_list);
908 INIT_LIST_HEAD(&nrs->nrs_policy_queued);
909
910 return nrs_register_policies_locked(nrs);
911}
912
913
914
915
916
917
918
919
920
921
922static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
923{
924 struct ptlrpc_nrs *nrs;
925 int rc;
926
927 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
928
929
930
931
932 nrs = nrs_svcpt2nrs(svcpt, false);
933 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
934 if (rc < 0)
935 goto out;
936
937
938
939
940 if (!svcpt->scp_service->srv_ops.so_hpreq_handler)
941 goto out;
942
943 svcpt->scp_nrs_hp =
944 kzalloc_node(sizeof(*svcpt->scp_nrs_hp), GFP_NOFS,
945 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
946 svcpt->scp_cpt));
947 if (!svcpt->scp_nrs_hp) {
948 rc = -ENOMEM;
949 goto out;
950 }
951
952 nrs = nrs_svcpt2nrs(svcpt, true);
953 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
954
955out:
956 return rc;
957}
958
959
960
961
962
963
964
965
966
967static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
968{
969 struct ptlrpc_nrs *nrs;
970 struct ptlrpc_nrs_policy *policy;
971 struct ptlrpc_nrs_policy *tmp;
972 int rc;
973 bool hp = false;
974
975 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
976
977again:
978 nrs = nrs_svcpt2nrs(svcpt, hp);
979 nrs->nrs_stopping = 1;
980
981 list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, pol_list) {
982 rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
983 LASSERT(rc == 0);
984 }
985
986
987
988
989 if (!hp && nrs_svcpt_has_hp(svcpt)) {
990 hp = true;
991 goto again;
992 }
993
994 if (hp)
995 kfree(nrs);
996}
997
998
999
1000
1001
1002
1003
1004
1005
1006static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
1007{
1008 struct ptlrpc_nrs_pol_desc *tmp;
1009
1010 list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
1011 if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
1012 return tmp;
1013 }
1014 return NULL;
1015}
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
1030{
1031 struct ptlrpc_nrs *nrs;
1032 struct ptlrpc_service *svc;
1033 struct ptlrpc_service_part *svcpt;
1034 int i;
1035 int rc = 0;
1036
1037 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1038 LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
1039
1040 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1041
1042 if (!nrs_policy_compatible(svc, desc) ||
1043 unlikely(svc->srv_is_stopping))
1044 continue;
1045
1046 ptlrpc_service_for_each_part(svcpt, i, svc) {
1047 bool hp = false;
1048
1049again:
1050 nrs = nrs_svcpt2nrs(svcpt, hp);
1051 rc = nrs_policy_unregister(nrs, desc->pd_name);
1052
1053
1054
1055
1056 if (rc == -ENOENT) {
1057 rc = 0;
1058 } else if (rc != 0) {
1059 CERROR("Failed to unregister NRS policy %s for partition %d of service %s: %d\n",
1060 desc->pd_name, svcpt->scp_cpt,
1061 svcpt->scp_service->srv_name, rc);
1062 return rc;
1063 }
1064
1065 if (!hp && nrs_svc_has_hp(svc)) {
1066 hp = true;
1067 goto again;
1068 }
1069 }
1070
1071 if (desc->pd_ops->op_lprocfs_fini)
1072 desc->pd_ops->op_lprocfs_fini(svc);
1073 }
1074
1075 return rc;
1076}
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
1094{
1095 struct ptlrpc_service *svc;
1096 struct ptlrpc_nrs_pol_desc *desc;
1097 size_t len;
1098 int rc = 0;
1099
1100 LASSERT(conf->nc_ops);
1101 LASSERT(conf->nc_compat);
1102 LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
1103 conf->nc_compat_svc_name));
1104 LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
1105 conf->nc_owner));
1106
1107 conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
1119 (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
1120 PTLRPC_NRS_FL_REG_START))) {
1121 CERROR("NRS: failing to register policy %s. Please check policy flags; external policies cannot act as fallback policies, or be started immediately upon registration without interaction with lprocfs\n",
1122 conf->nc_name);
1123 return -EINVAL;
1124 }
1125
1126 mutex_lock(&nrs_core.nrs_mutex);
1127
1128 if (nrs_policy_find_desc_locked(conf->nc_name)) {
1129 CERROR("NRS: failing to register policy %s which has already been registered with NRS core!\n",
1130 conf->nc_name);
1131 rc = -EEXIST;
1132 goto fail;
1133 }
1134
1135 desc = kzalloc(sizeof(*desc), GFP_NOFS);
1136 if (!desc) {
1137 rc = -ENOMEM;
1138 goto fail;
1139 }
1140
1141 len = strlcpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name));
1142 if (len >= sizeof(desc->pd_name)) {
1143 kfree(desc);
1144 rc = -E2BIG;
1145 goto fail;
1146 }
1147 desc->pd_ops = conf->nc_ops;
1148 desc->pd_compat = conf->nc_compat;
1149 desc->pd_compat_svc_name = conf->nc_compat_svc_name;
1150 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
1151 desc->pd_owner = conf->nc_owner;
1152 desc->pd_flags = conf->nc_flags;
1153 atomic_set(&desc->pd_refs, 0);
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
1164 goto internal;
1165
1166
1167
1168
1169 mutex_lock(&ptlrpc_all_services_mutex);
1170
1171 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1172 struct ptlrpc_service_part *svcpt;
1173 int i;
1174 int rc2;
1175
1176 if (!nrs_policy_compatible(svc, desc) ||
1177 unlikely(svc->srv_is_stopping))
1178 continue;
1179
1180 ptlrpc_service_for_each_part(svcpt, i, svc) {
1181 struct ptlrpc_nrs *nrs;
1182 bool hp = false;
1183again:
1184 nrs = nrs_svcpt2nrs(svcpt, hp);
1185 rc = nrs_policy_register(nrs, desc);
1186 if (rc != 0) {
1187 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
1188 desc->pd_name, svcpt->scp_cpt,
1189 svcpt->scp_service->srv_name, rc);
1190
1191 rc2 = nrs_policy_unregister_locked(desc);
1192
1193
1194
1195 LASSERT(rc2 == 0);
1196 mutex_unlock(&ptlrpc_all_services_mutex);
1197 kfree(desc);
1198 goto fail;
1199 }
1200
1201 if (!hp && nrs_svc_has_hp(svc)) {
1202 hp = true;
1203 goto again;
1204 }
1205 }
1206
1207
1208
1209
1210
1211 if (desc->pd_ops->op_lprocfs_init) {
1212 rc = desc->pd_ops->op_lprocfs_init(svc);
1213 if (rc != 0) {
1214 rc2 = nrs_policy_unregister_locked(desc);
1215
1216
1217
1218 LASSERT(rc2 == 0);
1219 mutex_unlock(&ptlrpc_all_services_mutex);
1220 kfree(desc);
1221 goto fail;
1222 }
1223 }
1224 }
1225
1226 mutex_unlock(&ptlrpc_all_services_mutex);
1227internal:
1228 list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
1229fail:
1230 mutex_unlock(&nrs_core.nrs_mutex);
1231
1232 return rc;
1233}
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
1250{
1251 struct ptlrpc_service_part *svcpt;
1252 const struct ptlrpc_nrs_pol_desc *desc;
1253 int i;
1254 int rc = 0;
1255
1256 mutex_lock(&nrs_core.nrs_mutex);
1257
1258
1259
1260
1261 ptlrpc_service_for_each_part(svcpt, i, svc) {
1262 rc = nrs_svcpt_setup_locked(svcpt);
1263 if (rc != 0)
1264 goto failed;
1265 }
1266
1267
1268
1269
1270
1271 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1272 if (!nrs_policy_compatible(svc, desc))
1273 continue;
1274
1275 if (desc->pd_ops->op_lprocfs_init) {
1276 rc = desc->pd_ops->op_lprocfs_init(svc);
1277 if (rc != 0)
1278 goto failed;
1279 }
1280 }
1281
1282failed:
1283
1284 mutex_unlock(&nrs_core.nrs_mutex);
1285
1286 return rc;
1287}
1288
1289
1290
1291
1292
1293
1294void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
1295{
1296 struct ptlrpc_service_part *svcpt;
1297 const struct ptlrpc_nrs_pol_desc *desc;
1298 int i;
1299
1300 mutex_lock(&nrs_core.nrs_mutex);
1301
1302
1303
1304
1305 ptlrpc_service_for_each_part(svcpt, i, svc)
1306 nrs_svcpt_cleanup_locked(svcpt);
1307
1308
1309
1310
1311
1312 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1313 if (!nrs_policy_compatible(svc, desc))
1314 continue;
1315
1316 if (desc->pd_ops->op_lprocfs_fini)
1317 desc->pd_ops->op_lprocfs_fini(svc);
1318 }
1319
1320 mutex_unlock(&nrs_core.nrs_mutex);
1321}
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
1335 struct ptlrpc_request *req, bool hp)
1336{
1337 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1338
1339 memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
1340 nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
1341 false);
1342
1343
1344
1345
1346
1347 req->rq_nrq.nr_initialized = 1;
1348}
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
1359{
1360 if (req->rq_nrq.nr_initialized) {
1361 nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
1362
1363
1364
1365 req->rq_nrq.nr_finalized = 1;
1366 }
1367}
1368
1369void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
1370{
1371 if (req->rq_nrq.nr_started)
1372 nrs_request_stop(&req->rq_nrq);
1373}
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
1385 struct ptlrpc_request *req, bool hp)
1386{
1387 spin_lock(&svcpt->scp_req_lock);
1388
1389 if (hp)
1390 ptlrpc_nrs_hpreq_add_nolock(req);
1391 else
1392 ptlrpc_nrs_req_add_nolock(req);
1393
1394 spin_unlock(&svcpt->scp_req_lock);
1395}
1396
1397static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
1398{
1399 LASSERT(policy->pol_nrs->nrs_req_queued > 0);
1400 LASSERT(policy->pol_req_queued > 0);
1401
1402 policy->pol_nrs->nrs_req_queued--;
1403 policy->pol_req_queued--;
1404
1405
1406
1407
1408
1409 if (unlikely(policy->pol_req_queued == 0)) {
1410 list_del_init(&policy->pol_list_queued);
1411
1412
1413
1414
1415
1416
1417 } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
1418 LASSERT(policy->pol_req_queued <
1419 policy->pol_nrs->nrs_req_queued);
1420
1421 list_move_tail(&policy->pol_list_queued,
1422 &policy->pol_nrs->nrs_policy_queued);
1423 }
1424}
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442struct ptlrpc_request *
1443ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
1444 bool peek, bool force)
1445{
1446 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1447 struct ptlrpc_nrs_policy *policy;
1448 struct ptlrpc_nrs_request *nrq;
1449
1450
1451
1452
1453
1454 list_for_each_entry(policy, &nrs->nrs_policy_queued, pol_list_queued) {
1455 nrq = nrs_request_get(policy, peek, force);
1456 if (nrq) {
1457 if (likely(!peek)) {
1458 nrq->nr_started = 1;
1459
1460 policy->pol_req_started++;
1461 policy->pol_nrs->nrs_req_started++;
1462
1463 nrs_request_removed(policy);
1464 }
1465
1466 return container_of(nrq, struct ptlrpc_request, rq_nrq);
1467 }
1468 }
1469
1470 return NULL;
1471}
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1487{
1488 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1489
1490 return nrs->nrs_req_queued > 0;
1491};
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
1520 enum ptlrpc_nrs_queue_type queue, char *name,
1521 enum ptlrpc_nrs_ctl opc, bool single, void *arg)
1522{
1523 struct ptlrpc_service_part *svcpt;
1524 int i;
1525 int rc = 0;
1526
1527 LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
1528
1529 if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
1530 return -EINVAL;
1531
1532 ptlrpc_service_for_each_part(svcpt, i, svc) {
1533 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1534 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
1535 opc, arg);
1536 if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
1537 single))
1538 goto out;
1539 }
1540
1541 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1542
1543
1544
1545
1546
1547
1548
1549
1550 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
1551 opc, arg);
1552 if (rc != 0 || single)
1553 goto out;
1554 }
1555 }
1556out:
1557 return rc;
1558}
1559
1560
/*
 * Policy configuration defined outside this file; ptlrpc_nrs_init()
 * registers it with the NRS core.
 */
extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
1562
1563
1564
1565
1566
1567
1568
1569
1570int ptlrpc_nrs_init(void)
1571{
1572 int rc;
1573
1574 mutex_init(&nrs_core.nrs_mutex);
1575 INIT_LIST_HEAD(&nrs_core.nrs_policies);
1576
1577 rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
1578 if (rc != 0)
1579 goto fail;
1580
1581 return rc;
1582fail:
1583
1584
1585
1586
1587 ptlrpc_nrs_fini();
1588
1589 return rc;
1590}
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601void ptlrpc_nrs_fini(void)
1602{
1603 struct ptlrpc_nrs_pol_desc *desc;
1604 struct ptlrpc_nrs_pol_desc *tmp;
1605
1606 list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies, pd_list) {
1607 list_del_init(&desc->pd_list);
1608 kfree(desc);
1609 }
1610}
1611
1612
1613