1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62#define DEBUG_SUBSYSTEM S_LDLM
63
64#include "../include/lustre_dlm.h"
65#include "../include/obd_class.h"
66#include "../include/obd.h"
67
68#include "ldlm_internal.h"
69
/* Lower bound (seconds) for the client lock enqueue timeout; tunable at
 * runtime via the module parameter below.
 */
int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
module_param(ldlm_enqueue_min, int, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");

/* When non-zero, unused locks are cancelled before replay during recovery. */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;
76
/* No-op "interrupted" callback for l_wait_event(): an interrupted
 * completion wait needs no special handling here.
 */
static void interrupted_completion_wait(void *data)
{
}
80
/* Context handed to the expired/interrupted completion-wait callbacks. */
struct lock_wait_data {
	struct ldlm_lock *lwd_lock;	/* lock being waited on */
	__u32 lwd_conn_cnt;		/* import connection generation
					 * sampled when the wait started */
};

/* Per-request state carried by an asynchronous enqueue. */
struct ldlm_async_args {
	struct lustre_handle lock_handle;
};
89
/* Timeout callback for a client-side enqueue wait (see LWI_TIMEOUT_INTR
 * use in ldlm_completion_ast()).
 *
 * Without a connected export there is no server to recover against, so we
 * only warn, periodically dump the namespace, and keep sleeping.  With an
 * export, the timeout is treated as a communication failure and the import
 * is failed to kick off recovery.  Always returns 0 so the wait continues.
 */
static int ldlm_expired_completion_wait(void *data)
{
	struct lock_wait_data *lwd = data;
	struct ldlm_lock *lock = lwd->lwd_lock;
	struct obd_import *imp;
	struct obd_device *obd;

	if (!lock->l_conn_export) {
		static unsigned long next_dump, last_dump;

		LCONSOLE_WARN("lock timed out (enqueued at %lld, %llds ago)\n",
			      (s64)lock->l_last_activity,
			      (s64)(ktime_get_real_seconds() -
				    lock->l_last_activity));
		LDLM_DEBUG(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
			   (s64)lock->l_last_activity,
			   (s64)(ktime_get_real_seconds() -
				 lock->l_last_activity));
		/* Rate-limit namespace dumps to one per 300s; dump the
		 * debug log only on the very first timeout (last_dump == 0).
		 */
		if (cfs_time_after(cfs_time_current(), next_dump)) {
			last_dump = next_dump;
			next_dump = cfs_time_shift(300);
			ldlm_namespace_dump(D_DLMTRACE,
					    ldlm_lock_to_ns(lock));
			if (last_dump == 0)
				libcfs_debug_dumplog();
		}
		return 0;
	}

	obd = lock->l_conn_export->exp_obd;
	imp = obd->u.cli.cl_import;
	/* Fail the import at the connection generation recorded when the
	 * wait began, so a reconnect that already happened is not undone.
	 */
	ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
	LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
		   (s64)lock->l_last_activity,
		   (s64)(ktime_get_real_seconds() - lock->l_last_activity),
		   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);

	return 0;
}
129
130
131
132
133static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
134{
135 int timeout = at_get(ldlm_lock_to_ns_at(lock));
136
137 if (AT_OFF)
138 return obd_timeout / 2;
139
140
141
142
143 timeout = min_t(int, at_max, timeout + (timeout >> 1));
144 return max(timeout, ldlm_enqueue_min);
145}
146
147
148
149
150
/* Finish a client-side enqueue wait: report whether the lock ended up
 * granted or destroyed, and feed the observed grant latency into the
 * namespace's adaptive-timeout estimate.
 *
 * Returns 0 on grant, -EIO when the lock was destroyed or failed.
 */
static int ldlm_completion_tail(struct ldlm_lock *lock)
{
	long delay;
	int result;

	if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) {
		LDLM_DEBUG(lock, "client-side enqueue: destroyed");
		result = -EIO;
	} else {
		delay = ktime_get_real_seconds() - lock->l_last_activity;
		LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
			   delay);

		/* Update our time estimate */
		at_measured(ldlm_lock_to_ns_at(lock),
			    delay);
		result = 0;
	}
	return result;
}
171
172
173
174
175
176
177int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
178{
179 if (flags == LDLM_FL_WAIT_NOREPROC) {
180 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
181 return 0;
182 }
183
184 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
185 LDLM_FL_BLOCK_CONV))) {
186 wake_up(&lock->l_waitq);
187 return ldlm_completion_tail(lock);
188 }
189
190 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
191 return 0;
192}
193EXPORT_SYMBOL(ldlm_completion_ast_async);
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/* Client-side blocking completion AST: sleep until the lock is granted
 * or cancelled.
 *
 * The wait is bounded by twice the adaptive enqueue timeout unless the
 * server set LDLM_FL_NO_TIMEOUT, in which case we wait indefinitely but
 * stay interruptible.  On timeout, ldlm_expired_completion_wait() may
 * start import recovery.
 *
 * Returns 0 when granted, -EIO if the lock was destroyed, or the wait
 * error (e.g. -EINTR) otherwise.
 */
int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	/* XXX ALLOCATE - 160 bytes */
	struct lock_wait_data lwd;
	struct obd_device *obd;
	struct obd_import *imp = NULL;
	struct l_wait_info lwi;
	__u32 timeout;
	int rc = 0;

	if (flags == LDLM_FL_WAIT_NOREPROC) {
		LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
		goto noreproc;
	}

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		/* Not blocked at all: nothing to wait for. */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");

noreproc:

	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, then there is no import */
	if (obd)
		imp = obd->u.cli.cl_import;

	/* Wait a long time for enqueue - server may have to callback a
	 * lock from another client.  Server will evict the other client if
	 * it doesn't respond reasonably, and then give us the lock.
	 */
	timeout = ldlm_get_enq_timeout(lock) * 2;

	lwd.lwd_lock = lock;

	if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
		LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
		lwi = LWI_INTR(interrupted_completion_wait, &lwd);
	} else {
		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
				       ldlm_expired_completion_wait,
				       interrupted_completion_wait, &lwd);
	}

	if (imp) {
		/* Snapshot the connection generation so a later timeout only
		 * fails the import if no reconnect has happened meanwhile.
		 */
		spin_lock(&imp->imp_lock);
		lwd.lwd_conn_cnt = imp->imp_conn_cnt;
		spin_unlock(&imp->imp_lock);
	}

	if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
				 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
		/* Fault-injection path: pretend the wait was interrupted. */
		lock->l_flags |= LDLM_FL_FAIL_LOC;
		rc = -EINTR;
	} else {
		/* Go to sleep until the lock is granted or cancelled. */
		rc = l_wait_event(lock->l_waitq,
				  is_granted_or_cancelled(lock), &lwi);
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	return ldlm_completion_tail(lock);
}
EXPORT_SYMBOL(ldlm_completion_ast);
289
/* Undo a failed client-side enqueue: mark the lock failed/local-only so
 * it will be cancelled without a server round trip, drop the caller's
 * mode reference, and for flock locks destroy the lock outright.
 */
static void failed_lock_cleanup(struct ldlm_namespace *ns,
				struct ldlm_lock *lock, int mode)
{
	int need_cancel = 0;

	/* Set this flag to prevent others from getting new references */
	lock_res_and_lock(lock);
	/* Only mark the lock if it has not been granted and has not already
	 * failed; otherwise a racing grant/failure won the race.
	 */
	if ((lock->l_req_mode != lock->l_granted_mode) &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* Make this lock already-cancelled so that
		 * ldlm_lock_cancel() does not need a server RPC.
		 */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
				 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
		need_cancel = 1;
	}
	unlock_res_and_lock(lock);

	if (need_cancel)
		LDLM_DEBUG(lock,
			   "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
	else
		LDLM_DEBUG(lock, "lock was granted or failed in race");

	ldlm_lock_decref_internal(lock, mode);

	/* XXX - HACK because we shouldn't call ldlm_lock_destroy()
	 *       from llite/file.c/ll_file_flock().
	 * NOTE(review): flock locks are destroyed here because their
	 * lifetime is not managed through the usual cancel path.
	 */
	if (lock->l_resource->lr_type == LDLM_FLOCK) {
		lock_res_and_lock(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_lock_destroy_nolock(lock);
		unlock_res_and_lock(lock);
	}
}
333
334
335
336
337
338
/* Finish processing the reply of a client-side enqueue RPC.
 *
 * Validates the server reply, installs the remote handle, inherited
 * flags, possibly-changed mode/resource and LVB into the local lock,
 * then grants the lock locally via ldlm_lock_enqueue().  On failure
 * before the lock becomes usable, the lock is cleaned up and cancelled
 * via failed_lock_cleanup().
 *
 * \retval ELDLM_* status (0 on success) or negative errno
 */
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
			  enum ldlm_type type, __u8 with_policy,
			  enum ldlm_mode mode,
			  __u64 *flags, void *lvb, __u32 lvb_len,
			  struct lustre_handle *lockh, int rc)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	int is_replay = *flags & LDLM_FL_REPLAY;
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	int cleanup_phase = 1;
	int size = 0;

	lock = ldlm_handle2lock(lockh);
	/* ldlm_cli_enqueue is holding a reference on this lock. */
	if (!lock) {
		LASSERT(type == LDLM_FLOCK);
		return -ENOLCK;
	}

	LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
		 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);

	if (rc != ELDLM_OK) {
		LASSERT(!is_replay);
		LDLM_DEBUG(lock, "client-side enqueue END (%s)",
			   rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");

		/* ABORTED replies may still carry a valid LVB; keep going. */
		if (rc != ELDLM_LOCK_ABORTED)
			goto cleanup;
	}

	/* Before we return, swab the reply */
	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (!reply) {
		rc = -EPROTO;
		goto cleanup;
	}

	if (lvb_len != 0) {
		LASSERT(lvb);

		size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
					    RCL_SERVER);
		if (size < 0) {
			LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
			rc = size;
			goto cleanup;
		} else if (unlikely(size > lvb_len)) {
			LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
				   lvb_len, size);
			rc = -EINVAL;
			goto cleanup;
		}
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		/* No lock was granted, but hand the reply LVB back to the
		 * caller (e.g. intent enqueue result data).
		 */
		if (lvb_len != 0)
			rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
					   lvb, size);
		if (rc == 0)
			rc = ELDLM_LOCK_ABORTED;
		goto cleanup;
	}

	/* lock enqueued on the server */
	cleanup_phase = 0;

	lock_res_and_lock(lock);
	/* Key change: rehash the lock in the per-export hash with its
	 * new remote handle as the key.
	 */
	if (exp->exp_lock_hash) {
		/* In the function below, .hs_keycmp resolves to
		 * ldlm_export_lock_keycmp()
		 */
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	*flags = ldlm_flags_from_wire(reply->lock_flags);
	lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
					      LDLM_INHERIT_FLAGS);
	/* Move NO_TIMEOUT flag to the lock to force ldlm_lock_match() to
	 * wait with no timeout as well.
	 */
	lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
					      LDLM_FL_NO_TIMEOUT);
	unlock_res_and_lock(lock);

	CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n",
	       lock, reply->lock_handle.cookie, *flags);

	/* If enqueue returned a blocked lock but the completion handler has
	 * already run, then it fixed up the resource and we don't need to do
	 * it again.
	 */
	if ((*flags) & LDLM_FL_LOCK_CHANGED) {
		int newmode = reply->lock_desc.l_req_mode;

		LASSERT(!is_replay);
		if (newmode && newmode != lock->l_req_mode) {
			LDLM_DEBUG(lock, "server returned different mode %s",
				   ldlm_lockname[newmode]);
			lock->l_req_mode = newmode;
		}

		if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
				 &lock->l_resource->lr_name)) {
			CDEBUG(D_INFO, "remote intent success, locking "DLDLMRES
			       " instead of "DLDLMRES"\n",
			       PLDLMRES(&reply->lock_desc.l_resource),
			       PLDLMRES(lock->l_resource));

			rc = ldlm_lock_change_resource(ns, lock,
					&reply->lock_desc.l_resource.lr_name);
			if (rc || !lock->l_resource) {
				rc = -ENOMEM;
				goto cleanup;
			}
			LDLM_DEBUG(lock, "client-side enqueue, new resource");
		}
		if (with_policy)
			if (!(type == LDLM_IBITS &&
			      !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
				/* We assume lock type cannot change on server */
				ldlm_convert_policy_to_local(exp,
						lock->l_resource->lr_type,
						&reply->lock_desc.l_policy_data,
						&lock->l_policy_data);
		if (type != LDLM_PLAIN)
			LDLM_DEBUG(lock,
				   "client-side enqueue, new policy data");
	}

	if ((*flags) & LDLM_FL_AST_SENT ||
	    /* Cancel extent locks as soon as possible on a liblustre client,
	     * because it cannot handle asynchronous ASTs robustly.
	     */
	    (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
	}

	/* If the lock has already been granted by a completion AST, don't
	 * clobber the LVB with an older one.
	 */
	if (lvb_len != 0) {
		/* We must lock or a racing completion might update lvb without
		 * letting us know and we'll clobber the correct value.
		 * Cannot unlock after the check either, as that still leaves
		 * a tiny window for completion to get in.
		 */
		lock_res_and_lock(lock);
		if (lock->l_req_mode != lock->l_granted_mode)
			rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
					   lock->l_lvb_data, size);
		unlock_res_and_lock(lock);
		if (rc < 0) {
			cleanup_phase = 1;
			goto cleanup;
		}
	}

	if (!is_replay) {
		rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
		if (lock->l_completion_ast) {
			int err = lock->l_completion_ast(lock, *flags, NULL);

			if (!rc)
				rc = err;
			if (rc)
				cleanup_phase = 1;
		}
	}

	if (lvb_len && lvb) {
		/* Copy the LVB here, and not earlier, because the completion
		 * AST (if any) can override what we got in the reply.
		 */
		memcpy(lvb, lock->l_lvb_data, lvb_len);
	}

	LDLM_DEBUG(lock, "client-side enqueue END");
cleanup:
	if (cleanup_phase == 1 && rc)
		failed_lock_cleanup(ns, lock, mode);
	/* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
	LDLM_LOCK_PUT(lock);
	LDLM_LOCK_RELEASE(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
538
539
540
541
542
543
544
545static inline int ldlm_req_handles_avail(int req_size, int off)
546{
547 int avail;
548
549 avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
550 if (likely(avail >= 0))
551 avail /= (int)sizeof(struct lustre_handle);
552 else
553 avail = 0;
554 avail += LDLM_LOCKREQ_HANDLES - off;
555
556 return avail;
557}
558
559static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
560 enum req_location loc,
561 int off)
562{
563 int size = req_capsule_msg_size(pill, loc);
564
565 return ldlm_req_handles_avail(size, off);
566}
567
568static inline int ldlm_format_handles_avail(struct obd_import *imp,
569 const struct req_format *fmt,
570 enum req_location loc, int off)
571{
572 int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
573
574 return ldlm_req_handles_avail(size, off);
575}
576
577
578
579
580
581
582
583
584
/* Prepare and pack a request, piggy-backing Early Lock Cancels (ELC) for
 * as many unused LRU locks as fit in the request buffer.
 *
 * \param exp		export the request goes to
 * \param req		partially-built request
 * \param version	RPC version
 * \param opc		RPC opcode
 * \param canceloff	handles reserved at the head of the cancel array
 *			(0 means no cancels packed into this request)
 * \param cancels	caller's list of locks to cancel, may be NULL
 * \param count		number of locks already on \a cancels
 *
 * \retval 0 on success; negative errno (cancel list released) on packing
 *	   failure
 */
int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
		      int version, int opc, int canceloff,
		      struct list_head *cancels, int count)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct req_capsule *pill = &req->rq_pill;
	struct ldlm_request *dlm = NULL;
	int flags, avail, to_free, pack = 0;
	LIST_HEAD(head);
	int rc;

	if (!cancels)
		cancels = &head;
	if (ns_connect_cancelset(ns)) {
		/* Estimate the amount of available space in the request. */
		req_capsule_filled_sizes(pill, RCL_CLIENT);
		avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);

		flags = ns_connect_lru_resize(ns) ?
			LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
		to_free = !ns_connect_lru_resize(ns) &&
			  opc == LDLM_ENQUEUE ? 1 : 0;

		/* Cancel LRU locks here _only_ if the server supports
		 * EARLY_CANCEL; otherwise we have to send extra CANCEL
		 * RPCs, which will consume more of the RPC slots.
		 */
		if (avail > count)
			count += ldlm_cancel_lru_local(ns, cancels, to_free,
						       avail - count, 0, flags);
		if (avail > count)
			pack = count;
		else
			pack = avail;
		req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
				     ldlm_request_bufsize(pack, opc));
	}

	rc = ptlrpc_request_pack(req, version, opc);
	if (rc) {
		ldlm_lock_list_put(cancels, l_bl_ast, count);
		return rc;
	}

	if (ns_connect_cancelset(ns)) {
		if (canceloff) {
			dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
			LASSERT(dlm);
			/* Skip the first lock handles in the request: this
			 * is the enqueue/replay data, which starts the cancel
			 * array at offset @canceloff.
			 */
			dlm->lock_count = canceloff;
		}
		/* Pack into the request @pack lock handles. */
		ldlm_cli_cancel_list(cancels, pack, req, 0);
		/* Prepare and send separate cancel RPC for others. */
		ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
	} else {
		ldlm_lock_list_put(cancels, l_bl_ast, count);
	}
	return 0;
}
EXPORT_SYMBOL(ldlm_prep_elc_req);
650
/* Convenience wrapper: prepare an LDLM_ENQUEUE request with the standard
 * cancel offset, piggy-backing ELC cancels from \a cancels.
 */
int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
			  struct list_head *cancels, int count)
{
	return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
				 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
}
EXPORT_SYMBOL(ldlm_prep_enqueue_req);
658
659
660
661
662
663
664
665
666
667
668
/* Client-side lock enqueue.
 *
 * For a replay (*flags & LDLM_FL_REPLAY) the existing lock identified by
 * \a lockh is re-sent; otherwise a new lock is created and enqueued.
 * If \a reqp is NULL or *reqp is NULL, the RPC is allocated here and
 * freed before return; a caller-supplied request is packed into instead.
 * With \a async the RPC is only prepared (caller sends it) and 0 is
 * returned; otherwise the RPC is sent and the reply processed via
 * ldlm_cli_enqueue_fini().
 *
 * \retval 0 or ELDLM_* status; negative errno on failure
 */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
		     struct ldlm_enqueue_info *einfo,
		     const struct ldlm_res_id *res_id,
		     ldlm_policy_data_t const *policy, __u64 *flags,
		     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
		     struct lustre_handle *lockh, int async)
{
	struct ldlm_namespace *ns;
	struct ldlm_lock *lock;
	struct ldlm_request *body;
	int is_replay = *flags & LDLM_FL_REPLAY;
	int req_passed_in = 1;
	int rc, err;
	struct ptlrpc_request *req;

	ns = exp->exp_obd->obd_namespace;

	/* If we're replaying this lock, just check some invariants.
	 * If we're creating a new lock, get everything all setup nicely.
	 */
	if (is_replay) {
		lock = ldlm_handle2lock_long(lockh, 0);
		LASSERT(lock);
		LDLM_DEBUG(lock, "client-side enqueue START");
		LASSERT(exp == lock->l_conn_export);
	} else {
		const struct ldlm_callback_suite cbs = {
			.lcs_completion = einfo->ei_cb_cp,
			.lcs_blocking = einfo->ei_cb_bl,
			.lcs_glimpse = einfo->ei_cb_gl
		};
		lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
					einfo->ei_mode, &cbs, einfo->ei_cbdata,
					lvb_len, lvb_type);
		if (!lock)
			return -ENOMEM;
		/* for the local lock, add the reference */
		ldlm_lock_addref_internal(lock, einfo->ei_mode);
		ldlm_lock2handle(lock, lockh);
		if (policy)
			lock->l_policy_data = *policy;

		if (einfo->ei_type == LDLM_EXTENT) {
			/* extent lock without policy is a bug */
			if (!policy)
				LBUG();

			lock->l_req_extent = policy->l_extent;
		}
		LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n",
			   *flags);
	}

	lock->l_conn_export = exp;
	lock->l_export = NULL;
	lock->l_blocking_ast = einfo->ei_cb_bl;
	lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));

	/* lock not sent to server yet */
	if (!reqp || !*reqp) {
		req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
						&RQF_LDLM_ENQUEUE,
						LUSTRE_DLM_VERSION,
						LDLM_ENQUEUE);
		if (!req) {
			failed_lock_cleanup(ns, lock, einfo->ei_mode);
			LDLM_LOCK_RELEASE(lock);
			return -ENOMEM;
		}
		req_passed_in = 0;
		if (reqp)
			*reqp = req;
	} else {
		int len;

		req = *reqp;
		len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
					   RCL_CLIENT);
		LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
			 DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
	}

	/* Dump lock data into the request buffer */
	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(*flags);
	body->lock_handle[0] = *lockh;

	/* Continue as normal. */
	if (!req_passed_in) {
		if (lvb_len > 0)
			req_capsule_extend(&req->rq_pill,
					   &RQF_LDLM_ENQUEUE_LVB);
		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     lvb_len);
		ptlrpc_request_set_replen(req);
	}

	/* Liblustre client doesn't get extent locks, except for O_APPEND case
	 * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
	 * [i_size, OBD_OBJECT_EOF] lock is taken.
	 */
	LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
		     policy->l_extent.end == OBD_OBJECT_EOF));

	if (async) {
		LASSERT(reqp);
		return 0;
	}

	LDLM_DEBUG(lock, "sending request");

	rc = ptlrpc_queue_wait(req);

	err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
				    einfo->ei_mode, flags, lvb, lvb_len,
				    lockh, rc);

	/* If ldlm_cli_enqueue_fini did not find the lock, we need to free
	 * one reference that we took.
	 */
	if (err == -ENOLCK)
		LDLM_LOCK_RELEASE(lock);
	else
		rc = err;

	if (!req_passed_in && req) {
		ptlrpc_req_finished(req);
		if (reqp)
			*reqp = NULL;
	}

	return rc;
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
806
807
808
809
810
811
812
813
/* Cancel a lock on the client side and decide what kind of server
 * communication, if any, is still needed.
 *
 * \retval LDLM_FL_LOCAL_ONLY	no cancel RPC required
 * \retval LDLM_FL_BL_AST	cancel is a reply to a blocking AST
 * \retval LDLM_FL_CANCELING	a regular cancel RPC is required
 */
static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
	__u64 rc = LDLM_FL_LOCAL_ONLY;

	if (lock->l_conn_export) {
		bool local_only;

		LDLM_DEBUG(lock, "client-side cancel");
		/* Set this flag to prevent others from getting new references */
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_CBPENDING;
		local_only = !!(lock->l_flags &
				(LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
		ldlm_cancel_callback(lock);
		rc = (lock->l_flags & LDLM_FL_BL_AST) ?
			LDLM_FL_BL_AST : LDLM_FL_CANCELING;
		unlock_res_and_lock(lock);

		if (local_only) {
			CDEBUG(D_DLMTRACE, "not sending request (at caller's instruction)\n");
			rc = LDLM_FL_LOCAL_ONLY;
		}
		ldlm_lock_cancel(lock);
	} else {
		/* A lock with no export is server-side; a client must
		 * never try to cancel one here.
		 */
		LDLM_ERROR(lock, "Trying to cancel local lock");
		LBUG();
	}

	return rc;
}
844
845
846
847
/* Pack up to \a count lock handles from \a head into the cancel portion
 * of \a req's DLM request buffer, appending after any handles already
 * counted in dlm->lock_count.
 */
static void ldlm_cancel_pack(struct ptlrpc_request *req,
			     struct list_head *head, int count)
{
	struct ldlm_request *dlm;
	struct ldlm_lock *lock;
	int max, packed = 0;

	dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	LASSERT(dlm);

	/* Check the room in the request buffer. */
	max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
		sizeof(struct ldlm_request);
	max /= sizeof(struct lustre_handle);
	max += LDLM_LOCKREQ_HANDLES;
	LASSERT(max >= dlm->lock_count + count);

	/* XXX: it would be better to pack lock handles grouped by resource.
	 * so that the server cancel would call filter_lvbo_update() less
	 * frequently.
	 */
	list_for_each_entry(lock, head, l_bl_ast) {
		if (!count--)
			break;
		LASSERT(lock->l_conn_export);
		/* Pack the lock handle to the given request buffer. */
		LDLM_DEBUG(lock, "packing");
		dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
		packed++;
	}
	CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
}
880
881
882
883
884
/* Send an LDLM_CANCEL RPC for up to \a count locks on \a cancels to the
 * server behind \a exp.  With LCF_ASYNC the request is handed to ptlrpcd
 * and not waited for.  A -ETIMEDOUT within the same import generation is
 * retried; an invalid import counts as success (nothing to cancel).
 *
 * \retval number of locks whose cancel was sent; negative errno if none
 *	   could be sent
 */
static int ldlm_cli_cancel_req(struct obd_export *exp,
			       struct list_head *cancels,
			       int count, enum ldlm_cancel_flags flags)
{
	struct ptlrpc_request *req = NULL;
	struct obd_import *imp;
	int free, sent = 0;
	int rc = 0;

	LASSERT(exp);
	LASSERT(count > 0);

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);

	if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
		return count;

	/* Clamp count to what one RPC can actually carry. */
	free = ldlm_format_handles_avail(class_exp2cliimp(exp),
					 &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
	if (count > free)
		count = free;

	while (1) {
		imp = class_exp2cliimp(exp);
		if (!imp || imp->imp_invalid) {
			CDEBUG(D_DLMTRACE,
			       "skipping cancel on invalid import %p\n", imp);
			return count;
		}

		req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
		if (!req) {
			rc = -ENOMEM;
			goto out;
		}

		req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
		req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
				     ldlm_request_bufsize(count, LDLM_CANCEL));

		rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
		if (rc) {
			ptlrpc_request_free(req);
			goto out;
		}

		req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
		req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
		ptlrpc_at_set_req_timeout(req);

		ldlm_cancel_pack(req, cancels, count);

		ptlrpc_request_set_replen(req);
		if (flags & LCF_ASYNC) {
			/* Fire and forget via ptlrpcd; req ownership moves. */
			ptlrpcd_add_req(req);
			sent = count;
			goto out;
		}

		rc = ptlrpc_queue_wait(req);
		if (rc == LUSTRE_ESTALE) {
			/* Server no longer knows these locks; not fatal. */
			CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n",
			       libcfs_nid2str(req->rq_import->
					      imp_connection->c_peer.nid));
			rc = 0;
		} else if (rc == -ETIMEDOUT &&
			   /* Retry only if the connection did not change. */
			   req->rq_import_generation == imp->imp_generation) {
			ptlrpc_req_finished(req);
			continue;
		} else if (rc != ELDLM_OK) {
			/* -ESHUTDOWN is common on umount */
			CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
				     "Got rc %d from cancel RPC: canceling anyway\n",
				     rc);
			break;
		}
		sent = count;
		break;
	}

	ptlrpc_req_finished(req);
out:
	return sent ? sent : rc;
}
969
970static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
971{
972 return &imp->imp_obd->obd_namespace->ns_pool;
973}
974
975
976
977
/* Update the client's view of the server lock volume (SLV) and lock
 * limit from the piggy-backed values in an RPC reply.  Only applies
 * when the import negotiated LRU resize; zero SLV/limit values are
 * rejected as invalid.  Always returns 0.
 */
int ldlm_cli_update_pool(struct ptlrpc_request *req)
{
	struct obd_device *obd;
	__u64 new_slv;
	__u32 new_limit;

	if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
		     !imp_connect_lru_resize(req->rq_import))) {
		/* Do nothing for corner cases. */
		return 0;
	}

	/* In some cases RPC may contain SLV and limit zeroed out. This
	 * is the case when server does not support LRU resize feature.
	 * This is also possible in some recovery cases when server-side
	 * reply has no information about the limit and SLV, or when we
	 * got an old reply.
	 */
	if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
	    lustre_msg_get_limit(req->rq_repmsg) == 0) {
		DEBUG_REQ(D_HA, req,
			  "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
			  lustre_msg_get_slv(req->rq_repmsg),
			  lustre_msg_get_limit(req->rq_repmsg));
		return 0;
	}

	new_limit = lustre_msg_get_limit(req->rq_repmsg);
	new_slv = lustre_msg_get_slv(req->rq_repmsg);
	obd = req->rq_import->imp_obd;

	/* Set new SLV and limit in OBD fields to make them accessible
	 * to the pool thread. We do not access obd_namespace and pool
	 * directly here as there is no reliable way to make sure that
	 * they are still alive at cleanup time. Evil races are possible
	 * which may cause Oops at that time.
	 */
	write_lock(&obd->obd_pool_lock);
	obd->obd_pool_slv = new_slv;
	obd->obd_pool_limit = new_limit;
	write_unlock(&obd->obd_pool_lock);

	return 0;
}
EXPORT_SYMBOL(ldlm_cli_update_pool);
1025
1026
1027
1028
1029
1030
/* Client-side lock cancel entry point.
 *
 * Cancels the lock locally; if a server RPC is still needed and the
 * server supports cancel sets, additional unused LRU locks are batched
 * into the same cancel RPC.  Always returns 0 (a missing lock means it
 * is already being destroyed).
 */
int ldlm_cli_cancel(struct lustre_handle *lockh,
		    enum ldlm_cancel_flags cancel_flags)
{
	struct obd_export *exp;
	int avail, flags, count = 1;
	__u64 rc = 0;
	struct ldlm_namespace *ns;
	struct ldlm_lock *lock;
	LIST_HEAD(cancels);

	/* concurrent cancels on the same handle can't step on each other:
	 * LDLM_FL_CANCELING makes the second lookup return NULL.
	 */
	lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
	if (!lock) {
		LDLM_DEBUG_NOLOCK("lock is already being destroyed");
		return 0;
	}

	rc = ldlm_cli_cancel_local(lock);
	if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
		LDLM_LOCK_RELEASE(lock);
		return 0;
	}
	/* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
	 * RPC which goes through ldlm_cli_cancel_req.
	 */
	LASSERT(list_empty(&lock->l_bl_ast));
	list_add(&lock->l_bl_ast, &cancels);

	exp = lock->l_conn_export;
	if (exp_connect_cancelset(exp)) {
		/* Batch more unused LRU locks into the same cancel RPC. */
		avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
						  &RQF_LDLM_CANCEL,
						  RCL_CLIENT, 0);
		LASSERT(avail > 0);

		ns = ldlm_lock_to_ns(lock);
		flags = ns_connect_lru_resize(ns) ?
			LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
		count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
					       LCF_BL_AST, flags);
	}
	ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel);
1077
1078
1079
1080
1081
/* Locally cancel up to \a count locks on \a cancels, separating out those
 * that must be sent as blocking-AST replies when the caller did not ask
 * for them (those are cancelled immediately here via a dedicated list).
 *
 * \retval number of locks remaining on \a cancels that still require a
 *	   regular cancel RPC
 */
int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
			       enum ldlm_cancel_flags flags)
{
	LIST_HEAD(head);
	struct ldlm_lock *lock, *next;
	int left = 0, bl_ast = 0;
	__u64 rc;

	left = count;
	list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
		if (left-- == 0)
			break;

		if (flags & LCF_LOCAL) {
			rc = LDLM_FL_LOCAL_ONLY;
			ldlm_lock_cancel(lock);
		} else {
			rc = ldlm_cli_cancel_local(lock);
		}
		/* Until we have compound requests and can send LDLM_CANCEL
		 * requests batched with generic RPCs, we need to send cancels
		 * with the LDLM_FL_BL_AST flag in a separate RPC from
		 * the one being generated now.
		 */
		if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
			LDLM_DEBUG(lock, "Cancel lock separately");
			list_del_init(&lock->l_bl_ast);
			list_add(&lock->l_bl_ast, &head);
			bl_ast++;
			continue;
		}
		if (rc == LDLM_FL_LOCAL_ONLY) {
			/* CANCEL RPC should not be sent to server. */
			list_del_init(&lock->l_bl_ast);
			LDLM_LOCK_RELEASE(lock);
			count--;
		}
	}
	if (bl_ast > 0) {
		count -= bl_ast;
		/* Send the BL_AST cancels right away in their own RPC. */
		ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
	}

	return count;
}
EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
1128
1129
1130
1131
1132
1133
/* LRU cancellation policy for the no-wait case: only locks that can be
 * cancelled without any server communication or waiting are eligible;
 * everything else is skipped and marked LDLM_FL_SKIPPED so subsequent
 * no-wait scans pass over it quickly.
 */
static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
						    int unused, int added,
						    int count)
{
	ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
	ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;

	lock_res_and_lock(lock);

	/* don't check added & count since we want to process all locks
	 * from unused list
	 */
	switch (lock->l_resource->lr_type) {
	case LDLM_EXTENT:
	case LDLM_IBITS:
		if (cb && cb(lock))
			break;
		/* fall through: not cancellable for recovery, skip it */
	default:
		result = LDLM_POLICY_SKIP_LOCK;
		lock->l_flags |= LDLM_FL_SKIPPED;
		break;
	}

	unlock_res_and_lock(lock);
	return result;
}
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
/* LRU-resize cancellation policy: compute the lock volume
 * (lock volume factor * idle age * unused count) and cancel the lock
 * only when it exceeds the server lock volume (SLV).  Once \a count
 * locks have been added, stop scanning.
 */
static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
						 struct ldlm_lock *lock,
						 int unused, int added,
						 int count)
{
	unsigned long cur = cfs_time_current();
	struct ldlm_pool *pl = &ns->ns_pool;
	__u64 slv, lvf, lv;
	unsigned long la;

	/* Stop LRU processing when we reach past @count or have checked all
	 * locks in LRU.
	 */
	if (count && added >= count)
		return LDLM_POLICY_KEEP_LOCK;

	slv = ldlm_pool_get_slv(pl);
	lvf = ldlm_pool_get_lvf(pl);
	la = cfs_duration_sec(cfs_time_sub(cur,
					   lock->l_last_used));
	lv = lvf * la * unused;

	/* Inform pool about current CLV to see it via debugfs. */
	ldlm_pool_set_clv(pl, lv);

	/* Stop when SLV is not yet come from server or lv is smaller than
	 * it is.
	 */
	return (slv == 0 || lv < slv) ?
		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1213 struct ldlm_lock *lock,
1214 int unused, int added,
1215 int count)
1216{
1217
1218
1219
1220 return (added >= count) ?
1221 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1222}
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1234 struct ldlm_lock *lock,
1235 int unused, int added,
1236 int count)
1237{
1238
1239 return ((added >= count) &&
1240 time_before(cfs_time_current(),
1241 cfs_time_add(lock->l_last_used, ns->ns_max_age))) ?
1242 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1243}
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1255 struct ldlm_lock *lock,
1256 int unused, int added,
1257 int count)
1258{
1259
1260
1261
1262 return (added >= count) ?
1263 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1264}
1265
/* LRU cancellation policy callback: (ns, lock, unused, added, count) ->
 * cancel / keep / skip decision for one lock on the unused list.
 */
typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
						      struct ldlm_lock *, int,
						      int, int);
1269
1270static ldlm_cancel_lru_policy_t
1271ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1272{
1273 if (flags & LDLM_CANCEL_NO_WAIT)
1274 return ldlm_cancel_no_wait_policy;
1275
1276 if (ns_connect_lru_resize(ns)) {
1277 if (flags & LDLM_CANCEL_SHRINK)
1278
1279 return ldlm_cancel_passed_policy;
1280 else if (flags & LDLM_CANCEL_LRUR)
1281 return ldlm_cancel_lrur_policy;
1282 else if (flags & LDLM_CANCEL_PASSED)
1283 return ldlm_cancel_passed_policy;
1284 } else {
1285 if (flags & LDLM_CANCEL_AGED)
1286 return ldlm_cancel_aged_policy;
1287 }
1288
1289 return ldlm_cancel_default_policy;
1290}
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
/* Walk the namespace's unused (LRU) list and move locks selected by the
 * cancellation policy (chosen from \a flags) onto \a cancels, marking
 * each CBPENDING|CANCELING.
 *
 * \param ns		namespace to scan
 * \param cancels	output list (linked via l_bl_ast)
 * \param count		target number of locks to gather (policy input)
 * \param max		hard maximum to gather; 0 means no limit
 * \param flags		LDLM_CANCEL_* policy selection flags
 *
 * ns_lock is dropped around the policy call (which may sleep) and
 * re-taken; a reference on the candidate lock keeps it alive across the
 * gap, and CANCELING/LRU-membership is re-checked afterwards to resolve
 * races with concurrent cancels.
 *
 * \retval number of locks added to \a cancels
 */
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
				 struct list_head *cancels, int count, int max,
				 int flags)
{
	ldlm_cancel_lru_policy_t pf;
	struct ldlm_lock *lock, *next;
	int added = 0, unused, remained;

	spin_lock(&ns->ns_lock);
	unused = ns->ns_nr_unused;
	remained = unused;

	/* Without LRU resize, gather at least the amount by which the LRU
	 * exceeds its configured maximum.
	 */
	if (!ns_connect_lru_resize(ns))
		count += unused - ns->ns_max_unused;

	pf = ldlm_cancel_lru_policy(ns, flags);
	LASSERT(pf);

	while (!list_empty(&ns->ns_unused_list)) {
		ldlm_policy_res_t result;

		/* all unused locks */
		if (remained-- <= 0)
			break;

		/* For any flags, stop scanning if @max is reached. */
		if (max && added >= max)
			break;

		list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
					 l_lru) {
			/* No locks which got blocking requests. */
			LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));

			if (flags & LDLM_CANCEL_NO_WAIT &&
			    lock->l_flags & LDLM_FL_SKIPPED)
				/* already processed */
				continue;

			/* Somebody is already doing CANCEL. No need for this
			 * lock in LRU, do not traverse it again.
			 */
			if (!(lock->l_flags & LDLM_FL_CANCELING))
				break;

			ldlm_lock_remove_from_lru_nolock(lock);
		}
		/* Ran off the end of the list without a candidate. */
		if (&lock->l_lru == &ns->ns_unused_list)
			break;

		LDLM_LOCK_GET(lock);
		spin_unlock(&ns->ns_lock);
		lu_ref_add(&lock->l_reference, __func__, current);

		/* Pass the lock through the policy filter and see if it
		 * should stay in LRU.
		 *
		 * Even for shrinker policy we stop scanning if
		 * we find a lock that should stay in the cache.
		 * We should take into account lock age anyway
		 * as a new lock is a valuable resource even if
		 * it has a low weight.
		 *
		 * That is, for shrinker policy we drop only
		 * old locks, but additionally choose them by
		 * their weight. Big extent locks will stay in
		 * the cache.
		 */
		result = pf(ns, lock, unused, added, count);
		if (result == LDLM_POLICY_KEEP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			break;
		}
		if (result == LDLM_POLICY_SKIP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}

		lock_res_and_lock(lock);
		/* Check flags again under the lock. */
		if ((lock->l_flags & LDLM_FL_CANCELING) ||
		    (ldlm_lock_remove_from_lru(lock) == 0)) {
			/* Another thread is removing lock from LRU, or
			 * somebody is already doing CANCEL, or there
			 * is a blocking request which will send cancel
			 * by itself, or the lock is no longer unused.
			 */
			unlock_res_and_lock(lock);
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}
		LASSERT(!lock->l_readers && !lock->l_writers);

		/* If we have chosen to cancel this lock voluntarily, we
		 * better send cancel notification to server, so that it
		 * frees appropriate state. This might lead to a race
		 * where while we are doing cancel here, server is also
		 * silently cancelling this lock.
		 */
		lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;

		/* Setting the CBPENDING flag is a little misleading,
		 * but prevents an important race; namely, once
		 * CBPENDING is set, the lock can accumulate no more
		 * readers/writers. Since readers and writers are
		 * already zero here, ldlm_lock_decref() won't see
		 * this flag and call l_blocking_ast
		 */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;

		/* We can't re-add to l_lru as it confuses the
		 * refcounting in ldlm_lock_remove_from_lru() if an AST
		 * arrives after we drop lr_lock below. We use l_bl_ast
		 * and can't use l_pending_chain as it is used both on
		 * server and client nevertheless bug 5666 says it is
		 * used only on server
		 */
		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		unlock_res_and_lock(lock);
		lu_ref_del(&lock->l_reference, __func__, current);
		spin_lock(&ns->ns_lock);
		added++;
		unused--;
	}
	spin_unlock(&ns->ns_lock);
	return added;
}
1461
1462int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
1463 struct list_head *cancels, int count, int max,
1464 enum ldlm_cancel_flags cancel_flags, int flags)
1465{
1466 int added;
1467
1468 added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
1469 if (added <= 0)
1470 return added;
1471 return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
1472}
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
1483 enum ldlm_cancel_flags cancel_flags,
1484 int flags)
1485{
1486 LIST_HEAD(cancels);
1487 int count, rc;
1488
1489
1490
1491
1492 count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
1493 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
1494 if (rc == 0)
1495 return count;
1496
1497 return 0;
1498}
1499
1500
1501
1502
1503
1504
/* Gather and locally cancel all granted, unreferenced locks on \a res
 * that conflict with \a mode (and, for IBITS locks, intersect
 * \a policy's inode bits).  Locks already being cancelled, or whose
 * l_ast_data does not match a non-NULL \a opaque, are skipped.
 *
 * \retval number of locks still requiring a cancel RPC (see
 *	   ldlm_cli_cancel_list_local())
 */
int ldlm_cancel_resource_local(struct ldlm_resource *res,
			       struct list_head *cancels,
			       ldlm_policy_data_t *policy,
			       enum ldlm_mode mode, __u64 lock_flags,
			       enum ldlm_cancel_flags cancel_flags,
			       void *opaque)
{
	struct ldlm_lock *lock;
	int count = 0;

	lock_res(res);
	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		if (opaque && lock->l_ast_data != opaque) {
			LDLM_ERROR(lock, "data %p doesn't match opaque %p",
				   lock->l_ast_data, opaque);
			continue;
		}

		if (lock->l_readers || lock->l_writers)
			continue;

		/* If somebody is already doing CANCEL, or blocking AST came,
		 * skip this lock.
		 */
		if (lock->l_flags & LDLM_FL_BL_AST ||
		    lock->l_flags & LDLM_FL_CANCELING)
			continue;

		if (lockmode_compat(lock->l_granted_mode, mode))
			continue;

		/* If policy is given and this is IBITS lock, add to list only
		 * those locks that match by policy.
		 */
		if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
		    !(lock->l_policy_data.l_inodebits.bits &
		      policy->l_inodebits.bits))
			continue;

		/* See CBPENDING comment in ldlm_cancel_lru */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
				 lock_flags;

		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		LDLM_LOCK_GET(lock);
		count++;
	}
	unlock_res(res);

	return ldlm_cli_cancel_list_local(cancels, count, cancel_flags);
}
EXPORT_SYMBOL(ldlm_cancel_resource_local);
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
/**
 * Send cancel RPCs for up to \a count locks linked on \a cancels via
 * l_bl_ast (each carrying a reference that is dropped here).
 *
 * If the export supports batched cancels (exp_connect_cancelset), the
 * whole remaining batch is handled at once: packed into the caller's
 * \a req when one is supplied (the caller then sends it), or sent in a
 * single cancel RPC otherwise.  Without cancelset support, one lock is
 * canceled per RPC.
 *
 * \retval 0 always; RPC errors are logged, and the affected locks are
 *	   still unlinked and their references released.
 */
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
			 struct ptlrpc_request *req,
			 enum ldlm_cancel_flags flags)
{
	struct ldlm_lock *lock;
	int res = 0;	/* number of locks consumed this iteration */

	if (list_empty(cancels) || count == 0)
		return 0;

	/* XXX: batches could be sent in parallel; usually one RPC is
	 * enough, but LRU or resource scans may produce more locks than
	 * fit in a single request. */
	while (count > 0) {
		LASSERT(!list_empty(cancels));
		lock = list_entry(cancels->next, struct ldlm_lock,
				  l_bl_ast);
		LASSERT(lock->l_conn_export);

		if (exp_connect_cancelset(lock->l_conn_export)) {
			res = count;
			if (req)
				/* Pack into caller's request; caller sends. */
				ldlm_cancel_pack(req, cancels, count);
			else
				res = ldlm_cli_cancel_req(lock->l_conn_export,
							  cancels, count,
							  flags);
		} else {
			/* No batching: one lock per RPC. */
			res = ldlm_cli_cancel_req(lock->l_conn_export,
						  cancels, 1, flags);
		}

		if (res < 0) {
			/* On error, consume the whole remaining batch so
			 * the loop terminates and references are dropped. */
			CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
				     "ldlm_cli_cancel_list: %d\n", res);
			res = count;
		}

		count -= res;
		/* Unlink the first \a res locks and release their refs. */
		ldlm_lock_list_put(cancels, l_bl_ast, res);
	}
	LASSERT(count == 0);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel_list);
1617
1618
1619
1620
1621
1622
1623
1624int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1625 const struct ldlm_res_id *res_id,
1626 ldlm_policy_data_t *policy,
1627 enum ldlm_mode mode,
1628 enum ldlm_cancel_flags flags,
1629 void *opaque)
1630{
1631 struct ldlm_resource *res;
1632 LIST_HEAD(cancels);
1633 int count;
1634 int rc;
1635
1636 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1637 if (!res) {
1638
1639 CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
1640 return 0;
1641 }
1642
1643 LDLM_RESOURCE_ADDREF(res);
1644 count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1645 0, flags | LCF_BL_AST, opaque);
1646 rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1647 if (rc != ELDLM_OK)
1648 CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
1649 PLDLMRES(res), rc);
1650
1651 LDLM_RESOURCE_DELREF(res);
1652 ldlm_resource_putref(res);
1653 return 0;
1654}
1655EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
1656
/* Argument bundle for ldlm_cli_hash_cancel_unused(), carried through
 * the single void * closure of cfs_hash_for_each_nolock(). */
struct ldlm_cli_cancel_arg {
	int lc_flags;		/* enum ldlm_cancel_flags for each resource */
	void *lc_opaque;	/* l_ast_data filter, may be NULL */
};
1661
/*
 * cfs_hash iterator callback: cancel the unused locks of one resource.
 * Always returns 0 so the hash walk visits every resource.
 */
static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
				       struct cfs_hash_bd *bd,
				       struct hlist_node *hnode, void *arg)
{
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_cli_cancel_arg *lc = arg;

	/* LCK_MINMODE with a NULL policy cancels every unused lock. */
	ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
					NULL, LCK_MINMODE,
					lc->lc_flags, lc->lc_opaque);

	return 0;
}
1675
1676
1677
1678
1679
1680
1681
1682int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1683 const struct ldlm_res_id *res_id,
1684 enum ldlm_cancel_flags flags, void *opaque)
1685{
1686 struct ldlm_cli_cancel_arg arg = {
1687 .lc_flags = flags,
1688 .lc_opaque = opaque,
1689 };
1690
1691 if (!ns)
1692 return ELDLM_OK;
1693
1694 if (res_id) {
1695 return ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1696 LCK_MINMODE, flags,
1697 opaque);
1698 } else {
1699 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1700 ldlm_cli_hash_cancel_unused, &arg);
1701 return ELDLM_OK;
1702 }
1703}
1704EXPORT_SYMBOL(ldlm_cli_cancel_unused);
1705
1706
1707
1708static int ldlm_resource_foreach(struct ldlm_resource *res,
1709 ldlm_iterator_t iter, void *closure)
1710{
1711 struct list_head *tmp, *next;
1712 struct ldlm_lock *lock;
1713 int rc = LDLM_ITER_CONTINUE;
1714
1715 if (!res)
1716 return LDLM_ITER_CONTINUE;
1717
1718 lock_res(res);
1719 list_for_each_safe(tmp, next, &res->lr_granted) {
1720 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1721
1722 if (iter(lock, closure) == LDLM_ITER_STOP) {
1723 rc = LDLM_ITER_STOP;
1724 goto out;
1725 }
1726 }
1727
1728 list_for_each_safe(tmp, next, &res->lr_waiting) {
1729 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1730
1731 if (iter(lock, closure) == LDLM_ITER_STOP) {
1732 rc = LDLM_ITER_STOP;
1733 goto out;
1734 }
1735 }
1736 out:
1737 unlock_res(res);
1738 return rc;
1739}
1740
/* Pairs a user iterator with its closure so both fit through the single
 * void * argument of the cfs_hash walk (see ldlm_iter_helper()). */
struct iter_helper_data {
	ldlm_iterator_t iter;	/* user callback */
	void *closure;		/* user callback argument */
};
1745
1746static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1747{
1748 struct iter_helper_data *helper = closure;
1749
1750 return helper->iter(lock, helper->closure);
1751}
1752
1753static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1754 struct hlist_node *hnode, void *arg)
1755
1756{
1757 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1758
1759 return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
1760 LDLM_ITER_STOP;
1761}
1762
1763static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
1764 ldlm_iterator_t iter, void *closure)
1765
1766{
1767 struct iter_helper_data helper = {
1768 .iter = iter,
1769 .closure = closure,
1770 };
1771
1772 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1773 ldlm_res_iter_helper, &helper);
1774
1775}
1776
1777
1778
1779
1780
1781
/**
 * Iterate over all locks (granted, then waiting) of the resource named
 * \a res_id in \a ns, calling \a iter(lock, data) on each until the
 * iterator returns LDLM_ITER_STOP.
 *
 * \retval 0 if the resource does not exist, otherwise the result of
 *	   the walk (LDLM_ITER_STOP or LDLM_ITER_CONTINUE).
 */
int ldlm_resource_iterate(struct ldlm_namespace *ns,
			  const struct ldlm_res_id *res_id,
			  ldlm_iterator_t iter, void *data)
{
	struct ldlm_resource *res;
	int rc;

	/* A NULL namespace is a caller bug — crash loudly. */
	if (!ns) {
		CERROR("must pass in namespace\n");
		LBUG();
	}

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (!res)
		return 0;

	/* Hold the resource across the walk. */
	LDLM_RESOURCE_ADDREF(res);
	rc = ldlm_resource_foreach(res, iter, data);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return rc;
}
EXPORT_SYMBOL(ldlm_resource_iterate);
1805
1806
1807
1808static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
1809{
1810 struct list_head *list = closure;
1811
1812
1813 LASSERTF(list_empty(&lock->l_pending_chain),
1814 "lock %p next %p prev %p\n",
1815 lock, &lock->l_pending_chain.next,
1816 &lock->l_pending_chain.prev);
1817
1818
1819
1820
1821 if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
1822 list_add(&lock->l_pending_chain, list);
1823 LDLM_LOCK_GET(lock);
1824 }
1825
1826 return LDLM_ITER_CONTINUE;
1827}
1828
/*
 * Completion callback for a lock replay RPC (installed as
 * rq_interpret_reply by replay_one_lock()).  On success, records the
 * server's new remote handle for the lock and advances the import
 * recovery state machine; on any failure, triggers a reconnect of the
 * import.
 */
static int replay_lock_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct ldlm_async_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	struct obd_export *exp;

	/* Balances the atomic_inc in replay_one_lock(). */
	atomic_dec(&req->rq_import->imp_replay_inflight);
	if (rc != ELDLM_OK)
		goto out;

	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (!reply) {
		rc = -EPROTO;
		goto out;
	}

	lock = ldlm_handle2lock(&aa->lock_handle);
	if (!lock) {
		/* NOTE(review): req->rq_export is dereferenced here with no
		 * NULL check, while the code below guards exp against NULL —
		 * confirm rq_export is always set on this (client) path. */
		CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
		       aa->lock_handle.cookie, reply->lock_handle.cookie,
		       req->rq_export->exp_client_uuid.uuid,
		       libcfs_id2str(req->rq_peer));
		rc = -ESTALE;
		goto out;
	}

	/* The remote handle changed; update our copy. */
	exp = req->rq_export;
	if (exp && exp->exp_lock_hash) {
		/* The remote handle is the key of exp_lock_hash, so the
		 * lock must be rehashed under the new value rather than
		 * assigned directly. */
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	LDLM_DEBUG(lock, "replayed lock:");
	ptlrpc_import_recovery_state_machine(req->rq_import);
	LDLM_LOCK_PUT(lock);
out:
	if (rc != ELDLM_OK)
		ptlrpc_connect_import(req->rq_import);

	return rc;
}
1881
/*
 * Build and queue an LDLM_ENQUEUE replay RPC for a single lock.
 *
 * Locks already being canceled are skipped; cancel-on-block locks are
 * canceled locally instead of being replayed.  The reply is handled
 * asynchronously by replay_lock_interpret().
 *
 * \retval 0 on success or when the lock is skipped
 * \retval -ENOMEM when the request cannot be allocated
 */
static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
	struct ptlrpc_request *req;
	struct ldlm_async_args *aa;
	struct ldlm_request *body;
	int flags;

	/* A lock in the middle of cancellation must not be resurrected. */
	if (lock->l_flags & LDLM_FL_CANCELING) {
		LDLM_DEBUG(lock, "Not replaying canceled lock:");
		return 0;
	}

	/* Reply-less (cancel-on-block) locks are not replayed; cancel
	 * them locally instead.  (Presumably the server may have dropped
	 * such a lock without us ever hearing about it — replaying it
	 * could then recreate a conflict.) */
	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
		LDLM_DEBUG(lock, "Not replaying reply-less lock:");
		ldlm_lock_cancel(lock);
		return 0;
	}

	/*
	 * If granted mode matches the requested mode, the lock was granted.
	 *
	 * If they differ but there is a granted mode, we were granted one
	 * mode and now want another: converting.
	 *
	 * If nothing was granted and the lock is on a resource list, it is
	 * blocked/waiting.
	 *
	 * If nothing was granted and the lock is NOT on a resource list,
	 * no reply was received yet, so there is no known disposition.
	 */
	if (lock->l_granted_mode == lock->l_req_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
	else if (lock->l_granted_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
	else if (!list_empty(&lock->l_res_link))
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
	else
		flags = LDLM_FL_REPLAY;

	req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
					LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
	if (!req)
		return -ENOMEM;

	/* Send while the import is in the lock-replay recovery state. */
	req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;

	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(flags);

	ldlm_lock2handle(lock, &body->lock_handle[0]);
	/* Reserve room for the server to return the LVB, if any. */
	if (lock->l_lvb_len > 0)
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     lock->l_lvb_len);
	ptlrpc_request_set_replen(req);

	/* NOTE(review): MSG_REQ_REPLAY_DONE presumably tells the server
	 * that request replay is complete — confirm against server-side
	 * handling. */
	lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);

	LDLM_DEBUG(lock, "replaying lock:");

	/* Dropped again in replay_lock_interpret(). */
	atomic_inc(&req->rq_import->imp_replay_inflight);
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
	ptlrpcd_add_req(req);

	return 0;
}
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
1976{
1977 int canceled;
1978 LIST_HEAD(cancels);
1979
1980 CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
1981 ldlm_ns_name(ns), ns->ns_nr_unused);
1982
1983
1984
1985
1986
1987 canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
1988 LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
1989
1990 CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
1991 canceled, ldlm_ns_name(ns));
1992}
1993
/*
 * Replay all replayable locks of the import's namespace after recovery.
 *
 * Optionally cancels unused locks first (module tunable
 * ldlm_cancel_unused_locks_before_replay), then collects the remaining
 * locks via ldlm_chain_lock_for_replay() and replays them one by one.
 * After the first replay failure, the rest of the queued locks are only
 * released, not replayed.
 *
 * \retval 0 on success (or when version-based recovery failed and
 *	   replay is skipped), otherwise the first replay_one_lock()
 *	   error.
 */
int ldlm_replay_locks(struct obd_import *imp)
{
	struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
	LIST_HEAD(list);
	struct ldlm_lock *lock, *next;
	int rc = 0;

	LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);

	/* Don't replay locks after a version-based recovery failure. */
	if (imp->imp_vbr_failed)
		return 0;

	/* Keep imp_replay_inflight above zero while locks are being
	 * queued, so it cannot drop to 0 before all replays are sent. */
	atomic_inc(&imp->imp_replay_inflight);

	if (ldlm_cancel_unused_locks_before_replay)
		ldlm_cancel_unused_locks_for_replay(ns);

	/* Collect replay candidates; each holds an extra reference. */
	ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);

	list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
		list_del_init(&lock->l_pending_chain);
		if (rc) {
			/* An earlier replay failed: just drop the queued
			 * references without sending more RPCs. */
			LDLM_LOCK_RELEASE(lock);
			continue;
		}
		rc = replay_one_lock(imp, lock);
		LDLM_LOCK_RELEASE(lock);
	}

	atomic_dec(&imp->imp_replay_inflight);

	return rc;
}
EXPORT_SYMBOL(ldlm_replay_locks);
2030