1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#define DEBUG_SUBSYSTEM S_MDC
34
35# include <linux/module.h>
36
37#include "../include/lustre_intent.h"
38#include "../include/obd.h"
39#include "../include/obd_class.h"
40#include "../include/lustre_dlm.h"
41#include "../include/lustre_fid.h"
42#include "../include/lustre_mdc.h"
43#include "../include/lustre_net.h"
44#include "../include/lustre_req_layout.h"
45#include "../include/lustre_swab.h"
46
47#include "mdc_internal.h"
48
49struct mdc_getattr_args {
50 struct obd_export *ga_exp;
51 struct md_enqueue_info *ga_minfo;
52};
53
54int it_open_error(int phase, struct lookup_intent *it)
55{
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
58 return it->it_status;
59 else
60 return 0;
61 }
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
64 return it->it_status;
65 else
66 return 0;
67 }
68
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
71 return it->it_status;
72 else
73 return 0;
74 }
75
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
78 return it->it_status;
79 else
80 return 0;
81 }
82
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
85 return it->it_status;
86 else
87 return 0;
88 }
89 CERROR("it disp: %X, status: %d\n", it->it_disposition,
90 it->it_status);
91 LBUG();
92 return 0;
93}
94EXPORT_SYMBOL(it_open_error);
95
96
97int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
98 void *data, __u64 *bits)
99{
100 struct ldlm_lock *lock;
101 struct inode *new_inode = data;
102
103 if (bits)
104 *bits = 0;
105
106 if (!lustre_handle_is_used(lockh))
107 return 0;
108
109 lock = ldlm_handle2lock(lockh);
110
111 LASSERT(lock);
112 lock_res_and_lock(lock);
113 if (lock->l_resource->lr_lvb_inode &&
114 lock->l_resource->lr_lvb_inode != data) {
115 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
116
117 LASSERTF(old_inode->i_state & I_FREEING,
118 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
119 old_inode, old_inode->i_ino, old_inode->i_generation,
120 old_inode->i_state, new_inode, new_inode->i_ino,
121 new_inode->i_generation);
122 }
123 lock->l_resource->lr_lvb_inode = new_inode;
124 if (bits)
125 *bits = lock->l_policy_data.l_inodebits.bits;
126
127 unlock_res_and_lock(lock);
128 LDLM_LOCK_PUT(lock);
129
130 return 0;
131}
132
133enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
134 const struct lu_fid *fid, enum ldlm_type type,
135 union ldlm_policy_data *policy,
136 enum ldlm_mode mode,
137 struct lustre_handle *lockh)
138{
139 struct ldlm_res_id res_id;
140 enum ldlm_mode rc;
141
142 fid_build_reg_res_name(fid, &res_id);
143
144 policy->l_inodebits.bits &= exp_connect_ibits(exp);
145 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
146 &res_id, type, policy, mode, lockh, 0);
147 return rc;
148}
149
150int mdc_cancel_unused(struct obd_export *exp,
151 const struct lu_fid *fid,
152 union ldlm_policy_data *policy,
153 enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags,
155 void *opaque)
156{
157 struct ldlm_res_id res_id;
158 struct obd_device *obd = class_exp2obd(exp);
159 int rc;
160
161 fid_build_reg_res_name(fid, &res_id);
162 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163 policy, mode, flags, opaque);
164 return rc;
165}
166
167int mdc_null_inode(struct obd_export *exp,
168 const struct lu_fid *fid)
169{
170 struct ldlm_res_id res_id;
171 struct ldlm_resource *res;
172 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
173
174 LASSERTF(ns, "no namespace passed\n");
175
176 fid_build_reg_res_name(fid, &res_id);
177
178 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
179 if (IS_ERR(res))
180 return 0;
181
182 lock_res(res);
183 res->lr_lvb_inode = NULL;
184 unlock_res(res);
185
186 ldlm_resource_putref(res);
187 return 0;
188}
189
190static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
191{
192
193 if (req->rq_replay) {
194 spin_lock(&req->rq_lock);
195 req->rq_replay = 0;
196 spin_unlock(&req->rq_lock);
197 }
198 if (rc && req->rq_transno != 0) {
199 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
200 LBUG();
201 }
202}
203
204
205
206
207
208
209
210
211
212
213
214
215
216static void mdc_realloc_openmsg(struct ptlrpc_request *req,
217 struct mdt_body *body)
218{
219 int rc;
220
221
222 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
223 body->mbo_eadatasize);
224 if (rc) {
225 CERROR("Can't enlarge segment %d size to %d\n",
226 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
227 body->mbo_valid &= ~OBD_MD_FLEASIZE;
228 body->mbo_eadatasize = 0;
229 }
230}
231
232static struct ptlrpc_request *
233mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
234 struct md_op_data *op_data)
235{
236 struct ptlrpc_request *req;
237 struct obd_device *obddev = class_exp2obd(exp);
238 struct ldlm_intent *lit;
239 const void *lmm = op_data->op_data;
240 u32 lmmsize = op_data->op_data_size;
241 LIST_HEAD(cancels);
242 int count = 0;
243 int mode;
244 int rc;
245
246 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
247
248
249
250 if (fid_is_sane(&op_data->op_fid2)) {
251 if (it->it_flags & MDS_OPEN_LEASE) {
252 if (it->it_flags & FMODE_WRITE)
253 mode = LCK_EX;
254 else
255 mode = LCK_PR;
256 } else {
257 if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
258 mode = LCK_CW;
259 else if (it->it_flags & __FMODE_EXEC)
260 mode = LCK_PR;
261 else
262 mode = LCK_CR;
263 }
264 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
265 &cancels, mode,
266 MDS_INODELOCK_OPEN);
267 }
268
269
270 if (it->it_op & IT_CREAT)
271 mode = LCK_EX;
272 else
273 mode = LCK_CR;
274 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
275 &cancels, mode,
276 MDS_INODELOCK_UPDATE);
277
278 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
279 &RQF_LDLM_INTENT_OPEN);
280 if (!req) {
281 ldlm_lock_list_put(&cancels, l_bl_ast, count);
282 return ERR_PTR(-ENOMEM);
283 }
284
285 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
286 op_data->op_namelen + 1);
287 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
288 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
289
290 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
291 if (rc < 0) {
292 ptlrpc_request_free(req);
293 return ERR_PTR(rc);
294 }
295
296 spin_lock(&req->rq_lock);
297 req->rq_replay = req->rq_import->imp_replayable;
298 spin_unlock(&req->rq_lock);
299
300
301 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
302 lit->opc = (__u64)it->it_op;
303
304
305 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
306 lmmsize);
307
308 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
309 obddev->u.cli.cl_max_mds_easize);
310
311 ptlrpc_request_set_replen(req);
312 return req;
313}
314
315static struct ptlrpc_request *
316mdc_intent_getxattr_pack(struct obd_export *exp,
317 struct lookup_intent *it,
318 struct md_op_data *op_data)
319{
320 struct ptlrpc_request *req;
321 struct ldlm_intent *lit;
322 int rc, count = 0;
323 u32 maxdata;
324 LIST_HEAD(cancels);
325
326 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
327 &RQF_LDLM_INTENT_GETXATTR);
328 if (!req)
329 return ERR_PTR(-ENOMEM);
330
331 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
332 if (rc) {
333 ptlrpc_request_free(req);
334 return ERR_PTR(rc);
335 }
336
337
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = IT_GETXATTR;
340
341 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
342
343
344 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
345 0);
346
347 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
348
349 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
350
351 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
352 RCL_SERVER, maxdata);
353
354 ptlrpc_request_set_replen(req);
355
356 return req;
357}
358
359static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
360 struct lookup_intent *it,
361 struct md_op_data *op_data)
362{
363 struct ptlrpc_request *req;
364 struct obd_device *obddev = class_exp2obd(exp);
365 struct ldlm_intent *lit;
366 int rc;
367
368 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369 &RQF_LDLM_INTENT_UNLINK);
370 if (!req)
371 return ERR_PTR(-ENOMEM);
372
373 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
374 op_data->op_namelen + 1);
375
376 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
377 if (rc) {
378 ptlrpc_request_free(req);
379 return ERR_PTR(rc);
380 }
381
382
383 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
384 lit->opc = (__u64)it->it_op;
385
386
387 mdc_unlink_pack(req, op_data);
388
389 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
390 obddev->u.cli.cl_default_mds_easize);
391 ptlrpc_request_set_replen(req);
392 return req;
393}
394
395static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
396 struct lookup_intent *it,
397 struct md_op_data *op_data)
398{
399 struct ptlrpc_request *req;
400 struct obd_device *obddev = class_exp2obd(exp);
401 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
402 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
403 OBD_MD_MEA | OBD_MD_FLACL;
404 struct ldlm_intent *lit;
405 int rc;
406 u32 easize;
407
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
410 if (!req)
411 return ERR_PTR(-ENOMEM);
412
413 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414 op_data->op_namelen + 1);
415
416 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417 if (rc) {
418 ptlrpc_request_free(req);
419 return ERR_PTR(rc);
420 }
421
422
423 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424 lit->opc = (__u64)it->it_op;
425
426 if (obddev->u.cli.cl_default_mds_easize > 0)
427 easize = obddev->u.cli.cl_default_mds_easize;
428 else
429 easize = obddev->u.cli.cl_max_mds_easize;
430
431
432 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
433
434 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
435 ptlrpc_request_set_replen(req);
436 return req;
437}
438
439static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
440 struct lookup_intent *it,
441 struct md_op_data *unused)
442{
443 struct obd_device *obd = class_exp2obd(exp);
444 struct ptlrpc_request *req;
445 struct ldlm_intent *lit;
446 struct layout_intent *layout;
447 int rc;
448
449 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
450 &RQF_LDLM_INTENT_LAYOUT);
451 if (!req)
452 return ERR_PTR(-ENOMEM);
453
454 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
455 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
456 if (rc) {
457 ptlrpc_request_free(req);
458 return ERR_PTR(rc);
459 }
460
461
462 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463 lit->opc = (__u64)it->it_op;
464
465
466 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
467
468
469
470 layout->li_opc = LAYOUT_INTENT_ACCESS;
471
472 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
473 obd->u.cli.cl_default_mds_easize);
474 ptlrpc_request_set_replen(req);
475 return req;
476}
477
478static struct ptlrpc_request *
479mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
480{
481 struct ptlrpc_request *req;
482 int rc;
483
484 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
485 if (!req)
486 return ERR_PTR(-ENOMEM);
487
488 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
489 if (rc) {
490 ptlrpc_request_free(req);
491 return ERR_PTR(rc);
492 }
493
494 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
495 ptlrpc_request_set_replen(req);
496 return req;
497}
498
499static int mdc_finish_enqueue(struct obd_export *exp,
500 struct ptlrpc_request *req,
501 struct ldlm_enqueue_info *einfo,
502 struct lookup_intent *it,
503 struct lustre_handle *lockh,
504 int rc)
505{
506 struct req_capsule *pill = &req->rq_pill;
507 struct ldlm_request *lockreq;
508 struct ldlm_reply *lockrep;
509 struct ldlm_lock *lock;
510 void *lvb_data = NULL;
511 u32 lvb_len = 0;
512
513 LASSERT(rc >= 0);
514
515
516
517 if (req->rq_transno || req->rq_replay) {
518 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
519 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
520 }
521
522 if (rc == ELDLM_LOCK_ABORTED) {
523 einfo->ei_mode = 0;
524 memset(lockh, 0, sizeof(*lockh));
525 rc = 0;
526 } else {
527 lock = ldlm_handle2lock(lockh);
528
529
530
531
532 if (lock->l_req_mode != einfo->ei_mode) {
533 ldlm_lock_addref(lockh, lock->l_req_mode);
534 ldlm_lock_decref(lockh, einfo->ei_mode);
535 einfo->ei_mode = lock->l_req_mode;
536 }
537 LDLM_LOCK_PUT(lock);
538 }
539
540 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
541
542 it->it_disposition = (int)lockrep->lock_policy_res1;
543 it->it_status = (int)lockrep->lock_policy_res2;
544 it->it_lock_mode = einfo->ei_mode;
545 it->it_lock_handle = lockh->cookie;
546 it->it_request = req;
547
548
549
550
551 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
552 mdc_clear_replay_flag(req, it->it_status);
553
554
555
556
557
558
559
560
561
562 if (it->it_op & IT_OPEN && req->rq_replay &&
563 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
564 mdc_clear_replay_flag(req, it->it_status);
565
566 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
567 it->it_op, it->it_disposition, it->it_status);
568
569
570 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
571 struct mdt_body *body;
572
573 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
574 if (!body) {
575 CERROR("Can't swab mdt_body\n");
576 return -EPROTO;
577 }
578
579 if (it_disposition(it, DISP_OPEN_OPEN) &&
580 !it_open_error(DISP_OPEN_OPEN, it)) {
581
582
583
584
585
586
587 mdc_set_open_replay_data(NULL, NULL, it);
588 }
589
590 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
591 void *eadata;
592
593 mdc_update_max_ea_from_body(exp, body);
594
595
596
597
598
599 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
600 body->mbo_eadatasize);
601 if (!eadata)
602 return -EPROTO;
603
604
605
606
607 lvb_data = eadata;
608 lvb_len = body->mbo_eadatasize;
609
610
611
612
613
614
615
616
617
618
619 if ((it->it_op & IT_OPEN) && req->rq_replay) {
620 void *lmm;
621
622 if (req_capsule_get_size(pill, &RMF_EADATA,
623 RCL_CLIENT) <
624 body->mbo_eadatasize)
625 mdc_realloc_openmsg(req, body);
626 else
627 req_capsule_shrink(pill, &RMF_EADATA,
628 body->mbo_eadatasize,
629 RCL_CLIENT);
630
631 req_capsule_set_size(pill, &RMF_EADATA,
632 RCL_CLIENT,
633 body->mbo_eadatasize);
634
635 lmm = req_capsule_client_get(pill, &RMF_EADATA);
636 if (lmm)
637 memcpy(lmm, eadata, body->mbo_eadatasize);
638 }
639 }
640 } else if (it->it_op & IT_LAYOUT) {
641
642
643
644 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
645 if (lvb_len > 0) {
646 lvb_data = req_capsule_server_sized_get(pill,
647 &RMF_DLM_LVB,
648 lvb_len);
649 if (!lvb_data)
650 return -EPROTO;
651 }
652 }
653
654
655 lock = ldlm_handle2lock(lockh);
656 if (lock && ldlm_has_layout(lock) && lvb_data) {
657 void *lmm;
658
659 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
660 ldlm_it2str(it->it_op), lvb_len);
661
662 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
663 if (!lmm) {
664 LDLM_LOCK_PUT(lock);
665 return -ENOMEM;
666 }
667 memcpy(lmm, lvb_data, lvb_len);
668
669
670 lock_res_and_lock(lock);
671 if (!lock->l_lvb_data) {
672 lock->l_lvb_type = LVB_T_LAYOUT;
673 lock->l_lvb_data = lmm;
674 lock->l_lvb_len = lvb_len;
675 lmm = NULL;
676 }
677 unlock_res_and_lock(lock);
678 if (lmm)
679 kvfree(lmm);
680 }
681 if (lock)
682 LDLM_LOCK_PUT(lock);
683
684 return rc;
685}
686
687
688
689
690int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
691 const union ldlm_policy_data *policy,
692 struct lookup_intent *it, struct md_op_data *op_data,
693 struct lustre_handle *lockh, u64 extra_lock_flags)
694{
695 static const union ldlm_policy_data lookup_policy = {
696 .l_inodebits = { MDS_INODELOCK_LOOKUP }
697 };
698 static const union ldlm_policy_data update_policy = {
699 .l_inodebits = { MDS_INODELOCK_UPDATE }
700 };
701 static const union ldlm_policy_data layout_policy = {
702 .l_inodebits = { MDS_INODELOCK_LAYOUT }
703 };
704 static const union ldlm_policy_data getxattr_policy = {
705 .l_inodebits = { MDS_INODELOCK_XATTR }
706 };
707 struct obd_device *obddev = class_exp2obd(exp);
708 struct ptlrpc_request *req = NULL;
709 u64 flags, saved_flags = extra_lock_flags;
710 struct ldlm_res_id res_id;
711 int generation, resends = 0;
712 struct ldlm_reply *lockrep;
713 enum lvb_type lvb_type = LVB_T_NONE;
714 int rc;
715
716 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
717 einfo->ei_type);
718 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
719
720 if (it) {
721 LASSERT(!policy);
722
723 saved_flags |= LDLM_FL_HAS_INTENT;
724 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
725 policy = &update_policy;
726 else if (it->it_op & IT_LAYOUT)
727 policy = &layout_policy;
728 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
729 policy = &getxattr_policy;
730 else
731 policy = &lookup_policy;
732 }
733
734 generation = obddev->u.cli.cl_import->imp_generation;
735resend:
736 flags = saved_flags;
737 if (!it) {
738
739 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
740 einfo->ei_type);
741 res_id.name[3] = LDLM_FLOCK;
742 } else if (it->it_op & IT_OPEN) {
743 req = mdc_intent_open_pack(exp, it, op_data);
744 } else if (it->it_op & IT_UNLINK) {
745 req = mdc_intent_unlink_pack(exp, it, op_data);
746 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
747 req = mdc_intent_getattr_pack(exp, it, op_data);
748 } else if (it->it_op & IT_READDIR) {
749 req = mdc_enqueue_pack(exp, 0);
750 } else if (it->it_op & IT_LAYOUT) {
751 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
752 return -EOPNOTSUPP;
753 req = mdc_intent_layout_pack(exp, it, op_data);
754 lvb_type = LVB_T_LAYOUT;
755 } else if (it->it_op & IT_GETXATTR) {
756 req = mdc_intent_getxattr_pack(exp, it, op_data);
757 } else {
758 LBUG();
759 return -EINVAL;
760 }
761
762 if (IS_ERR(req))
763 return PTR_ERR(req);
764
765 if (resends) {
766 req->rq_generation_set = 1;
767 req->rq_import_generation = generation;
768 req->rq_sent = ktime_get_real_seconds() + resends;
769 }
770
771
772
773
774
775
776 if (it) {
777 mdc_get_mod_rpc_slot(req, it);
778 rc = obd_get_request_slot(&obddev->u.cli);
779 if (rc != 0) {
780 mdc_put_mod_rpc_slot(req, it);
781 mdc_clear_replay_flag(req, 0);
782 ptlrpc_req_finished(req);
783 return rc;
784 }
785 }
786
787 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
788 0, lvb_type, lockh, 0);
789 if (!it) {
790
791
792
793
794
795
796
797
798
799 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
800 (einfo->ei_type == LDLM_FLOCK) &&
801 (einfo->ei_mode == LCK_NL))
802 goto resend;
803 return rc;
804 }
805
806 obd_put_request_slot(&obddev->u.cli);
807 mdc_put_mod_rpc_slot(req, it);
808
809 if (rc < 0) {
810 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
811 obddev->obd_name, rc);
812
813 mdc_clear_replay_flag(req, rc);
814 ptlrpc_req_finished(req);
815 return rc;
816 }
817
818 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
819
820 lockrep->lock_policy_res2 =
821 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
822
823
824
825
826
827
828 if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
829 mdc_clear_replay_flag(req, rc);
830 ptlrpc_req_finished(req);
831 resends++;
832
833 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
834 obddev->obd_name, resends, it->it_op,
835 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
836
837 if (generation == obddev->u.cli.cl_import->imp_generation) {
838 goto resend;
839 } else {
840 CDEBUG(D_HA, "resend cross eviction\n");
841 return -EIO;
842 }
843 }
844
845 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
846 if (rc < 0) {
847 if (lustre_handle_is_used(lockh)) {
848 ldlm_lock_decref(lockh, einfo->ei_mode);
849 memset(lockh, 0, sizeof(*lockh));
850 }
851 ptlrpc_req_finished(req);
852
853 it->it_lock_handle = 0;
854 it->it_lock_mode = 0;
855 it->it_request = NULL;
856 }
857
858 return rc;
859}
860
861static int mdc_finish_intent_lock(struct obd_export *exp,
862 struct ptlrpc_request *request,
863 struct md_op_data *op_data,
864 struct lookup_intent *it,
865 struct lustre_handle *lockh)
866{
867 struct lustre_handle old_lock;
868 struct mdt_body *mdt_body;
869 struct ldlm_lock *lock;
870 int rc;
871
872 LASSERT(request != LP_POISON);
873 LASSERT(request->rq_repmsg != LP_POISON);
874
875 if (it->it_op & IT_READDIR)
876 return 0;
877
878 if (!it_disposition(it, DISP_IT_EXECD)) {
879
880
881
882 LASSERT(it->it_status != 0);
883 return it->it_status;
884 }
885 rc = it_open_error(DISP_IT_EXECD, it);
886 if (rc)
887 return rc;
888
889 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
890 LASSERT(mdt_body);
891
892 rc = it_open_error(DISP_LOOKUP_EXECD, it);
893 if (rc)
894 return rc;
895
896
897
898
899 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
900 it_disposition(it, DISP_OPEN_CREATE) &&
901 !it_open_error(DISP_OPEN_CREATE, it)) {
902 it_set_disposition(it, DISP_ENQ_CREATE_REF);
903 ptlrpc_request_addref(request);
904 }
905 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
906 it_disposition(it, DISP_OPEN_OPEN) &&
907 !it_open_error(DISP_OPEN_OPEN, it)) {
908 it_set_disposition(it, DISP_ENQ_OPEN_REF);
909 ptlrpc_request_addref(request);
910
911 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
912 }
913
914 if (it->it_op & IT_CREAT) {
915
916 } else if (it->it_op == IT_OPEN) {
917 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
918 } else {
919 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
920 }
921
922
923
924
925
926
927
928 lock = ldlm_handle2lock(lockh);
929 if (lock) {
930 union ldlm_policy_data policy = lock->l_policy_data;
931
932 LDLM_DEBUG(lock, "matching against this");
933
934 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
935 &lock->l_resource->lr_name),
936 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
937 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
938 LDLM_LOCK_PUT(lock);
939
940 memcpy(&old_lock, lockh, sizeof(*lockh));
941 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
942 LDLM_IBITS, &policy, LCK_NL,
943 &old_lock, 0)) {
944 ldlm_lock_decref_and_cancel(lockh,
945 it->it_lock_mode);
946 memcpy(lockh, &old_lock, sizeof(old_lock));
947 it->it_lock_handle = lockh->cookie;
948 }
949 }
950 CDEBUG(D_DENTRY,
951 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
952 (int)op_data->op_namelen, op_data->op_name,
953 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
954 return rc;
955}
956
957int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
958 struct lu_fid *fid, __u64 *bits)
959{
960
961
962
963
964 struct ldlm_res_id res_id;
965 struct lustre_handle lockh;
966 union ldlm_policy_data policy;
967 enum ldlm_mode mode;
968
969 if (it->it_lock_handle) {
970 lockh.cookie = it->it_lock_handle;
971 mode = ldlm_revalidate_lock_handle(&lockh, bits);
972 } else {
973 fid_build_reg_res_name(fid, &res_id);
974 switch (it->it_op) {
975 case IT_GETATTR:
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
993 MDS_INODELOCK_LOOKUP |
994 MDS_INODELOCK_PERM;
995 break;
996 case IT_READDIR:
997 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
998 break;
999 case IT_LAYOUT:
1000 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1001 break;
1002 default:
1003 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1004 break;
1005 }
1006
1007 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1008 LDLM_IBITS, &policy,
1009 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1010 &lockh);
1011 }
1012
1013 if (mode) {
1014 it->it_lock_handle = lockh.cookie;
1015 it->it_lock_mode = mode;
1016 } else {
1017 it->it_lock_handle = 0;
1018 it->it_lock_mode = 0;
1019 }
1020
1021 return !!mode;
1022}
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1052 struct lookup_intent *it, struct ptlrpc_request **reqp,
1053 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1054{
1055 struct ldlm_enqueue_info einfo = {
1056 .ei_type = LDLM_IBITS,
1057 .ei_mode = it_to_lock_mode(it),
1058 .ei_cb_bl = cb_blocking,
1059 .ei_cb_cp = ldlm_completion_ast,
1060 };
1061 struct lustre_handle lockh;
1062 int rc = 0;
1063
1064 LASSERT(it);
1065
1066 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1067 ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
1068 op_data->op_name, PFID(&op_data->op_fid2),
1069 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1070 it->it_flags);
1071
1072 lockh.cookie = 0;
1073 if (fid_is_sane(&op_data->op_fid2) &&
1074 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1075
1076
1077
1078
1079 it->it_lock_handle = 0;
1080 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1081
1082
1083
1084 if (rc || op_data->op_namelen != 0)
1085 return rc;
1086 }
1087
1088
1089 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1090 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1091 if (rc < 0) {
1092 CERROR("Can't alloc new fid, rc %d\n", rc);
1093 return rc;
1094 }
1095 }
1096 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1097 extra_lock_flags);
1098 if (rc < 0)
1099 return rc;
1100
1101 *reqp = it->it_request;
1102 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1103 return rc;
1104}
1105
1106static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1107 struct ptlrpc_request *req,
1108 void *args, int rc)
1109{
1110 struct mdc_getattr_args *ga = args;
1111 struct obd_export *exp = ga->ga_exp;
1112 struct md_enqueue_info *minfo = ga->ga_minfo;
1113 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1114 struct lookup_intent *it;
1115 struct lustre_handle *lockh;
1116 struct obd_device *obddev;
1117 struct ldlm_reply *lockrep;
1118 __u64 flags = LDLM_FL_HAS_INTENT;
1119
1120 it = &minfo->mi_it;
1121 lockh = &minfo->mi_lockh;
1122
1123 obddev = class_exp2obd(exp);
1124
1125 obd_put_request_slot(&obddev->u.cli);
1126 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1127 rc = -ETIMEDOUT;
1128
1129 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1130 &flags, NULL, 0, lockh, rc);
1131 if (rc < 0) {
1132 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1133 mdc_clear_replay_flag(req, rc);
1134 goto out;
1135 }
1136
1137 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1138
1139 lockrep->lock_policy_res2 =
1140 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1141
1142 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1143 if (rc)
1144 goto out;
1145
1146 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1147
1148out:
1149 minfo->mi_cb(req, minfo, rc);
1150 return 0;
1151}
1152
1153int mdc_intent_getattr_async(struct obd_export *exp,
1154 struct md_enqueue_info *minfo)
1155{
1156 struct md_op_data *op_data = &minfo->mi_data;
1157 struct lookup_intent *it = &minfo->mi_it;
1158 struct ptlrpc_request *req;
1159 struct mdc_getattr_args *ga;
1160 struct obd_device *obddev = class_exp2obd(exp);
1161 struct ldlm_res_id res_id;
1162 union ldlm_policy_data policy = {
1163 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1164 };
1165 int rc = 0;
1166 __u64 flags = LDLM_FL_HAS_INTENT;
1167
1168 CDEBUG(D_DLMTRACE,
1169 "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
1170 (int)op_data->op_namelen, op_data->op_name,
1171 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1172
1173 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1174 req = mdc_intent_getattr_pack(exp, it, op_data);
1175 if (IS_ERR(req))
1176 return PTR_ERR(req);
1177
1178 rc = obd_get_request_slot(&obddev->u.cli);
1179 if (rc != 0) {
1180 ptlrpc_req_finished(req);
1181 return rc;
1182 }
1183
1184 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1185 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1186 if (rc < 0) {
1187 obd_put_request_slot(&obddev->u.cli);
1188 ptlrpc_req_finished(req);
1189 return rc;
1190 }
1191
1192 BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args));
1193 ga = ptlrpc_req_async_args(req);
1194 ga->ga_exp = exp;
1195 ga->ga_minfo = minfo;
1196
1197 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1198 ptlrpcd_add_req(req);
1199
1200 return 0;
1201}
1202