1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_ECHO
38#include <linux/libcfs/libcfs.h>
39
40#include <obd.h>
41#include <obd_support.h>
42#include <obd_class.h>
43#include <lustre_debug.h>
44#include <lprocfs_status.h>
45#include <cl_object.h>
46#include <lustre_fid.h>
47#include <lustre_acl.h>
48#include <lustre_net.h>
49#include <obd_lov.h>
50
51#include "echo_internal.h"
52
53
54
55
56
57struct echo_device {
58 struct cl_device ed_cl;
59 struct echo_client_obd *ed_ec;
60
61 struct cl_site ed_site_myself;
62 struct cl_site *ed_site;
63 struct lu_device *ed_next;
64 int ed_next_islov;
65 int ed_next_ismd;
66 struct lu_client_seq *ed_cl_seq;
67};
68
69struct echo_object {
70 struct cl_object eo_cl;
71 struct cl_object_header eo_hdr;
72
73 struct echo_device *eo_dev;
74 struct list_head eo_obj_chain;
75 struct lov_stripe_md *eo_lsm;
76 atomic_t eo_npages;
77 int eo_deleted;
78};
79
80struct echo_object_conf {
81 struct cl_object_conf eoc_cl;
82 struct lov_stripe_md **eoc_md;
83};
84
85struct echo_page {
86 struct cl_page_slice ep_cl;
87 struct mutex ep_lock;
88 struct page *ep_vmpage;
89};
90
91struct echo_lock {
92 struct cl_lock_slice el_cl;
93 struct list_head el_chain;
94 struct echo_object *el_object;
95 __u64 el_cookie;
96 atomic_t el_refcount;
97};
98
99struct echo_io {
100 struct cl_io_slice ei_cl;
101};
102
103#if 0
104struct echo_req {
105 struct cl_req_slice er_cl;
106};
107#endif
108
109static int echo_client_setup(const struct lu_env *env,
110 struct obd_device *obddev,
111 struct lustre_cfg *lcfg);
112static int echo_client_cleanup(struct obd_device *obddev);
113
114
115
116
117
118static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
119{
120 return container_of0(dev, struct echo_device, ed_cl);
121}
122
123static inline struct cl_device *echo_dev2cl(struct echo_device *d)
124{
125 return &d->ed_cl;
126}
127
128static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
129{
130 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
131}
132
133static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
134{
135 return &eco->eo_cl;
136}
137
138static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
139{
140 return container_of(o, struct echo_object, eo_cl);
141}
142
143static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
144{
145 return container_of(s, struct echo_page, ep_cl);
146}
147
148static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
149{
150 return container_of(s, struct echo_lock, el_cl);
151}
152
153static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
154{
155 return ecl->el_cl.cls_lock;
156}
157
158static struct lu_context_key echo_thread_key;
159static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
160{
161 struct echo_thread_info *info;
162 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
163 LASSERT(info != NULL);
164 return info;
165}
166
167static inline
168struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
169{
170 return container_of(c, struct echo_object_conf, eoc_cl);
171}
172
173
174
175static struct echo_object *cl_echo_object_find(struct echo_device *d,
176 struct lov_stripe_md **lsm);
177static int cl_echo_object_put(struct echo_object *eco);
178static int cl_echo_enqueue (struct echo_object *eco, obd_off start,
179 obd_off end, int mode, __u64 *cookie);
180static int cl_echo_cancel (struct echo_device *d, __u64 cookie);
181static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
182 struct page **pages, int npages, int async);
183
184static struct echo_thread_info *echo_env_info(const struct lu_env *env);
185
186struct echo_thread_info {
187 struct echo_object_conf eti_conf;
188 struct lustre_md eti_md;
189
190 struct cl_2queue eti_queue;
191 struct cl_io eti_io;
192 struct cl_lock_descr eti_descr;
193 struct lu_fid eti_fid;
194 struct lu_fid eti_fid2;
195 struct md_op_spec eti_spec;
196 struct lov_mds_md_v3 eti_lmm;
197 struct lov_user_md_v3 eti_lum;
198 struct md_attr eti_ma;
199 struct lu_name eti_lname;
200
201 void *eti_big_lmm;
202 int eti_big_lmmsize;
203 char eti_name[20];
204 struct lu_buf eti_buf;
205 char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
206};
207
208
209struct echo_session_info {
210 unsigned long dummy;
211};
212
213static struct kmem_cache *echo_lock_kmem;
214static struct kmem_cache *echo_object_kmem;
215static struct kmem_cache *echo_thread_kmem;
216static struct kmem_cache *echo_session_kmem;
217
218
219static struct lu_kmem_descr echo_caches[] = {
220 {
221 .ckd_cache = &echo_lock_kmem,
222 .ckd_name = "echo_lock_kmem",
223 .ckd_size = sizeof (struct echo_lock)
224 },
225 {
226 .ckd_cache = &echo_object_kmem,
227 .ckd_name = "echo_object_kmem",
228 .ckd_size = sizeof (struct echo_object)
229 },
230 {
231 .ckd_cache = &echo_thread_kmem,
232 .ckd_name = "echo_thread_kmem",
233 .ckd_size = sizeof (struct echo_thread_info)
234 },
235 {
236 .ckd_cache = &echo_session_kmem,
237 .ckd_name = "echo_session_kmem",
238 .ckd_size = sizeof (struct echo_session_info)
239 },
240#if 0
241 {
242 .ckd_cache = &echo_req_kmem,
243 .ckd_name = "echo_req_kmem",
244 .ckd_size = sizeof (struct echo_req)
245 },
246#endif
247 {
248 .ckd_cache = NULL
249 }
250};
251
252
253
254
255
256
257
258static struct page *echo_page_vmpage(const struct lu_env *env,
259 const struct cl_page_slice *slice)
260{
261 return cl2echo_page(slice)->ep_vmpage;
262}
263
264static int echo_page_own(const struct lu_env *env,
265 const struct cl_page_slice *slice,
266 struct cl_io *io, int nonblock)
267{
268 struct echo_page *ep = cl2echo_page(slice);
269
270 if (!nonblock)
271 mutex_lock(&ep->ep_lock);
272 else if (!mutex_trylock(&ep->ep_lock))
273 return -EAGAIN;
274 return 0;
275}
276
277static void echo_page_disown(const struct lu_env *env,
278 const struct cl_page_slice *slice,
279 struct cl_io *io)
280{
281 struct echo_page *ep = cl2echo_page(slice);
282
283 LASSERT(mutex_is_locked(&ep->ep_lock));
284 mutex_unlock(&ep->ep_lock);
285}
286
287static void echo_page_discard(const struct lu_env *env,
288 const struct cl_page_slice *slice,
289 struct cl_io *unused)
290{
291 cl_page_delete(env, slice->cpl_page);
292}
293
294static int echo_page_is_vmlocked(const struct lu_env *env,
295 const struct cl_page_slice *slice)
296{
297 if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
298 return -EBUSY;
299 return -ENODATA;
300}
301
302static void echo_page_completion(const struct lu_env *env,
303 const struct cl_page_slice *slice,
304 int ioret)
305{
306 LASSERT(slice->cpl_page->cp_sync_io != NULL);
307}
308
309static void echo_page_fini(const struct lu_env *env,
310 struct cl_page_slice *slice)
311{
312 struct echo_page *ep = cl2echo_page(slice);
313 struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
314 struct page *vmpage = ep->ep_vmpage;
315 ENTRY;
316
317 atomic_dec(&eco->eo_npages);
318 page_cache_release(vmpage);
319 EXIT;
320}
321
322static int echo_page_prep(const struct lu_env *env,
323 const struct cl_page_slice *slice,
324 struct cl_io *unused)
325{
326 return 0;
327}
328
329static int echo_page_print(const struct lu_env *env,
330 const struct cl_page_slice *slice,
331 void *cookie, lu_printer_t printer)
332{
333 struct echo_page *ep = cl2echo_page(slice);
334
335 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
336 ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
337 return 0;
338}
339
340static const struct cl_page_operations echo_page_ops = {
341 .cpo_own = echo_page_own,
342 .cpo_disown = echo_page_disown,
343 .cpo_discard = echo_page_discard,
344 .cpo_vmpage = echo_page_vmpage,
345 .cpo_fini = echo_page_fini,
346 .cpo_print = echo_page_print,
347 .cpo_is_vmlocked = echo_page_is_vmlocked,
348 .io = {
349 [CRT_READ] = {
350 .cpo_prep = echo_page_prep,
351 .cpo_completion = echo_page_completion,
352 },
353 [CRT_WRITE] = {
354 .cpo_prep = echo_page_prep,
355 .cpo_completion = echo_page_completion,
356 }
357 }
358};
359
360
361
362
363
364
365
366
367static void echo_lock_fini(const struct lu_env *env,
368 struct cl_lock_slice *slice)
369{
370 struct echo_lock *ecl = cl2echo_lock(slice);
371
372 LASSERT(list_empty(&ecl->el_chain));
373 OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
374}
375
376static void echo_lock_delete(const struct lu_env *env,
377 const struct cl_lock_slice *slice)
378{
379 struct echo_lock *ecl = cl2echo_lock(slice);
380
381 LASSERT(list_empty(&ecl->el_chain));
382}
383
384static int echo_lock_fits_into(const struct lu_env *env,
385 const struct cl_lock_slice *slice,
386 const struct cl_lock_descr *need,
387 const struct cl_io *unused)
388{
389 return 1;
390}
391
392static struct cl_lock_operations echo_lock_ops = {
393 .clo_fini = echo_lock_fini,
394 .clo_delete = echo_lock_delete,
395 .clo_fits_into = echo_lock_fits_into
396};
397
398
399
400
401
402
403
404
405
406static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
407 struct cl_page *page, struct page *vmpage)
408{
409 struct echo_page *ep = cl_object_page_slice(obj, page);
410 struct echo_object *eco = cl2echo_obj(obj);
411 ENTRY;
412
413 ep->ep_vmpage = vmpage;
414 page_cache_get(vmpage);
415 mutex_init(&ep->ep_lock);
416 cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
417 atomic_inc(&eco->eo_npages);
418 RETURN(0);
419}
420
421static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
422 struct cl_io *io)
423{
424 return 0;
425}
426
427static int echo_lock_init(const struct lu_env *env,
428 struct cl_object *obj, struct cl_lock *lock,
429 const struct cl_io *unused)
430{
431 struct echo_lock *el;
432 ENTRY;
433
434 OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, __GFP_IO);
435 if (el != NULL) {
436 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
437 el->el_object = cl2echo_obj(obj);
438 INIT_LIST_HEAD(&el->el_chain);
439 atomic_set(&el->el_refcount, 0);
440 }
441 RETURN(el == NULL ? -ENOMEM : 0);
442}
443
444static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
445 const struct cl_object_conf *conf)
446{
447 return 0;
448}
449
450static const struct cl_object_operations echo_cl_obj_ops = {
451 .coo_page_init = echo_page_init,
452 .coo_lock_init = echo_lock_init,
453 .coo_io_init = echo_io_init,
454 .coo_conf_set = echo_conf_set
455};
456
457
458
459
460
461
462
463
464static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
465 const struct lu_object_conf *conf)
466{
467 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
468 struct echo_client_obd *ec = ed->ed_ec;
469 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
470 ENTRY;
471
472 if (ed->ed_next) {
473 struct lu_object *below;
474 struct lu_device *under;
475
476 under = ed->ed_next;
477 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
478 under);
479 if (below == NULL)
480 RETURN(-ENOMEM);
481 lu_object_add(obj, below);
482 }
483
484 if (!ed->ed_next_ismd) {
485 const struct cl_object_conf *cconf = lu2cl_conf(conf);
486 struct echo_object_conf *econf = cl2echo_conf(cconf);
487
488 LASSERT(econf->eoc_md);
489 eco->eo_lsm = *econf->eoc_md;
490
491 *econf->eoc_md = NULL;
492 } else {
493 eco->eo_lsm = NULL;
494 }
495
496 eco->eo_dev = ed;
497 atomic_set(&eco->eo_npages, 0);
498 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
499
500 spin_lock(&ec->ec_lock);
501 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
502 spin_unlock(&ec->ec_lock);
503
504 RETURN(0);
505}
506
507
508static int echo_alloc_memmd(struct echo_device *ed,
509 struct lov_stripe_md **lsmp)
510{
511 int lsm_size;
512
513 ENTRY;
514
515
516 if (ed->ed_next != NULL)
517 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
518
519 lsm_size = lov_stripe_md_size(1);
520
521 LASSERT(*lsmp == NULL);
522 OBD_ALLOC(*lsmp, lsm_size);
523 if (*lsmp == NULL)
524 RETURN(-ENOMEM);
525
526 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
527 if ((*lsmp)->lsm_oinfo[0] == NULL) {
528 OBD_FREE(*lsmp, lsm_size);
529 RETURN(-ENOMEM);
530 }
531
532 loi_init((*lsmp)->lsm_oinfo[0]);
533 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
534 ostid_set_seq_echo(&(*lsmp)->lsm_oi);
535
536 RETURN(lsm_size);
537}
538
539static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
540{
541 int lsm_size;
542
543 ENTRY;
544
545
546 if (ed->ed_next != NULL)
547 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
548
549 lsm_size = lov_stripe_md_size(1);
550
551 LASSERT(*lsmp != NULL);
552 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
553 OBD_FREE(*lsmp, lsm_size);
554 *lsmp = NULL;
555 RETURN(0);
556}
557
558static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
559{
560 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
561 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
562 ENTRY;
563
564 LASSERT(atomic_read(&eco->eo_npages) == 0);
565
566 spin_lock(&ec->ec_lock);
567 list_del_init(&eco->eo_obj_chain);
568 spin_unlock(&ec->ec_lock);
569
570 lu_object_fini(obj);
571 lu_object_header_fini(obj->lo_header);
572
573 if (eco->eo_lsm)
574 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
575 OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
576 EXIT;
577}
578
579static int echo_object_print(const struct lu_env *env, void *cookie,
580 lu_printer_t p, const struct lu_object *o)
581{
582 struct echo_object *obj = cl2echo_obj(lu2cl(o));
583
584 return (*p)(env, cookie, "echoclient-object@%p", obj);
585}
586
587static const struct lu_object_operations echo_lu_obj_ops = {
588 .loo_object_init = echo_object_init,
589 .loo_object_delete = NULL,
590 .loo_object_release = NULL,
591 .loo_object_free = echo_object_free,
592 .loo_object_print = echo_object_print,
593 .loo_object_invariant = NULL
594};
595
596
597
598
599
600
601
602
603static struct lu_object *echo_object_alloc(const struct lu_env *env,
604 const struct lu_object_header *hdr,
605 struct lu_device *dev)
606{
607 struct echo_object *eco;
608 struct lu_object *obj = NULL;
609 ENTRY;
610
611
612 LASSERT(hdr == NULL);
613 OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, __GFP_IO);
614 if (eco != NULL) {
615 struct cl_object_header *hdr = &eco->eo_hdr;
616
617 obj = &echo_obj2cl(eco)->co_lu;
618 cl_object_header_init(hdr);
619 lu_object_init(obj, &hdr->coh_lu, dev);
620 lu_object_add_top(&hdr->coh_lu, obj);
621
622 eco->eo_cl.co_ops = &echo_cl_obj_ops;
623 obj->lo_ops = &echo_lu_obj_ops;
624 }
625 RETURN(obj);
626}
627
628static struct lu_device_operations echo_device_lu_ops = {
629 .ldo_object_alloc = echo_object_alloc,
630};
631
632
633
634static struct cl_device_operations echo_device_cl_ops = {
635};
636
637
638
639
640
641
642
643static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
644{
645 struct cl_site *site = &ed->ed_site_myself;
646 int rc;
647
648
649 rc = cl_site_init(site, &ed->ed_cl);
650 if (rc) {
651 CERROR("Cannot initilize site for echo client(%d)\n", rc);
652 return rc;
653 }
654
655 rc = lu_site_init_finish(&site->cs_lu);
656 if (rc)
657 return rc;
658
659 ed->ed_site = site;
660 return 0;
661}
662
663static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
664{
665 if (ed->ed_site) {
666 if (!ed->ed_next_ismd)
667 cl_site_fini(ed->ed_site);
668 ed->ed_site = NULL;
669 }
670}
671
672static void *echo_thread_key_init(const struct lu_context *ctx,
673 struct lu_context_key *key)
674{
675 struct echo_thread_info *info;
676
677 OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, __GFP_IO);
678 if (info == NULL)
679 info = ERR_PTR(-ENOMEM);
680 return info;
681}
682
683static void echo_thread_key_fini(const struct lu_context *ctx,
684 struct lu_context_key *key, void *data)
685{
686 struct echo_thread_info *info = data;
687 OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
688}
689
690static void echo_thread_key_exit(const struct lu_context *ctx,
691 struct lu_context_key *key, void *data)
692{
693}
694
695static struct lu_context_key echo_thread_key = {
696 .lct_tags = LCT_CL_THREAD,
697 .lct_init = echo_thread_key_init,
698 .lct_fini = echo_thread_key_fini,
699 .lct_exit = echo_thread_key_exit
700};
701
702static void *echo_session_key_init(const struct lu_context *ctx,
703 struct lu_context_key *key)
704{
705 struct echo_session_info *session;
706
707 OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, __GFP_IO);
708 if (session == NULL)
709 session = ERR_PTR(-ENOMEM);
710 return session;
711}
712
713static void echo_session_key_fini(const struct lu_context *ctx,
714 struct lu_context_key *key, void *data)
715{
716 struct echo_session_info *session = data;
717 OBD_SLAB_FREE_PTR(session, echo_session_kmem);
718}
719
720static void echo_session_key_exit(const struct lu_context *ctx,
721 struct lu_context_key *key, void *data)
722{
723}
724
725static struct lu_context_key echo_session_key = {
726 .lct_tags = LCT_SESSION,
727 .lct_init = echo_session_key_init,
728 .lct_fini = echo_session_key_fini,
729 .lct_exit = echo_session_key_exit
730};
731
732LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
733
734#define ECHO_SEQ_WIDTH 0xffffffff
735static int echo_fid_init(struct echo_device *ed, char *obd_name,
736 struct seq_server_site *ss)
737{
738 char *prefix;
739 int rc;
740 ENTRY;
741
742 OBD_ALLOC_PTR(ed->ed_cl_seq);
743 if (ed->ed_cl_seq == NULL)
744 RETURN(-ENOMEM);
745
746 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
747 if (prefix == NULL)
748 GOTO(out_free_seq, rc = -ENOMEM);
749
750 snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
751
752
753 rc = seq_client_init(ed->ed_cl_seq, NULL,
754 LUSTRE_SEQ_METADATA,
755 prefix, ss->ss_server_seq);
756 ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
757 OBD_FREE(prefix, MAX_OBD_NAME + 5);
758 if (rc)
759 GOTO(out_free_seq, rc);
760
761 RETURN(0);
762
763out_free_seq:
764 OBD_FREE_PTR(ed->ed_cl_seq);
765 ed->ed_cl_seq = NULL;
766 RETURN(rc);
767}
768
769static int echo_fid_fini(struct obd_device *obddev)
770{
771 struct echo_device *ed = obd2echo_dev(obddev);
772 ENTRY;
773
774 if (ed->ed_cl_seq != NULL) {
775 seq_client_fini(ed->ed_cl_seq);
776 OBD_FREE_PTR(ed->ed_cl_seq);
777 ed->ed_cl_seq = NULL;
778 }
779
780 RETURN(0);
781}
782
783static struct lu_device *echo_device_alloc(const struct lu_env *env,
784 struct lu_device_type *t,
785 struct lustre_cfg *cfg)
786{
787 struct lu_device *next;
788 struct echo_device *ed;
789 struct cl_device *cd;
790 struct obd_device *obd = NULL;
791 struct obd_device *tgt;
792 const char *tgt_type_name;
793 int rc;
794 int cleanup = 0;
795 ENTRY;
796
797 OBD_ALLOC_PTR(ed);
798 if (ed == NULL)
799 GOTO(out, rc = -ENOMEM);
800
801 cleanup = 1;
802 cd = &ed->ed_cl;
803 rc = cl_device_init(cd, t);
804 if (rc)
805 GOTO(out, rc);
806
807 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
808 cd->cd_ops = &echo_device_cl_ops;
809
810 cleanup = 2;
811 obd = class_name2obd(lustre_cfg_string(cfg, 0));
812 LASSERT(obd != NULL);
813 LASSERT(env != NULL);
814
815 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
816 if (tgt == NULL) {
817 CERROR("Can not find tgt device %s\n",
818 lustre_cfg_string(cfg, 1));
819 GOTO(out, rc = -ENODEV);
820 }
821
822 next = tgt->obd_lu_dev;
823 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
824 ed->ed_next_ismd = 1;
825 } else {
826 ed->ed_next_ismd = 0;
827 rc = echo_site_init(env, ed);
828 if (rc)
829 GOTO(out, rc);
830 }
831 cleanup = 3;
832
833 rc = echo_client_setup(env, obd, cfg);
834 if (rc)
835 GOTO(out, rc);
836
837 ed->ed_ec = &obd->u.echo_client;
838 cleanup = 4;
839
840 if (ed->ed_next_ismd) {
841
842 struct lu_site *ls;
843 struct lu_device *ld;
844 int found = 0;
845
846 if (next == NULL) {
847 CERROR("%s is not lu device type!\n",
848 lustre_cfg_string(cfg, 1));
849 GOTO(out, rc = -EINVAL);
850 }
851
852 tgt_type_name = lustre_cfg_string(cfg, 2);
853 if (!tgt_type_name) {
854 CERROR("%s no type name for echo %s setup\n",
855 lustre_cfg_string(cfg, 1),
856 tgt->obd_type->typ_name);
857 GOTO(out, rc = -EINVAL);
858 }
859
860 ls = next->ld_site;
861
862 spin_lock(&ls->ls_ld_lock);
863 list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
864 if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
865 found = 1;
866 break;
867 }
868 }
869 spin_unlock(&ls->ls_ld_lock);
870
871 if (found == 0) {
872 CERROR("%s is not lu device type!\n",
873 lustre_cfg_string(cfg, 1));
874 GOTO(out, rc = -EINVAL);
875 }
876
877 next = ld;
878
879 ed->ed_site_myself.cs_lu = *ls;
880 ed->ed_site = &ed->ed_site_myself;
881 ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
882 rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
883 if (rc) {
884 CERROR("echo fid init error %d\n", rc);
885 GOTO(out, rc);
886 }
887 } else {
888
889
890 if (next != NULL && !lu_device_is_cl(next))
891 next = NULL;
892
893 tgt_type_name = tgt->obd_type->typ_name;
894 if (next != NULL) {
895 LASSERT(next != NULL);
896 if (next->ld_site != NULL)
897 GOTO(out, rc = -EBUSY);
898
899 next->ld_site = &ed->ed_site->cs_lu;
900 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
901 next->ld_type->ldt_name,
902 NULL);
903 if (rc)
904 GOTO(out, rc);
905
906
907
908
909 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
910 ed->ed_next_islov = 1;
911 else
912 LASSERT(strcmp(tgt_type_name,
913 LUSTRE_OSC_NAME) == 0);
914 } else
915 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
916 }
917
918 ed->ed_next = next;
919 RETURN(&cd->cd_lu_dev);
920out:
921 switch(cleanup) {
922 case 4: {
923 int rc2;
924 rc2 = echo_client_cleanup(obd);
925 if (rc2)
926 CERROR("Cleanup obd device %s error(%d)\n",
927 obd->obd_name, rc2);
928 }
929
930 case 3:
931 echo_site_fini(env, ed);
932 case 2:
933 cl_device_fini(&ed->ed_cl);
934 case 1:
935 OBD_FREE_PTR(ed);
936 case 0:
937 default:
938 break;
939 }
940 return(ERR_PTR(rc));
941}
942
943static int echo_device_init(const struct lu_env *env, struct lu_device *d,
944 const char *name, struct lu_device *next)
945{
946 LBUG();
947 return 0;
948}
949
950static struct lu_device *echo_device_fini(const struct lu_env *env,
951 struct lu_device *d)
952{
953 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
954 struct lu_device *next = ed->ed_next;
955
956 while (next && !ed->ed_next_ismd)
957 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
958 return NULL;
959}
960
961static void echo_lock_release(const struct lu_env *env,
962 struct echo_lock *ecl,
963 int still_used)
964{
965 struct cl_lock *clk = echo_lock2cl(ecl);
966
967 cl_lock_get(clk);
968 cl_unuse(env, clk);
969 cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
970 if (!still_used) {
971 cl_lock_mutex_get(env, clk);
972 cl_lock_cancel(env, clk);
973 cl_lock_delete(env, clk);
974 cl_lock_mutex_put(env, clk);
975 }
976 cl_lock_put(env, clk);
977}
978
979static struct lu_device *echo_device_free(const struct lu_env *env,
980 struct lu_device *d)
981{
982 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
983 struct echo_client_obd *ec = ed->ed_ec;
984 struct echo_object *eco;
985 struct lu_device *next = ed->ed_next;
986
987 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
988 ed, next);
989
990 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
991
992
993
994
995
996
997 spin_lock(&ec->ec_lock);
998 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
999 eco->eo_deleted = 1;
1000 spin_unlock(&ec->ec_lock);
1001
1002
1003 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1004
1005 CDEBUG(D_INFO,
1006 "Waiting for the reference of echo object to be dropped\n");
1007
1008
1009 spin_lock(&ec->ec_lock);
1010 while (!list_empty(&ec->ec_objects)) {
1011 spin_unlock(&ec->ec_lock);
1012 CERROR("echo_client still has objects at cleanup time, "
1013 "wait for 1 second\n");
1014 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1015 cfs_time_seconds(1));
1016 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
1017 spin_lock(&ec->ec_lock);
1018 }
1019 spin_unlock(&ec->ec_lock);
1020
1021 LASSERT(list_empty(&ec->ec_locks));
1022
1023 CDEBUG(D_INFO, "No object exists, exiting...\n");
1024
1025 echo_client_cleanup(d->ld_obd);
1026 echo_fid_fini(d->ld_obd);
1027 while (next && !ed->ed_next_ismd)
1028 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
1029
1030 LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
1031 echo_site_fini(env, ed);
1032 cl_device_fini(&ed->ed_cl);
1033 OBD_FREE_PTR(ed);
1034
1035 return NULL;
1036}
1037
1038static const struct lu_device_type_operations echo_device_type_ops = {
1039 .ldto_init = echo_type_init,
1040 .ldto_fini = echo_type_fini,
1041
1042 .ldto_start = echo_type_start,
1043 .ldto_stop = echo_type_stop,
1044
1045 .ldto_device_alloc = echo_device_alloc,
1046 .ldto_device_free = echo_device_free,
1047 .ldto_device_init = echo_device_init,
1048 .ldto_device_fini = echo_device_fini
1049};
1050
1051static struct lu_device_type echo_device_type = {
1052 .ldt_tags = LU_DEVICE_CL,
1053 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
1054 .ldt_ops = &echo_device_type_ops,
1055 .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
1056};
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067static struct echo_object *cl_echo_object_find(struct echo_device *d,
1068 struct lov_stripe_md **lsmp)
1069{
1070 struct lu_env *env;
1071 struct echo_thread_info *info;
1072 struct echo_object_conf *conf;
1073 struct lov_stripe_md *lsm;
1074 struct echo_object *eco;
1075 struct cl_object *obj;
1076 struct lu_fid *fid;
1077 int refcheck;
1078 int rc;
1079 ENTRY;
1080
1081 LASSERT(lsmp);
1082 lsm = *lsmp;
1083 LASSERT(lsm);
1084 LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
1085 LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
1086 POSTID(&lsm->lsm_oi));
1087
1088
1089 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
1090 RETURN(ERR_PTR(-ENODEV));
1091
1092 env = cl_env_get(&refcheck);
1093 if (IS_ERR(env))
1094 RETURN((void *)env);
1095
1096 info = echo_env_info(env);
1097 conf = &info->eti_conf;
1098 if (d->ed_next) {
1099 if (!d->ed_next_islov) {
1100 struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
1101 LASSERT(oinfo != NULL);
1102 oinfo->loi_oi = lsm->lsm_oi;
1103 conf->eoc_cl.u.coc_oinfo = oinfo;
1104 } else {
1105 struct lustre_md *md;
1106 md = &info->eti_md;
1107 memset(md, 0, sizeof *md);
1108 md->lsm = lsm;
1109 conf->eoc_cl.u.coc_md = md;
1110 }
1111 }
1112 conf->eoc_md = lsmp;
1113
1114 fid = &info->eti_fid;
1115 rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
1116 if (rc != 0)
1117 GOTO(out, eco = ERR_PTR(rc));
1118
1119
1120
1121
1122 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
1123 if (IS_ERR(obj))
1124 GOTO(out, eco = (void*)obj);
1125
1126 eco = cl2echo_obj(obj);
1127 if (eco->eo_deleted) {
1128 cl_object_put(env, obj);
1129 eco = ERR_PTR(-EAGAIN);
1130 }
1131
1132out:
1133 cl_env_put(env, &refcheck);
1134 RETURN(eco);
1135}
1136
1137static int cl_echo_object_put(struct echo_object *eco)
1138{
1139 struct lu_env *env;
1140 struct cl_object *obj = echo_obj2cl(eco);
1141 int refcheck;
1142 ENTRY;
1143
1144 env = cl_env_get(&refcheck);
1145 if (IS_ERR(env))
1146 RETURN(PTR_ERR(env));
1147
1148
1149 if (eco->eo_deleted) {
1150 struct lu_object_header *loh = obj->co_lu.lo_header;
1151 LASSERT(&eco->eo_hdr == luh2coh(loh));
1152 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1153 }
1154
1155 cl_object_put(env, obj);
1156 cl_env_put(env, &refcheck);
1157 RETURN(0);
1158}
1159
1160static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1161 obd_off start, obd_off end, int mode,
1162 __u64 *cookie , __u32 enqflags)
1163{
1164 struct cl_io *io;
1165 struct cl_lock *lck;
1166 struct cl_object *obj;
1167 struct cl_lock_descr *descr;
1168 struct echo_thread_info *info;
1169 int rc = -ENOMEM;
1170 ENTRY;
1171
1172 info = echo_env_info(env);
1173 io = &info->eti_io;
1174 descr = &info->eti_descr;
1175 obj = echo_obj2cl(eco);
1176
1177 descr->cld_obj = obj;
1178 descr->cld_start = cl_index(obj, start);
1179 descr->cld_end = cl_index(obj, end);
1180 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1181 descr->cld_enq_flags = enqflags;
1182 io->ci_obj = obj;
1183
1184 lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1185 if (lck) {
1186 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1187 struct echo_lock *el;
1188
1189 rc = cl_wait(env, lck);
1190 if (rc == 0) {
1191 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1192 spin_lock(&ec->ec_lock);
1193 if (list_empty(&el->el_chain)) {
1194 list_add(&el->el_chain, &ec->ec_locks);
1195 el->el_cookie = ++ec->ec_unique;
1196 }
1197 atomic_inc(&el->el_refcount);
1198 *cookie = el->el_cookie;
1199 spin_unlock(&ec->ec_lock);
1200 } else {
1201 cl_lock_release(env, lck, "ec enqueue", current);
1202 }
1203 }
1204 RETURN(rc);
1205}
1206
1207static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
1208 int mode, __u64 *cookie)
1209{
1210 struct echo_thread_info *info;
1211 struct lu_env *env;
1212 struct cl_io *io;
1213 int refcheck;
1214 int result;
1215 ENTRY;
1216
1217 env = cl_env_get(&refcheck);
1218 if (IS_ERR(env))
1219 RETURN(PTR_ERR(env));
1220
1221 info = echo_env_info(env);
1222 io = &info->eti_io;
1223
1224 io->ci_ignore_layout = 1;
1225 result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1226 if (result < 0)
1227 GOTO(out, result);
1228 LASSERT(result == 0);
1229
1230 result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1231 cl_io_fini(env, io);
1232
1233 EXIT;
1234out:
1235 cl_env_put(env, &refcheck);
1236 return result;
1237}
1238
1239static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1240 __u64 cookie)
1241{
1242 struct echo_client_obd *ec = ed->ed_ec;
1243 struct echo_lock *ecl = NULL;
1244 struct list_head *el;
1245 int found = 0, still_used = 0;
1246 ENTRY;
1247
1248 LASSERT(ec != NULL);
1249 spin_lock(&ec->ec_lock);
1250 list_for_each (el, &ec->ec_locks) {
1251 ecl = list_entry (el, struct echo_lock, el_chain);
1252 CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
1253 found = (ecl->el_cookie == cookie);
1254 if (found) {
1255 if (atomic_dec_and_test(&ecl->el_refcount))
1256 list_del_init(&ecl->el_chain);
1257 else
1258 still_used = 1;
1259 break;
1260 }
1261 }
1262 spin_unlock(&ec->ec_lock);
1263
1264 if (!found)
1265 RETURN(-ENOENT);
1266
1267 echo_lock_release(env, ecl, still_used);
1268 RETURN(0);
1269}
1270
1271static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1272{
1273 struct lu_env *env;
1274 int refcheck;
1275 int rc;
1276 ENTRY;
1277
1278 env = cl_env_get(&refcheck);
1279 if (IS_ERR(env))
1280 RETURN(PTR_ERR(env));
1281
1282 rc = cl_echo_cancel0(env, ed, cookie);
1283
1284 cl_env_put(env, &refcheck);
1285 RETURN(rc);
1286}
1287
1288static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1289 enum cl_req_type unused, struct cl_2queue *queue)
1290{
1291 struct cl_page *clp;
1292 struct cl_page *temp;
1293 int result = 0;
1294 ENTRY;
1295
1296 cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1297 int rc;
1298 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1299 if (rc == 0)
1300 continue;
1301 result = result ?: rc;
1302 }
1303 RETURN(result);
1304}
1305
1306static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
1307 struct page **pages, int npages, int async)
1308{
1309 struct lu_env *env;
1310 struct echo_thread_info *info;
1311 struct cl_object *obj = echo_obj2cl(eco);
1312 struct echo_device *ed = eco->eo_dev;
1313 struct cl_2queue *queue;
1314 struct cl_io *io;
1315 struct cl_page *clp;
1316 struct lustre_handle lh = { 0 };
1317 int page_size = cl_page_size(obj);
1318 int refcheck;
1319 int rc;
1320 int i;
1321 ENTRY;
1322
1323 LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1324 LASSERT(ed->ed_next != NULL);
1325 env = cl_env_get(&refcheck);
1326 if (IS_ERR(env))
1327 RETURN(PTR_ERR(env));
1328
1329 info = echo_env_info(env);
1330 io = &info->eti_io;
1331 queue = &info->eti_queue;
1332
1333 cl_2queue_init(queue);
1334
1335 io->ci_ignore_layout = 1;
1336 rc = cl_io_init(env, io, CIT_MISC, obj);
1337 if (rc < 0)
1338 GOTO(out, rc);
1339 LASSERT(rc == 0);
1340
1341
1342 rc = cl_echo_enqueue0(env, eco, offset,
1343 offset + npages * PAGE_CACHE_SIZE - 1,
1344 rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1345 CEF_NEVER);
1346 if (rc < 0)
1347 GOTO(error_lock, rc);
1348
1349 for (i = 0; i < npages; i++) {
1350 LASSERT(pages[i]);
1351 clp = cl_page_find(env, obj, cl_index(obj, offset),
1352 pages[i], CPT_TRANSIENT);
1353 if (IS_ERR(clp)) {
1354 rc = PTR_ERR(clp);
1355 break;
1356 }
1357 LASSERT(clp->cp_type == CPT_TRANSIENT);
1358
1359 rc = cl_page_own(env, io, clp);
1360 if (rc) {
1361 LASSERT(clp->cp_state == CPS_FREEING);
1362 cl_page_put(env, clp);
1363 break;
1364 }
1365
1366 cl_2queue_add(queue, clp);
1367
1368
1369
1370 cl_page_put(env, clp);
1371 cl_page_clip(env, clp, 0, page_size);
1372
1373 offset += page_size;
1374 }
1375
1376 if (rc == 0) {
1377 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1378
1379 async = async && (typ == CRT_WRITE);
1380 if (async)
1381 rc = cl_echo_async_brw(env, io, typ, queue);
1382 else
1383 rc = cl_io_submit_sync(env, io, typ, queue, 0);
1384 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1385 async ? "async" : "sync", rc);
1386 }
1387
1388 cl_echo_cancel0(env, ed, lh.cookie);
1389 EXIT;
1390error_lock:
1391 cl_2queue_discard(env, io, queue);
1392 cl_2queue_disown(env, io, queue);
1393 cl_2queue_fini(env, queue);
1394 cl_io_fini(env, io);
1395out:
1396 cl_env_put(env, &refcheck);
1397 return rc;
1398}
1399
1400
1401
1402static obd_id last_object_id;
1403
1404static int
1405echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1406{
1407 struct lov_stripe_md *ulsm = _ulsm;
1408 int nob, i;
1409
1410 nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1411 if (nob > ulsm_nob)
1412 return (-EINVAL);
1413
1414 if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
1415 return (-EFAULT);
1416
1417 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1418 if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1419 sizeof(lsm->lsm_oinfo[0])))
1420 return (-EFAULT);
1421 }
1422 return 0;
1423}
1424
1425static int
1426echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1427 void *ulsm, int ulsm_nob)
1428{
1429 struct echo_client_obd *ec = ed->ed_ec;
1430 int i;
1431
1432 if (ulsm_nob < sizeof (*lsm))
1433 return (-EINVAL);
1434
1435 if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
1436 return (-EFAULT);
1437
1438 if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1439 lsm->lsm_magic != LOV_MAGIC ||
1440 (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1441 ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1442 return (-EINVAL);
1443
1444
1445 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1446 if (copy_from_user(lsm->lsm_oinfo[i],
1447 ((struct lov_stripe_md *)ulsm)-> \
1448 lsm_oinfo[i],
1449 sizeof(lsm->lsm_oinfo[0])))
1450 return (-EFAULT);
1451 }
1452 return (0);
1453}
1454
1455static inline void echo_md_build_name(struct lu_name *lname, char *name,
1456 __u64 id)
1457{
1458 sprintf(name, LPU64, id);
1459 lname->ln_name = name;
1460 lname->ln_namelen = strlen(name);
1461}
1462
1463
1464static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
1465 struct md_attr *ma)
1466{
1467 struct echo_thread_info *info = echo_env_info(env);
1468 int rc;
1469
1470 ENTRY;
1471
1472 LASSERT(ma->ma_lmm_size > 0);
1473
1474 rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
1475 if (rc < 0)
1476 RETURN(rc);
1477
1478
1479 if (info->eti_big_lmmsize < rc) {
1480 int size = size_roundup_power2(rc);
1481
1482 if (info->eti_big_lmmsize > 0) {
1483
1484 LASSERT(info->eti_big_lmm);
1485 OBD_FREE_LARGE(info->eti_big_lmm,
1486 info->eti_big_lmmsize);
1487 info->eti_big_lmm = NULL;
1488 info->eti_big_lmmsize = 0;
1489 }
1490
1491 OBD_ALLOC_LARGE(info->eti_big_lmm, size);
1492 if (info->eti_big_lmm == NULL)
1493 RETURN(-ENOMEM);
1494 info->eti_big_lmmsize = size;
1495 }
1496 LASSERT(info->eti_big_lmmsize >= rc);
1497
1498 info->eti_buf.lb_buf = info->eti_big_lmm;
1499 info->eti_buf.lb_len = info->eti_big_lmmsize;
1500 rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
1501 if (rc < 0)
1502 RETURN(rc);
1503
1504 ma->ma_valid |= MA_LOV;
1505 ma->ma_lmm = info->eti_big_lmm;
1506 ma->ma_lmm_size = rc;
1507
1508 RETURN(0);
1509}
1510
1511int echo_attr_get_complex(const struct lu_env *env, struct md_object *next,
1512 struct md_attr *ma)
1513{
1514 struct echo_thread_info *info = echo_env_info(env);
1515 struct lu_buf *buf = &info->eti_buf;
1516 umode_t mode = lu_object_attr(&next->mo_lu);
1517 int need = ma->ma_need;
1518 int rc = 0, rc2;
1519
1520 ENTRY;
1521
1522 ma->ma_valid = 0;
1523
1524 if (need & MA_INODE) {
1525 ma->ma_need = MA_INODE;
1526 rc = mo_attr_get(env, next, ma);
1527 if (rc)
1528 GOTO(out, rc);
1529 ma->ma_valid |= MA_INODE;
1530 }
1531
1532 if (need & MA_LOV) {
1533 if (S_ISREG(mode) || S_ISDIR(mode)) {
1534 LASSERT(ma->ma_lmm_size > 0);
1535 buf->lb_buf = ma->ma_lmm;
1536 buf->lb_len = ma->ma_lmm_size;
1537 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
1538 if (rc2 > 0) {
1539 ma->ma_lmm_size = rc2;
1540 ma->ma_valid |= MA_LOV;
1541 } else if (rc2 == -ENODATA) {
1542
1543 ma->ma_lmm_size = 0;
1544 } else if (rc2 == -ERANGE) {
1545 rc2 = echo_big_lmm_get(env, next, ma);
1546 if (rc2 < 0)
1547 GOTO(out, rc = rc2);
1548 } else {
1549 GOTO(out, rc = rc2);
1550 }
1551 }
1552 }
1553
1554#ifdef CONFIG_FS_POSIX_ACL
1555 if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1556 buf->lb_buf = ma->ma_acl;
1557 buf->lb_len = ma->ma_acl_size;
1558 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1559 if (rc2 > 0) {
1560 ma->ma_acl_size = rc2;
1561 ma->ma_valid |= MA_ACL_DEF;
1562 } else if (rc2 == -ENODATA) {
1563
1564 ma->ma_acl_size = 0;
1565 } else {
1566 GOTO(out, rc = rc2);
1567 }
1568 }
1569#endif
1570out:
1571 ma->ma_need = need;
1572 CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
1573 rc, ma->ma_valid, ma->ma_lmm);
1574 RETURN(rc);
1575}
1576
1577static int
1578echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
1579 struct md_object *parent, struct lu_fid *fid,
1580 struct lu_name *lname, struct md_op_spec *spec,
1581 struct md_attr *ma)
1582{
1583 struct lu_object *ec_child, *child;
1584 struct lu_device *ld = ed->ed_next;
1585 struct echo_thread_info *info = echo_env_info(env);
1586 struct lu_fid *fid2 = &info->eti_fid2;
1587 struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
1588 int rc;
1589
1590 ENTRY;
1591
1592 rc = mdo_lookup(env, parent, lname, fid2, spec);
1593 if (rc == 0)
1594 return -EEXIST;
1595 else if (rc != -ENOENT)
1596 return rc;
1597
1598 ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
1599 fid, &conf);
1600 if (IS_ERR(ec_child)) {
1601 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
1602 PTR_ERR(ec_child));
1603 RETURN(PTR_ERR(ec_child));
1604 }
1605
1606 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1607 if (child == NULL) {
1608 CERROR("Can not locate the child "DFID"\n", PFID(fid));
1609 GOTO(out_put, rc = -EINVAL);
1610 }
1611
1612 CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
1613 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1614
1615
1616
1617
1618 spec->sp_cr_lookup = 0;
1619 rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
1620 if (rc) {
1621 CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
1622 GOTO(out_put, rc);
1623 }
1624 CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n",
1625 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
1626 EXIT;
1627out_put:
1628 lu_object_put(env, ec_child);
1629 return rc;
1630}
1631
1632static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
1633 struct md_attr *ma)
1634{
1635 struct echo_thread_info *info = echo_env_info(env);
1636
1637 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
1638 ma->ma_lmm = (void *)&info->eti_lmm;
1639 ma->ma_lmm_size = sizeof(info->eti_lmm);
1640 } else {
1641 LASSERT(info->eti_big_lmmsize);
1642 ma->ma_lmm = info->eti_big_lmm;
1643 ma->ma_lmm_size = info->eti_big_lmmsize;
1644 }
1645
1646 return 0;
1647}
1648
1649static int echo_create_md_object(const struct lu_env *env,
1650 struct echo_device *ed,
1651 struct lu_object *ec_parent,
1652 struct lu_fid *fid,
1653 char *name, int namelen,
1654 __u64 id, __u32 mode, int count,
1655 int stripe_count, int stripe_offset)
1656{
1657 struct lu_object *parent;
1658 struct echo_thread_info *info = echo_env_info(env);
1659 struct lu_name *lname = &info->eti_lname;
1660 struct md_op_spec *spec = &info->eti_spec;
1661 struct md_attr *ma = &info->eti_ma;
1662 struct lu_device *ld = ed->ed_next;
1663 int rc = 0;
1664 int i;
1665
1666 ENTRY;
1667
1668 if (ec_parent == NULL)
1669 return -1;
1670 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1671 if (parent == NULL)
1672 RETURN(-ENXIO);
1673
1674 memset(ma, 0, sizeof(*ma));
1675 memset(spec, 0, sizeof(*spec));
1676 if (stripe_count != 0) {
1677 spec->sp_cr_flags |= FMODE_WRITE;
1678 echo_set_lmm_size(env, ld, ma);
1679 if (stripe_count != -1) {
1680 struct lov_user_md_v3 *lum = &info->eti_lum;
1681
1682 lum->lmm_magic = LOV_USER_MAGIC_V3;
1683 lum->lmm_stripe_count = stripe_count;
1684 lum->lmm_stripe_offset = stripe_offset;
1685 lum->lmm_pattern = 0;
1686 spec->u.sp_ea.eadata = lum;
1687 spec->u.sp_ea.eadatalen = sizeof(*lum);
1688 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1689 }
1690 }
1691
1692 ma->ma_attr.la_mode = mode;
1693 ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
1694 ma->ma_attr.la_ctime = cfs_time_current_64();
1695
1696 if (name != NULL) {
1697 lname->ln_name = name;
1698 lname->ln_namelen = namelen;
1699
1700 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1701 spec, ma);
1702 RETURN(rc);
1703 }
1704
1705
1706 for (i = 0; i < count; i++) {
1707 char *tmp_name = info->eti_name;
1708
1709 echo_md_build_name(lname, tmp_name, id);
1710
1711 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
1712 spec, ma);
1713 if (rc) {
1714 CERROR("Can not create child %s: rc = %d\n", tmp_name,
1715 rc);
1716 break;
1717 }
1718 id++;
1719 fid->f_oid++;
1720 }
1721
1722 RETURN(rc);
1723}
1724
1725static struct lu_object *echo_md_lookup(const struct lu_env *env,
1726 struct echo_device *ed,
1727 struct md_object *parent,
1728 struct lu_name *lname)
1729{
1730 struct echo_thread_info *info = echo_env_info(env);
1731 struct lu_fid *fid = &info->eti_fid;
1732 struct lu_object *child;
1733 int rc;
1734 ENTRY;
1735
1736 CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
1737 PFID(fid), parent);
1738 rc = mdo_lookup(env, parent, lname, fid, NULL);
1739 if (rc) {
1740 CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
1741 RETURN(ERR_PTR(rc));
1742 }
1743
1744
1745
1746
1747 child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
1748
1749 RETURN(child);
1750}
1751
1752static int echo_setattr_object(const struct lu_env *env,
1753 struct echo_device *ed,
1754 struct lu_object *ec_parent,
1755 __u64 id, int count)
1756{
1757 struct lu_object *parent;
1758 struct echo_thread_info *info = echo_env_info(env);
1759 struct lu_name *lname = &info->eti_lname;
1760 char *name = info->eti_name;
1761 struct lu_device *ld = ed->ed_next;
1762 struct lu_buf *buf = &info->eti_buf;
1763 int rc = 0;
1764 int i;
1765
1766 ENTRY;
1767
1768 if (ec_parent == NULL)
1769 return -1;
1770 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1771 if (parent == NULL)
1772 RETURN(-ENXIO);
1773
1774 for (i = 0; i < count; i++) {
1775 struct lu_object *ec_child, *child;
1776
1777 echo_md_build_name(lname, name, id);
1778
1779 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1780 if (IS_ERR(ec_child)) {
1781 CERROR("Can't find child %s: rc = %ld\n",
1782 lname->ln_name, PTR_ERR(ec_child));
1783 RETURN(PTR_ERR(ec_child));
1784 }
1785
1786 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1787 if (child == NULL) {
1788 CERROR("Can not locate the child %s\n", lname->ln_name);
1789 lu_object_put(env, ec_child);
1790 rc = -EINVAL;
1791 break;
1792 }
1793
1794 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
1795 PFID(lu_object_fid(child)));
1796
1797 buf->lb_buf = info->eti_xattr_buf;
1798 buf->lb_len = sizeof(info->eti_xattr_buf);
1799
1800 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
1801 rc = mo_xattr_set(env, lu2md(child), buf, name,
1802 LU_XATTR_CREATE);
1803 if (rc < 0) {
1804 CERROR("Can not setattr child "DFID": rc = %d\n",
1805 PFID(lu_object_fid(child)), rc);
1806 lu_object_put(env, ec_child);
1807 break;
1808 }
1809 CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
1810 PFID(lu_object_fid(child)));
1811 id++;
1812 lu_object_put(env, ec_child);
1813 }
1814 RETURN(rc);
1815}
1816
1817static int echo_getattr_object(const struct lu_env *env,
1818 struct echo_device *ed,
1819 struct lu_object *ec_parent,
1820 __u64 id, int count)
1821{
1822 struct lu_object *parent;
1823 struct echo_thread_info *info = echo_env_info(env);
1824 struct lu_name *lname = &info->eti_lname;
1825 char *name = info->eti_name;
1826 struct md_attr *ma = &info->eti_ma;
1827 struct lu_device *ld = ed->ed_next;
1828 int rc = 0;
1829 int i;
1830
1831 ENTRY;
1832
1833 if (ec_parent == NULL)
1834 return -1;
1835 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1836 if (parent == NULL)
1837 RETURN(-ENXIO);
1838
1839 memset(ma, 0, sizeof(*ma));
1840 ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
1841 ma->ma_acl = info->eti_xattr_buf;
1842 ma->ma_acl_size = sizeof(info->eti_xattr_buf);
1843
1844 for (i = 0; i < count; i++) {
1845 struct lu_object *ec_child, *child;
1846
1847 ma->ma_valid = 0;
1848 echo_md_build_name(lname, name, id);
1849 echo_set_lmm_size(env, ld, ma);
1850
1851 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
1852 if (IS_ERR(ec_child)) {
1853 CERROR("Can't find child %s: rc = %ld\n",
1854 lname->ln_name, PTR_ERR(ec_child));
1855 RETURN(PTR_ERR(ec_child));
1856 }
1857
1858 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1859 if (child == NULL) {
1860 CERROR("Can not locate the child %s\n", lname->ln_name);
1861 lu_object_put(env, ec_child);
1862 RETURN(-EINVAL);
1863 }
1864
1865 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
1866 PFID(lu_object_fid(child)));
1867 rc = echo_attr_get_complex(env, lu2md(child), ma);
1868 if (rc) {
1869 CERROR("Can not getattr child "DFID": rc = %d\n",
1870 PFID(lu_object_fid(child)), rc);
1871 lu_object_put(env, ec_child);
1872 break;
1873 }
1874 CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
1875 PFID(lu_object_fid(child)));
1876 id++;
1877 lu_object_put(env, ec_child);
1878 }
1879
1880 RETURN(rc);
1881}
1882
1883static int echo_lookup_object(const struct lu_env *env,
1884 struct echo_device *ed,
1885 struct lu_object *ec_parent,
1886 __u64 id, int count)
1887{
1888 struct lu_object *parent;
1889 struct echo_thread_info *info = echo_env_info(env);
1890 struct lu_name *lname = &info->eti_lname;
1891 char *name = info->eti_name;
1892 struct lu_fid *fid = &info->eti_fid;
1893 struct lu_device *ld = ed->ed_next;
1894 int rc = 0;
1895 int i;
1896
1897 if (ec_parent == NULL)
1898 return -1;
1899 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1900 if (parent == NULL)
1901 return -ENXIO;
1902
1903
1904 for (i = 0; i < count; i++) {
1905 echo_md_build_name(lname, name, id);
1906
1907 CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
1908 PFID(lu_object_fid(parent)), lname->ln_name, parent);
1909
1910 rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
1911 if (rc) {
1912 CERROR("Can not lookup child %s: rc = %d\n", name, rc);
1913 break;
1914 }
1915 CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
1916 PFID(lu_object_fid(parent)), lname->ln_name, parent);
1917
1918 id++;
1919 }
1920 return rc;
1921}
1922
1923static int echo_md_destroy_internal(const struct lu_env *env,
1924 struct echo_device *ed,
1925 struct md_object *parent,
1926 struct lu_name *lname,
1927 struct md_attr *ma)
1928{
1929 struct lu_device *ld = ed->ed_next;
1930 struct lu_object *ec_child;
1931 struct lu_object *child;
1932 int rc;
1933
1934 ENTRY;
1935
1936 ec_child = echo_md_lookup(env, ed, parent, lname);
1937 if (IS_ERR(ec_child)) {
1938 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
1939 PTR_ERR(ec_child));
1940 RETURN(PTR_ERR(ec_child));
1941 }
1942
1943 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
1944 if (child == NULL) {
1945 CERROR("Can not locate the child %s\n", lname->ln_name);
1946 GOTO(out_put, rc = -EINVAL);
1947 }
1948
1949 CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
1950 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1951
1952 rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
1953 if (rc) {
1954 CERROR("Can not unlink child %s: rc = %d\n",
1955 lname->ln_name, rc);
1956 GOTO(out_put, rc);
1957 }
1958 CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
1959 PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
1960out_put:
1961 lu_object_put(env, ec_child);
1962 return rc;
1963}
1964
1965static int echo_destroy_object(const struct lu_env *env,
1966 struct echo_device *ed,
1967 struct lu_object *ec_parent,
1968 char *name, int namelen,
1969 __u64 id, __u32 mode,
1970 int count)
1971{
1972 struct echo_thread_info *info = echo_env_info(env);
1973 struct lu_name *lname = &info->eti_lname;
1974 struct md_attr *ma = &info->eti_ma;
1975 struct lu_device *ld = ed->ed_next;
1976 struct lu_object *parent;
1977 int rc = 0;
1978 int i;
1979 ENTRY;
1980
1981 parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
1982 if (parent == NULL)
1983 RETURN(-EINVAL);
1984
1985 memset(ma, 0, sizeof(*ma));
1986 ma->ma_attr.la_mode = mode;
1987 ma->ma_attr.la_valid = LA_CTIME;
1988 ma->ma_attr.la_ctime = cfs_time_current_64();
1989 ma->ma_need = MA_INODE;
1990 ma->ma_valid = 0;
1991
1992 if (name != NULL) {
1993 lname->ln_name = name;
1994 lname->ln_namelen = namelen;
1995 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
1996 ma);
1997 RETURN(rc);
1998 }
1999
2000
2001 for (i = 0; i < count; i++) {
2002 char *tmp_name = info->eti_name;
2003
2004 ma->ma_valid = 0;
2005 echo_md_build_name(lname, tmp_name, id);
2006
2007 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
2008 ma);
2009 if (rc) {
2010 CERROR("Can not unlink child %s: rc = %d\n", name, rc);
2011 break;
2012 }
2013 id++;
2014 }
2015
2016 RETURN(rc);
2017}
2018
2019static struct lu_object *echo_resolve_path(const struct lu_env *env,
2020 struct echo_device *ed, char *path,
2021 int path_len)
2022{
2023 struct lu_device *ld = ed->ed_next;
2024 struct md_device *md = lu2md_dev(ld);
2025 struct echo_thread_info *info = echo_env_info(env);
2026 struct lu_fid *fid = &info->eti_fid;
2027 struct lu_name *lname = &info->eti_lname;
2028 struct lu_object *parent = NULL;
2029 struct lu_object *child = NULL;
2030 int rc = 0;
2031 ENTRY;
2032
2033
2034 rc = md->md_ops->mdo_root_get(env, md, fid);
2035 if (rc) {
2036 CERROR("get root error: rc = %d\n", rc);
2037 RETURN(ERR_PTR(rc));
2038 }
2039
2040
2041
2042
2043 parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
2044 if (IS_ERR(parent)) {
2045 CERROR("Can not find the parent "DFID": rc = %ld\n",
2046 PFID(fid), PTR_ERR(parent));
2047 RETURN(parent);
2048 }
2049
2050 while (1) {
2051 struct lu_object *ld_parent;
2052 char *e;
2053
2054 e = strsep(&path, "/");
2055 if (e == NULL)
2056 break;
2057
2058 if (e[0] == 0) {
2059 if (!path || path[0] == '\0')
2060 break;
2061 continue;
2062 }
2063
2064 lname->ln_name = e;
2065 lname->ln_namelen = strlen(e);
2066
2067 ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
2068 if (ld_parent == NULL) {
2069 lu_object_put(env, parent);
2070 rc = -EINVAL;
2071 break;
2072 }
2073
2074 child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
2075 lu_object_put(env, parent);
2076 if (IS_ERR(child)) {
2077 rc = (int)PTR_ERR(child);
2078 CERROR("lookup %s under parent "DFID": rc = %d\n",
2079 lname->ln_name, PFID(lu_object_fid(ld_parent)),
2080 rc);
2081 break;
2082 }
2083 parent = child;
2084 }
2085 if (rc)
2086 RETURN(ERR_PTR(rc));
2087
2088 RETURN(parent);
2089}
2090
2091static void echo_ucred_init(struct lu_env *env)
2092{
2093 struct lu_ucred *ucred = lu_ucred(env);
2094
2095 ucred->uc_valid = UCRED_INVALID;
2096
2097 ucred->uc_suppgids[0] = -1;
2098 ucred->uc_suppgids[1] = -1;
2099
2100 ucred->uc_uid = ucred->uc_o_uid = current_uid();
2101 ucred->uc_gid = ucred->uc_o_gid = current_gid();
2102 ucred->uc_fsuid = ucred->uc_o_fsuid = current_fsuid();
2103 ucred->uc_fsgid = ucred->uc_o_fsgid = current_fsgid();
2104 ucred->uc_cap = cfs_curproc_cap_pack();
2105
2106
2107 if (ucred->uc_fsuid)
2108 ucred->uc_cap &= ~CFS_CAP_FS_MASK;
2109 ucred->uc_valid = UCRED_NEW;
2110}
2111
2112static void echo_ucred_fini(struct lu_env *env)
2113{
2114 struct lu_ucred *ucred = lu_ucred(env);
2115 ucred->uc_valid = UCRED_INIT;
2116}
2117
2118#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
2119#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION)
2120static int echo_md_handler(struct echo_device *ed, int command,
2121 char *path, int path_len, __u64 id, int count,
2122 struct obd_ioctl_data *data)
2123{
2124 struct echo_thread_info *info;
2125 struct lu_device *ld = ed->ed_next;
2126 struct lu_env *env;
2127 int refcheck;
2128 struct lu_object *parent;
2129 char *name = NULL;
2130 int namelen = data->ioc_plen2;
2131 int rc = 0;
2132 ENTRY;
2133
2134 if (ld == NULL) {
2135 CERROR("MD echo client is not being initialized properly\n");
2136 RETURN(-EINVAL);
2137 }
2138
2139 if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
2140 CERROR("Only support MDD layer right now!\n");
2141 RETURN(-EINVAL);
2142 }
2143
2144 env = cl_env_get(&refcheck);
2145 if (IS_ERR(env))
2146 RETURN(PTR_ERR(env));
2147
2148 rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
2149 if (rc != 0)
2150 GOTO(out_env, rc);
2151
2152
2153 info = echo_env_info(env);
2154 LASSERT(info->eti_big_lmm == NULL);
2155 OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
2156 if (info->eti_big_lmm == NULL)
2157 GOTO(out_env, rc = -ENOMEM);
2158 info->eti_big_lmmsize = MIN_MD_SIZE;
2159
2160 parent = echo_resolve_path(env, ed, path, path_len);
2161 if (IS_ERR(parent)) {
2162 CERROR("Can not resolve the path %s: rc = %ld\n", path,
2163 PTR_ERR(parent));
2164 GOTO(out_free, rc = PTR_ERR(parent));
2165 }
2166
2167 if (namelen > 0) {
2168 OBD_ALLOC(name, namelen + 1);
2169 if (name == NULL)
2170 GOTO(out_put, rc = -ENOMEM);
2171 if (copy_from_user(name, data->ioc_pbuf2, namelen))
2172 GOTO(out_name, rc = -EFAULT);
2173 }
2174
2175 echo_ucred_init(env);
2176
2177 switch (command) {
2178 case ECHO_MD_CREATE:
2179 case ECHO_MD_MKDIR: {
2180 struct echo_thread_info *info = echo_env_info(env);
2181 __u32 mode = data->ioc_obdo2.o_mode;
2182 struct lu_fid *fid = &info->eti_fid;
2183 int stripe_count = (int)data->ioc_obdo2.o_misc;
2184 int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
2185
2186 rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
2187 if (rc != 0)
2188 break;
2189
2190
2191
2192
2193 rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
2194 id, mode, count, stripe_count,
2195 stripe_index);
2196 break;
2197 }
2198 case ECHO_MD_DESTROY:
2199 case ECHO_MD_RMDIR: {
2200 __u32 mode = data->ioc_obdo2.o_mode;
2201
2202 rc = echo_destroy_object(env, ed, parent, name, namelen,
2203 id, mode, count);
2204 break;
2205 }
2206 case ECHO_MD_LOOKUP:
2207 rc = echo_lookup_object(env, ed, parent, id, count);
2208 break;
2209 case ECHO_MD_GETATTR:
2210 rc = echo_getattr_object(env, ed, parent, id, count);
2211 break;
2212 case ECHO_MD_SETATTR:
2213 rc = echo_setattr_object(env, ed, parent, id, count);
2214 break;
2215 default:
2216 CERROR("unknown command %d\n", command);
2217 rc = -EINVAL;
2218 break;
2219 }
2220 echo_ucred_fini(env);
2221
2222out_name:
2223 if (name != NULL)
2224 OBD_FREE(name, namelen + 1);
2225out_put:
2226 lu_object_put(env, parent);
2227out_free:
2228 LASSERT(info->eti_big_lmm);
2229 OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
2230 info->eti_big_lmm = NULL;
2231 info->eti_big_lmmsize = 0;
2232out_env:
2233 cl_env_put(env, &refcheck);
2234 return rc;
2235}
2236
2237static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
2238 int on_target, struct obdo *oa, void *ulsm,
2239 int ulsm_nob, struct obd_trans_info *oti)
2240{
2241 struct echo_object *eco;
2242 struct echo_client_obd *ec = ed->ed_ec;
2243 struct lov_stripe_md *lsm = NULL;
2244 int rc;
2245 int created = 0;
2246 ENTRY;
2247
2248 if ((oa->o_valid & OBD_MD_FLID) == 0 &&
2249 (on_target ||
2250 ec->ec_nstripes != 0)) {
2251 CERROR ("No valid oid\n");
2252 RETURN(-EINVAL);
2253 }
2254
2255 rc = echo_alloc_memmd(ed, &lsm);
2256 if (rc < 0) {
2257 CERROR("Cannot allocate md: rc = %d\n", rc);
2258 GOTO(failed, rc);
2259 }
2260
2261 if (ulsm != NULL) {
2262 int i, idx;
2263
2264 rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
2265 if (rc != 0)
2266 GOTO(failed, rc);
2267
2268 if (lsm->lsm_stripe_count == 0)
2269 lsm->lsm_stripe_count = ec->ec_nstripes;
2270
2271 if (lsm->lsm_stripe_size == 0)
2272 lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
2273
2274 idx = cfs_rand();
2275
2276
2277 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2278 if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
2279 lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
2280
2281 lsm->lsm_oinfo[i]->loi_ost_idx =
2282 (idx + i) % ec->ec_nstripes;
2283 }
2284 }
2285
2286
2287 if (oa->o_valid & OBD_MD_FLID) {
2288 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2289 lsm->lsm_oi = oa->o_oi;
2290 }
2291
2292 if (ostid_id(&lsm->lsm_oi) == 0)
2293 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
2294
2295 rc = 0;
2296 if (on_target) {
2297
2298 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
2299 (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
2300 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
2301 if (rc != 0) {
2302 CERROR("Cannot create objects: rc = %d\n", rc);
2303 GOTO(failed, rc);
2304 }
2305 created = 1;
2306 }
2307
2308
2309 oa->o_oi = lsm->lsm_oi;
2310 oa->o_valid |= OBD_MD_FLID;
2311
2312 eco = cl_echo_object_find(ed, &lsm);
2313 if (IS_ERR(eco))
2314 GOTO(failed, rc = PTR_ERR(eco));
2315 cl_echo_object_put(eco);
2316
2317 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
2318 EXIT;
2319
2320 failed:
2321 if (created && rc)
2322 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
2323 if (lsm)
2324 echo_free_memmd(ed, &lsm);
2325 if (rc)
2326 CERROR("create object failed with: rc = %d\n", rc);
2327 return (rc);
2328}
2329
2330static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
2331 struct obdo *oa)
2332{
2333 struct lov_stripe_md *lsm = NULL;
2334 struct echo_object *eco;
2335 int rc;
2336 ENTRY;
2337
2338 if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
2339
2340 CERROR ("No valid oid\n");
2341 RETURN(-EINVAL);
2342 }
2343
2344 rc = echo_alloc_memmd(ed, &lsm);
2345 if (rc < 0)
2346 RETURN(rc);
2347
2348 lsm->lsm_oi = oa->o_oi;
2349 if (!(oa->o_valid & OBD_MD_FLGROUP))
2350 ostid_set_seq_echo(&lsm->lsm_oi);
2351
2352 rc = 0;
2353 eco = cl_echo_object_find(ed, &lsm);
2354 if (!IS_ERR(eco))
2355 *ecop = eco;
2356 else
2357 rc = PTR_ERR(eco);
2358 if (lsm)
2359 echo_free_memmd(ed, &lsm);
2360 RETURN(rc);
2361}
2362
2363static void echo_put_object(struct echo_object *eco)
2364{
2365 if (cl_echo_object_put(eco))
2366 CERROR("echo client: drop an object failed");
2367}
2368
2369static void
2370echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
2371{
2372 unsigned long stripe_count;
2373 unsigned long stripe_size;
2374 unsigned long width;
2375 unsigned long woffset;
2376 int stripe_index;
2377 obd_off offset;
2378
2379 if (lsm->lsm_stripe_count <= 1)
2380 return;
2381
2382 offset = *offp;
2383 stripe_size = lsm->lsm_stripe_size;
2384 stripe_count = lsm->lsm_stripe_count;
2385
2386
2387 width = stripe_size * stripe_count;
2388
2389
2390 woffset = do_div (offset, width);
2391
2392 stripe_index = woffset / stripe_size;
2393
2394 *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
2395 *offp = offset * stripe_size + woffset % stripe_size;
2396}
2397
2398static void
2399echo_client_page_debug_setup(struct lov_stripe_md *lsm,
2400 struct page *page, int rw, obd_id id,
2401 obd_off offset, obd_off count)
2402{
2403 char *addr;
2404 obd_off stripe_off;
2405 obd_id stripe_id;
2406 int delta;
2407
2408
2409 LASSERT(count == PAGE_CACHE_SIZE);
2410
2411 addr = kmap(page);
2412
2413 for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2414 if (rw == OBD_BRW_WRITE) {
2415 stripe_off = offset + delta;
2416 stripe_id = id;
2417 echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
2418 } else {
2419 stripe_off = 0xdeadbeef00c0ffeeULL;
2420 stripe_id = 0xdeadbeef00c0ffeeULL;
2421 }
2422 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
2423 stripe_off, stripe_id);
2424 }
2425
2426 kunmap(page);
2427}
2428
2429static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
2430 struct page *page, obd_id id,
2431 obd_off offset, obd_off count)
2432{
2433 obd_off stripe_off;
2434 obd_id stripe_id;
2435 char *addr;
2436 int delta;
2437 int rc;
2438 int rc2;
2439
2440
2441 LASSERT(count == PAGE_CACHE_SIZE);
2442
2443 addr = kmap(page);
2444
2445 for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
2446 stripe_off = offset + delta;
2447 stripe_id = id;
2448 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
2449
2450 rc2 = block_debug_check("test_brw",
2451 addr + delta, OBD_ECHO_BLOCK_SIZE,
2452 stripe_off, stripe_id);
2453 if (rc2 != 0) {
2454 CERROR ("Error in echo object "LPX64"\n", id);
2455 rc = rc2;
2456 }
2457 }
2458
2459 kunmap(page);
2460 return rc;
2461}
2462
2463static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
2464 struct echo_object *eco, obd_off offset,
2465 obd_size count, int async,
2466 struct obd_trans_info *oti)
2467{
2468 struct lov_stripe_md *lsm = eco->eo_lsm;
2469 obd_count npages;
2470 struct brw_page *pga;
2471 struct brw_page *pgp;
2472 struct page **pages;
2473 obd_off off;
2474 int i;
2475 int rc;
2476 int verify;
2477 int gfp_mask;
2478 int brw_flags = 0;
2479 ENTRY;
2480
2481 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
2482 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
2483 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
2484
2485 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
2486
2487 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
2488 LASSERT(lsm != NULL);
2489 LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
2490
2491 if (count <= 0 ||
2492 (count & (~CFS_PAGE_MASK)) != 0)
2493 RETURN(-EINVAL);
2494
2495
2496 npages = count >> PAGE_CACHE_SHIFT;
2497
2498 if (rw == OBD_BRW_WRITE)
2499 brw_flags = OBD_BRW_ASYNC;
2500
2501 OBD_ALLOC(pga, npages * sizeof(*pga));
2502 if (pga == NULL)
2503 RETURN(-ENOMEM);
2504
2505 OBD_ALLOC(pages, npages * sizeof(*pages));
2506 if (pages == NULL) {
2507 OBD_FREE(pga, npages * sizeof(*pga));
2508 RETURN(-ENOMEM);
2509 }
2510
2511 for (i = 0, pgp = pga, off = offset;
2512 i < npages;
2513 i++, pgp++, off += PAGE_CACHE_SIZE) {
2514
2515 LASSERT (pgp->pg == NULL);
2516
2517 rc = -ENOMEM;
2518 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
2519 if (pgp->pg == NULL)
2520 goto out;
2521
2522 pages[i] = pgp->pg;
2523 pgp->count = PAGE_CACHE_SIZE;
2524 pgp->off = off;
2525 pgp->flag = brw_flags;
2526
2527 if (verify)
2528 echo_client_page_debug_setup(lsm, pgp->pg, rw,
2529 ostid_id(&oa->o_oi), off,
2530 pgp->count);
2531 }
2532
2533
2534 LASSERT(ed->ed_next != NULL);
2535 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
2536
2537 out:
2538 if (rc != 0 || rw != OBD_BRW_READ)
2539 verify = 0;
2540
2541 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
2542 if (pgp->pg == NULL)
2543 continue;
2544
2545 if (verify) {
2546 int vrc;
2547 vrc = echo_client_page_debug_check(lsm, pgp->pg,
2548 ostid_id(&oa->o_oi),
2549 pgp->off, pgp->count);
2550 if (vrc != 0 && rc == 0)
2551 rc = vrc;
2552 }
2553 OBD_PAGE_FREE(pgp->pg);
2554 }
2555 OBD_FREE(pga, npages * sizeof(*pga));
2556 OBD_FREE(pages, npages * sizeof(*pages));
2557 RETURN(rc);
2558}
2559
2560static int echo_client_prep_commit(const struct lu_env *env,
2561 struct obd_export *exp, int rw,
2562 struct obdo *oa, struct echo_object *eco,
2563 obd_off offset, obd_size count,
2564 obd_size batch, struct obd_trans_info *oti,
2565 int async)
2566{
2567 struct lov_stripe_md *lsm = eco->eo_lsm;
2568 struct obd_ioobj ioo;
2569 struct niobuf_local *lnb;
2570 struct niobuf_remote *rnb;
2571 obd_off off;
2572 obd_size npages, tot_pages;
2573 int i, ret = 0, brw_flags = 0;
2574
2575 ENTRY;
2576
2577 if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
2578 (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
2579 RETURN(-EINVAL);
2580
2581 npages = batch >> PAGE_CACHE_SHIFT;
2582 tot_pages = count >> PAGE_CACHE_SHIFT;
2583
2584 OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
2585 OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
2586
2587 if (lnb == NULL || rnb == NULL)
2588 GOTO(out, ret = -ENOMEM);
2589
2590 if (rw == OBD_BRW_WRITE && async)
2591 brw_flags |= OBD_BRW_ASYNC;
2592
2593 obdo_to_ioobj(oa, &ioo);
2594
2595 off = offset;
2596
2597 for(; tot_pages; tot_pages -= npages) {
2598 int lpages;
2599
2600 if (tot_pages < npages)
2601 npages = tot_pages;
2602
2603 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
2604 rnb[i].offset = off;
2605 rnb[i].len = PAGE_CACHE_SIZE;
2606 rnb[i].flags = brw_flags;
2607 }
2608
2609 ioo.ioo_bufcnt = npages;
2610 oti->oti_transno = 0;
2611
2612 lpages = npages;
2613 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
2614 lnb, oti, NULL);
2615 if (ret != 0)
2616 GOTO(out, ret);
2617 LASSERT(lpages == npages);
2618
2619 for (i = 0; i < lpages; i++) {
2620 struct page *page = lnb[i].page;
2621
2622
2623 if (page == NULL && lnb[i].rc == 0)
2624 continue;
2625
2626 if (async)
2627 lnb[i].flags |= OBD_BRW_ASYNC;
2628
2629 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
2630 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
2631 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
2632 continue;
2633
2634 if (rw == OBD_BRW_WRITE)
2635 echo_client_page_debug_setup(lsm, page, rw,
2636 ostid_id(&oa->o_oi),
2637 rnb[i].offset,
2638 rnb[i].len);
2639 else
2640 echo_client_page_debug_check(lsm, page,
2641 ostid_id(&oa->o_oi),
2642 rnb[i].offset,
2643 rnb[i].len);
2644 }
2645
2646 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
2647 rnb, npages, lnb, oti, ret);
2648 if (ret != 0)
2649 GOTO(out, ret);
2650
2651
2652 memset(oti, 0, sizeof(*oti));
2653
2654
2655 lu_context_exit((struct lu_context *)&env->le_ctx);
2656 lu_context_enter((struct lu_context *)&env->le_ctx);
2657 }
2658
2659out:
2660 if (lnb)
2661 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
2662 if (rnb)
2663 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
2664 RETURN(ret);
2665}
2666
2667static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
2668 struct obd_export *exp,
2669 struct obd_ioctl_data *data,
2670 struct obd_trans_info *dummy_oti)
2671{
2672 struct obd_device *obd = class_exp2obd(exp);
2673 struct echo_device *ed = obd2echo_dev(obd);
2674 struct echo_client_obd *ec = ed->ed_ec;
2675 struct obdo *oa = &data->ioc_obdo1;
2676 struct echo_object *eco;
2677 int rc;
2678 int async = 1;
2679 long test_mode;
2680 ENTRY;
2681
2682 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
2683
2684 rc = echo_get_object(&eco, ed, oa);
2685 if (rc)
2686 RETURN(rc);
2687
2688 oa->o_valid &= ~OBD_MD_FLHANDLE;
2689
2690
2691 test_mode = (long)data->ioc_pbuf1;
2692 if (test_mode == 1)
2693 async = 0;
2694
2695 if (ed->ed_next == NULL && test_mode != 3) {
2696 test_mode = 3;
2697 data->ioc_plen1 = data->ioc_count;
2698 }
2699
2700
2701 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
2702 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
2703
2704 switch (test_mode) {
2705 case 1:
2706
2707 case 2:
2708 rc = echo_client_kbrw(ed, rw, oa,
2709 eco, data->ioc_offset,
2710 data->ioc_count, async, dummy_oti);
2711 break;
2712 case 3:
2713 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
2714 eco, data->ioc_offset,
2715 data->ioc_count, data->ioc_plen1,
2716 dummy_oti, async);
2717 break;
2718 default:
2719 rc = -EINVAL;
2720 }
2721 echo_put_object(eco);
2722 RETURN(rc);
2723}
2724
2725static int
2726echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
2727 int mode, obd_off offset, obd_size nob)
2728{
2729 struct echo_device *ed = obd2echo_dev(exp->exp_obd);
2730 struct lustre_handle *ulh = &oa->o_handle;
2731 struct echo_object *eco;
2732 obd_off end;
2733 int rc;
2734 ENTRY;
2735
2736 if (ed->ed_next == NULL)
2737 RETURN(-EOPNOTSUPP);
2738
2739 if (!(mode == LCK_PR || mode == LCK_PW))
2740 RETURN(-EINVAL);
2741
2742 if ((offset & (~CFS_PAGE_MASK)) != 0 ||
2743 (nob & (~CFS_PAGE_MASK)) != 0)
2744 RETURN(-EINVAL);
2745
2746 rc = echo_get_object (&eco, ed, oa);
2747 if (rc != 0)
2748 RETURN(rc);
2749
2750 end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
2751 rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
2752 if (rc == 0) {
2753 oa->o_valid |= OBD_MD_FLHANDLE;
2754 CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
2755 }
2756 echo_put_object(eco);
2757 RETURN(rc);
2758}
2759
2760static int
2761echo_client_cancel(struct obd_export *exp, struct obdo *oa)
2762{
2763 struct echo_device *ed = obd2echo_dev(exp->exp_obd);
2764 __u64 cookie = oa->o_handle.cookie;
2765
2766 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
2767 return -EINVAL;
2768
2769 CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
2770 return cl_echo_cancel(ed, cookie);
2771}
2772
2773static int
2774echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2775 void *karg, void *uarg)
2776{
2777 struct obd_device *obd = exp->exp_obd;
2778 struct echo_device *ed = obd2echo_dev(obd);
2779 struct echo_client_obd *ec = ed->ed_ec;
2780 struct echo_object *eco;
2781 struct obd_ioctl_data *data = karg;
2782 struct obd_trans_info dummy_oti;
2783 struct lu_env *env;
2784 struct oti_req_ack_lock *ack_lock;
2785 struct obdo *oa;
2786 struct lu_fid fid;
2787 int rw = OBD_BRW_READ;
2788 int rc = 0;
2789 int i;
2790 ENTRY;
2791
2792 memset(&dummy_oti, 0, sizeof(dummy_oti));
2793
2794 oa = &data->ioc_obdo1;
2795 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
2796 oa->o_valid |= OBD_MD_FLGROUP;
2797 ostid_set_seq_echo(&oa->o_oi);
2798 }
2799
2800
2801 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
2802 if (rc < 0)
2803 RETURN(rc);
2804
2805 OBD_ALLOC_PTR(env);
2806 if (env == NULL)
2807 RETURN(-ENOMEM);
2808
2809 rc = lu_env_init(env, LCT_DT_THREAD);
2810 if (rc)
2811 GOTO(out, rc = -ENOMEM);
2812
2813 switch (cmd) {
2814 case OBD_IOC_CREATE:
2815 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2816 GOTO (out, rc = -EPERM);
2817
2818 rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
2819 data->ioc_plen1, &dummy_oti);
2820 GOTO(out, rc);
2821
2822 case OBD_IOC_ECHO_MD: {
2823 int count;
2824 int cmd;
2825 char *dir = NULL;
2826 int dirlen;
2827 __u64 id;
2828
2829 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2830 GOTO(out, rc = -EPERM);
2831
2832 count = data->ioc_count;
2833 cmd = data->ioc_command;
2834
2835 id = ostid_id(&data->ioc_obdo2.o_oi);
2836
2837 dirlen = data->ioc_plen1;
2838 OBD_ALLOC(dir, dirlen + 1);
2839 if (dir == NULL)
2840 GOTO(out, rc = -ENOMEM);
2841
2842 if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
2843 OBD_FREE(dir, data->ioc_plen1 + 1);
2844 GOTO(out, rc = -EFAULT);
2845 }
2846
2847 rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
2848 OBD_FREE(dir, dirlen + 1);
2849 GOTO(out, rc);
2850 }
2851 case OBD_IOC_ECHO_ALLOC_SEQ: {
2852 struct lu_env *cl_env;
2853 int refcheck;
2854 __u64 seq;
2855 int max_count;
2856
2857 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2858 GOTO(out, rc = -EPERM);
2859
2860 cl_env = cl_env_get(&refcheck);
2861 if (IS_ERR(cl_env))
2862 GOTO(out, rc = PTR_ERR(cl_env));
2863
2864 rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
2865 ECHO_MD_SES_TAG);
2866 if (rc != 0) {
2867 cl_env_put(cl_env, &refcheck);
2868 GOTO(out, rc);
2869 }
2870
2871 rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
2872 cl_env_put(cl_env, &refcheck);
2873 if (rc < 0) {
2874 CERROR("%s: Can not alloc seq: rc = %d\n",
2875 obd->obd_name, rc);
2876 GOTO(out, rc);
2877 }
2878
2879 if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
2880 return -EFAULT;
2881
2882 max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
2883 if (copy_to_user(data->ioc_pbuf2, &max_count,
2884 data->ioc_plen2))
2885 return -EFAULT;
2886 GOTO(out, rc);
2887 }
2888 case OBD_IOC_DESTROY:
2889 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2890 GOTO (out, rc = -EPERM);
2891
2892 rc = echo_get_object(&eco, ed, oa);
2893 if (rc == 0) {
2894 rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
2895 &dummy_oti, NULL, NULL);
2896 if (rc == 0)
2897 eco->eo_deleted = 1;
2898 echo_put_object(eco);
2899 }
2900 GOTO(out, rc);
2901
2902 case OBD_IOC_GETATTR:
2903 rc = echo_get_object(&eco, ed, oa);
2904 if (rc == 0) {
2905 struct obd_info oinfo = { { { 0 } } };
2906 oinfo.oi_md = eco->eo_lsm;
2907 oinfo.oi_oa = oa;
2908 rc = obd_getattr(env, ec->ec_exp, &oinfo);
2909 echo_put_object(eco);
2910 }
2911 GOTO(out, rc);
2912
2913 case OBD_IOC_SETATTR:
2914 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2915 GOTO (out, rc = -EPERM);
2916
2917 rc = echo_get_object(&eco, ed, oa);
2918 if (rc == 0) {
2919 struct obd_info oinfo = { { { 0 } } };
2920 oinfo.oi_oa = oa;
2921 oinfo.oi_md = eco->eo_lsm;
2922
2923 rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
2924 echo_put_object(eco);
2925 }
2926 GOTO(out, rc);
2927
2928 case OBD_IOC_BRW_WRITE:
2929 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2930 GOTO (out, rc = -EPERM);
2931
2932 rw = OBD_BRW_WRITE;
2933
2934 case OBD_IOC_BRW_READ:
2935 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
2936 GOTO(out, rc);
2937
2938 case ECHO_IOC_GET_STRIPE:
2939 rc = echo_get_object(&eco, ed, oa);
2940 if (rc == 0) {
2941 rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
2942 data->ioc_plen1);
2943 echo_put_object(eco);
2944 }
2945 GOTO(out, rc);
2946
2947 case ECHO_IOC_SET_STRIPE:
2948 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2949 GOTO (out, rc = -EPERM);
2950
2951 if (data->ioc_pbuf1 == NULL) {
2952 rc = echo_get_object(&eco, ed, oa);
2953 if (rc == 0) {
2954 eco->eo_deleted = 1;
2955 echo_put_object(eco);
2956 }
2957 } else {
2958 rc = echo_create_object(env, ed, 0, oa,
2959 data->ioc_pbuf1,
2960 data->ioc_plen1, &dummy_oti);
2961 }
2962 GOTO (out, rc);
2963
2964 case ECHO_IOC_ENQUEUE:
2965 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
2966 GOTO (out, rc = -EPERM);
2967
2968 rc = echo_client_enqueue(exp, oa,
2969 data->ioc_conn1,
2970 data->ioc_offset,
2971 data->ioc_count);
2972 GOTO (out, rc);
2973
2974 case ECHO_IOC_CANCEL:
2975 rc = echo_client_cancel(exp, oa);
2976 GOTO (out, rc);
2977
2978 default:
2979 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2980 GOTO (out, rc = -ENOTTY);
2981 }
2982
2983 EXIT;
2984out:
2985 lu_env_fini(env);
2986 OBD_FREE_PTR(env);
2987
2988
2989 for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2990 i++, ack_lock++) {
2991 if (!ack_lock->mode)
2992 break;
2993 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2994 }
2995
2996 return rc;
2997}
2998
2999static int echo_client_setup(const struct lu_env *env,
3000 struct obd_device *obddev, struct lustre_cfg *lcfg)
3001{
3002 struct echo_client_obd *ec = &obddev->u.echo_client;
3003 struct obd_device *tgt;
3004 struct obd_uuid echo_uuid = { "ECHO_UUID" };
3005 struct obd_connect_data *ocd = NULL;
3006 int rc;
3007 ENTRY;
3008
3009 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
3010 CERROR("requires a TARGET OBD name\n");
3011 RETURN(-EINVAL);
3012 }
3013
3014 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
3015 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
3016 CERROR("device not attached or not set up (%s)\n",
3017 lustre_cfg_string(lcfg, 1));
3018 RETURN(-EINVAL);
3019 }
3020
3021 spin_lock_init(&ec->ec_lock);
3022 INIT_LIST_HEAD (&ec->ec_objects);
3023 INIT_LIST_HEAD (&ec->ec_locks);
3024 ec->ec_unique = 0;
3025 ec->ec_nstripes = 0;
3026
3027 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
3028 lu_context_tags_update(ECHO_MD_CTX_TAG);
3029 lu_session_tags_update(ECHO_MD_SES_TAG);
3030 RETURN(0);
3031 }
3032
3033 OBD_ALLOC(ocd, sizeof(*ocd));
3034 if (ocd == NULL) {
3035 CERROR("Can't alloc ocd connecting to %s\n",
3036 lustre_cfg_string(lcfg, 1));
3037 return -ENOMEM;
3038 }
3039
3040 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
3041 OBD_CONNECT_BRW_SIZE |
3042 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
3043 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
3044 OBD_CONNECT_FID;
3045 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
3046 ocd->ocd_version = LUSTRE_VERSION_CODE;
3047 ocd->ocd_group = FID_SEQ_ECHO;
3048
3049 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
3050 if (rc == 0) {
3051
3052 spin_lock(&tgt->obd_dev_lock);
3053 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
3054 spin_unlock(&tgt->obd_dev_lock);
3055 }
3056
3057 OBD_FREE(ocd, sizeof(*ocd));
3058
3059 if (rc != 0) {
3060 CERROR("fail to connect to device %s\n",
3061 lustre_cfg_string(lcfg, 1));
3062 return (rc);
3063 }
3064
3065 RETURN(rc);
3066}
3067
3068static int echo_client_cleanup(struct obd_device *obddev)
3069{
3070 struct echo_device *ed = obd2echo_dev(obddev);
3071 struct echo_client_obd *ec = &obddev->u.echo_client;
3072 int rc;
3073 ENTRY;
3074
3075
3076 if (ed == NULL )
3077 RETURN(0);
3078
3079 if (ed->ed_next_ismd) {
3080 lu_context_tags_clear(ECHO_MD_CTX_TAG);
3081 lu_session_tags_clear(ECHO_MD_SES_TAG);
3082 RETURN(0);
3083 }
3084
3085 if (!list_empty(&obddev->obd_exports)) {
3086 CERROR("still has clients!\n");
3087 RETURN(-EBUSY);
3088 }
3089
3090 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
3091 rc = obd_disconnect(ec->ec_exp);
3092 if (rc != 0)
3093 CERROR("fail to disconnect device: %d\n", rc);
3094
3095 RETURN(rc);
3096}
3097
3098static int echo_client_connect(const struct lu_env *env,
3099 struct obd_export **exp,
3100 struct obd_device *src, struct obd_uuid *cluuid,
3101 struct obd_connect_data *data, void *localdata)
3102{
3103 int rc;
3104 struct lustre_handle conn = { 0 };
3105
3106 ENTRY;
3107 rc = class_connect(&conn, src, cluuid);
3108 if (rc == 0) {
3109 *exp = class_conn2export(&conn);
3110 }
3111
3112 RETURN (rc);
3113}
3114
3115static int echo_client_disconnect(struct obd_export *exp)
3116{
3117#if 0
3118 struct obd_device *obd;
3119 struct echo_client_obd *ec;
3120 struct ec_lock *ecl;
3121#endif
3122 int rc;
3123 ENTRY;
3124
3125 if (exp == NULL)
3126 GOTO(out, rc = -EINVAL);
3127
3128#if 0
3129 obd = exp->exp_obd;
3130 ec = &obd->u.echo_client;
3131
3132
3133 while (!list_empty (&exp->exp_ec_data.eced_locks)) {
3134 ecl = list_entry (exp->exp_ec_data.eced_locks.next,
3135 struct ec_lock, ecl_exp_chain);
3136 list_del (&ecl->ecl_exp_chain);
3137
3138 rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
3139 ecl->ecl_mode, &ecl->ecl_lock_handle);
3140
3141 CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
3142 "(%d)\n", ecl->ecl_object->eco_id, rc);
3143
3144 echo_put_object (ecl->ecl_object);
3145 OBD_FREE (ecl, sizeof (*ecl));
3146 }
3147#endif
3148
3149 rc = class_disconnect(exp);
3150 GOTO(out, rc);
3151 out:
3152 return rc;
3153}
3154
3155static struct obd_ops echo_client_obd_ops = {
3156 .o_owner = THIS_MODULE,
3157
3158#if 0
3159 .o_setup = echo_client_setup,
3160 .o_cleanup = echo_client_cleanup,
3161#endif
3162
3163 .o_iocontrol = echo_client_iocontrol,
3164 .o_connect = echo_client_connect,
3165 .o_disconnect = echo_client_disconnect
3166};
3167
3168int echo_client_init(void)
3169{
3170 struct lprocfs_static_vars lvars = { 0 };
3171 int rc;
3172
3173 lprocfs_echo_init_vars(&lvars);
3174
3175 rc = lu_kmem_init(echo_caches);
3176 if (rc == 0) {
3177 rc = class_register_type(&echo_client_obd_ops, NULL,
3178 lvars.module_vars,
3179 LUSTRE_ECHO_CLIENT_NAME,
3180 &echo_device_type);
3181 if (rc)
3182 lu_kmem_fini(echo_caches);
3183 }
3184 return rc;
3185}
3186
3187void echo_client_exit(void)
3188{
3189 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
3190 lu_kmem_fini(echo_caches);
3191}
3192
3193static int __init obdecho_init(void)
3194{
3195 struct lprocfs_static_vars lvars;
3196 int rc;
3197
3198 ENTRY;
3199 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
3200
3201 LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
3202
3203 lprocfs_echo_init_vars(&lvars);
3204
3205
3206 rc = echo_client_init();
3207
3208 RETURN(rc);
3209}
3210
3211static void obdecho_exit(void)
3212{
3213 echo_client_exit();
3214
3215}
3216
3217MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3218MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
3219MODULE_LICENSE("GPL");
3220
3221cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
3222
3223
3224