1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57#define DEBUG_SUBSYSTEM S_LDLM
58
59#include "../include/lustre_dlm.h"
60#include "../include/obd_support.h"
61#include "../include/obd_class.h"
62#include "../include/lustre_lib.h"
63#include <linux/list.h>
64#include "ldlm_internal.h"
65
66int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
67 void *data, int flag);
68
69
70
71
72
73
74
75
76
/*
 * Iterate over a list from \a pos (inclusive) to the end of the list,
 * safe against removal of the current entry.  Unlike list_for_each_safe()
 * the walk starts at \a pos itself (which must be a real entry, not the
 * list head) rather than at head->next.
 */
#define list_for_remaining_safe(pos, n, head) \
	for (n = pos->next; pos != (head); pos = n, n = pos->next)
79
80static inline int
81ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
82{
83 return((new->l_policy_data.l_flock.owner ==
84 lock->l_policy_data.l_flock.owner) &&
85 (new->l_export == lock->l_export));
86}
87
88static inline int
89ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
90{
91 return((new->l_policy_data.l_flock.start <=
92 lock->l_policy_data.l_flock.end) &&
93 (new->l_policy_data.l_flock.end >=
94 lock->l_policy_data.l_flock.start));
95}
96
/*
 * Record that \a req is blocked by granted lock \a lock: remember the
 * blocking owner/export in \a req's flock policy data and insert \a req
 * into its own export's flock hash (keyed by owner) so that
 * ldlm_flock_deadlock() can walk the chain of blocked owners.
 */
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
					    struct ldlm_lock *lock)
{
	/* Nothing to track for a lock without an export. */
	if (req->l_export == NULL)
		return;

	/* The request must not already be linked in any flock hash. */
	LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

	req->l_policy_data.l_flock.blocking_owner =
		lock->l_policy_data.l_flock.owner;
	req->l_policy_data.l_flock.blocking_export =
		lock->l_export;
	req->l_policy_data.l_flock.blocking_refs = 0;

	cfs_hash_add(req->l_export->exp_flock_hash,
		     &req->l_policy_data.l_flock.owner,
		     &req->l_exp_flock_hash);
}
116
/*
 * Remove \a req from the deadlock-detection hash, if it is there.
 * Safe to call on a lock that was never linked.  Must be called with
 * the resource lock held (asserted via check_res_locked()).
 */
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
	/* Nothing was tracked for a lock without an export. */
	if (req->l_export == NULL)
		return;

	check_res_locked(req->l_resource);
	if (req->l_export->exp_flock_hash != NULL &&
	    !hlist_unhashed(&req->l_exp_flock_hash))
		cfs_hash_del(req->l_export->exp_flock_hash,
			     &req->l_policy_data.l_flock.owner,
			     &req->l_exp_flock_hash);
}
130
/*
 * Remove \a lock from its resource list and destroy it.
 *
 * Note that \a flags is compared for exact equality (not as a bitmask)
 * against LDLM_FL_WAIT_NOREPROC: that value marks destruction driven by
 * completion-AST reprocessing, in which case the reference taken at
 * enqueue time must be dropped here (unless the lock already failed).
 */
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
		   mode, flags);

	/* The lock must not be linked in any export's flock hash here. */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* when reaching here, it is under lock_res_and_lock(), so the
		 * enqueue reference can be dropped with the _nolock variant. */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
}
153
154
155
156
157
158
159
160
161
162
/*
 * Deadlock detection for flock requests.
 *
 * Starting from the owner that blocks \a bl_lock, repeatedly look up (in
 * the blocking export's flock hash) a lock owned by the currently blocked
 * owner; that lock's own blocking owner/export become the next link in
 * the chain.  Return 1 if the chain leads back to \a req's owner/export
 * (a cycle, i.e. deadlock), 0 otherwise.
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
	struct obd_export *req_exp = req->l_export;
	struct obd_export *bl_exp = bl_lock->l_export;
	__u64 req_owner = req->l_policy_data.l_flock.owner;
	__u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

	/* No export means no deadlock tracking for this request. */
	if (req_exp == NULL)
		return 0;

	class_export_get(bl_exp);
	while (1) {
		struct obd_export *bl_exp_new;
		struct ldlm_lock *lock = NULL;
		struct ldlm_flock *flock;

		if (bl_exp->exp_flock_hash != NULL)
			lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
					       &bl_owner);
		/* chain ends: the current owner is not blocked by anyone */
		if (lock == NULL)
			break;

		LASSERT(req != lock);
		flock = &lock->l_policy_data.l_flock;
		LASSERT(flock->owner == bl_owner);
		bl_owner = flock->blocking_owner;
		bl_exp_new = class_export_get(flock->blocking_export);
		class_export_put(bl_exp);

		/* drop the reference taken by cfs_hash_lookup().
		 * NOTE(review): bl_exp is dereferenced here after
		 * class_export_put(bl_exp) above — this relies on another
		 * reference keeping the export alive; verify. */
		cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
		bl_exp = bl_exp_new;

		/* cycle found: the chain returned to the requester */
		if (bl_owner == req_owner && bl_exp == req_exp) {
			class_export_put(bl_exp);
			return 1;
		}
	}
	class_export_put(bl_exp);

	return 0;
}
206
/*
 * Resolve a detected deadlock by cancelling \a lock.
 *
 * Clients advertising OBD_CONNECT_FLOCK_DEAD receive an AST with
 * LDLM_FL_FLOCK_DEADLOCK set so they can cancel the lock themselves;
 * for older clients the deadlock is only logged.
 */
static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
					  struct list_head *work_list)
{
	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

	if ((exp_connect_flags(lock->l_export) &
	     OBD_CONNECT_FLOCK_DEAD) == 0) {
		CERROR(
			"deadlock found, but client doesn't support flock canceliation\n");
	} else {
		LASSERT(lock->l_completion_ast);
		LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
		lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
			LDLM_FL_FLOCK_DEADLOCK;
		/* the lock is cancelled: take it off the deadlock hash and
		 * the resource queues, then queue the AST for delivery */
		ldlm_flock_blocking_unlink(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_add_ast_work_item(lock, NULL, work_list);
	}
}
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/**
 * Process a granting attempt for a flock lock.
 *
 * Looks for conflicts with \a req in the granted queue; when none are
 * found the lock is granted, after merging with or splitting existing
 * locks of the same owner, following POSIX record-lock semantics.
 *
 * \param[in]	req	the lock to be enqueued (LCK_NL mode == unlock)
 * \param[in,out] flags	LDLM_FL_* flags controlling processing; may gain
 *			LDLM_FL_LOCK_CHANGED / LDLM_FL_BLOCK_GRANTED
 * \param[in]	first_enq	non-zero on first enqueue attempt, zero when
 *			reprocessing the waiting queue
 * \param[out]	err	ELDLM_OK or a negative error for the caller
 * \param[in]	work_list	list collecting AST work items, may be NULL
 *
 * \retval LDLM_ITER_CONTINUE or LDLM_ITER_STOP
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
			ldlm_error_t *err, struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct list_head *tmp;
	struct list_head *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	ldlm_mode_t mode = req->l_req_mode;
	int local = ns_is_client(ns);
	int added = (mode == LCK_NL);	/* LCK_NL request == unlock */
	int overlaps = 0;
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	if (local) {
		/* No blocking ASTs are sent to the clients for
		 * POSIX file & record locks */
		req->l_blocking_ast = NULL;
	} else {
		/* Called on the server for lock cancels. */
		req->l_blocking_ast = ldlm_flock_blocking_ast;
	}

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = tmp;
				break;
			}
		}
	} else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);

			if (ldlm_same_flock_owner(lock, req)) {
				/* remember where this owner's locks begin */
				if (!ownlocks)
					ownlocks = tmp;
				continue;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (!first_enq) {
				reprocess_failed = 1;
				if (ldlm_flock_deadlock(req, lock)) {
					ldlm_flock_cancel_on_deadlock(req,
								      work_list);
					return LDLM_ITER_CONTINUE;
				}
				continue;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				return LDLM_ITER_STOP;
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				/* F_GETLK: report the conflicting lock's
				 * parameters back in \a req */
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				return LDLM_ITER_STOP;
			}

			/* add lock to blocking list before deadlock
			 * check to prevent race */
			ldlm_flock_blocking_link(req, lock);

			if (ldlm_flock_deadlock(req, lock)) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EDEADLK;
				return LDLM_ITER_STOP;
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			return LDLM_ITER_STOP;
		}
		if (reprocess_failed)
			return LDLM_ITER_CONTINUE;
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		/* F_GETLK found no conflict: report F_UNLCK via LCK_NL */
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		return LDLM_ITER_STOP;
	}

	/* In case we had slept on this lock request take it off of the
	 * deadlock detection hash list. */
	ldlm_flock_blocking_unlink(req);

	/* Scan the locks owned by this process to merge and split
	 * the new lock with existing locks of the same owner. */

	if (!ownlocks)
		ownlocks = &res->lr_granted;

	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow. */
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;

			/* grow the existing/new lock to the union of both */
			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				/* already using an existing lock as "new";
				 * this one was absorbed into it */
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				/* reuse the existing granted lock in place
				 * of the request */
				new = lock;
				added = 1;
			}
			continue;
		}

		/* different mode: only overlapping locks matter below */
		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		++overlaps;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				/* new covers the head of lock: trim lock */
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			/* new fully covers lock: drop lock */
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			/* new covers the tail of lock: trim lock */
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */

		/* if this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request so
		 * it must see the original lock data in the reply. */

		/* ldlm_lock_create() may sleep, so the resource lock is
		 * dropped around the allocation and processing restarts
		 * from the top afterwards. */
		if (!new2) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (!new2) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = -ENOLCK;
				return LDLM_ITER_STOP;
			}
			/* state may have changed while unlocked: rescan */
			goto reprocess;
		}

		splitted = 1;

		/* new2 takes the head of the split lock; the original keeps
		 * the tail beyond new's range */
		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, ownlocks, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 is created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	/* when the request was not merged into an existing lock, insert
	 * it into the granted queue before this owner's other locks */
	if (!added) {
		list_del_init(&req->l_res_link);

		ldlm_resource_add_lock(res, ownlocks, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
		/* The only possible case for a client-side call into the
		 * flock policy function is ldlm_flock_completion_ast(),
		 * which always carries LDLM_FL_WAIT_NOREPROC. */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that
	 * the work item can bump the reference count on \a req.  Otherwise
	 * \a req could be freed before the completion AST can be sent. */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	return LDLM_ITER_CONTINUE;
}
551
/* Context handed to ldlm_flock_interrupted_wait() while a flock enqueue
 * sleeps waiting to be granted. */
struct ldlm_flock_wait_data {
	struct ldlm_lock *fwd_lock;	/* the lock being waited upon */
	int fwd_generation;		/* import generation when the wait began */
};
556
/*
 * Interrupt handler for the l_wait_event() sleep in
 * ldlm_flock_completion_ast(): unlink the lock from the deadlock
 * detection hash and mark it CBPENDING.
 */
static void
ldlm_flock_interrupted_wait(void *data)
{
	struct ldlm_lock *lock;

	lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);

	/* client side - set flag to prevent lock from being put on LRU list */
	lock->l_flags |= LDLM_FL_CBPENDING;
	unlock_res_and_lock(lock);
}
572
573
574
575
576
577
578
579
580
581
582
/**
 * Flock completion callback function (client side).
 *
 * \param lock	the lock for which the completion fired
 * \param flags	LDLM_FL_* from the enqueue/AST path
 * \param data	non-NULL when called from the CP AST RPC path
 *
 * \retval 0		on success, or when the waiter was woken for other
 *			processing
 * \retval -EDEADLK	the server detected a flock deadlock
 * \retval -EIO		the lock failed
 * \retval other	error from the wait
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock *getlk = lock->l_ast_data;
	struct obd_device *obd;
	struct obd_import *imp = NULL;
	struct ldlm_flock_wait_data fwd;
	struct l_wait_info lwi;
	ldlm_error_t err;
	int rc = 0;

	CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
	       flags, data, getlk);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away.  No point in
	 * holding the lock even if the app still believes it has it,
	 * since the server already dropped it anyway.  Only for granted
	 * locks too. */
	if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
	    (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
		if (lock->l_req_mode == lock->l_granted_mode &&
		    lock->l_granted_mode != LCK_NL &&
		    NULL == data)
			ldlm_lock_decref_internal(lock, lock->l_req_mode);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		if (NULL == data)
			/* mds granted the lock in the reply */
			goto granted;
		/* CP AST RPC: lock got granted, wake up the waiter */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
	fwd.fwd_lock = lock;
	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, there is no import */
	if (NULL != obd)
		imp = obd->u.cli.cl_import;

	if (NULL != imp) {
		spin_lock(&imp->imp_lock);
		fwd.fwd_generation = imp->imp_generation;
		spin_unlock(&imp->imp_lock);
	}

	lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

	/* Go to sleep until the lock is granted. */
	rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

granted:
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
		return 0;
	}

	if (lock->l_flags & LDLM_FL_FAILED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
		return -EIO;
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	lock_res_and_lock(lock);

	/* take lock off the deadlock detection hash list. */
	ldlm_flock_blocking_unlink(lock);

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	list_del_init(&lock->l_res_link);

	if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
		LDLM_DEBUG(lock, "client-side enqueue deadlock received");
		rc = -EDEADLK;
	} else if (flags & LDLM_FL_TEST_LOCK) {
		/* fcntl(F_GETLK) request */
		/* The old mode was saved in getlk->fl_type so that if the
		 * mode in the lock changes we can decref the appropriate
		 * refcount. */
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process. */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	return rc;
}
710EXPORT_SYMBOL(ldlm_flock_completion_ast);
711
/*
 * Blocking AST for flock locks.  Only ever invoked in the canceling
 * phase (asserted below); removes the lock from the deadlock
 * detection hash.
 */
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag)
{
	LASSERT(lock);
	LASSERT(flag == LDLM_CB_CANCELING);

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);
	unlock_res_and_lock(lock);
	return 0;
}
724
725void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
726 ldlm_policy_data_t *lpolicy)
727{
728 memset(lpolicy, 0, sizeof(*lpolicy));
729 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
730 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
731 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
732
733
734
735 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
736}
737
738
739void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
740 ldlm_policy_data_t *lpolicy)
741{
742 memset(lpolicy, 0, sizeof(*lpolicy));
743 lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
744 lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
745 lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
746 lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
747}
748
749void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
750 ldlm_wire_policy_data_t *wpolicy)
751{
752 memset(wpolicy, 0, sizeof(*wpolicy));
753 wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
754 wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
755 wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
756 wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
757}
758
759
760
761
762static unsigned
763ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
764{
765 return cfs_hash_u64_hash(*(__u64 *)key, mask);
766}
767
768static void *
769ldlm_export_flock_key(struct hlist_node *hnode)
770{
771 struct ldlm_lock *lock;
772
773 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
774 return &lock->l_policy_data.l_flock.owner;
775}
776
777static int
778ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
779{
780 return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
781}
782
783static void *
784ldlm_export_flock_object(struct hlist_node *hnode)
785{
786 return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
787}
788
/*
 * cfs_hash get callback: while a lock sits in the flock hash, pin the
 * lock itself and its blocking export, and count the blocking
 * reference so _put() knows when to clear the blocking fields.
 */
static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_GET(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_get(flock->blocking_export);
	flock->blocking_refs++;
}
803
804static void
805ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
806{
807 struct ldlm_lock *lock;
808 struct ldlm_flock *flock;
809
810 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
811 LDLM_LOCK_RELEASE(lock);
812
813 flock = &lock->l_policy_data.l_flock;
814 LASSERT(flock->blocking_export != NULL);
815 class_export_put(flock->blocking_export);
816 if (--flock->blocking_refs == 0) {
817 flock->blocking_owner = 0;
818 flock->blocking_export = NULL;
819 }
820}
821
/* cfs_hash operations for the per-export flock hash, keyed by the
 * 64-bit flock owner id.  Used for deadlock detection. */
static cfs_hash_ops_t ldlm_export_flock_ops = {
	.hs_hash	= ldlm_export_flock_hash,
	.hs_key		= ldlm_export_flock_key,
	.hs_keycmp	= ldlm_export_flock_keycmp,
	.hs_object	= ldlm_export_flock_object,
	.hs_get		= ldlm_export_flock_get,
	.hs_put		= ldlm_export_flock_put,
	.hs_put_locked	= ldlm_export_flock_put,
};
831
/*
 * Create the per-export flock hash used for deadlock detection.
 * Only MDT exports get one; all other export types need no flock
 * state and return success without allocating anything.
 *
 * \retval 0		on success (or when no hash is needed)
 * \retval -ENOMEM	if the hash could not be created
 */
int ldlm_init_flock_export(struct obd_export *exp)
{
	if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
		return 0;

	exp->exp_flock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_flock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
	if (!exp->exp_flock_hash)
		return -ENOMEM;

	return 0;
}
850EXPORT_SYMBOL(ldlm_init_flock_export);
851
852void ldlm_destroy_flock_export(struct obd_export *exp)
853{
854 if (exp->exp_flock_hash) {
855 cfs_hash_putref(exp->exp_flock_hash);
856 exp->exp_flock_hash = NULL;
857 }
858}
859EXPORT_SYMBOL(ldlm_destroy_flock_export);
860