1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62#define DEBUG_SUBSYSTEM S_LDLM
63
64#include <lustre_dlm.h>
65#include <obd_class.h>
66#include <obd.h>
67
68#include "ldlm_internal.h"
69
/* Minimum lock enqueue timeout in seconds; runtime-tunable via the module
 * parameter below (mode 0644). Used as the floor in ldlm_get_enq_timeout(). */
int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
                "lock enqueue timeout minimum");

/* Non-zero: cancel unused locks before replay (read elsewhere in LDLM;
 * only defined here). */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;
76
/* Intentionally-empty interrupt callback handed to LWI_INTR()/
 * LWI_TIMEOUT_INTR() in ldlm_completion_ast(); the wait machinery requires
 * a callback, but there is nothing to do when the sleep is interrupted. */
static void interrupted_completion_wait(void *data)
{
}
80
/* Context passed to the completion-wait callbacks. */
struct lock_wait_data {
        struct ldlm_lock *lwd_lock;     /* lock we are sleeping on */
        __u32             lwd_conn_cnt; /* import connect count snapshotted
                                         * before the wait; used by
                                         * ldlm_expired_completion_wait() to
                                         * fail the right connection */
};

/* Per-request argument block for async enqueue interpret callbacks. */
struct ldlm_async_args {
        struct lustre_handle lock_handle;
};
89
/**
 * Timeout callback fired by l_wait_event() when a lock enqueue has waited
 * longer than the computed enqueue timeout.
 *
 * Two cases:
 * - no l_conn_export (server-side/local lock): just log, periodically dump
 *   the namespace, and go back to sleep (return 0 resumes the wait);
 * - client lock: kick the import into recovery via ptlrpc_fail_import()
 *   using the connect count snapshotted before the sleep started.
 *
 * \param data  struct lock_wait_data set up by ldlm_completion_ast()
 * \retval 0 always (the caller keeps waiting)
 */
int ldlm_expired_completion_wait(void *data)
{
        struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;

        ENTRY;
        if (lock->l_conn_export == NULL) {
                /* Throttle namespace dumps to one per 300s window.
                 * NOTE(review): next_dump/last_dump are unlocked statics, so
                 * concurrent expiries may race; presumably benign (worst case
                 * an extra/missed dump) — confirm. */
                static cfs_time_t next_dump = 0, last_dump = 0;

                if (ptlrpc_check_suspend())
                        RETURN(0);

                LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", "
                              CFS_DURATION_T"s ago)\n",
                              lock->l_last_activity,
                              cfs_time_sub(cfs_time_current_sec(),
                                           lock->l_last_activity));
                LDLM_DEBUG(lock, "lock timed out (enqueued at "CFS_TIME_T", "
                           CFS_DURATION_T"s ago); not entering recovery in "
                           "server code, just going back to sleep",
                           lock->l_last_activity,
                           cfs_time_sub(cfs_time_current_sec(),
                                        lock->l_last_activity));
                if (cfs_time_after(cfs_time_current(), next_dump)) {
                        last_dump = next_dump;
                        next_dump = cfs_time_shift(300);
                        ldlm_namespace_dump(D_DLMTRACE,
                                            ldlm_lock_to_ns(lock));
                        /* First expiry ever: also dump the debug log. */
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
                RETURN(0);
        }

        /* Client lock: force the import into recovery against the connection
         * generation that was current when we started waiting. */
        obd = lock->l_conn_export->exp_obd;
        imp = obd->u.cli.cl_import;
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
        LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
                   CFS_DURATION_T"s ago), entering recovery for %s@%s",
                   lock->l_last_activity,
                   cfs_time_sub(cfs_time_current_sec(), lock->l_last_activity),
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_expired_completion_wait);
138
139
140
141int ldlm_get_enq_timeout(struct ldlm_lock *lock)
142{
143 int timeout = at_get(ldlm_lock_to_ns_at(lock));
144 if (AT_OFF)
145 return obd_timeout / 2;
146
147
148
149 timeout = min_t(int, at_max, timeout + (timeout >> 1));
150 return max(timeout, ldlm_enqueue_min);
151}
152EXPORT_SYMBOL(ldlm_get_enq_timeout);
153
154
155
156
157
158static int ldlm_completion_tail(struct ldlm_lock *lock)
159{
160 long delay;
161 int result;
162
163 if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
164 LDLM_DEBUG(lock, "client-side enqueue: destroyed");
165 result = -EIO;
166 } else {
167 delay = cfs_time_sub(cfs_time_current_sec(),
168 lock->l_last_activity);
169 LDLM_DEBUG(lock, "client-side enqueue: granted after "
170 CFS_DURATION_T"s", delay);
171
172
173 at_measured(ldlm_lock_to_ns_at(lock),
174 delay);
175 result = 0;
176 }
177 return result;
178}
179
180
181
182
183
184
185int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
186{
187 ENTRY;
188
189 if (flags == LDLM_FL_WAIT_NOREPROC) {
190 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
191 RETURN(0);
192 }
193
194 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
195 LDLM_FL_BLOCK_CONV))) {
196 wake_up(&lock->l_waitq);
197 RETURN(ldlm_completion_tail(lock));
198 }
199
200 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
201 "going forward");
202 ldlm_reprocess_all(lock->l_resource);
203 RETURN(0);
204}
205EXPORT_SYMBOL(ldlm_completion_ast_async);
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/**
 * Blocking completion AST: sleep until the lock is granted or cancelled.
 *
 * \param flags  enqueue result flags; LDLM_FL_WAIT_NOREPROC skips the
 *               blocked-state check and goes straight to waiting
 * \param data   unused
 * \retval 0 on grant; negative errno on interrupt/failure
 */
int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        struct l_wait_info lwi;
        __u32 timeout;
        int rc = 0;
        ENTRY;

        if (flags == LDLM_FL_WAIT_NOREPROC) {
                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
                goto noreproc;
        }

        /* Not blocked at all: wake any waiters and return immediately. */
        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

noreproc:

        obd = class_exp2obd(lock->l_conn_export);

        /* obd is NULL for local (server-side) locks. */
        if (obd != NULL) {
                imp = obd->u.cli.cl_import;
        }

        /* Double the enqueue timeout: the expired callback resumes the wait
         * once, so total patience is roughly 2x the AT estimate. */
        timeout = ldlm_get_enq_timeout(lock) * 2;

        lwd.lwd_lock = lock;

        if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
        } else {
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
                                       ldlm_expired_completion_wait,
                                       interrupted_completion_wait, &lwd);
        }

        /* Snapshot the connect count under imp_lock so that a timeout fails
         * the connection generation we actually waited on. */
        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
                spin_unlock(&imp->imp_lock);
        }

        /* Fault-injection point: simulate an interrupted completion AST. */
        if (ns_is_client(ldlm_lock_to_ns(lock)) &&
            OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
                lock->l_flags |= LDLM_FL_FAIL_LOC;
                rc = -EINTR;
        } else {
                rc = l_wait_event(lock->l_waitq,
                                  is_granted_or_cancelled(lock), &lwi);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        RETURN(ldlm_completion_tail(lock));
}
EXPORT_SYMBOL(ldlm_completion_ast);
304
305
306
307
308
309
310
311
312
313
314
315
/**
 * Core of the client blocking AST.
 *
 * Contract: called with the resource/lock spinlock HELD (there is no
 * lock_res_and_lock() here, only the unlock); always drops it before
 * returning.  If the lock has no readers/writers it is cancelled (async)
 * right away; otherwise CBPENDING makes the last decref cancel it later.
 *
 * \retval 0 always
 */
int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        lock->l_flags |= LDLM_FL_CBPENDING;
        /* Decide under the lock, act after dropping it. */
        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                struct lustre_handle lockh;
                int rc;

                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be "
                           "cancelled later");
        }
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
341
342
343
344
345
346
347
348
349
350
351
352
353
354
/**
 * Default client blocking AST.
 *
 * LDLM_CB_CANCELING means the lock is already being cancelled — nothing to
 * do.  Otherwise re-check under the resource lock that this function is
 * still the lock's blocking AST (guards against a racing cancel having
 * already run/retargeted the callback) before doing the real work.
 */
int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
{
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        lock_res_and_lock(lock);

        /* Raced with cancel: bail out quietly. */
        if (lock->l_blocking_ast != ldlm_blocking_ast) {
                unlock_res_and_lock(lock);
                RETURN(0);
        }
        /* _nocheck() drops the resource lock for us. */
        RETURN(ldlm_blocking_ast_nocheck(lock));
}
EXPORT_SYMBOL(ldlm_blocking_ast);
378
379
380
381
382
383int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
384{
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403 return -ELDLM_NO_LOCK_DATA;
404}
405EXPORT_SYMBOL(ldlm_glimpse_ast);
406
407
408
409
/**
 * Enqueue a lock entirely locally (server-side namespace); no RPC is sent.
 *
 * Creates the lock, takes a reference in \a mode, runs the local enqueue
 * path and finally the completion AST (if any) with the resulting flags.
 *
 * \retval ELDLM_OK on success; -ENOMEM or ldlm_lock_enqueue() error codes
 */
int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                           const struct ldlm_res_id *res_id,
                           ldlm_type_t type, ldlm_policy_data_t *policy,
                           ldlm_mode_t mode, __u64 *flags,
                           ldlm_blocking_callback blocking,
                           ldlm_completion_callback completion,
                           ldlm_glimpse_callback glimpse,
                           void *data, __u32 lvb_len, enum lvb_type lvb_type,
                           const __u64 *client_cookie,
                           struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;
        int err;
        const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
                                                 .lcs_blocking = blocking,
                                                 .lcs_glimpse = glimpse,
        };
        ENTRY;

        /* Local enqueue never replays, and only makes sense server-side. */
        LASSERT(!(*flags & LDLM_FL_REPLAY));
        if (unlikely(ns_is_client(ns))) {
                CERROR("Trying to enqueue local lock in a shadow namespace\n");
                LBUG();
        }

        lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
                                lvb_type);
        if (unlikely(!lock))
                GOTO(out_nolock, err = -ENOMEM);

        ldlm_lock2handle(lock, lockh);

        /* Take the usage reference before enqueue so the lock can't be
         * cancelled out from under us. */
        ldlm_lock_addref_internal_nolock(lock, mode);
        lock->l_flags |= LDLM_FL_LOCAL;
        if (*flags & LDLM_FL_ATOMIC_CB)
                lock->l_flags |= LDLM_FL_ATOMIC_CB;

        if (policy != NULL)
                lock->l_policy_data = *policy;
        if (client_cookie != NULL)
                lock->l_client_cookie = *client_cookie;
        if (type == LDLM_EXTENT)
                /* NOTE(review): dereferences policy unconditionally — extent
                 * callers are expected to always pass a policy; confirm. */
                lock->l_req_extent = policy->l_extent;

        err = ldlm_lock_enqueue(ns, &lock, policy, flags);
        if (unlikely(err != ELDLM_OK))
                GOTO(out, err);

        /* Report back the (possibly adjusted) policy. */
        if (policy != NULL)
                *policy = lock->l_policy_data;

        if (lock->l_completion_ast)
                lock->l_completion_ast(lock, *flags, NULL);

        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
        EXIT;
 out:
        LDLM_LOCK_RELEASE(lock);
 out_nolock:
        return err;
}
EXPORT_SYMBOL(ldlm_cli_enqueue_local);
474
/* Tear down a lock whose enqueue failed: mark it failed/local-only (unless
 * it was actually granted in a race), drop the enqueue reference, and for
 * flock locks destroy it outright since they are never kept in the LRU. */
static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock, int mode)
{
        int need_cancel = 0;

        /* Set this flag to prevent others from getting new references. */
        lock_res_and_lock(lock);

        if ((lock->l_req_mode != lock->l_granted_mode) &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* Not granted and not already failed: fail it now and make
                 * the upcoming decref cancel it without talking to the
                 * server (LOCAL_ONLY). */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
                need_cancel = 1;
        }
        unlock_res_and_lock(lock);

        if (need_cancel)
                LDLM_DEBUG(lock,
                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | "
                           "LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
        else
                LDLM_DEBUG(lock, "lock was granted or failed in race");

        ldlm_lock_decref_internal(lock, mode);

        /* Flock locks are not cached in the LRU, so an aborted flock must be
         * unlinked from its resource and destroyed here or it would linger
         * forever. */
        if (lock->l_resource->lr_type == LDLM_FLOCK) {
                lock_res_and_lock(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_lock_destroy_nolock(lock);
                unlock_res_and_lock(lock);
        }
}
516
517
518
519
520
521
/**
 * Finish a client enqueue after the RPC completed: parse the server reply,
 * update the local lock (remote handle, flags, possibly mode/resource/LVB)
 * and run the local enqueue + completion AST.
 *
 * \param with_policy  non-zero if a policy was sent and the reply's policy
 *                     should be converted back to local form
 * \param rc           RPC/server status for the enqueue
 * \retval ELDLM_OK, ELDLM_LOCK_ABORTED, or negative errno
 */
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                          ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                          __u64 *flags, void *lvb, __u32 lvb_len,
                          struct lustre_handle *lockh,int rc)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        int is_replay = *flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        /* cleanup_phase == 1: enqueue failed, run failed_lock_cleanup() */
        int cleanup_phase = 1;
        int size = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
                /* Only flock locks may already be gone here. */
                LASSERT(type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }

        LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
                 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);

        if (rc != ELDLM_OK) {
                LASSERT(!is_replay);
                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");

                /* ABORTED still carries a usable reply (e.g. intent LVB);
                 * anything else is a hard failure. */
                if (rc != ELDLM_LOCK_ABORTED)
                        GOTO(cleanup, rc);
        }

        /* Before we return, swab the reply. */
        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        if (reply == NULL)
                GOTO(cleanup, rc = -EPROTO);

        if (lvb_len != 0) {
                LASSERT(lvb != NULL);

                /* Validate the server-declared LVB size before using it. */
                size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
                                            RCL_SERVER);
                if (size < 0) {
                        LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
                        GOTO(cleanup, rc = size);
                } else if (unlikely(size > lvb_len)) {
                        LDLM_ERROR(lock, "Replied LVB is larger than "
                                   "expectation, expected = %d, replied = %d",
                                   lvb_len, size);
                        GOTO(cleanup, rc = -EINVAL);
                }
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* Intent was refused but the LVB may still be valid. */
                if (lvb_len != 0)
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                                           lvb, size);
                GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED));
        }

        /* lock enqueued on the server */
        cleanup_phase = 0;

        lock_res_and_lock(lock);
        /* Key the export lock hash on the server's handle for this lock. */
        if (exp->exp_lock_hash) {
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                cfs_hash_rehash_key(exp->exp_lock_hash,
                                    &lock->l_remote_handle,
                                    &reply->lock_handle,
                                    &lock->l_exp_hash);
        } else {
                lock->l_remote_handle = reply->lock_handle;
        }

        *flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_INHERIT_FLAGS);
        /* Move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
         * to wait with no timeout as well. */
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_NO_TIMEOUT);
        unlock_res_and_lock(lock);

        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%llx\n",
               lock, reply->lock_handle.cookie, *flags);

        /* If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do
         * it again. */
        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
                LASSERT(!is_replay);
                if (newmode && newmode != lock->l_req_mode) {
                        LDLM_DEBUG(lock, "server returned different mode %s",
                                   ldlm_lockname[newmode]);
                        lock->l_req_mode = newmode;
                }

                if (memcmp(reply->lock_desc.l_resource.lr_name.name,
                           lock->l_resource->lr_name.name,
                           sizeof(struct ldlm_res_id))) {
                        CDEBUG(D_INFO, "remote intent success, locking "
                               "(%ld,%ld,%ld) instead of "
                               "(%ld,%ld,%ld)\n",
                               (long)reply->lock_desc.l_resource.lr_name.name[0],
                               (long)reply->lock_desc.l_resource.lr_name.name[1],
                               (long)reply->lock_desc.l_resource.lr_name.name[2],
                               (long)lock->l_resource->lr_name.name[0],
                               (long)lock->l_resource->lr_name.name[1],
                               (long)lock->l_resource->lr_name.name[2]);

                        rc = ldlm_lock_change_resource(ns, lock,
                                        &reply->lock_desc.l_resource.lr_name);
                        /* NOTE(review): a non-zero rc from
                         * ldlm_lock_change_resource() is overwritten with
                         * -ENOMEM here — presumably intentional (allocation
                         * is its only failure mode); confirm. */
                        if (rc || lock->l_resource == NULL)
                                GOTO(cleanup, rc = -ENOMEM);
                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
                }
                if (with_policy)
                        /* IBITS without OBD_CONNECT_IBITS was converted to a
                         * plain lock on the wire; skip policy conversion. */
                        if (!(type == LDLM_IBITS &&
                              !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
                                ldlm_convert_policy_to_local(exp,
                                                lock->l_resource->lr_type,
                                                &reply->lock_desc.l_policy_data,
                                                &lock->l_policy_data);
                if (type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,"client-side enqueue, new policy data");
        }

        if ((*flags) & LDLM_FL_AST_SENT ||
            /* Cancel extent locks as soon as possible on a liblustre client,
             * because it cannot handle asynchronous ASTs robustly. */
            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
        }

        /* If the lock has already been granted by a completion AST, don't
         * clobber the LVB with an older one. */
        if (lvb_len != 0) {
                /* We must lock or a racing completion might update the
                 * lvb without letting us know and we'll clobber the correct
                 * value. */
                lock_res_and_lock(lock);
                if (lock->l_req_mode != lock->l_granted_mode)
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                                           lock->l_lvb_data, size);
                unlock_res_and_lock(lock);
                if (rc < 0) {
                        cleanup_phase = 1;
                        GOTO(cleanup, rc);
                }
        }

        if (!is_replay) {
                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
                if (lock->l_completion_ast != NULL) {
                        int err = lock->l_completion_ast(lock, *flags, NULL);
                        /* Keep the first error encountered. */
                        if (!rc)
                                rc = err;
                        if (rc)
                                cleanup_phase = 1;
                }
        }

        if (lvb_len && lvb != NULL) {
                /* Copy the LVB out to the caller's buffer; it may have been
                 * refreshed by a racing completion AST. */
                memcpy(lvb, lock->l_lvb_data, lvb_len);
        }

        LDLM_DEBUG(lock, "client-side enqueue END");
        EXIT;
cleanup:
        if (cleanup_phase == 1 && rc)
                failed_lock_cleanup(ns, lock, mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
        return rc;
}
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
711
712
713
714
715
716
717
718static inline int ldlm_req_handles_avail(int req_size, int off)
719{
720 int avail;
721
722 avail = min_t(int, LDLM_MAXREQSIZE, PAGE_CACHE_SIZE - 512) - req_size;
723 if (likely(avail >= 0))
724 avail /= (int)sizeof(struct lustre_handle);
725 else
726 avail = 0;
727 avail += LDLM_LOCKREQ_HANDLES - off;
728
729 return avail;
730}
731
732static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
733 enum req_location loc,
734 int off)
735{
736 int size = req_capsule_msg_size(pill, loc);
737 return ldlm_req_handles_avail(size, off);
738}
739
740static inline int ldlm_format_handles_avail(struct obd_import *imp,
741 const struct req_format *fmt,
742 enum req_location loc, int off)
743{
744 int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
745 return ldlm_req_handles_avail(size, off);
746}
747
748
749
750
751
752
753
754
755
/**
 * Prepare a request with an embedded early-lock-cancel (ELC) set.
 *
 * If the server supports cancel sets, pad \a cancels with additional LRU
 * victims up to the space available in the request, size the DLM buffer
 * accordingly, pack as many handles as fit, and send the overflow as a
 * separate cancel RPC.  On any path the cancel list's references are
 * consumed (packed, sent separately, or dropped on error).
 *
 * \param canceloff  index of the first cancel slot in the request
 * \param cancels    list of locks already selected for cancel (may be NULL)
 * \param count      number of locks on \a cancels
 * \retval 0 on success, negative errno from packing
 */
int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                      int version, int opc, int canceloff,
                      struct list_head *cancels, int count)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct req_capsule *pill = &req->rq_pill;
        struct ldlm_request *dlm = NULL;
        int flags, avail, to_free, pack = 0;
        LIST_HEAD(head);
        int rc;
        ENTRY;

        if (cancels == NULL)
                cancels = &head;
        if (ns_connect_cancelset(ns)) {
                /* Estimate the amount of available space in the request. */
                req_capsule_filled_sizes(pill, RCL_CLIENT);
                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);

                flags = ns_connect_lru_resize(ns) ?
                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                to_free = !ns_connect_lru_resize(ns) &&
                          opc == LDLM_ENQUEUE ? 1 : 0;

                /* Top up the cancel list from the LRU while there is still
                 * free room in the request. */
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0, flags);
                if (avail > count)
                        pack = count;
                else
                        pack = avail;
                req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
                                     ldlm_request_bufsize(pack, opc));
        }

        rc = ptlrpc_request_pack(req, version, opc);
        if (rc) {
                /* Packing failed: drop our references on the cancel list. */
                ldlm_lock_list_put(cancels, l_bl_ast, count);
                RETURN(rc);
        }

        if (ns_connect_cancelset(ns)) {
                if (canceloff) {
                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
                        LASSERT(dlm);
                        /* Skip the first lock handle slots: they are reserved
                         * for the primary lock(s) of this operation. */
                        dlm->lock_count = canceloff;
                }
                /* Pack into this request whatever fits... */
                ldlm_cli_cancel_list(cancels, pack, req, 0);
                /* ...and send the rest in a dedicated cancel RPC. */
                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
        } else {
                ldlm_lock_list_put(cancels, l_bl_ast, count);
        }
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_prep_elc_req);
820
821int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
822 struct list_head *cancels, int count)
823{
824 return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
825 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
826}
827EXPORT_SYMBOL(ldlm_prep_enqueue_req);
828
829struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
830{
831 struct ptlrpc_request *req;
832 int rc;
833 ENTRY;
834
835 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
836 if (req == NULL)
837 RETURN(ERR_PTR(-ENOMEM));
838
839 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
840 if (rc) {
841 ptlrpc_request_free(req);
842 RETURN(ERR_PTR(rc));
843 }
844
845 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
846 ptlrpc_request_set_replen(req);
847 RETURN(req);
848}
849EXPORT_SYMBOL(ldlm_enqueue_pack);
850
851
852
853
854
855
856
857
858
859
860
/**
 * Client-side lock enqueue: create (or find, for replay) the lock, build or
 * reuse the enqueue request, and either return immediately (\a async) or
 * send it and process the reply via ldlm_cli_enqueue_fini().
 *
 * \param reqp  in/out request pointer; if *reqp is set the caller supplied
 *              a pre-built request (e.g. intent) and retains ownership
 * \param async non-zero: caller sends the request itself later
 * \retval 0 / ELDLM_* / negative errno
 */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                     struct ldlm_enqueue_info *einfo,
                     const struct ldlm_res_id *res_id,
                     ldlm_policy_data_t const *policy, __u64 *flags,
                     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
                     struct lustre_handle *lockh, int async)
{
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
        struct ldlm_request *body;
        int is_replay = *flags & LDLM_FL_REPLAY;
        int req_passed_in = 1;
        int rc, err;
        struct ptlrpc_request *req;
        ENTRY;

        LASSERT(exp != NULL);

        ns = exp->exp_obd->obd_namespace;

        /* If we're replaying this lock, just check some invariants.
         * If we're creating a new lock, get everything all setup nicely. */
        if (is_replay) {
                lock = ldlm_handle2lock_long(lockh, 0);
                LASSERT(lock != NULL);
                LDLM_DEBUG(lock, "client-side enqueue START");
                LASSERT(exp == lock->l_conn_export);
        } else {
                const struct ldlm_callback_suite cbs = {
                        .lcs_completion = einfo->ei_cb_cp,
                        .lcs_blocking   = einfo->ei_cb_bl,
                        .lcs_glimpse    = einfo->ei_cb_gl,
                        .lcs_weigh      = einfo->ei_cb_wg
                };
                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
                                        einfo->ei_mode, &cbs, einfo->ei_cbdata,
                                        lvb_len, lvb_type);
                if (lock == NULL)
                        RETURN(-ENOMEM);
                /* for the local lock, add the reference */
                ldlm_lock_addref_internal(lock, einfo->ei_mode);
                ldlm_lock2handle(lock, lockh);
                if (policy != NULL) {
                        /* INODEBITS_INTEROP: servers without inodebits
                         * support see plain locks; map such requests to the
                         * widest compatible bit set locally. */
                        if (einfo->ei_type == LDLM_IBITS &&
                            !(exp_connect_flags(exp) &
                              OBD_CONNECT_IBITS))
                                lock->l_policy_data.l_inodebits.bits =
                                        MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_UPDATE;
                        else
                                lock->l_policy_data = *policy;
                }

                if (einfo->ei_type == LDLM_EXTENT)
                        lock->l_req_extent = policy->l_extent;
                LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n",
                           *flags);
        }

        lock->l_conn_export = exp;
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;
        lock->l_flags |= (*flags & LDLM_FL_NO_LRU);

        /* lock not sent to server yet */
        if (reqp == NULL || *reqp == NULL) {
                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
                                                &RQF_LDLM_ENQUEUE,
                                                LUSTRE_DLM_VERSION,
                                                LDLM_ENQUEUE);
                if (req == NULL) {
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
                        RETURN(-ENOMEM);
                }
                req_passed_in = 0;
                if (reqp)
                        *reqp = req;
        } else {
                int len;

                req = *reqp;
                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
                                           RCL_CLIENT);
                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
        }

        /* Dump lock data into the request buffer */
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
        body->lock_flags = ldlm_flags_to_wire(*flags);
        body->lock_handle[0] = *lockh;

        /* Continue as normal. */
        if (!req_passed_in) {
                if (lvb_len > 0)
                        req_capsule_extend(&req->rq_pill,
                                           &RQF_LDLM_ENQUEUE_LVB);
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     lvb_len);
                ptlrpc_request_set_replen(req);
        }

        /* Liblustre clients cannot handle asynchronous ASTs for non-EOF
         * extent locks robustly — enforce that invariant here. */
        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
                     policy->l_extent.end == OBD_OBJECT_EOF));

        if (async) {
                /* Caller will send the request and call _fini() itself. */
                LASSERT(reqp != NULL);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "sending request");

        rc = ptlrpc_queue_wait(req);

        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
                                    einfo->ei_mode, flags, lvb, lvb_len,
                                    lockh, rc);

        /* If ldlm_cli_enqueue_fini did not find the lock (-ENOLCK) it has
         * already released the request's reference for us; otherwise its
         * return status supersedes the RPC status. */
        if (err == -ENOLCK)
                LDLM_LOCK_RELEASE(lock);
        else
                rc = err;

        if (!req_passed_in && req != NULL) {
                ptlrpc_req_finished(req);
                if (reqp)
                        *reqp = NULL;
        }

        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_cli_enqueue);
1008
/* Convert a server-side (local) lock to \a new_mode without an RPC.
 * Consumes the caller's lock reference (LDLM_LOCK_PUT on exit).
 * NOTE(review): returns POSITIVE EDEADLOCK on conversion failure — matches
 * the convention of ldlm_cli_convert() below; confirm callers expect this. */
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
                                  __u32 *flags)
{
        struct ldlm_resource *res;
        int rc;
        ENTRY;
        if (ns_is_client(ldlm_lock_to_ns(lock))) {
                CERROR("Trying to cancel local lock\n");
                LBUG();
        }
        LDLM_DEBUG(lock, "client-side local convert");

        res = ldlm_lock_convert(lock, new_mode, flags);
        if (res) {
                ldlm_reprocess_all(res);
                rc = 0;
        } else {
                rc = EDEADLOCK;
        }
        LDLM_DEBUG(lock, "client-side local convert handler END");
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
1032
1033
1034
1035
1036
/**
 * Convert the lock behind \a lockh to \a new_mode.
 *
 * Local locks are converted without an RPC; client locks send an
 * LDLM_CONVERT request and apply the server's reply flags locally.
 *
 * \retval 0 on success; positive EDEADLOCK if the conversion could not be
 *         performed; negative errno on RPC/protocol failure
 */
int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
{
        struct ldlm_request *body;
        struct ldlm_reply *reply;
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        struct ptlrpc_request *req;
        int rc;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (!lock) {
                LBUG();
                RETURN(-EINVAL);
        }
        *flags = 0;

        /* No export: this is a local (server-side) lock. */
        if (lock->l_conn_export == NULL)
                RETURN(ldlm_cli_convert_local(lock, new_mode, flags));

        LDLM_DEBUG(lock, "client-side convert");

        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
                                        LDLM_CONVERT);
        if (req == NULL) {
                LDLM_LOCK_PUT(lock);
                RETURN(-ENOMEM);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        /* The server identifies the lock by its own (remote) handle. */
        body->lock_handle[0] = lock->l_remote_handle;

        body->lock_desc.l_req_mode = new_mode;
        body->lock_flags = ldlm_flags_to_wire(*flags);


        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
        if (rc != ELDLM_OK)
                GOTO(out, rc);

        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        if (reply == NULL)
                GOTO(out, rc = -EPROTO);

        if (req->rq_status)
                GOTO(out, rc = req->rq_status);

        res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
        if (res != NULL) {
                ldlm_reprocess_all(res);
                /* Go to sleep until the lock is granted. */
                /* FIXME: or cancelled. */
                if (lock->l_completion_ast) {
                        rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
                                                    NULL);
                        if (rc)
                                GOTO(out, rc);
                }
        } else {
                /* Positive by convention — see ldlm_cli_convert_local(). */
                rc = EDEADLOCK;
        }
        EXIT;
 out:
        LDLM_LOCK_PUT(lock);
        ptlrpc_req_finished(req);
        return rc;
}
EXPORT_SYMBOL(ldlm_cli_convert);
1107
1108
1109
1110
1111
1112
1113
1114
/**
 * Cancel \a lock locally and report how the server side must be told.
 *
 * \retval LDLM_FL_LOCAL_ONLY  no cancel RPC needed
 * \retval LDLM_FL_BL_AST      lock was blocked; cancel RPC must be sent
 *                             promptly (server is waiting on it)
 * \retval LDLM_FL_CANCELING   normal case; cancel RPC may be batched
 */
static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
        __u64 rc = LDLM_FL_LOCAL_ONLY;
        ENTRY;

        if (lock->l_conn_export) {
                bool local_only;

                LDLM_DEBUG(lock, "client-side cancel");
                /* Set this flag to prevent others from getting new
                 * references. */
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING;
                local_only = !!(lock->l_flags &
                                (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
                ldlm_cancel_callback(lock);
                rc = (lock->l_flags & LDLM_FL_BL_AST) ?
                        LDLM_FL_BL_AST : LDLM_FL_CANCELING;
                unlock_res_and_lock(lock);

                if (local_only) {
                        CDEBUG(D_DLMTRACE, "not sending request (at caller's "
                               "instruction)\n");
                        rc = LDLM_FL_LOCAL_ONLY;
                }
                ldlm_lock_cancel(lock);
        } else {
                /* No export means a local lock: only legal server-side. */
                if (ns_is_client(ldlm_lock_to_ns(lock))) {
                        LDLM_ERROR(lock, "Trying to cancel local lock");
                        LBUG();
                }
                LDLM_DEBUG(lock, "server-side local cancel");
                ldlm_lock_cancel(lock);
                ldlm_reprocess_all(lock->l_resource);
        }

        RETURN(rc);
}
1152
1153
1154
1155
1156static void ldlm_cancel_pack(struct ptlrpc_request *req,
1157 struct list_head *head, int count)
1158{
1159 struct ldlm_request *dlm;
1160 struct ldlm_lock *lock;
1161 int max, packed = 0;
1162 ENTRY;
1163
1164 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1165 LASSERT(dlm != NULL);
1166
1167
1168 max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
1169 sizeof(struct ldlm_request);
1170 max /= sizeof(struct lustre_handle);
1171 max += LDLM_LOCKREQ_HANDLES;
1172 LASSERT(max >= dlm->lock_count + count);
1173
1174
1175
1176
1177 list_for_each_entry(lock, head, l_bl_ast) {
1178 if (!count--)
1179 break;
1180 LASSERT(lock->l_conn_export);
1181
1182 LDLM_DEBUG(lock, "packing");
1183 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
1184 packed++;
1185 }
1186 CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
1187 EXIT;
1188}
1189
1190
1191
1192
/**
 * Send a cancel RPC for up to \a count locks from \a cancels.
 *
 * \param flags  LCF_ASYNC hands the request to ptlrpcd instead of waiting
 * \retval number of locks whose handles were sent (possibly < count),
 *         or negative errno if nothing could be sent
 */
int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                        int count, ldlm_cancel_flags_t flags)
{
        struct ptlrpc_request *req = NULL;
        struct obd_import *imp;
        int free, sent = 0;
        int rc = 0;
        ENTRY;

        LASSERT(exp != NULL);
        LASSERT(count > 0);

        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);

        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                RETURN(count);

        /* Trim count to what one request can carry. */
        free = ldlm_format_handles_avail(class_exp2cliimp(exp),
                                         &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
        if (count > free)
                count = free;

        /* Loop only to retry on -ETIMEDOUT within the same import
         * generation; all other outcomes break/return. */
        while (1) {
                imp = class_exp2cliimp(exp);
                if (imp == NULL || imp->imp_invalid) {
                        CDEBUG(D_DLMTRACE,
                               "skipping cancel on invalid import %p\n", imp);
                        RETURN(count);
                }

                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
                if (req == NULL)
                        GOTO(out, rc = -ENOMEM);

                req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
                                     ldlm_request_bufsize(count, LDLM_CANCEL));

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(out, rc);
                }

                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);

                ldlm_cancel_pack(req, cancels, count);

                ptlrpc_request_set_replen(req);
                if (flags & LCF_ASYNC) {
                        /* ptlrpcd now owns the request; skip the
                         * ptlrpc_req_finished() below. */
                        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
                        sent = count;
                        GOTO(out, 0);
                } else {
                        rc = ptlrpc_queue_wait(req);
                }
                if (rc == ESTALE) {
                        /* NOTE(review): compares against POSITIVE ESTALE —
                         * presumably a wire status rather than a local
                         * -errno; confirm against server reply codes. */
                        CDEBUG(D_DLMTRACE, "client/server (nid %s) "
                               "out of sync -- not fatal\n",
                               libcfs_nid2str(req->rq_import->
                                              imp_connection->c_peer.nid));
                        rc = 0;
                } else if (rc == -ETIMEDOUT &&
                           /* Retry only on timeout in this generation;
                            * a new generation means recovery replays it. */
                           req->rq_import_generation == imp->imp_generation) {
                        ptlrpc_req_finished(req);
                        continue;
                } else if (rc != ELDLM_OK) {
                        /* -ESHUTDOWN is common on umount; not a real error. */
                        CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
                                     "Got rc %d from cancel RPC: "
                                     "canceling anyway\n", rc);
                        break;
                }
                sent = count;
                break;
        }

        ptlrpc_req_finished(req);
        EXIT;
out:
        return sent ? sent : rc;
}
EXPORT_SYMBOL(ldlm_cli_cancel_req);
1278
1279static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
1280{
1281 LASSERT(imp != NULL);
1282 return &imp->imp_obd->obd_namespace->ns_pool;
1283}
1284
1285
1286
1287
/**
 * Update the client lock pool's SLV and limit from the server values
 * piggy-backed on \a req's reply message.
 *
 * \retval 0 always (no LRU-resize support or zero values just skip update)
 */
int ldlm_cli_update_pool(struct ptlrpc_request *req)
{
        struct obd_device *obd;
        __u64 new_slv;
        __u32 new_limit;
        ENTRY;
        if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
                     !imp_connect_lru_resize(req->rq_import)))
        {
                /* Do nothing for corner cases and for servers without
                 * LRU-resize support. */
                RETURN(0);
        }

        /* Zero SLV/limit means the server did not fill them in (e.g. the
         * reply was packed before pool data was available) — skip. */
        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
            lustre_msg_get_limit(req->rq_repmsg) == 0) {
                DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
                          "(SLV: "LPU64", Limit: %u)",
                          lustre_msg_get_slv(req->rq_repmsg),
                          lustre_msg_get_limit(req->rq_repmsg));
                RETURN(0);
        }

        new_limit = lustre_msg_get_limit(req->rq_repmsg);
        new_slv = lustre_msg_get_slv(req->rq_repmsg);
        obd = req->rq_import->imp_obd;

        /* Stash on the obd under obd_pool_lock; the pool recalc thread
         * picks the values up from there, so this write is the only
         * synchronization needed. */
        write_lock(&obd->obd_pool_lock);
        obd->obd_pool_slv = new_slv;
        obd->obd_pool_limit = new_limit;
        write_unlock(&obd->obd_pool_lock);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_update_pool);
1334
1335
1336
1337
1338
1339
/**
 * Client-side lock cancel entry point.
 *
 * Cancels the lock locally, and if the server must be told, batches the
 * cancel with additional LRU victims (when the server supports cancel
 * sets) before sending.
 *
 * \retval 0 always (a missing lock just means someone else cancelled it)
 */
int ldlm_cli_cancel(struct lustre_handle *lockh,
                    ldlm_cancel_flags_t cancel_flags)
{
        struct obd_export *exp;
        int avail, flags, count = 1;
        __u64 rc = 0;
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock;
        LIST_HEAD(cancels);
        ENTRY;

        /* Concurrent cancels on the same handle can't proceed. */
        lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
        if (lock == NULL) {
                LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
                RETURN(0);
        }

        rc = ldlm_cli_cancel_local(lock);
        if (rc == LDLM_FL_LOCAL_ONLY) {
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
        }
        /* Even when the lock is marked as blocked, it is sent through the
         * batched-cancel path below, keeping the server informed. */
        LASSERT(list_empty(&lock->l_bl_ast));
        list_add(&lock->l_bl_ast, &cancels);

        exp = lock->l_conn_export;
        if (exp_connect_cancelset(exp)) {
                /* Fill the remaining request space with LRU victims. */
                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
                                                  &RQF_LDLM_CANCEL,
                                                  RCL_CLIENT, 0);
                LASSERT(avail > 0);

                ns = ldlm_lock_to_ns(lock);
                flags = ns_connect_lru_resize(ns) ?
                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                               LCF_BL_AST, flags);
        }
        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_cancel);
1386
1387
1388
1389
1390
/**
 * Locally cancel up to \a count locks from \a cancels.
 *
 * Locks whose server cancel must be sent promptly (BL_AST) are split off
 * and sent immediately unless the caller asked for BL_AST handling itself;
 * LOCAL_ONLY locks are dropped from the list entirely.
 *
 * \retval number of locks remaining on \a cancels for the caller to send
 *
 * NOTE(review): uses RETURN() without a matching ENTRY — presumably
 * harmless for the debug machinery, but inconsistent with the rest of the
 * file; confirm.
 */
int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                               ldlm_cancel_flags_t flags)
{
        LIST_HEAD(head);
        struct ldlm_lock *lock, *next;
        int left = 0, bl_ast = 0;
        __u64 rc;

        left = count;
        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
                if (left-- == 0)
                        break;

                if (flags & LCF_LOCAL) {
                        rc = LDLM_FL_LOCAL_ONLY;
                        ldlm_lock_cancel(lock);
                } else {
                        rc = ldlm_cli_cancel_local(lock);
                }
                /* A blocked lock's cancel RPC must not be delayed by
                 * batching: move it to its own list and send right away
                 * (unless the caller handles BL_AST locks itself). */
                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
                        LDLM_DEBUG(lock, "Cancel lock separately");
                        list_del_init(&lock->l_bl_ast);
                        list_add(&lock->l_bl_ast, &head);
                        bl_ast++;
                        continue;
                }
                if (rc == LDLM_FL_LOCAL_ONLY) {
                        /* CANCEL RPC should not be sent to server. */
                        list_del_init(&lock->l_bl_ast);
                        LDLM_LOCK_RELEASE(lock);
                        count--;
                }
        }
        if (bl_ast > 0) {
                count -= bl_ast;
                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
        }

        RETURN(count);
}
EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
1436
1437
1438
1439
1440
1441
/**
 * LRU cancel policy for LDLM_CANCEL_NO_WAIT scans: cancel a lock only if
 * the namespace's recovery hook approves it; otherwise skip the lock and
 * mark it LDLM_FL_SKIPPED so later NO_WAIT scans pass over it quickly.
 *
 * \retval LDLM_POLICY_CANCEL_LOCK  cancel this lock
 * \retval LDLM_POLICY_SKIP_LOCK    leave it in the LRU, flagged SKIPPED
 */
static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
						    int unused, int added,
						    int count)
{
	ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
	ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
	lock_res_and_lock(lock);

	/* added/count are deliberately ignored: a NO_WAIT scan wants to
	 * consider every lock in the unused list. */
	switch (lock->l_resource->lr_type) {
	case LDLM_EXTENT:
	case LDLM_IBITS:
		/* Hook present and it approves: keep result = CANCEL_LOCK. */
		if (cb && cb(lock))
			break;
		/* fallthrough: no hook, or hook refused -- skip the lock */
	default:
		result = LDLM_POLICY_SKIP_LOCK;
		lock->l_flags |= LDLM_FL_SKIPPED;
		break;
	}

	unlock_res_and_lock(lock);
	RETURN(result);
}
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1478 struct ldlm_lock *lock,
1479 int unused, int added,
1480 int count)
1481{
1482 cfs_time_t cur = cfs_time_current();
1483 struct ldlm_pool *pl = &ns->ns_pool;
1484 __u64 slv, lvf, lv;
1485 cfs_time_t la;
1486
1487
1488
1489 if (count && added >= count)
1490 return LDLM_POLICY_KEEP_LOCK;
1491
1492 slv = ldlm_pool_get_slv(pl);
1493 lvf = ldlm_pool_get_lvf(pl);
1494 la = cfs_duration_sec(cfs_time_sub(cur,
1495 lock->l_last_used));
1496 lv = lvf * la * unused;
1497
1498
1499 ldlm_pool_set_clv(pl, lv);
1500
1501
1502
1503 return (slv == 0 || lv < slv) ?
1504 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1505}
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1517 struct ldlm_lock *lock,
1518 int unused, int added,
1519 int count)
1520{
1521
1522
1523 return (added >= count) ?
1524 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1525}
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1537 struct ldlm_lock *lock,
1538 int unused, int added,
1539 int count)
1540{
1541
1542 return ((added >= count) &&
1543 cfs_time_before(cfs_time_current(),
1544 cfs_time_add(lock->l_last_used,
1545 ns->ns_max_age))) ?
1546 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1547}
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1559 struct ldlm_lock *lock,
1560 int unused, int added,
1561 int count)
1562{
1563
1564
1565 return (added >= count) ?
1566 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1567}
1568
/* An LRU cancel policy: given a candidate lock, the number of unused
 * locks, the number already added, and the requested count, decide to
 * cancel the lock, keep it (stopping the scan), or skip it. */
typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
						      struct ldlm_lock *, int,
						      int, int);

/* Select the cancel policy for \a flags. NO_WAIT wins over everything;
 * with LRU resize connected, SHRINK and PASSED both map to the "passed"
 * policy and LRUR to the lock-volume policy; without LRU resize only the
 * age-based policy applies. Falls back to the default count-based policy. */
static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
{
	if (flags & LDLM_CANCEL_NO_WAIT)
		return ldlm_cancel_no_wait_policy;

	if (ns_connect_lru_resize(ns)) {
		if (flags & LDLM_CANCEL_SHRINK)
			/* shrink requests reuse the passed policy */
			return ldlm_cancel_passed_policy;
		else if (flags & LDLM_CANCEL_LRUR)
			return ldlm_cancel_lrur_policy;
		else if (flags & LDLM_CANCEL_PASSED)
			return ldlm_cancel_passed_policy;
	} else {
		if (flags & LDLM_CANCEL_AGED)
			return ldlm_cancel_aged_policy;
	}

	return ldlm_cancel_default_policy;
}
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
/**
 * Scan the namespace LRU and move cancel candidates onto \a cancels
 * (chained on l_bl_ast), marking each CBPENDING|CANCELING.
 *
 * \param ns      namespace to scan; ns_lock is taken and dropped internally
 * \param cancels output list; each lock carries one LDLM_LOCK_GET reference
 *                that the cancel path later consumes
 * \param count   policy-dependent target number of cancels
 * \param max     hard cap on gathered locks; 0 means no cap
 * \param flags   LDLM_CANCEL_* flags selecting the policy
 *
 * \retval number of locks added to \a cancels
 */
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, struct list_head *cancels,
				 int count, int max, int flags)
{
	ldlm_cancel_lru_policy_t pf;
	struct ldlm_lock *lock, *next;
	int added = 0, unused, remained;
	ENTRY;

	spin_lock(&ns->ns_lock);
	unused = ns->ns_nr_unused;
	remained = unused;

	/* Without LRU resize, grow the target so the LRU comes back under
	 * its configured limit. */
	if (!ns_connect_lru_resize(ns))
		count += unused - ns->ns_max_unused;

	pf = ldlm_cancel_lru_policy(ns, flags);
	LASSERT(pf != NULL);

	while (!list_empty(&ns->ns_unused_list)) {
		ldlm_policy_res_t result;

		/* Never examine more entries than the LRU held at start;
		 * guards against livelock as the list changes under us. */
		if (remained-- <= 0)
			break;

		/* Respect the hard cap, if any. */
		if (max && added >= max)
			break;

		/* Find the first lock not already being cancelled; locks
		 * that do have CANCELING set are dropped from the LRU
		 * here under ns_lock. */
		list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
					 l_lru) {
			/* Locks with a blocking AST pending must never sit
			 * in the LRU. */
			LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));

			if (flags & LDLM_CANCEL_NO_WAIT &&
			    lock->l_flags & LDLM_FL_SKIPPED)
				/* a previous NO_WAIT scan already rejected
				 * this one */
				continue;

			/* Not being cancelled: this is our candidate. */
			if (!(lock->l_flags & LDLM_FL_CANCELING))
				break;

			ldlm_lock_remove_from_lru_nolock(lock);
		}
		/* Walked off the end of the LRU without a candidate. */
		if (&lock->l_lru == &ns->ns_unused_list)
			break;

		LDLM_LOCK_GET(lock);
		spin_unlock(&ns->ns_lock);
		lu_ref_add(&lock->l_reference, __FUNCTION__, current);

		/* Ask the policy what to do with this lock. KEEP ends the
		 * whole scan; SKIP moves on to the next LRU entry. Note the
		 * policy runs with ns_lock dropped, so LRU state may change
		 * underneath us -- hence the re-checks below. */
		result = pf(ns, lock, unused, added, count);
		if (result == LDLM_POLICY_KEEP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __FUNCTION__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			break;
		}
		if (result == LDLM_POLICY_SKIP_LOCK) {
			lu_ref_del(&lock->l_reference,
				   __func__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}

		lock_res_and_lock(lock);

		/* Re-check under the resource lock: another thread may have
		 * begun cancelling this lock, or taken it off the LRU,
		 * while ns_lock was dropped. */
		if ((lock->l_flags & LDLM_FL_CANCELING) ||
		    (ldlm_lock_remove_from_lru(lock) == 0)) {
			unlock_res_and_lock(lock);
			lu_ref_del(&lock->l_reference,
				   __FUNCTION__, current);
			LDLM_LOCK_RELEASE(lock);
			spin_lock(&ns->ns_lock);
			continue;
		}
		LASSERT(!lock->l_readers && !lock->l_writers);

		/* We chose to cancel this lock voluntarily, so tell the
		 * server about it: clear CANCEL_ON_BLOCK so a real cancel
		 * RPC is sent and the server can free its state. */
		lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;

		/* Setting CBPENDING here looks odd but closes a race: once
		 * CBPENDING is set the lock can accumulate no new readers
		 * or writers, and since both are already zero nothing will
		 * trigger the blocking AST via ldlm_lock_decref(). */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;

		/* Hand the reference taken above over to the cancels list. */
		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		unlock_res_and_lock(lock);
		lu_ref_del(&lock->l_reference, __FUNCTION__, current);
		spin_lock(&ns->ns_lock);
		added++;
		unused--;
	}
	spin_unlock(&ns->ns_lock);
	RETURN(added);
}
1759
1760int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
1761 int count, int max, ldlm_cancel_flags_t cancel_flags,
1762 int flags)
1763{
1764 int added;
1765 added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
1766 if (added <= 0)
1767 return added;
1768 return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
1769}
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
1780 ldlm_cancel_flags_t cancel_flags,
1781 int flags)
1782{
1783 LIST_HEAD(cancels);
1784 int count, rc;
1785 ENTRY;
1786
1787
1788
1789 count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
1790 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
1791 if (rc == 0)
1792 RETURN(count);
1793
1794 RETURN(0);
1795}
1796
1797
1798
1799
1800
1801
/**
 * Gather unused granted locks on \a res that conflict with \a mode (and,
 * for IBITS resources, intersect \a policy bits) onto \a cancels, then
 * run the local cancel pass on them.
 *
 * \param res          resource whose granted list is scanned under lock_res
 * \param cancels      output list, chained on l_bl_ast
 * \param policy       optional inodebits filter (IBITS resources only)
 * \param mode         locks compatible with this mode are left alone
 * \param lock_flags   extra LDLM_FL_* bits OR-ed into each chosen lock
 * \param cancel_flags LCF_* flags forwarded to ldlm_cli_cancel_list_local()
 * \param opaque       if non-NULL, only locks whose l_ast_data matches
 *
 * \retval number of locks the caller still has to cancel on the server
 */
int ldlm_cancel_resource_local(struct ldlm_resource *res,
			       struct list_head *cancels,
			       ldlm_policy_data_t *policy,
			       ldlm_mode_t mode, int lock_flags,
			       ldlm_cancel_flags_t cancel_flags, void *opaque)
{
	struct ldlm_lock *lock;
	int count = 0;
	ENTRY;

	lock_res(res);
	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		if (opaque != NULL && lock->l_ast_data != opaque) {
			LDLM_ERROR(lock, "data %p doesn't match opaque %p",
				   lock->l_ast_data, opaque);
			continue;
		}

		/* Locks still in use stay. */
		if (lock->l_readers || lock->l_writers)
			continue;

		/* Skip locks already being cancelled or with a blocking
		 * AST in flight. */
		if (lock->l_flags & LDLM_FL_BL_AST ||
		    lock->l_flags & LDLM_FL_CANCELING)
			continue;

		/* A lock compatible with the requested mode need not go. */
		if (lockmode_compat(lock->l_granted_mode, mode))
			continue;

		/* For IBITS locks with a policy filter, cancel only those
		 * whose bits intersect the filter. */
		if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
		    !(lock->l_policy_data.l_inodebits.bits &
		      policy->l_inodebits.bits))
			continue;

		/* See the CBPENDING race note in ldlm_prepare_lru_list(). */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
				 lock_flags;

		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		LDLM_LOCK_GET(lock);
		count++;
	}
	unlock_res(res);

	RETURN(ldlm_cli_cancel_list_local(cancels, count, cancel_flags));
}
EXPORT_SYMBOL(ldlm_cancel_resource_local);
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
/**
 * Send the locks on \a cancels to their server(s) as LDLM_CANCEL RPCs,
 * or pack them into \a req if one was supplied.
 *
 * \param cancels list of locks chained on l_bl_ast; each holds a reference
 *                which this function consumes as entries are dropped
 * \param count   number of locks on the list
 * \param req     optional RPC to piggy-back the cancels on; if NULL, one
 *                or more dedicated cancel RPCs are sent
 * \param flags   ldlm_cancel_flags_t forwarded to ldlm_cli_cancel_req()
 *
 * \retval 0 always; RPC errors are logged and the affected locks are
 *         dropped from the list anyway
 */
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
			 struct ptlrpc_request *req, ldlm_cancel_flags_t flags)
{
	struct ldlm_lock *lock;
	int res = 0;
	ENTRY;

	if (list_empty(cancels) || count == 0)
		RETURN(0);

	/* Each pass sends what it can (a full batch, a packed set, or a
	 * single lock for old servers) and drops that many entries; loop
	 * until the whole list is consumed. */
	while (count > 0) {
		LASSERT(!list_empty(cancels));
		lock = list_entry(cancels->next, struct ldlm_lock,
				  l_bl_ast);
		LASSERT(lock->l_conn_export);

		if (exp_connect_cancelset(lock->l_conn_export)) {
			/* Server supports batched cancels. */
			res = count;
			if (req)
				/* Piggy-back on the caller's RPC. */
				ldlm_cancel_pack(req, cancels, count);
			else
				res = ldlm_cli_cancel_req(lock->l_conn_export,
							  cancels, count,
							  flags);
		} else {
			/* No cancel-set support: one lock per RPC. */
			res = ldlm_cli_cancel_req(lock->l_conn_export,
						  cancels, 1, flags);
		}

		if (res < 0) {
			/* On error, treat the whole remaining batch as
			 * consumed so the loop terminates and the locks
			 * are still released below. */
			CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
				     "ldlm_cli_cancel_list: %d\n", res);
			res = count;
		}

		count -= res;
		ldlm_lock_list_put(cancels, l_bl_ast, res);
	}
	LASSERT(count == 0);
	RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_cancel_list);
1912
1913
1914
1915
1916
1917
/**
 * Cancel all unused locks on the resource named \a res_id that conflict
 * with \a mode, both locally and on the server.
 *
 * \retval 0 always; a missing resource is not an error, and cancel-list
 *         failures are only logged
 */
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
				    const struct ldlm_res_id *res_id,
				    ldlm_policy_data_t *policy,
				    ldlm_mode_t mode,
				    ldlm_cancel_flags_t flags,
				    void *opaque)
{
	struct ldlm_resource *res;
	LIST_HEAD(cancels);
	int count;
	int rc;
	ENTRY;

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (res == NULL) {
		/* No such resource: nothing to cancel, not a problem. */
		CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
		RETURN(0);
	}

	LDLM_RESOURCE_ADDREF(res);
	/* LCF_BL_AST keeps BL_AST-flagged locks in this batch rather than
	 * sending them in a separate RPC. */
	count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
					   0, flags | LCF_BL_AST, opaque);
	rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
	if (rc != ELDLM_OK)
		CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);

	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
1950
/* Argument bundle carried through cfs_hash_for_each_nolock() into the
 * per-resource cancel callback below. */
struct ldlm_cli_cancel_arg {
	int lc_flags;     /* ldlm_cancel_flags_t, passed through */
	void *lc_opaque;  /* tag matched against each lock's l_ast_data */
};
1955
1956static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1957 struct hlist_node *hnode, void *arg)
1958{
1959 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1960 struct ldlm_cli_cancel_arg *lc = arg;
1961 int rc;
1962
1963 rc = ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
1964 NULL, LCK_MINMODE,
1965 lc->lc_flags, lc->lc_opaque);
1966 if (rc != 0) {
1967 CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
1968 res->lr_name.name[0], rc);
1969 }
1970
1971 return 0;
1972}
1973
1974
1975
1976
1977
1978
1979
1980int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1981 const struct ldlm_res_id *res_id,
1982 ldlm_cancel_flags_t flags, void *opaque)
1983{
1984 struct ldlm_cli_cancel_arg arg = {
1985 .lc_flags = flags,
1986 .lc_opaque = opaque,
1987 };
1988
1989 ENTRY;
1990
1991 if (ns == NULL)
1992 RETURN(ELDLM_OK);
1993
1994 if (res_id != NULL) {
1995 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1996 LCK_MINMODE, flags,
1997 opaque));
1998 } else {
1999 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2000 ldlm_cli_hash_cancel_unused, &arg);
2001 RETURN(ELDLM_OK);
2002 }
2003}
2004EXPORT_SYMBOL(ldlm_cli_cancel_unused);
2005
2006
2007
2008int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
2009 void *closure)
2010{
2011 struct list_head *tmp, *next;
2012 struct ldlm_lock *lock;
2013 int rc = LDLM_ITER_CONTINUE;
2014
2015 ENTRY;
2016
2017 if (!res)
2018 RETURN(LDLM_ITER_CONTINUE);
2019
2020 lock_res(res);
2021 list_for_each_safe(tmp, next, &res->lr_granted) {
2022 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2023
2024 if (iter(lock, closure) == LDLM_ITER_STOP)
2025 GOTO(out, rc = LDLM_ITER_STOP);
2026 }
2027
2028 list_for_each_safe(tmp, next, &res->lr_converting) {
2029 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2030
2031 if (iter(lock, closure) == LDLM_ITER_STOP)
2032 GOTO(out, rc = LDLM_ITER_STOP);
2033 }
2034
2035 list_for_each_safe(tmp, next, &res->lr_waiting) {
2036 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2037
2038 if (iter(lock, closure) == LDLM_ITER_STOP)
2039 GOTO(out, rc = LDLM_ITER_STOP);
2040 }
2041 out:
2042 unlock_res(res);
2043 RETURN(rc);
2044}
2045EXPORT_SYMBOL(ldlm_resource_foreach);
2046
/* Bundles a user iterator and its cookie so they can be driven through
 * the single-argument hash/resource walk helpers below. */
struct iter_helper_data {
	ldlm_iterator_t iter;  /* user callback */
	void *closure;         /* user cookie forwarded to iter */
};
2051
2052static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
2053{
2054 struct iter_helper_data *helper = closure;
2055 return helper->iter(lock, helper->closure);
2056}
2057
2058static int ldlm_res_iter_helper(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2059 struct hlist_node *hnode, void *arg)
2060
2061{
2062 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2063
2064 return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
2065 LDLM_ITER_STOP;
2066}
2067
2068void ldlm_namespace_foreach(struct ldlm_namespace *ns,
2069 ldlm_iterator_t iter, void *closure)
2070
2071{
2072 struct iter_helper_data helper = { iter: iter, closure: closure };
2073
2074 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2075 ldlm_res_iter_helper, &helper);
2076
2077}
2078EXPORT_SYMBOL(ldlm_namespace_foreach);
2079
2080
2081
2082
2083
2084
/**
 * Apply \a iter to every lock on the resource named \a res_id in \a ns.
 *
 * \retval 0 if the resource does not exist, otherwise the value returned
 *         by ldlm_resource_foreach() (LDLM_ITER_STOP or LDLM_ITER_CONTINUE)
 */
int ldlm_resource_iterate(struct ldlm_namespace *ns,
			  const struct ldlm_res_id *res_id,
			  ldlm_iterator_t iter, void *data)
{
	struct ldlm_resource *res;
	int rc;
	ENTRY;

	/* A NULL namespace is a caller bug; crash loudly. */
	if (ns == NULL) {
		CERROR("must pass in namespace\n");
		LBUG();
	}

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (res == NULL)
		RETURN(0);

	/* Hold the resource across the iteration. */
	LDLM_RESOURCE_ADDREF(res);
	rc = ldlm_resource_foreach(res, iter, data);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(rc);
}
EXPORT_SYMBOL(ldlm_resource_iterate);
2109
2110
2111
/* Iterator callback: collect locks needing replay onto the list passed in
 * \a closure, chained on l_pending_chain, taking a reference on each so
 * they cannot vanish (e.g. via cancel) before replay runs. */
static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
{
	struct list_head *list = closure;

	/* l_pending_chain is borrowed to build the replay list, so the
	 * lock must not already be on a pending chain. */
	LASSERTF(list_empty(&lock->l_pending_chain),
		 "lock %p next %p prev %p\n",
		 lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);

	/* Skip locks that have failed or are in the middle of being
	 * cancelled -- replaying those would resurrect dead state. */
	if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
		list_add(&lock->l_pending_chain, list);
		LDLM_LOCK_GET(lock);
	}

	return LDLM_ITER_CONTINUE;
}
2131
/* Reply interpreter for a replayed lock enqueue: adopt the remote handle
 * the server returned (re-keying the per-export lock hash when present)
 * and advance import recovery. On failure, force a reconnect. */
static int replay_lock_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct ldlm_async_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	struct obd_export *exp;

	ENTRY;
	/* Balances the inc done when the replay request was queued. */
	atomic_dec(&req->rq_import->imp_replay_inflight);
	if (rc != ELDLM_OK)
		GOTO(out, rc);

	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (reply == NULL)
		GOTO(out, rc = -EPROTO);

	lock = ldlm_handle2lock(&aa->lock_handle);
	if (!lock) {
		/* NOTE(review): req->rq_export is dereferenced here without
		 * a NULL check, unlike the guarded use below -- confirm it
		 * cannot be NULL on this path. */
		CERROR("received replay ack for unknown local cookie "LPX64
		       " remote cookie "LPX64 " from server %s id %s\n",
		       aa->lock_handle.cookie, reply->lock_handle.cookie,
		       req->rq_export->exp_client_uuid.uuid,
		       libcfs_id2str(req->rq_peer));
		GOTO(out, rc = -ESTALE);
	}

	/* The server handed back a (possibly new) remote handle; re-key the
	 * per-export lock hash from the old handle to the new one, or just
	 * store the new handle when there is no hash. */
	exp = req->rq_export;
	if (exp && exp->exp_lock_hash) {
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	LDLM_DEBUG(lock, "replayed lock:");
	ptlrpc_import_recovery_state_machine(req->rq_import);
	LDLM_LOCK_PUT(lock);
out:
	if (rc != ELDLM_OK)
		/* Replay failed: reconnect the import to retry recovery. */
		ptlrpc_connect_import(req->rq_import);

	RETURN(rc);
}
2183
/* Re-enqueue one lock to the server during recovery, encoding the lock's
 * pre-failure state in replay flags. Returns 0 on success and also when
 * the lock is deliberately not replayed; -ENOMEM if no request buffer. */
static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
	struct ptlrpc_request *req;
	struct ldlm_async_args *aa;
	struct ldlm_request *body;
	int flags;
	ENTRY;


	/* Do not replay a lock that is actively being cancelled. */
	if (lock->l_flags & LDLM_FL_CANCELING) {
		LDLM_DEBUG(lock, "Not replaying canceled lock:");
		RETURN(0);
	}

	/* A cancel-on-block ("reply-less") lock cannot be replayed: the
	 * server may have silently dropped it already, so cancel it locally
	 * instead of resurrecting possibly-conflicting state. */
	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
		LDLM_DEBUG(lock, "Not replaying reply-less lock:");
		ldlm_lock_cancel(lock);
		RETURN(0);
	}

	/* Encode where the lock stood before the failure:
	 *   granted mode == requested mode  -> fully granted
	 *   granted mode != 0               -> presumably mid-convert
	 *   still on a resource queue       -> presumably waiting
	 *   none of the above               -> enqueue never reached server
	 */
	if (lock->l_granted_mode == lock->l_req_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
	else if (lock->l_granted_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
	else if (!list_empty(&lock->l_res_link))
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
	else
		flags = LDLM_FL_REPLAY;

	req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
					LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
	if (req == NULL)
		RETURN(-ENOMEM);

	/* This request belongs to recovery itself; send it in the lock
	 * replay import state rather than waiting for full recovery. */
	req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;

	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(flags);

	ldlm_lock2handle(lock, &body->lock_handle[0]);
	/* Reserve reply space for the lock value block, if the lock
	 * carries one. */
	if (lock->l_lvb_len > 0)
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     lock->l_lvb_len);
	ptlrpc_request_set_replen(req);

	/* Mark the message so the server knows request replay is done and
	 * can order lock replays accordingly. */
	lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);

	LDLM_DEBUG(lock, "replaying lock:");

	/* Matched by the dec in replay_lock_interpret(). */
	atomic_inc(&req->rq_import->imp_replay_inflight);
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

	RETURN(0);
}
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
2278{
2279 int canceled;
2280 LIST_HEAD(cancels);
2281
2282 CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before"
2283 "replay for namespace %s (%d)\n",
2284 ldlm_ns_name(ns), ns->ns_nr_unused);
2285
2286
2287
2288
2289 canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
2290 LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
2291
2292 CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
2293 canceled, ldlm_ns_name(ns));
2294}
2295
/**
 * Replay all replayable locks of import \a imp after a reconnect:
 * optionally trim the LRU first, chain every surviving lock onto a local
 * list, then re-enqueue them one by one.
 *
 * \retval 0 on success, or the first replay_one_lock() error (remaining
 *         list entries are only released once an error occurs)
 */
int ldlm_replay_locks(struct obd_import *imp)
{
	struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
	LIST_HEAD(list);
	struct ldlm_lock *lock, *next;
	int rc = 0;

	ENTRY;

	LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);

	/* Do not replay locks when version-based recovery failed. */
	if (imp->imp_vbr_failed)
		RETURN(0);

	/* Hold the inflight count above zero for the whole queueing phase
	 * so it cannot reach zero before all replays are posted. */
	atomic_inc(&imp->imp_replay_inflight);

	if (ldlm_cancel_unused_locks_before_replay)
		ldlm_cancel_unused_locks_for_replay(ns);

	/* Gather every replayable lock; each holds one extra reference. */
	ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);

	list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
		list_del_init(&lock->l_pending_chain);
		if (rc) {
			/* An earlier replay failed: just drop the remaining
			 * references taken while building the list. */
			LDLM_LOCK_RELEASE(lock);
			continue;
		}
		rc = replay_one_lock(imp, lock);
		LDLM_LOCK_RELEASE(lock);
	}

	atomic_dec(&imp->imp_replay_inflight);

	RETURN(rc);
}
EXPORT_SYMBOL(ldlm_replay_locks);
2334