1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42#define DEBUG_SUBSYSTEM S_LDLM
43
44#include "../../include/linux/libcfs/libcfs.h"
45#include "../include/lustre_dlm.h"
46#include "../include/obd_class.h"
47#include <linux/list.h>
48#include "ldlm_internal.h"
49
/* Number of DLM service threads to start (0 = use built-in defaults). */
static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

/* CPU-partition pattern handed to the ptlrpc service configuration. */
static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

/* Slab caches defined elsewhere; created in ldlm_init(), destroyed in ldlm_exit(). */
extern struct kmem_cache *ldlm_resource_slab;
extern struct kmem_cache *ldlm_lock_slab;
/* Serializes ldlm_setup()/ldlm_cleanup() against ldlm_refcount updates. */
static struct mutex ldlm_ref_mutex;
static int ldlm_refcount;
62
/*
 * Per-AST callback context.
 * NOTE(review): not referenced elsewhere in this chunk — presumably used
 * by AST-batching code outside this file; confirm before removing.
 */
struct ldlm_cb_async_args {
	struct ldlm_cb_set_arg *ca_set_arg;	/* shared AST-set argument */
	struct ldlm_lock *ca_lock;		/* lock the AST applies to */
};

/* Global LDLM state (cb service + blocking thread pool); allocated in
 * ldlm_setup(), freed and reset to NULL in ldlm_cleanup(). */
static struct ldlm_state *ldlm_state;
71
/* Round a jiffies-based timeout up to the next whole second (in jiffies). */
inline unsigned long round_timeout(unsigned long timeout)
{
	unsigned long sec = cfs_duration_sec(cfs_time_sub(timeout, 0));

	return cfs_time_seconds((int)sec + 1);
}
76
77
78static inline unsigned int ldlm_get_rq_timeout(void)
79{
80
81 unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
82
83 return timeout < 1 ? 1 : timeout;
84}
85
/* Expired-lock thread states.
 * NOTE(review): no users visible in this chunk — likely a server-side
 * remnant; confirm before removing. */
#define ELT_STOPPED 0
#define ELT_READY 1
#define ELT_TERMINATE 2
89
/*
 * Blocking-AST thread pool. Work items are queued on one of two lists
 * under blp_lock; idle workers sleep on blp_waitq.
 */
struct ldlm_bl_pool {
	spinlock_t blp_lock;	/* protects both lists below */

	/*
	 * Priority queue: __ldlm_bl_to_thread() places items whose lock
	 * carries LDLM_FL_DISCARD_DATA here so they are serviced ahead
	 * of ordinary cancels (see ldlm_bl_get_work()).
	 */
	struct list_head blp_prio_list;

	/*
	 * Regular queue for all other work items — also where
	 * ldlm_cleanup() posts its blwi_ns == NULL shutdown sentinels.
	 */
	struct list_head blp_list;

	wait_queue_head_t blp_waitq;	/* workers wait here for new items */
	struct completion blp_comp;	/* signalled by each exiting worker */
	atomic_t blp_num_threads;	/* current worker-thread count */
	atomic_t blp_busy_threads;	/* workers currently processing items */
	int blp_min_threads;		/* threads started at setup */
	int blp_max_threads;		/* cap for on-demand thread creation */
};
113
/*
 * One unit of work for a blocking-AST thread: either a single lock
 * (blwi_lock, blwi_count == 0) or a batch of blwi_count cancels spliced
 * onto blwi_head. A blwi_ns == NULL item tells the worker to exit.
 */
struct ldlm_bl_work_item {
	struct list_head blwi_entry;	/* linkage on blp_list/blp_prio_list */
	struct ldlm_namespace *blwi_ns;	/* NULL = shutdown sentinel */
	struct ldlm_lock_desc blwi_ld;	/* descriptor for single-lock mode */
	struct ldlm_lock *blwi_lock;	/* set only when blwi_count == 0 */
	struct list_head blwi_head;	/* batch of locks to cancel */
	int blwi_count;			/* number of locks on blwi_head */
	struct completion blwi_comp;	/* completed for synchronous callers */
	ldlm_cancel_flags_t blwi_flags;	/* LCF_* flags; LCF_ASYNC = heap item */
	int blwi_mem_pressure;		/* submitter was under memory pressure */
};
125
126
/* No-op stub in this build: always reports "nothing removed".
 * NOTE(review): waiting-lock timeout lists appear to be maintained
 * elsewhere (server side) — confirm. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
	return 0;
}
131
/* No-op stub in this build: the timeout argument is ignored and
 * success is always reported. */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
	return 0;
}
136
137
138
139
140
141
142
143
/**
 * Handle an incoming blocking AST on the client.
 *
 * Marks the lock LDLM_FL_CBPENDING and, if it is already unused (no
 * readers or writers), invokes its blocking AST immediately; otherwise
 * the cancel is deferred until the last user reference drops.
 * Consumes the lock reference held by the caller (LDLM_LOCK_RELEASE).
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
	int do_ast;

	LDLM_DEBUG(lock, "client blocking AST callback handler");

	lock_res_and_lock(lock);
	lock->l_flags |= LDLM_FL_CBPENDING;

	/* "Cancel on block" locks are flagged for cancel right away. */
	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
		lock->l_flags |= LDLM_FL_CANCEL;

	/* Decide under the resource lock, but run the AST after dropping
	 * it — the blocking AST may take locks of its own. */
	do_ast = (!lock->l_readers && !lock->l_writers);
	unlock_res_and_lock(lock);

	if (do_ast) {
		CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
		       lock, lock->l_blocking_ast);
		if (lock->l_blocking_ast != NULL)
			lock->l_blocking_ast(lock, ld, lock->l_ast_data,
					     LDLM_CB_BLOCKING);
	} else {
		CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
		       lock);
	}

	LDLM_DEBUG(lock, "client blocking callback handler END");
	LDLM_LOCK_RELEASE(lock);
}
174
175
176
177
178
179
/**
 * Handle an incoming completion AST on the client.
 *
 * Adopts the server's granted mode, policy data and (possibly new)
 * resource name, copies any LVB payload, grants the lock and runs the
 * resulting completion-AST work list. On error the lock is marked
 * LDLM_FL_FAILED and waiters are woken. Consumes the caller's lock
 * reference (LDLM_LOCK_RELEASE at the end).
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int lvb_len;
	LIST_HEAD(ast_list);
	int rc = 0;

	LDLM_DEBUG(lock, "client completion callback handler START");

	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
		/* Fault injection: stall up to one second (or until the
		 * lock is granted/destroyed) to widen the window of the
		 * cancel-vs-blocking-callback race under test. */
		int to = cfs_time_seconds(1);
		while (to > 0) {
			set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(to);
			if (lock->l_granted_mode == lock->l_req_mode ||
			    lock->l_flags & LDLM_FL_DESTROYED)
				break;
		}
	}

	lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
	if (lvb_len < 0) {
		LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
		rc = lvb_len;
		goto out;
	} else if (lvb_len > 0) {
		if (lock->l_lvb_len > 0) {
			/* The lock already has an LVB buffer; the reply
			 * payload must fit into it. */
			LASSERT(lock->l_lvb_data != NULL);

			if (unlikely(lock->l_lvb_len < lvb_len)) {
				LDLM_ERROR(lock, "Replied LVB is larger than "
					   "expectation, expected = %d, "
					   "replied = %d",
					   lock->l_lvb_len, lvb_len);
				rc = -EINVAL;
				goto out;
			}
		} else if (ldlm_has_layout(lock)) {
			/* Layout lock without a preallocated LVB buffer:
			 * allocate one sized by the reply. */
			void *lvb_data;

			OBD_ALLOC(lvb_data, lvb_len);
			if (lvb_data == NULL) {
				LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
				rc = -ENOMEM;
				goto out;
			}

			lock_res_and_lock(lock);
			LASSERT(lock->l_lvb_data == NULL);
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lvb_data;
			lock->l_lvb_len = lvb_len;
			unlock_res_and_lock(lock);
		}
	}

	lock_res_and_lock(lock);
	if ((lock->l_flags & LDLM_FL_DESTROYED) ||
	    lock->l_granted_mode == lock->l_req_mode) {
		/* The lock is gone or was already granted by another
		 * path: nothing left to do here. */
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "Double grant race happened");
		rc = 0;
		goto out;
	}

	/* The server may grant a different mode than the one requested;
	 * adopt the granted mode from the request. */
	if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
		lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
		LDLM_DEBUG(lock, "completion AST, new lock mode");
	}

	if (lock->l_resource->lr_type != LDLM_PLAIN) {
		/* Non-PLAIN locks carry policy data: convert it from the
		 * wire format into the local representation. */
		ldlm_convert_policy_to_local(req->rq_export,
					     dlm_req->lock_desc.l_resource.lr_type,
					     &dlm_req->lock_desc.l_policy_data,
					     &lock->l_policy_data);
		LDLM_DEBUG(lock, "completion AST, new policy data");
	}

	ldlm_resource_unlink_lock(lock);
	if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
		   &lock->l_resource->lr_name,
		   sizeof(lock->l_resource->lr_name)) != 0) {
		/* The server granted the lock on a different resource:
		 * move the lock there (drops/retakes the res lock). */
		unlock_res_and_lock(lock);
		rc = ldlm_lock_change_resource(ns, lock,
					       &dlm_req->lock_desc.l_resource.lr_name);
		if (rc < 0) {
			LDLM_ERROR(lock, "Failed to allocate resource");
			goto out;
		}
		LDLM_DEBUG(lock, "completion AST, new resource");
		CERROR("change resource!\n");
		lock_res_and_lock(lock);
	}

	if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
		/* A lock with a pending blocking AST is not a cache
		 * candidate: pull it off the LRU and mark it. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
		LDLM_DEBUG(lock, "completion AST includes blocking AST");
	}

	if (lock->l_lvb_len > 0) {
		/* Copy the LVB payload from the reply into the lock. */
		rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
				   lock->l_lvb_data, lvb_len);
		if (rc < 0) {
			unlock_res_and_lock(lock);
			goto out;
		}
	}

	ldlm_grant_lock(lock, &ast_list);
	unlock_res_and_lock(lock);

	LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

	/* Fault-injection delay (OBD_FAIL_OSC_CP_ENQ_RACE) before the
	 * completion ASTs are run. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

	ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

	LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
			  lock);
	goto out;

out:
	if (rc < 0) {
		/* Mark the lock failed and wake anyone waiting on it. */
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAILED;
		unlock_res_and_lock(lock);
		wake_up(&lock->l_waitq);
	}
	LDLM_LOCK_RELEASE(lock);
}
322
323
324
325
326
327
328
329
/**
 * Handle an incoming glimpse AST on the client.
 *
 * Runs the lock's glimpse AST, sends the reply (or an error reply when
 * the AST did not pack one), and opportunistically hands an idle PW
 * lock that has been unused for 10+ seconds to the blocking threads
 * for cancellation. Consumes the caller's lock reference.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
				    struct ldlm_namespace *ns,
				    struct ldlm_request *dlm_req,
				    struct ldlm_lock *lock)
{
	int rc = -ENOSYS;

	LDLM_DEBUG(lock, "client glimpse AST callback handler");

	if (lock->l_glimpse_ast != NULL)
		rc = lock->l_glimpse_ast(lock, req);

	/* The glimpse AST may have packed the reply itself; otherwise
	 * report its status through an error reply. */
	if (req->rq_repmsg != NULL) {
		ptlrpc_reply(req);
	} else {
		req->rq_status = rc;
		ptlrpc_error(req);
	}

	lock_res_and_lock(lock);
	if (lock->l_granted_mode == LCK_PW &&
	    !lock->l_readers && !lock->l_writers &&
	    cfs_time_after(cfs_time_current(),
			   cfs_time_add(lock->l_last_used,
					cfs_time_seconds(10)))) {
		unlock_res_and_lock(lock);
		/* Queue the cancel to a blocking thread; handle inline if
		 * queueing fails. Either path consumes the reference. */
		if (ldlm_bl_to_thread_lock(ns, NULL, lock))
			ldlm_handle_bl_callback(ns, NULL, lock);

		return;
	}
	unlock_res_and_lock(lock);
	LDLM_LOCK_RELEASE(lock);
}
364
365static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
366{
367 if (req->rq_no_reply)
368 return 0;
369
370 req->rq_status = rc;
371 if (!req->rq_packed_final) {
372 rc = lustre_pack_reply(req, 1, NULL, NULL);
373 if (rc)
374 return rc;
375 }
376 return ptlrpc_reply(req);
377}
378
/*
 * Queue a work item on the blocking-thread pool and wake one worker.
 *
 * Items whose lock carries LDLM_FL_DISCARD_DATA go on the priority
 * list so they are processed before ordinary cancels. Unless
 * LCF_ASYNC is set, the caller sleeps on blwi_comp until a worker has
 * finished the item (the item may live on the caller's stack).
 */
static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
			       ldlm_cancel_flags_t cancel_flags)
{
	struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

	spin_lock(&blp->blp_lock);
	if (blwi->blwi_lock &&
	    blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
		/* DISCARD_DATA cancels jump the queue */
		list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
	} else {
		/* everything else is served in FIFO order */
		list_add_tail(&blwi->blwi_entry, &blp->blp_list);
	}
	spin_unlock(&blp->blp_lock);

	wake_up(&blp->blp_waitq);

	/* Synchronous submitters must not return (and free the item)
	 * until a worker signals completion. */
	if (!(cancel_flags & LCF_ASYNC))
		wait_for_completion(&blwi->blwi_comp);

	return 0;
}
404
/*
 * Initialize a blocking work item: when @count > 0 the @cancels list
 * is spliced onto blwi_head (leaving @cancels empty), otherwise the
 * item refers to the single @lock. @blwi must be zeroed by the caller.
 */
static inline void init_blwi(struct ldlm_bl_work_item *blwi,
			     struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct list_head *cancels, int count,
			     struct ldlm_lock *lock,
			     ldlm_cancel_flags_t cancel_flags)
{
	init_completion(&blwi->blwi_comp);
	INIT_LIST_HEAD(&blwi->blwi_head);

	/* Remember memory pressure so the worker can propagate it while
	 * processing this item (see ldlm_bl_thread_main()). */
	if (memory_pressure_get())
		blwi->blwi_mem_pressure = 1;

	blwi->blwi_ns = ns;
	blwi->blwi_flags = cancel_flags;
	if (ld != NULL)
		blwi->blwi_ld = *ld;
	if (count) {
		/* Take over the caller's cancel list: attach blwi_head to
		 * it, then detach the old head so @cancels ends up empty. */
		list_add(&blwi->blwi_head, cancels);
		list_del_init(cancels);
		blwi->blwi_count = count;
	} else {
		blwi->blwi_lock = lock;
	}
}
430
431
432
433
434
435
436
437
438
439
440static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
441 struct ldlm_lock_desc *ld,
442 struct ldlm_lock *lock,
443 struct list_head *cancels, int count,
444 ldlm_cancel_flags_t cancel_flags)
445{
446 if (cancels && count == 0)
447 return 0;
448
449 if (cancel_flags & LCF_ASYNC) {
450 struct ldlm_bl_work_item *blwi;
451
452 OBD_ALLOC(blwi, sizeof(*blwi));
453 if (blwi == NULL)
454 return -ENOMEM;
455 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
456
457 return __ldlm_bl_to_thread(blwi, cancel_flags);
458 } else {
459
460
461
462 struct ldlm_bl_work_item blwi;
463
464 memset(&blwi, 0, sizeof(blwi));
465 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
466 return __ldlm_bl_to_thread(&blwi, cancel_flags);
467 }
468}
469
470
/* Queue a single lock for asynchronous blocking-AST handling. */
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct ldlm_lock *lock)
{
	return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}
476
/* Queue a list of @count locks for cancellation by the blocking threads;
 * @cancel_flags decides sync vs. async handling. */
int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct list_head *cancels, int count,
			   ldlm_cancel_flags_t cancel_flags)
{
	return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
483
484
/*
 * Handle an LDLM_SET_INFO request. Only the KEY_HSM_COPYTOOL_SEND key
 * is recognized and forwarded via obd_set_info_async(); unknown keys
 * are logged and answered with -ENOSYS.
 */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
	struct obd_device *obd = req->rq_export->exp_obd;
	char *key;
	void *val;
	int keylen, vallen;
	int rc = -ENOSYS;

	DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

	req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

	key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	if (key == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info key");
		return -EFAULT;
	}
	keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
				      RCL_CLIENT);
	val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
	if (val == NULL) {
		DEBUG_REQ(D_IOCTL, req, "no set_info val");
		return -EFAULT;
	}
	vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
				      RCL_CLIENT);

	/* Forward recognized keys to the obd layer; note that KEY_IS()
	 * presumably consults key/keylen — confirm its definition. */
	if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
		rc = obd_set_info_async(req->rq_svc_thread->t_env,
					req->rq_export,
					sizeof(KEY_HSM_COPYTOOL_SEND),
					KEY_HSM_COPYTOOL_SEND,
					vallen, val, NULL);
	else
		DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

	return rc;
}
526
527static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
528 const char *msg, int rc,
529 struct lustre_handle *handle)
530{
531 DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
532 "%s: [nid %s] [rc %d] [lock %#llx]",
533 msg, libcfs_id2str(req->rq_peer), rc,
534 handle ? handle->cookie : 0);
535 if (req->rq_no_reply)
536 CWARN("No reply was sent, maybe cause bug 21636.\n");
537 else if (rc)
538 CWARN("Send reply failed, maybe cause bug 21636.\n");
539}
540
541static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
542{
543 struct obd_quotactl *oqctl;
544 struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
545
546 oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
547 if (oqctl == NULL) {
548 CERROR("Can't unpack obd_quotactl\n");
549 return -EPROTO;
550 }
551
552 oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
553
554 cli->cl_qchk_stat = oqctl->qc_stat;
555 return 0;
556}
557
558
/*
 * Request handler for the ldlm_cbd service: the client-side entry for
 * blocking, completion and glimpse ASTs plus SET_INFO and quota-check
 * callbacks. Always returns 0; errors are reported to the peer via
 * the (error) reply instead.
 */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
	struct ldlm_namespace *ns;
	struct ldlm_request *dlm_req;
	struct ldlm_lock *lock;
	int rc;

	/* SEC_CTX_FINI is silently ignored. */
	if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
		return 0;

	req_capsule_init(&req->rq_pill, req, RCL_SERVER);

	if (req->rq_export == NULL) {
		/* Callback arrived on a connection we do not recognize. */
		rc = ldlm_callback_reply(req, -ENOTCONN);
		ldlm_callback_errmsg(req, "Operate on unconnected server",
				     rc, NULL);
		return 0;
	}

	LASSERT(req->rq_export != NULL);
	LASSERT(req->rq_export->exp_obd != NULL);

	/* First switch: per-opcode fail-injection checks, and the
	 * opcodes that are handled entirely without a lock lookup. */
	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_CP_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
			return 0;
		break;
	case LDLM_GL_CALLBACK:
		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
			return 0;
		break;
	case LDLM_SET_INFO:
		rc = ldlm_handle_setinfo(req);
		ldlm_callback_reply(req, rc);
		return 0;
	case OBD_QC_CALLBACK:
		req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
			return 0;
		rc = ldlm_handle_qc_callback(req);
		ldlm_callback_reply(req, rc);
		return 0;
	default:
		CERROR("unknown opcode %u\n",
		       lustre_msg_get_opc(req->rq_reqmsg));
		ldlm_callback_reply(req, -EPROTO);
		return 0;
	}

	ns = req->rq_export->exp_obd->obd_namespace;
	LASSERT(ns != NULL);

	req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

	dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	if (dlm_req == NULL) {
		rc = ldlm_callback_reply(req, -EPROTO);
		ldlm_callback_errmsg(req, "Operate without parameter", rc,
				     NULL);
		return 0;
	}

	/* Fault injection: force a local cancel before looking the lock
	 * up, to exercise the cancel-vs-BL-callback race. */
	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
		if (rc < 0)
			CERROR("ldlm_cli_cancel: %d\n", rc);
	}

	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
	if (!lock) {
		CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
		       "disappeared\n", dlm_req->lock_handle[0].cookie);
		rc = ldlm_callback_reply(req, -EINVAL);
		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
				     &dlm_req->lock_handle[0]);
		return 0;
	}

	if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
	    lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
		OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

	/* Transfer the AST-related flags from the wire request. */
	lock_res_and_lock(lock);
	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_AST_FLAGS);
	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
		/* If the lock has already gone through local cancel (or
		 * has failed), the blocking AST raced with cancellation:
		 * treat the lock as gone. */
		if (((lock->l_flags & LDLM_FL_CANCELING) &&
		     (lock->l_flags & LDLM_FL_BL_DONE)) ||
		    (lock->l_flags & LDLM_FL_FAILED)) {
			LDLM_DEBUG(lock, "callback on lock "
				   "%#llx - lock disappeared\n",
				   dlm_req->lock_handle[0].cookie);
			unlock_res_and_lock(lock);
			LDLM_LOCK_RELEASE(lock);
			rc = ldlm_callback_reply(req, -EINVAL);
			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
					     &dlm_req->lock_handle[0]);
			return 0;
		}

		/* A blocked lock is no longer a caching candidate: take
		 * it off the LRU and remember the blocking AST. */
		ldlm_lock_remove_from_lru(lock);
		lock->l_flags |= LDLM_FL_BL_AST;
	}
	unlock_res_and_lock(lock);

	/* Second switch: dispatch to the per-opcode handler. Each
	 * handler consumes the lock reference taken above. */
	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case LDLM_BL_CALLBACK:
		CDEBUG(D_INODE, "blocking ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
		if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
			rc = ldlm_callback_reply(req, 0);
			if (req->rq_no_reply || rc)
				ldlm_callback_errmsg(req, "Normal process", rc,
						     &dlm_req->lock_handle[0]);
		}
		if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
			ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
		break;
	case LDLM_CP_CALLBACK:
		CDEBUG(D_INODE, "completion ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
		ldlm_callback_reply(req, 0);
		ldlm_handle_cp_callback(req, ns, dlm_req, lock);
		break;
	case LDLM_GL_CALLBACK:
		CDEBUG(D_INODE, "glimpse ast\n");
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
		ldlm_handle_gl_callback(req, ns, dlm_req, lock);
		break;
	default:
		LBUG();	/* opcode was validated by the first switch */
	}

	return 0;
}
722
723
/*
 * Dequeue the next work item for a blocking thread, or NULL if both
 * lists are empty.
 *
 * The priority list is normally preferred, but the static pick counter
 * num_bl (serialized by blp_lock) wraps at blp_num_threads and forces
 * a pick from the regular list when it is 0, so ordinary items cannot
 * be starved by a steady stream of priority items.
 */
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
	struct ldlm_bl_work_item *blwi = NULL;
	static unsigned int num_bl = 0;

	spin_lock(&blp->blp_lock);
	/* process a request from the blp_list at least every blp_num_threads */
	if (!list_empty(&blp->blp_list) &&
	    (list_empty(&blp->blp_prio_list) || num_bl == 0))
		blwi = list_entry(blp->blp_list.next,
				  struct ldlm_bl_work_item, blwi_entry);
	else
		if (!list_empty(&blp->blp_prio_list))
			blwi = list_entry(blp->blp_prio_list.next,
					  struct ldlm_bl_work_item,
					  blwi_entry);

	if (blwi) {
		if (++num_bl >= atomic_read(&blp->blp_num_threads))
			num_bl = 0;
		list_del(&blwi->blwi_entry);
	}
	spin_unlock(&blp->blp_lock);

	return blwi;
}
750
751
/* Handshake data between ldlm_bl_thread_start() and the new thread.
 * Lives on the starter's stack; valid only until bltd_comp fires. */
struct ldlm_bl_thread_data {
	char bltd_name[CFS_CURPROC_COMM_MAX];	/* thread name ("ldlm_bl_NN") */
	struct ldlm_bl_pool *bltd_blp;		/* pool the thread serves */
	struct completion bltd_comp;		/* signalled once thread is up */
	int bltd_num;				/* ordinal used for the name */
};

static int ldlm_bl_thread_main(void *arg);
760
761static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
762{
763 struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
764 struct task_struct *task;
765
766 init_completion(&bltd.bltd_comp);
767 bltd.bltd_num = atomic_read(&blp->blp_num_threads);
768 snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
769 "ldlm_bl_%02d", bltd.bltd_num);
770 task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
771 if (IS_ERR(task)) {
772 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
773 atomic_read(&blp->blp_num_threads), PTR_ERR(task));
774 return PTR_ERR(task);
775 }
776 wait_for_completion(&bltd.bltd_comp);
777
778 return 0;
779}
780
781
782
783
784
785
786
787
/**
 * Main body of a blocking-AST ("ldlm_bl_NN") worker thread.
 *
 * Loops pulling work items off the pool: a batch item cancels a list
 * of locks, a single-lock item runs the blocking callback, and the
 * blwi_ns == NULL sentinel (posted by ldlm_cleanup()) ends the loop.
 */
static int ldlm_bl_thread_main(void *arg)
{
	struct ldlm_bl_pool *blp;

	{
		/* bltd is on the starter's stack: it must not be touched
		 * after complete() lets the starter return. */
		struct ldlm_bl_thread_data *bltd = arg;

		blp = bltd->bltd_blp;

		atomic_inc(&blp->blp_num_threads);
		atomic_inc(&blp->blp_busy_threads);

		complete(&bltd->bltd_comp);

		/* cannot use bltd beyond this point */
	}

	while (1) {
		struct l_wait_info lwi = { 0 };
		struct ldlm_bl_work_item *blwi = NULL;
		int busy;

		blwi = ldlm_bl_get_work(blp);

		if (blwi == NULL) {
			/* No work: drop out of the busy count and sleep
			 * (exclusively) until an item appears. */
			atomic_dec(&blp->blp_busy_threads);
			l_wait_event_exclusive(blp->blp_waitq,
					       (blwi = ldlm_bl_get_work(blp)) != NULL,
					       &lwi);
			busy = atomic_inc_return(&blp->blp_busy_threads);
		} else {
			busy = atomic_read(&blp->blp_busy_threads);
		}

		if (blwi->blwi_ns == NULL)
			/* shutdown sentinel from ldlm_cleanup() */
			break;

		/* All threads busy and below the cap: try to grow the
		 * pool — but never while under memory pressure. The
		 * return value is deliberately ignored (best effort). */
		if (unlikely(busy < blp->blp_max_threads &&
			     busy >= atomic_read(&blp->blp_num_threads) &&
			     !blwi->blwi_mem_pressure))
			ldlm_bl_thread_start(blp);

		if (blwi->blwi_mem_pressure)
			memory_pressure_set();

		if (blwi->blwi_count) {
			int count;

			/* Batch mode: cancel the blwi_head locks locally,
			 * then hand the survivors to
			 * ldlm_cli_cancel_list(). */
			count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
							   blwi->blwi_count,
							   LCF_BL_AST);
			ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
					     blwi->blwi_flags);
		} else {
			/* Single-lock mode: run the blocking callback. */
			ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
						blwi->blwi_lock);
		}
		if (blwi->blwi_mem_pressure)
			memory_pressure_clr();

		/* Async items were heap-allocated in ldlm_bl_to_thread();
		 * sync items sit on the submitter's stack, so wake it. */
		if (blwi->blwi_flags & LCF_ASYNC)
			OBD_FREE(blwi, sizeof(*blwi));
		else
			complete(&blwi->blwi_comp);
	}

	atomic_dec(&blp->blp_busy_threads);
	atomic_dec(&blp->blp_num_threads);
	/* ldlm_cleanup() waits on this per exiting thread */
	complete(&blp->blp_comp);
	return 0;
}
864
865
866static int ldlm_setup(void);
867static int ldlm_cleanup(void);
868
869int ldlm_get_ref(void)
870{
871 int rc = 0;
872
873 mutex_lock(&ldlm_ref_mutex);
874 if (++ldlm_refcount == 1) {
875 rc = ldlm_setup();
876 if (rc)
877 ldlm_refcount--;
878 }
879 mutex_unlock(&ldlm_ref_mutex);
880
881 return rc;
882}
883EXPORT_SYMBOL(ldlm_get_ref);
884
885void ldlm_put_ref(void)
886{
887 mutex_lock(&ldlm_ref_mutex);
888 if (ldlm_refcount == 1) {
889 int rc = ldlm_cleanup();
890 if (rc)
891 CERROR("ldlm_cleanup failed: %d\n", rc);
892 else
893 ldlm_refcount--;
894 } else {
895 ldlm_refcount--;
896 }
897 mutex_unlock(&ldlm_ref_mutex);
898}
899EXPORT_SYMBOL(ldlm_put_ref);
900
901
902
903
904static unsigned
905ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
906{
907 return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
908}
909
910static void *
911ldlm_export_lock_key(struct hlist_node *hnode)
912{
913 struct ldlm_lock *lock;
914
915 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
916 return &lock->l_remote_handle;
917}
918
919static void
920ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
921{
922 struct ldlm_lock *lock;
923
924 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
925 lock->l_remote_handle = *(struct lustre_handle *)key;
926}
927
/* Key-compare callback: handles are equal iff lustre_handle_equal(). */
static int
ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
	void *node_key = ldlm_export_lock_key(hnode);

	return lustre_handle_equal(node_key, key);
}
933
934static void *
935ldlm_export_lock_object(struct hlist_node *hnode)
936{
937 return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
938}
939
940static void
941ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
942{
943 struct ldlm_lock *lock;
944
945 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
946 LDLM_LOCK_GET(lock);
947}
948
949static void
950ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
951{
952 struct ldlm_lock *lock;
953
954 lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
955 LDLM_LOCK_RELEASE(lock);
956}
957
/* cfs_hash operations for the per-export lock hash created in
 * ldlm_init_export(). */
static cfs_hash_ops_t ldlm_export_lock_ops = {
	.hs_hash = ldlm_export_lock_hash,
	.hs_key = ldlm_export_lock_key,
	.hs_keycmp = ldlm_export_lock_keycmp,
	.hs_keycpy = ldlm_export_lock_keycpy,
	.hs_object = ldlm_export_lock_object,
	.hs_get = ldlm_export_lock_get,
	.hs_put = ldlm_export_lock_put,
	.hs_put_locked = ldlm_export_lock_put,	/* same put, lock already held */
};
968
969int ldlm_init_export(struct obd_export *exp)
970{
971 int rc;
972 exp->exp_lock_hash =
973 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
974 HASH_EXP_LOCK_CUR_BITS,
975 HASH_EXP_LOCK_MAX_BITS,
976 HASH_EXP_LOCK_BKT_BITS, 0,
977 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
978 &ldlm_export_lock_ops,
979 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
980 CFS_HASH_NBLK_CHANGE);
981
982 if (!exp->exp_lock_hash)
983 return -ENOMEM;
984
985 rc = ldlm_init_flock_export(exp);
986 if (rc)
987 goto err;
988
989 return 0;
990err:
991 ldlm_destroy_export(exp);
992 return rc;
993}
994EXPORT_SYMBOL(ldlm_init_export);
995
/* Release the export's lock hash and flock state; also used as the
 * error-path cleanup of ldlm_init_export(). */
void ldlm_destroy_export(struct obd_export *exp)
{
	cfs_hash_putref(exp->exp_lock_hash);
	exp->exp_lock_hash = NULL;

	ldlm_destroy_flock_export(exp);
}
EXPORT_SYMBOL(ldlm_destroy_export);
1004
/*
 * Bring up the client-side LDLM: proc entries, the ldlm_cbd ptlrpc
 * service, the blocking-AST thread pool and the lock pools. On any
 * failure, everything set up so far is torn down via ldlm_cleanup().
 */
static int ldlm_setup(void)
{
	static struct ptlrpc_service_conf conf;
	struct ldlm_bl_pool *blp = NULL;
	int rc = 0;
	int i;

	if (ldlm_state != NULL)
		return -EALREADY;

	OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
	if (ldlm_state == NULL)
		return -ENOMEM;

	rc = ldlm_proc_setup();
	if (rc != 0)
		goto out;

	memset(&conf, 0, sizeof(conf));
	conf = (typeof(conf)) {
		.psc_name = "ldlm_cbd",
		.psc_watchdog_factor = 2,
		.psc_buf = {
			.bc_nbufs = LDLM_CLIENT_NBUFS,
			.bc_buf_size = LDLM_BUFSIZE,
			.bc_req_max_size = LDLM_MAXREQSIZE,
			.bc_rep_max_size = LDLM_MAXREPSIZE,
			.bc_req_portal = LDLM_CB_REQUEST_PORTAL,
			.bc_rep_portal = LDLM_CB_REPLY_PORTAL,
		},
		.psc_thr = {
			.tc_thr_name = "ldlm_cb",
			.tc_thr_factor = LDLM_THR_FACTOR,
			.tc_nthrs_init = LDLM_NTHRS_INIT,
			.tc_nthrs_base = LDLM_NTHRS_BASE,
			.tc_nthrs_max = LDLM_NTHRS_MAX,
			.tc_nthrs_user = ldlm_num_threads,
			.tc_cpu_affinity = 1,
			.tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
		},
		.psc_cpt = {
			.cc_pattern = ldlm_cpts,
		},
		.psc_ops = {
			.so_req_handler = ldlm_callback_handler,
		},
	};
	ldlm_state->ldlm_cb_service = \
			ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
	if (IS_ERR(ldlm_state->ldlm_cb_service)) {
		CERROR("failed to start service\n");
		rc = PTR_ERR(ldlm_state->ldlm_cb_service);
		/* clear the ERR_PTR so ldlm_cleanup() skips unregister */
		ldlm_state->ldlm_cb_service = NULL;
		goto out;
	}

	OBD_ALLOC(blp, sizeof(*blp));
	if (blp == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	ldlm_state->ldlm_bl_pool = blp;

	spin_lock_init(&blp->blp_lock);
	INIT_LIST_HEAD(&blp->blp_list);
	INIT_LIST_HEAD(&blp->blp_prio_list);
	init_waitqueue_head(&blp->blp_waitq);
	atomic_set(&blp->blp_num_threads, 0);
	atomic_set(&blp->blp_busy_threads, 0);

	if (ldlm_num_threads == 0) {
		blp->blp_min_threads = LDLM_NTHRS_INIT;
		blp->blp_max_threads = LDLM_NTHRS_MAX;
	} else {
		/* user-requested count, clamped to [INIT, MAX]; min == max
		 * disables on-demand growth */
		blp->blp_min_threads = blp->blp_max_threads = \
			min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
							 ldlm_num_threads));
	}

	for (i = 0; i < blp->blp_min_threads; i++) {
		rc = ldlm_bl_thread_start(blp);
		if (rc < 0)
			goto out;
	}

	rc = ldlm_pools_init();
	if (rc) {
		CERROR("Failed to initialize LDLM pools: %d\n", rc);
		goto out;
	}
	return 0;

 out:
	ldlm_cleanup();
	return rc;
}
1103
/*
 * Tear down everything ldlm_setup() created. Refuses (-EBUSY) while
 * any namespace still exists. Blocking threads are stopped one at a
 * time by feeding a blwi_ns == NULL sentinel and waiting on blp_comp.
 */
static int ldlm_cleanup(void)
{
	if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
	    !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
		CERROR("ldlm still has namespaces; clean these up first.\n");
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
		ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
		return -EBUSY;
	}

	ldlm_pools_fini();

	if (ldlm_state->ldlm_bl_pool != NULL) {
		struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

		while (atomic_read(&blp->blp_num_threads) > 0) {
			/* sentinel: blwi_ns == NULL tells one worker to exit */
			struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

			init_completion(&blp->blp_comp);

			spin_lock(&blp->blp_lock);
			list_add_tail(&blwi.blwi_entry, &blp->blp_list);
			wake_up(&blp->blp_waitq);
			spin_unlock(&blp->blp_lock);

			/* the exiting worker signals blp_comp; blwi is on
			 * our stack so we must not leave before that */
			wait_for_completion(&blp->blp_comp);
		}

		OBD_FREE(blp, sizeof(*blp));
	}

	if (ldlm_state->ldlm_cb_service != NULL)
		ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

	ldlm_proc_cleanup();

	OBD_FREE(ldlm_state, sizeof(*ldlm_state));
	ldlm_state = NULL;

	return 0;
}
1146
1147int ldlm_init(void)
1148{
1149 mutex_init(&ldlm_ref_mutex);
1150 mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1151 mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1152 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1153 sizeof(struct ldlm_resource), 0,
1154 SLAB_HWCACHE_ALIGN, NULL);
1155 if (ldlm_resource_slab == NULL)
1156 return -ENOMEM;
1157
1158 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1159 sizeof(struct ldlm_lock), 0,
1160 SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1161 if (ldlm_lock_slab == NULL) {
1162 kmem_cache_destroy(ldlm_resource_slab);
1163 return -ENOMEM;
1164 }
1165
1166 ldlm_interval_slab = kmem_cache_create("interval_node",
1167 sizeof(struct ldlm_interval),
1168 0, SLAB_HWCACHE_ALIGN, NULL);
1169 if (ldlm_interval_slab == NULL) {
1170 kmem_cache_destroy(ldlm_resource_slab);
1171 kmem_cache_destroy(ldlm_lock_slab);
1172 return -ENOMEM;
1173 }
1174#if LUSTRE_TRACKS_LOCK_EXP_REFS
1175 class_export_dump_hook = ldlm_dump_export_locks;
1176#endif
1177 return 0;
1178}
1179
/* Module-exit-time teardown: destroy the slab caches created by
 * ldlm_init(). Warns if references are still outstanding. */
void ldlm_exit(void)
{
	if (ldlm_refcount)
		CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
	kmem_cache_destroy(ldlm_resource_slab);
	/* ldlm_lock_slab was created with SLAB_DESTROY_BY_RCU (see
	 * ldlm_init()): wait for in-flight RCU readers before the cache
	 * and its pages are freed. */
	synchronize_rcu();
	kmem_cache_destroy(ldlm_lock_slab);
	kmem_cache_destroy(ldlm_interval_slab);
}
1192