1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41#define DEBUG_SUBSYSTEM S_LOV
42
43#include "lov_cl_internal.h"
44
45
46
47
48
/*
 * Forward declaration: the helper is used by the lock methods below but is
 * defined near the end of this file.
 */
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
					       struct cl_lock *parent);
51
/* Forward declaration of the ->clo_unuse() method defined later in this file. */
static int lov_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice);
54
55
56
57
58
59
60static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
61 struct cl_lock *parent,
62 struct lov_lock_sub *lls)
63{
64 struct lov_sublock_env *subenv;
65 struct lov_io *lio = lov_env_io(env);
66 struct cl_io *io = lio->lis_cl.cis_io;
67 struct lov_io_sub *sub;
68
69 subenv = &lov_env_session(env)->ls_subenv;
70
71
72
73
74
75
76
77
78
79
80 if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
81 subenv->lse_env = env;
82 subenv->lse_io = io;
83 subenv->lse_sub = NULL;
84 } else {
85 sub = lov_sub_get(env, lio, lls->sub_stripe);
86 if (!IS_ERR(sub)) {
87 subenv->lse_env = sub->sub_env;
88 subenv->lse_io = sub->sub_io;
89 subenv->lse_sub = sub;
90 } else {
91 subenv = (void *)sub;
92 }
93 }
94 return subenv;
95}
96
97static void lov_sublock_env_put(struct lov_sublock_env *subenv)
98{
99 if (subenv && subenv->lse_sub)
100 lov_sub_put(subenv->lse_sub);
101}
102
103static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
104 struct cl_lock *sublock, int idx,
105 struct lov_lock_link *link)
106{
107 struct lovsub_lock *lsl;
108 struct cl_lock *parent = lck->lls_cl.cls_lock;
109 int rc;
110
111 LASSERT(cl_lock_is_mutexed(parent));
112 LASSERT(cl_lock_is_mutexed(sublock));
113
114 lsl = cl2sub_lock(sublock);
115
116
117
118 LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
119 LASSERT(idx < lck->lls_nr);
120
121 lck->lls_sub[idx].sub_lock = lsl;
122 lck->lls_nr_filled++;
123 LASSERT(lck->lls_nr_filled <= lck->lls_nr);
124 list_add_tail(&link->lll_list, &lsl->lss_parents);
125 link->lll_idx = idx;
126 link->lll_super = lck;
127 cl_lock_get(parent);
128 lu_ref_add(&parent->cll_reference, "lov-child", sublock);
129 lck->lls_sub[idx].sub_flags |= LSF_HELD;
130 cl_lock_user_add(env, sublock);
131
132 rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
133 LASSERT(rc == 0);
134}
135
136static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
137 const struct cl_io *io,
138 struct lov_lock *lck,
139 int idx, struct lov_lock_link **out)
140{
141 struct cl_lock *sublock;
142 struct cl_lock *parent;
143 struct lov_lock_link *link;
144
145 LASSERT(idx < lck->lls_nr);
146
147 OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
148 if (link != NULL) {
149 struct lov_sublock_env *subenv;
150 struct lov_lock_sub *lls;
151 struct cl_lock_descr *descr;
152
153 parent = lck->lls_cl.cls_lock;
154 lls = &lck->lls_sub[idx];
155 descr = &lls->sub_got;
156
157 subenv = lov_sublock_env_get(env, parent, lls);
158 if (!IS_ERR(subenv)) {
159
160
161
162
163
164
165 sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
166 descr, "lov-parent", parent);
167 lov_sublock_env_put(subenv);
168 } else {
169
170 sublock = (void *)subenv;
171 }
172
173 if (!IS_ERR(sublock))
174 *out = link;
175 else
176 OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
177 } else
178 sublock = ERR_PTR(-ENOMEM);
179 return sublock;
180}
181
/**
 * Counterpart of lov_sublock_lock(): releases the sub-lock environment
 * (\a subenv may be NULL), clears the sub-lock's lss_active pointer and
 * undoes the closure with cl_lock_disclosure().
 */
static void lov_sublock_unlock(const struct lu_env *env,
			       struct lovsub_lock *lsl,
			       struct cl_lock_closure *closure,
			       struct lov_sublock_env *subenv)
{
	lov_sublock_env_put(subenv);
	lsl->lss_active = NULL;
	cl_lock_disclosure(env, closure);
}
191
192static int lov_sublock_lock(const struct lu_env *env,
193 struct lov_lock *lck,
194 struct lov_lock_sub *lls,
195 struct cl_lock_closure *closure,
196 struct lov_sublock_env **lsep)
197{
198 struct lovsub_lock *sublock;
199 struct cl_lock *child;
200 int result = 0;
201
202 LASSERT(list_empty(&closure->clc_list));
203
204 sublock = lls->sub_lock;
205 child = sublock->lss_cl.cls_lock;
206 result = cl_lock_closure_build(env, child, closure);
207 if (result == 0) {
208 struct cl_lock *parent = closure->clc_origin;
209
210 LASSERT(cl_lock_is_mutexed(child));
211 sublock->lss_active = parent;
212
213 if (unlikely((child->cll_state == CLS_FREEING) ||
214 (child->cll_flags & CLF_CANCELLED))) {
215 struct lov_lock_link *link;
216
217
218
219
220 LASSERT(!(lls->sub_flags & LSF_HELD));
221
222 link = lov_lock_link_find(env, lck, sublock);
223 LASSERT(link != NULL);
224 lov_lock_unlink(env, link, sublock);
225 lov_sublock_unlock(env, sublock, closure, NULL);
226 lck->lls_cancel_race = 1;
227 result = CLO_REPEAT;
228 } else if (lsep) {
229 struct lov_sublock_env *subenv;
230 subenv = lov_sublock_env_get(env, parent, lls);
231 if (IS_ERR(subenv)) {
232 lov_sublock_unlock(env, sublock,
233 closure, NULL);
234 result = PTR_ERR(subenv);
235 } else {
236 *lsep = subenv;
237 }
238 }
239 }
240 return result;
241}
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259static int lov_subresult(int result, int rc)
260{
261 int result_rank;
262 int rc_rank;
263
264 LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
265 "result = %d", result);
266 LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
267 "rc = %d\n", rc);
268 CLASSERT(CLO_WAIT < CLO_REPEAT);
269
270
271 result_rank = result < 0 ? 1 + CLO_REPEAT : result;
272 rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
273
274 if (result_rank < rc_rank)
275 result = rc;
276 return result;
277}
278
279
280
281
282
283
284
285
286
287static int lov_lock_sub_init(const struct lu_env *env,
288 struct lov_lock *lck, const struct cl_io *io)
289{
290 int result = 0;
291 int i;
292 int nr;
293 obd_off start;
294 obd_off end;
295 obd_off file_start;
296 obd_off file_end;
297
298 struct lov_object *loo = cl2lov(lck->lls_cl.cls_obj);
299 struct lov_layout_raid0 *r0 = lov_r0(loo);
300 struct cl_lock *parent = lck->lls_cl.cls_lock;
301
302 lck->lls_orig = parent->cll_descr;
303 file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
304 file_end = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
305
306 for (i = 0, nr = 0; i < r0->lo_nr; i++) {
307
308
309
310
311 if (lov_stripe_intersects(loo->lo_lsm, i,
312 file_start, file_end, &start, &end))
313 nr++;
314 }
315 LASSERT(nr > 0);
316 OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof(lck->lls_sub[0]));
317 if (lck->lls_sub == NULL)
318 return -ENOMEM;
319
320 lck->lls_nr = nr;
321
322
323
324
325
326
327
328 for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
329 if (lov_stripe_intersects(loo->lo_lsm, i,
330 file_start, file_end, &start, &end)) {
331 struct cl_lock_descr *descr;
332
333 descr = &lck->lls_sub[nr].sub_descr;
334
335 LASSERT(descr->cld_obj == NULL);
336 descr->cld_obj = lovsub2cl(r0->lo_sub[i]);
337 descr->cld_start = cl_index(descr->cld_obj, start);
338 descr->cld_end = cl_index(descr->cld_obj, end);
339 descr->cld_mode = parent->cll_descr.cld_mode;
340 descr->cld_gid = parent->cll_descr.cld_gid;
341 descr->cld_enq_flags = parent->cll_descr.cld_enq_flags;
342
343 lck->lls_sub[nr].sub_got = *descr;
344 lck->lls_sub[nr].sub_stripe = i;
345 nr++;
346 }
347 }
348 LASSERT(nr == lck->lls_nr);
349
350
351
352
353 for (i = 0; i < lck->lls_nr; ++i) {
354 struct cl_lock *sublock;
355 struct lov_lock_link *link;
356
357 if (lck->lls_sub[i].sub_lock == NULL) {
358 sublock = lov_sublock_alloc(env, io, lck, i, &link);
359 if (IS_ERR(sublock)) {
360 result = PTR_ERR(sublock);
361 break;
362 }
363 cl_lock_get_trust(sublock);
364 cl_lock_mutex_get(env, sublock);
365 cl_lock_mutex_get(env, parent);
366
367
368
369
370 if (lck->lls_sub[i].sub_lock == NULL &&
371 parent->cll_state < CLS_FREEING) {
372 lov_sublock_adopt(env, lck, sublock, i, link);
373 cl_lock_mutex_put(env, parent);
374 } else {
375 OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
376 cl_lock_mutex_put(env, parent);
377 cl_lock_unhold(env, sublock,
378 "lov-parent", parent);
379 }
380 cl_lock_mutex_put(env, sublock);
381 cl_lock_put(env, sublock);
382 }
383 }
384
385
386
387
388
389 return result;
390}
391
392static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
393 int i, int deluser, int rc)
394{
395 struct cl_lock *parent = lck->lls_cl.cls_lock;
396
397 LASSERT(cl_lock_is_mutexed(parent));
398
399 if (lck->lls_sub[i].sub_flags & LSF_HELD) {
400 struct cl_lock *sublock;
401 int dying;
402
403 LASSERT(lck->lls_sub[i].sub_lock != NULL);
404 sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
405 LASSERT(cl_lock_is_mutexed(sublock));
406
407 lck->lls_sub[i].sub_flags &= ~LSF_HELD;
408 if (deluser)
409 cl_lock_user_del(env, sublock);
410
411
412
413
414
415 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
416 sublock->cll_descr.cld_mode == CLM_GROUP ||
417 (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
418 sublock->cll_holds == 1;
419 if (dying)
420 cl_lock_mutex_put(env, parent);
421 cl_lock_unhold(env, sublock, "lov-parent", parent);
422 if (dying) {
423 cl_lock_mutex_get(env, parent);
424 rc = lov_subresult(rc, CLO_REPEAT);
425 }
426
427
428
429
430
431
432
433 }
434 return rc;
435}
436
437static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
438 int i)
439{
440 struct cl_lock *parent = lck->lls_cl.cls_lock;
441
442 LASSERT(cl_lock_is_mutexed(parent));
443
444 if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
445 struct cl_lock *sublock;
446
447 LASSERT(lck->lls_sub[i].sub_lock != NULL);
448 sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
449 LASSERT(cl_lock_is_mutexed(sublock));
450 LASSERT(sublock->cll_state != CLS_FREEING);
451
452 lck->lls_sub[i].sub_flags |= LSF_HELD;
453
454 cl_lock_get_trust(sublock);
455 cl_lock_hold_add(env, sublock, "lov-parent", parent);
456 cl_lock_user_add(env, sublock);
457 cl_lock_put(env, sublock);
458 }
459}
460
461static void lov_lock_fini(const struct lu_env *env,
462 struct cl_lock_slice *slice)
463{
464 struct lov_lock *lck;
465 int i;
466
467 lck = cl2lov_lock(slice);
468 LASSERT(lck->lls_nr_filled == 0);
469 if (lck->lls_sub != NULL) {
470 for (i = 0; i < lck->lls_nr; ++i)
471
472
473
474
475 LASSERT(lck->lls_sub[i].sub_lock == NULL);
476 OBD_FREE_LARGE(lck->lls_sub,
477 lck->lls_nr * sizeof(lck->lls_sub[0]));
478 }
479 OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
480}
481
482static int lov_lock_enqueue_wait(const struct lu_env *env,
483 struct lov_lock *lck,
484 struct cl_lock *sublock)
485{
486 struct cl_lock *lock = lck->lls_cl.cls_lock;
487 int result;
488
489 LASSERT(cl_lock_is_mutexed(lock));
490
491 cl_lock_mutex_put(env, lock);
492 result = cl_lock_enqueue_wait(env, sublock, 0);
493 cl_lock_mutex_get(env, lock);
494 return result ?: CLO_REPEAT;
495}
496
497
498
499
500
501
502
503
504static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
505 struct cl_lock *sublock,
506 struct cl_io *io, __u32 enqflags, int last)
507{
508 int result;
509
510
511 result = cl_enqueue_try(env, sublock, io, enqflags);
512 if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
513
514
515 result = cl_wait_try(env, sublock);
516 if (result == CLO_REENQUEUED)
517 result = CLO_WAIT;
518 }
519
520
521
522
523
524 if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
525 (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
526 result = 0;
527 return result;
528}
529
530
531
532
533static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
534 struct cl_io *io, struct lov_lock *lck, int idx)
535{
536 struct lov_lock_link *link;
537 struct cl_lock *sublock;
538 int result;
539
540 LASSERT(parent->cll_depth == 1);
541 cl_lock_mutex_put(env, parent);
542 sublock = lov_sublock_alloc(env, io, lck, idx, &link);
543 if (!IS_ERR(sublock))
544 cl_lock_mutex_get(env, sublock);
545 cl_lock_mutex_get(env, parent);
546
547 if (!IS_ERR(sublock)) {
548 cl_lock_get_trust(sublock);
549 if (parent->cll_state == CLS_QUEUING &&
550 lck->lls_sub[idx].sub_lock == NULL) {
551 lov_sublock_adopt(env, lck, sublock, idx, link);
552 } else {
553 OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
554
555
556 cl_lock_mutex_put(env, parent);
557 cl_lock_unhold(env, sublock, "lov-parent", parent);
558 cl_lock_mutex_get(env, parent);
559 }
560 cl_lock_mutex_put(env, sublock);
561 cl_lock_put(env, sublock);
562 result = CLO_REPEAT;
563 } else
564 result = PTR_ERR(sublock);
565 return result;
566}
567
568
569
570
571
572
573
574
/**
 * ->clo_enqueue() method of the top-lock.
 *
 * Walks the sub-lock slots in index order.  An empty slot is filled through
 * lov_sublock_fill() and the walk stops; a populated slot is locked with
 * lov_sublock_lock(), held, and enqueued with lov_lock_enqueue_one() in the
 * sub-lock's own environment.  Per-slot outcomes are combined with
 * lov_subresult() and the walk stops at the first non-zero combined result.
 *
 * \retval non-zero combined result, if any
 * \retval 0         every visited sub-lock reached at least CLS_ENQUEUED
 * \retval CLO_WAIT  otherwise
 */
static int lov_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *io, __u32 enqflags)
{
	struct cl_lock *lock = slice->cls_lock;
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, lock);
	int i;
	int result;
	enum cl_lock_state minstate;

	for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct lov_lock_sub *lls;
		struct cl_lock *sublock;
		struct lov_sublock_env *subenv;

		if (lock->cll_state != CLS_QUEUING) {
			/*
			 * The top-lock has left CLS_QUEUING: stop enqueueing.
			 * NOTE(review): the loop already exits whenever
			 * result becomes non-zero, so result is 0 whenever
			 * this check runs and the assertion fires if the
			 * branch is ever taken — confirm this is intended.
			 */
			LASSERT(i > 0 && result != 0);
			break;
		}

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;

		if (sub == NULL) {
			/*
			 * Empty slot: lov_sublock_fill() releases and
			 * re-acquires the top-lock mutex and returns either
			 * CLO_REPEAT or an error, so leave the loop either way.
			 */
			result = lov_sublock_fill(env, lock, io, lck, i);
			break;
		}
		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			lov_sublock_hold(env, lck, i);
			rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
						  subenv->lse_io, enqflags,
						  i == lck->lls_nr - 1);
			minstate = min(minstate, sublock->cll_state);
			if (rc == CLO_WAIT) {
				switch (sublock->cll_state) {
				case CLS_QUEUING:
					/*
					 * Take the sub-lock mutex once more
					 * before the closure is undone, so it
					 * stays mutexed for the wait below.
					 * NOTE(review): presumably this extra
					 * mutex reference is dropped inside
					 * cl_lock_enqueue_wait() — confirm.
					 */
					cl_lock_mutex_get(env, sublock);
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					rc = lov_lock_enqueue_wait(env, lck,
								   sublock);
					break;
				case CLS_CACHED:
					/*
					 * Keep a reference and the mutex on
					 * the sub-lock across the release of
					 * our hold on it.
					 */
					cl_lock_get(sublock);

					cl_lock_mutex_get(env, sublock);

					lov_sublock_unlock(env, sub, closure,
							   subenv);

					rc = lov_sublock_release(env, lck, i,
								 1, rc);
					cl_lock_mutex_put(env, sublock);
					cl_lock_put(env, sublock);
					break;
				default:
					lov_sublock_unlock(env, sub, closure,
							   subenv);
					break;
				}
			} else {
				LASSERT(sublock->cll_conflict == NULL);
				lov_sublock_unlock(env, sub, closure, subenv);
			}
		}
		result = lov_subresult(result, rc);
		if (result != 0)
			break;
	}
	cl_lock_closure_fini(closure);
	/* No error or retry: report 0 only if all sub-locks are enqueued. */
	return result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT;
}
666
667static int lov_lock_unuse(const struct lu_env *env,
668 const struct cl_lock_slice *slice)
669{
670 struct lov_lock *lck = cl2lov_lock(slice);
671 struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
672 int i;
673 int result;
674
675 for (result = 0, i = 0; i < lck->lls_nr; ++i) {
676 int rc;
677 struct lovsub_lock *sub;
678 struct cl_lock *sublock;
679 struct lov_lock_sub *lls;
680 struct lov_sublock_env *subenv;
681
682
683
684
685 LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
686 lls = &lck->lls_sub[i];
687 sub = lls->sub_lock;
688 if (sub == NULL)
689 continue;
690
691 sublock = sub->lss_cl.cls_lock;
692 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
693 if (rc == 0) {
694 if (lls->sub_flags & LSF_HELD) {
695 LASSERT(sublock->cll_state == CLS_HELD ||
696 sublock->cll_state == CLS_ENQUEUED);
697 rc = cl_unuse_try(subenv->lse_env, sublock);
698 rc = lov_sublock_release(env, lck, i, 0, rc);
699 }
700 lov_sublock_unlock(env, sub, closure, subenv);
701 }
702 result = lov_subresult(result, rc);
703 }
704
705 if (result == 0 && lck->lls_cancel_race) {
706 lck->lls_cancel_race = 0;
707 result = -ESTALE;
708 }
709 cl_lock_closure_fini(closure);
710 return result;
711}
712
713
/**
 * ->clo_cancel() method of the top-lock.
 *
 * For every populated slot whose sub-lock is held by this top-lock, the hold
 * is dropped: a sub-lock in CLS_HELD state is first un-used with
 * cl_unuse_try() and released keeping the user reference, any other state is
 * released together with the user reference.  Failures are only logged, as
 * the method returns nothing.
 */
static void lov_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct lov_lock *lck = cl2lov_lock(slice);
	struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
	int i;
	int result;

	for (result = 0, i = 0; i < lck->lls_nr; ++i) {
		int rc;
		struct lovsub_lock *sub;
		struct cl_lock *sublock;
		struct lov_lock_sub *lls;
		struct lov_sublock_env *subenv;

		lls = &lck->lls_sub[i];
		sub = lls->sub_lock;
		if (sub == NULL)
			continue;

		sublock = sub->lss_cl.cls_lock;
		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
		if (rc == 0) {
			if (!(lls->sub_flags & LSF_HELD)) {
				/* Not held by us: nothing to drop. */
				lov_sublock_unlock(env, sub, closure, subenv);
				continue;
			}

			switch (sublock->cll_state) {
			case CLS_HELD:
				rc = cl_unuse_try(subenv->lse_env, sublock);
				/* Return value of the release is ignored. */
				lov_sublock_release(env, lck, i, 0, 0);
				break;
			default:
				lov_sublock_release(env, lck, i, 1, 0);
				break;
			}
			lov_sublock_unlock(env, sub, closure, subenv);
		}

		if (rc == CLO_REPEAT) {
			/*
			 * Undo the upcoming ++i so that the same slot is
			 * examined again on the next iteration.
			 */
			--i;
			continue;
		}

		result = lov_subresult(result, rc);
	}

	if (result)
		CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
			      "lov_lock_cancel fails with %d.\n", result);

	cl_lock_closure_fini(closure);
}
771
772static int lov_lock_wait(const struct lu_env *env,
773 const struct cl_lock_slice *slice)
774{
775 struct lov_lock *lck = cl2lov_lock(slice);
776 struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
777 enum cl_lock_state minstate;
778 int reenqueued;
779 int result;
780 int i;
781
782again:
783 for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
784 i < lck->lls_nr; ++i) {
785 int rc;
786 struct lovsub_lock *sub;
787 struct cl_lock *sublock;
788 struct lov_lock_sub *lls;
789 struct lov_sublock_env *subenv;
790
791 lls = &lck->lls_sub[i];
792 sub = lls->sub_lock;
793 LASSERT(sub != NULL);
794 sublock = sub->lss_cl.cls_lock;
795 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
796 if (rc == 0) {
797 LASSERT(sublock->cll_state >= CLS_ENQUEUED);
798 if (sublock->cll_state < CLS_HELD)
799 rc = cl_wait_try(env, sublock);
800
801 minstate = min(minstate, sublock->cll_state);
802 lov_sublock_unlock(env, sub, closure, subenv);
803 }
804 if (rc == CLO_REENQUEUED) {
805 reenqueued++;
806 rc = 0;
807 }
808 result = lov_subresult(result, rc);
809 if (result != 0)
810 break;
811 }
812
813
814 if (result == 0 && reenqueued != 0)
815 goto again;
816 cl_lock_closure_fini(closure);
817 return result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT;
818}
819
820static int lov_lock_use(const struct lu_env *env,
821 const struct cl_lock_slice *slice)
822{
823 struct lov_lock *lck = cl2lov_lock(slice);
824 struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
825 int result;
826 int i;
827
828 LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
829
830 for (result = 0, i = 0; i < lck->lls_nr; ++i) {
831 int rc;
832 struct lovsub_lock *sub;
833 struct cl_lock *sublock;
834 struct lov_lock_sub *lls;
835 struct lov_sublock_env *subenv;
836
837 LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
838
839 lls = &lck->lls_sub[i];
840 sub = lls->sub_lock;
841 if (sub == NULL) {
842
843
844
845
846 result = -ESTALE;
847 break;
848 }
849
850 sublock = sub->lss_cl.cls_lock;
851 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
852 if (rc == 0) {
853 LASSERT(sublock->cll_state != CLS_FREEING);
854 lov_sublock_hold(env, lck, i);
855 if (sublock->cll_state == CLS_CACHED) {
856 rc = cl_use_try(subenv->lse_env, sublock, 0);
857 if (rc != 0)
858 rc = lov_sublock_release(env, lck,
859 i, 1, rc);
860 } else if (sublock->cll_state == CLS_NEW) {
861
862
863 result = -ESTALE;
864 lov_sublock_release(env, lck, i, 1, result);
865 }
866 lov_sublock_unlock(env, sub, closure, subenv);
867 }
868 result = lov_subresult(result, rc);
869 if (result != 0)
870 break;
871 }
872
873 if (lck->lls_cancel_race) {
874
875
876
877
878
879
880 lck->lls_cancel_race = 0;
881 LASSERT(result != 0);
882 result = -ESTALE;
883 }
884 cl_lock_closure_fini(closure);
885 return result;
886}
887
/*
 * Disabled code, never compiled.  NOTE(review): as written it is incomplete —
 * it has an empty parameter list yet uses slice, env, lov, need and result,
 * none of which are declared here, and it returns nothing.  It looks like a
 * sketch of a per-stripe extent match for multi-stripe locks; either finish
 * it or drop it.
 */
#if 0
static int lock_lock_multi_match()
{
	struct cl_lock *lock = slice->cls_lock;
	struct cl_lock_descr *subneed = &lov_env_info(env)->lti_ldescr;
	struct lov_object *loo = cl2lov(lov->lls_cl.cls_obj);
	struct lov_layout_raid0 *r0 = lov_r0(loo);
	struct lov_lock_sub *sub;
	struct cl_object *subobj;
	obd_off fstart;
	obd_off fend;
	obd_off start;
	obd_off end;
	int i;

	fstart = cl_offset(need->cld_obj, need->cld_start);
	fend = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
	subneed->cld_mode = need->cld_mode;
	cl_lock_mutex_get(env, lock);
	for (i = 0; i < lov->lls_nr; ++i) {
		sub = &lov->lls_sub[i];
		if (sub->sub_lock == NULL)
			continue;
		subobj = sub->sub_descr.cld_obj;
		if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
					   fstart, fend, &start, &end))
			continue;
		subneed->cld_start = cl_index(subobj, start);
		subneed->cld_end = cl_index(subobj, end);
		subneed->cld_obj = subobj;
		if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
			result = 0;
			break;
		}
	}
	cl_lock_mutex_put(env, lock);
}
#endif
926
927
928
929
930
931static int lov_lock_stripe_is_matching(const struct lu_env *env,
932 struct lov_object *lov, int stripe,
933 const struct cl_lock_descr *child,
934 const struct cl_lock_descr *descr)
935{
936 struct lov_stripe_md *lsm = lov->lo_lsm;
937 obd_off start;
938 obd_off end;
939 int result;
940
941 if (lov_r0(lov)->lo_nr == 1)
942 return cl_lock_ext_match(child, descr);
943
944
945
946
947
948
949 start = cl_offset(&lov->lo_cl, descr->cld_start);
950 end = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
951 result = end - start <= lsm->lsm_stripe_size &&
952 stripe == lov_stripe_number(lsm, start) &&
953 stripe == lov_stripe_number(lsm, end);
954 if (result) {
955 struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
956 obd_off sub_start;
957 obd_off sub_end;
958
959 subd->cld_obj = NULL;
960 subd->cld_mode = descr->cld_mode;
961 subd->cld_gid = descr->cld_gid;
962 result = lov_stripe_intersects(lsm, stripe, start, end,
963 &sub_start, &sub_end);
964 LASSERT(result);
965 subd->cld_start = cl_index(child->cld_obj, sub_start);
966 subd->cld_end = cl_index(child->cld_obj, sub_end);
967 result = cl_lock_ext_match(child, subd);
968 }
969 return result;
970}
971
972
973
974
975
976
977
978
979
980
981static int lov_lock_fits_into(const struct lu_env *env,
982 const struct cl_lock_slice *slice,
983 const struct cl_lock_descr *need,
984 const struct cl_io *io)
985{
986 struct lov_lock *lov = cl2lov_lock(slice);
987 struct lov_object *obj = cl2lov(slice->cls_obj);
988 int result;
989
990 LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
991 LASSERT(lov->lls_nr > 0);
992
993
994
995 if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
996 return 0;
997
998 if (need->cld_mode == CLM_GROUP)
999
1000
1001
1002 result = cl_lock_ext_match(&lov->lls_orig, need);
1003 else if (lov->lls_nr == 1) {
1004 struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
1005 result = lov_lock_stripe_is_matching(env,
1006 cl2lov(slice->cls_obj),
1007 lov->lls_sub[0].sub_stripe,
1008 got, need);
1009 } else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
1010 !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
1011
1012
1013
1014
1015 result = 0;
1016 else
1017
1018
1019
1020
1021
1022
1023
1024
1025 result = cl_lock_ext_match(&lov->lls_orig, need);
1026 CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
1027 PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
1028 lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
1029 result);
1030 return result;
1031}
1032
1033void lov_lock_unlink(const struct lu_env *env,
1034 struct lov_lock_link *link, struct lovsub_lock *sub)
1035{
1036 struct lov_lock *lck = link->lll_super;
1037 struct cl_lock *parent = lck->lls_cl.cls_lock;
1038
1039 LASSERT(cl_lock_is_mutexed(parent));
1040 LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
1041
1042 list_del_init(&link->lll_list);
1043 LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
1044
1045 lck->lls_sub[link->lll_idx].sub_lock = NULL;
1046 LASSERT(lck->lls_nr_filled > 0);
1047 lck->lls_nr_filled--;
1048 lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
1049 cl_lock_put(env, parent);
1050 OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
1051}
1052
1053struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
1054 struct lov_lock *lck,
1055 struct lovsub_lock *sub)
1056{
1057 struct lov_lock_link *scan;
1058
1059 LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
1060
1061 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
1062 if (scan->lll_super == lck)
1063 return scan;
1064 }
1065 return NULL;
1066}
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082static void lov_lock_delete(const struct lu_env *env,
1083 const struct cl_lock_slice *slice)
1084{
1085 struct lov_lock *lck = cl2lov_lock(slice);
1086 struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
1087 struct lov_lock_link *link;
1088 int rc;
1089 int i;
1090
1091 LASSERT(slice->cls_lock->cll_state == CLS_FREEING);
1092
1093 for (i = 0; i < lck->lls_nr; ++i) {
1094 struct lov_lock_sub *lls = &lck->lls_sub[i];
1095 struct lovsub_lock *lsl = lls->sub_lock;
1096
1097 if (lsl == NULL)
1098 continue;
1099
1100 rc = lov_sublock_lock(env, lck, lls, closure, NULL);
1101 if (rc == CLO_REPEAT) {
1102 --i;
1103 continue;
1104 }
1105
1106 LASSERT(rc == 0);
1107 LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);
1108
1109 if (lls->sub_flags & LSF_HELD)
1110 lov_sublock_release(env, lck, i, 1, 0);
1111
1112 link = lov_lock_link_find(env, lck, lsl);
1113 LASSERT(link != NULL);
1114 lov_lock_unlink(env, link, lsl);
1115 LASSERT(lck->lls_sub[i].sub_lock == NULL);
1116
1117 lov_sublock_unlock(env, lsl, closure, NULL);
1118 }
1119
1120 cl_lock_closure_fini(closure);
1121}
1122
1123static int lov_lock_print(const struct lu_env *env, void *cookie,
1124 lu_printer_t p, const struct cl_lock_slice *slice)
1125{
1126 struct lov_lock *lck = cl2lov_lock(slice);
1127 int i;
1128
1129 (*p)(env, cookie, "%d\n", lck->lls_nr);
1130 for (i = 0; i < lck->lls_nr; ++i) {
1131 struct lov_lock_sub *sub;
1132
1133 sub = &lck->lls_sub[i];
1134 (*p)(env, cookie, " %d %x: ", i, sub->sub_flags);
1135 if (sub->sub_lock != NULL)
1136 cl_lock_print(env, cookie, p,
1137 sub->sub_lock->lss_cl.cls_lock);
1138 else
1139 (*p)(env, cookie, "---\n");
1140 }
1141 return 0;
1142}
1143
/*
 * Lock operations installed by lov_lock_init_raid0(); every method is
 * implemented in this file.
 */
static const struct cl_lock_operations lov_lock_ops = {
	.clo_fini = lov_lock_fini,
	.clo_enqueue = lov_lock_enqueue,
	.clo_wait = lov_lock_wait,
	.clo_use = lov_lock_use,
	.clo_unuse = lov_lock_unuse,
	.clo_cancel = lov_lock_cancel,
	.clo_fits_into = lov_lock_fits_into,
	.clo_delete = lov_lock_delete,
	.clo_print = lov_lock_print
};
1155
1156int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
1157 struct cl_lock *lock, const struct cl_io *io)
1158{
1159 struct lov_lock *lck;
1160 int result;
1161
1162 OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
1163 if (lck != NULL) {
1164 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
1165 result = lov_lock_sub_init(env, lck, io);
1166 } else
1167 result = -ENOMEM;
1168 return result;
1169}
1170
1171static void lov_empty_lock_fini(const struct lu_env *env,
1172 struct cl_lock_slice *slice)
1173{
1174 struct lov_lock *lck = cl2lov_lock(slice);
1175 OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
1176}
1177
1178static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
1179 lu_printer_t p, const struct cl_lock_slice *slice)
1180{
1181 (*p)(env, cookie, "empty\n");
1182 return 0;
1183}
1184
1185
/*
 * Lock operations installed by lov_lock_init_empty(); only finalization and
 * printing are provided.
 */
static const struct cl_lock_operations lov_empty_lock_ops = {
	.clo_fini = lov_empty_lock_fini,
	.clo_print = lov_empty_lock_print
};
1190
1191int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
1192 struct cl_lock *lock, const struct cl_io *io)
1193{
1194 struct lov_lock *lck;
1195 int result = -ENOMEM;
1196
1197 OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
1198 if (lck != NULL) {
1199 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
1200 lck->lls_orig = lock->cll_descr;
1201 result = 0;
1202 }
1203 return result;
1204}
1205
1206static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
1207 struct cl_lock *parent)
1208{
1209 struct cl_lock_closure *closure;
1210
1211 closure = &lov_env_info(env)->lti_closure;
1212 LASSERT(list_empty(&closure->clc_list));
1213 cl_lock_closure_init(env, closure, parent, 1);
1214 return closure;
1215}
1216
1217
1218
1219