1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41#define DEBUG_SUBSYSTEM S_LOV
42
43#include "lov_cl_internal.h"
44
45
46
47
48
49
50
51
52
53
54
55static void lovsub_lock_fini(const struct lu_env *env,
56 struct cl_lock_slice *slice)
57{
58 struct lovsub_lock *lsl;
59
60 ENTRY;
61 lsl = cl2lovsub_lock(slice);
62 LASSERT(list_empty(&lsl->lss_parents));
63 OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
64 EXIT;
65}
66
67static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
68{
69 struct cl_lock *parent;
70
71 ENTRY;
72 parent = lov->lls_cl.cls_lock;
73 cl_lock_get(parent);
74 lu_ref_add(&parent->cll_reference, "lovsub-parent", current);
75 cl_lock_mutex_get(env, parent);
76 EXIT;
77}
78
79static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
80{
81 struct cl_lock *parent;
82
83 ENTRY;
84 parent = lov->lls_cl.cls_lock;
85 cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
86 lu_ref_del(&parent->cll_reference, "lovsub-parent", current);
87 cl_lock_put(env, parent);
88 EXIT;
89}
90
91
92
93
94
95
96static void lovsub_lock_state(const struct lu_env *env,
97 const struct cl_lock_slice *slice,
98 enum cl_lock_state state)
99{
100 struct lovsub_lock *sub = cl2lovsub_lock(slice);
101 struct lov_lock_link *scan;
102
103 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
104 ENTRY;
105
106 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
107 struct lov_lock *lov = scan->lll_super;
108 struct cl_lock *parent = lov->lls_cl.cls_lock;
109
110 if (sub->lss_active != parent) {
111 lovsub_parent_lock(env, lov);
112 cl_lock_signal(env, parent);
113 lovsub_parent_unlock(env, lov);
114 }
115 }
116 EXIT;
117}
118
119
120
121
122
123static unsigned long lovsub_lock_weigh(const struct lu_env *env,
124 const struct cl_lock_slice *slice)
125{
126 struct lovsub_lock *lock = cl2lovsub_lock(slice);
127 struct lov_lock *lov;
128 unsigned long dumbbell;
129
130 ENTRY;
131
132 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
133
134 if (!list_empty(&lock->lss_parents)) {
135
136
137
138
139
140 lov = container_of(lock->lss_parents.next,
141 struct lov_lock_link, lll_list)->lll_super;
142
143 lovsub_parent_lock(env, lov);
144 dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
145 lovsub_parent_unlock(env, lov);
146 } else
147 dumbbell = 0;
148
149 RETURN(dumbbell);
150}
151
152
153
154
155static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
156 struct lov_object *lov,
157 int stripe, struct cl_lock_descr *out)
158{
159 pgoff_t size;
160 pgoff_t skip;
161
162 pgoff_t start;
163 pgoff_t end;
164
165 ENTRY;
166 start = in->cld_start;
167 end = in->cld_end;
168
169 if (lov->lo_lsm->lsm_stripe_count > 1) {
170 size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
171 skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;
172
173
174 start += start/size * skip + stripe * size;
175
176 if (end != CL_PAGE_EOF) {
177 end += end/size * skip + stripe * size;
178
179
180
181 if (end < in->cld_end)
182 end = CL_PAGE_EOF;
183 }
184 }
185 out->cld_start = start;
186 out->cld_end = end;
187 EXIT;
188}
189
190
191
192
193
194
195
196
197
198
199
200
201
202int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
203 struct lovsub_lock *sublock,
204 const struct cl_lock_descr *d, int idx)
205{
206 struct cl_lock *parent;
207 struct lovsub_object *subobj;
208 struct cl_lock_descr *pd;
209 struct cl_lock_descr *parent_descr;
210 int result;
211
212 parent = lov->lls_cl.cls_lock;
213 parent_descr = &parent->cll_descr;
214 LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));
215
216 subobj = cl2lovsub(sublock->lss_cl.cls_obj);
217 pd = &lov_env_info(env)->lti_ldescr;
218
219 pd->cld_obj = parent_descr->cld_obj;
220 pd->cld_mode = parent_descr->cld_mode;
221 pd->cld_gid = parent_descr->cld_gid;
222 lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
223 lov->lls_sub[idx].sub_got = *d;
224
225
226
227
228 if (!cl_lock_ext_match(parent_descr, pd))
229 result = cl_lock_modify(env, parent, pd);
230 else
231 result = 0;
232 return result;
233}
234
235static int lovsub_lock_modify(const struct lu_env *env,
236 const struct cl_lock_slice *s,
237 const struct cl_lock_descr *d)
238{
239 struct lovsub_lock *lock = cl2lovsub_lock(s);
240 struct lov_lock_link *scan;
241 struct lov_lock *lov;
242 int result = 0;
243
244 ENTRY;
245
246 LASSERT(cl_lock_mode_match(d->cld_mode,
247 s->cls_lock->cll_descr.cld_mode));
248 list_for_each_entry(scan, &lock->lss_parents, lll_list) {
249 int rc;
250
251 lov = scan->lll_super;
252 lovsub_parent_lock(env, lov);
253 rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
254 lovsub_parent_unlock(env, lov);
255 result = result ?: rc;
256 }
257 RETURN(result);
258}
259
260static int lovsub_lock_closure(const struct lu_env *env,
261 const struct cl_lock_slice *slice,
262 struct cl_lock_closure *closure)
263{
264 struct lovsub_lock *sub;
265 struct cl_lock *parent;
266 struct lov_lock_link *scan;
267 int result;
268
269 LASSERT(cl_lock_is_mutexed(slice->cls_lock));
270 ENTRY;
271
272 sub = cl2lovsub_lock(slice);
273 result = 0;
274
275 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
276 parent = scan->lll_super->lls_cl.cls_lock;
277 result = cl_lock_closure_build(env, parent, closure);
278 if (result != 0)
279 break;
280 }
281 RETURN(result);
282}
283
284
285
286
287
/**
 * Reacts, on behalf of one top-lock, to a sub-lock being deleted.
 *
 * Called from lovsub_lock_delete() while the top-lock is pinned and mutexed
 * through lovsub_parent_lock(), after lov_lock_unlink() has been called for
 * the link between \a child and \a lov.  What happens depends on the state
 * of the top-lock; see the per-case comments below.
 *
 * \param env    execution environment
 * \param child  sub-lock that is being deleted
 * \param lov    lov slice of the top-lock to process
 *
 * \retval 0 nothing further is required from the caller; this is also the
 *           result when the top-lock already has cll_error set
 * \retval 1 the mutex of \a child was released here and the top-lock was
 *           cancelled and deleted
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
				  struct cl_lock *child, struct lov_lock *lov)
{
	struct cl_lock *parent;
	int result;
	ENTRY;

	parent = lov->lls_cl.cls_lock;
	/* A top-lock that already failed is left alone. */
	if (parent->cll_error)
		RETURN(0);

	result = 0;
	switch (parent->cll_state) {
	case CLS_ENQUEUED:
		/*
		 * An enqueued top-lock is only tolerated here if it has
		 * already been flagged as cancelled.
		 */
		LASSERT(parent->cll_flags & CLF_CANCELLED);
		break;
	case CLS_QUEUING:
	case CLS_FREEING:
		/* Signal the top-lock; nothing else is changed. */
		cl_lock_signal(env, parent);
		break;
	case CLS_INTRANSIT:
		/*
		 * The sub-lock goes away while the top-lock is in transit:
		 * record that in lls_cancel_race.
		 * NOTE(review): the consumer of lls_cancel_race is outside
		 * this file, and the extra cl_lock_is_intransit() test looks
		 * redundant given the case label -- confirm before touching.
		 */
		if (cl_lock_is_intransit(parent))
			lov->lls_cancel_race = 1;
		break;
	case CLS_CACHED:
		/*
		 * A cached top-lock that lost a sub-lock is moved back to
		 * CLS_NEW, then handled exactly like a CLS_NEW top-lock.
		 */
		cl_lock_state_set(env, parent, CLS_NEW);
		/* fall through */
	case CLS_NEW:
		/*
		 * When no sub-locks are left (lls_nr_filled == 0) the
		 * top-lock is cancelled and deleted right away.
		 */
		if (lov->lls_nr_filled == 0) {
			/*
			 * This is done only when exactly two lock mutexes
			 * are held in this environment (presumably those of
			 * the child and of this parent -- TODO confirm).
			 * The child mutex is dropped before the top-lock is
			 * cancelled and deleted; returning 1 reports that to
			 * the caller.
			 */
			if (cl_lock_nr_mutexed(env) == 2) {
				cl_lock_mutex_put(env, child);
				cl_lock_cancel(env, parent);
				cl_lock_delete(env, parent);
				result = 1;
			}
		}
		break;
	case CLS_HELD:
		/* Dump the lock, then treat it like any other bad state. */
		CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
		/* fall through */
	default:
		CERROR("Impossible state: %d\n", parent->cll_state);
		LBUG();
		break;
	}

	RETURN(result);
}
391
392
393
394
395
396
/**
 * Implementation of the clo_delete method for the lovsub layer.
 *
 * Detaches the sub-lock from every top-lock on its lss_parents list and lets
 * lovsub_lock_delete_one() react for each of them.
 *
 * The caller must hold the mutex of the sub-lock (asserted).
 */
static void lovsub_lock_delete(const struct lu_env *env,
			       const struct cl_lock_slice *slice)
{
	struct cl_lock *child = slice->cls_lock;
	struct lovsub_lock *sub = cl2lovsub_lock(slice);
	int restart;

	LASSERT(cl_lock_is_mutexed(child));

	ENTRY;
	/*
	 * The list walk is repeated from the start whenever
	 * lovsub_lock_delete_one() reports that it released the child mutex
	 * (presumably because lss_parents may have changed while the mutex
	 * was not held -- TODO confirm).
	 */
	do {
		struct lov_lock *lov;
		struct lov_lock_link *scan;
		struct lov_lock_link *temp;
		struct lov_lock_sub *subdata;

		restart = 0;
		list_for_each_entry_safe(scan, temp,
					 &sub->lss_parents, lll_list) {
			lov = scan->lll_super;
			/* computed before lov_lock_unlink() is given scan */
			subdata = &lov->lls_sub[scan->lll_idx];
			lovsub_parent_lock(env, lov);
			/* reset the recorded description to sub_descr */
			subdata->sub_got = subdata->sub_descr;
			lov_lock_unlink(env, scan, sub);
			restart = lovsub_lock_delete_one(env, child, lov);
			lovsub_parent_unlock(env, lov);

			if (restart) {
				/*
				 * The child mutex was dropped by
				 * lovsub_lock_delete_one(); take it back
				 * before walking the list again.
				 */
				cl_lock_mutex_get(env, child);
				break;
			}
		}
	} while (restart);
	EXIT;
}
439
440static int lovsub_lock_print(const struct lu_env *env, void *cookie,
441 lu_printer_t p, const struct cl_lock_slice *slice)
442{
443 struct lovsub_lock *sub = cl2lovsub_lock(slice);
444 struct lov_lock *lov;
445 struct lov_lock_link *scan;
446
447 list_for_each_entry(scan, &sub->lss_parents, lll_list) {
448 lov = scan->lll_super;
449 (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
450 if (lov != NULL)
451 cl_lock_descr_print(env, cookie, p,
452 &lov->lls_cl.cls_lock->cll_descr);
453 (*p)(env, cookie, "] ");
454 }
455 return 0;
456}
457
458static const struct cl_lock_operations lovsub_lock_ops = {
459 .clo_fini = lovsub_lock_fini,
460 .clo_state = lovsub_lock_state,
461 .clo_delete = lovsub_lock_delete,
462 .clo_modify = lovsub_lock_modify,
463 .clo_closure = lovsub_lock_closure,
464 .clo_weigh = lovsub_lock_weigh,
465 .clo_print = lovsub_lock_print
466};
467
468int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
469 struct cl_lock *lock, const struct cl_io *io)
470{
471 struct lovsub_lock *lsk;
472 int result;
473
474 ENTRY;
475 OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, __GFP_IO);
476 if (lsk != NULL) {
477 INIT_LIST_HEAD(&lsk->lss_parents);
478 cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
479 result = 0;
480 } else
481 result = -ENOMEM;
482 RETURN(result);
483}
484
485
486