35#define DEBUG_SUBSYSTEM S_RPC
36
37#include "../include/obd_support.h"
38#include "../include/obd_class.h"
39#include "../include/lustre_lib.h"
40#include "../include/lustre_ha.h"
41#include "../include/lustre_import.h"
42#include "../include/lustre_req_layout.h"
43
44#include "ptlrpc_internal.h"
45
46static int ptlrpc_send_new_req(struct ptlrpc_request *req);
47static int ptlrpcd_check_work(struct ptlrpc_request *req);
48static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
49
50
51
52
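/*
 * Initialize the passed-in client structure: record the portals that
 * requests and replies for this client travel on, and the client name.
 */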
53void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
54 struct ptlrpc_client *cl)
55{
56 cl->cli_request_portal = req_portal;
57 cl->cli_reply_portal = rep_portal;
58 cl->cli_name = name;
59}
60EXPORT_SYMBOL(ptlrpc_init_client);
61
62
63
64
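/*
 * Return a ptlrpc connection for the remote peer identified by @uuid, or
 * NULL if the peer cannot be resolved.  On success the connection's remote
 * uuid is refreshed from @uuid.
 */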
65struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
66{
67 struct ptlrpc_connection *c;
68 lnet_nid_t self;
69 lnet_process_id_t peer;
70 int err;
71
72
73
74
75
76
77 err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
78 if (err != 0) {
79 CNETERR("cannot find peer %s!\n", uuid->uuid);
80 return NULL;
81 }
82
83 c = ptlrpc_connection_get(peer, self, uuid);
84 if (c) {
85 memcpy(c->c_remote_uuid.uuid,
86 uuid->uuid, sizeof(c->c_remote_uuid.uuid));
87 }
88
89 CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
90
91 return c;
92}
93
94
95
96
97
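/*
 * Allocate and initialize a bulk descriptor able to hold @npages pages for
 * the given bulk @type and @portal.  The number of concurrent brw RPCs is
 * @max_brw, capped at PTLRPC_BULK_OPS_COUNT; all MD handles start out
 * invalidated.
 */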
98struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
99 unsigned type, unsigned portal)
100{
101 struct ptlrpc_bulk_desc *desc;
102 int i;
103
104 desc = kzalloc(offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]),
105 GFP_NOFS);
106 if (!desc)
107 return NULL;
108
109 spin_lock_init(&desc->bd_lock);
110 init_waitqueue_head(&desc->bd_waitq);
111 desc->bd_max_iov = npages;
112 desc->bd_iov_count = 0;
113 desc->bd_portal = portal;
114 desc->bd_type = type;
115 desc->bd_md_count = 0;
116 LASSERT(max_brw > 0);
117 desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
118
119
120
121
122 for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
123 LNetInvalidateHandle(&desc->bd_mds[i]);
124
125 return desc;
126}
127
128
129
130
131
132
133
134
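/*
 * Prepare a client-side bulk descriptor for the import of @req and attach
 * it to the request as req->rq_bulk.  Only sink (BULK_PUT_SINK) and source
 * (BULK_GET_SOURCE) transfers are valid here.
 */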
135struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
136 unsigned npages, unsigned max_brw,
137 unsigned type, unsigned portal)
138{
139 struct obd_import *imp = req->rq_import;
140 struct ptlrpc_bulk_desc *desc;
141
142 LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
143 desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
144 if (!desc)
145 return NULL;
146
147 desc->bd_import_generation = req->rq_import_generation;
148 desc->bd_import = class_import_get(imp);
149 desc->bd_req = req;
150
151 desc->bd_cbid.cbid_fn = client_bulk_callback;
152 desc->bd_cbid.cbid_arg = desc;
153
154
155 req->rq_bulk = desc;
156
157 return desc;
158}
159EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
160
161
162
163
164
165
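/*
 * Add a page fragment (@page at @pageoffset, @len bytes) to bulk descriptor
 * @desc, optionally taking a page reference when @pin is set.
 */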
166void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
167 struct page *page, int pageoffset, int len, int pin)
168{
169 LASSERT(desc->bd_iov_count < desc->bd_max_iov);
170 LASSERT(page);
171 LASSERT(pageoffset >= 0);
172 LASSERT(len > 0);
173 LASSERT(pageoffset + len <= PAGE_SIZE);
174
175 desc->bd_nob += len;
176
177 if (pin)
178 get_page(page);
179
180 ptlrpc_add_bulk_page(desc, page, pageoffset, len);
181}
182EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
183
184
185
186
187
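/*
 * Release a bulk descriptor: return any encryption pool pages, drop the
 * export or import reference, optionally unpin the data pages, and free it.
 */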
188void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
189{
190 int i;
191
192 LASSERT(desc->bd_iov_count != LI_POISON);
193 LASSERT(desc->bd_md_count == 0);
194 LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
195
196 sptlrpc_enc_pool_put_pages(desc);
197
198 if (desc->bd_export)
199 class_export_put(desc->bd_export);
200 else
201 class_import_put(desc->bd_import);
202
203 if (unpin) {
204 for (i = 0; i < desc->bd_iov_count; i++)
205 put_page(desc->bd_iov[i].bv_page);
206 }
207
208 kfree(desc);
209}
210EXPORT_SYMBOL(__ptlrpc_free_bulk);
211
212
213
214
215
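/*
 * Set the request timeout: a static value derived from obd_timeout when
 * adaptive timeouts are disabled, otherwise the current service estimate
 * for the target portal.  The chosen timeout is also stored in the request
 * message so the server knows when the client will give up.
 */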
216void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
217{
218 __u32 serv_est;
219 int idx;
220 struct imp_at *at;
221
222 LASSERT(req->rq_import);
223
224 if (AT_OFF) {
225
226
227
228
229
230
231
232
233
234 req->rq_timeout = req->rq_import->imp_server_timeout ?
235 obd_timeout / 2 : obd_timeout;
236 } else {
237 at = &req->rq_import->imp_at;
238 idx = import_at_get_index(req->rq_import,
239 req->rq_request_portal);
240 serv_est = at_get(&at->iat_service_estimate[idx]);
241 req->rq_timeout = at_est2timeout(serv_est);
242 }
243
244
245
246
247
248
249
250
251
252 lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
253}
254EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
255
256
257static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
258 unsigned int serv_est)
259{
260 int idx;
261 unsigned int oldse;
262 struct imp_at *at;
263
264 LASSERT(req->rq_import);
265 at = &req->rq_import->imp_at;
266
267 idx = import_at_get_index(req->rq_import, req->rq_request_portal);
268
269
270
271
272 oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
273 if (oldse != 0)
274 CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
275 req->rq_import->imp_obd->obd_name, req->rq_request_portal,
276 oldse, at_get(&at->iat_service_estimate[idx]));
277}
278
279
280int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
281{
282 return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
283}
284
285
286void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
287 unsigned int service_time)
288{
289 unsigned int nl, oldnl;
290 struct imp_at *at;
291 time64_t now = ktime_get_real_seconds();
292
293 LASSERT(req->rq_import);
294
295 if (service_time > now - req->rq_sent + 3) {
296
297
298
299
300
301
302
303
304 CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
305 D_ADAPTTO : D_WARNING,
306 "Reported service time %u > total measured time "
307 CFS_DURATION_T"\n", service_time,
308 (long)(now - req->rq_sent));
309 return;
310 }
311
312
313 nl = max_t(int, now - req->rq_sent -
314 service_time, 0) + 1;
315 at = &req->rq_import->imp_at;
316
317 oldnl = at_measured(&at->iat_net_latency, nl);
318 if (oldnl != 0)
319 CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) has changed from %d to %d\n",
320 req->rq_import->imp_obd->obd_name,
321 obd_uuid2str(
322 &req->rq_import->imp_connection->c_remote_uuid),
323 oldnl, at_get(&at->iat_net_latency));
324}
325
326static int unpack_reply(struct ptlrpc_request *req)
327{
328 int rc;
329
330 if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
331 rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
332 if (rc) {
333 DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
334 return -EPROTO;
335 }
336 }
337
338 rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
339 if (rc) {
340 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
341 return -EPROTO;
342 }
343 return 0;
344}
345
346
347
348
349
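/*
 * Handle an "early reply" sent by the server to extend the client's
 * patience: unwrap and unpack it, adopt the server's new timeout estimate
 * and the measured network latency, and push the request deadline out
 * accordingly.  Called with rq_lock held; the lock is dropped and retaken
 * internally.
 */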
350static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
351 __must_hold(&req->rq_lock)
352{
353 struct ptlrpc_request *early_req;
354 time64_t olddl;
355 int rc;
356
357 req->rq_early = 0;
358 spin_unlock(&req->rq_lock);
359
360 rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
361 if (rc) {
362 spin_lock(&req->rq_lock);
363 return rc;
364 }
365
366 rc = unpack_reply(early_req);
367 if (rc) {
368 sptlrpc_cli_finish_early_reply(early_req);
369 spin_lock(&req->rq_lock);
370 return rc;
371 }
372
373
374
375
376
377
378
379
380 req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg);
381 lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
382
383
384 ptlrpc_at_adj_net_latency(req,
385 lustre_msg_get_service_time(early_req->rq_repmsg));
386
387 sptlrpc_cli_finish_early_reply(early_req);
388
389 spin_lock(&req->rq_lock);
390 olddl = req->rq_deadline;
391
392
393
394
395
396
397 req->rq_deadline = req->rq_sent + req->rq_timeout +
398 ptlrpc_at_get_net_latency(req);
399
400 DEBUG_REQ(D_ADAPTTO, req,
401 "Early reply #%d, new deadline in %lds (%lds)",
402 req->rq_early_count,
403 (long)(req->rq_deadline - ktime_get_real_seconds()),
404 (long)(req->rq_deadline - olddl));
405
406 return rc;
407}
408
409static struct kmem_cache *request_cache;
410
411int ptlrpc_request_cache_init(void)
412{
413 request_cache = kmem_cache_create("ptlrpc_cache",
414 sizeof(struct ptlrpc_request),
415 0, SLAB_HWCACHE_ALIGN, NULL);
416 return !request_cache ? -ENOMEM : 0;
417}
418
419void ptlrpc_request_cache_fini(void)
420{
421 kmem_cache_destroy(request_cache);
422}
423
424struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
425{
426 struct ptlrpc_request *req;
427
428 req = kmem_cache_zalloc(request_cache, flags);
429 return req;
430}
431
432void ptlrpc_request_cache_free(struct ptlrpc_request *req)
433{
434 kmem_cache_free(request_cache, req);
435}
436
437
438
439
440
441void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
442{
443 struct list_head *l, *tmp;
444 struct ptlrpc_request *req;
445
446 spin_lock(&pool->prp_lock);
447 list_for_each_safe(l, tmp, &pool->prp_req_list) {
448 req = list_entry(l, struct ptlrpc_request, rq_list);
449 list_del(&req->rq_list);
450 LASSERT(req->rq_reqbuf);
451 LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
452 kvfree(req->rq_reqbuf);
453 ptlrpc_request_cache_free(req);
454 }
455 spin_unlock(&pool->prp_lock);
456 kfree(pool);
457}
458EXPORT_SYMBOL(ptlrpc_free_rq_pool);
459
460
461
462
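/*
 * Pre-allocate @num_rq requests (with power-of-two sized request buffers)
 * into @pool.  Returns the number of requests actually added.
 */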
463int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
464{
465 int i;
466 int size = 1;
467
468 while (size < pool->prp_rq_size)
469 size <<= 1;
470
471 LASSERTF(list_empty(&pool->prp_req_list) ||
472 size == pool->prp_rq_size,
473 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
474 pool->prp_rq_size, size);
475
476 spin_lock(&pool->prp_lock);
477 pool->prp_rq_size = size;
478 for (i = 0; i < num_rq; i++) {
479 struct ptlrpc_request *req;
480 struct lustre_msg *msg;
481
482 spin_unlock(&pool->prp_lock);
483 req = ptlrpc_request_cache_alloc(GFP_NOFS);
484 if (!req)
485 return i;
486 msg = libcfs_kvzalloc(size, GFP_NOFS);
487 if (!msg) {
488 ptlrpc_request_cache_free(req);
489 return i;
490 }
491 req->rq_reqbuf = msg;
492 req->rq_reqbuf_len = size;
493 req->rq_pool = pool;
494 spin_lock(&pool->prp_lock);
495 list_add_tail(&req->rq_list, &pool->prp_req_list);
496 }
497 spin_unlock(&pool->prp_lock);
498 return num_rq;
499}
500EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
501
502
503
504
505
506
507
508
509
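/*
 * Create a request pool whose buffers can hold @msgsize bytes plus the
 * maximum security payload, and fill it with @num_rq requests using
 * @populate_pool.
 */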
510struct ptlrpc_request_pool *
511ptlrpc_init_rq_pool(int num_rq, int msgsize,
512 int (*populate_pool)(struct ptlrpc_request_pool *, int))
513{
514 struct ptlrpc_request_pool *pool;
515
516 pool = kzalloc(sizeof(struct ptlrpc_request_pool), GFP_NOFS);
517 if (!pool)
518 return NULL;
519
520
521
522
523
524
525 spin_lock_init(&pool->prp_lock);
526 INIT_LIST_HEAD(&pool->prp_req_list);
527 pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
528 pool->prp_populate = populate_pool;
529
530 populate_pool(pool, num_rq);
531
532 return pool;
533}
534EXPORT_SYMBOL(ptlrpc_init_rq_pool);
535
536
537
538
539static struct ptlrpc_request *
540ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
541{
542 struct ptlrpc_request *request;
543 struct lustre_msg *reqbuf;
544
545 if (!pool)
546 return NULL;
547
548 spin_lock(&pool->prp_lock);
549
550
551
552
553
554
555
556 if (unlikely(list_empty(&pool->prp_req_list))) {
557 spin_unlock(&pool->prp_lock);
558 return NULL;
559 }
560
561 request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
562 rq_list);
563 list_del_init(&request->rq_list);
564 spin_unlock(&pool->prp_lock);
565
566 LASSERT(request->rq_reqbuf);
567 LASSERT(request->rq_pool);
568
569 reqbuf = request->rq_reqbuf;
570 memset(request, 0, sizeof(*request));
571 request->rq_reqbuf = reqbuf;
572 request->rq_reqbuf_len = pool->prp_rq_size;
573 request->rq_pool = pool;
574
575 return request;
576}
577
578
579
580
581static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
582{
583 struct ptlrpc_request_pool *pool = request->rq_pool;
584
585 spin_lock(&pool->prp_lock);
586 LASSERT(list_empty(&request->rq_list));
587 LASSERT(!request->rq_receiving_reply);
588 list_add_tail(&request->rq_list, &pool->prp_req_list);
589 spin_unlock(&pool->prp_lock);
590}
591
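/*
 * Finish client request setup: obtain a security context, pack the request
 * buffers described by the capsule, and initialize callbacks, phase,
 * portals, timeout and xid.  Also honours the OBD_FAIL_PTLRPC_LONG_*_UNLINK
 * fault injection points when cfs_fail_val matches the opcode.
 */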
592int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
593 __u32 version, int opcode, char **bufs,
594 struct ptlrpc_cli_ctx *ctx)
595{
596 int count;
597 struct obd_import *imp;
598 __u32 *lengths;
599 int rc;
600
601 count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
602 imp = request->rq_import;
603 lengths = request->rq_pill.rc_area[RCL_CLIENT];
604
605 if (unlikely(ctx)) {
606 request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
607 } else {
608 rc = sptlrpc_req_get_ctx(request);
609 if (rc)
610 goto out_free;
611 }
612 sptlrpc_req_set_flavor(request, opcode);
613
614 rc = lustre_pack_request(request, imp->imp_msg_magic, count,
615 lengths, bufs);
616 if (rc)
617 goto out_ctx;
618
619 lustre_msg_add_version(request->rq_reqmsg, version);
620 request->rq_send_state = LUSTRE_IMP_FULL;
621 request->rq_type = PTL_RPC_MSG_REQUEST;
622
623 request->rq_req_cbid.cbid_fn = request_out_callback;
624 request->rq_req_cbid.cbid_arg = request;
625
626 request->rq_reply_cbid.cbid_fn = reply_in_callback;
627 request->rq_reply_cbid.cbid_arg = request;
628
629 request->rq_reply_deadline = 0;
630 request->rq_bulk_deadline = 0;
631 request->rq_req_deadline = 0;
632 request->rq_phase = RQ_PHASE_NEW;
633 request->rq_next_phase = RQ_PHASE_UNDEFINED;
634
635 request->rq_request_portal = imp->imp_client->cli_request_portal;
636 request->rq_reply_portal = imp->imp_client->cli_reply_portal;
637
638 ptlrpc_at_set_req_timeout(request);
639
640 request->rq_xid = ptlrpc_next_xid();
641 lustre_msg_set_opc(request->rq_reqmsg, opcode);
642
643
 if (cfs_fail_val == opcode) {
 time64_t *fail_t = NULL, *fail2_t = NULL;
646
647 if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
648 fail_t = &request->rq_bulk_deadline;
649 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
650 fail_t = &request->rq_reply_deadline;
651 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) {
652 fail_t = &request->rq_req_deadline;
653 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
654 fail_t = &request->rq_reply_deadline;
655 fail2_t = &request->rq_bulk_deadline;
656 }
657
658 if (fail_t) {
659 *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
660
661 if (fail2_t)
662 *fail2_t = ktime_get_real_seconds() +
663 LONG_UNLINK;
664
665
666
667
668 set_current_state(TASK_UNINTERRUPTIBLE);
669 schedule_timeout(cfs_time_seconds(2));
670 set_current_state(TASK_RUNNING);
671 }
672 }
673
674 return 0;
675
676out_ctx:
677 LASSERT(!request->rq_pool);
678 sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
679out_free:
680 class_import_put(imp);
681 return rc;
682}
683EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
684
685
686
687
688
689int ptlrpc_request_pack(struct ptlrpc_request *request,
690 __u32 version, int opcode)
691{
692 int rc;
693
694 rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
695 if (rc)
696 return rc;
697
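 /*
  * For the LDLM callback RPCs, shrink the ptlrpc_body field to the
  * smaller ptlrpc_body_v2 layout (presumably for interoperability with
  * peers that expect that size for these server-originated RPCs).
  */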
712 if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
713 opcode == LDLM_GL_CALLBACK)
714 req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
715 sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
716
717 return rc;
718}
719EXPORT_SYMBOL(ptlrpc_request_pack);
720
721
722
723
724
725
726
727static inline
728struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
729 struct ptlrpc_request_pool *pool)
730{
731 struct ptlrpc_request *request;
732
733 request = ptlrpc_request_cache_alloc(GFP_NOFS);
734
735 if (!request && pool)
736 request = ptlrpc_prep_req_from_pool(pool);
737
738 if (request) {
739 ptlrpc_cli_req_init(request);
740
 LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
742 LASSERT(imp != LP_POISON);
743 LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
744 imp->imp_client);
745 LASSERT(imp->imp_client != LP_POISON);
746
747 request->rq_import = class_import_get(imp);
748 } else {
749 CERROR("request allocation out of memory\n");
750 }
751
752 return request;
753}
754
755
756
757
758
759
760
761static struct ptlrpc_request *
762ptlrpc_request_alloc_internal(struct obd_import *imp,
763 struct ptlrpc_request_pool *pool,
764 const struct req_format *format)
765{
766 struct ptlrpc_request *request;
767
768 request = __ptlrpc_request_alloc(imp, pool);
769 if (!request)
770 return NULL;
771
772 req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
773 req_capsule_set(&request->rq_pill, format);
774 return request;
775}
776
777
778
779
780
781struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
782 const struct req_format *format)
783{
784 return ptlrpc_request_alloc_internal(imp, NULL, format);
785}
786EXPORT_SYMBOL(ptlrpc_request_alloc);
787
788
789
790
791
792struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
793 struct ptlrpc_request_pool *pool,
794 const struct req_format *format)
795{
796 return ptlrpc_request_alloc_internal(imp, pool, format);
797}
798EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
799
800
801
802
803
804void ptlrpc_request_free(struct ptlrpc_request *request)
805{
806 if (request->rq_pool)
807 __ptlrpc_free_req_to_pool(request);
808 else
809 ptlrpc_request_cache_free(request);
810}
811EXPORT_SYMBOL(ptlrpc_request_free);
812
813
814
815
816
817
818
819
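/*
 * Allocate a new request on import @imp using capsule @format and pack the
 * request message for @version/@opcode; returns NULL on allocation or
 * packing failure.  A minimal synchronous caller looks roughly like the
 * sketch below (RQF_OBD_PING, LUSTRE_OBD_VERSION and OBD_PING are symbols
 * defined elsewhere in Lustre, used here only as an example):
 *
 *	req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *					LUSTRE_OBD_VERSION, OBD_PING);
 *	if (!req)
 *		return -ENOMEM;
 *	rc = ptlrpc_queue_wait(req);
 *	ptlrpc_req_finished(req);
 */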
820struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
821 const struct req_format *format,
822 __u32 version, int opcode)
823{
824 struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
825 int rc;
826
827 if (req) {
828 rc = ptlrpc_request_pack(req, version, opcode);
829 if (rc) {
830 ptlrpc_request_free(req);
831 req = NULL;
832 }
833 }
834 return req;
835}
836EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
837
838
839
840
841
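/*
 * Allocate and initialize a new, empty request set on the current CPU
 * partition.  The caller owns the initial reference.
 */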
842struct ptlrpc_request_set *ptlrpc_prep_set(void)
843{
844 struct ptlrpc_request_set *set;
845 int cpt;
846
847 cpt = cfs_cpt_current(cfs_cpt_table, 0);
848 set = kzalloc_node(sizeof(*set), GFP_NOFS,
849 cfs_cpt_spread_node(cfs_cpt_table, cpt));
850 if (!set)
851 return NULL;
852 atomic_set(&set->set_refcount, 1);
853 INIT_LIST_HEAD(&set->set_requests);
854 init_waitqueue_head(&set->set_waitq);
855 atomic_set(&set->set_new_count, 0);
856 atomic_set(&set->set_remaining, 0);
857 spin_lock_init(&set->set_new_req_lock);
858 INIT_LIST_HEAD(&set->set_new_requests);
859 INIT_LIST_HEAD(&set->set_cblist);
860 set->set_max_inflight = UINT_MAX;
861 set->set_producer = NULL;
862 set->set_producer_arg = NULL;
863 set->set_rc = 0;
864
865 return set;
866}
867EXPORT_SYMBOL(ptlrpc_prep_set);
868
869
870
871
872
873
874
875
876
877struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
878 void *arg)
879
880{
881 struct ptlrpc_request_set *set;
882
883 set = ptlrpc_prep_set();
884 if (!set)
885 return NULL;
886
887 set->set_max_inflight = max;
888 set->set_producer = func;
889 set->set_producer_arg = arg;
890
891 return set;
892}
893
894
895
896
897
898
899
900
901
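/*
 * Tear down a request set.  Every request must either be complete or never
 * have been sent (RQ_PHASE_NEW); new requests are interpreted with -EBADR,
 * each request is detached from the set and its reference dropped, then the
 * set reference itself is released.
 */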
902void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
903{
904 struct list_head *tmp;
905 struct list_head *next;
906 int expected_phase;
907 int n = 0;
908
909
910 expected_phase = (atomic_read(&set->set_remaining) == 0) ?
911 RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
912 list_for_each(tmp, &set->set_requests) {
913 struct ptlrpc_request *req =
914 list_entry(tmp, struct ptlrpc_request, rq_set_chain);
915
916 LASSERT(req->rq_phase == expected_phase);
917 n++;
918 }
919
920 LASSERTF(atomic_read(&set->set_remaining) == 0 ||
921 atomic_read(&set->set_remaining) == n, "%d / %d\n",
922 atomic_read(&set->set_remaining), n);
923
924 list_for_each_safe(tmp, next, &set->set_requests) {
925 struct ptlrpc_request *req =
926 list_entry(tmp, struct ptlrpc_request, rq_set_chain);
927 list_del_init(&req->rq_set_chain);
928
929 LASSERT(req->rq_phase == expected_phase);
930
931 if (req->rq_phase == RQ_PHASE_NEW) {
932 ptlrpc_req_interpret(NULL, req, -EBADR);
933 atomic_dec(&set->set_remaining);
934 }
935
936 spin_lock(&req->rq_lock);
937 req->rq_set = NULL;
938 req->rq_invalid_rqset = 0;
939 spin_unlock(&req->rq_lock);
940
941 ptlrpc_req_finished(req);
942 }
943
944 LASSERT(atomic_read(&set->set_remaining) == 0);
945
946 ptlrpc_reqset_put(set);
947}
948EXPORT_SYMBOL(ptlrpc_set_destroy);
949
950
951
952
953
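/*
 * Attach @req to @set.  If the set is driven by a producer callback, the
 * request is sent straight away.
 */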
954void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
955 struct ptlrpc_request *req)
956{
957 LASSERT(list_empty(&req->rq_set_chain));
958
959
960 list_add_tail(&req->rq_set_chain, &set->set_requests);
961 req->rq_set = set;
962 atomic_inc(&set->set_remaining);
963 req->rq_queued_time = cfs_time_current();
964
965 if (req->rq_reqmsg)
966 lustre_msg_set_jobid(req->rq_reqmsg, NULL);
967
968 if (set->set_producer)
969
970
971
972
973 ptlrpc_send_new_req(req);
974}
975EXPORT_SYMBOL(ptlrpc_set_add_req);
976
977
978
979
980
981
982void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
983 struct ptlrpc_request *req)
984{
985 struct ptlrpc_request_set *set = pc->pc_set;
986 int count, i;
987
988 LASSERT(!req->rq_set);
989 LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
990
991 spin_lock(&set->set_new_req_lock);
992
993 req->rq_set = set;
994 req->rq_queued_time = cfs_time_current();
995 list_add_tail(&req->rq_set_chain, &set->set_new_requests);
996 count = atomic_inc_return(&set->set_new_count);
997 spin_unlock(&set->set_new_req_lock);
998
999
1000 if (count == 1) {
1001 wake_up(&set->set_waitq);
1002
1003
1004
1005
1006
1007
1008 for (i = 0; i < pc->pc_npartners; i++)
1009 wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
1010 }
1011}
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
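/*
 * Decide what to do with a request whose import is not in the state the
 * request wants to be sent in.  Returns non-zero if the request should be
 * delayed (kept on the import's delayed list); otherwise *status carries 0
 * to send now, or a negative errno to fail the request immediately.
 */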
1023static int ptlrpc_import_delay_req(struct obd_import *imp,
1024 struct ptlrpc_request *req, int *status)
1025{
1026 int delay = 0;
1027
1028 *status = 0;
1029
1030 if (req->rq_ctx_init || req->rq_ctx_fini) {
1031
1032 } else if (imp->imp_state == LUSTRE_IMP_NEW) {
1033 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
1034 *status = -EIO;
1035 } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1036
1037 DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
1038 D_HA : D_ERROR, req, "IMP_CLOSED ");
1039 *status = -EIO;
1040 } else if (ptlrpc_send_limit_expired(req)) {
1041
1042 DEBUG_REQ(D_HA, req, "send limit expired ");
1043 *status = -ETIMEDOUT;
1044 } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1045 imp->imp_state == LUSTRE_IMP_CONNECTING) {
1046
1047 if (atomic_read(&imp->imp_inval_count) != 0) {
1048 DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1049 *status = -EIO;
1050 }
1051 } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1052 if (!imp->imp_deactive)
1053 DEBUG_REQ(D_NET, req, "IMP_INVALID");
1054 *status = -ESHUTDOWN;
1055 } else if (req->rq_import_generation != imp->imp_generation) {
1056 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1057 *status = -EIO;
1058 } else if (req->rq_send_state != imp->imp_state) {
1059
1060 if (atomic_read(&imp->imp_inval_count) != 0) {
1061 DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1062 *status = -EIO;
1063 } else if (imp->imp_dlm_fake || req->rq_no_delay) {
1064 *status = -EWOULDBLOCK;
1065 } else if (req->rq_allow_replay &&
1066 (imp->imp_state == LUSTRE_IMP_REPLAY ||
1067 imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1068 imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1069 imp->imp_state == LUSTRE_IMP_RECOVER)) {
1070 DEBUG_REQ(D_HA, req, "allow during recovery.\n");
1071 } else {
1072 delay = 1;
1073 }
1074 }
1075
1076 return delay;
1077}
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088static bool ptlrpc_console_allow(struct ptlrpc_request *req)
1089{
1090 __u32 opc;
1091
1092 LASSERT(req->rq_reqmsg);
1093 opc = lustre_msg_get_opc(req->rq_reqmsg);
1094
1095
1096 if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
1097 int err;
1098
1099
1100 if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
1101 req->rq_timedout)
1102 return false;
1103
1104
1105
1106
1107
1108
1109 err = lustre_msg_get_status(req->rq_repmsg);
1110 if ((err == -ENODEV || err == -EAGAIN) &&
1111 req->rq_import->imp_conn_cnt % 30 != 20)
1112 return false;
1113 }
1114
1115 return true;
1116}
1117
1118
1119
1120
1121
1122static int ptlrpc_check_status(struct ptlrpc_request *req)
1123{
1124 int err;
1125
1126 err = lustre_msg_get_status(req->rq_repmsg);
1127 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1128 struct obd_import *imp = req->rq_import;
1129 lnet_nid_t nid = imp->imp_connection->c_peer.nid;
1130 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1131
1132 if (ptlrpc_console_allow(req))
1133 LCONSOLE_ERROR_MSG(0x011, "%s: operation %s to node %s failed: rc = %d\n",
1134 imp->imp_obd->obd_name,
1135 ll_opcode2str(opc),
1136 libcfs_nid2str(nid), err);
1137 return err < 0 ? err : -EINVAL;
1138 }
1139
1140 if (err < 0)
1141 DEBUG_REQ(D_INFO, req, "status is %d", err);
1142 else if (err > 0)
1143
1144 DEBUG_REQ(D_INFO, req, "status is %d", err);
1145
1146 return err;
1147}
1148
1149
1150
1151
1152
1153
1154static void ptlrpc_save_versions(struct ptlrpc_request *req)
1155{
1156 struct lustre_msg *repmsg = req->rq_repmsg;
1157 struct lustre_msg *reqmsg = req->rq_reqmsg;
1158 __u64 *versions = lustre_msg_get_versions(repmsg);
1159
1160 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1161 return;
1162
1163 LASSERT(versions);
1164 lustre_msg_set_versions(reqmsg, versions);
1165 CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1166 versions[0], versions[1]);
1167}
1168
1169
1170
1171
1172
1173
1174
1175
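/*
 * Process an incoming reply: handle truncated replies and -EINPROGRESS
 * resends, unwrap and unpack the message, update adaptive timeout data,
 * record transno and last_committed bookkeeping, and retain the request for
 * replay when required.  Returns the request status.
 */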
1176static int after_reply(struct ptlrpc_request *req)
1177{
1178 struct obd_import *imp = req->rq_import;
1179 struct obd_device *obd = req->rq_import->imp_obd;
1180 int rc;
1181 struct timespec64 work_start;
1182 long timediff;
1183
1184 LASSERT(obd);
1185
1186 LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked);
1187
1188 if (req->rq_reply_truncated) {
1189 if (ptlrpc_no_resend(req)) {
1190 DEBUG_REQ(D_ERROR, req, "reply buffer overflow, expected: %d, actual size: %d",
1191 req->rq_nob_received, req->rq_repbuf_len);
1192 return -EOVERFLOW;
1193 }
1194
1195 sptlrpc_cli_free_repbuf(req);
1196
1197
1198
1199
1200
1201 req->rq_replen = req->rq_nob_received;
1202 req->rq_nob_received = 0;
1203 spin_lock(&req->rq_lock);
1204 req->rq_resend = 1;
1205 spin_unlock(&req->rq_lock);
1206 return 0;
1207 }
1208
1209
1210
1211
1212
1213 rc = sptlrpc_cli_unwrap_reply(req);
1214 if (rc) {
1215 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
1216 return rc;
1217 }
1218
1219
1220 if (req->rq_resend)
1221 return 0;
1222
1223 rc = unpack_reply(req);
1224 if (rc)
1225 return rc;
1226
1227
1228 if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1229 ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1230 time64_t now = ktime_get_real_seconds();
1231
1232 DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
1233 spin_lock(&req->rq_lock);
1234 req->rq_resend = 1;
1235 spin_unlock(&req->rq_lock);
1236 req->rq_nr_resend++;
1237
1238
1239 if (!req->rq_bulk) {
1240
1241 req->rq_xid = ptlrpc_next_xid();
1242 DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
1243 }
1244
1245
1246 ptlrpc_at_set_req_timeout(req);
1247
1248
1249
1250
1251
1252
1253 if (req->rq_nr_resend > req->rq_timeout)
1254 req->rq_sent = now + req->rq_timeout;
1255 else
1256 req->rq_sent = now + req->rq_nr_resend;
1257
1258 return 0;
1259 }
1260
1261 ktime_get_real_ts64(&work_start);
1262 timediff = (work_start.tv_sec - req->rq_sent_tv.tv_sec) * USEC_PER_SEC +
1263 (work_start.tv_nsec - req->rq_sent_tv.tv_nsec) /
1264 NSEC_PER_USEC;
1265 if (obd->obd_svc_stats) {
1266 lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1267 timediff);
1268 ptlrpc_lprocfs_rpc_sent(req, timediff);
1269 }
1270
1271 if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1272 lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1273 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1274 lustre_msg_get_type(req->rq_repmsg));
1275 return -EPROTO;
1276 }
1277
1278 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1279 CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1280 ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1281 ptlrpc_at_adj_net_latency(req,
1282 lustre_msg_get_service_time(req->rq_repmsg));
1283
1284 rc = ptlrpc_check_status(req);
1285 imp->imp_connect_error = rc;
1286
1287 if (rc) {
1288
1289
1290
1291
1292
1293 if (ptlrpc_recoverable_error(rc)) {
1294 if (req->rq_send_state != LUSTRE_IMP_FULL ||
1295 imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1296 return rc;
1297 }
1298 ptlrpc_request_handle_notconn(req);
1299 return rc;
1300 }
1301 } else {
1302
1303
1304
1305
1306 ldlm_cli_update_pool(req);
1307 }
1308
1309
1310 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1311 req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1312 lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1313 }
1314
1315 if (imp->imp_replayable) {
1316 spin_lock(&imp->imp_lock);
1317
1318
1319
1320
1321 if (req->rq_transno != 0 &&
1322 (req->rq_transno >
1323 lustre_msg_get_last_committed(req->rq_repmsg) ||
1324 req->rq_replay)) {
1325
1326 ptlrpc_save_versions(req);
1327 ptlrpc_retain_replayable_request(req, imp);
1328 } else if (req->rq_commit_cb &&
1329 list_empty(&req->rq_replay_list)) {
1330
1331
1332
1333
1334
1335 spin_unlock(&imp->imp_lock);
1336 req->rq_commit_cb(req);
1337 spin_lock(&imp->imp_lock);
1338 }
1339
1340
1341 if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1342 imp->imp_peer_committed_transno =
1343 lustre_msg_get_last_committed(req->rq_repmsg);
1344 }
1345
1346 ptlrpc_free_committed(imp);
1347
1348 if (!list_empty(&imp->imp_replay_list)) {
1349 struct ptlrpc_request *last;
1350
1351 last = list_entry(imp->imp_replay_list.prev,
1352 struct ptlrpc_request,
1353 rq_replay_list);
1354
1355
1356
1357
1358 if (last->rq_transno > imp->imp_peer_committed_transno)
1359 ptlrpc_pinger_commit_expected(imp);
1360 }
1361
1362 spin_unlock(&imp->imp_lock);
1363 }
1364
1365 return rc;
1366}
1367
1368
1369
1370
1371
1372
1373static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1374{
1375 struct obd_import *imp = req->rq_import;
1376 int rc;
1377
1378 LASSERT(req->rq_phase == RQ_PHASE_NEW);
1379 if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
1380 (!req->rq_generation_set ||
1381 req->rq_import_generation == imp->imp_generation))
1382 return 0;
1383
1384 ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1385
1386 spin_lock(&imp->imp_lock);
1387
1388 if (!req->rq_generation_set)
1389 req->rq_import_generation = imp->imp_generation;
1390
1391 if (ptlrpc_import_delay_req(imp, req, &rc)) {
1392 spin_lock(&req->rq_lock);
1393 req->rq_waiting = 1;
1394 spin_unlock(&req->rq_lock);
1395
1396 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: (%s != %s)",
1397 lustre_msg_get_status(req->rq_reqmsg),
1398 ptlrpc_import_state_name(req->rq_send_state),
1399 ptlrpc_import_state_name(imp->imp_state));
1400 LASSERT(list_empty(&req->rq_list));
1401 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1402 atomic_inc(&req->rq_import->imp_inflight);
1403 spin_unlock(&imp->imp_lock);
1404 return 0;
1405 }
1406
1407 if (rc != 0) {
1408 spin_unlock(&imp->imp_lock);
1409 req->rq_status = rc;
1410 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1411 return rc;
1412 }
1413
1414 LASSERT(list_empty(&req->rq_list));
1415 list_add_tail(&req->rq_list, &imp->imp_sending_list);
1416 atomic_inc(&req->rq_import->imp_inflight);
1417 spin_unlock(&imp->imp_lock);
1418
1419 lustre_msg_set_status(req->rq_reqmsg, current_pid());
1420
1421 rc = sptlrpc_req_refresh_ctx(req, -1);
1422 if (rc) {
1423 if (req->rq_err) {
1424 req->rq_status = rc;
1425 return 1;
1426 }
1427 spin_lock(&req->rq_lock);
1428 req->rq_wait_ctx = 1;
1429 spin_unlock(&req->rq_lock);
1430 return 0;
1431 }
1432
1433 CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1434 current_comm(),
1435 imp->imp_obd->obd_uuid.uuid,
1436 lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1437 libcfs_nid2str(imp->imp_connection->c_peer.nid),
1438 lustre_msg_get_opc(req->rq_reqmsg));
1439
1440 rc = ptl_send_rpc(req, 0);
1441 if (rc) {
1442 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1443 spin_lock(&req->rq_lock);
1444 req->rq_net_err = 1;
1445 spin_unlock(&req->rq_lock);
1446 return rc;
1447 }
1448 return 0;
1449}
1450
1451static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1452{
1453 int remaining, rc;
1454
1455 LASSERT(set->set_producer);
1456
1457 remaining = atomic_read(&set->set_remaining);
1458
1459
1460
1461
1462
1463 while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1464 rc = set->set_producer(set, set->set_producer_arg);
1465 if (rc == -ENOENT) {
1466
1467 set->set_producer = NULL;
1468 set->set_producer_arg = NULL;
1469 return 0;
1470 }
1471 }
1472
1473 return (atomic_read(&set->set_remaining) - remaining);
1474}
1475
1476
1477
1478
1479
1480
1481
1482
1483
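/*
 * Drive every request in @set through its state machine: send new requests,
 * resend or fail timed-out ones, collect replies and bulk completions, and
 * interpret finished requests.  Returns non-zero once the caller no longer
 * needs to wait (all requests complete, or a timer recalculation is needed).
 */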
1484int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1485{
1486 struct list_head *tmp, *next;
1487 struct list_head comp_reqs;
1488 int force_timer_recalc = 0;
1489
1490 if (atomic_read(&set->set_remaining) == 0)
1491 return 1;
1492
1493 INIT_LIST_HEAD(&comp_reqs);
1494 list_for_each_safe(tmp, next, &set->set_requests) {
1495 struct ptlrpc_request *req =
1496 list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1497 struct obd_import *imp = req->rq_import;
1498 int unregistered = 0;
1499 int rc = 0;
1500
1501
1502
1503
1504
1505
1506
1507
1508 cond_resched();
1509
1510 if (req->rq_phase == RQ_PHASE_NEW &&
1511 ptlrpc_send_new_req(req)) {
1512 force_timer_recalc = 1;
1513 }
1514
1515
1516 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1517 continue;
1518
1519
1520 if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1521 req->rq_sent > ktime_get_real_seconds())
1522 continue;
1523
1524 if (!(req->rq_phase == RQ_PHASE_RPC ||
1525 req->rq_phase == RQ_PHASE_BULK ||
1526 req->rq_phase == RQ_PHASE_INTERPRET ||
1527 req->rq_phase == RQ_PHASE_UNREG_RPC ||
1528 req->rq_phase == RQ_PHASE_UNREG_BULK ||
1529 req->rq_phase == RQ_PHASE_COMPLETE)) {
1530 DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1531 LBUG();
1532 }
1533
1534 if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
1535 req->rq_phase == RQ_PHASE_UNREG_BULK) {
1536 LASSERT(req->rq_next_phase != req->rq_phase);
1537 LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1538
1539 if (req->rq_req_deadline &&
1540 !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
1541 req->rq_req_deadline = 0;
1542 if (req->rq_reply_deadline &&
1543 !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
1544 req->rq_reply_deadline = 0;
1545 if (req->rq_bulk_deadline &&
1546 !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
1547 req->rq_bulk_deadline = 0;
1548
1549
1550
1551
1552
1553
1554
1555
1556 if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
1557 ptlrpc_client_recv_or_unlink(req))
1558 continue;
1559 if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
1560 ptlrpc_client_bulk_active(req))
1561 continue;
1562
1563
1564
1565
1566
1567 if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1568 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1569 OBD_FAIL_ONCE);
1570 }
1571 if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1572 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1573 OBD_FAIL_ONCE);
1574 }
1575
1576
1577
1578
1579 ptlrpc_rqphase_move(req, req->rq_next_phase);
1580 }
1581
1582 if (req->rq_phase == RQ_PHASE_COMPLETE) {
1583 list_move_tail(&req->rq_set_chain, &comp_reqs);
1584 continue;
1585 }
1586
1587 if (req->rq_phase == RQ_PHASE_INTERPRET)
1588 goto interpret;
1589
1590
1591 if (req->rq_net_err && !req->rq_timedout) {
1592 ptlrpc_expire_one_request(req, 1);
1593
1594
1595 if (ptlrpc_client_recv_or_unlink(req) ||
1596 ptlrpc_client_bulk_active(req))
1597 continue;
1598
1599 if (req->rq_no_resend) {
1600 if (req->rq_status == 0)
1601 req->rq_status = -EIO;
1602 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1603 goto interpret;
1604 } else {
1605 continue;
1606 }
1607 }
1608
1609 if (req->rq_err) {
1610 spin_lock(&req->rq_lock);
1611 req->rq_replied = 0;
1612 spin_unlock(&req->rq_lock);
1613 if (req->rq_status == 0)
1614 req->rq_status = -EIO;
1615 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1616 goto interpret;
1617 }
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1630 req->rq_wait_ctx)) {
1631 req->rq_status = -EINTR;
1632 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1633 goto interpret;
1634 }
1635
1636 if (req->rq_phase == RQ_PHASE_RPC) {
1637 if (req->rq_timedout || req->rq_resend ||
1638 req->rq_waiting || req->rq_wait_ctx) {
1639 int status;
1640
1641 if (!ptlrpc_unregister_reply(req, 1)) {
1642 ptlrpc_unregister_bulk(req, 1);
1643 continue;
1644 }
1645
1646 spin_lock(&imp->imp_lock);
1647 if (ptlrpc_import_delay_req(imp, req,
1648 &status)) {
1649
1650
1651
1652
1653 list_del_init(&req->rq_list);
1654 list_add_tail(&req->rq_list,
1655 &imp->imp_delayed_list);
1656 spin_unlock(&imp->imp_lock);
1657 continue;
1658 }
1659
1660 if (status != 0) {
1661 req->rq_status = status;
1662 ptlrpc_rqphase_move(req,
1663 RQ_PHASE_INTERPRET);
1664 spin_unlock(&imp->imp_lock);
1665 goto interpret;
1666 }
1667 if (ptlrpc_no_resend(req) &&
1668 !req->rq_wait_ctx) {
1669 req->rq_status = -ENOTCONN;
1670 ptlrpc_rqphase_move(req,
1671 RQ_PHASE_INTERPRET);
1672 spin_unlock(&imp->imp_lock);
1673 goto interpret;
1674 }
1675
1676 list_del_init(&req->rq_list);
1677 list_add_tail(&req->rq_list,
1678 &imp->imp_sending_list);
1679
1680 spin_unlock(&imp->imp_lock);
1681
1682 spin_lock(&req->rq_lock);
1683 req->rq_waiting = 0;
1684 spin_unlock(&req->rq_lock);
1685
1686 if (req->rq_timedout || req->rq_resend) {
1687
1688 spin_lock(&req->rq_lock);
1689 req->rq_resend = 1;
1690 spin_unlock(&req->rq_lock);
1691 if (req->rq_bulk) {
1692 __u64 old_xid;
1693
1694 if (!ptlrpc_unregister_bulk(req, 1))
1695 continue;
1696
1697
1698 old_xid = req->rq_xid;
1699 req->rq_xid = ptlrpc_next_xid();
1700 CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
1701 old_xid, req->rq_xid);
1702 }
1703 }
1704
1705
1706
1707
1708 status = sptlrpc_req_refresh_ctx(req, -1);
1709 if (status) {
1710 if (req->rq_err) {
1711 req->rq_status = status;
1712 spin_lock(&req->rq_lock);
1713 req->rq_wait_ctx = 0;
1714 spin_unlock(&req->rq_lock);
1715 force_timer_recalc = 1;
1716 } else {
1717 spin_lock(&req->rq_lock);
1718 req->rq_wait_ctx = 1;
1719 spin_unlock(&req->rq_lock);
1720 }
1721
1722 continue;
1723 } else {
1724 spin_lock(&req->rq_lock);
1725 req->rq_wait_ctx = 0;
1726 spin_unlock(&req->rq_lock);
1727 }
1728
1729 rc = ptl_send_rpc(req, 0);
1730 if (rc) {
1731 DEBUG_REQ(D_HA, req,
1732 "send failed: rc = %d", rc);
1733 force_timer_recalc = 1;
1734 spin_lock(&req->rq_lock);
1735 req->rq_net_err = 1;
1736 spin_unlock(&req->rq_lock);
1737 continue;
1738 }
1739
1740 force_timer_recalc = 1;
1741 }
1742
1743 spin_lock(&req->rq_lock);
1744
1745 if (ptlrpc_client_early(req)) {
1746 ptlrpc_at_recv_early_reply(req);
1747 spin_unlock(&req->rq_lock);
1748 continue;
1749 }
1750
1751
1752 if (ptlrpc_client_recv(req)) {
1753 spin_unlock(&req->rq_lock);
1754 continue;
1755 }
1756
1757
1758 if (!ptlrpc_client_replied(req)) {
1759 spin_unlock(&req->rq_lock);
1760 continue;
1761 }
1762
1763 spin_unlock(&req->rq_lock);
1764
1765
1766
1767
1768
1769 unregistered = ptlrpc_unregister_reply(req, 1);
1770 if (!unregistered)
1771 continue;
1772
1773 req->rq_status = after_reply(req);
1774 if (req->rq_resend)
1775 continue;
1776
1777
1778
1779
1780
1781
1782
1783 if (!req->rq_bulk || req->rq_status < 0) {
1784 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1785 goto interpret;
1786 }
1787
1788 ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1789 }
1790
1791 LASSERT(req->rq_phase == RQ_PHASE_BULK);
1792 if (ptlrpc_client_bulk_active(req))
1793 continue;
1794
1795 if (req->rq_bulk->bd_failure) {
1796
1797
1798
1799
1800
1801
1802 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1803 req->rq_status = -EIO;
1804 }
1805
1806 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1807
1808interpret:
1809 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1810
1811
1812
1813
1814
1815 if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
1816
1817 ptlrpc_unregister_bulk(req, 1);
1818 continue;
1819 }
1820
1821 if (!ptlrpc_unregister_bulk(req, 1))
1822 continue;
1823
1824
1825 LASSERT(!req->rq_receiving_reply);
1826
1827 ptlrpc_req_interpret(env, req, req->rq_status);
1828
1829 if (ptlrpcd_check_work(req)) {
1830 atomic_dec(&set->set_remaining);
1831 continue;
1832 }
1833 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1834
1835 CDEBUG(req->rq_reqmsg ? D_RPCTRACE : 0,
1836 "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1837 current_comm(), imp->imp_obd->obd_uuid.uuid,
1838 lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1839 libcfs_nid2str(imp->imp_connection->c_peer.nid),
1840 lustre_msg_get_opc(req->rq_reqmsg));
1841
1842 spin_lock(&imp->imp_lock);
1843
1844
1845
1846
1847
1848
1849 if (!list_empty(&req->rq_list)) {
1850 list_del_init(&req->rq_list);
1851 atomic_dec(&imp->imp_inflight);
1852 }
1853 spin_unlock(&imp->imp_lock);
1854
1855 atomic_dec(&set->set_remaining);
1856 wake_up_all(&imp->imp_recovery_waitq);
1857
1858 if (set->set_producer) {
1859
1860 if (ptlrpc_set_producer(set) > 0)
1861 force_timer_recalc = 1;
1862
1863
1864
1865
1866
1867 list_del_init(&req->rq_set_chain);
1868 spin_lock(&req->rq_lock);
1869 req->rq_set = NULL;
1870 req->rq_invalid_rqset = 0;
1871 spin_unlock(&req->rq_lock);
1872
1873
1874 if (req->rq_status != 0)
1875 set->set_rc = req->rq_status;
1876 ptlrpc_req_finished(req);
1877 } else {
1878 list_move_tail(&req->rq_set_chain, &comp_reqs);
1879 }
1880 }
1881
1882
1883
1884
1885
1886 list_splice(&comp_reqs, &set->set_requests);
1887
1888
1889 return atomic_read(&set->set_remaining) == 0 || force_timer_recalc;
1890}
1891EXPORT_SYMBOL(ptlrpc_check_set);
1892
1893
1894
1895
1896
1897
1898int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1899{
1900 struct obd_import *imp = req->rq_import;
1901 int rc = 0;
1902
1903 spin_lock(&req->rq_lock);
1904 req->rq_timedout = 1;
1905 spin_unlock(&req->rq_lock);
1906
1907 DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
1908 req->rq_net_err ? "failed due to network error" :
1909 ((req->rq_real_sent == 0 ||
1910 req->rq_real_sent < req->rq_sent ||
1911 req->rq_real_sent >= req->rq_deadline) ?
1912 "timed out for sent delay" : "timed out for slow reply"),
1913 (s64)req->rq_sent, (s64)req->rq_real_sent);
1914
1915 if (imp && obd_debug_peer_on_timeout)
1916 LNetDebugPeer(imp->imp_connection->c_peer);
1917
1918 ptlrpc_unregister_reply(req, async_unlink);
1919 ptlrpc_unregister_bulk(req, async_unlink);
1920
1921 if (obd_dump_on_timeout)
1922 libcfs_debug_dumplog();
1923
1924 if (!imp) {
1925 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1926 return 1;
1927 }
1928
1929 atomic_inc(&imp->imp_timeouts);
1930
1931
1932 if (imp->imp_dlm_fake)
1933 return 1;
1934
1935
1936
1937
1938
1939 if (req->rq_ctx_init || req->rq_ctx_fini ||
1940 req->rq_send_state != LUSTRE_IMP_FULL ||
1941 imp->imp_obd->obd_no_recov) {
1942 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1943 ptlrpc_import_state_name(req->rq_send_state),
1944 ptlrpc_import_state_name(imp->imp_state));
1945 spin_lock(&req->rq_lock);
1946 req->rq_status = -ETIMEDOUT;
1947 req->rq_err = 1;
1948 spin_unlock(&req->rq_lock);
1949 return 1;
1950 }
1951
1952
1953
1954
1955
1956 if (ptlrpc_no_resend(req)) {
1957 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1958 rc = 1;
1959 }
1960
1961 ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1962
1963 return rc;
1964}
1965
1966
1967
1968
1969
1970
1971int ptlrpc_expired_set(void *data)
1972{
1973 struct ptlrpc_request_set *set = data;
1974 struct list_head *tmp;
1975 time64_t now = ktime_get_real_seconds();
1976
1977
1978 list_for_each(tmp, &set->set_requests) {
1979 struct ptlrpc_request *req =
1980 list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1981
1982
1983 if (req->rq_wait_ctx)
1984 continue;
1985
1986
1987 if (!((req->rq_phase == RQ_PHASE_RPC &&
1988 !req->rq_waiting && !req->rq_resend) ||
1989 (req->rq_phase == RQ_PHASE_BULK)))
1990 continue;
1991
1992 if (req->rq_timedout ||
1993 req->rq_deadline > now)
1994 continue;
1995
1996
1997
1998
1999
2000 ptlrpc_expire_one_request(req, 1);
2001 }
2002
2003
2004
2005
2006
2007
2008 return 1;
2009}
2010
2011
2012
2013
2014void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
2015{
2016 spin_lock(&req->rq_lock);
2017 req->rq_intr = 1;
2018 spin_unlock(&req->rq_lock);
2019}
2020EXPORT_SYMBOL(ptlrpc_mark_interrupted);
2021
2022
2023
2024
2025
2026static void ptlrpc_interrupted_set(void *data)
2027{
2028 struct ptlrpc_request_set *set = data;
2029 struct list_head *tmp;
2030
2031 CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2032
2033 list_for_each(tmp, &set->set_requests) {
2034 struct ptlrpc_request *req =
2035 list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2036
2037 if (req->rq_phase != RQ_PHASE_RPC &&
2038 req->rq_phase != RQ_PHASE_UNREG_RPC)
2039 continue;
2040
2041 ptlrpc_mark_interrupted(req);
2042 }
2043}
2044
2045
2046
2047
2048int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2049{
2050 struct list_head *tmp;
2051 time64_t now = ktime_get_real_seconds();
2052 int timeout = 0;
2053 struct ptlrpc_request *req;
2054 time64_t deadline;
2055
2056 list_for_each(tmp, &set->set_requests) {
2057 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2058
2059
2060 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2061 (req->rq_phase == RQ_PHASE_BULK) ||
2062 (req->rq_phase == RQ_PHASE_NEW)))
2063 continue;
2064
2065
2066 if (req->rq_timedout)
2067 continue;
2068
2069
2070 if (req->rq_wait_ctx)
2071 continue;
2072
2073 if (req->rq_phase == RQ_PHASE_NEW)
2074 deadline = req->rq_sent;
2075 else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2076 deadline = req->rq_sent;
2077 else
2078 deadline = req->rq_sent + req->rq_timeout;
2079
2080 if (deadline <= now)
2081 timeout = 1;
2082 else if (timeout == 0 || timeout > deadline - now)
2083 timeout = deadline - now;
2084 }
2085 return timeout;
2086}
2087
2088
2089
2090
2091
2092
2093
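/*
 * Send all requests in @set and block until they complete, feeding the set
 * from its producer callback if one is registered.  Fatal signals interrupt
 * the in-flight requests.  The result is the set interpreter's return value
 * if one is set, otherwise a non-zero request status if any request failed.
 */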
2094int ptlrpc_set_wait(struct ptlrpc_request_set *set)
2095{
2096 struct list_head *tmp;
2097 struct ptlrpc_request *req;
2098 struct l_wait_info lwi;
2099 int rc, timeout;
2100
2101 if (set->set_producer)
2102 (void)ptlrpc_set_producer(set);
2103 else
2104 list_for_each(tmp, &set->set_requests) {
2105 req = list_entry(tmp, struct ptlrpc_request,
2106 rq_set_chain);
2107 if (req->rq_phase == RQ_PHASE_NEW)
2108 (void)ptlrpc_send_new_req(req);
2109 }
2110
2111 if (list_empty(&set->set_requests))
2112 return 0;
2113
2114 do {
2115 timeout = ptlrpc_set_next_timeout(set);
2116
2117
2118
2119
2120
2121 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
2122 set, timeout);
2123
2124 if (timeout == 0 && !signal_pending(current))
2125
2126
2127
2128
2129
2130
2131 lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
2132 ptlrpc_expired_set,
2133 ptlrpc_interrupted_set, set);
2134 else
2135
2136
2137
2138
2139
2140 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2141 ptlrpc_expired_set, set);
2142
2143 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
2144
2145
2146
2147
2148
2149
2150 if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
2151 signal_pending(current)) {
2152 sigset_t blocked_sigs =
2153 cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2154
2155
2156
2157
2158
2159
2160
2161 if (signal_pending(current))
2162 ptlrpc_interrupted_set(set);
2163 cfs_restore_sigs(blocked_sigs);
2164 }
2165
2166 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177 if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2178 list_for_each(tmp, &set->set_requests) {
2179 req = list_entry(tmp, struct ptlrpc_request,
2180 rq_set_chain);
2181 spin_lock(&req->rq_lock);
2182 req->rq_invalid_rqset = 1;
2183 spin_unlock(&req->rq_lock);
2184 }
2185 }
2186 } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2187
2188 LASSERT(atomic_read(&set->set_remaining) == 0);
2189
2190 rc = set->set_rc;
2191 list_for_each(tmp, &set->set_requests) {
2192 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2193
2194 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2195 if (req->rq_status != 0)
2196 rc = req->rq_status;
2197 }
2198
2199 if (set->set_interpret) {
2200 int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
2201 set->set_interpret;
2202 rc = interpreter(set, set->set_arg, rc);
2203 } else {
2204 struct ptlrpc_set_cbdata *cbdata, *n;
2205 int err;
2206
2207 list_for_each_entry_safe(cbdata, n,
2208 &set->set_cblist, psc_item) {
2209 list_del_init(&cbdata->psc_item);
2210 err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
2211 if (err && !rc)
2212 rc = err;
2213 kfree(cbdata);
2214 }
2215 }
2216
2217 return rc;
2218}
2219EXPORT_SYMBOL(ptlrpc_set_wait);
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2230{
2231 if (!request)
2232 return;
2233 LASSERT(!request->rq_srv_req);
2234 LASSERT(!request->rq_export);
2235 LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2236 LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2237 LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2238 LASSERTF(!request->rq_replay, "req %p\n", request);
2239
2240 req_capsule_fini(&request->rq_pill);
2241
2242
2243
2244
2245
2246 if (request->rq_import) {
2247 if (!locked)
2248 spin_lock(&request->rq_import->imp_lock);
2249 list_del_init(&request->rq_replay_list);
2250 if (!locked)
2251 spin_unlock(&request->rq_import->imp_lock);
2252 }
2253 LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2254
2255 if (atomic_read(&request->rq_refcount) != 0) {
2256 DEBUG_REQ(D_ERROR, request,
2257 "freeing request with nonzero refcount");
2258 LBUG();
2259 }
2260
2261 if (request->rq_repbuf)
2262 sptlrpc_cli_free_repbuf(request);
2263
2264 if (request->rq_import) {
2265 class_import_put(request->rq_import);
2266 request->rq_import = NULL;
2267 }
2268 if (request->rq_bulk)
2269 ptlrpc_free_bulk_pin(request->rq_bulk);
2270
2271 if (request->rq_reqbuf || request->rq_clrbuf)
2272 sptlrpc_cli_free_reqbuf(request);
2273
2274 if (request->rq_cli_ctx)
2275 sptlrpc_req_put_ctx(request, !locked);
2276
2277 if (request->rq_pool)
2278 __ptlrpc_free_req_to_pool(request);
2279 else
2280 ptlrpc_request_cache_free(request);
2281}
2282
2283
2284
2285
2286
2287
2288
2289static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2290{
2291 if (!request)
2292 return 1;
2293
2294 if (request == LP_POISON ||
2295 request->rq_reqmsg == LP_POISON) {
2296 CERROR("dereferencing freed request (bug 575)\n");
2297 LBUG();
2298 return 1;
2299 }
2300
2301 DEBUG_REQ(D_INFO, request, "refcount now %u",
2302 atomic_read(&request->rq_refcount) - 1);
2303
2304 if (atomic_dec_and_test(&request->rq_refcount)) {
2305 __ptlrpc_free_req(request, locked);
2306 return 1;
2307 }
2308
2309 return 0;
2310}
2311
2312
2313
2314
2315void ptlrpc_req_finished(struct ptlrpc_request *request)
2316{
2317 __ptlrpc_req_finished(request, 0);
2318}
2319EXPORT_SYMBOL(ptlrpc_req_finished);
2320
2321
2322
2323
2324__u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2325{
2326 return request->rq_xid;
2327}
2328EXPORT_SYMBOL(ptlrpc_req_xid);
2329
2330
2331
2332
2333
2334
2335
2336
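/*
 * Unregister the reply buffer of @request from LNet.  Returns 1 once the
 * buffer is no longer in use; with @async the unlink is only initiated and
 * 0 is returned, otherwise wait for the network to let go of the buffer,
 * complaining every LONG_UNLINK seconds.
 */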
2337static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2338{
2339 int rc;
2340 wait_queue_head_t *wq;
2341 struct l_wait_info lwi;
2342
2343
2344 LASSERT(!in_interrupt());
2345
2346
2347 if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2348 async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
2349 request->rq_reply_deadline =
2350 ktime_get_real_seconds() + LONG_UNLINK;
2351
2352
2353 if (!ptlrpc_client_recv_or_unlink(request))
2354 return 1;
2355
2356 LNetMDUnlink(request->rq_reply_md_h);
2357
2358
2359 if (!ptlrpc_client_recv_or_unlink(request))
2360 return 1;
2361
2362
2363 ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
2364
2365
2366 if (async)
2367 return 0;
2368
2369
2370
2371
2372
2373
2374 if (request->rq_set)
2375 wq = &request->rq_set->set_waitq;
2376 else
2377 wq = &request->rq_reply_waitq;
2378
2379 for (;;) {
2380
2381
2382
2383
2384 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2385 cfs_time_seconds(1), NULL, NULL);
2386 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2387 &lwi);
2388 if (rc == 0) {
2389 ptlrpc_rqphase_move(request, request->rq_next_phase);
2390 return 1;
2391 }
2392
2393 LASSERT(rc == -ETIMEDOUT);
2394 DEBUG_REQ(D_WARNING, request,
2395 "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
2396 request->rq_receiving_reply,
2397 request->rq_req_unlinked,
2398 request->rq_reply_unlinked);
2399 }
2400 return 0;
2401}
2402
2403static void ptlrpc_free_request(struct ptlrpc_request *req)
2404{
2405 spin_lock(&req->rq_lock);
2406 req->rq_replay = 0;
2407 spin_unlock(&req->rq_lock);
2408
2409 if (req->rq_commit_cb)
2410 req->rq_commit_cb(req);
2411 list_del_init(&req->rq_replay_list);
2412
2413 __ptlrpc_req_finished(req, 1);
2414}
2415
2416
2417
2418
2419void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2420{
2421 struct obd_import *imp = req->rq_import;
2422
2423 spin_lock(&imp->imp_lock);
2424 if (list_empty(&req->rq_replay_list)) {
2425 spin_unlock(&imp->imp_lock);
2426 return;
2427 }
2428
2429 if (force || req->rq_transno <= imp->imp_peer_committed_transno)
2430 ptlrpc_free_request(req);
2431
2432 spin_unlock(&imp->imp_lock);
2433}
2434EXPORT_SYMBOL(ptlrpc_request_committed);
2435
2436
2437
2438
2439
2440
2441
2442
2443
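/*
 * Walk the import's replay list and drop requests whose transno the server
 * has committed, or that belong to an older import generation; requests
 * still pinned for replay are moved to the committed list.  Caller must
 * hold imp_lock.
 */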
2444void ptlrpc_free_committed(struct obd_import *imp)
2445{
2446 struct ptlrpc_request *req, *saved;
2447 struct ptlrpc_request *last_req = NULL;
2448 bool skip_committed_list = true;
2449
2450 assert_spin_locked(&imp->imp_lock);
2451
2452 if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2453 imp->imp_generation == imp->imp_last_generation_checked) {
2454 CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2455 imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2456 return;
2457 }
2458 CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2459 imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2460 imp->imp_generation);
2461
2462 if (imp->imp_generation != imp->imp_last_generation_checked ||
2463 !imp->imp_last_transno_checked)
2464 skip_committed_list = false;
2465
2466 imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2467 imp->imp_last_generation_checked = imp->imp_generation;
2468
2469 list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2470 rq_replay_list) {
2471
2472 LASSERT(req != last_req);
2473 last_req = req;
2474
2475 if (req->rq_transno == 0) {
2476 DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2477 LBUG();
2478 }
2479 if (req->rq_import_generation < imp->imp_generation) {
2480 DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2481 goto free_req;
2482 }
2483
2484
2485 if (req->rq_transno > imp->imp_peer_committed_transno) {
2486 DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2487 break;
2488 }
2489
2490 if (req->rq_replay) {
2491 DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2492 list_move_tail(&req->rq_replay_list,
2493 &imp->imp_committed_list);
2494 continue;
2495 }
2496
2497 DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2498 imp->imp_peer_committed_transno);
2499free_req:
2500 ptlrpc_free_request(req);
2501 }
2502 if (skip_committed_list)
2503 return;
2504
2505 list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2506 rq_replay_list) {
2507 LASSERT(req->rq_transno != 0);
2508 if (req->rq_import_generation < imp->imp_generation) {
2509 DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
2510 ptlrpc_free_request(req);
2511 } else if (!req->rq_replay) {
2512 DEBUG_REQ(D_RPCTRACE, req, "free closed open request");
2513 ptlrpc_free_request(req);
2514 }
2515 }
2516}

/**
 * Schedule a previously sent request for resend.
 * Bulk requests get a fresh xid so that a late reply to the old send cannot
 * land in the same bulk buffer as the new transfer.
 */
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        DEBUG_REQ(D_HA, req, "going to resend");
        spin_lock(&req->rq_lock);

        /*
         * The request already got a reply but is still linked on the import
         * lists; let ptlrpc_check_set() process it instead.
         */
        if (ptlrpc_client_replied(req)) {
                spin_unlock(&req->rq_lock);
                DEBUG_REQ(D_HA, req, "it has reply, so skip it");
                return;
        }

        lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
        req->rq_status = -EAGAIN;

        req->rq_resend = 1;
        req->rq_net_err = 0;
        req->rq_timedout = 0;
        if (req->rq_bulk) {
                __u64 old_xid = req->rq_xid;

                /* ensure that the previous bulk transfer fails */
                req->rq_xid = ptlrpc_next_xid();
                CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
                       old_xid, req->rq_xid);
        }
        ptlrpc_client_wake_req(req);
        spin_unlock(&req->rq_lock);
}

/**
 * Grab an additional reference on request \a req.
 */
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        atomic_inc(&req->rq_refcount);
        return req;
}
EXPORT_SYMBOL(ptlrpc_request_addref);

/**
 * Add request \a req to the replay list of import \a imp, keeping the list
 * sorted by transno (and by xid among equal transnos).
 *
 * Must be called with imp->imp_lock held.
 */
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

        assert_spin_locked(&imp->imp_lock);

        if (req->rq_transno == 0) {
                DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
                LBUG();
        }

        /*
         * Clear MSG_RESENT both for new requests that were resent and for
         * resent replayed requests.
         */
        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);

        /* don't re-add requests that have been replayed */
        if (!list_empty(&req->rq_replay_list))
                return;

        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);

        LASSERT(imp->imp_replayable);
        /* balanced in ptlrpc_free_committed() */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /*
                 * Walk backwards to find the insertion point.  Transnos may
                 * be duplicated (for instance a create followed by an open of
                 * the same file), so rq_xid is used as a secondary sort key.
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                return;
        }

        list_add(&req->rq_replay_list, &imp->imp_replay_list);
}

/**
 * Send request \a req and wait until it completes.
 * Returns the request processing status.
 */
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
        struct ptlrpc_request_set *set;
        int rc;

        LASSERT(!req->rq_set);
        LASSERT(!req->rq_receiving_reply);

        set = ptlrpc_prep_set();
        if (!set) {
                CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM);
                return -ENOMEM;
        }

        /* for distributed debugging */
        lustre_msg_set_status(req->rq_reqmsg, current_pid());

        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
        ptlrpc_request_addref(req);
        ptlrpc_set_add_req(set, req);
        rc = ptlrpc_set_wait(set);
        ptlrpc_set_destroy(set);

        return rc;
}
EXPORT_SYMBOL(ptlrpc_queue_wait);
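
/*
 * Usage sketch (illustrative only, not part of the original file): a typical
 * synchronous caller packs a request against an import, sends it with
 * ptlrpc_queue_wait() and drops its own reference afterwards.  The format and
 * opcode below (RQF_OBD_PING / OBD_PING) are placeholders for whatever RPC
 * the caller actually issues:
 *
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *                                      LUSTRE_OBD_VERSION, OBD_PING);
 *      if (!req)
 *              return -ENOMEM;
 *      ptlrpc_request_set_replen(req);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 *      return rc;
 */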

/**
 * Callback used for replayed requests reply processing.
 * In case of a successful reply, calls the registered replay callback.
 * In case of error, restarts the replay process.
 */
static int ptlrpc_replay_interpret(const struct lu_env *env,
                                   struct ptlrpc_request *req,
                                   void *data, int rc)
{
        struct ptlrpc_replay_async_args *aa = data;
        struct obd_import *imp = req->rq_import;

        atomic_dec(&imp->imp_replay_inflight);

        if (!ptlrpc_client_replied(req)) {
                CERROR("request replay timed out, restarting recovery\n");
                rc = -ETIMEDOUT;
                goto out;
        }

        if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
            (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
             lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) {
                rc = lustre_msg_get_status(req->rq_repmsg);
                goto out;
        }

        /* VBR: check for version failure */
        if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
                /* replay failed because of a version mismatch */
                DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
                spin_lock(&imp->imp_lock);
                imp->imp_vbr_failed = 1;
                imp->imp_no_lock_replay = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
        } else {
                /* The transno had better not change over replay. */
                LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
                         lustre_msg_get_transno(req->rq_repmsg) ||
                         lustre_msg_get_transno(req->rq_repmsg) == 0,
                         "%#llx/%#llx\n",
                         lustre_msg_get_transno(req->rq_reqmsg),
                         lustre_msg_get_transno(req->rq_repmsg));
        }

        spin_lock(&imp->imp_lock);
        /* replay by version leaves gaps on the server, so don't trust locks */
        if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
                imp->imp_no_lock_replay = 1;
        imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
        spin_unlock(&imp->imp_lock);
        LASSERT(imp->imp_last_replay_transno);

        /* the transno must not be bigger than the latest replayed one */
        if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
                DEBUG_REQ(D_ERROR, req,
                          "Reported transno %llu is bigger than the replayed one: %llu",
                          req->rq_transno,
                          lustre_msg_get_transno(req->rq_reqmsg));
                rc = -EINVAL;
                goto out;
        }

        DEBUG_REQ(D_HA, req, "got rep");

        /* let the callback do fixups, possibly including in the request */
        if (req->rq_replay_cb)
                req->rq_replay_cb(req);

        if (ptlrpc_client_replied(req) &&
            lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
                DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                          lustre_msg_get_status(req->rq_repmsg),
                          aa->praa_old_status);
        } else {
                /* Put it back for re-replay. */
                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
        }

        /*
         * Errors during replay can set the transno to 0, but
         * imp_last_replay_transno must not be set to 0 in any case.
         */
        if (req->rq_transno == 0)
                CERROR("Transno is 0 during replay!\n");

        /* continue with recovery */
        rc = ptlrpc_import_recovery_state_machine(imp);
 out:
        req->rq_send_state = aa->praa_old_state;

        if (rc != 0)
                /* this replay failed, so restart recovery */
                ptlrpc_connect_import(imp);

        return rc;
}

/**
 * Prepare and queue request \a req to be replayed.
 * Does not wait for completion.
 */
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        struct ptlrpc_replay_async_args *aa;

        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        memset(aa, 0, sizeof(*aa));

        /* Prepare the request to be resent with ptlrpcd */
        aa->praa_old_state = req->rq_send_state;
        req->rq_send_state = LUSTRE_IMP_REPLAY;
        req->rq_phase = RQ_PHASE_NEW;
        req->rq_next_phase = RQ_PHASE_UNDEFINED;
        if (req->rq_repmsg)
                aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
        req->rq_status = 0;
        req->rq_interpret_reply = ptlrpc_replay_interpret;
        /* Readjust the timeout for current conditions */
        ptlrpc_at_set_req_timeout(req);

        /*
         * Tell the server the net_latency, so it can calculate how long to
         * wait for the next replay.
         */
        lustre_msg_set_service_time(req->rq_reqmsg,
                                    ptlrpc_at_get_net_latency(req));
        DEBUG_REQ(D_HA, req, "REPLAY");

        atomic_inc(&req->rq_import->imp_replay_inflight);
        /* take an extra reference for the async processing */
        ptlrpc_request_addref(req);

        ptlrpcd_add_req(req);
        return 0;
}

/**
 * Abort all in-flight requests on the sending and delayed lists of
 * import \a imp.
 */
void ptlrpc_abort_inflight(struct obd_import *imp)
{
        struct list_head *tmp, *n;

        /*
         * Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
        spin_lock(&imp->imp_lock);

        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_RPCTRACE, req, "inflight");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        req->rq_status = -EIO;
                        ptlrpc_client_wake_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        req->rq_status = -EIO;
                        ptlrpc_client_wake_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        /*
         * Last chance to free requests left on the replay list, but we will
         * still leak requests that have not committed.
         */
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);

        spin_unlock(&imp->imp_lock);
}

/**
 * Abort all uncompleted requests in request set \a set.
 */
void ptlrpc_abort_set(struct ptlrpc_request_set *set)
{
        struct list_head *tmp, *pos;

        list_for_each_safe(pos, tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(pos, struct ptlrpc_request, rq_set_chain);

                spin_lock(&req->rq_lock);
                if (req->rq_phase != RQ_PHASE_RPC) {
                        spin_unlock(&req->rq_lock);
                        continue;
                }

                req->rq_err = 1;
                req->rq_status = -EINTR;
                ptlrpc_client_wake_req(req);
                spin_unlock(&req->rq_lock);
        }
}

static __u64 ptlrpc_last_xid;
static spinlock_t ptlrpc_last_xid_lock;

/**
 * Initialize the XID for this node.  The XID is common to all requests from
 * this node and only needs to be monotonically increasing; it does not have
 * to be sequential.  It is also used as the RDMA match bits, so a single
 * client must never reuse the same match bits for two different in-flight
 * requests, which is why there is no XID per target or per connection.
 *
 * To avoid a match-bit collision after a client reboot (which could deliver
 * old bulk data into the wrong RDMA buffer), seed the XID from the current
 * time, assuming a maximum RPC rate of 1M RPC/s.  If the clock is obviously
 * wrong (earlier than 2004), fall back to a random 62-bit value instead.
 */
#define YEAR_2004 (1ULL << 30)
void ptlrpc_init_xid(void)
{
        time64_t now = ktime_get_real_seconds();

        spin_lock_init(&ptlrpc_last_xid_lock);
        if (now < YEAR_2004) {
                cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
                ptlrpc_last_xid >>= 2;
                ptlrpc_last_xid |= (1ULL << 61);
        } else {
                ptlrpc_last_xid = (__u64)now << 20;
        }

        /* Always need to be aligned to a power-of-two for multi-bulk BRW */
        CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0);
        ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
}

/**
 * Increase the xid and return the new value to the caller.
 *
 * Multi-bulk BRW RPCs consume one XID per bulk transfer, so each request
 * effectively claims the range [xid, xid + PTLRPC_BULK_OPS_COUNT - 1] and
 * every bulk MD of one RPC gets its own match bits.  This is why consecutive
 * calls advance the counter by PTLRPC_BULK_OPS_COUNT rather than by one.
 */
__u64 ptlrpc_next_xid(void)
{
        __u64 next;

        spin_lock(&ptlrpc_last_xid_lock);
        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
        ptlrpc_last_xid = next;
        spin_unlock(&ptlrpc_last_xid_lock);

        return next;
}
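
/*
 * Illustration (not part of the original file): assuming, purely for the
 * sake of example, that PTLRPC_BULK_OPS_COUNT is 4, consecutive calls to
 * ptlrpc_next_xid() return values 4 apart, e.g. 8, 12, 16, ...  A BRW
 * request assigned XID 12 can then use match bits 12, 13, 14 and 15 for its
 * individual bulk MDs without colliding with any other request from this
 * client.
 */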

/**
 * Get a glimpse at what the next xid value might be.
 * Returns a possible next xid.
 */
__u64 ptlrpc_sample_next_xid(void)
{
#if BITS_PER_LONG == 32
        /* need to avoid possible word tearing on 32-bit systems */
        __u64 next;

        spin_lock(&ptlrpc_last_xid_lock);
        next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
        spin_unlock(&ptlrpc_last_xid_lock);

        return next;
#else
        /* No need to lock, since the returned value is racy anyway */
        return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
#endif
}
EXPORT_SYMBOL(ptlrpc_sample_next_xid);

/**
 * Functions for operating ptlrpc workers.
 *
 * A ptlrpc work is a callback which runs inside ptlrpc context.  The callback
 * must not sleep, otherwise it would block that ptlrpcd thread.
 *
 * 1. After a work is created, it can be reused many times:
 *        handler = ptlrpcd_alloc_work();
 *        ptlrpcd_queue_work();
 *
 *    queue it again when necessary, then free it:
 *        ptlrpcd_queue_work();
 *        ptlrpcd_destroy_work();
 *
 * 2. ptlrpcd_queue_work() can be called concurrently by multiple processes,
 *    but the work is only ever queued once at a time.  As the name implies,
 *    there may be a delay before it actually runs in a ptlrpcd thread.
 */
struct ptlrpc_work_async_args {
        int (*cb)(const struct lu_env *, void *);
        void *cbdata;
};

static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
{
        /* re-initialize the req */
        req->rq_timeout = obd_timeout;
        req->rq_sent = ktime_get_real_seconds();
        req->rq_deadline = req->rq_sent + req->rq_timeout;
        req->rq_phase = RQ_PHASE_INTERPRET;
        req->rq_next_phase = RQ_PHASE_COMPLETE;
        req->rq_xid = ptlrpc_next_xid();
        req->rq_import_generation = req->rq_import->imp_generation;

        ptlrpcd_add_req(req);
}

static int work_interpreter(const struct lu_env *env,
                            struct ptlrpc_request *req, void *data, int rc)
{
        struct ptlrpc_work_async_args *arg = data;

        LASSERT(ptlrpcd_check_work(req));

        rc = arg->cb(env, arg->cbdata);

        list_del_init(&req->rq_set_chain);
        req->rq_set = NULL;

        if (atomic_dec_return(&req->rq_refcount) > 1) {
                atomic_set(&req->rq_refcount, 2);
                ptlrpcd_add_work_req(req);
        }
        return rc;
}

static int worker_format;

static int ptlrpcd_check_work(struct ptlrpc_request *req)
{
        return req->rq_pill.rc_fmt == (void *)&worker_format;
}

/**
 * Create a work request for the ptlrpcd threads.
 */
void *ptlrpcd_alloc_work(struct obd_import *imp,
                         int (*cb)(const struct lu_env *, void *), void *cbdata)
{
        struct ptlrpc_request *req = NULL;
        struct ptlrpc_work_async_args *args;

        might_sleep();

        if (!cb)
                return ERR_PTR(-EINVAL);

        /* a work request is handled locally by ptlrpcd, never sent on the wire */
        req = ptlrpc_request_cache_alloc(GFP_NOFS);
        if (!req) {
                CERROR("ptlrpc: out of memory!\n");
                return ERR_PTR(-ENOMEM);
        }

        ptlrpc_cli_req_init(req);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_type = PTL_RPC_MSG_REQUEST;
        req->rq_import = class_import_get(imp);
        req->rq_interpret_reply = work_interpreter;
        /* don't want reply */
        req->rq_no_delay = 1;
        req->rq_no_resend = 1;
        req->rq_pill.rc_fmt = (void *)&worker_format;

        CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
        args = ptlrpc_req_async_args(req);
        args->cb = cb;
        args->cbdata = cbdata;

        return req;
}
EXPORT_SYMBOL(ptlrpcd_alloc_work);

void ptlrpcd_destroy_work(void *handler)
{
        struct ptlrpc_request *req = handler;

        if (req)
                ptlrpc_req_finished(req);
}
EXPORT_SYMBOL(ptlrpcd_destroy_work);

int ptlrpcd_queue_work(void *handler)
{
        struct ptlrpc_request *req = handler;

        /*
         * Check whether the work is already queued.
         *
         * There is no reliable way to tell whether a work request is already
         * being processed by ptlrpcd, so the request refcount is used for
         * that purpose: a refcount of 1 means the work is idle, 2 or more
         * means it is queued or running.  This works because callers treat
         * the handler as opaque data.
         */
        LASSERT(atomic_read(&req->rq_refcount) > 0);
        if (atomic_inc_return(&req->rq_refcount) == 2)
                ptlrpcd_add_work_req(req);
        return 0;
}
EXPORT_SYMBOL(ptlrpcd_queue_work);
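
/*
 * Usage sketch (illustrative only, not part of the original file): a caller
 * that wants a non-sleeping callback run in ptlrpcd context allocates a work
 * handler once, queues it whenever needed, and destroys it on teardown.  The
 * callback and data names below are hypothetical:
 *
 *      static int my_work_cb(const struct lu_env *env, void *data)
 *      {
 *              // must not sleep: runs inside a ptlrpcd thread
 *              return 0;
 *      }
 *
 *      handler = ptlrpcd_alloc_work(imp, my_work_cb, mydata);
 *      if (IS_ERR(handler))
 *              return PTR_ERR(handler);
 *
 *      ptlrpcd_queue_work(handler);     // may be called repeatedly
 *      ...
 *      ptlrpcd_destroy_work(handler);
 */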