1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_RPC
38#include "../include/obd_support.h"
39#include "../include/obd_class.h"
40#include "../include/lustre_net.h"
41#include "../include/lu_object.h"
42#include "../../include/linux/lnet/types.h"
43#include "ptlrpc_internal.h"
44
45
/* when non-zero, allocate only one request buffer per group to stress
 * the request-buffer pool logic (consumed in ptlrpc_register_service())
 */
int test_req_buffer_pressure;
module_param(test_req_buffer_pressure, int, 0444);
MODULE_PARM_DESC(test_req_buffer_pressure, "set non-zero to put pressure on request buffer pools");
/* adaptive-timeout (AT) tunables; the variables themselves are declared
 * elsewhere -- only the module parameter hookup lives here
 */
module_param(at_min, int, 0644);
MODULE_PARM_DESC(at_min, "Adaptive timeout minimum (sec)");
module_param(at_max, int, 0644);
MODULE_PARM_DESC(at_max, "Adaptive timeout maximum (sec)");
module_param(at_history, int, 0644);
MODULE_PARM_DESC(at_history,
		 "Adaptive timeouts remember the slowest event that took place within this period (sec)");
module_param(at_early_margin, int, 0644);
MODULE_PARM_DESC(at_early_margin, "How soon before an RPC deadline to send an early reply");
module_param(at_extra, int, 0644);
MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
60
61
/* forward declarations for helpers defined later in this file */
static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);

/* list of all registered ptlrpc services */
LIST_HEAD(ptlrpc_all_services);
/* serializes additions to / removals from ptlrpc_all_services */
struct mutex ptlrpc_all_services_mutex;
70
71static struct ptlrpc_request_buffer_desc *
72ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
73{
74 struct ptlrpc_service *svc = svcpt->scp_service;
75 struct ptlrpc_request_buffer_desc *rqbd;
76
77 rqbd = kzalloc_node(sizeof(*rqbd), GFP_NOFS,
78 cfs_cpt_spread_node(svc->srv_cptable,
79 svcpt->scp_cpt));
80 if (!rqbd)
81 return NULL;
82
83 rqbd->rqbd_svcpt = svcpt;
84 rqbd->rqbd_refcount = 0;
85 rqbd->rqbd_cbid.cbid_fn = request_in_callback;
86 rqbd->rqbd_cbid.cbid_arg = rqbd;
87 INIT_LIST_HEAD(&rqbd->rqbd_reqs);
88 rqbd->rqbd_buffer = libcfs_kvzalloc_cpt(svc->srv_cptable,
89 svcpt->scp_cpt,
90 svc->srv_buf_size,
91 GFP_KERNEL);
92 if (!rqbd->rqbd_buffer) {
93 kfree(rqbd);
94 return NULL;
95 }
96
97 spin_lock(&svcpt->scp_lock);
98 list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
99 svcpt->scp_nrqbds_total++;
100 spin_unlock(&svcpt->scp_lock);
101
102 return rqbd;
103}
104
105static void
106ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
107{
108 struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
109
110 LASSERT(rqbd->rqbd_refcount == 0);
111 LASSERT(list_empty(&rqbd->rqbd_reqs));
112
113 spin_lock(&svcpt->scp_lock);
114 list_del(&rqbd->rqbd_list);
115 svcpt->scp_nrqbds_total--;
116 spin_unlock(&svcpt->scp_lock);
117
118 kvfree(rqbd->rqbd_buffer);
119 kfree(rqbd);
120}
121
/*
 * Grow @svcpt's request buffer pool up to srv_nbuf_per_group posted
 * buffers, and optionally (@post != 0) post the idle buffers to the
 * network afterwards.  Only one thread allocates at a time, guarded by
 * scp_rqbd_allocating.
 *
 * Returns 0 on success (or when another thread is already allocating),
 * -ENOMEM if a buffer allocation failed, or the result of
 * ptlrpc_server_post_idle_rqbds() when posting.
 */
static int
ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
{
	struct ptlrpc_service *svc = svcpt->scp_service;
	struct ptlrpc_request_buffer_desc *rqbd;
	int rc = 0;
	int i;

	/* cheap unlocked check: someone else is already growing the pool */
	if (svcpt->scp_rqbd_allocating)
		goto try_post;

	spin_lock(&svcpt->scp_lock);
	/* re-check under the lock */
	if (svcpt->scp_rqbd_allocating) {
		/* there is never more than one concurrent allocator */
		LASSERT(svcpt->scp_rqbd_allocating == 1);
		spin_unlock(&svcpt->scp_lock);
		goto try_post;
	}

	svcpt->scp_rqbd_allocating++;
	spin_unlock(&svcpt->scp_lock);

	for (i = 0; i < svc->srv_nbuf_per_group; i++) {
		/* already enough buffers posted; stop early.  This is
		 * read without scp_lock -- worst case a few extra
		 * buffers get allocated
		 */
		if (svcpt->scp_nrqbds_posted >= svc->srv_nbuf_per_group)
			break;

		rqbd = ptlrpc_alloc_rqbd(svcpt);

		if (!rqbd) {
			CERROR("%s: Can't allocate request buffer\n",
			       svc->srv_name);
			rc = -ENOMEM;
			break;
		}
	}

	spin_lock(&svcpt->scp_lock);

	LASSERT(svcpt->scp_rqbd_allocating == 1);
	svcpt->scp_rqbd_allocating--;

	spin_unlock(&svcpt->scp_lock);

	CDEBUG(D_RPCTRACE,
	       "%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n",
	       svc->srv_name, i, svc->srv_buf_size, svcpt->scp_nrqbds_posted,
	       svcpt->scp_nrqbds_total, rc);

 try_post:
	if (post && rc == 0)
		rc = ptlrpc_server_post_idle_rqbds(svcpt);

	return rc;
}
180
struct ptlrpc_hr_partition;

/* one reply handling (HR) thread */
struct ptlrpc_hr_thread {
	/* thread identifier */
	int hrt_id;
	/* protects hrt_queue */
	spinlock_t hrt_lock;
	wait_queue_head_t hrt_waitq;
	/* reply states queued by ptlrpc_dispatch_difficult_reply() */
	struct list_head hrt_queue;
	/* back-pointer to the owning partition */
	struct ptlrpc_hr_partition *hrt_partition;
};

/* per-CPT group of reply handling threads */
struct ptlrpc_hr_partition {
	/* started/stopped thread accounting -- not referenced in this
	 * chunk; presumably updated by the thread lifecycle code
	 */
	atomic_t hrp_nstarted;
	atomic_t hrp_nstopped;
	/* CPU partition id this group is bound to */
	int hrp_cpt;
	/* round-robin rotor for picking a thread (ptlrpc_hr_select()) */
	int hrp_rotor;
	/* number of entries in hrp_thrs */
	int hrp_nthrs;
	/* array of hrp_nthrs threads */
	struct ptlrpc_hr_thread *hrp_thrs;
};

#define HRT_RUNNING 0
#define HRT_STOPPING 1

/* state of the reply handling service */
struct ptlrpc_hr_service {
	/* CPU partition table the HR partitions are indexed by */
	struct cfs_cpt_table *hr_cpt_table;
	wait_queue_head_t hr_waitq;
	unsigned int hr_stopping;
	/* rotor for choosing a partition when a service uses a
	 * different CPT table (see ptlrpc_hr_select())
	 */
	unsigned int hr_rotor;
	/* one partition per CPT, indexed by CPT number */
	struct ptlrpc_hr_partition **hr_partitions;
};

/* the singleton reply handling service */
static struct ptlrpc_hr_service ptlrpc_hr;
223
224
225
226
227static struct ptlrpc_hr_thread *
228ptlrpc_hr_select(struct ptlrpc_service_part *svcpt)
229{
230 struct ptlrpc_hr_partition *hrp;
231 unsigned int rotor;
232
233 if (svcpt->scp_cpt >= 0 &&
234 svcpt->scp_service->srv_cptable == ptlrpc_hr.hr_cpt_table) {
235
236 hrp = ptlrpc_hr.hr_partitions[svcpt->scp_cpt];
237
238 } else {
239 rotor = ptlrpc_hr.hr_rotor++;
240 rotor %= cfs_cpt_number(ptlrpc_hr.hr_cpt_table);
241
242 hrp = ptlrpc_hr.hr_partitions[rotor];
243 }
244
245 rotor = hrp->hrp_rotor++;
246 return &hrp->hrp_thrs[rotor % hrp->hrp_nthrs];
247}
248
249
250
251
252
/*
 * Queue a difficult reply state on a reply handling thread and wake it.
 * The reply must not already be linked anywhere.
 */
void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
{
	struct ptlrpc_hr_thread *hrt;

	LASSERT(list_empty(&rs->rs_list));

	/* pick a thread, preferring one on the same CPU partition */
	hrt = ptlrpc_hr_select(rs->rs_svcpt);

	spin_lock(&hrt->hrt_lock);
	list_add_tail(&rs->rs_list, &hrt->hrt_queue);
	spin_unlock(&hrt->hrt_lock);

	wake_up(&hrt->hrt_waitq);
}
267
268void
269ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs)
270{
271 assert_spin_locked(&rs->rs_svcpt->scp_rep_lock);
272 assert_spin_locked(&rs->rs_lock);
273 LASSERT(rs->rs_difficult);
274 rs->rs_scheduled_ever = 1;
275
276 if (rs->rs_scheduled) {
277 return;
278 }
279
280 rs->rs_scheduled = 1;
281 list_del_init(&rs->rs_list);
282 ptlrpc_dispatch_difficult_reply(rs);
283}
284EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
285
/*
 * Post all idle request buffers of @svcpt to the network.
 *
 * Returns 1 if at least one buffer was posted, 0 if the idle list was
 * empty, and -1 if ptlrpc_register_rqbd() failed (the failed buffer is
 * returned to the idle list).
 */
static int
ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
{
	struct ptlrpc_request_buffer_desc *rqbd;
	int rc;
	int posted = 0;

	for (;;) {
		spin_lock(&svcpt->scp_lock);

		if (list_empty(&svcpt->scp_rqbd_idle)) {
			spin_unlock(&svcpt->scp_lock);
			return posted;
		}

		rqbd = list_entry(svcpt->scp_rqbd_idle.next,
				  struct ptlrpc_request_buffer_desc,
				  rqbd_list);
		list_del(&rqbd->rqbd_list);

		/* optimistically account the buffer as posted before
		 * dropping the lock to register it
		 */
		svcpt->scp_nrqbds_posted++;
		list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_posted);

		spin_unlock(&svcpt->scp_lock);

		rc = ptlrpc_register_rqbd(rqbd);
		if (rc != 0)
			break;

		posted = 1;
	}

	/* registration failed: roll back the optimistic accounting and
	 * put the buffer back on the idle list
	 */
	spin_lock(&svcpt->scp_lock);

	svcpt->scp_nrqbds_posted--;
	list_del(&rqbd->rqbd_list);
	list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);

	spin_unlock(&svcpt->scp_lock);

	return -1;
}
333
334static void ptlrpc_at_timer(unsigned long castmeharder)
335{
336 struct ptlrpc_service_part *svcpt;
337
338 svcpt = (struct ptlrpc_service_part *)castmeharder;
339
340 svcpt->scp_at_check = 1;
341 svcpt->scp_at_checktime = cfs_time_current();
342 wake_up(&svcpt->scp_waitq);
343}
344
/*
 * Compute per-partition thread counts for @svc from the thread
 * configuration in @conf: svc->srv_nthrs_cpt_init (threads to start
 * initially) and svc->srv_nthrs_cpt_limit (maximum per partition).
 */
static void
ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
			     struct ptlrpc_service_conf *conf)
{
	struct ptlrpc_service_thr_conf *tc = &conf->psc_thr;
	unsigned init;
	unsigned total;
	unsigned nthrs;
	int weight;

	/* initial thread count: at least PTLRPC_NTHRS_INIT (plus one if
	 * the service has a high-priority request handler), but never
	 * below the configured tc_nthrs_init
	 */
	init = PTLRPC_NTHRS_INIT + (svc->srv_ops.so_hpreq_handler != NULL);
	init = max_t(int, init, tc->tc_nthrs_init);

	/* tc_nthrs_max is the per-service ceiling and must be set */
	LASSERT(tc->tc_nthrs_max != 0);

	if (tc->tc_nthrs_user != 0) {
		/* an explicit thread count was configured: honour it,
		 * capped at 8 * tc_nthrs_max, split evenly over CPTs
		 */
		total = min(tc->tc_nthrs_max * 8, tc->tc_nthrs_user);
		nthrs = total / svc->srv_ncpts;
		init = max(init, nthrs);
		goto out;
	}

	total = tc->tc_nthrs_max;
	if (tc->tc_nthrs_base == 0) {
		/* no base count: just divide the maximum over CPTs */
		nthrs = total / svc->srv_ncpts;
		goto out;
	}

	nthrs = tc->tc_nthrs_base;
	if (svc->srv_ncpts == 1) {
		int i;

		/* a single partition covers all CPUs: scale the base
		 * count up with (roughly) the log2 of the CPU weight
		 */
		weight = cfs_cpt_weight(svc->srv_cptable, CFS_CPT_ANY);
		for (i = 1; (weight >> (i + 1)) != 0 &&
			    (tc->tc_nthrs_base >> i) != 0; i++)
			nthrs += tc->tc_nthrs_base >> i;
	}

	if (tc->tc_thr_factor != 0) {
		int factor = tc->tc_thr_factor;
		const int fade = 4;

		/* add extra threads per CPU in the partition, with the
		 * factor fading out (step @fade) as CPUs are counted
		 */
		if (cpumask_weight(topology_sibling_cpumask(0)) > 1) {
			/* SMT siblings present: reduce the per-core
			 * factor to f - f/2 + f/8 -- presumably because
			 * hyperthreads add less real capacity (verify)
			 */
			factor = factor - (factor >> 1) + (factor >> 3);
		}

		weight = cfs_cpt_weight(svc->srv_cptable, 0);
		LASSERT(weight > 0);

		for (; factor > 0 && weight > 0; factor--, weight -= fade)
			nthrs += min(weight, fade) * factor;
	}

	/* clamp the total over all partitions to tc_nthrs_max */
	if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
		nthrs = max(tc->tc_nthrs_base,
			    tc->tc_nthrs_max / svc->srv_ncpts);
	}
 out:
	/* never go below the configured initial thread count */
	nthrs = max(nthrs, tc->tc_nthrs_init);
	svc->srv_nthrs_cpt_limit = nthrs;
	svc->srv_nthrs_cpt_init = init;

	if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
		CDEBUG(D_OTHER, "%s: This service may have more threads (%d) than the given soft limit (%d)\n",
		       svc->srv_name, nthrs * svc->srv_ncpts,
		       tc->tc_nthrs_max);
	}
}
450
451
452
453
/*
 * Initialize one service partition @svcpt of @svc, bound to CPU
 * partition @cpt: locks, request/reply lists, the adaptive-timeout
 * (AT) array and timer, and the initial batch of request buffers.
 *
 * Returns 0 on success or -ENOMEM; on failure everything allocated
 * here is freed again via the goto chain at the bottom.
 */
static int
ptlrpc_service_part_init(struct ptlrpc_service *svc,
			 struct ptlrpc_service_part *svcpt, int cpt)
{
	struct ptlrpc_at_array *array;
	int size;
	int index;
	int rc;

	svcpt->scp_cpt = cpt;
	INIT_LIST_HEAD(&svcpt->scp_threads);

	/* rqbd and incoming request queues, guarded by scp_lock */
	spin_lock_init(&svcpt->scp_lock);
	INIT_LIST_HEAD(&svcpt->scp_rqbd_idle);
	INIT_LIST_HEAD(&svcpt->scp_rqbd_posted);
	INIT_LIST_HEAD(&svcpt->scp_req_incoming);
	init_waitqueue_head(&svcpt->scp_waitq);
	/* request and rqbd history */
	INIT_LIST_HEAD(&svcpt->scp_hist_reqs);
	INIT_LIST_HEAD(&svcpt->scp_hist_rqbds);

	/* active request queue */
	spin_lock_init(&svcpt->scp_req_lock);

	/* reply state handling */
	spin_lock_init(&svcpt->scp_rep_lock);
	INIT_LIST_HEAD(&svcpt->scp_rep_active);
	INIT_LIST_HEAD(&svcpt->scp_rep_idle);
	init_waitqueue_head(&svcpt->scp_rep_waitq);
	atomic_set(&svcpt->scp_nreps_difficult, 0);

	/* adaptive timeout wheel: one bucket per second, indexed by
	 * deadline modulo paa_size (see ptlrpc_at_add_timed())
	 */
	spin_lock_init(&svcpt->scp_at_lock);
	array = &svcpt->scp_at_array;

	size = at_est2timeout(at_max);
	array->paa_size = size;
	array->paa_count = 0;
	array->paa_deadline = -1;

	/* per-bucket request lists */
	array->paa_reqs_array =
		kzalloc_node(sizeof(struct list_head) * size, GFP_NOFS,
			     cfs_cpt_spread_node(svc->srv_cptable, cpt));
	if (!array->paa_reqs_array)
		return -ENOMEM;

	for (index = 0; index < size; index++)
		INIT_LIST_HEAD(&array->paa_reqs_array[index]);

	/* per-bucket request counts */
	array->paa_reqs_count =
		kzalloc_node(sizeof(__u32) * size, GFP_NOFS,
			     cfs_cpt_spread_node(svc->srv_cptable, cpt));
	if (!array->paa_reqs_count)
		goto free_reqs_array;

	setup_timer(&svcpt->scp_at_timer, ptlrpc_at_timer,
		    (unsigned long)svcpt);

	/* seed the AT service-time estimate with 10s; a generous
	 * starting value so early replies aren't sent immediately
	 */
	at_init(&svcpt->scp_at_estimate, 10, 0);

	/* must be set before ptlrpc_grow_req_bufs() is called */
	svcpt->scp_service = svc;
	/* allocate (but do not post) the initial request buffers */
	rc = ptlrpc_grow_req_bufs(svcpt, 0);
	/* at startup we should not be under memory pressure, so fail
	 * hard if the buffers could not all be allocated
	 */
	if (rc != 0)
		goto free_reqs_count;

	return 0;

free_reqs_count:
	kfree(array->paa_reqs_count);
	array->paa_reqs_count = NULL;
free_reqs_array:
	kfree(array->paa_reqs_array);
	array->paa_reqs_array = NULL;

	return -ENOMEM;
}
540
541
542
543
544
545
546struct ptlrpc_service *
547ptlrpc_register_service(struct ptlrpc_service_conf *conf,
548 struct kset *parent,
549 struct dentry *debugfs_entry)
550{
551 struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
552 struct ptlrpc_service *service;
553 struct ptlrpc_service_part *svcpt;
554 struct cfs_cpt_table *cptable;
555 __u32 *cpts = NULL;
556 int ncpts;
557 int cpt;
558 int rc;
559 int i;
560
561 LASSERT(conf->psc_buf.bc_nbufs > 0);
562 LASSERT(conf->psc_buf.bc_buf_size >=
563 conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
564 LASSERT(conf->psc_thr.tc_ctx_tags != 0);
565
566 cptable = cconf->cc_cptable;
567 if (!cptable)
568 cptable = cfs_cpt_table;
569
570 if (!conf->psc_thr.tc_cpu_affinity) {
571 ncpts = 1;
572 } else {
573 ncpts = cfs_cpt_number(cptable);
574 if (cconf->cc_pattern) {
575 struct cfs_expr_list *el;
576
577 rc = cfs_expr_list_parse(cconf->cc_pattern,
578 strlen(cconf->cc_pattern),
579 0, ncpts - 1, &el);
580 if (rc != 0) {
581 CERROR("%s: invalid CPT pattern string: %s",
582 conf->psc_name, cconf->cc_pattern);
583 return ERR_PTR(-EINVAL);
584 }
585
586 rc = cfs_expr_list_values(el, ncpts, &cpts);
587 cfs_expr_list_free(el);
588 if (rc <= 0) {
589 CERROR("%s: failed to parse CPT array %s: %d\n",
590 conf->psc_name, cconf->cc_pattern, rc);
591 kfree(cpts);
592 return ERR_PTR(rc < 0 ? rc : -EINVAL);
593 }
594 ncpts = rc;
595 }
596 }
597
598 service = kzalloc(offsetof(struct ptlrpc_service, srv_parts[ncpts]),
599 GFP_NOFS);
600 if (!service) {
601 kfree(cpts);
602 return ERR_PTR(-ENOMEM);
603 }
604
605 service->srv_cptable = cptable;
606 service->srv_cpts = cpts;
607 service->srv_ncpts = ncpts;
608
609 service->srv_cpt_bits = 0;
610 while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
611 service->srv_cpt_bits++;
612
613
614 spin_lock_init(&service->srv_lock);
615 service->srv_name = conf->psc_name;
616 service->srv_watchdog_factor = conf->psc_watchdog_factor;
617 INIT_LIST_HEAD(&service->srv_list);
618
619
620 service->srv_nbuf_per_group = test_req_buffer_pressure ?
621 1 : conf->psc_buf.bc_nbufs;
622 service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
623 SPTLRPC_MAX_PAYLOAD;
624 service->srv_buf_size = conf->psc_buf.bc_buf_size;
625 service->srv_rep_portal = conf->psc_buf.bc_rep_portal;
626 service->srv_req_portal = conf->psc_buf.bc_req_portal;
627
628
629 service->srv_max_reply_size = 1;
630 while (service->srv_max_reply_size <
631 conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
632 service->srv_max_reply_size <<= 1;
633
634 service->srv_thread_name = conf->psc_thr.tc_thr_name;
635 service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
636 service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
637 service->srv_ops = conf->psc_ops;
638
639 for (i = 0; i < ncpts; i++) {
640 if (!conf->psc_thr.tc_cpu_affinity)
641 cpt = CFS_CPT_ANY;
642 else
643 cpt = cpts ? cpts[i] : i;
644
645 svcpt = kzalloc_node(sizeof(*svcpt), GFP_NOFS,
646 cfs_cpt_spread_node(cptable, cpt));
647 if (!svcpt) {
648 rc = -ENOMEM;
649 goto failed;
650 }
651
652 service->srv_parts[i] = svcpt;
653 rc = ptlrpc_service_part_init(service, svcpt, cpt);
654 if (rc != 0)
655 goto failed;
656 }
657
658 ptlrpc_server_nthreads_check(service, conf);
659
660 rc = LNetSetLazyPortal(service->srv_req_portal);
661 LASSERT(rc == 0);
662
663 mutex_lock(&ptlrpc_all_services_mutex);
664 list_add(&service->srv_list, &ptlrpc_all_services);
665 mutex_unlock(&ptlrpc_all_services_mutex);
666
667 if (parent) {
668 rc = ptlrpc_sysfs_register_service(parent, service);
669 if (rc)
670 goto failed;
671 }
672
673 if (!IS_ERR_OR_NULL(debugfs_entry))
674 ptlrpc_ldebugfs_register_service(debugfs_entry, service);
675
676 rc = ptlrpc_service_nrs_setup(service);
677 if (rc != 0)
678 goto failed;
679
680 CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
681 service->srv_name, service->srv_req_portal);
682
683 rc = ptlrpc_start_threads(service);
684 if (rc != 0) {
685 CERROR("Failed to start threads for service %s: %d\n",
686 service->srv_name, rc);
687 goto failed;
688 }
689
690 return service;
691failed:
692 ptlrpc_unregister_service(service);
693 return ERR_PTR(rc);
694}
695EXPORT_SYMBOL(ptlrpc_register_service);
696
697
698
699
700
701static void ptlrpc_server_free_request(struct ptlrpc_request *req)
702{
703 LASSERT(atomic_read(&req->rq_refcount) == 0);
704 LASSERT(list_empty(&req->rq_timed_list));
705
706
707
708
709 ptlrpc_req_drop_rs(req);
710
711 sptlrpc_svc_ctx_decref(req);
712
713 if (req != &req->rq_rqbd->rqbd_req) {
714
715
716
717
718 ptlrpc_request_cache_free(req);
719 }
720}
721
722
723
724
725
/*
 * Drop a reference on @req.  On the final put the request is removed
 * from the AT list and its rqbd's refcount is decremented; when a
 * rqbd's last request goes away the whole buffer moves to the history
 * list, and the oldest history rqbds (beyond srv_hist_nrqbds_cpt_max)
 * are drained of their requests and recycled onto the idle list.
 */
static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
	struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
	struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
	struct ptlrpc_service *svc = svcpt->scp_service;
	int refcount;
	struct list_head *tmp;
	struct list_head *nxt;

	if (!atomic_dec_and_test(&req->rq_refcount))
		return;

	if (req->rq_at_linked) {
		spin_lock(&svcpt->scp_at_lock);
		/* recheck under scp_at_lock: ptlrpc_at_check_timed()
		 * may have unlinked it concurrently
		 */
		if (likely(req->rq_at_linked))
			ptlrpc_at_remove_timed(req);
		spin_unlock(&svcpt->scp_at_lock);
	}

	LASSERT(list_empty(&req->rq_timed_list));

	/* finalize request: release the export reference */
	if (req->rq_export) {
		class_export_put(req->rq_export);
		req->rq_export = NULL;
	}

	spin_lock(&svcpt->scp_lock);

	list_add(&req->rq_list, &rqbd->rqbd_reqs);

	refcount = --(rqbd->rqbd_refcount);
	if (refcount == 0) {
		/* request buffer is now idle: move it to history */
		list_del(&rqbd->rqbd_list);

		list_add_tail(&rqbd->rqbd_list, &svcpt->scp_hist_rqbds);
		svcpt->scp_hist_nrqbds++;

		/* cull history down to srv_hist_nrqbds_cpt_max,
		 * recycling the oldest rqbds
		 */
		while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
			rqbd = list_entry(svcpt->scp_hist_rqbds.next,
					  struct ptlrpc_request_buffer_desc,
					  rqbd_list);

			list_del(&rqbd->rqbd_list);
			svcpt->scp_hist_nrqbds--;

			/* remove this rqbd's requests from the request
			 * history while still holding scp_lock
			 */
			list_for_each(tmp, &rqbd->rqbd_reqs) {
				req = list_entry(tmp, struct ptlrpc_request,
						 rq_list);
				/* track the highest culled history seq */
				if (req->rq_history_seq >
				    svcpt->scp_hist_seq_culled) {
					svcpt->scp_hist_seq_culled =
						req->rq_history_seq;
				}
				list_del(&req->rq_history_list);
			}

			spin_unlock(&svcpt->scp_lock);

			/* free the requests without the lock; each pass
			 * takes the current list head, which list_del()
			 * keeps advancing
			 */
			list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
				req = list_entry(rqbd->rqbd_reqs.next,
						 struct ptlrpc_request,
						 rq_list);
				list_del(&req->rq_list);
				ptlrpc_server_free_request(req);
			}

			spin_lock(&svcpt->scp_lock);
			/* all requests, including the embedded one, are
			 * gone: queue the buffer for re-posting
			 */
			LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) ==
				0);
			list_add_tail(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
		}

		spin_unlock(&svcpt->scp_lock);
	} else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
		/* request used a preallocated (low-memory) reply state:
		 * drop it from history and free it immediately
		 */
		list_del(&req->rq_list);
		list_del_init(&req->rq_history_list);

		/* track the highest culled history seq */
		if (req->rq_history_seq > svcpt->scp_hist_seq_culled)
			svcpt->scp_hist_seq_culled = req->rq_history_seq;

		spin_unlock(&svcpt->scp_lock);

		ptlrpc_server_free_request(req);
	} else {
		spin_unlock(&svcpt->scp_lock);
	}
}
831
832
833
834
835
/*
 * Finish a request: tear down its high-priority state (stops further
 * early replies via the export list) and drop the server's reference.
 */
static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
					 struct ptlrpc_request *req)
{
	ptlrpc_server_hpreq_fini(req);

	ptlrpc_server_drop_request(req);
}
843
844
845
846
847
/*
 * Finish an ACTIVE request after handling: update the active-request
 * counters under scp_req_lock, finalize its NRS state, drop the RPC
 * reference on the export and release the request itself.
 */
static void ptlrpc_server_finish_active_request(
					struct ptlrpc_service_part *svcpt,
					struct ptlrpc_request *req)
{
	spin_lock(&svcpt->scp_req_lock);
	ptlrpc_nrs_req_stop_nolock(req);
	svcpt->scp_nreqs_active--;
	if (req->rq_hp)
		svcpt->scp_nhreqs_active--;
	spin_unlock(&svcpt->scp_req_lock);

	ptlrpc_nrs_req_finalize(req);

	/* balances class_export_rpc_inc() in ptlrpc_server_request_get() */
	if (req->rq_export)
		class_export_rpc_dec(req->rq_export);

	ptlrpc_server_finish_request(svcpt, req);
}
866
867
868
869
870
/*
 * Sanity-check an incoming request against its export: drop requests
 * from superseded connections and fail the export on invalid replay
 * state.
 *
 * Returns 0 if the request may be handled, or a negative errno if it
 * was rejected (an error reply is sent for every rejection except the
 * stale-connection case).
 */
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
	struct obd_device *obd = req->rq_export->exp_obd;
	int rc = 0;

	if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
		     req->rq_export->exp_conn_cnt)) {
		/* client has reconnected since this request was sent */
		DEBUG_REQ(D_RPCTRACE, req,
			  "DROPPING req from old connection %d < %d",
			  lustre_msg_get_conn_cnt(req->rq_reqmsg),
			  req->rq_export->exp_conn_cnt);
		return -EEXIST;
	}
	if (unlikely(!obd || obd->obd_fail)) {
		/* device is failing over: reject with an error reply
		 * instead of processing
		 */
		CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
		       req, obd ? obd->obd_name : "unknown");
		rc = -ENODEV;
	} else if (lustre_msg_get_flags(req->rq_reqmsg) &
		   (MSG_REPLAY | MSG_REQ_REPLAY_DONE)) {
		/* replay flags are only legal during recovery */
		DEBUG_REQ(D_ERROR, req, "Invalid replay without recovery");
		class_fail_export(req->rq_export);
		rc = -ENODEV;
	} else if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
		/* a non-zero transno likewise implies replay */
		DEBUG_REQ(D_ERROR, req,
			  "Invalid req with transno %llu without recovery",
			  lustre_msg_get_transno(req->rq_reqmsg));
		class_fail_export(req->rq_export);
		rc = -ENODEV;
	}

	if (unlikely(rc < 0)) {
		req->rq_status = rc;
		ptlrpc_error(req);
	}
	return rc;
}
911
912static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
913{
914 struct ptlrpc_at_array *array = &svcpt->scp_at_array;
915 __s32 next;
916
917 if (array->paa_count == 0) {
918 del_timer(&svcpt->scp_at_timer);
919 return;
920 }
921
922
923 next = (__s32)(array->paa_deadline - ktime_get_real_seconds() -
924 at_early_margin);
925 if (next <= 0) {
926 ptlrpc_at_timer((unsigned long)svcpt);
927 } else {
928 mod_timer(&svcpt->scp_at_timer, cfs_time_shift(next));
929 CDEBUG(D_INFO, "armed %s at %+ds\n",
930 svcpt->scp_service->srv_name, next);
931 }
932}
933
934
/*
 * Add @req to its service partition's adaptive-timeout wheel so an
 * early reply can be sent before the deadline expires.  Buckets are
 * indexed by deadline modulo paa_size and kept sorted by increasing
 * deadline.
 *
 * Returns 0 on success (or when AT is off / no reply is expected),
 * -ENOSYS if the client does not support adaptive timeouts.
 */
static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
{
	struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
	struct ptlrpc_at_array *array = &svcpt->scp_at_array;
	struct ptlrpc_request *rq = NULL;
	__u32 index;

	if (AT_OFF)
		return 0;

	if (req->rq_no_reply)
		return 0;

	if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
		return -ENOSYS;

	spin_lock(&svcpt->scp_at_lock);
	LASSERT(list_empty(&req->rq_timed_list));

	div_u64_rem(req->rq_deadline, array->paa_size, &index);
	if (array->paa_reqs_count[index] > 0) {
		/* newer rpcs have later deadlines, so search the bucket
		 * backwards for the insertion point
		 */
		list_for_each_entry_reverse(rq, &array->paa_reqs_array[index],
					    rq_timed_list) {
			if (req->rq_deadline >= rq->rq_deadline) {
				list_add(&req->rq_timed_list,
					 &rq->rq_timed_list);
				break;
			}
		}
	}

	/* not inserted above: this deadline is the bucket's earliest */
	if (list_empty(&req->rq_timed_list))
		list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);

	spin_lock(&req->rq_lock);
	req->rq_at_linked = 1;
	spin_unlock(&req->rq_lock);
	req->rq_at_index = index;
	array->paa_reqs_count[index]++;
	array->paa_count++;
	/* new earliest deadline overall: re-arm the AT timer */
	if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
		array->paa_deadline = req->rq_deadline;
		ptlrpc_at_set_timer(svcpt);
	}
	spin_unlock(&svcpt->scp_at_lock);

	return 0;
}
987
988static void
989ptlrpc_at_remove_timed(struct ptlrpc_request *req)
990{
991 struct ptlrpc_at_array *array;
992
993 array = &req->rq_rqbd->rqbd_svcpt->scp_at_array;
994
995
996 LASSERT(!list_empty(&req->rq_timed_list));
997 list_del_init(&req->rq_timed_list);
998
999 spin_lock(&req->rq_lock);
1000 req->rq_at_linked = 0;
1001 spin_unlock(&req->rq_lock);
1002
1003 array->paa_reqs_count[req->rq_at_index]--;
1004 array->paa_count--;
1005}
1006
/*
 * Send an "early reply" asking the client for more time, bumping the
 * AT service estimate by at_extra and extending req->rq_deadline to
 * match on success.
 *
 * Returns 0 if the early reply was sent (deadline updated), or a
 * negative errno: -ETIMEDOUT (deadline passed or could not be
 * extended), -ENOSYS (no AT support), -ENOMEM, -EINVAL, -ENODEV, or a
 * packing/send error.
 */
static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
{
	struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
	struct ptlrpc_request *reqcopy;
	struct lustre_msg *reqmsg;
	long olddl = req->rq_deadline - ktime_get_real_seconds();
	time64_t newdl;
	int rc;

	/* deadline is when the client expects a reply; margin is the
	 * difference between that and our own service estimate
	 */
	DEBUG_REQ(D_ADAPTTO, req,
		  "%ssending early reply (deadline %+lds, margin %+lds) for %d+%d",
		  AT_OFF ? "AT off - not " : "",
		  olddl, olddl - at_get(&svcpt->scp_at_estimate),
		  at_get(&svcpt->scp_at_estimate), at_extra);

	if (AT_OFF)
		return 0;

	if (olddl < 0) {
		DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), not sending early reply. Consider increasing at_early_margin (%d)?",
			  olddl, at_early_margin);

		/* return an error so we're not re-added to the timed list */
		return -ETIMEDOUT;
	}

	if (!(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
		DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, but no AT support");
		return -ENOSYS;
	}

	/* inflate the service estimate by at_extra so the new deadline
	 * buys us extra time
	 */
	at_measured(&svcpt->scp_at_estimate, at_extra +
		    ktime_get_real_seconds() - req->rq_arrival_time.tv_sec);

	/* verify the estimate actually pushes the deadline forward --
	 * it may already be capped
	 */
	if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
	    at_get(&svcpt->scp_at_estimate)) {
		DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%lld), not sending early reply\n",
			  olddl, req->rq_arrival_time.tv_sec +
			  at_get(&svcpt->scp_at_estimate) -
			  ktime_get_real_seconds());
		return -ETIMEDOUT;
	}
	newdl = ktime_get_real_seconds() + at_get(&svcpt->scp_at_estimate);

	reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS);
	if (!reqcopy)
		return -ENOMEM;
	reqmsg = libcfs_kvzalloc(req->rq_reqlen, GFP_NOFS);
	if (!reqmsg) {
		rc = -ENOMEM;
		goto out_free;
	}

	/* shallow copy of the request, then detach the state that must
	 * not be shared with the original
	 */
	*reqcopy = *req;
	reqcopy->rq_reply_state = NULL;
	reqcopy->rq_rep_swab_mask = 0;
	reqcopy->rq_pack_bulk = 0;
	reqcopy->rq_pack_udesc = 0;
	reqcopy->rq_packed_final = 0;
	sptlrpc_svc_ctx_addref(reqcopy);
	/* the copy gets its own private request message buffer */
	reqcopy->rq_reqmsg = reqmsg;
	memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);

	LASSERT(atomic_read(&req->rq_refcount));
	/* if ours is the last reference, the normal reply was already
	 * sent -- no point in an early reply
	 */
	if (atomic_read(&req->rq_refcount) == 1) {
		DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, abort sending early reply\n");
		rc = -EINVAL;
		goto out;
	}

	/* connection ref */
	reqcopy->rq_export = class_conn2export(
		lustre_msg_get_handle(reqcopy->rq_reqmsg));
	if (!reqcopy->rq_export) {
		rc = -ENODEV;
		goto out;
	}

	/* RPC ref */
	class_export_rpc_inc(reqcopy->rq_export);
	if (reqcopy->rq_export->exp_obd &&
	    reqcopy->rq_export->exp_obd->obd_fail) {
		rc = -ENODEV;
		goto out_put;
	}

	rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
	if (rc)
		goto out_put;

	rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);

	if (!rc) {
		/* adjust our own deadline to what we told the client */
		req->rq_deadline = newdl;
		req->rq_early_count++;
	} else {
		DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
	}

	/* drop the reply state created by lustre_pack_reply_flags() */
	ptlrpc_req_drop_rs(reqcopy);

out_put:
	class_export_rpc_dec(reqcopy->rq_export);
	class_export_put(reqcopy->rq_export);
out:
	sptlrpc_svc_ctx_decref(reqcopy);
	kvfree(reqmsg);
out_free:
	ptlrpc_request_cache_free(reqcopy);
	return rc;
}
1133
1134
1135
1136
/*
 * Scan the adaptive-timeout wheel and send early replies to every
 * request whose deadline is within at_early_margin seconds, then
 * re-arm the AT timer for the next earliest deadline.  Runs when
 * scp_at_check was set by ptlrpc_at_timer().
 */
static void ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
{
	struct ptlrpc_at_array *array = &svcpt->scp_at_array;
	struct ptlrpc_request *rq, *n;
	struct list_head work_list;
	__u32 index, count;
	time64_t deadline;
	time64_t now = ktime_get_real_seconds();
	long delay;
	int first, counter = 0;

	spin_lock(&svcpt->scp_at_lock);
	if (svcpt->scp_at_check == 0) {
		spin_unlock(&svcpt->scp_at_lock);
		return;
	}
	/* how long the check was delayed after the timer fired */
	delay = cfs_time_sub(cfs_time_current(), svcpt->scp_at_checktime);
	svcpt->scp_at_check = 0;

	if (array->paa_count == 0) {
		spin_unlock(&svcpt->scp_at_lock);
		return;
	}

	/* the timer fired, but maybe the nearest rpc already completed */
	first = array->paa_deadline - now;
	if (first > at_early_margin) {
		/* still plenty of time: just reset the timer */
		ptlrpc_at_set_timer(svcpt);
		spin_unlock(&svcpt->scp_at_lock);
		return;
	}

	/* close to a timeout: collect everything expiring soon onto
	 * work_list and track the new earliest remaining deadline
	 */
	INIT_LIST_HEAD(&work_list);
	deadline = -1;
	div_u64_rem(array->paa_deadline, array->paa_size, &index);
	count = array->paa_count;
	while (count > 0) {
		count -= array->paa_reqs_count[index];
		list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
					 rq_timed_list) {
			if (rq->rq_deadline > now + at_early_margin) {
				/* bucket is deadline-sorted: the rest
				 * of it can be skipped; remember the
				 * earliest survivor
				 */
				if (deadline == -1 ||
				    rq->rq_deadline < deadline)
					deadline = rq->rq_deadline;
				break;
			}

			ptlrpc_at_remove_timed(rq);
			/* ptlrpc_server_drop_request() may already have
			 * dropped the refcount to 0; only queue the
			 * request if we can still take a reference
			 */
			if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
				list_add(&rq->rq_timed_list, &work_list);
			counter++;
		}

		if (++index >= array->paa_size)
			index = 0;
	}
	array->paa_deadline = deadline;
	/* new earliest deadline: restart the timer */
	ptlrpc_at_set_timer(svcpt);

	spin_unlock(&svcpt->scp_at_lock);

	CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early replies\n",
	       first, at_extra, counter);
	if (first < 0) {
		/* already past some deadlines before we even got a
		 * chance to send early replies
		 */
		LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n",
			      svcpt->scp_service->srv_name);
		CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%ld(jiff)\n",
		      counter, svcpt->scp_nreqs_incoming,
		      svcpt->scp_nreqs_active,
		      at_get(&svcpt->scp_at_estimate), delay);
	}

	/* the extra refcount taken above keeps entries from being freed
	 * under us, so no locking is needed here
	 */
	while (!list_empty(&work_list)) {
		rq = list_entry(work_list.next, struct ptlrpc_request,
				rq_timed_list);
		list_del_init(&rq->rq_timed_list);

		if (ptlrpc_at_send_early_reply(rq) == 0)
			ptlrpc_at_add_timed(rq);

		ptlrpc_server_drop_request(rq);
	}
}
1237
1238
1239
1240
1241
/*
 * Classify an incoming request as normal or high-priority (HP) and
 * hand it to the NRS head.
 *
 * Returns < 0 on error, 0 for a normal request, 1 for an HP request.
 */
static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
				    struct ptlrpc_request *req)
{
	int rc = 0;

	if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
		rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
		if (rc < 0)
			return rc;
		/* the service-level handler never classifies by itself */
		LASSERT(rc == 0);
	}
	if (req->rq_export && req->rq_ops) {
		/* perform the request-specific check BEFORE linking the
		 * request on exp_hp_rpcs below
		 */
		if (req->rq_ops->hpreq_check) {
			rc = req->rq_ops->hpreq_check(req);
			/* hpreq_check return values:
			 *  < 0: error
			 *    0: normal request
			 *    1: high-priority request
			 */
			if (rc < 0)
				return rc;
			LASSERT(rc == 0 || rc == 1);
		}

		/* track the request on the export's HP list; undone by
		 * ptlrpc_server_hpreq_fini()
		 */
		spin_lock_bh(&req->rq_export->exp_rpc_lock);
		list_add(&req->rq_exp_list, &req->rq_export->exp_hp_rpcs);
		spin_unlock_bh(&req->rq_export->exp_rpc_lock);
	}

	ptlrpc_nrs_req_initialize(svcpt, req, rc);

	return rc;
}
1285
1286
1287static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
1288{
1289 if (req->rq_export && req->rq_ops) {
1290
1291
1292
1293 if (req->rq_ops->hpreq_fini)
1294 req->rq_ops->hpreq_fini(req);
1295
1296 spin_lock_bh(&req->rq_export->exp_rpc_lock);
1297 list_del_init(&req->rq_exp_list);
1298 spin_unlock_bh(&req->rq_export->exp_rpc_lock);
1299 }
1300}
1301
/*
 * Classify @req (normal vs high-priority) and enqueue it on the NRS
 * head of @svcpt.  Returns 0 on success or a negative errno from the
 * classification step.
 */
static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
				     struct ptlrpc_request *req)
{
	int hp;

	hp = ptlrpc_server_hpreq_init(svcpt, req);
	if (hp < 0)
		return hp;

	/* hp == 1 means the request was classified as high-priority */
	ptlrpc_nrs_req_add(svcpt, req, !!hp);

	return 0;
}
1315
1316
1317
1318
1319
1320
/*
 * Decide whether a thread may take a high-priority (HP) request:
 * never when no HP policy exists, always when @force is set, and
 * otherwise only while at least one running thread stays free and the
 * HP-vs-normal ratio (srv_hpreq_ratio) has not been exceeded while
 * normal requests are also pending.
 */
static bool ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
				     bool force)
{
	int running = svcpt->scp_nthrs_running;

	if (!nrs_svcpt_has_hp(svcpt))
		return false;

	if (force)
		return true;

	if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
		     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
		/* fault injection: behave as if only the initial thread
		 * count were running
		 */
		running = PTLRPC_NTHRS_INIT;
		if (svcpt->scp_service->srv_ops.so_hpreq_handler)
			running += 1;
	}

	/* keep the last running thread free */
	if (svcpt->scp_nreqs_active >= running - 1)
		return false;

	if (svcpt->scp_nhreqs_active == 0)
		return true;

	/* serve HP requests while no normal ones wait, or until the
	 * configured HP:normal service ratio is reached
	 */
	return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
	       svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
}
1349
1350static bool ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
1351 bool force)
1352{
1353 return ptlrpc_server_allow_high(svcpt, force) &&
1354 ptlrpc_nrs_req_pending_nolock(svcpt, true);
1355}
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
/*
 * Decide whether a thread may take a normal-priority request.  The
 * running-thread thresholds (running - 2 / running - 1) keep the last
 * couple of threads free unless @force is set.
 */
static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
				       bool force)
{
	int running = svcpt->scp_nthrs_running;

	if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
		     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
		/* fault injection: behave as if only the initial thread
		 * count were running
		 */
		running = PTLRPC_NTHRS_INIT;
		if (svcpt->scp_service->srv_ops.so_hpreq_handler)
			running += 1;
	}

	if (force ||
	    svcpt->scp_nreqs_active < running - 2)
		return true;

	if (svcpt->scp_nreqs_active >= running - 1)
		return false;

	/* exactly running - 2 active: allow only if HP work is in
	 * flight or the service has no HP policy at all
	 */
	return svcpt->scp_nhreqs_active > 0 || !nrs_svcpt_has_hp(svcpt);
}
1388
1389static bool ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
1390 bool force)
1391{
1392 return ptlrpc_server_allow_normal(svcpt, force) &&
1393 ptlrpc_nrs_req_pending_nolock(svcpt, false);
1394}
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404static inline bool
1405ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, bool force)
1406{
1407 return ptlrpc_server_high_pending(svcpt, force) ||
1408 ptlrpc_server_normal_pending(svcpt, force);
1409}
1410
1411
1412
1413
1414
1415
/*
 * Dequeue the next request to service on this partition, preferring the
 * high-priority NRS queue when policy allows.  Returns NULL when nothing
 * may be handled right now.  On success the request is accounted as
 * active under scp_req_lock (and as an active HP request when rq_hp is
 * set), and the export's RPC counter is bumped outside the lock.
 * @force bypasses the thread-availability throttling.
 */
static struct ptlrpc_request *
ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
{
	struct ptlrpc_request *req = NULL;

	spin_lock(&svcpt->scp_req_lock);

	/* try the high-priority queue first, when allowed */
	if (ptlrpc_server_high_pending(svcpt, force)) {
		req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
		if (req) {
			/* count consecutive HP requests for ratio throttling */
			svcpt->scp_hreq_count++;
			goto got_request;
		}
	}

	if (ptlrpc_server_normal_pending(svcpt, force)) {
		req = ptlrpc_nrs_req_get_nolock(svcpt, false, force);
		if (req) {
			/* a normal request resets the HP streak counter */
			svcpt->scp_hreq_count = 0;
			goto got_request;
		}
	}

	spin_unlock(&svcpt->scp_req_lock);
	return NULL;

got_request:
	/* account the request as active while still holding scp_req_lock */
	svcpt->scp_nreqs_active++;
	if (req->rq_hp)
		svcpt->scp_nhreqs_active++;

	spin_unlock(&svcpt->scp_req_lock);

	if (likely(req->rq_export))
		class_export_rpc_inc(req->rq_export);

	return req;
}
1454
1455
1456
1457
1458
1459
1460
/*
 * Take one request off scp_req_incoming, unwrap and sanity-check it, and
 * hand it over to the NRS queues for scheduling.  Returns 0 when the
 * incoming queue was empty, 1 otherwise — even on error, since a request
 * was consumed (dropped requests are released via err_req).
 */
static int
ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
			    struct ptlrpc_thread *thread)
{
	struct ptlrpc_service *svc = svcpt->scp_service;
	struct ptlrpc_request *req;
	__u32 deadline;
	int rc;

	spin_lock(&svcpt->scp_lock);
	if (list_empty(&svcpt->scp_req_incoming)) {
		spin_unlock(&svcpt->scp_lock);
		return 0;
	}

	req = list_entry(svcpt->scp_req_incoming.next,
			 struct ptlrpc_request, rq_list);
	list_del_init(&req->rq_list);
	svcpt->scp_nreqs_incoming--;

	spin_unlock(&svcpt->scp_lock);

	/* verify/decrypt the request according to its security flavor */
	rc = sptlrpc_svc_unwrap_request(req);
	switch (rc) {
	case SECSVC_OK:
		break;
	case SECSVC_COMPLETE:
		/* the security layer already produced the whole reply */
		target_send_reply(req, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
		goto err_req;
	case SECSVC_DROP:
		goto err_req;
	default:
		LBUG();
	}

	/* Non-NULL security flavors carry a still-packed message at this
	 * point; unpack it now.
	 */
	if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
		rc = ptlrpc_unpack_req_msg(req, req->rq_reqlen);
		if (rc != 0) {
			CERROR("error unpacking request: ptl %d from %s x%llu\n",
			       svc->srv_req_portal, libcfs_id2str(req->rq_peer),
			       req->rq_xid);
			goto err_req;
		}
	}

	rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
	if (rc) {
		CERROR("error unpacking ptlrpc body: ptl %d from %s x%llu\n",
		       svc->srv_req_portal, libcfs_id2str(req->rq_peer),
		       req->rq_xid);
		goto err_req;
	}

	/* fault injection: drop requests with a specific opcode */
	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
		CERROR("drop incoming rpc opc %u, x%llu\n",
		       cfs_fail_val, req->rq_xid);
		goto err_req;
	}

	rc = -EINVAL;
	if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
		CERROR("wrong packet type received (type=%u) from %s\n",
		       lustre_msg_get_type(req->rq_reqmsg),
		       libcfs_id2str(req->rq_peer),
		       req->rq_xid);
		goto err_req;
	}

	/* flag bulk direction for opcodes that move page data */
	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case MDS_WRITEPAGE:
	case OST_WRITE:
		req->rq_bulk_write = 1;
		break;
	case MDS_READPAGE:
	case OST_READ:
	case MGS_CONFIG_READ:
		req->rq_bulk_read = 1;
		break;
	}

	CDEBUG(D_RPCTRACE, "got req x%llu\n", req->rq_xid);

	req->rq_export = class_conn2export(
		lustre_msg_get_handle(req->rq_reqmsg));
	if (req->rq_export) {
		rc = ptlrpc_check_req(req);
		if (rc == 0) {
			rc = sptlrpc_target_export_check(req->rq_export, req);
			if (rc)
				DEBUG_REQ(D_ERROR, req, "DROPPING req with illegal security flavor,");
		}

		if (rc)
			goto err_req;
	}

	/* warn if this stage itself lagged far behind arrival */
	if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5)
		DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
			  (long)(ktime_get_real_seconds() -
				 req->rq_arrival_time.tv_sec));

	/* Clients that support adaptive timeouts send their current
	 * timeout in the message header; otherwise fall back to the
	 * static obd_timeout.
	 */
	deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
		    MSGHDR_AT_SUPPORT) ?
		   /* The max time the client expects us to take */
		   lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
	req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
	if (unlikely(deadline == 0)) {
		DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
		goto err_req;
	}

	req->rq_svc_thread = thread;

	/* register for early-reply (AT) handling */
	ptlrpc_at_add_timed(req);

	/* queue the request with NRS and wake a service thread */
	rc = ptlrpc_server_request_add(svcpt, req);
	if (rc)
		goto err_req;

	wake_up(&svcpt->scp_waitq);
	return 1;

err_req:
	ptlrpc_server_finish_request(svcpt, req);

	return 1;
}
1598
1599
1600
1601
1602
/*
 * Pull one scheduled request from NRS and run it through the service's
 * request handler, with per-request session context, timing statistics
 * and deadline enforcement.  Returns 0 when no request was available,
 * 1 after processing (or dropping) one.
 */
static int
ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
			     struct ptlrpc_thread *thread)
{
	struct ptlrpc_service *svc = svcpt->scp_service;
	struct ptlrpc_request *request;
	struct timespec64 work_start;
	struct timespec64 work_end;
	struct timespec64 timediff;
	struct timespec64 arrived;
	unsigned long timediff_usecs;
	unsigned long arrived_usecs;
	int rc;
	int fail_opc = 0;

	request = ptlrpc_server_request_get(svcpt, false);
	if (!request)
		return 0;

	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
		fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
	else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
		fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;

	/* fault injection: optionally stall HP-capable requests */
	if (unlikely(fail_opc)) {
		if (request->rq_export && request->rq_ops)
			OBD_FAIL_TIMEOUT(fail_opc, 4);
	}

	ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);

	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
		libcfs_debug_dumplog();

	/* record how long the request waited in the queues */
	ktime_get_real_ts64(&work_start);
	timediff = timespec64_sub(work_start, request->rq_arrival_time);
	timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
			 timediff.tv_nsec / NSEC_PER_USEC;
	if (likely(svc->srv_stats)) {
		lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
				    timediff_usecs);
		lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
				    svcpt->scp_nreqs_incoming);
		lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
				    svcpt->scp_nreqs_active);
		lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
				    at_get(&svcpt->scp_at_estimate));
	}

	/* per-request session context, entered for the handler's duration */
	rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
	if (rc) {
		CERROR("Failure to initialize session: %d\n", rc);
		goto out_req;
	}
	request->rq_session.lc_thread = thread;
	request->rq_session.lc_cookie = 0x5;
	lu_context_enter(&request->rq_session);

	CDEBUG(D_NET, "got req %llu\n", request->rq_xid);

	request->rq_svc_thread = thread;
	if (thread)
		request->rq_svc_thread->t_env->le_ses = &request->rq_session;

	if (likely(request->rq_export)) {
		if (unlikely(ptlrpc_check_req(request)))
			goto put_conn;
	}

	/* Discard requests that already waited past their deadline: the
	 * client has given up on them, so handling would be wasted work.
	 */
	if (ktime_get_real_seconds() > request->rq_deadline) {
		DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline " CFS_DURATION_T ":" CFS_DURATION_T "s ago\n",
			  libcfs_id2str(request->rq_peer),
			  (long)(request->rq_deadline -
				 request->rq_arrival_time.tv_sec),
			  (long)(ktime_get_real_seconds() -
				 request->rq_deadline));
		goto put_conn;
	}

	CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d\n",
	       current_comm(),
	       (request->rq_export ?
		(char *)request->rq_export->exp_client_uuid.uuid : "0"),
	       (request->rq_export ?
		atomic_read(&request->rq_export->exp_refcount) : -99),
	       lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
	       libcfs_id2str(request->rq_peer),
	       lustre_msg_get_opc(request->rq_reqmsg));

	if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
		CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);

	/* dispatch to the service-specific request handler */
	rc = svc->srv_ops.so_req_handler(request);

	ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);

put_conn:
	lu_context_exit(&request->rq_session);
	lu_context_fini(&request->rq_session);

	if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) {
		DEBUG_REQ(D_WARNING, request,
			  "Request took longer than estimated (%lld:%llds); "
			  "client may timeout.",
			  (s64)request->rq_deadline -
			       request->rq_arrival_time.tv_sec,
			  (s64)ktime_get_real_seconds() - request->rq_deadline);
	}

	/* report both processing time and total time since arrival */
	ktime_get_real_ts64(&work_end);
	timediff = timespec64_sub(work_end, work_start);
	timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
			 timediff.tv_nsec / NSEC_PER_USEC;
	arrived = timespec64_sub(work_end, request->rq_arrival_time);
	arrived_usecs = arrived.tv_sec * USEC_PER_SEC +
			arrived.tv_nsec / NSEC_PER_USEC;
	CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %ldus (%ldus total) trans %llu rc %d/%d\n",
	       current_comm(),
	       (request->rq_export ?
		(char *)request->rq_export->exp_client_uuid.uuid : "0"),
	       (request->rq_export ?
		atomic_read(&request->rq_export->exp_refcount) : -99),
	       lustre_msg_get_status(request->rq_reqmsg),
	       request->rq_xid,
	       libcfs_id2str(request->rq_peer),
	       lustre_msg_get_opc(request->rq_reqmsg),
	       timediff_usecs,
	       arrived_usecs,
	       (request->rq_repmsg ?
		lustre_msg_get_transno(request->rq_repmsg) :
		request->rq_transno),
	       request->rq_status,
	       (request->rq_repmsg ?
		lustre_msg_get_status(request->rq_repmsg) : -999));
	if (likely(svc->srv_stats && request->rq_reqmsg)) {
		__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
		int opc = opcode_offset(op);

		/* LDLM_ENQUEUE/MDS_REINT are accounted elsewhere */
		if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) {
			LASSERT(opc < LUSTRE_MAX_OPCODES);
			lprocfs_counter_add(svc->srv_stats,
					    opc + EXTRA_MAX_OPCODES,
					    timediff_usecs);
		}
	}
	if (unlikely(request->rq_early_count)) {
		DEBUG_REQ(D_ADAPTTO, request,
			  "sent %d early replies before finishing in %llds",
			  request->rq_early_count,
			  (s64)work_end.tv_sec -
			  request->rq_arrival_time.tv_sec);
	}

out_req:
	ptlrpc_server_finish_active_request(svcpt, request);

	return 1;
}
1764
1765
1766
1767
/*
 * Finish processing one "difficult" (persistent) reply state: detach it
 * from its export, release the LDLM locks it pinned, unlink it from the
 * network if still posted, and free it once the network is done with it.
 * Always returns 1 (one reply handled).
 */
static int
ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
{
	struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
	struct ptlrpc_service *svc = svcpt->scp_service;
	struct obd_export *exp;
	int nlocks;
	int been_handled;

	exp = rs->rs_export;

	LASSERT(rs->rs_difficult);
	LASSERT(rs->rs_scheduled);
	LASSERT(list_empty(&rs->rs_list));

	spin_lock(&exp->exp_lock);
	/* list_del_init() is a no-op if already removed */
	list_del_init(&rs->rs_exp_list);
	spin_unlock(&exp->exp_lock);

	/* If rs_committed is not yet set, the transaction this reply
	 * belongs to has not committed, so the reply is still on the
	 * export's uncommitted-replies list and must be removed here.
	 * NOTE(review): rs_committed is read without rs_lock here —
	 * presumably safe against the commit path; confirm against
	 * ptlrpc_commit_replies() before relying on this.
	 */
	if (!rs->rs_committed) {
		spin_lock(&exp->exp_uncommitted_replies_lock);
		list_del_init(&rs->rs_obd_list);
		spin_unlock(&exp->exp_uncommitted_replies_lock);
	}

	spin_lock(&rs->rs_lock);

	/* take a snapshot of the handled flag and the pinned lock count,
	 * then claim them under rs_lock
	 */
	been_handled = rs->rs_handled;
	rs->rs_handled = 1;

	nlocks = rs->rs_nlocks;
	rs->rs_nlocks = 0;

	if (nlocks == 0 && !been_handled) {
		/* first time through with no locks left to release:
		 * they were stolen from us earlier; just log it
		 */
		CDEBUG(D_HA, "All locks stolen from rs %p x%lld.t%lld o%d NID %s\n",
		       rs,
		       rs->rs_xid, rs->rs_transno, rs->rs_opc,
		       libcfs_nid2str(exp->exp_connection->c_peer.nid));
	}

	if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
		/* drop rs_lock: LNetMDUnlink and ldlm_lock_decref must not
		 * be called with it held
		 */
		spin_unlock(&rs->rs_lock);

		if (!been_handled && rs->rs_on_net) {
			LNetMDUnlink(rs->rs_md_h);
			/* the unlink event will clear rs_on_net */
		}

		while (nlocks-- > 0)
			ldlm_lock_decref(&rs->rs_locks[nlocks],
					 rs->rs_modes[nlocks]);

		spin_lock(&rs->rs_lock);
	}

	rs->rs_scheduled = 0;

	if (!rs->rs_on_net) {
		/* off the network: safe to free the reply state now */
		spin_unlock(&rs->rs_lock);

		class_export_put(exp);
		rs->rs_export = NULL;
		ptlrpc_rs_decref(rs);
		if (atomic_dec_and_test(&svcpt->scp_nreps_difficult) &&
		    svc->srv_is_stopping)
			wake_up_all(&svcpt->scp_waitq);
		return 1;
	}

	/* still on the net; callback will schedule again for final freeing */
	spin_unlock(&rs->rs_lock);
	return 1;
}
1868
1869static void
1870ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
1871{
1872 int avail = svcpt->scp_nrqbds_posted;
1873 int low_water = test_req_buffer_pressure ? 0 :
1874 svcpt->scp_service->srv_nbuf_per_group / 2;
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884 if (avail <= low_water)
1885 ptlrpc_grow_req_bufs(svcpt, 1);
1886
1887 if (svcpt->scp_service->srv_stats) {
1888 lprocfs_counter_add(svcpt->scp_service->srv_stats,
1889 PTLRPC_REQBUF_AVAIL_CNTR, avail);
1890 }
1891}
1892
1893static int
1894ptlrpc_retry_rqbds(void *arg)
1895{
1896 struct ptlrpc_service_part *svcpt = arg;
1897
1898 svcpt->scp_rqbd_timeout = 0;
1899 return -ETIMEDOUT;
1900}
1901
1902static inline int
1903ptlrpc_threads_enough(struct ptlrpc_service_part *svcpt)
1904{
1905 return svcpt->scp_nreqs_active <
1906 svcpt->scp_nthrs_running - 1 -
1907 (svcpt->scp_service->srv_ops.so_hpreq_handler != NULL);
1908}
1909
1910
1911
1912
1913
1914
1915static inline int
1916ptlrpc_threads_increasable(struct ptlrpc_service_part *svcpt)
1917{
1918 return svcpt->scp_nthrs_running +
1919 svcpt->scp_nthrs_starting <
1920 svcpt->scp_service->srv_nthrs_cpt_limit;
1921}
1922
1923
1924
1925
/* Should we spawn another thread: overloaded AND still below the limit? */
static inline int
ptlrpc_threads_need_create(struct ptlrpc_service_part *svcpt)
{
	if (ptlrpc_threads_enough(svcpt))
		return 0;

	return ptlrpc_threads_increasable(svcpt);
}
1932
1933static inline int
1934ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
1935{
1936 return thread_is_stopping(thread) ||
1937 thread->t_svcpt->scp_service->srv_is_stopping;
1938}
1939
1940static inline int
1941ptlrpc_rqbd_pending(struct ptlrpc_service_part *svcpt)
1942{
1943 return !list_empty(&svcpt->scp_rqbd_idle) &&
1944 svcpt->scp_rqbd_timeout == 0;
1945}
1946
/* Has an adaptive-timeout (early reply) check been requested on this
 * partition?  The flag is consumed by ptlrpc_at_check_timed().
 */
static inline int
ptlrpc_at_check(struct ptlrpc_service_part *svcpt)
{
	return svcpt->scp_at_check;
}
1952
1953
1954
1955
1956
1957
/* Are there raw incoming requests still waiting to be unpacked? */
static inline int
ptlrpc_server_request_incoming(struct ptlrpc_service_part *svcpt)
{
	return !list_empty(&svcpt->scp_req_incoming);
}
1963
/*
 * Sleep (exclusively, at the head of scp_waitq) until there is work for
 * this service thread, or the thread must stop.  When a previous request
 * buffer post failed, scp_rqbd_timeout bounds the sleep and
 * ptlrpc_retry_rqbds() clears the back-off on expiry so posting is
 * retried.  Returns 0 when there is work, -EINTR when stopping.
 */
static __attribute__((__noinline__)) int
ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
		  struct ptlrpc_thread *thread)
{
	/* timeout callback resets scp_rqbd_timeout and aborts the wait */
	struct l_wait_info lwi = LWI_TIMEOUT(svcpt->scp_rqbd_timeout,
					     ptlrpc_retry_rqbds, svcpt);

	/* give other runnable tasks a chance before blocking */
	cond_resched();

	l_wait_event_exclusive_head(svcpt->scp_waitq,
				    ptlrpc_thread_stopping(thread) ||
				    ptlrpc_server_request_incoming(svcpt) ||
				    ptlrpc_server_request_pending(svcpt, false) ||
				    ptlrpc_rqbd_pending(svcpt) ||
				    ptlrpc_at_check(svcpt), &lwi);

	if (ptlrpc_thread_stopping(thread))
		return -EINTR;

	return 0;
}
1994
1995
1996
1997
1998
1999
2000
/*
 * Main body of a ptlrpc service thread: bind to the partition's CPT,
 * set up a lu_env, contribute one reply-state buffer, then loop handling
 * incoming requests, queued requests, AT checks and buffer reposting
 * until asked to stop.  The exit code is published via thread->t_id and
 * SVC_STOPPED under scp_lock.
 */
static int ptlrpc_main(void *arg)
{
	struct ptlrpc_thread *thread = arg;
	struct ptlrpc_service_part *svcpt = thread->t_svcpt;
	struct ptlrpc_service *svc = svcpt->scp_service;
	struct ptlrpc_reply_state *rs;
	struct group_info *ginfo = NULL;
	struct lu_env *env;
	int counter = 0, rc = 0;

	thread->t_pid = current_pid();
	unshare_fs_struct();

	/* bind to our CPU partition; failure is non-fatal, just warn */
	rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
	if (rc != 0) {
		CWARN("%s: failed to bind %s on CPT %d\n",
		      svc->srv_name, thread->t_name, svcpt->scp_cpt);
	}

	/* drop any inherited supplementary groups */
	ginfo = groups_alloc(0);
	if (!ginfo) {
		rc = -ENOMEM;
		goto out;
	}

	set_current_groups(ginfo);
	put_group_info(ginfo);

	if (svc->srv_ops.so_thr_init) {
		rc = svc->srv_ops.so_thr_init(thread);
		if (rc)
			goto out;
	}

	env = kzalloc(sizeof(*env), GFP_NOFS);
	if (!env) {
		rc = -ENOMEM;
		goto out_srv_fini;
	}

	rc = lu_context_init(&env->le_ctx,
			     svc->srv_ctx_tags|LCT_REMEMBER|LCT_NOREF);
	if (rc)
		goto out_srv_fini;

	thread->t_env = env;
	env->le_ctx.lc_thread = thread;
	env->le_ctx.lc_cookie = 0x6;

	/* make sure all request buffers allocated so far are posted */
	while (!list_empty(&svcpt->scp_rqbd_idle)) {
		rc = ptlrpc_server_post_idle_rqbds(svcpt);
		if (rc >= 0)
			continue;

		CERROR("Failed to post rqbd for %s on CPT %d: %d\n",
		       svc->srv_name, svcpt->scp_cpt, rc);
		goto out_srv_fini;
	}

	/* each thread contributes one reply state to the idle pool */
	rs = libcfs_kvzalloc(svc->srv_max_reply_size, GFP_NOFS);
	if (!rs) {
		rc = -ENOMEM;
		goto out_srv_fini;
	}

	spin_lock(&svcpt->scp_lock);

	LASSERT(thread_is_starting(thread));
	thread_clear_flags(thread, SVC_STARTING);

	/* only one thread may be in the starting state at a time */
	LASSERT(svcpt->scp_nthrs_starting == 1);
	svcpt->scp_nthrs_starting--;

	/* publish the transition to running before releasing scp_lock so
	 * ptlrpc_start_thread() sees a consistent state
	 */
	thread_add_flags(thread, SVC_RUNNING);
	svcpt->scp_nthrs_running++;
	spin_unlock(&svcpt->scp_lock);

	/* wake up our creator in ptlrpc_start_thread() */
	wake_up(&thread->t_ctl_waitq);

	/* hand the pre-allocated reply state to the idle pool */
	spin_lock(&svcpt->scp_rep_lock);
	list_add(&rs->rs_list, &svcpt->scp_rep_idle);
	wake_up(&svcpt->scp_rep_waitq);
	spin_unlock(&svcpt->scp_rep_lock);

	CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
	       svcpt->scp_nthrs_running);

	/* main service loop */
	while (!ptlrpc_thread_stopping(thread)) {
		if (ptlrpc_wait_event(svcpt, thread))
			break;

		ptlrpc_check_rqbd_pool(svcpt);

		if (ptlrpc_threads_need_create(svcpt)) {
			/* ignore return code - we tried... */
			ptlrpc_start_thread(svcpt, 0);
		}

		/* process a newly-arrived (still packed) request first */
		if (ptlrpc_server_request_incoming(svcpt)) {
			lu_context_enter(&env->le_ctx);
			env->le_ses = NULL;
			ptlrpc_server_handle_req_in(svcpt, thread);
			lu_context_exit(&env->le_ctx);

			/* only fall through to the handler below every
			 * 100 incoming requests, to keep draining the
			 * incoming queue under load
			 */
			if (counter++ < 100)
				continue;
			counter = 0;
		}

		if (ptlrpc_at_check(svcpt))
			ptlrpc_at_check_timed(svcpt);

		if (ptlrpc_server_request_pending(svcpt, false)) {
			lu_context_enter(&env->le_ctx);
			ptlrpc_server_handle_request(svcpt, thread);
			lu_context_exit(&env->le_ctx);
		}

		if (ptlrpc_rqbd_pending(svcpt) &&
		    ptlrpc_server_post_idle_rqbds(svcpt) < 0) {
			/* posting failed (likely out of memory); back off
			 * for 1/10 second before retrying
			 */
			svcpt->scp_rqbd_timeout = cfs_time_seconds(1) / 10;
			CDEBUG(D_RPCTRACE, "Posted buffers: %d\n",
			       svcpt->scp_nrqbds_posted);
		}
	}

out_srv_fini:
	/* deconstruct service-specific state created by so_thr_init() */
	if (svc->srv_ops.so_thr_done)
		svc->srv_ops.so_thr_done(thread);

	if (env) {
		lu_context_fini(&env->le_ctx);
		kfree(env);
	}
out:
	CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
	       thread, thread->t_pid, thread->t_id, rc);

	spin_lock(&svcpt->scp_lock);
	if (thread_test_and_clear_flags(thread, SVC_STARTING))
		svcpt->scp_nthrs_starting--;

	if (thread_test_and_clear_flags(thread, SVC_RUNNING)) {
		/* must know immediately */
		svcpt->scp_nthrs_running--;
	}

	thread->t_id = rc;
	thread_add_flags(thread, SVC_STOPPED);

	wake_up(&thread->t_ctl_waitq);
	spin_unlock(&svcpt->scp_lock);

	return rc;
}
2187
2188static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt,
2189 struct list_head *replies)
2190{
2191 int result;
2192
2193 spin_lock(&hrt->hrt_lock);
2194
2195 list_splice_init(&hrt->hrt_queue, replies);
2196 result = ptlrpc_hr.hr_stopping || !list_empty(replies);
2197
2198 spin_unlock(&hrt->hrt_lock);
2199 return result;
2200}
2201
2202
2203
2204
2205
2206static int ptlrpc_hr_main(void *arg)
2207{
2208 struct ptlrpc_hr_thread *hrt = arg;
2209 struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
2210 LIST_HEAD(replies);
2211 char threadname[20];
2212 int rc;
2213
2214 snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
2215 hrp->hrp_cpt, hrt->hrt_id);
2216 unshare_fs_struct();
2217
2218 rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
2219 if (rc != 0) {
2220 CWARN("Failed to bind %s on CPT %d of CPT table %p: rc = %d\n",
2221 threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
2222 }
2223
2224 atomic_inc(&hrp->hrp_nstarted);
2225 wake_up(&ptlrpc_hr.hr_waitq);
2226
2227 while (!ptlrpc_hr.hr_stopping) {
2228 l_wait_condition(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
2229
2230 while (!list_empty(&replies)) {
2231 struct ptlrpc_reply_state *rs;
2232
2233 rs = list_entry(replies.prev, struct ptlrpc_reply_state,
2234 rs_list);
2235 list_del_init(&rs->rs_list);
2236 ptlrpc_handle_rs(rs);
2237 }
2238 }
2239
2240 atomic_inc(&hrp->hrp_nstopped);
2241 wake_up(&ptlrpc_hr.hr_waitq);
2242
2243 return 0;
2244}
2245
2246static void ptlrpc_stop_hr_threads(void)
2247{
2248 struct ptlrpc_hr_partition *hrp;
2249 int i;
2250 int j;
2251
2252 ptlrpc_hr.hr_stopping = 1;
2253
2254 cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2255 if (!hrp->hrp_thrs)
2256 continue;
2257 for (j = 0; j < hrp->hrp_nthrs; j++)
2258 wake_up_all(&hrp->hrp_thrs[j].hrt_waitq);
2259 }
2260
2261 cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2262 if (!hrp->hrp_thrs)
2263 continue;
2264 wait_event(ptlrpc_hr.hr_waitq,
2265 atomic_read(&hrp->hrp_nstopped) ==
2266 atomic_read(&hrp->hrp_nstarted));
2267 }
2268}
2269
2270static int ptlrpc_start_hr_threads(void)
2271{
2272 struct ptlrpc_hr_partition *hrp;
2273 int i;
2274 int j;
2275
2276 cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2277 int rc = 0;
2278
2279 for (j = 0; j < hrp->hrp_nthrs; j++) {
2280 struct ptlrpc_hr_thread *hrt = &hrp->hrp_thrs[j];
2281 struct task_struct *task;
2282
2283 task = kthread_run(ptlrpc_hr_main,
2284 &hrp->hrp_thrs[j],
2285 "ptlrpc_hr%02d_%03d",
2286 hrp->hrp_cpt, hrt->hrt_id);
2287 if (IS_ERR(task)) {
2288 rc = PTR_ERR(task);
2289 break;
2290 }
2291 }
2292 wait_event(ptlrpc_hr.hr_waitq,
2293 atomic_read(&hrp->hrp_nstarted) == j);
2294
2295 if (rc < 0) {
2296 CERROR("cannot start reply handler thread %d:%d: rc = %d\n",
2297 i, j, rc);
2298 ptlrpc_stop_hr_threads();
2299 return rc;
2300 }
2301 }
2302 return 0;
2303}
2304
2305static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
2306{
2307 struct l_wait_info lwi = { 0 };
2308 struct ptlrpc_thread *thread;
2309 LIST_HEAD(zombie);
2310
2311 CDEBUG(D_INFO, "Stopping threads for service %s\n",
2312 svcpt->scp_service->srv_name);
2313
2314 spin_lock(&svcpt->scp_lock);
2315
2316 list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
2317 CDEBUG(D_INFO, "Stopping thread %s #%u\n",
2318 svcpt->scp_service->srv_thread_name, thread->t_id);
2319 thread_add_flags(thread, SVC_STOPPING);
2320 }
2321
2322 wake_up_all(&svcpt->scp_waitq);
2323
2324 while (!list_empty(&svcpt->scp_threads)) {
2325 thread = list_entry(svcpt->scp_threads.next,
2326 struct ptlrpc_thread, t_link);
2327 if (thread_is_stopped(thread)) {
2328 list_del(&thread->t_link);
2329 list_add(&thread->t_link, &zombie);
2330 continue;
2331 }
2332 spin_unlock(&svcpt->scp_lock);
2333
2334 CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
2335 svcpt->scp_service->srv_thread_name, thread->t_id);
2336 l_wait_event(thread->t_ctl_waitq,
2337 thread_is_stopped(thread), &lwi);
2338
2339 spin_lock(&svcpt->scp_lock);
2340 }
2341
2342 spin_unlock(&svcpt->scp_lock);
2343
2344 while (!list_empty(&zombie)) {
2345 thread = list_entry(zombie.next,
2346 struct ptlrpc_thread, t_link);
2347 list_del(&thread->t_link);
2348 kfree(thread);
2349 }
2350}
2351
2352
2353
2354
2355static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
2356{
2357 struct ptlrpc_service_part *svcpt;
2358 int i;
2359
2360 ptlrpc_service_for_each_part(svcpt, i, svc) {
2361 if (svcpt->scp_service)
2362 ptlrpc_svcpt_stop_threads(svcpt);
2363 }
2364}
2365
2366int ptlrpc_start_threads(struct ptlrpc_service *svc)
2367{
2368 int rc = 0;
2369 int i;
2370 int j;
2371
2372
2373 LASSERT(svc->srv_nthrs_cpt_init >= PTLRPC_NTHRS_INIT);
2374
2375 for (i = 0; i < svc->srv_ncpts; i++) {
2376 for (j = 0; j < svc->srv_nthrs_cpt_init; j++) {
2377 rc = ptlrpc_start_thread(svc->srv_parts[i], 1);
2378 if (rc == 0)
2379 continue;
2380
2381 if (rc != -EMFILE)
2382 goto failed;
2383
2384 break;
2385 }
2386 }
2387
2388 return 0;
2389 failed:
2390 CERROR("cannot start %s thread #%d_%d: rc %d\n",
2391 svc->srv_thread_name, i, j, rc);
2392 ptlrpc_stop_all_threads(svc);
2393 return rc;
2394}
2395EXPORT_SYMBOL(ptlrpc_start_threads);
2396
/*
 * Create one service thread on @svcpt.  Only one thread may be in the
 * "starting" state per partition at a time; with @wait set we yield and
 * retry until the slot is free and (optionally) until the new thread is
 * actually running.  Returns 0 on success, -EMFILE when the partition
 * is at its thread limit, -EAGAIN on a creation race with @wait unset,
 * -ESRCH when the service is stopping, or a kthread/alloc error.
 */
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
	struct l_wait_info lwi = { 0 };
	struct ptlrpc_thread *thread;
	struct ptlrpc_service *svc;
	struct task_struct *task;
	int rc;

	svc = svcpt->scp_service;

	CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
	       svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
	       svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);

 again:
	if (unlikely(svc->srv_is_stopping))
		return -ESRCH;

	/* OBD_FAIL_TGT_TOOMANY_THREADS caps thread creation for testing */
	if (!ptlrpc_threads_increasable(svcpt) ||
	    (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
	     svcpt->scp_nthrs_running == svc->srv_nthrs_cpt_init - 1))
		return -EMFILE;

	/* allocate the descriptor on the partition's NUMA node */
	thread = kzalloc_node(sizeof(*thread), GFP_NOFS,
			      cfs_cpt_spread_node(svc->srv_cptable,
						  svcpt->scp_cpt));
	if (!thread)
		return -ENOMEM;
	init_waitqueue_head(&thread->t_ctl_waitq);

	spin_lock(&svcpt->scp_lock);
	/* re-check the limit now that we hold scp_lock */
	if (!ptlrpc_threads_increasable(svcpt)) {
		spin_unlock(&svcpt->scp_lock);
		kfree(thread);
		return -EMFILE;
	}

	if (svcpt->scp_nthrs_starting != 0) {
		/* serialize thread startup: only one thread may be in the
		 * starting state per partition
		 */
		LASSERT(svcpt->scp_nthrs_starting == 1);
		spin_unlock(&svcpt->scp_lock);
		kfree(thread);
		if (wait) {
			CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
			       svc->srv_thread_name, svcpt->scp_thr_nextid);
			schedule();
			goto again;
		}

		CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
		       svc->srv_thread_name, svcpt->scp_thr_nextid);
		return -EAGAIN;
	}

	svcpt->scp_nthrs_starting++;
	thread->t_id = svcpt->scp_thr_nextid++;
	thread_add_flags(thread, SVC_STARTING);
	thread->t_svcpt = svcpt;

	list_add(&thread->t_link, &svcpt->scp_threads);
	spin_unlock(&svcpt->scp_lock);

	if (svcpt->scp_cpt >= 0) {
		snprintf(thread->t_name, sizeof(thread->t_name), "%s%02d_%03d",
			 svc->srv_thread_name, svcpt->scp_cpt, thread->t_id);
	} else {
		snprintf(thread->t_name, sizeof(thread->t_name), "%s_%04d",
			 svc->srv_thread_name, thread->t_id);
	}

	CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
	task = kthread_run(ptlrpc_main, thread, "%s", thread->t_name);
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("cannot start thread '%s': rc = %d\n",
		       thread->t_name, rc);
		spin_lock(&svcpt->scp_lock);
		--svcpt->scp_nthrs_starting;
		if (thread_is_stopping(thread)) {
			/* the service is stopping concurrently: leave the
			 * descriptor for the stop path to reap, just mark
			 * it stopped and wake the waiter
			 */
			thread_add_flags(thread, SVC_STOPPED);
			wake_up(&thread->t_ctl_waitq);
			spin_unlock(&svcpt->scp_lock);
		} else {
			list_del(&thread->t_link);
			spin_unlock(&svcpt->scp_lock);
			kfree(thread);
		}
		return rc;
	}

	if (!wait)
		return 0;

	/* wait until the new thread is running (or failed and stopped) */
	l_wait_event(thread->t_ctl_waitq,
		     thread_is_running(thread) || thread_is_stopped(thread),
		     &lwi);

	/* a stopped thread published its exit code in t_id (ptlrpc_main) */
	rc = thread_is_stopped(thread) ? thread->t_id : 0;
	return rc;
}
2502
2503int ptlrpc_hr_init(void)
2504{
2505 struct ptlrpc_hr_partition *hrp;
2506 struct ptlrpc_hr_thread *hrt;
2507 int rc;
2508 int i;
2509 int j;
2510 int weight;
2511
2512 memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
2513 ptlrpc_hr.hr_cpt_table = cfs_cpt_table;
2514
2515 ptlrpc_hr.hr_partitions = cfs_percpt_alloc(ptlrpc_hr.hr_cpt_table,
2516 sizeof(*hrp));
2517 if (!ptlrpc_hr.hr_partitions)
2518 return -ENOMEM;
2519
2520 init_waitqueue_head(&ptlrpc_hr.hr_waitq);
2521
2522 weight = cpumask_weight(topology_sibling_cpumask(0));
2523
2524 cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2525 hrp->hrp_cpt = i;
2526
2527 atomic_set(&hrp->hrp_nstarted, 0);
2528 atomic_set(&hrp->hrp_nstopped, 0);
2529
2530 hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
2531 hrp->hrp_nthrs /= weight;
2532
2533 LASSERT(hrp->hrp_nthrs > 0);
2534 hrp->hrp_thrs =
2535 kzalloc_node(hrp->hrp_nthrs * sizeof(*hrt), GFP_NOFS,
2536 cfs_cpt_spread_node(ptlrpc_hr.hr_cpt_table,
2537 i));
2538 if (!hrp->hrp_thrs) {
2539 rc = -ENOMEM;
2540 goto out;
2541 }
2542
2543 for (j = 0; j < hrp->hrp_nthrs; j++) {
2544 hrt = &hrp->hrp_thrs[j];
2545
2546 hrt->hrt_id = j;
2547 hrt->hrt_partition = hrp;
2548 init_waitqueue_head(&hrt->hrt_waitq);
2549 spin_lock_init(&hrt->hrt_lock);
2550 INIT_LIST_HEAD(&hrt->hrt_queue);
2551 }
2552 }
2553
2554 rc = ptlrpc_start_hr_threads();
2555out:
2556 if (rc != 0)
2557 ptlrpc_hr_fini();
2558 return rc;
2559}
2560
2561void ptlrpc_hr_fini(void)
2562{
2563 struct ptlrpc_hr_partition *hrp;
2564 int i;
2565
2566 if (!ptlrpc_hr.hr_partitions)
2567 return;
2568
2569 ptlrpc_stop_hr_threads();
2570
2571 cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
2572 kfree(hrp->hrp_thrs);
2573 }
2574
2575 cfs_percpt_free(ptlrpc_hr.hr_partitions);
2576 ptlrpc_hr.hr_partitions = NULL;
2577}
2578
2579
2580
2581
2582static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
2583{
2584 while (1) {
2585 int rc;
2586 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
2587 NULL, NULL);
2588
2589 rc = l_wait_event(svcpt->scp_waitq,
2590 atomic_read(&svcpt->scp_nreps_difficult) == 0, &lwi);
2591 if (rc == 0)
2592 break;
2593 CWARN("Unexpectedly long timeout %s %p\n",
2594 svcpt->scp_service->srv_name, svcpt->scp_service);
2595 }
2596}
2597
2598static void
2599ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
2600{
2601 struct ptlrpc_service_part *svcpt;
2602 int i;
2603
2604
2605 ptlrpc_service_for_each_part(svcpt, i, svc) {
2606 if (svcpt->scp_service)
2607 del_timer(&svcpt->scp_at_timer);
2608 }
2609}
2610
/*
 * Detach the service from the network: clear the lazy portal, unlink
 * every posted request buffer from LNet, and wait (with periodic
 * warnings) until all unlink completion events have arrived, i.e.
 * scp_nrqbds_posted drains to zero on every partition.
 */
static void
ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
{
	struct ptlrpc_service_part *svcpt;
	struct ptlrpc_request_buffer_desc *rqbd;
	struct l_wait_info lwi;
	int rc;
	int i;

	/* stop retaining request history so buffers can be released */
	svc->srv_hist_nrqbds_cpt_max = 0;

	rc = LNetClearLazyPortal(svc->srv_req_portal);
	LASSERT(rc == 0);

	ptlrpc_service_for_each_part(svcpt, i, svc) {
		if (!svcpt->scp_service)
			break;

		/* unlink every posted buffer; completion is signalled
		 * asynchronously via the request_in callback
		 */
		list_for_each_entry(rqbd, &svcpt->scp_rqbd_posted,
				    rqbd_list) {
			rc = LNetMDUnlink(rqbd->rqbd_md_h);
			LASSERT(rc == 0 || rc == -ENOENT);
		}
	}

	ptlrpc_service_for_each_part(svcpt, i, svc) {
		if (!svcpt->scp_service)
			break;

		/* wait for all posted buffers to come back; re-check the
		 * count under scp_lock each time around
		 */
		spin_lock(&svcpt->scp_lock);
		while (svcpt->scp_nrqbds_posted != 0) {
			spin_unlock(&svcpt->scp_lock);

			/* bounded wait with a 1s poll interval so we can
			 * warn if unlinking takes unexpectedly long
			 */
			lwi = LWI_TIMEOUT_INTERVAL(
					cfs_time_seconds(LONG_UNLINK),
					cfs_time_seconds(1), NULL, NULL);
			rc = l_wait_event(svcpt->scp_waitq,
					  svcpt->scp_nrqbds_posted == 0, &lwi);
			if (rc == -ETIMEDOUT) {
				CWARN("Service %s waiting for request buffers\n",
				      svcpt->scp_service->srv_name);
			}
			spin_lock(&svcpt->scp_lock);
		}
		spin_unlock(&svcpt->scp_lock);
	}
}
2670
/*
 * Drain and free all remaining service state during shutdown: schedule
 * outstanding difficult replies, discard queued and incoming requests,
 * free idle request buffers, wait for difficult replies to complete,
 * then free the idle reply states.  Assumes the network has already
 * been unlinked (ptlrpc_service_unlink_rqbd()).
 */
static void
ptlrpc_service_purge_all(struct ptlrpc_service *svc)
{
	struct ptlrpc_service_part *svcpt;
	struct ptlrpc_request_buffer_desc *rqbd;
	struct ptlrpc_request *req;
	struct ptlrpc_reply_state *rs;
	int i;

	ptlrpc_service_for_each_part(svcpt, i, svc) {
		if (!svcpt->scp_service)
			break;

		/* push every active difficult reply to the HR threads */
		spin_lock(&svcpt->scp_rep_lock);
		while (!list_empty(&svcpt->scp_rep_active)) {
			rs = list_entry(svcpt->scp_rep_active.next,
					struct ptlrpc_reply_state, rs_list);
			spin_lock(&rs->rs_lock);
			ptlrpc_schedule_difficult_reply(rs);
			spin_unlock(&rs->rs_lock);
		}
		spin_unlock(&svcpt->scp_rep_lock);

		/* discard requests that arrived but were never parsed;
		 * no service thread is running anymore to handle them
		 */
		while (!list_empty(&svcpt->scp_req_incoming)) {
			req = list_entry(svcpt->scp_req_incoming.next,
					 struct ptlrpc_request, rq_list);

			list_del(&req->rq_list);
			svcpt->scp_nreqs_incoming--;
			ptlrpc_server_finish_request(svcpt, req);
		}

		/* force-drain the NRS queues (force=true bypasses the
		 * thread-availability throttling)
		 */
		while (ptlrpc_server_request_pending(svcpt, true)) {
			req = ptlrpc_server_request_get(svcpt, true);
			ptlrpc_server_finish_active_request(svcpt, req);
		}

		LASSERT(list_empty(&svcpt->scp_rqbd_posted));
		LASSERT(svcpt->scp_nreqs_incoming == 0);
		LASSERT(svcpt->scp_nreqs_active == 0);

		/* history should have been culled as requests finished */
		LASSERT(svcpt->scp_hist_nrqbds == 0);

		/* free the now-idle request buffers */
		while (!list_empty(&svcpt->scp_rqbd_idle)) {
			rqbd = list_entry(svcpt->scp_rqbd_idle.next,
					  struct ptlrpc_request_buffer_desc,
					  rqbd_list);
			ptlrpc_free_rqbd(rqbd);
		}
		/* wait for difficult replies to finish before freeing
		 * reply states
		 */
		ptlrpc_wait_replies(svcpt);

		while (!list_empty(&svcpt->scp_rep_idle)) {
			rs = list_entry(svcpt->scp_rep_idle.next,
					struct ptlrpc_reply_state,
					rs_list);
			list_del(&rs->rs_list);
			kvfree(rs);
		}
	}
}
2741
2742static void
2743ptlrpc_service_free(struct ptlrpc_service *svc)
2744{
2745 struct ptlrpc_service_part *svcpt;
2746 struct ptlrpc_at_array *array;
2747 int i;
2748
2749 ptlrpc_service_for_each_part(svcpt, i, svc) {
2750 if (!svcpt->scp_service)
2751 break;
2752
2753
2754 del_timer(&svcpt->scp_at_timer);
2755 array = &svcpt->scp_at_array;
2756
2757 kfree(array->paa_reqs_array);
2758 array->paa_reqs_array = NULL;
2759 kfree(array->paa_reqs_count);
2760 array->paa_reqs_count = NULL;
2761 }
2762
2763 ptlrpc_service_for_each_part(svcpt, i, svc)
2764 kfree(svcpt);
2765
2766 if (svc->srv_cpts)
2767 cfs_expr_list_values_free(svc->srv_cpts, svc->srv_ncpts);
2768
2769 kfree(svc);
2770}
2771
/**
 * ptlrpc_unregister_service - tear down a PTLRPC service completely.
 * @service: the service to shut down and free.
 *
 * The teardown steps below are order-sensitive: the service is first
 * removed from the global list and marked stopping, its timers and
 * threads are stopped, network buffers are unlinked and all queued
 * work is purged, and only then is the memory freed.
 *
 * Always returns 0.
 */
int ptlrpc_unregister_service(struct ptlrpc_service *service)
{
	CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);

	/* Flag shutdown so service code stops accepting new work. */
	service->srv_is_stopping = 1;

	/* Make the service invisible to global iteration. */
	mutex_lock(&ptlrpc_all_services_mutex);
	list_del_init(&service->srv_list);
	mutex_unlock(&ptlrpc_all_services_mutex);

	/* Stop AT timers, then the handler threads. */
	ptlrpc_service_del_atimer(service);
	ptlrpc_stop_all_threads(service);

	/* Unlink network buffers, then drain every remaining request,
	 * reply and buffer, and clean up the NRS policies. */
	ptlrpc_service_unlink_rqbd(service);
	ptlrpc_service_purge_all(service);
	ptlrpc_service_nrs_cleanup(service);

	/* Remove procfs/sysfs entries before freeing the structures
	 * they reference. */
	ptlrpc_lprocfs_unregister_service(service);
	ptlrpc_sysfs_unregister_service(service);

	ptlrpc_service_free(service);

	return 0;
}
EXPORT_SYMBOL(ptlrpc_unregister_service);
2796EXPORT_SYMBOL(ptlrpc_unregister_service);
2797