linux/drivers/staging/lustre/lustre/lov/lov_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36
  37#define DEBUG_SUBSYSTEM S_LOV
  38
  39#include "../../include/linux/libcfs/libcfs.h"
  40
  41#include "../include/obd_class.h"
  42#include "../include/lustre/lustre_idl.h"
  43#include "lov_internal.h"
  44
  45static void lov_init_set(struct lov_request_set *set)
  46{
  47        set->set_count = 0;
  48        atomic_set(&set->set_completes, 0);
  49        atomic_set(&set->set_success, 0);
  50        atomic_set(&set->set_finish_checked, 0);
  51        set->set_cookies = NULL;
  52        INIT_LIST_HEAD(&set->set_list);
  53        atomic_set(&set->set_refcount, 1);
  54        init_waitqueue_head(&set->set_waitq);
  55        spin_lock_init(&set->set_lock);
  56}
  57
  58void lov_finish_set(struct lov_request_set *set)
  59{
  60        struct list_head *pos, *n;
  61
  62        LASSERT(set);
  63        list_for_each_safe(pos, n, &set->set_list) {
  64                struct lov_request *req = list_entry(pos,
  65                                                         struct lov_request,
  66                                                         rq_link);
  67                list_del_init(&req->rq_link);
  68
  69                if (req->rq_oi.oi_oa)
  70                        OBDO_FREE(req->rq_oi.oi_oa);
  71                if (req->rq_oi.oi_md)
  72                        OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
  73                if (req->rq_oi.oi_osfs)
  74                        OBD_FREE(req->rq_oi.oi_osfs,
  75                                 sizeof(*req->rq_oi.oi_osfs));
  76                OBD_FREE(req, sizeof(*req));
  77        }
  78
  79        if (set->set_pga) {
  80                int len = set->set_oabufs * sizeof(*set->set_pga);
  81                OBD_FREE_LARGE(set->set_pga, len);
  82        }
  83        if (set->set_lockh)
  84                lov_llh_put(set->set_lockh);
  85
  86        OBD_FREE(set, sizeof(*set));
  87}
  88
  89int lov_set_finished(struct lov_request_set *set, int idempotent)
  90{
  91        int completes = atomic_read(&set->set_completes);
  92
  93        CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
  94
  95        if (completes == set->set_count) {
  96                if (idempotent)
  97                        return 1;
  98                if (atomic_inc_return(&set->set_finish_checked) == 1)
  99                        return 1;
 100        }
 101        return 0;
 102}
 103
 104void lov_update_set(struct lov_request_set *set,
 105                    struct lov_request *req, int rc)
 106{
 107        req->rq_complete = 1;
 108        req->rq_rc = rc;
 109
 110        atomic_inc(&set->set_completes);
 111        if (rc == 0)
 112                atomic_inc(&set->set_success);
 113
 114        wake_up(&set->set_waitq);
 115}
 116
 117int lov_update_common_set(struct lov_request_set *set,
 118                          struct lov_request *req, int rc)
 119{
 120        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
 121
 122        lov_update_set(set, req, rc);
 123
 124        /* grace error on inactive ost */
 125        if (rc && !(lov->lov_tgts[req->rq_idx] &&
 126                    lov->lov_tgts[req->rq_idx]->ltd_active))
 127                rc = 0;
 128
 129        /* FIXME in raid1 regime, should return 0 */
 130        return rc;
 131}
 132
 133void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 134{
 135        list_add_tail(&req->rq_link, &set->set_list);
 136        set->set_count++;
 137        req->rq_rqset = set;
 138}
 139
 140static int lov_check_set(struct lov_obd *lov, int idx)
 141{
 142        int rc;
 143        struct lov_tgt_desc *tgt;
 144
 145        mutex_lock(&lov->lov_lock);
 146        tgt = lov->lov_tgts[idx];
 147        rc = !tgt || tgt->ltd_active ||
 148                (tgt->ltd_exp &&
 149                 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
 150        mutex_unlock(&lov->lov_lock);
 151
 152        return rc;
 153}
 154
 155/* Check if the OSC connection exists and is active.
 156 * If the OSC has not yet had a chance to connect to the OST the first time,
 157 * wait once for it to connect instead of returning an error.
 158 */
 159int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
 160{
 161        wait_queue_head_t waitq;
 162        struct l_wait_info lwi;
 163        struct lov_tgt_desc *tgt;
 164        int rc = 0;
 165
 166        mutex_lock(&lov->lov_lock);
 167
 168        tgt = lov->lov_tgts[ost_idx];
 169
 170        if (unlikely(tgt == NULL)) {
 171                rc = 0;
 172                goto out;
 173        }
 174
 175        if (likely(tgt->ltd_active)) {
 176                rc = 1;
 177                goto out;
 178        }
 179
 180        if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
 181                rc = 0;
 182                goto out;
 183        }
 184
 185        mutex_unlock(&lov->lov_lock);
 186
 187        init_waitqueue_head(&waitq);
 188        lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
 189                                   cfs_time_seconds(1), NULL, NULL);
 190
 191        rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
 192        if (tgt != NULL && tgt->ltd_active)
 193                return 1;
 194
 195        return 0;
 196
 197out:
 198        mutex_unlock(&lov->lov_lock);
 199        return rc;
 200}
 201
 202static int common_attr_done(struct lov_request_set *set)
 203{
 204        struct list_head *pos;
 205        struct lov_request *req;
 206        struct obdo *tmp_oa;
 207        int rc = 0, attrset = 0;
 208
 209        LASSERT(set->set_oi != NULL);
 210
 211        if (set->set_oi->oi_oa == NULL)
 212                return 0;
 213
 214        if (!atomic_read(&set->set_success))
 215                return -EIO;
 216
 217        OBDO_ALLOC(tmp_oa);
 218        if (tmp_oa == NULL) {
 219                rc = -ENOMEM;
 220                goto out;
 221        }
 222
 223        list_for_each(pos, &set->set_list) {
 224                req = list_entry(pos, struct lov_request, rq_link);
 225
 226                if (!req->rq_complete || req->rq_rc)
 227                        continue;
 228                if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
 229                        continue;
 230                lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
 231                                req->rq_oi.oi_oa->o_valid,
 232                                set->set_oi->oi_md, req->rq_stripe, &attrset);
 233        }
 234        if (!attrset) {
 235                CERROR("No stripes had valid attrs\n");
 236                rc = -EIO;
 237        }
 238        if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
 239            (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
 240                /* When we take attributes of some epoch, we require all the
 241                 * ost to be active. */
 242                CERROR("Not all the stripes had valid attrs\n");
 243                rc = -EIO;
 244                goto out;
 245        }
 246
 247        tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
 248        memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
 249out:
 250        if (tmp_oa)
 251                OBDO_FREE(tmp_oa);
 252        return rc;
 253
 254}
 255
 256int lov_fini_getattr_set(struct lov_request_set *set)
 257{
 258        int rc = 0;
 259
 260        if (set == NULL)
 261                return 0;
 262        LASSERT(set->set_exp);
 263        if (atomic_read(&set->set_completes))
 264                rc = common_attr_done(set);
 265
 266        lov_put_reqset(set);
 267
 268        return rc;
 269}
 270
 271/* The callback for osc_getattr_async that finalizes a request info when a
 272 * response is received. */
 273static int cb_getattr_update(void *cookie, int rc)
 274{
 275        struct obd_info *oinfo = cookie;
 276        struct lov_request *lovreq;
 277
 278        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 279        return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
 280}
 281
 282int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
 283                         struct lov_request_set **reqset)
 284{
 285        struct lov_request_set *set;
 286        struct lov_obd *lov = &exp->exp_obd->u.lov;
 287        int rc = 0, i;
 288
 289        OBD_ALLOC(set, sizeof(*set));
 290        if (set == NULL)
 291                return -ENOMEM;
 292        lov_init_set(set);
 293
 294        set->set_exp = exp;
 295        set->set_oi = oinfo;
 296
 297        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
 298                struct lov_oinfo *loi;
 299                struct lov_request *req;
 300
 301                loi = oinfo->oi_md->lsm_oinfo[i];
 302                if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
 303                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 304                        if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
 305                                /* SOM requires all the OSTs to be active. */
 306                                rc = -EIO;
 307                                goto out_set;
 308                        }
 309                        continue;
 310                }
 311
 312                OBD_ALLOC(req, sizeof(*req));
 313                if (req == NULL) {
 314                        rc = -ENOMEM;
 315                        goto out_set;
 316                }
 317
 318                req->rq_stripe = i;
 319                req->rq_idx = loi->loi_ost_idx;
 320
 321                OBDO_ALLOC(req->rq_oi.oi_oa);
 322                if (req->rq_oi.oi_oa == NULL) {
 323                        OBD_FREE(req, sizeof(*req));
 324                        rc = -ENOMEM;
 325                        goto out_set;
 326                }
 327                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
 328                       sizeof(*req->rq_oi.oi_oa));
 329                req->rq_oi.oi_oa->o_oi = loi->loi_oi;
 330                req->rq_oi.oi_cb_up = cb_getattr_update;
 331                req->rq_oi.oi_capa = oinfo->oi_capa;
 332
 333                lov_set_add_req(req, set);
 334        }
 335        if (!set->set_count) {
 336                rc = -EIO;
 337                goto out_set;
 338        }
 339        *reqset = set;
 340        return rc;
 341out_set:
 342        lov_fini_getattr_set(set);
 343        return rc;
 344}
 345
 346int lov_fini_destroy_set(struct lov_request_set *set)
 347{
 348        if (set == NULL)
 349                return 0;
 350        LASSERT(set->set_exp);
 351        if (atomic_read(&set->set_completes)) {
 352                /* FIXME update qos data here */
 353        }
 354
 355        lov_put_reqset(set);
 356
 357        return 0;
 358}
 359
 360int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
 361                         struct obdo *src_oa, struct lov_stripe_md *lsm,
 362                         struct obd_trans_info *oti,
 363                         struct lov_request_set **reqset)
 364{
 365        struct lov_request_set *set;
 366        struct lov_obd *lov = &exp->exp_obd->u.lov;
 367        int rc = 0, i;
 368
 369        OBD_ALLOC(set, sizeof(*set));
 370        if (set == NULL)
 371                return -ENOMEM;
 372        lov_init_set(set);
 373
 374        set->set_exp = exp;
 375        set->set_oi = oinfo;
 376        set->set_oi->oi_md = lsm;
 377        set->set_oi->oi_oa = src_oa;
 378        set->set_oti = oti;
 379        if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
 380                set->set_cookies = oti->oti_logcookies;
 381
 382        for (i = 0; i < lsm->lsm_stripe_count; i++) {
 383                struct lov_oinfo *loi;
 384                struct lov_request *req;
 385
 386                loi = lsm->lsm_oinfo[i];
 387                if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
 388                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 389                        continue;
 390                }
 391
 392                OBD_ALLOC(req, sizeof(*req));
 393                if (req == NULL) {
 394                        rc = -ENOMEM;
 395                        goto out_set;
 396                }
 397
 398                req->rq_stripe = i;
 399                req->rq_idx = loi->loi_ost_idx;
 400
 401                OBDO_ALLOC(req->rq_oi.oi_oa);
 402                if (req->rq_oi.oi_oa == NULL) {
 403                        OBD_FREE(req, sizeof(*req));
 404                        rc = -ENOMEM;
 405                        goto out_set;
 406                }
 407                memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
 408                req->rq_oi.oi_oa->o_oi = loi->loi_oi;
 409                lov_set_add_req(req, set);
 410        }
 411        if (!set->set_count) {
 412                rc = -EIO;
 413                goto out_set;
 414        }
 415        *reqset = set;
 416        return rc;
 417out_set:
 418        lov_fini_destroy_set(set);
 419        return rc;
 420}
 421
 422int lov_fini_setattr_set(struct lov_request_set *set)
 423{
 424        int rc = 0;
 425
 426        if (set == NULL)
 427                return 0;
 428        LASSERT(set->set_exp);
 429        if (atomic_read(&set->set_completes)) {
 430                rc = common_attr_done(set);
 431                /* FIXME update qos data here */
 432        }
 433
 434        lov_put_reqset(set);
 435        return rc;
 436}
 437
 438int lov_update_setattr_set(struct lov_request_set *set,
 439                           struct lov_request *req, int rc)
 440{
 441        struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
 442        struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
 443
 444        lov_update_set(set, req, rc);
 445
 446        /* grace error on inactive ost */
 447        if (rc && !(lov->lov_tgts[req->rq_idx] &&
 448                    lov->lov_tgts[req->rq_idx]->ltd_active))
 449                rc = 0;
 450
 451        if (rc == 0) {
 452                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
 453                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
 454                                req->rq_oi.oi_oa->o_ctime;
 455                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
 456                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
 457                                req->rq_oi.oi_oa->o_mtime;
 458                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
 459                        lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
 460                                req->rq_oi.oi_oa->o_atime;
 461        }
 462
 463        return rc;
 464}
 465
 466/* The callback for osc_setattr_async that finalizes a request info when a
 467 * response is received. */
 468static int cb_setattr_update(void *cookie, int rc)
 469{
 470        struct obd_info *oinfo = cookie;
 471        struct lov_request *lovreq;
 472
 473        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 474        return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
 475}
 476
 477int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
 478                         struct obd_trans_info *oti,
 479                         struct lov_request_set **reqset)
 480{
 481        struct lov_request_set *set;
 482        struct lov_obd *lov = &exp->exp_obd->u.lov;
 483        int rc = 0, i;
 484
 485        OBD_ALLOC(set, sizeof(*set));
 486        if (set == NULL)
 487                return -ENOMEM;
 488        lov_init_set(set);
 489
 490        set->set_exp = exp;
 491        set->set_oti = oti;
 492        set->set_oi = oinfo;
 493        if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
 494                set->set_cookies = oti->oti_logcookies;
 495
 496        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
 497                struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
 498                struct lov_request *req;
 499
 500                if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
 501                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
 502                        continue;
 503                }
 504
 505                OBD_ALLOC(req, sizeof(*req));
 506                if (req == NULL) {
 507                        rc = -ENOMEM;
 508                        goto out_set;
 509                }
 510                req->rq_stripe = i;
 511                req->rq_idx = loi->loi_ost_idx;
 512
 513                OBDO_ALLOC(req->rq_oi.oi_oa);
 514                if (req->rq_oi.oi_oa == NULL) {
 515                        OBD_FREE(req, sizeof(*req));
 516                        rc = -ENOMEM;
 517                        goto out_set;
 518                }
 519                memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
 520                       sizeof(*req->rq_oi.oi_oa));
 521                req->rq_oi.oi_oa->o_oi = loi->loi_oi;
 522                req->rq_oi.oi_oa->o_stripe_idx = i;
 523                req->rq_oi.oi_cb_up = cb_setattr_update;
 524                req->rq_oi.oi_capa = oinfo->oi_capa;
 525
 526                if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
 527                        int off = lov_stripe_offset(oinfo->oi_md,
 528                                                    oinfo->oi_oa->o_size, i,
 529                                                    &req->rq_oi.oi_oa->o_size);
 530
 531                        if (off < 0 && req->rq_oi.oi_oa->o_size)
 532                                req->rq_oi.oi_oa->o_size--;
 533
 534                        CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
 535                               i, req->rq_oi.oi_oa->o_size,
 536                               oinfo->oi_oa->o_size);
 537                }
 538                lov_set_add_req(req, set);
 539        }
 540        if (!set->set_count) {
 541                rc = -EIO;
 542                goto out_set;
 543        }
 544        *reqset = set;
 545        return rc;
 546out_set:
 547        lov_fini_setattr_set(set);
 548        return rc;
 549}
 550
 551#define LOV_U64_MAX ((__u64)~0ULL)
 552#define LOV_SUM_MAX(tot, add)                                      \
 553        do {                                                        \
 554                if ((tot) + (add) < (tot))                            \
 555                        (tot) = LOV_U64_MAX;                        \
 556                else                                                \
 557                        (tot) += (add);                          \
 558        } while (0)
 559
 560int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
 561                    int success)
 562{
 563        if (success) {
 564                __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
 565                                                           LOV_MAGIC, 0);
 566                if (osfs->os_files != LOV_U64_MAX)
 567                        lov_do_div64(osfs->os_files, expected_stripes);
 568                if (osfs->os_ffree != LOV_U64_MAX)
 569                        lov_do_div64(osfs->os_ffree, expected_stripes);
 570
 571                spin_lock(&obd->obd_osfs_lock);
 572                memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
 573                obd->obd_osfs_age = cfs_time_current_64();
 574                spin_unlock(&obd->obd_osfs_lock);
 575                return 0;
 576        }
 577
 578        return -EIO;
 579}
 580
 581int lov_fini_statfs_set(struct lov_request_set *set)
 582{
 583        int rc = 0;
 584
 585        if (set == NULL)
 586                return 0;
 587
 588        if (atomic_read(&set->set_completes)) {
 589                rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
 590                                     atomic_read(&set->set_success));
 591        }
 592        lov_put_reqset(set);
 593        return rc;
 594}
 595
 596void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
 597                       int success)
 598{
 599        int shift = 0, quit = 0;
 600        __u64 tmp;
 601
 602        if (success == 0) {
 603                memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
 604        } else {
 605                if (osfs->os_bsize != lov_sfs->os_bsize) {
 606                        /* assume all block sizes are always powers of 2 */
 607                        /* get the bits difference */
 608                        tmp = osfs->os_bsize | lov_sfs->os_bsize;
 609                        for (shift = 0; shift <= 64; ++shift) {
 610                                if (tmp & 1) {
 611                                        if (quit)
 612                                                break;
 613                                        else
 614                                                quit = 1;
 615                                        shift = 0;
 616                                }
 617                                tmp >>= 1;
 618                        }
 619                }
 620
 621                if (osfs->os_bsize < lov_sfs->os_bsize) {
 622                        osfs->os_bsize = lov_sfs->os_bsize;
 623
 624                        osfs->os_bfree  >>= shift;
 625                        osfs->os_bavail >>= shift;
 626                        osfs->os_blocks >>= shift;
 627                } else if (shift != 0) {
 628                        lov_sfs->os_bfree  >>= shift;
 629                        lov_sfs->os_bavail >>= shift;
 630                        lov_sfs->os_blocks >>= shift;
 631                }
 632                osfs->os_bfree += lov_sfs->os_bfree;
 633                osfs->os_bavail += lov_sfs->os_bavail;
 634                osfs->os_blocks += lov_sfs->os_blocks;
 635                /* XXX not sure about this one - depends on policy.
 636                 *   - could be minimum if we always stripe on all OBDs
 637                 *     (but that would be wrong for any other policy,
 638                 *     if one of the OBDs has no more objects left)
 639                 *   - could be sum if we stripe whole objects
 640                 *   - could be average, just to give a nice number
 641                 *
 642                 * To give a "reasonable" (if not wholly accurate)
 643                 * number, we divide the total number of free objects
 644                 * by expected stripe count (watch out for overflow).
 645                 */
 646                LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
 647                LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
 648        }
 649}
 650
 651/* The callback for osc_statfs_async that finalizes a request info when a
 652 * response is received. */
 653static int cb_statfs_update(void *cookie, int rc)
 654{
 655        struct obd_info *oinfo = cookie;
 656        struct lov_request *lovreq;
 657        struct lov_request_set *set;
 658        struct obd_statfs *osfs, *lov_sfs;
 659        struct lov_obd *lov;
 660        struct lov_tgt_desc *tgt;
 661        struct obd_device *lovobd, *tgtobd;
 662        int success;
 663
 664        lovreq = container_of(oinfo, struct lov_request, rq_oi);
 665        set = lovreq->rq_rqset;
 666        lovobd = set->set_obd;
 667        lov = &lovobd->u.lov;
 668        osfs = set->set_oi->oi_osfs;
 669        lov_sfs = oinfo->oi_osfs;
 670        success = atomic_read(&set->set_success);
 671        /* XXX: the same is done in lov_update_common_set, however
 672           lovset->set_exp is not initialized. */
 673        lov_update_set(set, lovreq, rc);
 674        if (rc)
 675                goto out;
 676
 677        obd_getref(lovobd);
 678        tgt = lov->lov_tgts[lovreq->rq_idx];
 679        if (!tgt || !tgt->ltd_active)
 680                goto out_update;
 681
 682        tgtobd = class_exp2obd(tgt->ltd_exp);
 683        spin_lock(&tgtobd->obd_osfs_lock);
 684        memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
 685        if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
 686                tgtobd->obd_osfs_age = cfs_time_current_64();
 687        spin_unlock(&tgtobd->obd_osfs_lock);
 688
 689out_update:
 690        lov_update_statfs(osfs, lov_sfs, success);
 691        obd_putref(lovobd);
 692
 693out:
 694        if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
 695            lov_set_finished(set, 0)) {
 696                lov_statfs_interpret(NULL, set, set->set_count !=
 697                                     atomic_read(&set->set_success));
 698        }
 699
 700        return 0;
 701}
 702
 703int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
 704                        struct lov_request_set **reqset)
 705{
 706        struct lov_request_set *set;
 707        struct lov_obd *lov = &obd->u.lov;
 708        int rc = 0, i;
 709
 710        OBD_ALLOC(set, sizeof(*set));
 711        if (set == NULL)
 712                return -ENOMEM;
 713        lov_init_set(set);
 714
 715        set->set_obd = obd;
 716        set->set_oi = oinfo;
 717
 718        /* We only get block data from the OBD */
 719        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 720                struct lov_request *req;
 721
 722                if (lov->lov_tgts[i] == NULL ||
 723                    (!lov_check_and_wait_active(lov, i) &&
 724                     (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
 725                        CDEBUG(D_HA, "lov idx %d inactive\n", i);
 726                        continue;
 727                }
 728
 729                /* skip targets that have been explicitly disabled by the
 730                 * administrator */
 731                if (!lov->lov_tgts[i]->ltd_exp) {
 732                        CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
 733                        continue;
 734                }
 735
 736                OBD_ALLOC(req, sizeof(*req));
 737                if (req == NULL) {
 738                        rc = -ENOMEM;
 739                        goto out_set;
 740                }
 741
 742                OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
 743                if (req->rq_oi.oi_osfs == NULL) {
 744                        OBD_FREE(req, sizeof(*req));
 745                        rc = -ENOMEM;
 746                        goto out_set;
 747                }
 748
 749                req->rq_idx = i;
 750                req->rq_oi.oi_cb_up = cb_statfs_update;
 751                req->rq_oi.oi_flags = oinfo->oi_flags;
 752
 753                lov_set_add_req(req, set);
 754        }
 755        if (!set->set_count) {
 756                rc = -EIO;
 757                goto out_set;
 758        }
 759        *reqset = set;
 760        return rc;
 761out_set:
 762        lov_fini_statfs_set(set);
 763        return rc;
 764}
 765