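/*
 * Implementation of cl_lock for the OSC (object storage client) layer
 * of the Lustre client.
 */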
#define DEBUG_SUBSYSTEM S_OSC

#include <linux/libcfs/libcfs.h>

#include <lustre_fid.h>

#include "osc_cl_internal.h"
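
/*
 * Large negative bias added to ->ols_pageref by osc_dlm_lock_pageref()
 * so that a single atomic_add_return() can test whether any page
 * references remain; osc_lock_fini() accepts either 0 or the bare magic
 * value as "no outstanding references".
 */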
#define _PAGEREF_MAGIC (-10000000)

static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);
static int osc_lock_has_pages(struct osc_lock *olck);

int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}
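
/*
 * Returns a weak pointer to the ldlm lock identified by a handle. The
 * returned pointer must not be dereferenced, as the lock is not
 * protected from concurrent reclaim; it is only compared against
 * ->ols_lock in the invariant checks below.
 */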
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);
	return lock;
}
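
/*
 * Invariant that has to hold for an osc_lock at all times.
 */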
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock = ols->ols_lock;
	int handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && ols->ols_lock == NULL))
		return 1;

	/*
	 * If all of the following "ergo"s hold, return 1; otherwise 0.
	 */
	if (!ergo(olock != NULL, handle_used))
		return 0;

	if (!ergo(olock != NULL,
		  olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	/*
	 * ->ols_handle and ->ols_lock must be consistent, keeping in mind
	 * that they are set at different times.
	 */
	if (!ergo(handle_used,
		  ergo(lock != NULL && olock != NULL, lock == olock) &&
		  ergo(lock == NULL, olock == NULL)))
		return 0;

	/* A cancelled lock holds neither a dlmlock nor a handle. */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  olock == NULL && !handle_used))
		return 0;

	/*
	 * The DLM lock is destroyed only after the cancellation AST has
	 * been seen.
	 */
	if (!ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
		  (olock->l_flags & LDLM_FL_DESTROYED) == 0))
		return 0;

	/* A granted lock must be held and match the granted mode. */
	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock != NULL &&
		  olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}
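
/*****************************************************************************
 *
 * Lock operations.
 *
 *****************************************************************************/

/*
 * Breaks the link between an osc_lock and its underlying ldlm lock:
 * clears ->ols_lock and the dlmlock's l_ast_data, shrinks the known
 * minimum size (kms) if the lock was granted, and drops the references
 * taken when the pair was linked.
 */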
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	spin_lock(&osc_ast_guard);
	dlmlock = olck->ols_lock;
	if (dlmlock == NULL) {
		spin_unlock(&osc_ast_guard);
		return;
	}

	olck->ols_lock = NULL;
	/*
	 * Wipe out l_ast_data and the handle cookie, so that the dlmlock
	 * can no longer reach this osc_lock.
	 */
	dlmlock->l_ast_data = NULL;
	olck->ols_handle.cookie = 0ULL;
	spin_unlock(&osc_ast_guard);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct cl_object *obj = olck->ols_cl.cls_obj;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		cl_object_attr_lock(obj);
		/* Read loi_kms under the attribute lock to avoid races. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/*
		 * A granted lock is leaving the cache, so the known
		 * minimum size may have to shrink.
		 */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
	}
	unlock_res_and_lock(dlmlock);

	/* Drop the references that pinned the dlmlock to this osc_lock. */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}
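
/*
 * Releases the ldlm mode reference ("hold") taken at enqueue time, if
 * this osc_lock still owns one.
 */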
static int osc_lock_unhold(struct osc_lock *ols)
{
	int result = 0;

	if (ols->ols_hold) {
		ols->ols_hold = 0;
		result = osc_cancel_base(&ols->ols_handle,
					 ols->ols_einfo.ei_mode);
	}
	return result;
}
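
/*
 * Implementation of cl_lock_operations::clo_unuse(): moves the lock
 * back to OLS_NEW (or to OLS_RELEASED for a granted lock), detaching it
 * from the dlm lock where necessary.
 */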
static int osc_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));

	switch (ols->ols_state) {
	case OLS_NEW:
		LASSERT(!ols->ols_hold);
		LASSERT(ols->ols_agl);
		return 0;
	case OLS_UPCALL_RECEIVED:
		osc_lock_unhold(ols);
		/* fall through */
	case OLS_ENQUEUED:
		LASSERT(!ols->ols_hold);
		osc_lock_detach(env, ols);
		ols->ols_state = OLS_NEW;
		return 0;
	case OLS_GRANTED:
		LASSERT(!ols->ols_glimpse);
		LASSERT(ols->ols_hold);
		/*
		 * Move the lock into OLS_RELEASED state before calling
		 * osc_cancel_base() so that a possible synchronous
		 * cancellation sees that the lock is released.
		 */
		ols->ols_state = OLS_RELEASED;
		return osc_lock_unhold(ols);
	default:
		CERROR("Impossible state: %d\n", ols->ols_state);
		LBUG();
	}
}
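
/*
 * Implementation of cl_lock_operations::clo_fini(): releases any
 * remaining hold and frees the osc_lock slice.
 */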
static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	/*
	 * ->ols_hold can still be set here if the lock is being destroyed
	 * without ever having been used; drop the hold before the slice
	 * is freed.
	 */
	osc_lock_unhold(ols);
	LASSERT(ols->ols_lock == NULL);
	LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
		atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);

	OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
}

static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}
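
/*
 * Translates CEF_* enqueue flags into their LDLM_FL_* counterparts.
 */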
static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	return result;
}
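
/*
 * Global spin-lock protecting the consistency of
 * ldlm_lock::l_ast_data with osc_lock::ols_lock.
 */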
spinlock_t osc_ast_guard;
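
/*
 * Looks up the osc_lock attached to a dlm lock's l_ast_data and returns
 * it with a trusted reference held on its cl_lock, or NULL if the
 * cl_lock is already being freed.
 */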
static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
{
	struct osc_lock *olck;

	lock_res_and_lock(dlm_lock);
	spin_lock(&osc_ast_guard);
	olck = dlm_lock->l_ast_data;
	if (olck != NULL) {
		struct cl_lock *lock = olck->ols_cl.cls_lock;
		/*
		 * If the osc_lock holds a reference on the dlmlock, the
		 * cl_lock cannot go away under us; otherwise take a
		 * trusted reference only while the cl_lock is not yet
		 * being freed.
		 */
		if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
			cl_lock_get_trust(lock);
			lu_ref_add_atomic(&lock->cll_reference,
					  "ast", current);
		} else
			olck = NULL;
	}
	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(dlm_lock);
	return olck;
}

static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
{
	struct cl_lock *lock;

	lock = olck->ols_cl.cls_lock;
	lu_ref_del(&lock->cll_reference, "ast", current);
	cl_lock_put(env, lock);
}
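
/*
 * Updates object attributes from a lock value block (lvb) received
 * together with the DLM lock reply from the server. Refreshes size,
 * times and blocks, and may extend the known minimum size (kms) up to
 * the end of the granted extent.
 */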
static void osc_lock_lvb_update(const struct lu_env *env,
				struct osc_lock *olck, int rc)
{
	struct ost_lvb *lvb;
	struct cl_object *obj;
	struct lov_oinfo *oinfo;
	struct cl_attr *attr;
	unsigned valid;

	if (!(olck->ols_flags & LDLM_FL_LVB_READY))
		return;

	lvb = &olck->ols_lvb;
	obj = olck->ols_cl.cls_obj;
	oinfo = cl2osc(obj)->oo_oinfo;
	attr = &osc_env_info(env)->oti_attr;
	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (rc == 0) {
		struct ldlm_lock *dlmlock;
		__u64 size;

		dlmlock = olck->ols_lock;
		LASSERT(dlmlock != NULL);

		/* Re-grab the LVB from the dlmlock. */
		*lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
		size = lvb->lvb_size;

		/* Extend KMS up to the end of this lock and no further. */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
				   ", kms="LPU64, lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
				   LPU64"; leaving kms="LPU64", end="LPU64,
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	} else if (rc == -ENAVAIL && olck->ols_glimpse) {
		CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
		       " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
	} else
		valid = 0;

	if (valid != 0)
		cl_object_attr_set(env, obj, attr, valid);

	cl_object_attr_unlock(obj);
}
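
/*
 * Called when a granted DLM lock is (being) attached to this osc_lock.
 * Copies the granted mode and extent from the dlmlock into the cl_lock
 * description, updates attributes from the LVB, and signals waiting
 * threads. Called under the resource lock of the dlmlock, which is
 * released around cl_lock_modify()/cl_lock_signal().
 */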
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
			     struct ldlm_lock *dlmlock, int rc)
{
	struct ldlm_extent *ext;
	struct cl_lock *lock;
	struct cl_lock_descr *descr;

	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	if (olck->ols_state < OLS_GRANTED) {
		lock = olck->ols_cl.cls_lock;
		ext = &dlmlock->l_policy_data.l_extent;
		descr = &osc_env_info(env)->oti_descr;
		descr->cld_obj = lock->cll_descr.cld_obj;

		/* The server may have granted a larger extent than was
		 * requested. */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;
		/*
		 * Set the state and update the attributes before dropping
		 * the resource lock below.
		 */
		olck->ols_state = OLS_GRANTED;
		osc_lock_lvb_update(env, olck, rc);

		/*
		 * Release the DLM spin-locks before cl_lock_modify() and
		 * cl_lock_signal(), which may sleep; the dlmlock mode and
		 * LVB have already been processed above.
		 */
		unlock_res_and_lock(dlmlock);
		cl_lock_modify(env, lock, descr);
		cl_lock_signal(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		lock_res_and_lock(dlmlock);
	}
}
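
/*
 * Attaches the freshly enqueued dlmlock (looked up by its handle) to
 * this osc_lock and takes the references that keep the pair linked.
 */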
static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
	LASSERT(dlmlock != NULL);

	lock_res_and_lock(dlmlock);
	spin_lock(&osc_ast_guard);
	LASSERT(dlmlock->l_ast_data == olck);
	LASSERT(olck->ols_lock == NULL);
	olck->ols_lock = dlmlock;
	spin_unlock(&osc_ast_guard);

	/*
	 * The lock might not be granted yet; in that case, the completion
	 * AST (osc_ldlm_completion_ast()) arrives later and finishes the
	 * lock granting.
	 */
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
		osc_lock_granted(env, olck, dlmlock, 0);
	unlock_res_and_lock(dlmlock);

	/*
	 * Take a mode reference on the dlm lock; osc_lock_unhold() drops
	 * it again via osc_cancel_base().
	 */
	ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
	olck->ols_hold = 1;

	/* The lock reference taken by ldlm_handle2lock_long() above is
	 * owned by the osc_lock from now on and released in
	 * osc_lock_detach(). */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
	olck->ols_has_ref = 1;
}
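
/*
 * Lock upcall function, executed either when a reply to an enqueue RPC
 * is received or when the enqueue fails. Transfers the DLM state into
 * the cl_lock state machine and handles the tolerable errors: lockless
 * fallback on -EUSERS and glimpse races on -ENAVAIL.
 */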
static int osc_lock_upcall(void *cookie, int errcode)
{
	struct osc_lock *olck = cookie;
	struct cl_lock_slice *slice = &olck->ols_cl;
	struct cl_lock *lock = slice->cls_lock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		int rc;

		cl_lock_mutex_get(env, lock);

		LASSERT(lock->cll_state >= CLS_QUEUING);
		if (olck->ols_state == OLS_ENQUEUED) {
			olck->ols_state = OLS_UPCALL_RECEIVED;
			rc = ldlm_error2errno(errcode);
		} else if (olck->ols_state == OLS_CANCELLED) {
			rc = -EIO;
		} else {
			CERROR("Impossible state: %d\n", olck->ols_state);
			LBUG();
		}
		if (rc) {
			struct ldlm_lock *dlmlock;

			dlmlock = ldlm_handle2lock(&olck->ols_handle);
			if (dlmlock != NULL) {
				lock_res_and_lock(dlmlock);
				spin_lock(&osc_ast_guard);
				LASSERT(olck->ols_lock == NULL);
				dlmlock->l_ast_data = NULL;
				olck->ols_handle.cookie = 0ULL;
				spin_unlock(&osc_ast_guard);
				ldlm_lock_fail_match_locked(dlmlock);
				unlock_res_and_lock(dlmlock);
				LDLM_LOCK_PUT(dlmlock);
			}
		} else {
			if (olck->ols_glimpse)
				olck->ols_glimpse = 0;
			osc_lock_upcall0(env, olck);
		}

		/* Error handling: some errors are tolerable. */
		if (olck->ols_locklessable && rc == -EUSERS) {
			/*
			 * This is a tolerable error: the server found the
			 * lock contended, so mark the object contended and
			 * retry in lockless mode.
			 */
			osc_object_set_contended(cl2osc(slice->cls_obj));
			LASSERT(slice->cls_ops == &osc_lock_ops);

			/* Switch this lock onto the lockless path. */
			osc_lock_to_lockless(env, olck, 1);
			olck->ols_state = OLS_GRANTED;
			rc = 0;
		} else if (olck->ols_glimpse && rc == -ENAVAIL) {
			/* For a glimpse, the LVB in the reply is enough. */
			osc_lock_lvb_update(env, olck, rc);
			cl_lock_delete(env, lock);
			/* Hide the error. */
			rc = 0;
		}

		if (rc == 0) {
			/*
			 * For the AGL case, the RPC sponsor may have exited
			 * the cl_lock processing without calling wait(), so
			 * complete the state transition here.
			 */
			if (olck->ols_agl) {
				lock->cll_flags |= CLF_FROM_UPCALL;
				cl_wait_try(env, lock);
				lock->cll_flags &= ~CLF_FROM_UPCALL;
				if (!olck->ols_glimpse)
					olck->ols_agl = 0;
			}
			cl_lock_signal(env, lock);
			/* del user for lock upcall cookie */
			cl_unuse_try(env, lock);
		} else {
			/* del user for lock upcall cookie */
			cl_lock_user_del(env, lock);
			cl_lock_error(env, lock, rc);
		}

		/* Release cookie reference, acquired by osc_lock_enqueue(). */
		cl_lock_hold_release(env, lock, "upcall", lock);
		cl_lock_mutex_put(env, lock);

		lu_ref_del(&lock->cll_reference, "upcall", lock);
		/*
		 * This may be the last reference, so all lock processing
		 * must have completed by now.
		 */
		cl_lock_put(env, lock);

		cl_env_nested_put(&nest, env);
	} else {
		/* Should never happen; see osc_ldlm_blocking_ast(). */
		LBUG();
	}
	return errcode;
}
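
/*
 * Core of osc_dlm_blocking_ast() logic: releases the hold on the dlm
 * lock and moves the cl_lock through cancellation and deletion.
 */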
static void osc_lock_blocking(const struct lu_env *env,
			      struct ldlm_lock *dlmlock,
			      struct osc_lock *olck, int blocking)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LASSERT(olck->ols_lock == dlmlock);
	CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
	LASSERT(!osc_lock_is_lockless(olck));

	/*
	 * The lock might still be addref-ed here, if e.g. the blocking
	 * AST is sent for a failed lock.
	 */
	osc_lock_unhold(olck);

	if (blocking && olck->ols_state < OLS_BLOCKED)
		/*
		 * Move the osc_lock into OLS_BLOCKED before cancelling, so
		 * that threads racing with the cancellation can notice it.
		 */
		olck->ols_state = OLS_BLOCKED;

	/*
	 * Cancel and destroy the lock at least once, no matter how the
	 * blocking AST is entered; cl_lock_cancel() and cl_lock_delete()
	 * are idempotent.
	 */
	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
}
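
/*
 * Helper for osc_ldlm_blocking_ast(): handles the AST with a cl
 * environment already set up, and cancels the dlm lock directly when no
 * cl_lock is (or is no longer) attached to it.
 */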
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int cancel;

	LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);

	cancel = 0;
	olck = osc_ast_data_get(dlmlock);
	if (olck != NULL) {
		lock = olck->ols_cl.cls_lock;
		cl_lock_mutex_get(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		if (olck->ols_ast_wait) {
			/* Wake up osc_lock_use() waiting in CLS_INTRANSIT. */
			cl_lock_signal(env, lock);
			olck->ols_ast_wait = 0;
		}
		/*
		 * Operate only on the lock that is still attached to this
		 * dlmlock; a racing osc_lock_detach() may already have
		 * severed the link.
		 */
		if (olck == dlmlock->l_ast_data) {
			/*
			 * NOTE: DLM sends blocking ASTs for failed locks
			 *	 (still in pre-OLS_GRANTED state) too, and
			 *	 there is no way to tell the two cases apart
			 *	 here, so the osc_lock state machine has to
			 *	 cope with both.
			 */
			LASSERT(data == olck);
			osc_lock_blocking(env, dlmlock,
					  olck, flag == LDLM_CB_BLOCKING);
		} else
			cancel = 1;
		cl_lock_mutex_put(env, lock);
		osc_ast_data_put(env, olck);
	} else
		/*
		 * The DLM lock exists, but there is no cl_lock attached to
		 * it: cancel the dlm lock directly.
		 */
		cancel = (flag == LDLM_CB_BLOCKING);

	if (cancel) {
		struct lustre_handle *lockh;

		lockh = &osc_env_info(env)->oti_handle;
		ldlm_lock2handle(dlmlock, lockh);
		result = ldlm_cli_cancel(lockh, LCF_ASYNC);
	} else
		result = 0;
	return result;
}
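
/*
 * Blocking AST invoked by ldlm when a dlm lock is either blocking
 * progress of some other lock (LDLM_CB_BLOCKING), or is being cancelled
 * (LDLM_CB_CANCELING).
 *
 * In the blocking case the lock has to be released as soon as possible;
 * in the cancelling case all pages cached under the lock must be
 * written back (or discarded) and the lock destroyed. The AST may be
 * delivered more than once for the same lock, so the handling in
 * osc_dlm_blocking_ast0() is idempotent.
 */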
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int result;

	/*
	 * This can be called in the context of outer IO, e.g.,
	 *
	 *     cl_enqueue()->...->osc_enqueue_base()->ldlm_prep_elc_req()->
	 *         ldlm_cancel_callback()->...->osc_ldlm_blocking_ast()
	 *
	 * so a new (nested) environment has to be created to avoid
	 * corrupting the outer context.
	 */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
		/*
		 * Environment allocation failed: there is no good way to
		 * recover from a blocking AST without one.
		 */
		LBUG();
	}
	if (result != 0) {
		if (result == -ENODATA)
			result = 0;
		else
			CERROR("BAST failed: %d\n", result);
	}
	return result;
}
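
/*
 * Completion AST: runs the generic ldlm completion handling first, then
 * copies the LVB and, if the lock is granted, moves the osc_lock to
 * OLS_GRANTED via osc_lock_granted().
 */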
static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
				   __u64 flags, void *data)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int dlmrc;

	/* First, do the dlm part of the work. */
	dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
	/* Then, notify cl_lock. */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			cl_lock_mutex_get(env, lock);
			/*
			 * The LVB received with the reply is stored in
			 * l_lvb_data; snapshot it under the resource lock.
			 */
			LASSERT(dlmlock->l_lvb_data != NULL);
			lock_res_and_lock(dlmlock);
			olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
			if (olck->ols_lock == NULL) {
				/*
				 * The upcall (osc_lock_upcall()) has not
				 * been run yet. Do nothing now; the upcall
				 * will bind olck to dlmlock and resume the
				 * state transition.
				 */
			} else if (dlmlock->l_granted_mode ==
				   dlmlock->l_req_mode) {
				osc_lock_granted(env, olck, dlmlock, dlmrc);
			}
			unlock_res_and_lock(dlmlock);

			if (dlmrc != 0) {
				CL_LOCK_DEBUG(D_ERROR, env, lock,
					      "dlmlock returned %d\n", dlmrc);
				cl_lock_error(env, lock, dlmrc);
			}
			cl_lock_mutex_put(env, lock);
			osc_ast_data_put(env, olck);
			result = 0;
		} else
			result = -ELDLM_NO_LOCK_DATA;
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	return dlmrc ?: result;
}
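
/*
 * Glimpse AST: invoked on the client holding a size lock when another
 * client needs the current object size (e.g. for stat(2)). Packs the
 * lock value block obtained from cl_object_glimpse() into the reply.
 */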
static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct osc_lock *olck;
	struct cl_lock *lock;
	struct cl_object *obj;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		/*
		 * osc_ast_data_get() has to go after the environment is
		 * allocated, because it acquires a reference to the lock
		 * that can only be released within an environment.
		 */
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			/*
			 * The cl_lock mutex is deliberately not taken here:
			 * the server can handle a race with concurrent
			 * cancellation of this lock.
			 */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				obj = lock->cll_descr.cld_obj;
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			osc_ast_data_put(env, olck);
		} else {
			/*
			 * These errors are normal races, so don't fill the
			 * console with messages by calling ptlrpc_error().
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	req->rq_status = result;
	return result;
}
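
/*
 * Weighing function for the lock cancellation policy: a cheap
 * approximation by the number of pages cached on the object.
 */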
static unsigned long osc_lock_weigh(const struct lu_env *env,
				    const struct cl_lock_slice *slice)
{
	/*
	 * No need to grab coh_page_guard here: an approximate page count
	 * is good enough for weighing.
	 */
	return cl_object_header(slice->cls_obj)->coh_pages;
}
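
/*
 * Builds the ldlm_enqueue_info for this lock: an extent lock of the
 * corresponding mode with the osc blocking/completion/glimpse AST
 * handlers attached.
 */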
static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *clock,
				 struct osc_lock *lock,
				 struct ldlm_enqueue_info *einfo)
{
	enum cl_lock_mode mode;

	mode = clock->cll_descr.cld_mode;
	if (mode == CLM_PHANTOM)
		/*
		 * For now, enqueue all glimpse locks in read mode. In the
		 * future, the client might choose to enqueue an LCK_PW
		 * lock for glimpse on a file opened for write.
		 */
		mode = CLM_READ;

	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = osc_ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}
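
/*
 * Determines whether the lock should be converted into a lockless lock:
 * checks any explicit requirement from the caller (force / CEF_NEVER),
 * the io lock request type (ci_lockreq), server support for lockless IO
 * (OBD_CONNECT_SRVLOCK), contention on the object, and the special case
 * of lockless truncate.
 */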
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
					(io->ci_lockreq == CILR_MAYBE) &&
					(ocd->ocd_connect_flags &
					 OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		     osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}

static int osc_lock_compatible(const struct osc_lock *qing,
			       const struct osc_lock *qed)
{
	enum cl_lock_mode qing_mode;
	enum cl_lock_mode qed_mode;

	qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
	if (qed->ols_glimpse &&
	    (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
		return 1;

	qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
	return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
}
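
/*
 * Scans the object's lock list for a live lock overlapping this one and
 * decides whether the enqueue may proceed. Conflicts between group
 * locks are resolved on the server, so such enqueues are sent anyway;
 * any other conflict makes the caller wait (CLO_WAIT) with
 * ->cll_conflict recorded. This also guarantees that pages added by
 * lockless IO are never covered by another lock.
 */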
static int osc_lock_enqueue_wait(const struct lu_env *env,
				 const struct osc_lock *olck)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;
	struct cl_lock_descr *descr = &lock->cll_descr;
	struct cl_object_header *hdr = cl_object_header(descr->cld_obj);
	struct cl_lock *scan;
	struct cl_lock *conflict = NULL;
	int lockless = osc_lock_is_lockless(olck);
	int rc = 0;

	LASSERT(cl_lock_is_mutexed(lock));

	/* Let a glimpse lock enqueue anyway: it doesn't need to cancel
	 * any conflicting locks. */
	if (olck->ols_glimpse)
		return 0;

	spin_lock(&hdr->coh_lock_guard);
	list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
		struct cl_lock_descr *cld = &scan->cll_descr;
		const struct osc_lock *scan_ols;

		if (scan == lock)
			break;

		if (scan->cll_state < CLS_QUEUING ||
		    scan->cll_state == CLS_FREEING ||
		    cld->cld_start > descr->cld_end ||
		    cld->cld_end < descr->cld_start)
			continue;

		/* An overlapping, live lock has been found. */
		if (scan->cll_descr.cld_mode == CLM_GROUP) {
			LASSERT(descr->cld_mode != CLM_GROUP ||
				descr->cld_gid != scan->cll_descr.cld_gid);
			continue;
		}

		scan_ols = osc_lock_at(scan);

		/* Compatible locks still have to be cancelled when
		 * enqueuing a lockless lock. For example: a client holds a
		 * PR lock on [0, 1000] while thread T0 does lockless IO in
		 * [500, 1500]. A concurrent thread T1 could then see
		 * lockless data in [500, 1000], which may be stale. */
		if (!lockless && osc_lock_compatible(olck, scan_ols))
			continue;

		cl_lock_get_trust(scan);
		conflict = scan;
		break;
	}
	spin_unlock(&hdr->coh_lock_guard);

	if (conflict) {
		if (lock->cll_descr.cld_mode == CLM_GROUP) {
			/*
			 * A group lock request conflicts with a previous
			 * lock: do not wait; send the request to the server
			 * and let it resolve the conflict.
			 */
			CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
			       "with %p, no wait, send to server\n",
			       lock, conflict);
			cl_lock_put(env, conflict);
			rc = 0;
		} else {
			CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
			       "will wait\n",
			       lock, conflict);
			LASSERT(lock->cll_conflict == NULL);
			lu_ref_add(&conflict->cll_reference, "cancel-wait",
				   lock);
			lock->cll_conflict = conflict;
			rc = CLO_WAIT;
		}
	}
	return rc;
}
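
/*
 * Implementation of cl_lock_operations::clo_enqueue() method. This
 * initiates an ldlm enqueue:
 *
 *     - cancels conflicting locks early (osc_lock_enqueue_wait());
 *
 *     - calls osc_enqueue_base() to do the actual enqueue, supplying
 *       osc_lock_upcall() to be run when the lock is granted (either
 *       from the local cache or by a server reply).
 *
 * This function does not wait for the network communication to
 * complete.
 */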
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, __u32 enqflags)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	int result;

	LASSERT(cl_lock_is_mutexed(lock));
	LASSERTF(ols->ols_state == OLS_NEW,
		 "Impossible state: %d\n", ols->ols_state);

	LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, ols);

	result = osc_lock_enqueue_wait(env, ols);
	if (result == 0) {
		if (!osc_lock_is_lockless(ols)) {
			struct osc_object *obj = cl2osc(slice->cls_obj);
			struct osc_thread_info *info = osc_env_info(env);
			struct ldlm_res_id *resname = &info->oti_resname;
			ldlm_policy_data_t *policy = &info->oti_policy;
			struct ldlm_enqueue_info *einfo = &ols->ols_einfo;

			/* The lock will be passed as the upcall cookie;
			 * hold a reference to keep it alive. */
			cl_lock_hold_add(env, lock, "upcall", lock);
			/* a user for lock also */
			cl_lock_user_add(env, lock);
			ols->ols_state = OLS_ENQUEUED;

			/*
			 * Build the resource name and policy from the lock
			 * description and fire off the asynchronous
			 * enqueue.
			 */
			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
			osc_lock_build_policy(env, lock, policy);
			result = osc_enqueue_base(osc_export(obj), resname,
						  &ols->ols_flags, policy,
						  &ols->ols_lvb,
						  obj->oo_oinfo->loi_kms_valid,
						  osc_lock_upcall,
						  ols, einfo, &ols->ols_handle,
						  PTLRPCD_SET, 1, ols->ols_agl);
			if (result != 0) {
				cl_lock_user_del(env, lock);
				cl_lock_unhold(env, lock, "upcall", lock);
				if (unlikely(result == -ECANCELED)) {
					ols->ols_state = OLS_NEW;
					result = 0;
				}
			}
		} else {
			ols->ols_state = OLS_GRANTED;
			ols->ols_owner = osc_env_io(env);
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
	return result;
}

static int osc_lock_wait(const struct lu_env *env,
			 const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));

	if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
		if (olck->ols_flags & LDLM_FL_LVB_READY) {
			return 0;
		} else if (olck->ols_agl) {
			if (lock->cll_flags & CLF_FROM_UPCALL)
				/* It is from the enqueue RPC reply upcall
				 * for updating state; do not re-enqueue. */
				return -ENAVAIL;
			else
				olck->ols_state = OLS_NEW;
		} else {
			LASSERT(lock->cll_error);
			return lock->cll_error;
		}
	}

	if (olck->ols_state == OLS_NEW) {
		int rc;

		LASSERT(olck->ols_agl);
		olck->ols_agl = 0;
		rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
		if (rc != 0)
			return rc;
		else
			return CLO_REENQUEUED;
	}

	LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
		     lock->cll_error == 0, olck->ols_lock != NULL));

	return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
}
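
/*
 * An implementation of cl_lock_operations::clo_use() method that pins a
 * cached lock: atomically takes a mode reference on the cached dlmlock
 * and moves the osc_lock back to OLS_GRANTED, or waits for a concurrent
 * blocking AST to finish when the reference cannot be taken.
 */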
static int osc_lock_use(const struct lu_env *env,
			const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	int rc;

	LASSERT(!olck->ols_hold);

	/*
	 * Atomically check for LDLM_FL_CBPENDING and addref the lock if
	 * the flag is not set. This protects against a concurrent
	 * blocking AST.
	 */
	rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
	if (rc == 0) {
		olck->ols_hold = 1;
		olck->ols_state = OLS_GRANTED;
	} else {
		struct cl_lock *lock;

		/*
		 * The lock is being cancelled somewhere within a blocking
		 * AST. Wait for its completion; the caller will then
		 * re-check and retry the whole operation.
		 */
		lock = slice->cls_lock;
		LASSERT(lock->cll_state == CLS_INTRANSIT);
		LASSERT(lock->cll_users > 0);
		/* Set a flag for osc_dlm_blocking_ast0() to signal this
		 * lock. */
		olck->ols_ast_wait = 1;
		rc = CLO_WAIT;
	}
	return rc;
}
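
/*
 * Writes back (or discards, when @discard is set) the pages covered by
 * the lock, as a prelude to cancellation.
 */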
static int osc_lock_flush(struct osc_lock *ols, int discard)
{
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	struct cl_env_nest nest;
	struct lu_env *env;
	int result = 0;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj);
		struct cl_lock_descr *descr = &lock->cll_descr;
		int rc = 0;

		if (descr->cld_mode >= CLM_WRITE) {
			result = osc_cache_writeback_range(env, obj,
							   descr->cld_start,
							   descr->cld_end,
							   1, discard);
			LDLM_DEBUG(ols->ols_lock,
				   "lock %p: %d pages were %s.\n",
				   lock, result,
				   discard ? "discarded" : "written");
			if (result > 0)
				result = 0;
		}

		rc = cl_lock_discard_pages(env, lock);
		if (result == 0 && rc < 0)
			result = rc;

		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	if (result == 0) {
		ols->ols_flush = 1;
		LINVRNT(!osc_lock_has_pages(ols));
	}
	return result;
}
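
/*
 * An implementation of cl_lock_operations::clo_cancel() method for the
 * osc layer. Called when the lock is cancelled, either voluntarily
 * (e.g. evicted from the cache) or due to a blocking AST from the
 * server. Flushes (or discards, for LDLM_FL_DISCARD_DATA) the cached
 * pages, and cancels the dlm lock on the server unless it is still in
 * use.
 */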
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct cl_lock *lock = slice->cls_lock;
	struct osc_lock *olck = cl2osc_lock(slice);
	struct ldlm_lock *dlmlock = olck->ols_lock;
	int result = 0;
	int discard;

	LASSERT(cl_lock_is_mutexed(lock));
	LINVRNT(osc_lock_invariant(olck));

	if (dlmlock != NULL) {
		int do_cancel;

		discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
		if (olck->ols_state >= OLS_GRANTED)
			result = osc_lock_flush(olck, discard);
		osc_lock_unhold(olck);

		lock_res_and_lock(dlmlock);
		/* Now that we're the only user of the dlm read/write
		 * reference, ->l_readers + ->l_writers should usually be
		 * zero; there is, however, a corner case where they are
		 * not, in which case the blocking AST will do the
		 * cancellation. */
		do_cancel = (dlmlock->l_readers == 0 &&
			     dlmlock->l_writers == 0);
		dlmlock->l_flags |= LDLM_FL_CBPENDING;
		unlock_res_and_lock(dlmlock);
		if (do_cancel)
			result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
		if (result < 0)
			CL_LOCK_DEBUG(D_ERROR, env, lock,
				      "lock %p cancel failure with error(%d)\n",
				      lock, result);
	}
	olck->ols_state = OLS_CANCELLED;
	olck->ols_flags &= ~LDLM_FL_LVB_READY;
	osc_lock_detach(env, olck);
}
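
/*
 * Trivial page-accounting stub: assumes no pages remain under the lock.
 * Only used in invariant checks.
 */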
static int osc_lock_has_pages(struct osc_lock *olck)
{
	return 0;
}
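
/*
 * Implementation of cl_lock_operations::clo_delete(): detaches the lock
 * from its dlmlock when the cl_lock is destroyed. Glimpse locks were
 * never attached in the first place.
 */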
static void osc_lock_delete(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_lock *olck;

	olck = cl2osc_lock(slice);
	if (olck->ols_glimpse) {
		LASSERT(!olck->ols_hold);
		LASSERT(!olck->ols_lock);
		return;
	}

	LINVRNT(osc_lock_invariant(olck));
	LINVRNT(!osc_lock_has_pages(olck));

	osc_lock_unhold(olck);
	osc_lock_detach(env, olck);
}
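
/*
 * Implements cl_lock_operations::clo_state() method for the osc layer:
 * records the osc_io that owns the lock while it is in CLS_HELD state,
 * and clears the owner when the lock leaves that state.
 */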
static void osc_lock_state(const struct lu_env *env,
			   const struct cl_lock_slice *slice,
			   enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX multiple io contexts can use the lock at the same time.
	 */
	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(lock->ols_owner == NULL);
		lock->ols_owner = oio;
	} else if (state != CLS_HELD)
		lock->ols_owner = NULL;
}

static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX print ldlm lock and einfo properly.
	 */
	(*p)(env, cookie, "%p %#16llx "LPX64" %d %p ",
	     lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}
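
/*
 * Decides whether an existing osc_lock can be reused to satisfy a new
 * lock request described by @need, taking into account cancelled locks,
 * phantom (glimpse) requests and the CEF_MUST flag.
 */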
static int osc_lock_fits_into(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      const struct cl_lock_descr *need,
			      const struct cl_io *io)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	if (need->cld_enq_flags & CEF_NEVER)
		return 0;

	if (ols->ols_state >= OLS_CANCELLED)
		return 0;

	if (need->cld_mode == CLM_PHANTOM) {
		if (ols->ols_agl)
			return !(ols->ols_state > OLS_RELEASED);

		/*
		 * Note: an enqueued (not yet granted) lock must not be
		 * matched here; doing so could deadlock a glimpse request
		 * against the enqueue still in flight for this lock.
		 */
		if (ols->ols_state < OLS_GRANTED ||
		    ols->ols_state > OLS_RELEASED)
			return 0;
	} else if (need->cld_enq_flags & CEF_MUST) {
		/*
		 * To satisfy a CEF_MUST request, the existing lock must
		 * not be lockless; a locklessable lock that has not yet
		 * reached OLS_UPCALL_RECEIVED may still turn lockless, so
		 * don't match it.
		 */
		if (ols->ols_state < OLS_UPCALL_RECEIVED &&
		    ols->ols_locklessable)
			return 0;
	}
	return 1;
}

static const struct cl_lock_operations osc_lock_ops = {
	.clo_fini      = osc_lock_fini,
	.clo_enqueue   = osc_lock_enqueue,
	.clo_wait      = osc_lock_wait,
	.clo_unuse     = osc_lock_unuse,
	.clo_use       = osc_lock_use,
	.clo_delete    = osc_lock_delete,
	.clo_state     = osc_lock_state,
	.clo_cancel    = osc_lock_cancel,
	.clo_weigh     = osc_lock_weigh,
	.clo_print     = osc_lock_print,
	.clo_fits_into = osc_lock_fits_into,
};
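
/*
 * Variants of the lock operations used when the lock is lockless: no
 * DLM lock is taken, and IO is done under server-side locking. A
 * lockless lock is owned by the single IO context that created it.
 */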
static int osc_lock_lockless_unuse(const struct lu_env *env,
				   const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;

	LASSERT(ols->ols_state == OLS_GRANTED);
	LINVRNT(osc_lock_invariant(ols));

	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
	return 0;
}

static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	int result;

	result = osc_lock_flush(ols, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged(%d)\n",
		       ols, result);
	ols->ols_state = OLS_CANCELLED;
}

static int osc_lock_lockless_wait(const struct lu_env *env,
				  const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));
	LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);

	return lock->cll_error;
}

static void osc_lock_lockless_state(const struct lu_env *env,
				    const struct cl_lock_slice *slice,
				    enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
		lock->ols_owner = oio;

		/* Set the io to be lockless if this lock is for the io's
		 * host object. */
		if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
			oio->oi_lockless = 1;
	}
}

static int osc_lock_lockless_fits_into(const struct lu_env *env,
				       const struct cl_lock_slice *slice,
				       const struct cl_lock_descr *need,
				       const struct cl_io *io)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	if (!(need->cld_enq_flags & CEF_NEVER))
		return 0;

	/* A lockless lock should only be used by its owning io. */
	return (lock->ols_owner == osc_env_io(env));
}

static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini      = osc_lock_fini,
	.clo_enqueue   = osc_lock_enqueue,
	.clo_wait      = osc_lock_lockless_wait,
	.clo_unuse     = osc_lock_lockless_unuse,
	.clo_state     = osc_lock_lockless_state,
	.clo_fits_into = osc_lock_lockless_fits_into,
	.clo_cancel    = osc_lock_lockless_cancel,
	.clo_print     = osc_lock_print
};
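
/*
 * Allocates and initializes a new osc_lock slice for @lock: translates
 * the enqueue flags into LDLM flags, marks glimpse/AGL requests, and
 * decides up front whether the lock can take the lockless path.
 */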
1553
1554int osc_lock_init(const struct lu_env *env,
1555 struct cl_object *obj, struct cl_lock *lock,
1556 const struct cl_io *unused)
1557{
1558 struct osc_lock *clk;
1559 int result;
1560
1561 OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, __GFP_IO);
1562 if (clk != NULL) {
1563 __u32 enqflags = lock->cll_descr.cld_enq_flags;
1564
1565 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1566 atomic_set(&clk->ols_pageref, 0);
1567 clk->ols_state = OLS_NEW;
1568
1569 clk->ols_flags = osc_enq2ldlm_flags(enqflags);
1570 clk->ols_agl = !!(enqflags & CEF_AGL);
1571 if (clk->ols_agl)
1572 clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1573 if (clk->ols_flags & LDLM_FL_HAS_INTENT)
1574 clk->ols_glimpse = 1;
1575
1576 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1577
1578 if (!(enqflags & CEF_MUST))
1579
1580 osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
1581 if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
1582 clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1583
1584 LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
1585 lock, clk, clk->ols_flags);
1586
1587 result = 0;
1588 } else
1589 result = -ENOMEM;
1590 return result;
1591}
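
/*
 * Returns 1 iff the osc_lock attached to @dlm still has outstanding
 * page references. Biasing ->ols_pageref by _PAGEREF_MAGIC makes the
 * test a single atomic operation against concurrent increments.
 */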
int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
{
	struct osc_lock *olock;
	int rc = 0;

	spin_lock(&osc_ast_guard);
	olock = dlm->l_ast_data;
	/*
	 * There is a very unlikely race with osc_page_addref_lock() here,
	 * but it does not matter: in the worst case we fail to cancel a
	 * lock that we actually could, which is harmless.
	 */
	if (olock != NULL &&
	    atomic_add_return(_PAGEREF_MAGIC,
			      &olock->ols_pageref) != _PAGEREF_MAGIC) {
		atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
		rc = 1;
	}
	spin_unlock(&osc_ast_guard);
	return rc;
}