1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_LOV
38
39#include "../../include/linux/libcfs/libcfs.h"
40
41#include "../include/obd_class.h"
42#include "../include/lustre/lustre_idl.h"
43#include "lov_internal.h"
44
45static void lov_init_set(struct lov_request_set *set)
46{
47 set->set_count = 0;
48 atomic_set(&set->set_completes, 0);
49 atomic_set(&set->set_success, 0);
50 atomic_set(&set->set_finish_checked, 0);
51 set->set_cookies = NULL;
52 INIT_LIST_HEAD(&set->set_list);
53 atomic_set(&set->set_refcount, 1);
54 init_waitqueue_head(&set->set_waitq);
55 spin_lock_init(&set->set_lock);
56}
57
58void lov_finish_set(struct lov_request_set *set)
59{
60 struct list_head *pos, *n;
61
62 LASSERT(set);
63 list_for_each_safe(pos, n, &set->set_list) {
64 struct lov_request *req = list_entry(pos,
65 struct lov_request,
66 rq_link);
67 list_del_init(&req->rq_link);
68
69 if (req->rq_oi.oi_oa)
70 OBDO_FREE(req->rq_oi.oi_oa);
71 if (req->rq_oi.oi_md)
72 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
73 if (req->rq_oi.oi_osfs)
74 OBD_FREE(req->rq_oi.oi_osfs,
75 sizeof(*req->rq_oi.oi_osfs));
76 OBD_FREE(req, sizeof(*req));
77 }
78
79 if (set->set_pga) {
80 int len = set->set_oabufs * sizeof(*set->set_pga);
81 OBD_FREE_LARGE(set->set_pga, len);
82 }
83 if (set->set_lockh)
84 lov_llh_put(set->set_lockh);
85
86 OBD_FREE(set, sizeof(*set));
87}
88
89int lov_set_finished(struct lov_request_set *set, int idempotent)
90{
91 int completes = atomic_read(&set->set_completes);
92
93 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
94
95 if (completes == set->set_count) {
96 if (idempotent)
97 return 1;
98 if (atomic_inc_return(&set->set_finish_checked) == 1)
99 return 1;
100 }
101 return 0;
102}
103
104void lov_update_set(struct lov_request_set *set,
105 struct lov_request *req, int rc)
106{
107 req->rq_complete = 1;
108 req->rq_rc = rc;
109
110 atomic_inc(&set->set_completes);
111 if (rc == 0)
112 atomic_inc(&set->set_success);
113
114 wake_up(&set->set_waitq);
115}
116
117int lov_update_common_set(struct lov_request_set *set,
118 struct lov_request *req, int rc)
119{
120 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
121
122 lov_update_set(set, req, rc);
123
124
125 if (rc && !(lov->lov_tgts[req->rq_idx] &&
126 lov->lov_tgts[req->rq_idx]->ltd_active))
127 rc = 0;
128
129
130 return rc;
131}
132
133void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
134{
135 list_add_tail(&req->rq_link, &set->set_list);
136 set->set_count++;
137 req->rq_rqset = set;
138}
139
140static int lov_check_set(struct lov_obd *lov, int idx)
141{
142 int rc;
143 struct lov_tgt_desc *tgt;
144
145 mutex_lock(&lov->lov_lock);
146 tgt = lov->lov_tgts[idx];
147 rc = !tgt || tgt->ltd_active ||
148 (tgt->ltd_exp &&
149 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
150 mutex_unlock(&lov->lov_lock);
151
152 return rc;
153}
154
155
156
157
158
159int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
160{
161 wait_queue_head_t waitq;
162 struct l_wait_info lwi;
163 struct lov_tgt_desc *tgt;
164 int rc = 0;
165
166 mutex_lock(&lov->lov_lock);
167
168 tgt = lov->lov_tgts[ost_idx];
169
170 if (unlikely(tgt == NULL)) {
171 rc = 0;
172 goto out;
173 }
174
175 if (likely(tgt->ltd_active)) {
176 rc = 1;
177 goto out;
178 }
179
180 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
181 rc = 0;
182 goto out;
183 }
184
185 mutex_unlock(&lov->lov_lock);
186
187 init_waitqueue_head(&waitq);
188 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
189 cfs_time_seconds(1), NULL, NULL);
190
191 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
192 if (tgt != NULL && tgt->ltd_active)
193 return 1;
194
195 return 0;
196
197out:
198 mutex_unlock(&lov->lov_lock);
199 return rc;
200}
201
202static int common_attr_done(struct lov_request_set *set)
203{
204 struct list_head *pos;
205 struct lov_request *req;
206 struct obdo *tmp_oa;
207 int rc = 0, attrset = 0;
208
209 LASSERT(set->set_oi != NULL);
210
211 if (set->set_oi->oi_oa == NULL)
212 return 0;
213
214 if (!atomic_read(&set->set_success))
215 return -EIO;
216
217 OBDO_ALLOC(tmp_oa);
218 if (tmp_oa == NULL) {
219 rc = -ENOMEM;
220 goto out;
221 }
222
223 list_for_each(pos, &set->set_list) {
224 req = list_entry(pos, struct lov_request, rq_link);
225
226 if (!req->rq_complete || req->rq_rc)
227 continue;
228 if (req->rq_oi.oi_oa->o_valid == 0)
229 continue;
230 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
231 req->rq_oi.oi_oa->o_valid,
232 set->set_oi->oi_md, req->rq_stripe, &attrset);
233 }
234 if (!attrset) {
235 CERROR("No stripes had valid attrs\n");
236 rc = -EIO;
237 }
238 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
239 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
240
241
242 CERROR("Not all the stripes had valid attrs\n");
243 rc = -EIO;
244 goto out;
245 }
246
247 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
248 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
249out:
250 if (tmp_oa)
251 OBDO_FREE(tmp_oa);
252 return rc;
253
254}
255
256int lov_fini_getattr_set(struct lov_request_set *set)
257{
258 int rc = 0;
259
260 if (set == NULL)
261 return 0;
262 LASSERT(set->set_exp);
263 if (atomic_read(&set->set_completes))
264 rc = common_attr_done(set);
265
266 lov_put_reqset(set);
267
268 return rc;
269}
270
271
272
273static int cb_getattr_update(void *cookie, int rc)
274{
275 struct obd_info *oinfo = cookie;
276 struct lov_request *lovreq;
277
278 lovreq = container_of(oinfo, struct lov_request, rq_oi);
279 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
280}
281
282int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
283 struct lov_request_set **reqset)
284{
285 struct lov_request_set *set;
286 struct lov_obd *lov = &exp->exp_obd->u.lov;
287 int rc = 0, i;
288
289 OBD_ALLOC(set, sizeof(*set));
290 if (set == NULL)
291 return -ENOMEM;
292 lov_init_set(set);
293
294 set->set_exp = exp;
295 set->set_oi = oinfo;
296
297 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
298 struct lov_oinfo *loi;
299 struct lov_request *req;
300
301 loi = oinfo->oi_md->lsm_oinfo[i];
302 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
303 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
304 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
305
306 rc = -EIO;
307 goto out_set;
308 }
309 continue;
310 }
311
312 OBD_ALLOC(req, sizeof(*req));
313 if (req == NULL) {
314 rc = -ENOMEM;
315 goto out_set;
316 }
317
318 req->rq_stripe = i;
319 req->rq_idx = loi->loi_ost_idx;
320
321 OBDO_ALLOC(req->rq_oi.oi_oa);
322 if (req->rq_oi.oi_oa == NULL) {
323 OBD_FREE(req, sizeof(*req));
324 rc = -ENOMEM;
325 goto out_set;
326 }
327 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
328 sizeof(*req->rq_oi.oi_oa));
329 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
330 req->rq_oi.oi_cb_up = cb_getattr_update;
331 req->rq_oi.oi_capa = oinfo->oi_capa;
332
333 lov_set_add_req(req, set);
334 }
335 if (!set->set_count) {
336 rc = -EIO;
337 goto out_set;
338 }
339 *reqset = set;
340 return rc;
341out_set:
342 lov_fini_getattr_set(set);
343 return rc;
344}
345
346int lov_fini_destroy_set(struct lov_request_set *set)
347{
348 if (set == NULL)
349 return 0;
350 LASSERT(set->set_exp);
351 if (atomic_read(&set->set_completes)) {
352
353 }
354
355 lov_put_reqset(set);
356
357 return 0;
358}
359
360int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
361 struct obdo *src_oa, struct lov_stripe_md *lsm,
362 struct obd_trans_info *oti,
363 struct lov_request_set **reqset)
364{
365 struct lov_request_set *set;
366 struct lov_obd *lov = &exp->exp_obd->u.lov;
367 int rc = 0, i;
368
369 OBD_ALLOC(set, sizeof(*set));
370 if (set == NULL)
371 return -ENOMEM;
372 lov_init_set(set);
373
374 set->set_exp = exp;
375 set->set_oi = oinfo;
376 set->set_oi->oi_md = lsm;
377 set->set_oi->oi_oa = src_oa;
378 set->set_oti = oti;
379 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
380 set->set_cookies = oti->oti_logcookies;
381
382 for (i = 0; i < lsm->lsm_stripe_count; i++) {
383 struct lov_oinfo *loi;
384 struct lov_request *req;
385
386 loi = lsm->lsm_oinfo[i];
387 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
388 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
389 continue;
390 }
391
392 OBD_ALLOC(req, sizeof(*req));
393 if (req == NULL) {
394 rc = -ENOMEM;
395 goto out_set;
396 }
397
398 req->rq_stripe = i;
399 req->rq_idx = loi->loi_ost_idx;
400
401 OBDO_ALLOC(req->rq_oi.oi_oa);
402 if (req->rq_oi.oi_oa == NULL) {
403 OBD_FREE(req, sizeof(*req));
404 rc = -ENOMEM;
405 goto out_set;
406 }
407 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
408 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
409 lov_set_add_req(req, set);
410 }
411 if (!set->set_count) {
412 rc = -EIO;
413 goto out_set;
414 }
415 *reqset = set;
416 return rc;
417out_set:
418 lov_fini_destroy_set(set);
419 return rc;
420}
421
422int lov_fini_setattr_set(struct lov_request_set *set)
423{
424 int rc = 0;
425
426 if (set == NULL)
427 return 0;
428 LASSERT(set->set_exp);
429 if (atomic_read(&set->set_completes)) {
430 rc = common_attr_done(set);
431
432 }
433
434 lov_put_reqset(set);
435 return rc;
436}
437
438int lov_update_setattr_set(struct lov_request_set *set,
439 struct lov_request *req, int rc)
440{
441 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
442 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
443
444 lov_update_set(set, req, rc);
445
446
447 if (rc && !(lov->lov_tgts[req->rq_idx] &&
448 lov->lov_tgts[req->rq_idx]->ltd_active))
449 rc = 0;
450
451 if (rc == 0) {
452 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
453 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
454 req->rq_oi.oi_oa->o_ctime;
455 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
456 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
457 req->rq_oi.oi_oa->o_mtime;
458 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
459 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
460 req->rq_oi.oi_oa->o_atime;
461 }
462
463 return rc;
464}
465
466
467
468static int cb_setattr_update(void *cookie, int rc)
469{
470 struct obd_info *oinfo = cookie;
471 struct lov_request *lovreq;
472
473 lovreq = container_of(oinfo, struct lov_request, rq_oi);
474 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
475}
476
477int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
478 struct obd_trans_info *oti,
479 struct lov_request_set **reqset)
480{
481 struct lov_request_set *set;
482 struct lov_obd *lov = &exp->exp_obd->u.lov;
483 int rc = 0, i;
484
485 OBD_ALLOC(set, sizeof(*set));
486 if (set == NULL)
487 return -ENOMEM;
488 lov_init_set(set);
489
490 set->set_exp = exp;
491 set->set_oti = oti;
492 set->set_oi = oinfo;
493 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
494 set->set_cookies = oti->oti_logcookies;
495
496 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
497 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
498 struct lov_request *req;
499
500 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
501 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
502 continue;
503 }
504
505 OBD_ALLOC(req, sizeof(*req));
506 if (req == NULL) {
507 rc = -ENOMEM;
508 goto out_set;
509 }
510 req->rq_stripe = i;
511 req->rq_idx = loi->loi_ost_idx;
512
513 OBDO_ALLOC(req->rq_oi.oi_oa);
514 if (req->rq_oi.oi_oa == NULL) {
515 OBD_FREE(req, sizeof(*req));
516 rc = -ENOMEM;
517 goto out_set;
518 }
519 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
520 sizeof(*req->rq_oi.oi_oa));
521 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
522 req->rq_oi.oi_oa->o_stripe_idx = i;
523 req->rq_oi.oi_cb_up = cb_setattr_update;
524 req->rq_oi.oi_capa = oinfo->oi_capa;
525
526 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
527 int off = lov_stripe_offset(oinfo->oi_md,
528 oinfo->oi_oa->o_size, i,
529 &req->rq_oi.oi_oa->o_size);
530
531 if (off < 0 && req->rq_oi.oi_oa->o_size)
532 req->rq_oi.oi_oa->o_size--;
533
534 CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
535 i, req->rq_oi.oi_oa->o_size,
536 oinfo->oi_oa->o_size);
537 }
538 lov_set_add_req(req, set);
539 }
540 if (!set->set_count) {
541 rc = -EIO;
542 goto out_set;
543 }
544 *reqset = set;
545 return rc;
546out_set:
547 lov_fini_setattr_set(set);
548 return rc;
549}
550
/* All-ones __u64 value; used as the saturation ceiling below. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add @add to @tot, storing LOV_U64_MAX instead
 * when the sum compares lower than the old total (i.e. it wrapped).
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.
 */
#define LOV_SUM_MAX(tot, add)						\
	do {								\
		if ((tot) + (add) >= (tot))				\
			(tot) += (add);					\
		else							\
			(tot) = LOV_U64_MAX;				\
	} while (0)
559
560int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
561 int success)
562{
563 if (success) {
564 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
565 LOV_MAGIC, 0);
566 if (osfs->os_files != LOV_U64_MAX)
567 lov_do_div64(osfs->os_files, expected_stripes);
568 if (osfs->os_ffree != LOV_U64_MAX)
569 lov_do_div64(osfs->os_ffree, expected_stripes);
570
571 spin_lock(&obd->obd_osfs_lock);
572 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
573 obd->obd_osfs_age = cfs_time_current_64();
574 spin_unlock(&obd->obd_osfs_lock);
575 return 0;
576 }
577
578 return -EIO;
579}
580
581int lov_fini_statfs_set(struct lov_request_set *set)
582{
583 int rc = 0;
584
585 if (set == NULL)
586 return 0;
587
588 if (atomic_read(&set->set_completes)) {
589 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
590 atomic_read(&set->set_success));
591 }
592 lov_put_reqset(set);
593 return rc;
594}
595
596void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
597 int success)
598{
599 int shift = 0, quit = 0;
600 __u64 tmp;
601
602 if (success == 0) {
603 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
604 } else {
605 if (osfs->os_bsize != lov_sfs->os_bsize) {
606
607
608 tmp = osfs->os_bsize | lov_sfs->os_bsize;
609 for (shift = 0; shift <= 64; ++shift) {
610 if (tmp & 1) {
611 if (quit)
612 break;
613 else
614 quit = 1;
615 shift = 0;
616 }
617 tmp >>= 1;
618 }
619 }
620
621 if (osfs->os_bsize < lov_sfs->os_bsize) {
622 osfs->os_bsize = lov_sfs->os_bsize;
623
624 osfs->os_bfree >>= shift;
625 osfs->os_bavail >>= shift;
626 osfs->os_blocks >>= shift;
627 } else if (shift != 0) {
628 lov_sfs->os_bfree >>= shift;
629 lov_sfs->os_bavail >>= shift;
630 lov_sfs->os_blocks >>= shift;
631 }
632 osfs->os_bfree += lov_sfs->os_bfree;
633 osfs->os_bavail += lov_sfs->os_bavail;
634 osfs->os_blocks += lov_sfs->os_blocks;
635
636
637
638
639
640
641
642
643
644
645
646 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
647 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
648 }
649}
650
651
652
653static int cb_statfs_update(void *cookie, int rc)
654{
655 struct obd_info *oinfo = cookie;
656 struct lov_request *lovreq;
657 struct lov_request_set *set;
658 struct obd_statfs *osfs, *lov_sfs;
659 struct lov_obd *lov;
660 struct lov_tgt_desc *tgt;
661 struct obd_device *lovobd, *tgtobd;
662 int success;
663
664 lovreq = container_of(oinfo, struct lov_request, rq_oi);
665 set = lovreq->rq_rqset;
666 lovobd = set->set_obd;
667 lov = &lovobd->u.lov;
668 osfs = set->set_oi->oi_osfs;
669 lov_sfs = oinfo->oi_osfs;
670 success = atomic_read(&set->set_success);
671
672
673 lov_update_set(set, lovreq, rc);
674 if (rc)
675 goto out;
676
677 obd_getref(lovobd);
678 tgt = lov->lov_tgts[lovreq->rq_idx];
679 if (!tgt || !tgt->ltd_active)
680 goto out_update;
681
682 tgtobd = class_exp2obd(tgt->ltd_exp);
683 spin_lock(&tgtobd->obd_osfs_lock);
684 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
685 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
686 tgtobd->obd_osfs_age = cfs_time_current_64();
687 spin_unlock(&tgtobd->obd_osfs_lock);
688
689out_update:
690 lov_update_statfs(osfs, lov_sfs, success);
691 obd_putref(lovobd);
692
693out:
694 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
695 lov_set_finished(set, 0)) {
696 lov_statfs_interpret(NULL, set, set->set_count !=
697 atomic_read(&set->set_success));
698 }
699
700 return 0;
701}
702
703int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
704 struct lov_request_set **reqset)
705{
706 struct lov_request_set *set;
707 struct lov_obd *lov = &obd->u.lov;
708 int rc = 0, i;
709
710 OBD_ALLOC(set, sizeof(*set));
711 if (set == NULL)
712 return -ENOMEM;
713 lov_init_set(set);
714
715 set->set_obd = obd;
716 set->set_oi = oinfo;
717
718
719 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
720 struct lov_request *req;
721
722 if (lov->lov_tgts[i] == NULL ||
723 (!lov_check_and_wait_active(lov, i) &&
724 (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
725 CDEBUG(D_HA, "lov idx %d inactive\n", i);
726 continue;
727 }
728
729
730
731 if (!lov->lov_tgts[i]->ltd_exp) {
732 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
733 continue;
734 }
735
736 OBD_ALLOC(req, sizeof(*req));
737 if (req == NULL) {
738 rc = -ENOMEM;
739 goto out_set;
740 }
741
742 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
743 if (req->rq_oi.oi_osfs == NULL) {
744 OBD_FREE(req, sizeof(*req));
745 rc = -ENOMEM;
746 goto out_set;
747 }
748
749 req->rq_idx = i;
750 req->rq_oi.oi_cb_up = cb_statfs_update;
751 req->rq_oi.oi_flags = oinfo->oi_flags;
752
753 lov_set_add_req(req, set);
754 }
755 if (!set->set_count) {
756 rc = -EIO;
757 goto out_set;
758 }
759 *reqset = set;
760 return rc;
761out_set:
762 lov_fini_statfs_set(set);
763 return rc;
764}
765