1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41#define DEBUG_SUBSYSTEM S_LOV
42
43#include "lov_cl_internal.h"
44
45
46
47
48
49static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
50 struct cl_lock *parent);
51
52static int lov_lock_unuse(const struct lu_env *env,
53 const struct cl_lock_slice *slice);
54
55
56
57
58
59
60static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
61 struct cl_lock *parent,
62 struct lov_lock_sub *lls)
63{
64 struct lov_sublock_env *subenv;
65 struct lov_io *lio = lov_env_io(env);
66 struct cl_io *io = lio->lis_cl.cis_io;
67 struct lov_io_sub *sub;
68
69 subenv = &lov_env_session(env)->ls_subenv;
70
71
72
73
74
75
76
77
78
79
80 if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
81 subenv->lse_env = env;
82 subenv->lse_io = io;
83 subenv->lse_sub = NULL;
84 } else {
85 sub = lov_sub_get(env, lio, lls->sub_stripe);
86 if (!IS_ERR(sub)) {
87 subenv->lse_env = sub->sub_env;
88 subenv->lse_io = sub->sub_io;
89 subenv->lse_sub = sub;
90 } else {
91 subenv = (void*)sub;
92 }
93 }
94 return subenv;
95}
96
97static void lov_sublock_env_put(struct lov_sublock_env *subenv)
98{
99 if (subenv && subenv->lse_sub)
100 lov_sub_put(subenv->lse_sub);
101}
102
/*
 * Adopts the sub-lock \a sublock into slot \a idx of the parent lov lock
 * \a lck: fills the slot, links \a link into the sub-lock's parent list,
 * pins the parent, marks the slot LSF_HELD and propagates the sub-lock's
 * description into the parent.  Both parent and sub-lock mutexes must be
 * held by the caller.
 */
static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                              struct cl_lock *sublock, int idx,
                              struct lov_lock_link *link)
{
	struct lovsub_lock *lsl;
	struct cl_lock *parent = lck->lls_cl.cls_lock;
	int rc;

	LASSERT(cl_lock_is_mutexed(parent));
	LASSERT(cl_lock_is_mutexed(sublock));
	ENTRY;

	lsl = cl2sub_lock(sublock);
	/*
	 * The sub-lock must not already be linked to this parent.
	 */
	LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
	LASSERT(idx < lck->lls_nr);

	lck->lls_sub[idx].sub_lock = lsl;
	lck->lls_nr_filled++;
	LASSERT(lck->lls_nr_filled <= lck->lls_nr);
	list_add_tail(&link->lll_list, &lsl->lss_parents);
	link->lll_idx = idx;
	link->lll_super = lck;
	/* the parent stays pinned while the child points back at it */
	cl_lock_get(parent);
	lu_ref_add(&parent->cll_reference, "lov-child", sublock);
	lck->lls_sub[idx].sub_flags |= LSF_HELD;
	cl_lock_user_add(env, sublock);

	rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
	LASSERT(rc == 0); /* modification of a freshly adopted slot is
			   * expected to always succeed */
	EXIT;
}
137
/*
 * Allocates the sub-lock for stripe \a idx of \a lck together with the
 * lov_lock_link used to attach it to the parent.  On success, returns the
 * new cl_lock held via the "lov-parent" reference and stores the link in
 * \a *out; on failure, returns an ERR_PTR() and frees the link.
 */
static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
                                         const struct cl_io *io,
                                         struct lov_lock *lck,
                                         int idx, struct lov_lock_link **out)
{
	struct cl_lock *sublock;
	struct cl_lock *parent;
	struct lov_lock_link *link;

	LASSERT(idx < lck->lls_nr);
	ENTRY;

	OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
	if (link != NULL) {
		struct lov_sublock_env *subenv;
		struct lov_lock_sub *lls;
		struct cl_lock_descr *descr;

		parent = lck->lls_cl.cls_lock;
		lls = &lck->lls_sub[idx];
		/* the actually granted extent for this stripe */
		descr = &lls->sub_got;

		subenv = lov_sublock_env_get(env, parent, lls);
		if (!IS_ERR(subenv)) {
			/*
			 * NOTE(review): cl_lock_hold() can find an existing
			 * matching lock or create a new one; either way the
			 * result carries a "lov-parent" hold — confirm
			 * against cl_lock_hold() documentation.
			 */
			sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
					       descr, "lov-parent", parent);
			lov_sublock_env_put(subenv);
		} else {
			/* propagate the ERR_PTR() from lov_sublock_env_get() */
			sublock = (void*)subenv;
		}

		if (!IS_ERR(sublock))
			*out = link;
		else
			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
	} else
		sublock = ERR_PTR(-ENOMEM);
	RETURN(sublock);
}
184
/*
 * Undoes lov_sublock_lock(): releases the sub-io environment (if any),
 * clears the sub-lock's "active parent" pointer and unlocks every lock
 * accumulated in \a closure.
 */
static void lov_sublock_unlock(const struct lu_env *env,
                               struct lovsub_lock *lsl,
                               struct cl_lock_closure *closure,
                               struct lov_sublock_env *subenv)
{
	ENTRY;
	lov_sublock_env_put(subenv);
	lsl->lss_active = NULL;
	cl_lock_disclosure(env, closure);
	EXIT;
}
196
/*
 * Takes the mutex of the sub-lock in \a lls by building a lock closure
 * rooted at the parent lock.  On success (0), the sub-lock is mutexed and
 * marked active; if \a lsep is non-NULL it also receives the sub-io
 * environment.  Returns CLO_REPEAT if the sub-lock is concurrently being
 * freed/cancelled (the stale link is unlinked and lls_cancel_race is set),
 * or a closure/env error otherwise.
 */
static int lov_sublock_lock(const struct lu_env *env,
                            struct lov_lock *lck,
                            struct lov_lock_sub *lls,
                            struct cl_lock_closure *closure,
                            struct lov_sublock_env **lsep)
{
	struct lovsub_lock *sublock;
	struct cl_lock *child;
	int result = 0;
	ENTRY;

	LASSERT(list_empty(&closure->clc_list));

	sublock = lls->sub_lock;
	child = sublock->lss_cl.cls_lock;
	result = cl_lock_closure_build(env, child, closure);
	if (result == 0) {
		struct cl_lock *parent = closure->clc_origin;

		LASSERT(cl_lock_is_mutexed(child));
		sublock->lss_active = parent;

		if (unlikely((child->cll_state == CLS_FREEING) ||
			     (child->cll_flags & CLF_CANCELLED))) {
			struct lov_lock_link *link;
			/*
			 * The sub-lock raced with deletion/cancellation:
			 * drop the stale parent<->child link and ask the
			 * caller to retry.
			 */
			LASSERT(!(lls->sub_flags & LSF_HELD));

			link = lov_lock_link_find(env, lck, sublock);
			LASSERT(link != NULL);
			lov_lock_unlink(env, link, sublock);
			lov_sublock_unlock(env, sublock, closure, NULL);
			/* remember the race so use/unuse can report -ESTALE */
			lck->lls_cancel_race = 1;
			result = CLO_REPEAT;
		} else if (lsep) {
			struct lov_sublock_env *subenv;
			subenv = lov_sublock_env_get(env, parent, lls);
			if (IS_ERR(subenv)) {
				lov_sublock_unlock(env, sublock,
						   closure, NULL);
				result = PTR_ERR(subenv);
			} else {
				*lsep = subenv;
			}
		}
	}
	RETURN(result);
}
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265static int lov_subresult(int result, int rc)
266{
267 int result_rank;
268 int rc_rank;
269
270 ENTRY;
271
272 LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
273 "result = %d", result);
274 LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
275 "rc = %d\n", rc);
276 CLASSERT(CLO_WAIT < CLO_REPEAT);
277
278
279 result_rank = result < 0 ? 1 + CLO_REPEAT : result;
280 rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
281
282 if (result_rank < rc_rank)
283 result = rc;
284 RETURN(result);
285}
286
287
288
289
290
291
292
293
294
/*
 * Creates the sub-lock array for a top-lock on a RAID0-striped object:
 * determines which stripes the parent's extent intersects, allocates one
 * lov_lock_sub descriptor per intersecting stripe, and instantiates the
 * corresponding sub-locks.  Returns 0 on success or a negative errno.
 */
static int lov_lock_sub_init(const struct lu_env *env,
                             struct lov_lock *lck, const struct cl_io *io)
{
	int result = 0;
	int i;
	int nr;
	obd_off start;
	obd_off end;
	obd_off file_start;
	obd_off file_end;

	struct lov_object *loo = cl2lov(lck->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0 = lov_r0(loo);
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	ENTRY;

	lck->lls_orig = parent->cll_descr;
	/* translate the parent's page-index extent into file byte offsets */
	file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
	file_end = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;

	/* first pass: count the stripes the extent intersects */
	for (i = 0, nr = 0; i < r0->lo_nr; i++) {
		if (lov_stripe_intersects(loo->lo_lsm, i,
					  file_start, file_end, &start, &end))
			nr++;
	}
	LASSERT(nr > 0);
	OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
	if (lck->lls_sub == NULL)
		RETURN(-ENOMEM);

	lck->lls_nr = nr;
	/*
	 * second pass: fill in a descriptor for every intersecting stripe,
	 * translating the file extent into per-stripe page indices.
	 */
	for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
		if (lov_stripe_intersects(loo->lo_lsm, i,
					  file_start, file_end, &start, &end)) {
			struct cl_lock_descr *descr;

			descr = &lck->lls_sub[nr].sub_descr;

			LASSERT(descr->cld_obj == NULL);
			descr->cld_obj = lovsub2cl(r0->lo_sub[i]);
			descr->cld_start = cl_index(descr->cld_obj, start);
			descr->cld_end = cl_index(descr->cld_obj, end);
			descr->cld_mode = parent->cll_descr.cld_mode;
			descr->cld_gid = parent->cll_descr.cld_gid;
			descr->cld_enq_flags = parent->cll_descr.cld_enq_flags;

			lck->lls_sub[nr].sub_got = *descr;
			lck->lls_sub[nr].sub_stripe = i;
			nr++;
		}
	}
	LASSERT(nr == lck->lls_nr);
	/*
	 * Then, create the sub-locks.  Once at least one sub-lock was
	 * created, the top-lock can be reached by other threads.
	 */
	for (i = 0; i < lck->lls_nr; ++i) {
		struct cl_lock *sublock;
		struct lov_lock_link *link;

		if (lck->lls_sub[i].sub_lock == NULL) {
			sublock = lov_sublock_alloc(env, io, lck, i, &link);
			if (IS_ERR(sublock)) {
				result = PTR_ERR(sublock);
				break;
			}
			cl_lock_get_trust(sublock);
			cl_lock_mutex_get(env, sublock);
			cl_lock_mutex_get(env, parent);
			/*
			 * recheck under the mutexes: the slot may have been
			 * filled concurrently while no locks were held during
			 * lov_sublock_alloc(), or the parent may already be
			 * dying.
			 */
			if (lck->lls_sub[i].sub_lock == NULL &&
			    parent->cll_state < CLS_FREEING) {
				lov_sublock_adopt(env, lck, sublock, i, link);
				cl_lock_mutex_put(env, parent);
			} else {
				/* lost the race: discard our link and hold */
				OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
				cl_lock_mutex_put(env, parent);
				cl_lock_unhold(env, sublock,
					       "lov-parent", parent);
			}
			cl_lock_mutex_put(env, sublock);
			cl_lock_put(env, sublock);
		}
	}
	/*
	 * Some sub-locks can be missing at this point; that is fine, because
	 * enqueue will create them anyway.  The main duty of this function is
	 * to fill in the sub-lock descriptions in a race-free manner.
	 */
	RETURN(result);
}
401
/*
 * Drops the LSF_HELD state of sub-lock \a i: removes the user reference
 * (when \a deluser) and the "lov-parent" hold.  If releasing the last hold
 * would destroy the sub-lock, the parent mutex is dropped around
 * cl_lock_unhold() and CLO_REPEAT is folded into \a rc so the caller
 * restarts its scan.  Returns the combined result code.
 */
static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                               int i, int deluser, int rc)
{
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));
	ENTRY;

	if (lck->lls_sub[i].sub_flags & LSF_HELD) {
		struct cl_lock *sublock;
		int dying;

		LASSERT(lck->lls_sub[i].sub_lock != NULL);
		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
		LASSERT(cl_lock_is_mutexed(sublock));

		lck->lls_sub[i].sub_flags &= ~LSF_HELD;
		if (deluser)
			cl_lock_user_del(env, sublock);
		/*
		 * If this is the last hold on a sub-lock that is about to be
		 * cancelled/destroyed (phantom, group, or already doomed),
		 * release the parent mutex around the unhold to avoid
		 * holding it while the sub-lock is being torn down.
		 */
		dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
			 sublock->cll_descr.cld_mode == CLM_GROUP ||
			 (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
			sublock->cll_holds == 1;
		if (dying)
			cl_lock_mutex_put(env, parent);
		cl_lock_unhold(env, sublock, "lov-parent", parent);
		if (dying) {
			cl_lock_mutex_get(env, parent);
			rc = lov_subresult(rc, CLO_REPEAT);
		}
		/*
		 * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
		 * not backed by a hold; it is cleared by lov_lock_unlink()
		 * when the sub-lock finally goes away.
		 */
	}
	RETURN(rc);
}
447
/*
 * Acquires the LSF_HELD state for sub-lock \a i: takes a hold and a user
 * reference on the sub-lock on behalf of the parent.  Both the parent and
 * the sub-lock mutex must be held by the caller.
 */
static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
                             int i)
{
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));
	ENTRY;

	if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
		struct cl_lock *sublock;

		LASSERT(lck->lls_sub[i].sub_lock != NULL);
		sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
		LASSERT(cl_lock_is_mutexed(sublock));
		LASSERT(sublock->cll_state != CLS_FREEING);

		lck->lls_sub[i].sub_flags |= LSF_HELD;

		/* temporary reference to keep the sub-lock alive across the
		 * hold/user transitions */
		cl_lock_get_trust(sublock);
		cl_lock_hold_add(env, sublock, "lov-parent", parent);
		cl_lock_user_add(env, sublock);
		cl_lock_put(env, sublock);
	}
	EXIT;
}
473
474static void lov_lock_fini(const struct lu_env *env,
475 struct cl_lock_slice *slice)
476{
477 struct lov_lock *lck;
478 int i;
479
480 ENTRY;
481 lck = cl2lov_lock(slice);
482 LASSERT(lck->lls_nr_filled == 0);
483 if (lck->lls_sub != NULL) {
484 for (i = 0; i < lck->lls_nr; ++i)
485
486
487
488
489 LASSERT(lck->lls_sub[i].sub_lock == NULL);
490 OBD_FREE_LARGE(lck->lls_sub,
491 lck->lls_nr * sizeof lck->lls_sub[0]);
492 }
493 OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
494 EXIT;
495}
496
/*
 * Waits for the conflict blocking \a sublock's enqueue to go away.  The
 * top-lock mutex is released across the wait and re-acquired afterwards, so
 * the caller's iteration state may be stale: CLO_REPEAT is returned on
 * success to force a restart (a non-zero wait result is returned as-is).
 */
static int lov_lock_enqueue_wait(const struct lu_env *env,
                                 struct lov_lock *lck,
                                 struct cl_lock *sublock)
{
	struct cl_lock *lock = lck->lls_cl.cls_lock;
	int result;
	ENTRY;

	LASSERT(cl_lock_is_mutexed(lock));

	cl_lock_mutex_put(env, lock);
	result = cl_lock_enqueue_wait(env, sublock, 0);
	cl_lock_mutex_get(env, lock);
	RETURN(result ?: CLO_REPEAT);
}
512
513
514
515
516
517
518
519
/*
 * Tries to advance a single sub-lock towards the enqueued/held state.
 * \a last is true for the final stripe of the parent lock.
 */
static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                struct cl_lock *sublock,
                                struct cl_io *io, __u32 enqflags, int last)
{
	int result;
	ENTRY;

	/* first, try to enqueue the sub-lock */
	result = cl_enqueue_try(env, sublock, io, enqflags);
	if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
		/* if it is enqueued, try to "wait" on it — maybe it is
		 * already granted */
		result = cl_wait_try(env, sublock);
		if (result == CLO_REENQUEUED)
			result = CLO_WAIT;
	}
	/*
	 * If CEF_ASYNC is set, all sub-locks can be enqueued in parallel,
	 * so a still-pending sub-lock is not an error — except for the last
	 * one (unless CEF_AGL is also set), where the caller does need to
	 * wait for the whole set.
	 */
	if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
	    (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
		result = 0;
	RETURN(result);
}
546
547
548
549
/*
 * Helper for lov_lock_enqueue(): creates a missing sub-lock for slot
 * \a idx.  The parent mutex is dropped around the allocation, so the state
 * is re-checked before adopting; always returns CLO_REPEAT (or an error)
 * so the caller restarts its scan.
 */
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                            struct cl_io *io, struct lov_lock *lck, int idx)
{
	struct lov_lock_link *link;
	struct cl_lock *sublock;
	int result;

	LASSERT(parent->cll_depth == 1);
	cl_lock_mutex_put(env, parent);
	sublock = lov_sublock_alloc(env, io, lck, idx, &link);
	if (!IS_ERR(sublock))
		cl_lock_mutex_get(env, sublock);
	cl_lock_mutex_get(env, parent);

	if (!IS_ERR(sublock)) {
		cl_lock_get_trust(sublock);
		if (parent->cll_state == CLS_QUEUING &&
		    lck->lls_sub[idx].sub_lock == NULL) {
			lov_sublock_adopt(env, lck, sublock, idx, link);
		} else {
			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
			/* lost the race: another thread filled the slot, or
			 * the parent left CLS_QUEUING meanwhile */
			cl_lock_mutex_put(env, parent);
			cl_lock_unhold(env, sublock, "lov-parent", parent);
			cl_lock_mutex_get(env, parent);
		}
		cl_lock_mutex_put(env, sublock);
		cl_lock_put(env, sublock);
		result = CLO_REPEAT;
	} else
		result = PTR_ERR(sublock);
	return result;
}
584
585
586
587
588
589
590
591
/*
 * cl_lock_operations::clo_enqueue() for the lov layer: advances every
 * sub-lock towards the enqueued state, combining per-stripe results with
 * lov_subresult().  Returns 0 once all sub-locks reached at least
 * CLS_ENQUEUED, CLO_WAIT/CLO_REPEAT when more work is needed, or an error.
 */
static int lov_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *io, __u32 enqflags)
{
	struct cl_lock *lock = slice->cls_lock;
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, lock);
	int i;
	int result;
	enum cl_lock_state minstate;

	ENTRY;

	for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct lov_lock_sub *lls;
		struct cl_lock *sublock;
		struct lov_sublock_env *subenv;

		if (lock->cll_state != CLS_QUEUING) {
			/*
			 * The top-lock left QUEUING while its mutex was
			 * dropped by a previous iteration; stop with the
			 * result accumulated so far.
			 */
			LASSERT(i > 0 && result != 0);
			break;
		}

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		/*
		 * Sub-lock might have been canceled while the top-lock was
		 * cached: re-create it.
		 */
		if (sub == NULL) {
			result = lov_sublock_fill(env, lock, io, lck, i);
			/* lov_sublock_fill() released the parent mutex, so
			 * the scan must restart */
			break;
		}
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			lov_sublock_hold(env, lck, i);
			rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
						  subenv->lse_io, enqflags,
						  i == lck->lls_nr - 1);
			minstate = min(minstate, sublock->cll_state);
			if (rc == CLO_WAIT) {
				switch (sublock->cll_state) {
				case CLS_QUEUING:
					/* take a recursive mutex; it is
					 * released inside
					 * lov_lock_enqueue_wait() */
					cl_lock_mutex_get(env, sublock);
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					rc = lov_lock_enqueue_wait(env, lck,
								   sublock);
					break;
				case CLS_CACHED:
					cl_lock_get(sublock);
					/* take a recursive mutex of sublock */
					cl_lock_mutex_get(env, sublock);
					/* release all closure locks first,
					 * otherwise this may deadlock
					 * (NOTE(review): presumably LU-2683
					 * — confirm) */
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					/* sublock and parent are still held */
					rc = lov_sublock_release(env, lck, i,
								 1, rc);
					cl_lock_mutex_put(env, sublock);
					cl_lock_put(env, sublock);
					break;
				default:
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					break;
				}
			} else {
				LASSERT(sublock->cll_conflict == NULL);
				lov_sublock_unlock(env, sub, closure, subenv);
			}
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}
	cl_lock_closure_fini(closure);
	RETURN(result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT);
}
685
/*
 * cl_lock_operations::clo_unuse() for the lov layer: unuses and releases
 * every held sub-lock.  Returns -ESTALE if a sub-lock cancellation raced
 * with the unuse (lls_cancel_race), so the upper layer re-enqueues.
 */
static int lov_lock_unuse(const struct lu_env *env,
                          const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	ENTRY;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct cl_lock *sublock;
		struct lov_lock_sub *lls;
		struct lov_sublock_env *subenv;

		/* the top-lock stays in CLS_INTRANSIT for the whole unuse:
		 * only the thread that released the last hold carries the
		 * unlocking to completion */
		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			if (lls->sub_flags & LSF_HELD) {
				LASSERT(sublock->cll_state == CLS_HELD ||
					sublock->cll_state == CLS_ENQUEUED);
				rc = cl_unuse_try(subenv->lse_env, sublock);
				rc = lov_sublock_release(env, lck, i, 0, rc);
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		result = lov_subresult(result, rc);
	}

	/* a cancel race detected by lov_sublock_lock() turns an otherwise
	 * clean unuse into -ESTALE */
	if (result == 0 && lck->lls_cancel_race) {
		lck->lls_cancel_race = 0;
		result = -ESTALE;
	}
	cl_lock_closure_fini(closure);
	RETURN(result);
}
733
734
/*
 * cl_lock_operations::clo_cancel() for the lov layer: releases every held
 * sub-lock, unusing those still in CLS_HELD first.  A CLO_REPEAT from a
 * sub-lock retries the same slot.  Errors are only logged — cancellation
 * itself cannot be reported as failed to the caller (void return).
 */
static void lov_lock_cancel(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	ENTRY;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct cl_lock *sublock;
		struct lov_lock_sub *lls;
		struct lov_sublock_env *subenv;

		/* slots may be empty — sub-locks can already be gone by the
		 * time the top-lock is cancelled */
		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			if (!(lls->sub_flags & LSF_HELD)) {
				lov_sublock_unlock(env, sub, closure, subenv);
				continue;
			}

			switch(sublock->cll_state) {
			case CLS_HELD:
				/* unuse before dropping the hold */
				rc = cl_unuse_try(subenv->lse_env, sublock);
				lov_sublock_release(env, lck, i, 0, 0);
				break;
			default:
				lov_sublock_release(env, lck, i, 1, 0);
				break;
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}

		if (rc == CLO_REPEAT) {
			/* retry the same slot */
			--i;
			continue;
		}

		result = lov_subresult(result, rc);
	}

	if (result)
		CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
			      "lov_lock_cancel fails with %d.\n", result);

	cl_lock_closure_fini(closure);
}
794
/*
 * cl_lock_operations::clo_wait() for the lov layer: waits for every
 * sub-lock to reach CLS_HELD.  A CLO_REENQUEUED sub-lock result is counted
 * and the whole scan is restarted once the pass completes cleanly.
 */
static int lov_lock_wait(const struct lu_env *env,
                         const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	enum cl_lock_state minstate;
	int reenqueued;
	int result;
	int i;

	ENTRY;

again:
	for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
	     i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct cl_lock *sublock;
		struct lov_lock_sub *lls;
		struct lov_sublock_env *subenv;

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		LASSERT(sub != NULL);
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			LASSERT(sublock->cll_state >= CLS_ENQUEUED);
			if (sublock->cll_state < CLS_HELD)
				rc = cl_wait_try(env, sublock);

			minstate = min(minstate, sublock->cll_state);
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		if (rc == CLO_REENQUEUED) {
			/* not an error: the sub-lock was re-enqueued and
			 * needs another waiting pass */
			reenqueued++;
			rc = 0;
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}

	/* each sub-lock can only be re-enqueued a bounded number of times,
	 * so this restart loop terminates */
	if (result == 0 && reenqueued != 0)
		goto again;
	cl_lock_closure_fini(closure);
	RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
844
/*
 * cl_lock_operations::clo_use() for the lov layer: re-acquires the held
 * state for every sub-lock of a cached top-lock.  Returns -ESTALE when a
 * sub-lock has gone away (cancelled while the top-lock was cached), so the
 * upper layer resets the top-lock and re-enqueues.
 */
static int lov_lock_use(const struct lu_env *env,
                        const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int result;
	int i;

	LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
	ENTRY;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct cl_lock *sublock;
		struct lov_lock_sub *lls;
		struct lov_sublock_env *subenv;

		LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL) {
			/*
			 * Sub-lock was canceled while the top-lock was
			 * cached.
			 */
			result = -ESTALE;
			break;
		}

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			LASSERT(sublock->cll_state != CLS_FREEING);
			lov_sublock_hold(env, lck, i);
			if (sublock->cll_state == CLS_CACHED) {
				rc = cl_use_try(subenv->lse_env, sublock, 0);
				if (rc != 0)
					rc = lov_sublock_release(env, lck,
								 i, 1, rc);
			} else if (sublock->cll_state == CLS_NEW) {
				/* sub-lock lost its server-side state while
				 * the top-lock was cached */
				result = -ESTALE;
				lov_sublock_release(env, lck, i, 1, result);
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}

	if (lck->lls_cancel_race) {
		/*
		 * A concurrent unlock raced with this use: the sub-lock was
		 * found freeing and lov_sublock_lock() returned CLO_REPEAT.
		 * Report -ESTALE so the upper layer resets the lock state to
		 * NEW and re-enqueues.
		 */
		lck->lls_cancel_race = 0;
		LASSERT(result != 0);
		result = -ESTALE;
	}
	cl_lock_closure_fini(closure);
	RETURN(result);
}
913
#if 0
/*
 * NOTE(review): dead code, compiled out with "#if 0".  As written it would
 * not compile: it takes no parameters yet references slice, env, need and
 * lov, and assigns to an undeclared "result" without returning a value.
 * Kept verbatim; presumably a sketch of multi-stripe lock matching —
 * confirm intent before resurrecting or deleting.
 */
static int lock_lock_multi_match()
{
	struct cl_lock *lock = slice->cls_lock;
	struct cl_lock_descr *subneed = &lov_env_info(env)->lti_ldescr;
	struct lov_object *loo = cl2lov(lov->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0 = lov_r0(loo);
	struct lov_lock_sub *sub;
	struct cl_object *subobj;
	obd_off fstart;
	obd_off fend;
	obd_off start;
	obd_off end;
	int i;

	fstart = cl_offset(need->cld_obj, need->cld_start);
	fend = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
	subneed->cld_mode = need->cld_mode;
	cl_lock_mutex_get(env, lock);
	for (i = 0; i < lov->lls_nr; ++i) {
		sub = &lov->lls_sub[i];
		if (sub->sub_lock == NULL)
			continue;
		subobj = sub->sub_descr.cld_obj;
		if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
					   fstart, fend, &start, &end))
			continue;
		subneed->cld_start = cl_index(subobj, start);
		subneed->cld_end = cl_index(subobj, end);
		subneed->cld_obj = subobj;
		if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
			result = 0;
			break;
		}
	}
	cl_lock_mutex_put(env, lock);
}
#endif
952
953
954
955
956
/*
 * Checks whether the extent \a descr requested against a striped object is
 * covered by the single sub-lock description \a child on stripe \a stripe:
 * the extent must fit entirely within that one stripe, and its translation
 * into stripe-local indices must match \a child.
 */
static int lov_lock_stripe_is_matching(const struct lu_env *env,
                                       struct lov_object *lov, int stripe,
                                       const struct cl_lock_descr *child,
                                       const struct cl_lock_descr *descr)
{
	struct lov_stripe_md *lsm = lov->lo_lsm;
	obd_off start;
	obd_off end;
	int result;

	/* single-stripe layout: stripe and file extents coincide */
	if (lov_r0(lov)->lo_nr == 1)
		return cl_lock_ext_match(child, descr);

	/*
	 * Quick check: an extent no larger than one stripe whose both
	 * endpoints land on the same stripe is contained in that stripe.
	 */
	start = cl_offset(&lov->lo_cl, descr->cld_start);
	end = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
	result = end - start <= lsm->lsm_stripe_size &&
		 stripe == lov_stripe_number(lsm, start) &&
		 stripe == lov_stripe_number(lsm, end);
	if (result) {
		struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
		obd_off sub_start;
		obd_off sub_end;

		/* translate the file extent into stripe-local indices and
		 * match the resulting description against the child */
		subd->cld_obj = NULL;
		subd->cld_mode = descr->cld_mode;
		subd->cld_gid = descr->cld_gid;
		result = lov_stripe_intersects(lsm, stripe, start, end,
					       &sub_start, &sub_end);
		LASSERT(result);
		subd->cld_start = cl_index(child->cld_obj, sub_start);
		subd->cld_end = cl_index(child->cld_obj, sub_end);
		result = cl_lock_ext_match(child, subd);
	}
	return result;
}
997
998
999
1000
1001
1002
1003
1004
1005
1006
/*
 * cl_lock_operations::clo_fits_into() for the lov layer: decides whether an
 * existing lov lock (\a slice) can serve the requirement \a need for \a io.
 */
static int lov_lock_fits_into(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              const struct cl_lock_descr *need,
                              const struct cl_io *io)
{
	struct lov_lock *lov = cl2lov_lock(slice);
	struct lov_object *obj = cl2lov(slice->cls_obj);
	int result;

	LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
	LASSERT(lov->lls_nr > 0);

	ENTRY;

	/* for a top lock the enqueue flags must match exactly, otherwise
	 * re-enqueueing a missing sub-lock could behave differently */
	if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
		return 0;

	if (need->cld_mode == CLM_GROUP)
		/*
		 * always allow matching a group lock against the original
		 * extent
		 */
		result = cl_lock_ext_match(&lov->lls_orig, need);
	else if (lov->lls_nr == 1) {
		/* single sub-lock: match precisely against what was granted
		 * on that stripe */
		struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
		result = lov_lock_stripe_is_matching(env,
						     cl2lov(slice->cls_obj),
						     lov->lls_sub[0].sub_stripe,
						     got, need);
	} else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
		   !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
		/*
		 * Multi-stripe locks are only reused for io types that cover
		 * the whole layout (setattr, misc, append, glimpse); refuse
		 * everything else.
		 */
		result = 0;
	else
		/*
		 * General case: multi-stripe existing lock against a
		 * (potentially) multi-stripe \a need.  Match against the
		 * original extent the lock was created with, ignoring any
		 * server-side expansion of the sub-locks.
		 */
		result = cl_lock_ext_match(&lov->lls_orig, need);
	CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
	       PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
	       lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
	       result);
	RETURN(result);
}
1060
/*
 * Disconnects sub-lock \a sub from its parent described by \a link:
 * removes the link from the sub-lock's parent list, clears the parent's
 * slot, drops the back reference taken in lov_sublock_adopt() and frees the
 * link.  Both the parent and the sub-lock mutexes must be held.
 */
void lov_lock_unlink(const struct lu_env *env,
                     struct lov_lock_link *link, struct lovsub_lock *sub)
{
	struct lov_lock *lck = link->lll_super;
	struct cl_lock *parent = lck->lls_cl.cls_lock;

	LASSERT(cl_lock_is_mutexed(parent));
	LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
	ENTRY;

	list_del_init(&link->lll_list);
	LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
	/* the slot becomes empty again */
	lck->lls_sub[link->lll_idx].sub_lock = NULL;
	LASSERT(lck->lls_nr_filled > 0);
	lck->lls_nr_filled--;
	lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
	cl_lock_put(env, parent);
	OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
	EXIT;
}
1082
1083struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
1084 struct lov_lock *lck,
1085 struct lovsub_lock *sub)
1086{
1087 struct lov_lock_link *scan;
1088
1089 LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
1090 ENTRY;
1091
1092 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
1093 if (scan->lll_super == lck)
1094 RETURN(scan);
1095 }
1096 RETURN(NULL);
1097}
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
/*
 * cl_lock_operations::clo_delete() for the lov layer, invoked for a
 * "top-to-bottom" delete when destruction starts from the top-lock (e.g.
 * on inode destruction).  Unlinks the top-lock from all of its sub-locks;
 * the sub-locks themselves are not deleted here.
 */
static void lov_lock_delete(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	struct lov_lock_link *link;
	int rc;
	int i;

	LASSERT(slice->cls_lock->cll_state == CLS_FREEING);
	ENTRY;

	for (i = 0; i < lck->lls_nr; ++i) {
		struct lov_lock_sub *lls = &lck->lls_sub[i];
		struct lovsub_lock *lsl = lls->sub_lock;

		if (lsl == NULL)
			continue;

		rc = lov_sublock_lock(env, lck, lls, closure, NULL);
		if (rc == CLO_REPEAT) {
			/* raced with a concurrent cancel; retry this slot */
			--i;
			continue;
		}

		LASSERT(rc == 0);
		LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);

		/* drop the hold/user state first, then sever the link */
		if (lls->sub_flags & LSF_HELD)
			lov_sublock_release(env, lck, i, 1, 0);

		link = lov_lock_link_find(env, lck, lsl);
		LASSERT(link != NULL);
		lov_lock_unlink(env, link, lsl);
		LASSERT(lck->lls_sub[i].sub_lock == NULL);

		lov_sublock_unlock(env, lsl, closure, NULL);
	}

	cl_lock_closure_fini(closure);
	EXIT;
}
1155
1156static int lov_lock_print(const struct lu_env *env, void *cookie,
1157 lu_printer_t p, const struct cl_lock_slice *slice)
1158{
1159 struct lov_lock *lck = cl2lov_lock(slice);
1160 int i;
1161
1162 (*p)(env, cookie, "%d\n", lck->lls_nr);
1163 for (i = 0; i < lck->lls_nr; ++i) {
1164 struct lov_lock_sub *sub;
1165
1166 sub = &lck->lls_sub[i];
1167 (*p)(env, cookie, " %d %x: ", i, sub->sub_flags);
1168 if (sub->sub_lock != NULL)
1169 cl_lock_print(env, cookie, p,
1170 sub->sub_lock->lss_cl.cls_lock);
1171 else
1172 (*p)(env, cookie, "---\n");
1173 }
1174 return 0;
1175}
1176
/* Lock operations for a fully-striped (raid0) lov lock. */
static const struct cl_lock_operations lov_lock_ops = {
	.clo_fini = lov_lock_fini,
	.clo_enqueue = lov_lock_enqueue,
	.clo_wait = lov_lock_wait,
	.clo_use = lov_lock_use,
	.clo_unuse = lov_lock_unuse,
	.clo_cancel = lov_lock_cancel,
	.clo_fits_into = lov_lock_fits_into,
	.clo_delete = lov_lock_delete,
	.clo_print = lov_lock_print
};
1188
1189int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
1190 struct cl_lock *lock, const struct cl_io *io)
1191{
1192 struct lov_lock *lck;
1193 int result;
1194
1195 ENTRY;
1196 OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
1197 if (lck != NULL) {
1198 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
1199 result = lov_lock_sub_init(env, lck, io);
1200 } else
1201 result = -ENOMEM;
1202 RETURN(result);
1203}
1204
1205static void lov_empty_lock_fini(const struct lu_env *env,
1206 struct cl_lock_slice *slice)
1207{
1208 struct lov_lock *lck = cl2lov_lock(slice);
1209 OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
1210}
1211
/* clo_print for an empty lov lock: there are no sub-locks to show. */
static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
                                lu_printer_t p, const struct cl_lock_slice *slice)
{
	(*p)(env, cookie, "empty\n");
	return 0;
}
1218
1219
/* Lock operations for an empty (stripeless) lov lock. */
static const struct cl_lock_operations lov_empty_lock_ops = {
	.clo_fini = lov_empty_lock_fini,
	.clo_print = lov_empty_lock_print
};
1224
1225int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
1226 struct cl_lock *lock, const struct cl_io *io)
1227{
1228 struct lov_lock *lck;
1229 int result = -ENOMEM;
1230
1231 ENTRY;
1232 OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
1233 if (lck != NULL) {
1234 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
1235 lck->lls_orig = lock->cll_descr;
1236 result = 0;
1237 }
1238 RETURN(result);
1239}
1240
1241static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
1242 struct cl_lock *parent)
1243{
1244 struct cl_lock_closure *closure;
1245
1246 closure = &lov_env_info(env)->lti_closure;
1247 LASSERT(list_empty(&closure->clc_list));
1248 cl_lock_closure_init(env, closure, parent, 1);
1249 return closure;
1250}
1251
1252
1253
1254