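/*
 * Client-side request handling for the Lustre Distributed Lock Manager
 * (LDLM): lock enqueue with completion ASTs, lock cancellation (local,
 * batched, and LRU-driven), and lock replay after recovery.
 */
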
#define DEBUG_SUBSYSTEM S_LDLM

#include <lustre_errno.h>
#include <lustre_dlm.h>
#include <obd_class.h>
#include <obd.h>

#include "ldlm_internal.h"

unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
module_param(ldlm_enqueue_min, uint, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");

/* Controls whether cached unused locks are canceled before replay. */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;

/* Interrupt handler for completion waits; the waiter does all the work. */
static void interrupted_completion_wait(void *data)
{
}

struct lock_wait_data {
	struct ldlm_lock *lwd_lock;
	__u32		  lwd_conn_cnt;
};

struct ldlm_async_args {
	struct lustre_handle lock_handle;
};
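
/*
 * Calculate the size of the RPC request buffer needed to pack \a count
 * cancel handles; LDLM_ENQUEUE requests reserve the first
 * LDLM_ENQUEUE_CANCEL_OFF handle slots for the enqueue itself.
 */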
static int ldlm_request_bufsize(int count, int type)
{
	int avail = LDLM_LOCKREQ_HANDLES;

	if (type == LDLM_ENQUEUE)
		avail -= LDLM_ENQUEUE_CANCEL_OFF;

	if (count > avail)
		avail = (count - avail) * sizeof(struct lustre_handle);
	else
		avail = 0;

	return sizeof(struct ldlm_request) + avail;
}

static int ldlm_expired_completion_wait(void *data)
{
	struct lock_wait_data *lwd = data;
	struct ldlm_lock *lock = lwd->lwd_lock;
	struct obd_import *imp;
	struct obd_device *obd;

	if (!lock->l_conn_export) {
		static unsigned long next_dump, last_dump;

		LDLM_ERROR(lock,
			   "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
			   (s64)lock->l_last_activity,
			   (s64)(ktime_get_real_seconds() -
				 lock->l_last_activity));
		if (cfs_time_after(cfs_time_current(), next_dump)) {
			last_dump = next_dump;
			next_dump = cfs_time_shift(300);
			ldlm_namespace_dump(D_DLMTRACE,
					    ldlm_lock_to_ns(lock));
			if (last_dump == 0)
				libcfs_debug_dumplog();
		}
		return 0;
	}

	obd = lock->l_conn_export->exp_obd;
	imp = obd->u.cli.cl_import;
	ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
	LDLM_ERROR(lock,
		   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
		   (s64)lock->l_last_activity,
		   (s64)(ktime_get_real_seconds() - lock->l_last_activity),
		   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);

	return 0;
}
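
/**
 * Calculate the lock-completion timeout on the client side, based on the
 * adaptive-timeout estimate for the namespace; with adaptive timeouts
 * disabled, fall back to obd_timeout.
 */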
static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
{
	unsigned int timeout;

	if (AT_OFF)
		return obd_timeout;

	/* Wait a long time for enqueue - server may have to callback a
	 * lock from another client.  Server will evict the other client if
	 * it doesn't respond reasonably, and then give us the lock.
	 */
	timeout = at_get(ldlm_lock_to_ns_at(lock));
	return max(3 * timeout, ldlm_enqueue_min);
}
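
/**
 * Helper function for ldlm_completion_ast(), updating timings when lock is
 * actually granted.
 */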
static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
	long delay;
	int result = 0;

	if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
		LDLM_DEBUG(lock, "client-side enqueue: destroyed");
		result = -EIO;
	} else if (!data) {
		LDLM_DEBUG(lock, "client-side enqueue: granted");
	} else {
		/* Take into account how long the lock waited to be granted */
		delay = ktime_get_real_seconds() - lock->l_last_activity;
		LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
			   delay);

		/* Update our time estimate */
		at_measured(ldlm_lock_to_ns_at(lock), delay);
	}
	return result;
}
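
/**
 * Implementation of ->l_completion_ast() for a client, that doesn't wait
 * until lock is granted. Suitable for locks enqueued through ptlrpcd, or
 * other threads that cannot block for long.
 */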
int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
{
	if (flags == LDLM_FL_WAIT_NOREPROC) {
		LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
		return 0;
	}

	if (!(flags & LDLM_FL_BLOCKED_MASK)) {
		wake_up(&lock->l_waitq);
		return ldlm_completion_tail(lock, data);
	}

	LDLM_DEBUG(lock,
		   "client-side enqueue returned a blocked lock, going forward");
	return 0;
}
EXPORT_SYMBOL(ldlm_completion_ast_async);
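
/**
 * Generic LDLM "completion" AST. This is called in several cases:
 *
 *     - when a reply to an ENQUEUE RPC is received from the server
 *       (ldlm_cli_enqueue_fini()); lock might be granted or not granted at
 *       this point (determined by flags);
 *
 *     - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has
 *       been granted;
 *
 *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
 *       gets correct lvb;
 *
 *     - to force all locks when resource is destroyed (cleanup_resource()).
 *
 * If lock is not granted in the first case, this function waits until second
 * or penultimate cases happen in some other thread.
 */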
int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct lock_wait_data lwd;
	struct obd_device *obd;
	struct obd_import *imp = NULL;
	struct l_wait_info lwi;
	__u32 timeout;
	int rc = 0;

	if (flags == LDLM_FL_WAIT_NOREPROC) {
		LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
		goto noreproc;
	}

	if (!(flags & LDLM_FL_BLOCKED_MASK)) {
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock,
		   "client-side enqueue returned a blocked lock, sleeping");

noreproc:

	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, then there is no import */
	if (obd)
		imp = obd->u.cli.cl_import;

	timeout = ldlm_cp_timeout(lock);

	lwd.lwd_lock = lock;
	lock->l_last_activity = ktime_get_real_seconds();

	if (ldlm_is_no_timeout(lock)) {
		LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
		lwi = LWI_INTR(interrupted_completion_wait, &lwd);
	} else {
		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
				       ldlm_expired_completion_wait,
				       interrupted_completion_wait, &lwd);
	}

	if (imp) {
		spin_lock(&imp->imp_lock);
		lwd.lwd_conn_cnt = imp->imp_conn_cnt;
		spin_unlock(&imp->imp_lock);
	}

	if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
				 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
		ldlm_set_fail_loc(lock);
		rc = -EINTR;
	} else {
		/* Go to sleep until the lock is granted or canceled. */
		rc = l_wait_event(lock->l_waitq,
				  is_granted_or_cancelled(lock), &lwi);
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	return ldlm_completion_tail(lock, data);
}
EXPORT_SYMBOL(ldlm_completion_ast);

static void failed_lock_cleanup(struct ldlm_namespace *ns,
				struct ldlm_lock *lock, int mode)
{
	int need_cancel = 0;

	/* Set a flag to prevent us from sending a CANCEL */
	lock_res_and_lock(lock);
	/* Check that lock is not granted or failed, we might race. */
	if ((lock->l_req_mode != lock->l_granted_mode) &&
	    !ldlm_is_failed(lock)) {
		/* Make sure that this lock will not be found by raced
		 * bl_ast and -EINVAL reply is sent to server anyway.
		 */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
				 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
		need_cancel = 1;
	}
	unlock_res_and_lock(lock);

	if (need_cancel)
		LDLM_DEBUG(lock,
			   "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
	else
		LDLM_DEBUG(lock, "lock was granted or failed in race");

	/* This code makes for the fact that we do not have a blocking
	 * handler on a client for flock locks. As such this is the place
	 * where we must completely kill failed locks (interrupted and
	 * those that were waiting to be granted when server evicted us).
	 */
	if (lock->l_resource->lr_type == LDLM_FLOCK) {
		lock_res_and_lock(lock);
		if (!ldlm_is_destroyed(lock)) {
			ldlm_resource_unlink_lock(lock);
			ldlm_lock_decref_internal_nolock(lock, mode);
			ldlm_lock_destroy_nolock(lock);
		}
		unlock_res_and_lock(lock);
	} else {
		ldlm_lock_decref_internal(lock, mode);
	}
}
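
/**
 * Finishing portion of client lock enqueue code.
 *
 * Called after receiving reply from server.
 */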
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
			  enum ldlm_type type, __u8 with_policy,
			  enum ldlm_mode mode,
			  __u64 *flags, void *lvb, __u32 lvb_len,
			  const struct lustre_handle *lockh, int rc)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	int is_replay = *flags & LDLM_FL_REPLAY;
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	int cleanup_phase = 1;

	lock = ldlm_handle2lock(lockh);
	/* ldlm_cli_enqueue is holding a reference on this lock. */
	if (!lock) {
		LASSERT(type == LDLM_FLOCK);
		return -ENOLCK;
	}

	LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
		 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);

	if (rc != ELDLM_OK) {
		LASSERT(!is_replay);
		LDLM_DEBUG(lock, "client-side enqueue END (%s)",
			   rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");

		if (rc != ELDLM_LOCK_ABORTED)
			goto cleanup;
	}

	/* Before we return, swab the reply */
	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (!reply) {
		rc = -EPROTO;
		goto cleanup;
	}

	if (lvb_len > 0) {
		int size = 0;

		size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
					    RCL_SERVER);
		if (size < 0) {
			LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
			rc = size;
			goto cleanup;
		} else if (unlikely(size > lvb_len)) {
			LDLM_ERROR(lock,
				   "Replied LVB is larger than expectation, expected = %d, replied = %d",
				   lvb_len, size);
			rc = -EINVAL;
			goto cleanup;
		}
		lvb_len = size;
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		if (lvb_len > 0 && lvb)
			rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
					   lvb, lvb_len);
		if (rc == 0)
			rc = ELDLM_LOCK_ABORTED;
		goto cleanup;
	}

	/* lock enqueued on the server */
	cleanup_phase = 0;

	lock_res_and_lock(lock);
	/* Key change: rehash lock in per-export hash with new key */
	if (exp->exp_lock_hash) {
		/* In the function below, .hs_keycmp resolves to
		 * ldlm_export_lock_keycmp()
		 */
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	*flags = ldlm_flags_from_wire(reply->lock_flags);
	lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
					      LDLM_FL_INHERIT_MASK);
	unlock_res_and_lock(lock);

	CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n",
	       lock, reply->lock_handle.cookie, *flags);

	/* If enqueue returned a blocked lock but the completion handler has
	 * already run, then it fixed up the resource and we don't need to do
	 * it again.
	 */
	if ((*flags) & LDLM_FL_LOCK_CHANGED) {
		int newmode = reply->lock_desc.l_req_mode;

		LASSERT(!is_replay);
		if (newmode && newmode != lock->l_req_mode) {
			LDLM_DEBUG(lock, "server returned different mode %s",
				   ldlm_lockname[newmode]);
			lock->l_req_mode = newmode;
		}

		if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
				 &lock->l_resource->lr_name)) {
			CDEBUG(D_INFO,
			       "remote intent success, locking " DLDLMRES " instead of " DLDLMRES "\n",
			       PLDLMRES(&reply->lock_desc.l_resource),
			       PLDLMRES(lock->l_resource));

			rc = ldlm_lock_change_resource(ns, lock,
					&reply->lock_desc.l_resource.lr_name);
			if (rc || !lock->l_resource) {
				rc = -ENOMEM;
				goto cleanup;
			}
			LDLM_DEBUG(lock, "client-side enqueue, new resource");
		}
		if (with_policy)
			if (!(type == LDLM_IBITS &&
			      !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
				/* We assume lock type cannot change on server */
				ldlm_convert_policy_to_local(exp,
						lock->l_resource->lr_type,
						&reply->lock_desc.l_policy_data,
						&lock->l_policy_data);
		if (type != LDLM_PLAIN)
			LDLM_DEBUG(lock,
				   "client-side enqueue, new policy data");
	}

	if ((*flags) & LDLM_FL_AST_SENT) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
	}

	/* If the lock has already been granted by a completion AST, don't
	 * clobber the LVB with an older one.
	 */
	if (lvb_len > 0) {
		/* We must lock or a racing completion might update lvb without
		 * letting us know and we'll clobber the correct value.
		 * Cannot unlock after the check either, as that still leaves
		 * a tiny window for completion to get in.
		 */
		lock_res_and_lock(lock);
		if (lock->l_req_mode != lock->l_granted_mode)
			rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
					   lock->l_lvb_data, lvb_len);
		unlock_res_and_lock(lock);
		if (rc < 0) {
			cleanup_phase = 1;
			goto cleanup;
		}
	}

	if (!is_replay) {
		rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
		if (lock->l_completion_ast) {
			int err = lock->l_completion_ast(lock, *flags, NULL);

			if (!rc)
				rc = err;
			if (rc)
				cleanup_phase = 1;
		}
	}

	if (lvb_len > 0 && lvb) {
		/* Copy the LVB here, and not earlier, because the completion
		 * AST (if any) can override what we got in the reply.
		 */
		memcpy(lvb, lock->l_lvb_data, lvb_len);
	}

	LDLM_DEBUG(lock, "client-side enqueue END");
cleanup:
	if (cleanup_phase == 1 && rc)
		failed_lock_cleanup(ns, lock, mode);
	/* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
	LDLM_LOCK_PUT(lock);
	LDLM_LOCK_RELEASE(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
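
/**
 * Estimate how many lock handles would fit into a request of the given
 * size; PAGE_SIZE - 512 leaves room for the transport headers so the
 * request fits into a single page.
 */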
static inline int ldlm_req_handles_avail(int req_size, int off)
{
	int avail;

	avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
	if (likely(avail >= 0))
		avail /= (int)sizeof(struct lustre_handle);
	else
		avail = 0;
	avail += LDLM_LOCKREQ_HANDLES - off;

	return avail;
}

static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
					     enum req_location loc,
					     int off)
{
	u32 size = req_capsule_msg_size(pill, loc);

	return ldlm_req_handles_avail(size, off);
}

static inline int ldlm_format_handles_avail(struct obd_import *imp,
					    const struct req_format *fmt,
					    enum req_location loc, int off)
{
	u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);

	return ldlm_req_handles_avail(size, off);
}
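
/**
 * Cancel LRU locks and pack them into the enqueue request. Pack there the
 * given \a count locks in \a cancels.
 *
 * This is to be called by functions preparing their own requests that
 * might contain lists of locks to cancel in addition to the actual
 * operation that needs to be performed.
 */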
int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
		      int version, int opc, int canceloff,
		      struct list_head *cancels, int count)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct req_capsule *pill = &req->rq_pill;
	struct ldlm_request *dlm = NULL;
	int flags, avail, to_free, pack = 0;
	LIST_HEAD(head);
	int rc;

	if (!cancels)
		cancels = &head;
	if (ns_connect_cancelset(ns)) {
		/* Estimate the amount of available space in the request. */
		req_capsule_filled_sizes(pill, RCL_CLIENT);
		avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);

		flags = ns_connect_lru_resize(ns) ?
			LDLM_LRU_FLAG_LRUR_NO_WAIT : LDLM_LRU_FLAG_AGED;
		to_free = !ns_connect_lru_resize(ns) &&
			  opc == LDLM_ENQUEUE ? 1 : 0;

		/* Cancel LRU locks here _only_ if the server supports
		 * EARLY_CANCEL; otherwise we have to send extra CANCEL
		 * RPCs, which makes things slower.
		 */
		if (avail > count)
			count += ldlm_cancel_lru_local(ns, cancels, to_free,
						       avail - count, 0, flags);
		if (avail > count)
			pack = count;
		else
			pack = avail;
		req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
				     ldlm_request_bufsize(pack, opc));
	}

	rc = ptlrpc_request_pack(req, version, opc);
	if (rc) {
		ldlm_lock_list_put(cancels, l_bl_ast, count);
		return rc;
	}

	if (ns_connect_cancelset(ns)) {
		if (canceloff) {
			dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
			LASSERT(dlm);
			/* Skip first lock handle in ldlm_request_pack();
			 * this method will increment @lock_count according
			 * to the lock handle amount actually written to
			 * the buffer.
			 */
			dlm->lock_count = canceloff;
		}
		/* Pack into the request @pack lock handles. */
		ldlm_cli_cancel_list(cancels, pack, req, 0);
		/* Prepare and send separate cancel RPC for others. */
		ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
	} else {
		ldlm_lock_list_put(cancels, l_bl_ast, count);
	}
	return 0;
}
EXPORT_SYMBOL(ldlm_prep_elc_req);

int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
			  struct list_head *cancels, int count)
{
	return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
				 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
}
EXPORT_SYMBOL(ldlm_prep_enqueue_req);

static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp,
						int lvb_len)
{
	struct ptlrpc_request *req;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);
	return req;
}
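
/**
 * Client-side lock enqueue.
 *
 * If a request has some specific initialisation it is passed in \a reqp,
 * otherwise it is created in ldlm_cli_enqueue.
 *
 * Supports sync and async requests, pass \a async flag accordingly. If a
 * request was created in ldlm_cli_enqueue and it is the async request,
 * pass it to the caller in \a reqp.
 */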
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
		     struct ldlm_enqueue_info *einfo,
		     const struct ldlm_res_id *res_id,
		     union ldlm_policy_data const *policy, __u64 *flags,
		     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
		     struct lustre_handle *lockh, int async)
{
	struct ldlm_namespace *ns;
	struct ldlm_lock *lock;
	struct ldlm_request *body;
	int is_replay = *flags & LDLM_FL_REPLAY;
	int req_passed_in = 1;
	int rc, err;
	struct ptlrpc_request *req;

	ns = exp->exp_obd->obd_namespace;

	/* If we're replaying this lock, just check some invariants.
	 * If we're creating a new lock, get everything all setup nicely.
	 */
	if (is_replay) {
		lock = ldlm_handle2lock_long(lockh, 0);
		LASSERT(lock);
		LDLM_DEBUG(lock, "client-side enqueue START");
		LASSERT(exp == lock->l_conn_export);
	} else {
		const struct ldlm_callback_suite cbs = {
			.lcs_completion	= einfo->ei_cb_cp,
			.lcs_blocking	= einfo->ei_cb_bl,
			.lcs_glimpse	= einfo->ei_cb_gl
		};
		lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
					einfo->ei_mode, &cbs, einfo->ei_cbdata,
					lvb_len, lvb_type);
		if (IS_ERR(lock))
			return PTR_ERR(lock);

		ldlm_lock_addref_internal(lock, einfo->ei_mode);
		ldlm_lock2handle(lock, lockh);
		if (policy)
			lock->l_policy_data = *policy;

		if (einfo->ei_type == LDLM_EXTENT) {
			/* extent lock without policy is a bug */
			if (!policy)
				LBUG();

			lock->l_req_extent = policy->l_extent;
		}
		LDLM_DEBUG(lock, "client-side enqueue START, flags %llx",
			   *flags);
	}

	lock->l_conn_export = exp;
	lock->l_export = NULL;
	lock->l_blocking_ast = einfo->ei_cb_bl;
	lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
	lock->l_last_activity = ktime_get_real_seconds();

	/* lock not sent to server yet */
	if (!reqp || !*reqp) {
		req = ldlm_enqueue_pack(exp, lvb_len);
		if (IS_ERR(req)) {
			failed_lock_cleanup(ns, lock, einfo->ei_mode);
			LDLM_LOCK_RELEASE(lock);
			return PTR_ERR(req);
		}

		req_passed_in = 0;
		if (reqp)
			*reqp = req;
	} else {
		int len;

		req = *reqp;
		len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
					   RCL_CLIENT);
		LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
			 DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
	}

	/* Dump lock data into the request buffer */
	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(*flags);
	body->lock_handle[0] = *lockh;

	if (async) {
		LASSERT(reqp);
		return 0;
	}

	LDLM_DEBUG(lock, "sending request");

	rc = ptlrpc_queue_wait(req);

	err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
				    einfo->ei_mode, flags, lvb, lvb_len,
				    lockh, rc);

	/* If ldlm_cli_enqueue_fini did not find the lock, we need to free
	 * one reference that we took.
	 */
	if (err == -ENOLCK)
		LDLM_LOCK_RELEASE(lock);
	else
		rc = err;

	if (!req_passed_in && req) {
		ptlrpc_req_finished(req);
		if (reqp)
			*reqp = NULL;
	}

	return rc;
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
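
/**
 * Cancel a lock locally.
 *
 * \retval LDLM_FL_LOCAL_ONLY	if there is no need for a CANCEL RPC
 * \retval LDLM_FL_BL_AST	if a separate CANCEL RPC must be sent
 * \retval LDLM_FL_CANCELING	otherwise
 */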
static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
	__u64 rc = LDLM_FL_LOCAL_ONLY;

	if (lock->l_conn_export) {
		bool local_only;

		LDLM_DEBUG(lock, "client-side cancel");

		lock_res_and_lock(lock);
		ldlm_set_cbpending(lock);
		local_only = !!(lock->l_flags &
				(LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK));
		ldlm_cancel_callback(lock);
		rc = ldlm_is_bl_ast(lock) ? LDLM_FL_BL_AST : LDLM_FL_CANCELING;
		unlock_res_and_lock(lock);

		if (local_only) {
			CDEBUG(D_DLMTRACE,
			       "not sending request (at caller's instruction)\n");
			rc = LDLM_FL_LOCAL_ONLY;
		}
		ldlm_lock_cancel(lock);
	} else {
		LDLM_ERROR(lock, "Trying to cancel local lock");
		LBUG();
	}

	return rc;
}
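
/**
 * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
 */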
static void ldlm_cancel_pack(struct ptlrpc_request *req,
			     struct list_head *head, int count)
{
	struct ldlm_request *dlm;
	struct ldlm_lock *lock;
	int max, packed = 0;

	dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	LASSERT(dlm);

	/* Check the room in the request buffer. */
	max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
		sizeof(struct ldlm_request);
	max /= sizeof(struct lustre_handle);
	max += LDLM_LOCKREQ_HANDLES;
	LASSERT(max >= dlm->lock_count + count);

	/* XXX: it would be better to pack lock handles grouped by resource
	 * so that the server cancel would call filter_lvbo_update() less
	 * frequently.
	 */
	list_for_each_entry(lock, head, l_bl_ast) {
		if (!count--)
			break;
		LASSERT(lock->l_conn_export);
		/* Pack the lock handle to the given request buffer. */
		LDLM_DEBUG(lock, "packing");
		dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
		packed++;
	}
	CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
}
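
/**
 * Prepare and send a batched cancel RPC containing \a count lock handles
 * from the \a cancels list.
 */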
static int ldlm_cli_cancel_req(struct obd_export *exp,
			       struct list_head *cancels,
			       int count, enum ldlm_cancel_flags flags)
{
	struct ptlrpc_request *req = NULL;
	struct obd_import *imp;
	int free, sent = 0;
	int rc = 0;

	LASSERT(exp);
	LASSERT(count > 0);

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);

	if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
		return count;

	free = ldlm_format_handles_avail(class_exp2cliimp(exp),
					 &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
	if (count > free)
		count = free;

	while (1) {
		imp = class_exp2cliimp(exp);
		if (!imp || imp->imp_invalid) {
			CDEBUG(D_DLMTRACE,
			       "skipping cancel on invalid import %p\n", imp);
			return count;
		}

		req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
		if (!req) {
			rc = -ENOMEM;
			goto out;
		}

		req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
		req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
				     ldlm_request_bufsize(count, LDLM_CANCEL));

		rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
		if (rc) {
			ptlrpc_request_free(req);
			goto out;
		}

		req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
		req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
		ptlrpc_at_set_req_timeout(req);

		ldlm_cancel_pack(req, cancels, count);

		ptlrpc_request_set_replen(req);
		if (flags & LCF_ASYNC) {
			ptlrpcd_add_req(req);
			sent = count;
			goto out;
		}

		rc = ptlrpc_queue_wait(req);
		if (rc == LUSTRE_ESTALE) {
			CDEBUG(D_DLMTRACE,
			       "client/server (nid %s) out of sync -- not fatal\n",
			       libcfs_nid2str(req->rq_import->
					      imp_connection->c_peer.nid));
			rc = 0;
		} else if (rc == -ETIMEDOUT &&
			   req->rq_import_generation == imp->imp_generation) {
			ptlrpc_req_finished(req);
			continue;
		} else if (rc != ELDLM_OK) {
			/* -ESHUTDOWN is common on umount */
			CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
				     "Got rc %d from cancel RPC: canceling anyway\n",
				     rc);
			break;
		}
		sent = count;
		break;
	}

	ptlrpc_req_finished(req);
out:
	return sent ? sent : rc;
}

static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
{
	return &imp->imp_obd->obd_namespace->ns_pool;
}
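
/**
 * Update the client's OBD pool fields with the new SLV and lock-count limit
 * carried in the server reply \a req, so the pool thread can see them.
 */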
int ldlm_cli_update_pool(struct ptlrpc_request *req)
{
	struct obd_device *obd;
	__u64 new_slv;
	__u32 new_limit;

	if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
		     !imp_connect_lru_resize(req->rq_import))) {
		/* Do nothing for corner cases. */
		return 0;
	}

	/* In some cases RPC may contain SLV and limit zeroed out. This
	 * is the case when server does not support LRU resize feature.
	 * This is also possible in some recovery cases when server-side
	 * reqs have no reference to the OBD export and thus access to
	 * server-side namespace is not possible.
	 */
	if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
	    lustre_msg_get_limit(req->rq_repmsg) == 0) {
		DEBUG_REQ(D_HA, req,
			  "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
			  lustre_msg_get_slv(req->rq_repmsg),
			  lustre_msg_get_limit(req->rq_repmsg));
		return 0;
	}

	new_limit = lustre_msg_get_limit(req->rq_repmsg);
	new_slv = lustre_msg_get_slv(req->rq_repmsg);
	obd = req->rq_import->imp_obd;

	/* Set new SLV and limit in OBD fields to make them accessible
	 * to the pool thread. We do not access obd_namespace and pool
	 * directly here as there is no reliable way to make sure that
	 * they are still alive at cleanup time. Evil races are possible
	 * which may cause an oops at that time.
	 */
	write_lock(&obd->obd_pool_lock);
	obd->obd_pool_slv = new_slv;
	obd->obd_pool_limit = new_limit;
	write_unlock(&obd->obd_pool_lock);

	return 0;
}
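
/**
 * Client side lock cancel.
 *
 * Lock must not have any readers or writers by this time.
 */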
int ldlm_cli_cancel(const struct lustre_handle *lockh,
		    enum ldlm_cancel_flags cancel_flags)
{
	struct obd_export *exp;
	int avail, flags, count = 1;
	__u64 rc = 0;
	struct ldlm_namespace *ns;
	struct ldlm_lock *lock;
	LIST_HEAD(cancels);

	lock = ldlm_handle2lock_long(lockh, 0);
	if (!lock) {
		LDLM_DEBUG_NOLOCK("lock is already being destroyed");
		return 0;
	}

	lock_res_and_lock(lock);
	/* Lock is being canceled and the caller doesn't want to wait */
	if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
		unlock_res_and_lock(lock);
		LDLM_LOCK_RELEASE(lock);
		return 0;
	}

	ldlm_set_canceling(lock);
	unlock_res_and_lock(lock);

	rc = ldlm_cli_cancel_local(lock);
	if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
		LDLM_LOCK_RELEASE(lock);
		return 0;
	}
	/* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
	 * RPC which goes to canceld portal, so we can cancel other LRU locks
	 * here and send them all as one LDLM_CANCEL RPC.
	 */
	LASSERT(list_empty(&lock->l_bl_ast));
	list_add(&lock->l_bl_ast, &cancels);

	exp = lock->l_conn_export;
	if (exp_connect_cancelset(exp)) {
		avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
						  &RQF_LDLM_CANCEL,
						  RCL_CLIENT, 0);
		LASSERT(avail > 0);

		ns = ldlm_lock_to_ns(lock);
		flags = ns_connect_lru_resize(ns) ?
			LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
		count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
					       LCF_BL_AST, flags);
	}
	ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel);
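
/**
 * Locally cancel up to \a count locks in list \a cancels.
 * Return the number of cancelled locks.
 */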
int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
			       enum ldlm_cancel_flags flags)
{
	LIST_HEAD(head);
	struct ldlm_lock *lock, *next;
	int left = 0, bl_ast = 0;
	__u64 rc;

	left = count;
	list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
		if (left-- == 0)
			break;

		if (flags & LCF_LOCAL) {
			rc = LDLM_FL_LOCAL_ONLY;
			ldlm_lock_cancel(lock);
		} else {
			rc = ldlm_cli_cancel_local(lock);
		}

		/* Until we have compound requests and can send LDLM_CANCEL
		 * requests batched with generic RPCs, we need to send cancels
		 * with the LDLM_FL_BL_AST flag in a separate RPC from
		 * the one being generated now.
		 */
		if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
			LDLM_DEBUG(lock, "Cancel lock separately");
			list_del_init(&lock->l_bl_ast);
			list_add(&lock->l_bl_ast, &head);
			bl_ast++;
			continue;
		}
		if (rc == LDLM_FL_LOCAL_ONLY) {
			/* CANCEL RPC should not be sent to server. */
			list_del_init(&lock->l_bl_ast);
			LDLM_LOCK_RELEASE(lock);
			count--;
		}
	}
	if (bl_ast > 0) {
		count -= bl_ast;
		ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
	}

	return count;
}
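
/**
 * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
 * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g.
 * readahead requests, ...).
 */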
static enum ldlm_policy_res
ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
			   int unused, int added, int count)
{
	enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;

	/* don't check added & count since we want to process all locks
	 * from the unused list.
	 * It's fine to not take a lock to access lock->l_resource since
	 * the lock has already been granted so it won't change.
	 */
	switch (lock->l_resource->lr_type) {
	case LDLM_EXTENT:
	case LDLM_IBITS:
		if (ns->ns_cancel && ns->ns_cancel(lock) != 0)
			break;
		/* fall through */
	default:
		result = LDLM_POLICY_SKIP_LOCK;
		lock_res_and_lock(lock);
		ldlm_set_skipped(lock);
		unlock_res_and_lock(lock);
		break;
	}

	return result;
}
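
/**
 * Callback function for LRU-resize policy. Decides whether to keep
 * \a lock in LRU for current LRU size \a unused, number of locks added in
 * the current scan \a added and number of locks to be preferably canceled
 * \a count.
 *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
 */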
static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
						    int unused, int added,
						    int count)
{
	unsigned long cur = cfs_time_current();
	struct ldlm_pool *pl = &ns->ns_pool;
	__u64 slv, lvf, lv;
	unsigned long la;

	/* Stop LRU processing when we reach past @count or have checked all
	 * locks in LRU.
	 */
	if (count && added >= count)
		return LDLM_POLICY_KEEP_LOCK;

	/* Despite of the LV, it doesn't make sense to keep the lock which
	 * is unused for ns_max_age time.
	 */
	if (cfs_time_after(cfs_time_current(),
			   cfs_time_add(lock->l_last_used, ns->ns_max_age)))
		return LDLM_POLICY_CANCEL_LOCK;

	slv = ldlm_pool_get_slv(pl);
	lvf = ldlm_pool_get_lvf(pl);
	la = cfs_duration_sec(cfs_time_sub(cur, lock->l_last_used));
	lv = lvf * la * unused;

	/* Inform pool about current CLV to see it via debugfs. */
	ldlm_pool_set_clv(pl, lv);

	/* Stop when SLV is not yet come from server or lv is smaller than
	 * it is.
	 */
	if (slv == 0 || lv < slv)
		return LDLM_POLICY_KEEP_LOCK;

	return LDLM_POLICY_CANCEL_LOCK;
}
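
/**
 * Callback for a "passed-number" policy: cancel locks until \a count locks
 * have been added to the cancel list, then keep the rest.
 */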
static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
						      struct ldlm_lock *lock,
						      int unused, int added,
						      int count)
{
	/* Stop LRU processing when we reach past @count or have checked all
	 * locks in LRU.
	 */
	return (added >= count) ?
		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
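
/**
 * Callback function for the aged policy: once \a count locks have been
 * accumulated, cancel a lock only if it has sat unused for longer than
 * ns_max_age.
 */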
static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
						    int unused, int added,
						    int count)
{
	if ((added >= count) &&
	    time_before(cfs_time_current(),
			cfs_time_add(lock->l_last_used, ns->ns_max_age)))
		return LDLM_POLICY_KEEP_LOCK;

	return LDLM_POLICY_CANCEL_LOCK;
}

static enum ldlm_policy_res
ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
				int unused, int added,
				int count)
{
	enum ldlm_policy_res result;

	result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
	if (result == LDLM_POLICY_KEEP_LOCK)
		return result;

	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
}
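
/**
 * Callback function for the default policy: cancel locks until \a count
 * locks have been added to the cancel list.
 */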
static enum ldlm_policy_res
ldlm_cancel_default_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
			   int unused, int added, int count)
{
	/* Stop LRU processing when we reach past @count or have checked all
	 * locks in LRU.
	 */
	return (added >= count) ?
		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}

typedef enum ldlm_policy_res (*ldlm_cancel_lru_policy_t)(
						      struct ldlm_namespace *,
						      struct ldlm_lock *, int,
						      int, int);

static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
{
	if (flags & LDLM_LRU_FLAG_NO_WAIT)
		return ldlm_cancel_no_wait_policy;

	if (ns_connect_lru_resize(ns)) {
		if (flags & LDLM_LRU_FLAG_SHRINK)
			/* We kill passed number of old locks. */
			return ldlm_cancel_passed_policy;
		else if (flags & LDLM_LRU_FLAG_LRUR)
			return ldlm_cancel_lrur_policy;
		else if (flags & LDLM_LRU_FLAG_PASSED)
			return ldlm_cancel_passed_policy;
		else if (flags & LDLM_LRU_FLAG_LRUR_NO_WAIT)
			return ldlm_cancel_lrur_no_wait_policy;
	} else {
		if (flags & LDLM_LRU_FLAG_AGED)
			return ldlm_cancel_aged_policy;
	}

	return ldlm_cancel_default_policy;
}
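
/**
 * - Free space in LRU for \a count new locks,
 *   redundant unused locks are canceled locally;
 * - also cancel locally unused aged locks;
 * - do not cancel more than \a max locks;
 * - GET the found locks and add them into the \a cancels list.
 *
 * A client lock can be added to the l_bl_ast list only when it is
 * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing
 * CANCEL. There are the following use cases:
 * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and
 * ldlm_cli_cancel(), which check and set this flag properly. As any
 * attempt to cancel a lock relies on this flag, the l_bl_ast list is
 * accessed later without any special locking.
 *
 * Calling policies for enabled LRU resize:
 * ----------------------------------------
 * flags & LDLM_LRU_FLAG_LRUR	  - use LRU resize policy (SLV from server)
 *				    to cancel not more than \a count locks;
 *
 * flags & LDLM_LRU_FLAG_PASSED  - cancel \a count number of old locks
 *				    (located at the beginning of the LRU list);
 *
 * flags & LDLM_LRU_FLAG_SHRINK  - cancel not more than \a count locks
 *				    according to the memory pressure policy;
 *
 * flags & LDLM_LRU_FLAG_AGED	  - cancel \a count locks according to
 *				    the "aged policy";
 *
 * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
 *				    (typically before replaying locks) w/o
 *				    sending any RPCs or waiting for any
 *				    outstanding RPC to complete.
 */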
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
				 struct list_head *cancels, int count, int max,
				 int flags)
{
	ldlm_cancel_lru_policy_t pf;
	struct ldlm_lock *lock, *next;
	int added = 0, unused, remained;
	int no_wait = flags &
		(LDLM_LRU_FLAG_NO_WAIT | LDLM_LRU_FLAG_LRUR_NO_WAIT);

	spin_lock(&ns->ns_lock);
	unused = ns->ns_nr_unused;
	remained = unused;

	if (!ns_connect_lru_resize(ns))
		count += unused - ns->ns_max_unused;

	pf = ldlm_cancel_lru_policy(ns, flags);
	LASSERT(pf);

	while (!list_empty(&ns->ns_unused_list)) {
		enum ldlm_policy_res result;
		time_t last_use = 0;

		/* all unused locks have been checked */
		if (remained-- <= 0)
			break;

		/* For any flags, stop scanning if @max is reached. */
		if (max && added >= max)
			break;

		list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
					 l_lru) {
			/* No locks which got blocking requests. */
			LASSERT(!ldlm_is_bl_ast(lock));

			if (no_wait && ldlm_is_skipped(lock))
				/* already processed */
				continue;

			last_use = lock->l_last_used;
			if (last_use == cfs_time_current())
				continue;

			/* Somebody is already doing CANCEL. No need for this
			 * lock in LRU, do not traverse it again.
			 */
			if (!ldlm_is_canceling(lock))
				break;

			ldlm_lock_remove_from_lru_nolock(lock);
		}
		if (&lock->l_lru == &ns->ns_unused_list)
			break;

		LDLM_LOCK_GET(lock);
		spin_unlock(&ns->ns_lock);
		lu_ref_add(&lock->l_reference, __func__, current);

		/* Pass the lock through the policy filter and see if it
		 * should stay in LRU.
		 *
		 * Even for shrinker policy we stop scanning if
		 * we find a lock that should stay in the cache.
		 * We should take into account lock age anyway
		 * as a new lock is a valuable resource even if
		 * it has a low weight.
		 *
		 * That is, for shrinker policy we drop only
		 * old locks, but additionally choose them by
		 * their weight. Big extent locks will stay in
		 * the cache.
		 */
		result = pf(ns, lock, unused, added, count);
		if (result == LDLM_POLICY_KEEP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			break;
		}
		if (result == LDLM_POLICY_SKIP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}

		lock_res_and_lock(lock);
		/* Check the flags again under the resource lock. */
		if (ldlm_is_canceling(lock) ||
		    (ldlm_lock_remove_from_lru_check(lock, last_use) == 0)) {
			/* Another thread is removing the lock from LRU, or
			 * somebody is already doing CANCEL, or there
			 * is a blocking request which will send cancel
			 * by itself, or the lock is no longer unused or
			 * the lock has been used since the pf() call and
			 * pages could be put under it.
			 */
			unlock_res_and_lock(lock);
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}
		LASSERT(!lock->l_readers && !lock->l_writers);

		/* If we have chosen to cancel this lock voluntarily, we
		 * better send cancel notification to server, so that it
		 * frees appropriate state. This might lead to a race
		 * where while we are doing cancel here, server is also
		 * silently cancelling this lock.
		 */
		ldlm_clear_cancel_on_block(lock);

		/* Setting the CBPENDING flag is a little misleading,
		 * but prevents an important race; namely, once
		 * CBPENDING is set, the lock can accumulate no more
		 * readers/writers. Since readers and writers are
		 * already zero here, ldlm_lock_decref() won't see
		 * this flag and call l_blocking_ast.
		 */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;

		/* We can't re-add to l_lru as it confuses the
		 * refcounting in ldlm_lock_remove_from_lru() if an AST
		 * arrives after we drop lr_lock below. We use l_bl_ast
		 * and can't use l_pending_chain as it is used on both
		 * the server and the client.
		 */
		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		unlock_res_and_lock(lock);
		lu_ref_del(&lock->l_reference, __func__, current);
		spin_lock(&ns->ns_lock);
		added++;
		unused--;
	}
	spin_unlock(&ns->ns_lock);
	return added;
}

int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
			  struct list_head *cancels, int count, int max,
			  enum ldlm_cancel_flags cancel_flags, int flags)
{
	int added;

	added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
	if (added <= 0)
		return added;
	return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
}
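
/**
 * Cancel at least \a nr locks from given namespace LRU.
 *
 * The list of locks is prepared here and handed to an ldlm_bl thread, so
 * this function returns once the thread has been asked to handle the
 * blocking callbacks.
 */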
int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
		    enum ldlm_cancel_flags cancel_flags,
		    int flags)
{
	LIST_HEAD(cancels);
	int count, rc;

	/* Just prepare the list of locks, do not actually cancel them.
	 * Locks are cancelled later in a separate thread.
	 */
	count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
	rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
	if (rc == 0)
		return count;

	return 0;
}
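
/**
 * Find and cancel locally unused locks found on resource, matched to the
 * given policy, mode. GET the found locks and add them into the \a cancels
 * list.
 */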
int ldlm_cancel_resource_local(struct ldlm_resource *res,
			       struct list_head *cancels,
			       union ldlm_policy_data *policy,
			       enum ldlm_mode mode, __u64 lock_flags,
			       enum ldlm_cancel_flags cancel_flags,
			       void *opaque)
{
	struct ldlm_lock *lock;
	int count = 0;

	lock_res(res);
	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		if (opaque && lock->l_ast_data != opaque) {
			LDLM_ERROR(lock, "data %p doesn't match opaque %p",
				   lock->l_ast_data, opaque);
			continue;
		}

		if (lock->l_readers || lock->l_writers)
			continue;

		/* If somebody is already doing CANCEL, or blocking AST came,
		 * skip this lock.
		 */
		if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
			continue;

		if (lockmode_compat(lock->l_granted_mode, mode))
			continue;

		/* If policy is given and this is IBITS lock, add to list only
		 * those locks that match by policy.
		 */
		if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
		    !(lock->l_policy_data.l_inodebits.bits &
		      policy->l_inodebits.bits))
			continue;

		/* See CBPENDING comment in ldlm_prepare_lru_list() */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
				 lock_flags;

		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		LDLM_LOCK_GET(lock);
		count++;
	}
	unlock_res(res);

	return ldlm_cli_cancel_list_local(cancels, count, cancel_flags);
}
EXPORT_SYMBOL(ldlm_cancel_resource_local);
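
/**
 * If \a req is NULL, send CANCEL request to server with handles of locks
 * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
 * separately per lock.
 * If \a req is not NULL, pack handles of locks in \a cancels into the
 * request buffer.
 * Destroy \a cancels at the end.
 */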
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
			 struct ptlrpc_request *req,
			 enum ldlm_cancel_flags flags)
{
	struct ldlm_lock *lock;
	int res = 0;

	if (list_empty(cancels) || count == 0)
		return 0;

	/* XXX: requests (both batched and not) could be sent in parallel.
	 * Usually it is enough to have just 1 RPC, but it is possible that
	 * there are too many locks to be cancelled in LRU or on a resource.
	 * It would also speed up the case when the server does not support
	 * the feature.
	 */
	while (count > 0) {
		LASSERT(!list_empty(cancels));
		lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
		LASSERT(lock->l_conn_export);

		if (exp_connect_cancelset(lock->l_conn_export)) {
			res = count;
			if (req)
				ldlm_cancel_pack(req, cancels, count);
			else
				res = ldlm_cli_cancel_req(lock->l_conn_export,
							  cancels, count,
							  flags);
		} else {
			res = ldlm_cli_cancel_req(lock->l_conn_export,
						  cancels, 1, flags);
		}

		if (res < 0) {
			CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
				     "%s: %d\n", __func__, res);
			res = count;
		}

		count -= res;
		ldlm_lock_list_put(cancels, l_bl_ast, res);
	}
	LASSERT(count == 0);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel_list);
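
/**
 * Cancel all locks on a resource that have 0 readers/writers.
 *
 * If flags & LCF_LOCAL, throw the locks away without trying to notify the
 * server.
 */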
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
				    const struct ldlm_res_id *res_id,
				    union ldlm_policy_data *policy,
				    enum ldlm_mode mode,
				    enum ldlm_cancel_flags flags,
				    void *opaque)
{
	struct ldlm_resource *res;
	LIST_HEAD(cancels);
	int count;
	int rc;

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (IS_ERR(res)) {
		/* This is not a problem. */
		CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
		return 0;
	}

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
					   0, flags | LCF_BL_AST, opaque);
	rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
	if (rc != ELDLM_OK)
		CERROR("canceling unused lock " DLDLMRES ": rc = %d\n",
		       PLDLMRES(res), rc);

	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);

struct ldlm_cli_cancel_arg {
	int	lc_flags;
	void   *lc_opaque;
};

static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
				       struct cfs_hash_bd *bd,
				       struct hlist_node *hnode, void *arg)
{
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_cli_cancel_arg *lc = arg;

	ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
					NULL, LCK_MINMODE,
					lc->lc_flags, lc->lc_opaque);

	return 0;
}
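
/**
 * Cancel all locks on a namespace (or a specific resource, if given)
 * that have 0 readers/writers.
 *
 * If flags & LCF_LOCAL, throw the locks away without trying to notify the
 * server.
 */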
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
			   const struct ldlm_res_id *res_id,
			   enum ldlm_cancel_flags flags, void *opaque)
{
	struct ldlm_cli_cancel_arg arg = {
		.lc_flags	= flags,
		.lc_opaque	= opaque,
	};

	if (!ns)
		return ELDLM_OK;

	if (res_id) {
		return ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
						       LCK_MINMODE, flags,
						       opaque);
	} else {
		cfs_hash_for_each_nolock(ns->ns_rs_hash,
					 ldlm_cli_hash_cancel_unused, &arg, 0);
		return ELDLM_OK;
	}
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
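
/* Lock iterators. */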
static int ldlm_resource_foreach(struct ldlm_resource *res,
				 ldlm_iterator_t iter, void *closure)
{
	struct ldlm_lock *tmp;
	struct ldlm_lock *lock;
	int rc = LDLM_ITER_CONTINUE;

	if (!res)
		return LDLM_ITER_CONTINUE;

	lock_res(res);
	list_for_each_entry_safe(lock, tmp, &res->lr_granted, l_res_link) {
		if (iter(lock, closure) == LDLM_ITER_STOP) {
			rc = LDLM_ITER_STOP;
			goto out;
		}
	}

	list_for_each_entry_safe(lock, tmp, &res->lr_waiting, l_res_link) {
		if (iter(lock, closure) == LDLM_ITER_STOP) {
			rc = LDLM_ITER_STOP;
			goto out;
		}
	}
out:
	unlock_res(res);
	return rc;
}

struct iter_helper_data {
	ldlm_iterator_t iter;
	void *closure;
};

static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
{
	struct iter_helper_data *helper = closure;

	return helper->iter(lock, helper->closure);
}

static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
				struct hlist_node *hnode, void *arg)
{
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);

	return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
		LDLM_ITER_STOP;
}

static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
				   ldlm_iterator_t iter, void *closure)
{
	struct iter_helper_data helper = {
		.iter		= iter,
		.closure	= closure,
	};

	cfs_hash_for_each_nolock(ns->ns_rs_hash,
				 ldlm_res_iter_helper, &helper, 0);
}
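
/**
 * Iterate over all locks on a given resource, calling \a iter on each one
 * until it returns LDLM_ITER_STOP.
 */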
int ldlm_resource_iterate(struct ldlm_namespace *ns,
			  const struct ldlm_res_id *res_id,
			  ldlm_iterator_t iter, void *data)
{
	struct ldlm_resource *res;
	int rc;

	LASSERTF(ns, "must pass in namespace\n");

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (IS_ERR(res))
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	rc = ldlm_resource_foreach(res, iter, data);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return rc;
}
EXPORT_SYMBOL(ldlm_resource_iterate);
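
/* Lock replay */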
static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
{
	struct list_head *list = closure;

	/* we use l_pending_chain here, because it's unused on clients. */
	LASSERTF(list_empty(&lock->l_pending_chain),
		 "lock %p next %p prev %p\n",
		 lock, &lock->l_pending_chain.next,
		 &lock->l_pending_chain.prev);
	/* Don't replay locks marked FAILED (left after an eviction) or
	 * BL_DONE (already cancelled). Get a reference on the lock so it
	 * does not disappear under us (e.g. due to cancel).
	 */
	if (!(lock->l_flags & (LDLM_FL_FAILED | LDLM_FL_BL_DONE))) {
		list_add(&lock->l_pending_chain, list);
		LDLM_LOCK_GET(lock);
	}

	return LDLM_ITER_CONTINUE;
}

static int replay_lock_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct ldlm_async_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	struct obd_export *exp;

	atomic_dec(&req->rq_import->imp_replay_inflight);
	if (rc != ELDLM_OK)
		goto out;

	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (!reply) {
		rc = -EPROTO;
		goto out;
	}

	lock = ldlm_handle2lock(&aa->lock_handle);
	if (!lock) {
		CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
		       aa->lock_handle.cookie, reply->lock_handle.cookie,
		       req->rq_export->exp_client_uuid.uuid,
		       libcfs_id2str(req->rq_peer));
		rc = -ESTALE;
		goto out;
	}

	/* Key change: rehash lock in per-export hash with new key */
	exp = req->rq_export;
	if (exp && exp->exp_lock_hash) {
		/* In the function below, .hs_keycmp resolves to
		 * ldlm_export_lock_keycmp()
		 */
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	LDLM_DEBUG(lock, "replayed lock:");
	ptlrpc_import_recovery_state_machine(req->rq_import);
	LDLM_LOCK_PUT(lock);
out:
	if (rc != ELDLM_OK)
		ptlrpc_connect_import(req->rq_import);

	return rc;
}

static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
	struct ptlrpc_request *req;
	struct ldlm_async_args *aa;
	struct ldlm_request *body;
	int flags;

	/* Do not replay a lock which is actively being canceled */
	if (ldlm_is_bl_done(lock)) {
		LDLM_DEBUG(lock, "Not replaying canceled lock:");
		return 0;
	}

	/* If this is reply-less callback lock, we cannot replay it, since
	 * server might have long dropped it, but notification of that event
	 * was lost by network (and server granted conflicting lock already).
	 */
	if (ldlm_is_cancel_on_block(lock)) {
		LDLM_DEBUG(lock, "Not replaying reply-less lock:");
		ldlm_lock_cancel(lock);
		return 0;
	}

	/*
	 * If granted mode matches the requested mode, this lock is granted.
	 *
	 * If they differ, but we have a granted mode, then we were granted
	 * one mode and now want another: ergo, converting.
	 *
	 * If we haven't been granted anything and are on a resource list,
	 * then we're blocked/waiting.
	 *
	 * If we haven't been granted anything and we're NOT on a resource
	 * list, then we haven't got a reply yet and don't have a known
	 * disposition. This happens whenever a lock enqueue is the request
	 * that triggers recovery.
	 */
	if (lock->l_granted_mode == lock->l_req_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
	else if (lock->l_granted_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
	else if (!list_empty(&lock->l_res_link))
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
	else
		flags = LDLM_FL_REPLAY;

	req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
					LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
	if (!req)
		return -ENOMEM;

	/* We're part of recovery, so don't wait for it. */
	req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;

	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(flags);

	ldlm_lock2handle(lock, &body->lock_handle[0]);
	if (lock->l_lvb_len > 0)
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     lock->l_lvb_len);
	ptlrpc_request_set_replen(req);
	/* notify the server we've replayed all requests.
	 * also, we mark the request to be put on a dedicated
	 * queue to be processed after all request replays.
	 */
	lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);

	LDLM_DEBUG(lock, "replaying lock:");

	atomic_inc(&req->rq_import->imp_replay_inflight);
	BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
	ptlrpcd_add_req(req);

	return 0;
}
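
/**
 * Cancel as many unused locks as possible before replay. Since we are in
 * recovery, we cannot wait for any outstanding RPCs to complete.
 *
 * Called only in recovery before replaying locks. There is no need to
 * replay locks that are unused. Since the clients may hold thousands of
 * cached unused locks, dropping the unused locks can greatly reduce the
 * load on the servers at recovery time.
 */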
static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
{
	int canceled;
	LIST_HEAD(cancels);

	CDEBUG(D_DLMTRACE,
	       "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
	       ldlm_ns_name(ns), ns->ns_nr_unused);

	/* We don't need to care whether or not LRU resize is enabled
	 * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
	 * count parameter.
	 */
	canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
					 LCF_LOCAL, LDLM_LRU_FLAG_NO_WAIT);

	CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
	       canceled, ldlm_ns_name(ns));
}

int ldlm_replay_locks(struct obd_import *imp)
{
	struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
	LIST_HEAD(list);
	struct ldlm_lock *lock, *next;
	int rc = 0;

	LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);

	/* don't replay locks if import failed recovery */
	if (imp->imp_vbr_failed)
		return 0;

	/* ensure this doesn't fall to 0 before all have been queued */
	atomic_inc(&imp->imp_replay_inflight);

	if (ldlm_cancel_unused_locks_before_replay)
		ldlm_cancel_unused_locks_for_replay(ns);

	ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);

	list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
		list_del_init(&lock->l_pending_chain);
		if (rc) {
			LDLM_LOCK_RELEASE(lock);
			continue;
		}
		rc = replay_one_lock(imp, lock);
		LDLM_LOCK_RELEASE(lock);
	}

	atomic_dec(&imp->imp_replay_inflight);

	return rc;
}