/*
 * Implementation of cl_lock for the OSC (object storage client) layer.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_fid.h"

#include "osc_cl_internal.h"

/*
 * Sentinel added to osc_lock::ols_pageref by osc_dlm_lock_pageref() to
 * detect outstanding page references; see also the assertion in
 * osc_lock_fini().
 */
#define _PAGEREF_MAGIC (-10000000)

static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);
static int osc_lock_has_pages(struct osc_lock *olck);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}

/*
 * Return a weak pointer to the ldlm lock identified by a handle. The
 * reference taken by ldlm_handle2lock() is dropped immediately, so the
 * caller must not rely on the returned pointer keeping the lock alive;
 * it is only used by osc_lock_invariant() below.
 */
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock)
		LDLM_LOCK_PUT(lock);
	return lock;
}

/*
 * Invariant that must hold for an osc_lock at all times.
 */
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock = ols->ols_lock;
	int handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && !ols->ols_lock))
		return 1;

	/*
	 * If the DLM lock pointer is set, the handle must be in use and the
	 * two must refer to the same lock.
	 */
	if (!ergo(olock, handle_used))
		return 0;

	if (!ergo(olock, olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	if (!ergo(handle_used,
		  ergo(lock && olock, lock == olock) &&
		  ergo(!lock, !olock)))
		return 0;

	/*
	 * A cancelled lock holds neither a DLM lock pointer nor a live
	 * handle.
	 */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  !olock && !handle_used))
		return 0;

	/*
	 * The DLM lock is destroyed only after the cancellation AST has been
	 * seen, i.e. only once the osc_lock has reached OLS_CANCELLED.
	 */
	if (!ergo(olock && ols->ols_state < OLS_CANCELLED,
		  ((olock->l_flags & LDLM_FL_DESTROYED) == 0)))
		return 0;

	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock && olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}

/*
 * Break the link between an osc_lock and its underlying DLM lock.
 *
 * If the DLM lock had been granted, the object's known minimum size (KMS)
 * is recomputed, since the extent covered by this lock no longer guarantees
 * anything about the file size.
 */
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	spin_lock(&osc_ast_guard);
	dlmlock = olck->ols_lock;
	if (!dlmlock) {
		spin_unlock(&osc_ast_guard);
		return;
	}

	olck->ols_lock = NULL;
	/*
	 * Sever both directions of the osc_lock <-> dlmlock association.
	 */
	dlmlock->l_ast_data = NULL;
	olck->ols_handle.cookie = 0ULL;
	spin_unlock(&osc_ast_guard);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct cl_object *obj = olck->ols_cl.cls_obj;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		cl_object_attr_lock(obj);
		/* Read the current KMS under the attribute lock. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/*
		 * Recompute the KMS: this lock is going away, so it can no
		 * longer guard the extent it covered.
		 */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);

		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
	}
	unlock_res_and_lock(dlmlock);

	/* Drop the lock reference and lu_ref taken in osc_lock_upcall0(). */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}

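/*
 * Release the DLM lock hold (mode reference) taken in osc_lock_upcall0(),
 * if any, via osc_cancel_base().
 */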
static int osc_lock_unhold(struct osc_lock *ols)
{
	int result = 0;

	if (ols->ols_hold) {
		ols->ols_hold = 0;
		result = osc_cancel_base(&ols->ols_handle,
					 ols->ols_einfo.ei_mode);
	}
	return result;
}

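/*
 * Implementation of cl_lock_operations::clo_unuse() for the osc layer.
 * Depending on the current state, this drops the hold and/or detaches the
 * DLM lock, moving the osc_lock back to OLS_NEW or on to OLS_RELEASED.
 */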
static int osc_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));

	switch (ols->ols_state) {
	case OLS_NEW:
		LASSERT(!ols->ols_hold);
		LASSERT(ols->ols_agl);
		return 0;
	case OLS_UPCALL_RECEIVED:
		osc_lock_unhold(ols);
		/* fall through */
	case OLS_ENQUEUED:
		LASSERT(!ols->ols_hold);
		osc_lock_detach(env, ols);
		ols->ols_state = OLS_NEW;
		return 0;
	case OLS_GRANTED:
		LASSERT(!ols->ols_glimpse);
		LASSERT(ols->ols_hold);
		/*
		 * Move the lock into OLS_RELEASED before dropping the hold,
		 * so that a concurrent cancellation sees the lock as already
		 * released.
		 */
		ols->ols_state = OLS_RELEASED;
		return osc_lock_unhold(ols);
	default:
		CERROR("Impossible state: %d\n", ols->ols_state);
		LBUG();
	}
}

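/*
 * Implementation of cl_lock_operations::clo_fini(): releases any remaining
 * hold on the DLM lock and frees the osc_lock slice.
 */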
static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	/*
	 * The lock may still hold a reference on the DLM lock at this point
	 * (e.g. if it was never explicitly unused); drop it before freeing
	 * the slice.
	 */
	osc_lock_unhold(ols);
	LASSERT(!ols->ols_lock);
	LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
		atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);

	kmem_cache_free(osc_lock_kmem, ols);
}

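/*
 * Fill an LDLM extent policy from the cl_lock descriptor, converting page
 * indices to byte offsets and copying the group-lock gid.
 */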
static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}

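/*
 * Map cl enqueue flags (CEF_*) to the corresponding LDLM_FL_* flags.
 */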
static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	return result;
}

/*
 * Global spin lock protecting the consistency of ldlm_lock::l_ast_data
 * pointers throughout this file.
 */
spinlock_t osc_ast_guard;

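/*
 * Look up the osc_lock attached to a DLM lock's l_ast_data and pin the
 * corresponding cl_lock so it can be safely used from an AST context.
 * Returns NULL if no usable osc_lock is attached; a non-NULL result must be
 * balanced with osc_ast_data_put().
 */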
static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
{
	struct osc_lock *olck;

	lock_res_and_lock(dlm_lock);
	spin_lock(&osc_ast_guard);
	olck = dlm_lock->l_ast_data;
	if (olck) {
		struct cl_lock *lock = olck->ols_cl.cls_lock;
		/*
		 * If the osc_lock still holds a reference on the DLM lock,
		 * return it even when the cl_lock is in CLS_FREEING state:
		 * this guarantees that osc_ast_data_get() == NULL implies
		 * that all osc references on the DLM lock were released,
		 * which osc_dlm_blocking_ast0() relies on.
		 */
		if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
			cl_lock_get_trust(lock);
			lu_ref_add_atomic(&lock->cll_reference,
					  "ast", current);
		} else
			olck = NULL;
	}
	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(dlm_lock);
	return olck;
}

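/*
 * Drop the cl_lock reference taken by osc_ast_data_get().
 */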
static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
{
	struct cl_lock *lock;

	lock = olck->ols_cl.cls_lock;
	lu_ref_del(&lock->cll_reference, "ast", current);
	cl_lock_put(env, lock);
}

/*
 * Update the object attributes from the lock value block (LVB) received
 * with the DLM lock reply from the server.
 */
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
				int rc)
{
	struct ost_lvb *lvb;
	struct cl_object *obj;
	struct lov_oinfo *oinfo;
	struct cl_attr *attr;
	unsigned valid;

	if (!(olck->ols_flags & LDLM_FL_LVB_READY))
		return;

	lvb = &olck->ols_lvb;
	obj = olck->ols_cl.cls_obj;
	oinfo = cl2osc(obj)->oo_oinfo;
	attr = &osc_env_info(env)->oti_attr;
	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (rc == 0) {
		struct ldlm_lock *dlmlock;
		__u64 size;

		dlmlock = olck->ols_lock;

		/* Re-read the LVB from the DLM lock, which the server has
		 * filled in.
		 */
		*lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
		size = lvb->lvb_size;

		/* Extend KMS up to the end of this lock and no further:
		 * a lock on [x, y] means a KMS of at most y + 1.
		 */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
				   lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	} else if (rc == -ENAVAIL && olck->ols_glimpse) {
		CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
		       lvb->lvb_size, oinfo->loi_kms);
	} else
		valid = 0;

	if (valid != 0)
		cl_object_attr_set(env, obj, attr, valid);

	cl_object_attr_unlock(obj);
}


/*
 * Called when a lock is granted, either from the enqueue upcall (when the
 * server returned a granted lock) or from the completion AST (when a
 * blocked lock was eventually granted).
 *
 * Called under the lock and resource spin locks, which are released
 * temporarily here.
 */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
			     struct ldlm_lock *dlmlock, int rc)
{
	struct ldlm_extent *ext;
	struct cl_lock *lock;
	struct cl_lock_descr *descr;

	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	if (olck->ols_state < OLS_GRANTED) {
		lock = olck->ols_cl.cls_lock;
		ext = &dlmlock->l_policy_data.l_extent;
		descr = &osc_env_info(env)->oti_descr;
		descr->cld_obj = lock->cll_descr.cld_obj;

		/* The server may have granted a different extent or mode
		 * than was requested; describe what was actually granted.
		 */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;

		/*
		 * Tell the upper layers the extent of the lock that was
		 * actually granted.
		 */
		olck->ols_state = OLS_GRANTED;
		osc_lock_lvb_update(env, olck, rc);

		/*
		 * Release the DLM spin locks so that cl_lock_modify() and
		 * cl_lock_signal() can take the cl_lock mutex of a parent
		 * lock. This is safe because the spin locks only protect
		 * dlmlock->l_*_mode and the LVB, and we are done with those.
		 */
		unlock_res_and_lock(dlmlock);
		cl_lock_modify(env, lock, descr);
		cl_lock_signal(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		lock_res_and_lock(dlmlock);
	}
}


static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
	LASSERT(dlmlock);

	lock_res_and_lock(dlmlock);
	spin_lock(&osc_ast_guard);
	LASSERT(dlmlock->l_ast_data == olck);
	LASSERT(!olck->ols_lock);
	olck->ols_lock = dlmlock;
	spin_unlock(&osc_ast_guard);

	/*
	 * The lock might not be granted yet; in that case the completion
	 * AST (osc_ldlm_completion_ast()) arrives later and finishes the
	 * granting.
	 */
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
		osc_lock_granted(env, olck, dlmlock, 0);
	unlock_res_and_lock(dlmlock);

	/*
	 * Take a mode reference on the DLM lock; it is dropped in
	 * osc_lock_unhold().
	 */
	ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
	olck->ols_hold = 1;

	/*
	 * Keep the ldlm_handle2lock_long() reference and the lu_ref until
	 * osc_lock_detach().
	 */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
	olck->ols_has_ref = 1;
}


/*
 * Lock upcall function, executed either when a reply to an ENQUEUE RPC is
 * received from the server, or after osc_enqueue_base() matched a locally
 * cached DLM lock.
 */
static int osc_lock_upcall(void *cookie, int errcode)
{
	struct osc_lock *olck = cookie;
	struct cl_lock_slice *slice = &olck->ols_cl;
	struct cl_lock *lock = slice->cls_lock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		int rc;

		cl_lock_mutex_get(env, lock);

		LASSERT(lock->cll_state >= CLS_QUEUING);
		if (olck->ols_state == OLS_ENQUEUED) {
			olck->ols_state = OLS_UPCALL_RECEIVED;
			rc = ldlm_error2errno(errcode);
		} else if (olck->ols_state == OLS_CANCELLED) {
			rc = -EIO;
		} else {
			CERROR("Impossible state: %d\n", olck->ols_state);
			LBUG();
		}
		if (rc) {
			struct ldlm_lock *dlmlock;

			dlmlock = ldlm_handle2lock(&olck->ols_handle);
			if (dlmlock) {
				lock_res_and_lock(dlmlock);
				spin_lock(&osc_ast_guard);
				LASSERT(!olck->ols_lock);
				dlmlock->l_ast_data = NULL;
				olck->ols_handle.cookie = 0ULL;
				spin_unlock(&osc_ast_guard);
				ldlm_lock_fail_match_locked(dlmlock);
				unlock_res_and_lock(dlmlock);
				LDLM_LOCK_PUT(dlmlock);
			}
		} else {
			if (olck->ols_glimpse)
				olck->ols_glimpse = 0;
			osc_lock_upcall0(env, olck);
		}

		if (olck->ols_locklessable && rc == -EUSERS) {
			/*
			 * The server refused the lock (-EUSERS); treat the
			 * object as contended and fall back to a lockless
			 * lock.
			 */
			osc_object_set_contended(cl2osc(slice->cls_obj));
			LASSERT(slice->cls_ops == &osc_lock_ops);

			/* Change this lock into a lockless lock. */
			osc_lock_to_lockless(env, olck, 1);
			olck->ols_state = OLS_GRANTED;
			rc = 0;
		} else if (olck->ols_glimpse && rc == -ENAVAIL) {
			osc_lock_lvb_update(env, olck, rc);
			cl_lock_delete(env, lock);
			/* Hide the error for a glimpse request. */
			rc = 0;
		}

		if (rc == 0) {
			/*
			 * For the AGL case, the RPC sponsor may have exited
			 * cl_lock processing without calling cl_wait(), so
			 * update the lock state here according to the
			 * enqueue result.
			 */
			if (olck->ols_agl) {
				lock->cll_flags |= CLF_FROM_UPCALL;
				cl_wait_try(env, lock);
				lock->cll_flags &= ~CLF_FROM_UPCALL;
				if (!olck->ols_glimpse)
					olck->ols_agl = 0;
			}
			cl_lock_signal(env, lock);
			/* Drop the user added for the upcall cookie. */
			cl_unuse_try(env, lock);
		} else {
			/* Drop the user added for the upcall cookie. */
			cl_lock_user_del(env, lock);
			cl_lock_error(env, lock, rc);
		}

		/* Release the hold acquired by osc_lock_enqueue(). */
		cl_lock_hold_release(env, lock, "upcall", lock);
		cl_lock_mutex_put(env, lock);

		lu_ref_del(&lock->cll_reference, "upcall", lock);
		/*
		 * This may be the last reference, so it must be dropped only
		 * after the mutex is released.
		 */
		cl_lock_put(env, lock);

		cl_env_nested_put(&nest, env);
	} else {
		/* Cannot do anything without an environment. */
		LBUG();
	}
	return errcode;
}


/*
 * Core of osc_dlm_blocking_ast() logic.
 */
static void osc_lock_blocking(const struct lu_env *env,
			      struct ldlm_lock *dlmlock,
			      struct osc_lock *olck, int blocking)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LASSERT(olck->ols_lock == dlmlock);
	CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
	LASSERT(!osc_lock_is_lockless(olck));

	/*
	 * The lock may still be addref-ed here, depending on how the
	 * blocking AST was sent by ldlm. Drop the hold now in any case.
	 */
	osc_lock_unhold(olck);

	if (blocking && olck->ols_state < OLS_BLOCKED)
		/*
		 * Move the osc_lock into OLS_BLOCKED before cancelling it,
		 * because the osc_lock can be released at any point after
		 * this.
		 */
		olck->ols_state = OLS_BLOCKED;
	/*
	 * Cancel and destroy the lock no matter how the blocking AST was
	 * entered; cl_lock_cancel() and cl_lock_delete() are idempotent.
	 */
	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
}


/*
 * Helper for osc_ldlm_blocking_ast(), handling the common part of blocking
 * and cancellation ASTs once an environment is available.
 */
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int cancel;

	LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);

	cancel = 0;
	olck = osc_ast_data_get(dlmlock);
	if (olck) {
		lock = olck->ols_cl.cls_lock;
		cl_lock_mutex_get(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		if (olck->ols_ast_wait) {
			/* Wake up osc_lock_use(), which is waiting for this
			 * AST.
			 */
			cl_lock_signal(env, lock);
			olck->ols_ast_wait = 0;
		}
		/*
		 * The lock might have been cancelled while this thread was
		 * sleeping on the lock mutex, but olck is pinned in memory.
		 */
		if (olck == dlmlock->l_ast_data) {
			/*
			 * Blocking ASTs are also sent for failed locks that
			 * are still in a pre-OLS_GRANTED state; they have to
			 * be cancelled too, otherwise the DLM lock is never
			 * destroyed and stays stuck in memory.
			 */
			LASSERT(data == olck);
			osc_lock_blocking(env, dlmlock,
					  olck, flag == LDLM_CB_BLOCKING);
		} else
			cancel = 1;
		cl_lock_mutex_put(env, lock);
		osc_ast_data_put(env, olck);
	} else
		/*
		 * A DLM lock exists, but no cl_lock is attached to it. This
		 * is a normal race: the cl_object and its cl_locks can be
		 * removed by memory pressure, together with all pages.
		 */
		cancel = (flag == LDLM_CB_BLOCKING);

	if (cancel) {
		struct lustre_handle *lockh;

		lockh = &osc_env_info(env)->oti_handle;
		ldlm_lock2handle(dlmlock, lockh);
		result = ldlm_cli_cancel(lockh, LCF_ASYNC);
	} else
		result = 0;
	return result;
}


/*
 * Blocking AST installed on client extent locks
 * (ldlm_enqueue_info::ei_cb_bl). LDLM uses the same callback both to notify
 * the client that a cached lock blocks a request from another client
 * (LDLM_CB_BLOCKING) and to announce that the lock is being cancelled
 * (LDLM_CB_CANCELING); @flag distinguishes the two cases.
 *
 * The AST can be reached from several paths, for example:
 *
 *     - ldlm cancels the lock due to lock LRU pressure or an explicit
 *       request to purge locks (LDLM_CB_CANCELING);
 *
 *     - the server notifies us that the lock conflicts with a lock that
 *       some other client is enqueuing (LDLM_CB_BLOCKING); the lock is then
 *       cancelled and this callback re-enters itself with LDLM_CB_CANCELING
 *       via ldlm_cli_cancel();
 *
 *     - the client cancels the lock voluntarily, e.g. from
 *       osc_lock_cancel() called by the upper layers.
 */
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int result;

	/*
	 * This can be called in the context of an outer IO, for example
	 * when a conflicting lock is cancelled as part of preparing an
	 * enqueue request, so a nested environment has to be created to
	 * avoid corrupting the outer context.
	 */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
		/*
		 * Environment allocation failed; there is no sane way to
		 * recover from this in an AST, so assert.
		 */
		LBUG();
	}
	if (result != 0) {
		if (result == -ENODATA)
			result = 0;
		else
			CERROR("BAST failed: %d\n", result);
	}
	return result;
}

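/*
 * Completion AST installed on client extent locks
 * (ldlm_enqueue_info::ei_cb_cp). Runs the generic LDLM completion handling
 * first and then propagates the result (granted mode, LVB) into the cl_lock.
 */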
static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
				   __u64 flags, void *data)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int dlmrc;

	/* First, do the DLM part of the work. */
	dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);

	/* Then, notify the cl_lock. */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		olck = osc_ast_data_get(dlmlock);
		if (olck) {
			lock = olck->ols_cl.cls_lock;
			cl_lock_mutex_get(env, lock);
			/*
			 * The LVB copied from the completion request is
			 * already in dlmlock->l_lvb_data; store it in the
			 * osc_lock.
			 */
			LASSERT(dlmlock->l_lvb_data);
			lock_res_and_lock(dlmlock);
			olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
			if (!olck->ols_lock) {
				/*
				 * The upcall (osc_lock_upcall()) has not run
				 * yet. Do nothing now; the upcall will bind
				 * olck to dlmlock and signal the waiters.
				 * This keeps the invariant that osc_lock and
				 * ldlm_lock are always bound when the
				 * osc_lock is in OLS_GRANTED state.
				 */
			} else if (dlmlock->l_granted_mode ==
				   dlmlock->l_req_mode) {
				osc_lock_granted(env, olck, dlmlock, dlmrc);
			}
			unlock_res_and_lock(dlmlock);

			if (dlmrc != 0) {
				CL_LOCK_DEBUG(D_ERROR, env, lock,
					      "dlmlock returned %d\n", dlmrc);
				cl_lock_error(env, lock, dlmrc);
			}
			cl_lock_mutex_put(env, lock);
			osc_ast_data_put(env, olck);
			result = 0;
		} else
			result = -ELDLM_NO_LOCK_DATA;
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	return dlmrc ?: result;
}

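/*
 * Glimpse AST (ldlm_enqueue_info::ei_cb_gl): packs the current object
 * attributes into the reply LVB so the server can answer a glimpse request
 * from another client.
 */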
static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct osc_lock *olck;
	struct cl_lock *lock;
	struct cl_object *obj;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		/*
		 * osc_ast_data_get() has to come after the environment is
		 * allocated, because it acquires a reference on the cl_lock
		 * that can only be released with an environment.
		 */
		olck = osc_ast_data_get(dlmlock);
		if (olck) {
			lock = olck->ols_cl.cls_lock;
			/*
			 * The cl_lock mutex is intentionally not taken for a
			 * glimpse; the lock may be cancelled concurrently,
			 * which the server side tolerates.
			 */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				obj = lock->cll_descr.cld_obj;
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			osc_ast_data_put(env, olck);
		} else {
			/*
			 * These errors are normal races; reply without
			 * filling the console with messages.
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	req->rq_status = result;
	return result;
}

static unsigned long osc_lock_weigh(const struct lu_env *env,
				    const struct cl_lock_slice *slice)
{
	/*
	 * The exact page count does not matter here, so coh_page_guard is
	 * not taken.
	 */
	return cl_object_header(slice->cls_obj)->coh_pages;
}

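/*
 * Fill the ldlm_enqueue_info used for this lock: extent lock type, LDLM
 * mode, and the blocking/completion/glimpse callbacks defined above.
 */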
static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *clock,
				 struct osc_lock *lock,
				 struct ldlm_enqueue_info *einfo)
{
	enum cl_lock_mode mode;

	mode = clock->cll_descr.cld_mode;
	if (mode == CLM_PHANTOM)
		/*
		 * For now, enqueue all glimpse locks in read mode. In the
		 * future the client might choose to enqueue a write lock
		 * for a glimpse on a file opened for write.
		 */
		mode = CLM_READ;

	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = osc_ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}


/*
 * Determine whether the lock should be converted into a lockless lock,
 * i.e. handled on the server side without caching a DLM lock on the client.
 *
 * The lock is switched to the lockless operations vector when:
 * - the caller forces it (@force), or
 * - the IO never requires DLM locking (CILR_NEVER), or
 * - the lock is locklessable (CILR_MAYBE on a server that supports
 *   OBD_CONNECT_SRVLOCK) and the object is known to be contended, or
 * - it covers a truncate and lockless truncate is enabled
 *   (OBD_CONNECT_TRUNCLOCK and the od_lockless_truncate tunable).
 */
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
				(io->ci_lockreq == CILR_MAYBE) &&
				(ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		      osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}

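/*
 * Decide whether an existing (queued) lock is compatible with the lock
 * being enqueued, so that the enqueue does not have to wait for it.
 */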
static int osc_lock_compatible(const struct osc_lock *qing,
			       const struct osc_lock *qed)
{
	enum cl_lock_mode qing_mode;
	enum cl_lock_mode qed_mode;

	qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
	if (qed->ols_glimpse &&
	    (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
		return 1;

	qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
	return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
}

/*
 * Check whether the lock being enqueued conflicts with an existing lock on
 * the same object. If a conflict is found, record it in cll_conflict and
 * return CLO_WAIT so that the enqueue is retried after the conflicting lock
 * goes away; group locks are sent to the server immediately instead of
 * waiting.
 */
static int osc_lock_enqueue_wait(const struct lu_env *env,
				 const struct osc_lock *olck)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;
	struct cl_lock_descr *descr = &lock->cll_descr;
	struct cl_object_header *hdr = cl_object_header(descr->cld_obj);
	struct cl_lock *scan;
	struct cl_lock *conflict = NULL;
	int lockless = osc_lock_is_lockless(olck);
	int rc = 0;

	LASSERT(cl_lock_is_mutexed(lock));

	/* A glimpse lock is enqueued right away: it does not need any
	 * conflicting locks to be cancelled first.
	 */
	if (olck->ols_glimpse)
		return 0;

	spin_lock(&hdr->coh_lock_guard);
	list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
		struct cl_lock_descr *cld = &scan->cll_descr;
		const struct osc_lock *scan_ols;

		if (scan == lock)
			break;

		if (scan->cll_state < CLS_QUEUING ||
		    scan->cll_state == CLS_FREEING ||
		    cld->cld_start > descr->cld_end ||
		    cld->cld_end < descr->cld_start)
			continue;

		/* The remaining locks are live and overlap the requested
		 * extent. A group lock is never given up here.
		 */
		if (scan->cll_descr.cld_mode == CLM_GROUP) {
			LASSERT(descr->cld_mode != CLM_GROUP ||
				descr->cld_gid != scan->cll_descr.cld_gid);
			continue;
		}

		scan_ols = osc_lock_at(scan);

		/*
		 * Even compatible locks have to be cancelled when a lockless
		 * lock is enqueued: otherwise, if a client holds a PR lock
		 * on [0, 1000] and one thread starts lockless IO in
		 * [500, 1500], another thread could see possibly stale
		 * lockless data in [500, 1000].
		 */
		if (!lockless && osc_lock_compatible(olck, scan_ols))
			continue;

		cl_lock_get_trust(scan);
		conflict = scan;
		break;
	}
	spin_unlock(&hdr->coh_lock_guard);

	if (conflict) {
		if (lock->cll_descr.cld_mode == CLM_GROUP) {
			/*
			 * A group lock that conflicts with a previous lock
			 * request is not waited for; the request is sent to
			 * the server anyway.
			 */
			CDEBUG(D_DLMTRACE, "group lock %p is conflicted with %p, no wait, send to server\n",
			       lock, conflict);
			cl_lock_put(env, conflict);
			rc = 0;
		} else {
			CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, will wait\n",
			       lock, conflict);
			LASSERT(!lock->cll_conflict);
			lu_ref_add(&conflict->cll_reference, "cancel-wait",
				   lock);
			lock->cll_conflict = conflict;
			rc = CLO_WAIT;
		}
	}
	return rc;
}


/*
 * Implementation of cl_lock_operations::clo_enqueue() for the osc layer.
 *
 * This checks for conflicting locks (osc_lock_enqueue_wait()) and then
 * calls osc_enqueue_base() to start the actual LDLM enqueue, passing
 * osc_lock_upcall() as the function to run when the lock is obtained,
 * either by matching a locally cached DLM lock or when the server reply
 * arrives. The function itself does not wait for the network round trip to
 * complete.
 */
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, __u32 enqflags)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	int result;

	LASSERT(cl_lock_is_mutexed(lock));
	LASSERTF(ols->ols_state == OLS_NEW,
		 "Impossible state: %d\n", ols->ols_state);

	LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, ols);

	result = osc_lock_enqueue_wait(env, ols);
	if (result == 0) {
		if (!osc_lock_is_lockless(ols)) {
			struct osc_object *obj = cl2osc(slice->cls_obj);
			struct osc_thread_info *info = osc_env_info(env);
			struct ldlm_res_id *resname = &info->oti_resname;
			ldlm_policy_data_t *policy = &info->oti_policy;
			struct ldlm_enqueue_info *einfo = &ols->ols_einfo;

			/* The lock will be passed as the upcall cookie;
			 * take a hold to keep it from being released.
			 */
			cl_lock_hold_add(env, lock, "upcall", lock);
			/* A user for the lock as well. */
			cl_lock_user_add(env, lock);
			ols->ols_state = OLS_ENQUEUED;

			/*
			 * Build the LDLM resource name and extent policy,
			 * then kick off the asynchronous enqueue.
			 */
			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
			osc_lock_build_policy(env, lock, policy);
			result = osc_enqueue_base(osc_export(obj), resname,
						  &ols->ols_flags, policy,
						  &ols->ols_lvb,
						  obj->oo_oinfo->loi_kms_valid,
						  osc_lock_upcall,
						  ols, einfo, &ols->ols_handle,
						  PTLRPCD_SET, 1, ols->ols_agl);
			if (result != 0) {
				cl_lock_user_del(env, lock);
				cl_lock_unhold(env, lock, "upcall", lock);
				if (unlikely(result == -ECANCELED)) {
					ols->ols_state = OLS_NEW;
					result = 0;
				}
			}
		} else {
			ols->ols_state = OLS_GRANTED;
			ols->ols_owner = osc_env_io(env);
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
	return result;
}


static int osc_lock_wait(const struct lu_env *env,
			 const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));

	if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
		if (olck->ols_flags & LDLM_FL_LVB_READY) {
			return 0;
		} else if (olck->ols_agl) {
			if (lock->cll_flags & CLF_FROM_UPCALL)
				/*
				 * The wait is coming from the enqueue RPC
				 * reply upcall, which only updates the
				 * state; do not re-enqueue.
				 */
				return -ENAVAIL;
			olck->ols_state = OLS_NEW;
		} else {
			LASSERT(lock->cll_error);
			return lock->cll_error;
		}
	}

	if (olck->ols_state == OLS_NEW) {
		int rc;

		/* An AGL lock that was not granted is re-enqueued here as a
		 * normal, blocking request.
		 */
		LASSERT(olck->ols_agl);
		olck->ols_agl = 0;
		olck->ols_flags &= ~LDLM_FL_BLOCK_NOWAIT;
		rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
		if (rc != 0)
			return rc;
		else
			return CLO_REENQUEUED;
	}

	LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
		     lock->cll_error == 0, olck->ols_lock));

	return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
}


/*
 * Implementation of cl_lock_operations::clo_use() that pins an already
 * cached DLM lock.
 */
static int osc_lock_use(const struct lu_env *env,
			const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	int rc;

	LASSERT(!olck->ols_hold);

	/*
	 * Atomically check for LDLM_FL_CBPENDING and take a mode reference
	 * only if the flag is not set: this protects against a concurrent
	 * blocking AST.
	 */
	rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
	if (rc == 0) {
		olck->ols_hold = 1;
		olck->ols_state = OLS_GRANTED;
	} else {
		struct cl_lock *lock;

		/*
		 * The DLM lock is being cancelled by a concurrent blocking
		 * AST: our osc_lock is kept alive, but the DLM lock is going
		 * away.
		 */
		lock = slice->cls_lock;
		LASSERT(lock->cll_state == CLS_INTRANSIT);
		LASSERT(lock->cll_users > 0);
		/* Set a flag for osc_dlm_blocking_ast0() to signal this
		 * lock once the cancellation is done.
		 */
		olck->ols_ast_wait = 1;
		rc = CLO_WAIT;
	}
	return rc;
}

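/*
 * Write back (or discard) and then drop the pages covered by the lock
 * before it is cancelled.
 */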
static int osc_lock_flush(struct osc_lock *ols, int discard)
{
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	struct cl_env_nest nest;
	struct lu_env *env;
	int result = 0;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj);
		struct cl_lock_descr *descr = &lock->cll_descr;
		int rc = 0;

		if (descr->cld_mode >= CLM_WRITE) {
			result = osc_cache_writeback_range(env, obj,
							   descr->cld_start,
							   descr->cld_end,
							   1, discard);
			LDLM_DEBUG(ols->ols_lock,
				   "lock %p: %d pages were %s.\n", lock, result,
				   discard ? "discarded" : "written");
			if (result > 0)
				result = 0;
		}

		rc = cl_lock_discard_pages(env, lock);
		if (result == 0 && rc < 0)
			result = rc;

		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	if (result == 0) {
		ols->ols_flush = 1;
		LINVRNT(!osc_lock_has_pages(ols));
	}
	return result;
}


/*
 * Implementation of cl_lock_operations::clo_cancel() for the osc layer.
 *
 * Called (as part of cl_lock_cancel()) when the lock is cancelled either
 * voluntarily by the client or in response to a blocking AST from the
 * server. Covered pages are written back or discarded before the DLM lock
 * itself is cancelled.
 */
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct cl_lock *lock = slice->cls_lock;
	struct osc_lock *olck = cl2osc_lock(slice);
	struct ldlm_lock *dlmlock = olck->ols_lock;
	int result = 0;
	int discard;

	LASSERT(cl_lock_is_mutexed(lock));
	LINVRNT(osc_lock_invariant(olck));

	if (dlmlock) {
		int do_cancel;

		discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
		if (olck->ols_state >= OLS_GRANTED)
			result = osc_lock_flush(olck, discard);
		osc_lock_unhold(olck);

		lock_res_and_lock(dlmlock);
		/* Now that we are the only user of the DLM read/write
		 * reference, l_readers + l_writers should normally be zero;
		 * only cancel on the server if that is really the case.
		 */
		do_cancel = (dlmlock->l_readers == 0 &&
			     dlmlock->l_writers == 0);
		dlmlock->l_flags |= LDLM_FL_CBPENDING;
		unlock_res_and_lock(dlmlock);
		if (do_cancel)
			result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
		if (result < 0)
			CL_LOCK_DEBUG(D_ERROR, env, lock,
				      "lock %p cancel failure with error(%d)\n",
				      lock, result);
	}
	olck->ols_state = OLS_CANCELLED;
	olck->ols_flags &= ~LDLM_FL_LVB_READY;
	osc_lock_detach(env, olck);
}


static int osc_lock_has_pages(struct osc_lock *olck)
{
	return 0;
}

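/*
 * Implementation of cl_lock_operations::clo_delete(): detaches the osc_lock
 * from its DLM lock when the cl_lock is being destroyed. Glimpse locks never
 * hold a DLM lock, so there is nothing to do for them.
 */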
static void osc_lock_delete(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_lock *olck;

	olck = cl2osc_lock(slice);
	if (olck->ols_glimpse) {
		LASSERT(!olck->ols_hold);
		LASSERT(!olck->ols_lock);
		return;
	}

	LINVRNT(osc_lock_invariant(olck));
	LINVRNT(!osc_lock_has_pages(olck));

	osc_lock_unhold(olck);
	osc_lock_detach(env, olck);
}

/*
 * Implementation of cl_lock_operations::clo_state() for the osc layer.
 *
 * Maintains osc_lock::ols_owner, which records the osc_io that moved the
 * lock into CLS_HELD. This assumes the lock enters CLS_HELD in the same IO
 * context as the one that requested it.
 */
static void osc_lock_state(const struct lu_env *env,
			   const struct cl_lock_slice *slice,
			   enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(!lock->ols_owner);
		lock->ols_owner = oio;
	} else if (state != CLS_HELD)
		lock->ols_owner = NULL;
}


static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
	     lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}

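/*
 * Implementation of cl_lock_operations::clo_fits_into(): decide whether an
 * existing osc_lock can be matched against a new lock request, based on its
 * state and the requested enqueue flags.
 */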
static int osc_lock_fits_into(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      const struct cl_lock_descr *need,
			      const struct cl_io *io)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	if (need->cld_enq_flags & CEF_NEVER)
		return 0;

	if (ols->ols_state >= OLS_CANCELLED)
		return 0;

	if (need->cld_mode == CLM_PHANTOM) {
		if (ols->ols_agl)
			return !(ols->ols_state > OLS_RELEASED);

		/*
		 * Never match a PHANTOM (glimpse) request against a lock
		 * that is still being enqueued: matching a queued lock here
		 * can deadlock if the queued lock itself waits on a
		 * conflicting lock held by this thread.
		 */
		if (ols->ols_state < OLS_GRANTED ||
		    ols->ols_state > OLS_RELEASED)
			return 0;
	} else if (need->cld_enq_flags & CEF_MUST) {
		/*
		 * If the lock has never been enqueued, it cannot be matched
		 * here, because the enqueue brings in information (such as
		 * whether the lock is lockless) that this check depends on.
		 */
		if (ols->ols_state < OLS_UPCALL_RECEIVED &&
		    ols->ols_locklessable)
			return 0;
	}
	return 1;
}
1463
1464static const struct cl_lock_operations osc_lock_ops = {
1465 .clo_fini = osc_lock_fini,
1466 .clo_enqueue = osc_lock_enqueue,
1467 .clo_wait = osc_lock_wait,
1468 .clo_unuse = osc_lock_unuse,
1469 .clo_use = osc_lock_use,
1470 .clo_delete = osc_lock_delete,
1471 .clo_state = osc_lock_state,
1472 .clo_cancel = osc_lock_cancel,
1473 .clo_weigh = osc_lock_weigh,
1474 .clo_print = osc_lock_print,
1475 .clo_fits_into = osc_lock_fits_into,
1476};

static int osc_lock_lockless_unuse(const struct lu_env *env,
				   const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;

	LASSERT(ols->ols_state == OLS_GRANTED);
	LINVRNT(osc_lock_invariant(ols));

	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
	return 0;
}

static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	int result;

	result = osc_lock_flush(ols, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged(%d)\n",
		       ols, result);
	ols->ols_state = OLS_CANCELLED;
}

static int osc_lock_lockless_wait(const struct lu_env *env,
				  const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));
	LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);

	return lock->cll_error;
}

static void osc_lock_lockless_state(const struct lu_env *env,
				    const struct cl_lock_slice *slice,
				    enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
		lock->ols_owner = oio;

		/* Mark the IO as lockless if this lock covers the IO's host
		 * object.
		 */
		if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
			oio->oi_lockless = 1;
	}
}

static int osc_lock_lockless_fits_into(const struct lu_env *env,
				       const struct cl_lock_slice *slice,
				       const struct cl_lock_descr *need,
				       const struct cl_io *io)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	if (!(need->cld_enq_flags & CEF_NEVER))
		return 0;

	/* A lockless lock should only be used by its owning IO. */
	return (lock->ols_owner == osc_env_io(env));
}

static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini	= osc_lock_fini,
	.clo_enqueue	= osc_lock_enqueue,
	.clo_wait	= osc_lock_lockless_wait,
	.clo_unuse	= osc_lock_lockless_unuse,
	.clo_state	= osc_lock_lockless_state,
	.clo_fits_into	= osc_lock_lockless_fits_into,
	.clo_cancel	= osc_lock_lockless_cancel,
	.clo_print	= osc_lock_print
};

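/*
 * Allocate and initialize the osc_lock slice for a new cl_lock, translating
 * the enqueue flags and optionally switching the lock to the lockless
 * operations vector.
 */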
int osc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *unused)
{
	struct osc_lock *clk;
	int result;

	clk = kmem_cache_zalloc(osc_lock_kmem, GFP_NOFS);
	if (clk) {
		__u32 enqflags = lock->cll_descr.cld_enq_flags;

		osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
		atomic_set(&clk->ols_pageref, 0);
		clk->ols_state = OLS_NEW;

		clk->ols_flags = osc_enq2ldlm_flags(enqflags);
		clk->ols_agl = !!(enqflags & CEF_AGL);
		if (clk->ols_agl)
			clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
		if (clk->ols_flags & LDLM_FL_HAS_INTENT)
			clk->ols_glimpse = 1;

		cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);

		if (!(enqflags & CEF_MUST))
			/* Try to convert this lock into a lockless lock. */
			osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
		if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
			clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;

		LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx",
				  lock, clk, clk->ols_flags);

		result = 0;
	} else
		result = -ENOMEM;
	return result;
}

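/*
 * Return 1 if the osc_lock attached to @dlm still has outstanding page
 * references, 0 otherwise.
 */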
int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
{
	struct osc_lock *olock;
	int rc = 0;

	spin_lock(&osc_ast_guard);
	olock = dlm->l_ast_data;
	/*
	 * If no osc_lock is attached, or its ols_pageref is zero, report
	 * that the lock has no page references. In the latter case the
	 * _PAGEREF_MAGIC bias is intentionally left in ols_pageref (see the
	 * assertion in osc_lock_fini()).
	 */
	if (olock &&
	    atomic_add_return(_PAGEREF_MAGIC,
			      &olock->ols_pageref) != _PAGEREF_MAGIC) {
		atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
		rc = 1;
	}
	spin_unlock(&osc_ast_guard);
	return rc;
}