1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38#define DEBUG_SUBSYSTEM S_RPC
39#include "../include/obd_support.h"
40#include "../include/obd_class.h"
41#include "../include/lustre_net.h"
42#include "../include/lprocfs_status.h"
43#include "../../include/linux/libcfs/libcfs.h"
44#include "ptlrpc_internal.h"
45
46
47
48
49struct nrs_core nrs_core;
50
51static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
52{
53 return policy->pol_desc->pd_ops->op_policy_init ?
54 policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
55}
56
57static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
58{
59 LASSERT(policy->pol_ref == 0);
60 LASSERT(policy->pol_req_queued == 0);
61
62 if (policy->pol_desc->pd_ops->op_policy_fini)
63 policy->pol_desc->pd_ops->op_policy_fini(policy);
64}
65
66static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
67 enum ptlrpc_nrs_ctl opc, void *arg)
68{
69
70
71
72
73
74
75 if (policy->pol_state == NRS_POL_STATE_STOPPED)
76 return -ENODEV;
77
78 return policy->pol_desc->pd_ops->op_policy_ctl ?
79 policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
80 -ENOSYS;
81}
82
83static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
84{
85 struct ptlrpc_nrs *nrs = policy->pol_nrs;
86
87 if (policy->pol_desc->pd_ops->op_policy_stop) {
88 spin_unlock(&nrs->nrs_lock);
89
90 policy->pol_desc->pd_ops->op_policy_stop(policy);
91
92 spin_lock(&nrs->nrs_lock);
93 }
94
95 LASSERT(list_empty(&policy->pol_list_queued));
96 LASSERT(policy->pol_req_queued == 0 &&
97 policy->pol_req_started == 0);
98
99 policy->pol_private = NULL;
100
101 policy->pol_state = NRS_POL_STATE_STOPPED;
102
103 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
104 module_put(policy->pol_desc->pd_owner);
105}
106
107static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
108{
109 struct ptlrpc_nrs *nrs = policy->pol_nrs;
110
111 if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
112 return -EPERM;
113
114 if (policy->pol_state == NRS_POL_STATE_STARTING)
115 return -EAGAIN;
116
117
118 if (policy->pol_state != NRS_POL_STATE_STARTED)
119 return 0;
120
121 policy->pol_state = NRS_POL_STATE_STOPPING;
122
123
124 if (nrs->nrs_policy_primary == policy) {
125 nrs->nrs_policy_primary = NULL;
126
127 } else {
128 LASSERT(nrs->nrs_policy_fallback == policy);
129 nrs->nrs_policy_fallback = NULL;
130 }
131
132
133 if (policy->pol_ref == 1)
134 nrs_policy_stop0(policy);
135
136 return 0;
137}
138
139
140
141
142
143
144
145
146static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
147{
148 struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
149
150 if (!tmp)
151 return;
152
153 nrs->nrs_policy_primary = NULL;
154
155 LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
156 tmp->pol_state = NRS_POL_STATE_STOPPING;
157
158 if (tmp->pol_ref == 0)
159 nrs_policy_stop0(tmp);
160}
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
182{
183 struct ptlrpc_nrs *nrs = policy->pol_nrs;
184 int rc = 0;
185
186
187
188
189
190 if (nrs->nrs_policy_starting)
191 return -EAGAIN;
192
193 LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
194
195 if (policy->pol_state == NRS_POL_STATE_STOPPING)
196 return -EAGAIN;
197
198 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
199
200
201
202
203
204
205 if (policy == nrs->nrs_policy_fallback) {
206 nrs_policy_stop_primary(nrs);
207 return 0;
208 }
209
210
211
212
213
214
215
216 LASSERT(!nrs->nrs_policy_fallback);
217 } else {
218
219
220
221 if (!nrs->nrs_policy_fallback)
222 return -EPERM;
223
224 if (policy->pol_state == NRS_POL_STATE_STARTED)
225 return 0;
226 }
227
228
229
230
231
232 if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
233 !try_module_get(policy->pol_desc->pd_owner)) {
234 atomic_dec(&policy->pol_desc->pd_refs);
235 CERROR("NRS: cannot get module for policy %s; is it alive?\n",
236 policy->pol_desc->pd_name);
237 return -ENODEV;
238 }
239
240
241
242
243 nrs->nrs_policy_starting = 1;
244
245 policy->pol_state = NRS_POL_STATE_STARTING;
246
247 if (policy->pol_desc->pd_ops->op_policy_start) {
248 spin_unlock(&nrs->nrs_lock);
249
250 rc = policy->pol_desc->pd_ops->op_policy_start(policy);
251
252 spin_lock(&nrs->nrs_lock);
253 if (rc != 0) {
254 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
255 module_put(policy->pol_desc->pd_owner);
256
257 policy->pol_state = NRS_POL_STATE_STOPPED;
258 goto out;
259 }
260 }
261
262 policy->pol_state = NRS_POL_STATE_STARTED;
263
264 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
265
266
267
268 nrs->nrs_policy_fallback = policy;
269 } else {
270
271
272
273 nrs_policy_stop_primary(nrs);
274
275
276
277
278 nrs->nrs_policy_primary = policy;
279 }
280
281out:
282 nrs->nrs_policy_starting = 0;
283
284 return rc;
285}
286
287
288
289
290static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
291{
292 policy->pol_ref++;
293}
294
295
296
297
298
299
300
301static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
302{
303 LASSERT(policy->pol_ref > 0);
304
305 policy->pol_ref--;
306 if (unlikely(policy->pol_ref == 0 &&
307 policy->pol_state == NRS_POL_STATE_STOPPING))
308 nrs_policy_stop0(policy);
309}
310
311static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
312{
313 spin_lock(&policy->pol_nrs->nrs_lock);
314 nrs_policy_put_locked(policy);
315 spin_unlock(&policy->pol_nrs->nrs_lock);
316}
317
318
319
320
321static struct ptlrpc_nrs_policy *nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
322 char *name)
323{
324 struct ptlrpc_nrs_policy *tmp;
325
326 list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
327 if (strncmp(tmp->pol_desc->pd_name, name,
328 NRS_POL_NAME_MAX) == 0) {
329 nrs_policy_get_locked(tmp);
330 return tmp;
331 }
332 }
333 return NULL;
334}
335
336
337
338
339
340static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
341{
342 struct ptlrpc_nrs_policy *policy = res->res_policy;
343
344 if (policy->pol_desc->pd_ops->op_res_put) {
345 struct ptlrpc_nrs_resource *parent;
346
347 for (; res; res = parent) {
348 parent = res->res_parent;
349 policy->pol_desc->pd_ops->op_res_put(policy, res);
350 }
351 }
352}
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370static
371struct ptlrpc_nrs_resource *nrs_resource_get(struct ptlrpc_nrs_policy *policy,
372 struct ptlrpc_nrs_request *nrq,
373 bool moving_req)
374{
375
376
377
378 struct ptlrpc_nrs_resource *res = NULL;
379 struct ptlrpc_nrs_resource *tmp = NULL;
380 int rc;
381
382 while (1) {
383 rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
384 &tmp, moving_req);
385 if (rc < 0) {
386 if (res)
387 nrs_resource_put(res);
388 return NULL;
389 }
390
391 tmp->res_parent = res;
392 tmp->res_policy = policy;
393 res = tmp;
394 tmp = NULL;
395
396
397
398
399 if (rc > 0)
400 return res;
401 }
402}
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
422 struct ptlrpc_nrs_request *nrq,
423 struct ptlrpc_nrs_resource **resp,
424 bool moving_req)
425{
426 struct ptlrpc_nrs_policy *primary = NULL;
427 struct ptlrpc_nrs_policy *fallback = NULL;
428
429 memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
430
431
432
433
434 spin_lock(&nrs->nrs_lock);
435
436 fallback = nrs->nrs_policy_fallback;
437 nrs_policy_get_locked(fallback);
438
439 primary = nrs->nrs_policy_primary;
440 if (primary)
441 nrs_policy_get_locked(primary);
442
443 spin_unlock(&nrs->nrs_lock);
444
445
446
447
448 resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
449 LASSERT(resp[NRS_RES_FALLBACK]);
450
451 if (primary) {
452 resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
453 moving_req);
454
455
456
457
458
459
460 if (!resp[NRS_RES_PRIMARY])
461 nrs_policy_put(primary);
462 }
463}
464
465
466
467
468
469
470
471
472
473
474static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
475{
476 struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
477 int i;
478
479 for (i = 0; i < NRS_RES_MAX; i++) {
480 if (resp[i]) {
481 pols[i] = resp[i]->res_policy;
482 nrs_resource_put(resp[i]);
483 resp[i] = NULL;
484 } else {
485 pols[i] = NULL;
486 }
487 }
488
489 for (i = 0; i < NRS_RES_MAX; i++) {
490 if (pols[i])
491 nrs_policy_put(pols[i]);
492 }
493}
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511static inline
512struct ptlrpc_nrs_request *nrs_request_get(struct ptlrpc_nrs_policy *policy,
513 bool peek, bool force)
514{
515 struct ptlrpc_nrs_request *nrq;
516
517 LASSERT(policy->pol_req_queued > 0);
518
519 nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
520
521 LASSERT(ergo(nrq, nrs_request_policy(nrq) == policy));
522
523 return nrq;
524}
525
526
527
528
529
530
531
532
533
534
535
536static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
537{
538 struct ptlrpc_nrs_policy *policy;
539 int rc;
540 int i;
541
542
543
544
545
546 for (i = NRS_RES_MAX - 1; i >= 0; i--) {
547 if (!nrq->nr_res_ptrs[i])
548 continue;
549
550 nrq->nr_res_idx = i;
551 policy = nrq->nr_res_ptrs[i]->res_policy;
552
553 rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
554 if (rc == 0) {
555 policy->pol_nrs->nrs_req_queued++;
556 policy->pol_req_queued++;
557 return;
558 }
559 }
560
561
562
563
564
565 LBUG();
566}
567
568
569
570
571
572
573
574
575
576static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
577{
578 struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
579
580 if (policy->pol_desc->pd_ops->op_req_stop)
581 policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
582
583 LASSERT(policy->pol_nrs->nrs_req_started > 0);
584 LASSERT(policy->pol_req_started > 0);
585
586 policy->pol_nrs->nrs_req_started--;
587 policy->pol_req_started--;
588}
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
609 enum ptlrpc_nrs_ctl opc, void *arg)
610{
611 struct ptlrpc_nrs_policy *policy;
612 int rc = 0;
613
614 spin_lock(&nrs->nrs_lock);
615
616 policy = nrs_policy_find_locked(nrs, name);
617 if (!policy) {
618 rc = -ENOENT;
619 goto out;
620 }
621
622 switch (opc) {
623
624
625
626
627 default:
628 rc = nrs_policy_ctl_locked(policy, opc, arg);
629 break;
630
631
632
633
634 case PTLRPC_NRS_CTL_START:
635 rc = nrs_policy_start_locked(policy);
636 break;
637 }
638out:
639 if (policy)
640 nrs_policy_put_locked(policy);
641
642 spin_unlock(&nrs->nrs_lock);
643
644 return rc;
645}
646
647
648
649
650
651
652
653
654
655
656
657static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
658{
659 struct ptlrpc_nrs_policy *policy = NULL;
660
661 spin_lock(&nrs->nrs_lock);
662
663 policy = nrs_policy_find_locked(nrs, name);
664 if (!policy) {
665 spin_unlock(&nrs->nrs_lock);
666
667 CERROR("Can't find NRS policy %s\n", name);
668 return -ENOENT;
669 }
670
671 if (policy->pol_ref > 1) {
672 CERROR("Policy %s is busy with %d references\n", name,
673 (int)policy->pol_ref);
674 nrs_policy_put_locked(policy);
675
676 spin_unlock(&nrs->nrs_lock);
677 return -EBUSY;
678 }
679
680 LASSERT(policy->pol_req_queued == 0);
681 LASSERT(policy->pol_req_started == 0);
682
683 if (policy->pol_state != NRS_POL_STATE_STOPPED) {
684 nrs_policy_stop_locked(policy);
685 LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
686 }
687
688 list_del(&policy->pol_list);
689 nrs->nrs_num_pols--;
690
691 nrs_policy_put_locked(policy);
692
693 spin_unlock(&nrs->nrs_lock);
694
695 nrs_policy_fini(policy);
696
697 LASSERT(!policy->pol_private);
698 kfree(policy);
699
700 return 0;
701}
702
703
704
705
706
707
708
709
710
711
712
713static int nrs_policy_register(struct ptlrpc_nrs *nrs,
714 struct ptlrpc_nrs_pol_desc *desc)
715{
716 struct ptlrpc_nrs_policy *policy;
717 struct ptlrpc_nrs_policy *tmp;
718 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
719 int rc;
720
721 LASSERT(desc->pd_ops->op_res_get);
722 LASSERT(desc->pd_ops->op_req_get);
723 LASSERT(desc->pd_ops->op_req_enqueue);
724 LASSERT(desc->pd_ops->op_req_dequeue);
725 LASSERT(desc->pd_compat);
726
727 policy = kzalloc_node(sizeof(*policy), GFP_NOFS,
728 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
729 svcpt->scp_cpt));
730 if (!policy)
731 return -ENOMEM;
732
733 policy->pol_nrs = nrs;
734 policy->pol_desc = desc;
735 policy->pol_state = NRS_POL_STATE_STOPPED;
736 policy->pol_flags = desc->pd_flags;
737
738 INIT_LIST_HEAD(&policy->pol_list);
739 INIT_LIST_HEAD(&policy->pol_list_queued);
740
741 rc = nrs_policy_init(policy);
742 if (rc != 0) {
743 kfree(policy);
744 return rc;
745 }
746
747 spin_lock(&nrs->nrs_lock);
748
749 tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
750 if (tmp) {
751 CERROR("NRS policy %s has been registered, can't register it for %s\n",
752 policy->pol_desc->pd_name,
753 svcpt->scp_service->srv_name);
754 nrs_policy_put_locked(tmp);
755
756 spin_unlock(&nrs->nrs_lock);
757 nrs_policy_fini(policy);
758 kfree(policy);
759
760 return -EEXIST;
761 }
762
763 list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
764 nrs->nrs_num_pols++;
765
766 if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
767 rc = nrs_policy_start_locked(policy);
768
769 spin_unlock(&nrs->nrs_lock);
770
771 if (rc != 0)
772 (void)nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
773
774 return rc;
775}
776
777
778
779
780
781
782
783static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
784{
785 struct ptlrpc_nrs_policy *policy;
786
787 LASSERT(req->rq_nrq.nr_initialized);
788 LASSERT(!req->rq_nrq.nr_enqueued);
789
790 nrs_request_enqueue(&req->rq_nrq);
791 req->rq_nrq.nr_enqueued = 1;
792
793 policy = nrs_request_policy(&req->rq_nrq);
794
795
796
797
798 if (unlikely(list_empty(&policy->pol_list_queued)))
799 list_add_tail(&policy->pol_list_queued,
800 &policy->pol_nrs->nrs_policy_queued);
801}
802
803
804
805
806
807
808static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
809{
810 int opc = lustre_msg_get_opc(req->rq_reqmsg);
811
812 spin_lock(&req->rq_lock);
813 req->rq_hp = 1;
814 ptlrpc_nrs_req_add_nolock(req);
815 if (opc != OBD_PING)
816 DEBUG_REQ(D_NET, req, "high priority req");
817 spin_unlock(&req->rq_lock);
818}
819
820
821
822
823
824
825
826
827
828
829
830static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
831 const struct ptlrpc_nrs_pol_desc *desc)
832{
833 return desc->pd_compat(svc, desc);
834}
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
850{
851 struct ptlrpc_nrs_pol_desc *desc;
852
853 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
854 struct ptlrpc_service *svc = svcpt->scp_service;
855 int rc = -EINVAL;
856
857 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
858
859 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
860 if (nrs_policy_compatible(svc, desc)) {
861 rc = nrs_policy_register(nrs, desc);
862 if (rc != 0) {
863 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
864 desc->pd_name, svcpt->scp_cpt,
865 svc->srv_name, rc);
866
867
868
869
870 break;
871 }
872 }
873 }
874
875 return rc;
876}
877
878
879
880
881
882
883
884
885
886
887
888
889
890static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
891 struct ptlrpc_service_part *svcpt)
892{
893 enum ptlrpc_nrs_queue_type queue;
894
895 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
896
897 if (nrs == &svcpt->scp_nrs_reg)
898 queue = PTLRPC_NRS_QUEUE_REG;
899 else if (nrs == svcpt->scp_nrs_hp)
900 queue = PTLRPC_NRS_QUEUE_HP;
901 else
902 LBUG();
903
904 nrs->nrs_svcpt = svcpt;
905 nrs->nrs_queue_type = queue;
906 spin_lock_init(&nrs->nrs_lock);
907 INIT_LIST_HEAD(&nrs->nrs_policy_list);
908 INIT_LIST_HEAD(&nrs->nrs_policy_queued);
909
910 return nrs_register_policies_locked(nrs);
911}
912
913
914
915
916
917
918
919
920
921
922static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
923{
924 struct ptlrpc_nrs *nrs;
925 int rc;
926
927 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
928
929
930
931
932 nrs = nrs_svcpt2nrs(svcpt, false);
933 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
934 if (rc < 0)
935 goto out;
936
937
938
939
940 if (!svcpt->scp_service->srv_ops.so_hpreq_handler)
941 goto out;
942
943 svcpt->scp_nrs_hp =
944 kzalloc_node(sizeof(*svcpt->scp_nrs_hp), GFP_NOFS,
945 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
946 svcpt->scp_cpt));
947 if (!svcpt->scp_nrs_hp) {
948 rc = -ENOMEM;
949 goto out;
950 }
951
952 nrs = nrs_svcpt2nrs(svcpt, true);
953 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
954
955out:
956 return rc;
957}
958
959
960
961
962
963
964
965
966
967static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
968{
969 struct ptlrpc_nrs *nrs;
970 struct ptlrpc_nrs_policy *policy;
971 struct ptlrpc_nrs_policy *tmp;
972 int rc;
973 bool hp = false;
974
975 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
976
977again:
978
979 nrs = hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
980
981 if (!nrs || !nrs->nrs_svcpt)
982 return;
983 nrs->nrs_stopping = 1;
984
985 list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, pol_list) {
986 rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
987 LASSERT(rc == 0);
988 }
989
990
991
992
993 if (!hp && nrs_svcpt_has_hp(svcpt)) {
994 hp = true;
995 goto again;
996 }
997
998 if (hp)
999 kfree(nrs);
1000}
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
1011{
1012 struct ptlrpc_nrs_pol_desc *tmp;
1013
1014 list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
1015 if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
1016 return tmp;
1017 }
1018 return NULL;
1019}
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
1034{
1035 struct ptlrpc_nrs *nrs;
1036 struct ptlrpc_service *svc;
1037 struct ptlrpc_service_part *svcpt;
1038 int i;
1039 int rc = 0;
1040
1041 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1042 LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
1043
1044 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1045 if (!nrs_policy_compatible(svc, desc) ||
1046 unlikely(svc->srv_is_stopping))
1047 continue;
1048
1049 ptlrpc_service_for_each_part(svcpt, i, svc) {
1050 bool hp = false;
1051
1052again:
1053 nrs = nrs_svcpt2nrs(svcpt, hp);
1054 rc = nrs_policy_unregister(nrs, desc->pd_name);
1055
1056
1057
1058
1059 if (rc == -ENOENT) {
1060 rc = 0;
1061 } else if (rc != 0) {
1062 CERROR("Failed to unregister NRS policy %s for partition %d of service %s: %d\n",
1063 desc->pd_name, svcpt->scp_cpt,
1064 svcpt->scp_service->srv_name, rc);
1065 return rc;
1066 }
1067
1068 if (!hp && nrs_svc_has_hp(svc)) {
1069 hp = true;
1070 goto again;
1071 }
1072 }
1073
1074 if (desc->pd_ops->op_lprocfs_fini)
1075 desc->pd_ops->op_lprocfs_fini(svc);
1076 }
1077
1078 return rc;
1079}
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
1097{
1098 struct ptlrpc_service *svc;
1099 struct ptlrpc_nrs_pol_desc *desc;
1100 size_t len;
1101 int rc = 0;
1102
1103 LASSERT(conf->nc_ops);
1104 LASSERT(conf->nc_compat);
1105 LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
1106 conf->nc_compat_svc_name));
1107 LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
1108 conf->nc_owner));
1109
1110 conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
1122 (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
1123 PTLRPC_NRS_FL_REG_START))) {
1124 CERROR("NRS: failing to register policy %s. Please check policy flags; external policies cannot act as fallback policies, or be started immediately upon registration without interaction with lprocfs\n",
1125 conf->nc_name);
1126 return -EINVAL;
1127 }
1128
1129 mutex_lock(&nrs_core.nrs_mutex);
1130
1131 if (nrs_policy_find_desc_locked(conf->nc_name)) {
1132 CERROR("NRS: failing to register policy %s which has already been registered with NRS core!\n",
1133 conf->nc_name);
1134 rc = -EEXIST;
1135 goto fail;
1136 }
1137
1138 desc = kzalloc(sizeof(*desc), GFP_NOFS);
1139 if (!desc) {
1140 rc = -ENOMEM;
1141 goto fail;
1142 }
1143
1144 len = strlcpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name));
1145 if (len >= sizeof(desc->pd_name)) {
1146 kfree(desc);
1147 rc = -E2BIG;
1148 goto fail;
1149 }
1150 desc->pd_ops = conf->nc_ops;
1151 desc->pd_compat = conf->nc_compat;
1152 desc->pd_compat_svc_name = conf->nc_compat_svc_name;
1153 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
1154 desc->pd_owner = conf->nc_owner;
1155 desc->pd_flags = conf->nc_flags;
1156 atomic_set(&desc->pd_refs, 0);
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
1167 goto internal;
1168
1169
1170
1171
1172 mutex_lock(&ptlrpc_all_services_mutex);
1173
1174 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1175 struct ptlrpc_service_part *svcpt;
1176 int i;
1177 int rc2;
1178
1179 if (!nrs_policy_compatible(svc, desc) ||
1180 unlikely(svc->srv_is_stopping))
1181 continue;
1182
1183 ptlrpc_service_for_each_part(svcpt, i, svc) {
1184 struct ptlrpc_nrs *nrs;
1185 bool hp = false;
1186again:
1187 nrs = nrs_svcpt2nrs(svcpt, hp);
1188 rc = nrs_policy_register(nrs, desc);
1189 if (rc != 0) {
1190 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
1191 desc->pd_name, svcpt->scp_cpt,
1192 svcpt->scp_service->srv_name, rc);
1193
1194 rc2 = nrs_policy_unregister_locked(desc);
1195
1196
1197
1198 LASSERT(rc2 == 0);
1199 mutex_unlock(&ptlrpc_all_services_mutex);
1200 kfree(desc);
1201 goto fail;
1202 }
1203
1204 if (!hp && nrs_svc_has_hp(svc)) {
1205 hp = true;
1206 goto again;
1207 }
1208 }
1209
1210
1211
1212
1213
1214 if (desc->pd_ops->op_lprocfs_init) {
1215 rc = desc->pd_ops->op_lprocfs_init(svc);
1216 if (rc != 0) {
1217 rc2 = nrs_policy_unregister_locked(desc);
1218
1219
1220
1221 LASSERT(rc2 == 0);
1222 mutex_unlock(&ptlrpc_all_services_mutex);
1223 kfree(desc);
1224 goto fail;
1225 }
1226 }
1227 }
1228
1229 mutex_unlock(&ptlrpc_all_services_mutex);
1230internal:
1231 list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
1232fail:
1233 mutex_unlock(&nrs_core.nrs_mutex);
1234
1235 return rc;
1236}
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
1253{
1254 struct ptlrpc_service_part *svcpt;
1255 const struct ptlrpc_nrs_pol_desc *desc;
1256 int i;
1257 int rc = 0;
1258
1259 mutex_lock(&nrs_core.nrs_mutex);
1260
1261
1262
1263
1264 ptlrpc_service_for_each_part(svcpt, i, svc) {
1265 rc = nrs_svcpt_setup_locked(svcpt);
1266 if (rc != 0)
1267 goto failed;
1268 }
1269
1270
1271
1272
1273
1274 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1275 if (!nrs_policy_compatible(svc, desc))
1276 continue;
1277
1278 if (desc->pd_ops->op_lprocfs_init) {
1279 rc = desc->pd_ops->op_lprocfs_init(svc);
1280 if (rc != 0)
1281 goto failed;
1282 }
1283 }
1284
1285failed:
1286
1287 mutex_unlock(&nrs_core.nrs_mutex);
1288
1289 return rc;
1290}
1291
1292
1293
1294
1295
1296
1297void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
1298{
1299 struct ptlrpc_service_part *svcpt;
1300 const struct ptlrpc_nrs_pol_desc *desc;
1301 int i;
1302
1303 mutex_lock(&nrs_core.nrs_mutex);
1304
1305
1306
1307
1308 ptlrpc_service_for_each_part(svcpt, i, svc)
1309 nrs_svcpt_cleanup_locked(svcpt);
1310
1311
1312
1313
1314
1315 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1316 if (!nrs_policy_compatible(svc, desc))
1317 continue;
1318
1319 if (desc->pd_ops->op_lprocfs_fini)
1320 desc->pd_ops->op_lprocfs_fini(svc);
1321 }
1322
1323 mutex_unlock(&nrs_core.nrs_mutex);
1324}
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
1338 struct ptlrpc_request *req, bool hp)
1339{
1340 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1341
1342 memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
1343 nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
1344 false);
1345
1346
1347
1348
1349
1350 req->rq_nrq.nr_initialized = 1;
1351}
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
1362{
1363 if (req->rq_nrq.nr_initialized) {
1364 nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
1365
1366
1367
1368 req->rq_nrq.nr_finalized = 1;
1369 }
1370}
1371
1372void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
1373{
1374 if (req->rq_nrq.nr_started)
1375 nrs_request_stop(&req->rq_nrq);
1376}
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
1388 struct ptlrpc_request *req, bool hp)
1389{
1390 spin_lock(&svcpt->scp_req_lock);
1391
1392 if (hp)
1393 ptlrpc_nrs_hpreq_add_nolock(req);
1394 else
1395 ptlrpc_nrs_req_add_nolock(req);
1396
1397 spin_unlock(&svcpt->scp_req_lock);
1398}
1399
1400static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
1401{
1402 LASSERT(policy->pol_nrs->nrs_req_queued > 0);
1403 LASSERT(policy->pol_req_queued > 0);
1404
1405 policy->pol_nrs->nrs_req_queued--;
1406 policy->pol_req_queued--;
1407
1408
1409
1410
1411
1412 if (unlikely(policy->pol_req_queued == 0)) {
1413 list_del_init(&policy->pol_list_queued);
1414
1415
1416
1417
1418
1419
1420 } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
1421 LASSERT(policy->pol_req_queued <
1422 policy->pol_nrs->nrs_req_queued);
1423
1424 list_move_tail(&policy->pol_list_queued,
1425 &policy->pol_nrs->nrs_policy_queued);
1426 }
1427}
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445struct ptlrpc_request *
1446ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
1447 bool peek, bool force)
1448{
1449 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1450 struct ptlrpc_nrs_policy *policy;
1451 struct ptlrpc_nrs_request *nrq;
1452
1453
1454
1455
1456
1457 list_for_each_entry(policy, &nrs->nrs_policy_queued, pol_list_queued) {
1458 nrq = nrs_request_get(policy, peek, force);
1459 if (nrq) {
1460 if (likely(!peek)) {
1461 nrq->nr_started = 1;
1462
1463 policy->pol_req_started++;
1464 policy->pol_nrs->nrs_req_started++;
1465
1466 nrs_request_removed(policy);
1467 }
1468
1469 return container_of(nrq, struct ptlrpc_request, rq_nrq);
1470 }
1471 }
1472
1473 return NULL;
1474}
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1490{
1491 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1492
1493 return nrs->nrs_req_queued > 0;
1494};
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
1523 enum ptlrpc_nrs_queue_type queue, char *name,
1524 enum ptlrpc_nrs_ctl opc, bool single, void *arg)
1525{
1526 struct ptlrpc_service_part *svcpt;
1527 int i;
1528 int rc = 0;
1529
1530 LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
1531
1532 if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
1533 return -EINVAL;
1534
1535 ptlrpc_service_for_each_part(svcpt, i, svc) {
1536 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1537 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
1538 opc, arg);
1539 if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
1540 single))
1541 goto out;
1542 }
1543
1544 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1545
1546
1547
1548
1549
1550
1551
1552
1553 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
1554 opc, arg);
1555 if (rc != 0 || single)
1556 goto out;
1557 }
1558 }
1559out:
1560 return rc;
1561}
1562
1563
/* Policy configuration defined elsewhere; registered by ptlrpc_nrs_init(). */
extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
1565
1566
1567
1568
1569
1570
1571
1572
1573int ptlrpc_nrs_init(void)
1574{
1575 int rc;
1576
1577 mutex_init(&nrs_core.nrs_mutex);
1578 INIT_LIST_HEAD(&nrs_core.nrs_policies);
1579
1580 rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
1581 if (rc != 0)
1582 goto fail;
1583
1584 return rc;
1585fail:
1586
1587
1588
1589
1590 ptlrpc_nrs_fini();
1591
1592 return rc;
1593}
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604void ptlrpc_nrs_fini(void)
1605{
1606 struct ptlrpc_nrs_pol_desc *desc;
1607 struct ptlrpc_nrs_pol_desc *tmp;
1608
1609 list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies, pd_list) {
1610 list_del_init(&desc->pd_list);
1611 kfree(desc);
1612 }
1613}
1614
1615
1616