1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42#define DEBUG_SUBSYSTEM S_ECHO
43
44#include <obd_support.h>
45#include <obd_class.h>
46#include <lustre_debug.h>
47#include <lustre_dlm.h>
48#include <lprocfs_status.h>
49
50#include "echo_internal.h"
51
52
53
54#define ECHO_INIT_OID 0x10000000ULL
55#define ECHO_HANDLE_MAGIC 0xabcd0123fedc9876ULL
56
57#define ECHO_PERSISTENT_PAGES (ECHO_PERSISTENT_SIZE >> PAGE_CACHE_SHIFT)
58static struct page *echo_persistent_pages[ECHO_PERSISTENT_PAGES];
59
/* Indices of the per-device statistics counters registered in echo_setup(). */
enum {
	LPROC_ECHO_READ_BYTES	= 1,
	LPROC_ECHO_WRITE_BYTES	= 2,
	/* One past the highest counter index; passed to the stats allocator. */
	LPROC_ECHO_LAST		= LPROC_ECHO_WRITE_BYTES + 1
};
65
66static int echo_connect(const struct lu_env *env,
67 struct obd_export **exp, struct obd_device *obd,
68 struct obd_uuid *cluuid, struct obd_connect_data *data,
69 void *localdata)
70{
71 struct lustre_handle conn = { 0 };
72 int rc;
73
74 data->ocd_connect_flags &= ECHO_CONNECT_SUPPORTED;
75 rc = class_connect(&conn, obd, cluuid);
76 if (rc) {
77 CERROR("can't connect %d\n", rc);
78 return rc;
79 }
80 *exp = class_conn2export(&conn);
81
82 return 0;
83}
84
85static int echo_disconnect(struct obd_export *exp)
86{
87 LASSERT (exp != NULL);
88
89 return server_disconnect_export(exp);
90}
91
/*
 * Export initialisation hook: delegates to ldlm_init_export() and passes
 * its return code through unchanged.
 */
static int echo_init_export(struct obd_export *exp)
{
	int rc = ldlm_init_export(exp);

	return rc;
}
96
97static int echo_destroy_export(struct obd_export *exp)
98{
99 ENTRY;
100
101 target_destroy_export(exp);
102 ldlm_destroy_export(exp);
103
104 RETURN(0);
105}
106
107 static __u64 echo_next_id(struct obd_device *obddev)
108{
109 obd_id id;
110
111 spin_lock(&obddev->u.echo.eo_lock);
112 id = ++obddev->u.echo.eo_lastino;
113 spin_unlock(&obddev->u.echo.eo_lock);
114
115 return id;
116}
117
118static int echo_create(const struct lu_env *env, struct obd_export *exp,
119 struct obdo *oa, struct lov_stripe_md **ea,
120 struct obd_trans_info *oti)
121{
122 struct obd_device *obd = class_exp2obd(exp);
123
124 if (!obd) {
125 CERROR("invalid client cookie "LPX64"\n",
126 exp->exp_handle.h_cookie);
127 return -EINVAL;
128 }
129
130 if (!(oa->o_mode && S_IFMT)) {
131 CERROR("echo obd: no type!\n");
132 return -ENOENT;
133 }
134
135 if (!(oa->o_valid & OBD_MD_FLTYPE)) {
136 CERROR("invalid o_valid "LPX64"\n", oa->o_valid);
137 return -EINVAL;
138 }
139
140 ostid_set_seq_echo(&oa->o_oi);
141 ostid_set_id(&oa->o_oi, echo_next_id(obd));
142 oa->o_valid = OBD_MD_FLID;
143
144 return 0;
145}
146
147static int echo_destroy(const struct lu_env *env, struct obd_export *exp,
148 struct obdo *oa, struct lov_stripe_md *ea,
149 struct obd_trans_info *oti, struct obd_export *md_exp,
150 void *capa)
151{
152 struct obd_device *obd = class_exp2obd(exp);
153
154 ENTRY;
155 if (!obd) {
156 CERROR("invalid client cookie "LPX64"\n",
157 exp->exp_handle.h_cookie);
158 RETURN(-EINVAL);
159 }
160
161 if (!(oa->o_valid & OBD_MD_FLID)) {
162 CERROR("obdo missing FLID valid flag: "LPX64"\n", oa->o_valid);
163 RETURN(-EINVAL);
164 }
165
166 if (ostid_id(&oa->o_oi) > obd->u.echo.eo_lastino ||
167 ostid_id(&oa->o_oi) < ECHO_INIT_OID) {
168 CERROR("bad destroy objid: "DOSTID"\n", POSTID(&oa->o_oi));
169 RETURN(-EINVAL);
170 }
171
172 RETURN(0);
173}
174
175static int echo_getattr(const struct lu_env *env, struct obd_export *exp,
176 struct obd_info *oinfo)
177{
178 struct obd_device *obd = class_exp2obd(exp);
179 obd_id id = ostid_id(&oinfo->oi_oa->o_oi);
180
181 ENTRY;
182 if (!obd) {
183 CERROR("invalid client cookie "LPX64"\n",
184 exp->exp_handle.h_cookie);
185 RETURN(-EINVAL);
186 }
187
188 if (!(oinfo->oi_oa->o_valid & OBD_MD_FLID)) {
189 CERROR("obdo missing FLID valid flag: "LPX64"\n",
190 oinfo->oi_oa->o_valid);
191 RETURN(-EINVAL);
192 }
193
194 obdo_cpy_md(oinfo->oi_oa, &obd->u.echo.eo_oa, oinfo->oi_oa->o_valid);
195 ostid_set_seq_echo(&oinfo->oi_oa->o_oi);
196 ostid_set_id(&oinfo->oi_oa->o_oi, id);
197
198 RETURN(0);
199}
200
201static int echo_setattr(const struct lu_env *env, struct obd_export *exp,
202 struct obd_info *oinfo, struct obd_trans_info *oti)
203{
204 struct obd_device *obd = class_exp2obd(exp);
205
206 ENTRY;
207 if (!obd) {
208 CERROR("invalid client cookie "LPX64"\n",
209 exp->exp_handle.h_cookie);
210 RETURN(-EINVAL);
211 }
212
213 if (!(oinfo->oi_oa->o_valid & OBD_MD_FLID)) {
214 CERROR("obdo missing FLID valid flag: "LPX64"\n",
215 oinfo->oi_oa->o_valid);
216 RETURN(-EINVAL);
217 }
218
219 memcpy(&obd->u.echo.eo_oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
220
221 if (ostid_id(&oinfo->oi_oa->o_oi) & 4) {
222
223 ldlm_lock_addref (&obd->u.echo.eo_nl_lock, LCK_NL);
224 oti->oti_ack_locks[0].mode = LCK_NL;
225 oti->oti_ack_locks[0].lock = obd->u.echo.eo_nl_lock;
226 }
227
228 RETURN(0);
229}
230
231static void
232echo_page_debug_setup(struct page *page, int rw, obd_id id,
233 __u64 offset, int len)
234{
235 int page_offset = offset & ~CFS_PAGE_MASK;
236 char *addr = ((char *)kmap(page)) + page_offset;
237
238 if (len % OBD_ECHO_BLOCK_SIZE != 0)
239 CERROR("Unexpected block size %d\n", len);
240
241 while (len > 0) {
242 if (rw & OBD_BRW_READ)
243 block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
244 offset, id);
245 else
246 block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
247 0xecc0ecc0ecc0ecc0ULL,
248 0xecc0ecc0ecc0ecc0ULL);
249
250 addr += OBD_ECHO_BLOCK_SIZE;
251 offset += OBD_ECHO_BLOCK_SIZE;
252 len -= OBD_ECHO_BLOCK_SIZE;
253 }
254
255 kunmap(page);
256}
257
258static int
259echo_page_debug_check(struct page *page, obd_id id,
260 __u64 offset, int len)
261{
262 int page_offset = offset & ~CFS_PAGE_MASK;
263 char *addr = ((char *)kmap(page)) + page_offset;
264 int rc = 0;
265 int rc2;
266
267 if (len % OBD_ECHO_BLOCK_SIZE != 0)
268 CERROR("Unexpected block size %d\n", len);
269
270 while (len > 0) {
271 rc2 = block_debug_check("echo", addr, OBD_ECHO_BLOCK_SIZE,
272 offset, id);
273
274 if (rc2 != 0 && rc == 0)
275 rc = rc2;
276
277 addr += OBD_ECHO_BLOCK_SIZE;
278 offset += OBD_ECHO_BLOCK_SIZE;
279 len -= OBD_ECHO_BLOCK_SIZE;
280 }
281
282 kunmap(page);
283
284 return (rc);
285}
286
287
/*
 * Cookie stored in oti->oti_handle by echo_preprw() and asserted by
 * echo_commitrw().
 */
#define DESC_PRIV	0x10293847
289
290static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
291 struct niobuf_remote *nb, int *pages,
292 struct niobuf_local *lb, int cmd, int *left)
293{
294 int gfp_mask = (ostid_id(&obj->ioo_oid) & 1) ?
295 GFP_HIGHUSER : GFP_IOFS;
296 int ispersistent = ostid_id(&obj->ioo_oid) == ECHO_PERSISTENT_OBJID;
297 int debug_setup = (!ispersistent &&
298 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
299 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
300 struct niobuf_local *res = lb;
301 obd_off offset = nb->offset;
302 int len = nb->len;
303
304 while (len > 0) {
305 int plen = PAGE_CACHE_SIZE - (offset & (PAGE_CACHE_SIZE-1));
306 if (len < plen)
307 plen = len;
308
309
310 if (*left == 0)
311 return -EINVAL;
312
313 res->lnb_file_offset = offset;
314 res->len = plen;
315 LASSERT((res->lnb_file_offset & ~CFS_PAGE_MASK) + res->len <=
316 PAGE_CACHE_SIZE);
317
318 if (ispersistent &&
319 ((res->lnb_file_offset >> PAGE_CACHE_SHIFT) <
320 ECHO_PERSISTENT_PAGES)) {
321 res->page =
322 echo_persistent_pages[res->lnb_file_offset >>
323 PAGE_CACHE_SHIFT];
324
325 get_page (res->page);
326 } else {
327 OBD_PAGE_ALLOC(res->page, gfp_mask);
328 if (res->page == NULL) {
329 CERROR("can't get page for id " DOSTID"\n",
330 POSTID(&obj->ioo_oid));
331 return -ENOMEM;
332 }
333 }
334
335 CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n",
336 res->page, res->lnb_file_offset, res->len);
337
338 if (cmd & OBD_BRW_READ)
339 res->rc = res->len;
340
341 if (debug_setup)
342 echo_page_debug_setup(res->page, cmd,
343 ostid_id(&obj->ioo_oid),
344 res->lnb_file_offset, res->len);
345
346 offset += plen;
347 len -= plen;
348 res++;
349
350 (*left)--;
351 (*pages)++;
352 }
353
354 return 0;
355}
356
357static int echo_finalize_lb(struct obdo *oa, struct obd_ioobj *obj,
358 struct niobuf_remote *rb, int *pgs,
359 struct niobuf_local *lb, int verify)
360{
361 struct niobuf_local *res = lb;
362 obd_off start = rb->offset >> PAGE_CACHE_SHIFT;
363 obd_off end = (rb->offset + rb->len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
364 int count = (int)(end - start);
365 int rc = 0;
366 int i;
367
368 for (i = 0; i < count; i++, (*pgs) ++, res++) {
369 struct page *page = res->page;
370 void *addr;
371
372 if (page == NULL) {
373 CERROR("null page objid "LPU64":%p, buf %d/%d\n",
374 ostid_id(&obj->ioo_oid), page, i,
375 obj->ioo_bufcnt);
376 return -EFAULT;
377 }
378
379 addr = kmap(page);
380
381 CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
382 res->page, addr, res->lnb_file_offset);
383
384 if (verify) {
385 int vrc = echo_page_debug_check(page,
386 ostid_id(&obj->ioo_oid),
387 res->lnb_file_offset,
388 res->len);
389
390 if (vrc != 0 && rc == 0)
391 rc = vrc;
392 }
393
394 kunmap(page);
395
396 OBD_PAGE_FREE(page);
397 }
398
399 return rc;
400}
401
402static int echo_preprw(const struct lu_env *env, int cmd,
403 struct obd_export *export, struct obdo *oa,
404 int objcount, struct obd_ioobj *obj,
405 struct niobuf_remote *nb, int *pages,
406 struct niobuf_local *res, struct obd_trans_info *oti,
407 struct lustre_capa *unused)
408{
409 struct obd_device *obd;
410 int tot_bytes = 0;
411 int rc = 0;
412 int i, left;
413 ENTRY;
414
415 obd = export->exp_obd;
416 if (obd == NULL)
417 RETURN(-EINVAL);
418
419
420 oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
421
422 memset(res, 0, sizeof(*res) * *pages);
423
424 CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
425 cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
426
427 if (oti)
428 oti->oti_handle = (void *)DESC_PRIV;
429
430 left = *pages;
431 *pages = 0;
432
433 for (i = 0; i < objcount; i++, obj++) {
434 int j;
435
436 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++) {
437
438 rc = echo_map_nb_to_lb(oa, obj, nb, pages,
439 res + *pages, cmd, &left);
440 if (rc)
441 GOTO(preprw_cleanup, rc);
442
443 tot_bytes += nb->len;
444 }
445 }
446
447 atomic_add(*pages, &obd->u.echo.eo_prep);
448
449 if (cmd & OBD_BRW_READ)
450 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
451 tot_bytes);
452 else
453 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
454 tot_bytes);
455
456 CDEBUG(D_PAGE, "%d pages allocated after prep\n",
457 atomic_read(&obd->u.echo.eo_prep));
458
459 RETURN(0);
460
461preprw_cleanup:
462
463
464
465
466
467 CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
468 for (i = 0; i < *pages; i++) {
469 kunmap(res[i].page);
470
471
472 OBD_PAGE_FREE(res[i].page);
473 res[i].page = NULL;
474 atomic_dec(&obd->u.echo.eo_prep);
475 }
476
477 return rc;
478}
479
480static int echo_commitrw(const struct lu_env *env, int cmd,
481 struct obd_export *export, struct obdo *oa,
482 int objcount, struct obd_ioobj *obj,
483 struct niobuf_remote *rb, int niocount,
484 struct niobuf_local *res, struct obd_trans_info *oti,
485 int rc)
486{
487 struct obd_device *obd;
488 int pgs = 0;
489 int i;
490 ENTRY;
491
492 obd = export->exp_obd;
493 if (obd == NULL)
494 RETURN(-EINVAL);
495
496 if (rc)
497 GOTO(commitrw_cleanup, rc);
498
499 if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
500 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
501 objcount, niocount);
502 } else {
503 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
504 objcount, niocount);
505 }
506
507 if (niocount && res == NULL) {
508 CERROR("NULL res niobuf with niocount %d\n", niocount);
509 RETURN(-EINVAL);
510 }
511
512 LASSERT(oti == NULL || oti->oti_handle == (void *)DESC_PRIV);
513
514 for (i = 0; i < objcount; i++, obj++) {
515 int verify = (rc == 0 &&
516 ostid_id(&obj->ioo_oid) != ECHO_PERSISTENT_OBJID &&
517 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
518 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
519 int j;
520
521 for (j = 0 ; j < obj->ioo_bufcnt ; j++, rb++) {
522 int vrc = echo_finalize_lb(oa, obj, rb, &pgs, &res[pgs],
523 verify);
524 if (vrc == 0)
525 continue;
526
527 if (vrc == -EFAULT)
528 GOTO(commitrw_cleanup, rc = vrc);
529
530 if (rc == 0)
531 rc = vrc;
532 }
533
534 }
535
536 atomic_sub(pgs, &obd->u.echo.eo_prep);
537
538 CDEBUG(D_PAGE, "%d pages remain after commit\n",
539 atomic_read(&obd->u.echo.eo_prep));
540 RETURN(rc);
541
542commitrw_cleanup:
543 atomic_sub(pgs, &obd->u.echo.eo_prep);
544
545 CERROR("cleaning up %d pages (%d obdos)\n",
546 niocount - pgs - 1, objcount);
547
548 while (pgs < niocount) {
549 struct page *page = res[pgs++].page;
550
551 if (page == NULL)
552 continue;
553
554
555 OBD_PAGE_FREE(page);
556 atomic_dec(&obd->u.echo.eo_prep);
557 }
558 return rc;
559}
560
561static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
562{
563 struct lprocfs_static_vars lvars;
564 int rc;
565 __u64 lock_flags = 0;
566 struct ldlm_res_id res_id = {.name = {1}};
567 char ns_name[48];
568 ENTRY;
569
570 obd->u.echo.eo_obt.obt_magic = OBT_MAGIC;
571 spin_lock_init(&obd->u.echo.eo_lock);
572 obd->u.echo.eo_lastino = ECHO_INIT_OID;
573
574 sprintf(ns_name, "echotgt-%s", obd->obd_uuid.uuid);
575 obd->obd_namespace = ldlm_namespace_new(obd, ns_name,
576 LDLM_NAMESPACE_SERVER,
577 LDLM_NAMESPACE_MODEST,
578 LDLM_NS_TYPE_OST);
579 if (obd->obd_namespace == NULL) {
580 LBUG();
581 RETURN(-ENOMEM);
582 }
583
584 rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
585 NULL, LCK_NL, &lock_flags, NULL,
586 ldlm_completion_ast, NULL, NULL, 0,
587 LVB_T_NONE, NULL, &obd->u.echo.eo_nl_lock);
588 LASSERT (rc == ELDLM_OK);
589
590 lprocfs_echo_init_vars(&lvars);
591 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
592 lprocfs_alloc_obd_stats(obd, LPROC_ECHO_LAST) == 0) {
593 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_READ_BYTES,
594 LPROCFS_CNTR_AVGMINMAX,
595 "read_bytes", "bytes");
596 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
597 LPROCFS_CNTR_AVGMINMAX,
598 "write_bytes", "bytes");
599 }
600
601 ptlrpc_init_client (LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
602 "echo_ldlm_cb_client", &obd->obd_ldlm_client);
603 RETURN(0);
604}
605
606static int echo_cleanup(struct obd_device *obd)
607{
608 int leaked;
609 ENTRY;
610
611 lprocfs_obd_cleanup(obd);
612 lprocfs_free_obd_stats(obd);
613
614 ldlm_lock_decref(&obd->u.echo.eo_nl_lock, LCK_NL);
615
616
617
618 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE, cfs_time_seconds(1));
619
620 ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
621 obd->obd_namespace = NULL;
622
623 leaked = atomic_read(&obd->u.echo.eo_prep);
624 if (leaked != 0)
625 CERROR("%d prep/commitrw pages leaked\n", leaked);
626
627 RETURN(0);
628}
629
630struct obd_ops echo_obd_ops = {
631 .o_owner = THIS_MODULE,
632 .o_connect = echo_connect,
633 .o_disconnect = echo_disconnect,
634 .o_init_export = echo_init_export,
635 .o_destroy_export = echo_destroy_export,
636 .o_create = echo_create,
637 .o_destroy = echo_destroy,
638 .o_getattr = echo_getattr,
639 .o_setattr = echo_setattr,
640 .o_preprw = echo_preprw,
641 .o_commitrw = echo_commitrw,
642 .o_setup = echo_setup,
643 .o_cleanup = echo_cleanup
644};
645
646void echo_persistent_pages_fini(void)
647{
648 int i;
649
650 for (i = 0; i < ECHO_PERSISTENT_PAGES; i++)
651 if (echo_persistent_pages[i] != NULL) {
652 OBD_PAGE_FREE(echo_persistent_pages[i]);
653 echo_persistent_pages[i] = NULL;
654 }
655}
656
657int echo_persistent_pages_init(void)
658{
659 struct page *pg;
660 int i;
661
662 for (i = 0; i < ECHO_PERSISTENT_PAGES; i++) {
663 int gfp_mask = (i < ECHO_PERSISTENT_PAGES/2) ?
664 GFP_IOFS : GFP_HIGHUSER;
665
666 OBD_PAGE_ALLOC(pg, gfp_mask);
667 if (pg == NULL) {
668 echo_persistent_pages_fini ();
669 return (-ENOMEM);
670 }
671
672 memset (kmap (pg), 0, PAGE_CACHE_SIZE);
673 kunmap (pg);
674
675 echo_persistent_pages[i] = pg;
676 }
677
678 return (0);
679}
680