linux/drivers/staging/lustre/lustre/lov/lov_object.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * Implementation of cl_object for LOV layer.
  33 *
  34 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  35 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  36 */
  37
  38#define DEBUG_SUBSYSTEM S_LOV
  39
  40#include "lov_cl_internal.h"
  41
  42/** \addtogroup lov
  43 *  @{
  44 */
  45
  46/*****************************************************************************
  47 *
  48 * Layout operations.
  49 *
  50 */
  51
  52struct lov_layout_operations {
  53        int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
  54                        struct lov_object *lov,
  55                        const struct cl_object_conf *conf,
  56                        union lov_layout_state *state);
  57        int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
  58                          union lov_layout_state *state);
  59        void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
  60                         union lov_layout_state *state);
  61        void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
  62                            union lov_layout_state *state);
  63        int  (*llo_print)(const struct lu_env *env, void *cookie,
  64                          lu_printer_t p, const struct lu_object *o);
  65        int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
  66                              struct cl_page *page, pgoff_t index);
  67        int  (*llo_lock_init)(const struct lu_env *env,
  68                              struct cl_object *obj, struct cl_lock *lock,
  69                              const struct cl_io *io);
  70        int  (*llo_io_init)(const struct lu_env *env,
  71                            struct cl_object *obj, struct cl_io *io);
  72        int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
  73                            struct cl_attr *attr);
  74};
  75
  76static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
  77
  78void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
  79{
  80        if (lsm)
  81                lov_free_memmd(&lsm);
  82}
  83EXPORT_SYMBOL(lov_lsm_put);
  84
  85/*****************************************************************************
  86 *
  87 * Lov object layout operations.
  88 *
  89 */
  90
  91static void lov_install_empty(const struct lu_env *env,
  92                              struct lov_object *lov,
  93                              union  lov_layout_state *state)
  94{
  95        /*
  96         * File without objects.
  97         */
  98}
  99
 100static int lov_init_empty(const struct lu_env *env,
 101                          struct lov_device *dev, struct lov_object *lov,
 102                          const struct cl_object_conf *conf,
 103                          union  lov_layout_state *state)
 104{
 105        return 0;
 106}
 107
 108static void lov_install_raid0(const struct lu_env *env,
 109                              struct lov_object *lov,
 110                              union  lov_layout_state *state)
 111{
 112}
 113
 114static struct cl_object *lov_sub_find(const struct lu_env *env,
 115                                      struct cl_device *dev,
 116                                      const struct lu_fid *fid,
 117                                      const struct cl_object_conf *conf)
 118{
 119        struct lu_object *o;
 120
 121        o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
 122        LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
 123        return lu2cl(o);
 124}
 125
 126static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
 127                        struct cl_object *stripe, struct lov_layout_raid0 *r0,
 128                        int idx)
 129{
 130        struct cl_object_header *hdr;
 131        struct cl_object_header *subhdr;
 132        struct cl_object_header *parent;
 133        struct lov_oinfo        *oinfo;
 134        int result;
 135
 136        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
 137                /* For sanity:test_206.
 138                 * Do not leave the object in cache to avoid accessing
 139                 * freed memory. This is because osc_object is referring to
 140                 * lov_oinfo of lsm_stripe_data which will be freed due to
 141                 * this failure.
 142                 */
 143                cl_object_kill(env, stripe);
 144                cl_object_put(env, stripe);
 145                return -EIO;
 146        }
 147
 148        hdr    = cl_object_header(lov2cl(lov));
 149        subhdr = cl_object_header(stripe);
 150
 151        oinfo = lov->lo_lsm->lsm_oinfo[idx];
 152        CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
 153               " idx: %d gen: %d\n",
 154               PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
 155               PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
 156               oinfo->loi_ost_idx, oinfo->loi_ost_gen);
 157
 158        /* reuse ->coh_attr_guard to protect coh_parent change */
 159        spin_lock(&subhdr->coh_attr_guard);
 160        parent = subhdr->coh_parent;
 161        if (!parent) {
 162                subhdr->coh_parent = hdr;
 163                spin_unlock(&subhdr->coh_attr_guard);
 164                subhdr->coh_nesting = hdr->coh_nesting + 1;
 165                lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
 166                r0->lo_sub[idx] = cl2lovsub(stripe);
 167                r0->lo_sub[idx]->lso_super = lov;
 168                r0->lo_sub[idx]->lso_index = idx;
 169                result = 0;
 170        } else {
 171                struct lu_object  *old_obj;
 172                struct lov_object *old_lov;
 173                unsigned int mask = D_INODE;
 174
 175                spin_unlock(&subhdr->coh_attr_guard);
 176                old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
 177                LASSERT(old_obj);
 178                old_lov = cl2lov(lu2cl(old_obj));
 179                if (old_lov->lo_layout_invalid) {
 180                        /* the object's layout has already changed but isn't
 181                         * refreshed
 182                         */
 183                        lu_object_unhash(env, &stripe->co_lu);
 184                        result = -EAGAIN;
 185                } else {
 186                        mask = D_ERROR;
 187                        result = -EIO;
 188                }
 189
 190                LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
 191                                "stripe %d is already owned.", idx);
 192                LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
 193                LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
 194                cl_object_put(env, stripe);
 195        }
 196        return result;
 197}
 198
 199static int lov_page_slice_fixup(struct lov_object *lov,
 200                                struct cl_object *stripe)
 201{
 202        struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
 203        struct cl_object *o;
 204
 205        if (!stripe)
 206                return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
 207                       cfs_size_round(sizeof(struct lov_page));
 208
 209        cl_object_for_each(o, stripe)
 210                o->co_slice_off += hdr->coh_page_bufsize;
 211
 212        return cl_object_header(stripe)->coh_page_bufsize;
 213}
 214
 215static int lov_init_raid0(const struct lu_env *env,
 216                          struct lov_device *dev, struct lov_object *lov,
 217                          const struct cl_object_conf *conf,
 218                          union  lov_layout_state *state)
 219{
 220        int result;
 221        int i;
 222
 223        struct cl_object        *stripe;
 224        struct lov_thread_info  *lti     = lov_env_info(env);
 225        struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
 226        struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
 227        struct lu_fid      *ofid    = &lti->lti_fid;
 228        struct lov_layout_raid0 *r0      = &state->raid0;
 229
 230        if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
 231                dump_lsm(D_ERROR, lsm);
 232                LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
 233                         LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
 234        }
 235
 236        LASSERT(!lov->lo_lsm);
 237        lov->lo_lsm = lsm_addref(lsm);
 238        lov->lo_layout_invalid = true;
 239        r0->lo_nr  = lsm->lsm_stripe_count;
 240        LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 241
 242        r0->lo_sub = libcfs_kvzalloc(r0->lo_nr * sizeof(r0->lo_sub[0]),
 243                                     GFP_NOFS);
 244        if (r0->lo_sub) {
 245                int psz = 0;
 246
 247                result = 0;
 248                subconf->coc_inode = conf->coc_inode;
 249                spin_lock_init(&r0->lo_sub_lock);
 250                /*
 251                 * Create stripe cl_objects.
 252                 */
 253                for (i = 0; i < r0->lo_nr && result == 0; ++i) {
 254                        struct cl_device *subdev;
 255                        struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
 256                        int ost_idx = oinfo->loi_ost_idx;
 257
 258                        if (lov_oinfo_is_dummy(oinfo))
 259                                continue;
 260
 261                        result = ostid_to_fid(ofid, &oinfo->loi_oi,
 262                                              oinfo->loi_ost_idx);
 263                        if (result != 0)
 264                                goto out;
 265
 266                        subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
 267                        subconf->u.coc_oinfo = oinfo;
 268                        LASSERTF(subdev, "not init ost %d\n", ost_idx);
 269                        /* In the function below, .hs_keycmp resolves to
 270                         * lu_obj_hop_keycmp()
 271                         */
 272                        /* coverity[overrun-buffer-val] */
 273                        stripe = lov_sub_find(env, subdev, ofid, subconf);
 274                        if (!IS_ERR(stripe)) {
 275                                result = lov_init_sub(env, lov, stripe, r0, i);
 276                                if (result == -EAGAIN) { /* try again */
 277                                        --i;
 278                                        result = 0;
 279                                        continue;
 280                                }
 281                        } else {
 282                                result = PTR_ERR(stripe);
 283                        }
 284
 285                        if (result == 0) {
 286                                int sz = lov_page_slice_fixup(lov, stripe);
 287
 288                                LASSERT(ergo(psz > 0, psz == sz));
 289                                psz = sz;
 290                        }
 291                }
 292                if (result == 0)
 293                        cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
 294        } else {
 295                result = -ENOMEM;
 296        }
 297out:
 298        return result;
 299}
 300
 301static int lov_init_released(const struct lu_env *env,
 302                             struct lov_device *dev, struct lov_object *lov,
 303                             const struct cl_object_conf *conf,
 304                             union  lov_layout_state *state)
 305{
 306        struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
 307
 308        LASSERT(lsm);
 309        LASSERT(lsm_is_released(lsm));
 310        LASSERT(!lov->lo_lsm);
 311
 312        lov->lo_lsm = lsm_addref(lsm);
 313        return 0;
 314}
 315
 316static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
 317                            union lov_layout_state *state)
 318{
 319        LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 320
 321        lov_layout_wait(env, lov);
 322        return 0;
 323}
 324
 325static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 326                               struct lovsub_object *los, int idx)
 327{
 328        struct cl_object        *sub;
 329        struct lov_layout_raid0 *r0;
 330        struct lu_site    *site;
 331        struct lu_site_bkt_data *bkt;
 332        wait_queue_t      *waiter;
 333
 334        r0  = &lov->u.raid0;
 335        LASSERT(r0->lo_sub[idx] == los);
 336
 337        sub  = lovsub2cl(los);
 338        site = sub->co_lu.lo_dev->ld_site;
 339        bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
 340
 341        cl_object_kill(env, sub);
 342        /* release a reference to the sub-object and ... */
 343        lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
 344        cl_object_put(env, sub);
 345
 346        /* ... wait until it is actually destroyed---sub-object clears its
 347         * ->lo_sub[] slot in lovsub_object_fini()
 348         */
 349        if (r0->lo_sub[idx] == los) {
 350                waiter = &lov_env_info(env)->lti_waiter;
 351                init_waitqueue_entry(waiter, current);
 352                add_wait_queue(&bkt->lsb_marche_funebre, waiter);
 353                set_current_state(TASK_UNINTERRUPTIBLE);
 354                while (1) {
 355                        /* this wait-queue is signaled at the end of
 356                         * lu_object_free().
 357                         */
 358                        set_current_state(TASK_UNINTERRUPTIBLE);
 359                        spin_lock(&r0->lo_sub_lock);
 360                        if (r0->lo_sub[idx] == los) {
 361                                spin_unlock(&r0->lo_sub_lock);
 362                                schedule();
 363                        } else {
 364                                spin_unlock(&r0->lo_sub_lock);
 365                                set_current_state(TASK_RUNNING);
 366                                break;
 367                        }
 368                }
 369                remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
 370        }
 371        LASSERT(!r0->lo_sub[idx]);
 372}
 373
 374static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
 375                            union lov_layout_state *state)
 376{
 377        struct lov_layout_raid0 *r0 = &state->raid0;
 378        struct lov_stripe_md    *lsm = lov->lo_lsm;
 379        int i;
 380
 381        dump_lsm(D_INODE, lsm);
 382
 383        lov_layout_wait(env, lov);
 384        if (r0->lo_sub) {
 385                for (i = 0; i < r0->lo_nr; ++i) {
 386                        struct lovsub_object *los = r0->lo_sub[i];
 387
 388                        if (los) {
 389                                cl_object_prune(env, &los->lso_cl);
 390                                /*
 391                                 * If top-level object is to be evicted from
 392                                 * the cache, so are its sub-objects.
 393                                 */
 394                                lov_subobject_kill(env, lov, los, i);
 395                        }
 396                }
 397        }
 398        return 0;
 399}
 400
 401static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
 402                           union lov_layout_state *state)
 403{
 404        LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 405}
 406
 407static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
 408                           union lov_layout_state *state)
 409{
 410        struct lov_layout_raid0 *r0 = &state->raid0;
 411
 412        if (r0->lo_sub) {
 413                kvfree(r0->lo_sub);
 414                r0->lo_sub = NULL;
 415        }
 416
 417        dump_lsm(D_INODE, lov->lo_lsm);
 418        lov_free_memmd(&lov->lo_lsm);
 419}
 420
 421static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
 422                              union lov_layout_state *state)
 423{
 424        dump_lsm(D_INODE, lov->lo_lsm);
 425        lov_free_memmd(&lov->lo_lsm);
 426}
 427
 428static int lov_print_empty(const struct lu_env *env, void *cookie,
 429                           lu_printer_t p, const struct lu_object *o)
 430{
 431        (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
 432        return 0;
 433}
 434
 435static int lov_print_raid0(const struct lu_env *env, void *cookie,
 436                           lu_printer_t p, const struct lu_object *o)
 437{
 438        struct lov_object       *lov = lu2lov(o);
 439        struct lov_layout_raid0 *r0  = lov_r0(lov);
 440        struct lov_stripe_md    *lsm = lov->lo_lsm;
 441        int                      i;
 442
 443        (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
 444             r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
 445             lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
 446             lsm->lsm_stripe_count, lsm->lsm_layout_gen);
 447        for (i = 0; i < r0->lo_nr; ++i) {
 448                struct lu_object *sub;
 449
 450                if (r0->lo_sub[i]) {
 451                        sub = lovsub2lu(r0->lo_sub[i]);
 452                        lu_object_print(env, cookie, p, sub);
 453                } else {
 454                        (*p)(env, cookie, "sub %d absent\n", i);
 455                }
 456        }
 457        return 0;
 458}
 459
 460static int lov_print_released(const struct lu_env *env, void *cookie,
 461                              lu_printer_t p, const struct lu_object *o)
 462{
 463        struct lov_object       *lov = lu2lov(o);
 464        struct lov_stripe_md    *lsm = lov->lo_lsm;
 465
 466        (*p)(env, cookie,
 467             "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
 468             lov->lo_layout_invalid ? "invalid" : "valid", lsm,
 469             lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
 470             lsm->lsm_stripe_count, lsm->lsm_layout_gen);
 471        return 0;
 472}
 473
 474/**
 475 * Implements cl_object_operations::coo_attr_get() method for an object
 476 * without stripes (LLT_EMPTY layout type).
 477 *
 478 * The only attributes this layer is authoritative in this case is
 479 * cl_attr::cat_blocks---it's 0.
 480 */
 481static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
 482                              struct cl_attr *attr)
 483{
 484        attr->cat_blocks = 0;
 485        return 0;
 486}
 487
 488static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
 489                              struct cl_attr *attr)
 490{
 491        struct lov_object       *lov = cl2lov(obj);
 492        struct lov_layout_raid0 *r0 = lov_r0(lov);
 493        struct cl_attr          *lov_attr = &r0->lo_attr;
 494        int                      result = 0;
 495
 496        /* this is called w/o holding type guard mutex, so it must be inside
 497         * an on going IO otherwise lsm may be replaced.
 498         * LU-2117: it turns out there exists one exception. For mmaped files,
 499         * the lock of those files may be requested in the other file's IO
 500         * context, and this function is called in ccc_lock_state(), it will
 501         * hit this assertion.
 502         * Anyway, it's still okay to call attr_get w/o type guard as layout
 503         * can't go if locks exist.
 504         */
 505        /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
 506
 507        if (!r0->lo_attr_valid) {
 508                struct lov_stripe_md    *lsm = lov->lo_lsm;
 509                struct ost_lvb    *lvb = &lov_env_info(env)->lti_lvb;
 510                __u64               kms = 0;
 511
 512                memset(lvb, 0, sizeof(*lvb));
 513                /* XXX: timestamps can be negative by sanity:test_39m,
 514                 * how can it be?
 515                 */
 516                lvb->lvb_atime = LLONG_MIN;
 517                lvb->lvb_ctime = LLONG_MIN;
 518                lvb->lvb_mtime = LLONG_MIN;
 519
 520                /*
 521                 * XXX that should be replaced with a loop over sub-objects,
 522                 * doing cl_object_attr_get() on them. But for now, let's
 523                 * reuse old lov code.
 524                 */
 525
 526                /*
 527                 * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
 528                 * happy. It's not needed, because new code uses
 529                 * ->coh_attr_guard spin-lock to protect consistency of
 530                 * sub-object attributes.
 531                 */
 532                lov_stripe_lock(lsm);
 533                result = lov_merge_lvb_kms(lsm, lvb, &kms);
 534                lov_stripe_unlock(lsm);
 535                if (result == 0) {
 536                        cl_lvb2attr(lov_attr, lvb);
 537                        lov_attr->cat_kms = kms;
 538                        r0->lo_attr_valid = 1;
 539                }
 540        }
 541        if (result == 0) { /* merge results */
 542                attr->cat_blocks = lov_attr->cat_blocks;
 543                attr->cat_size = lov_attr->cat_size;
 544                attr->cat_kms = lov_attr->cat_kms;
 545                if (attr->cat_atime < lov_attr->cat_atime)
 546                        attr->cat_atime = lov_attr->cat_atime;
 547                if (attr->cat_ctime < lov_attr->cat_ctime)
 548                        attr->cat_ctime = lov_attr->cat_ctime;
 549                if (attr->cat_mtime < lov_attr->cat_mtime)
 550                        attr->cat_mtime = lov_attr->cat_mtime;
 551        }
 552        return result;
 553}
 554
 555static const struct lov_layout_operations lov_dispatch[] = {
 556        [LLT_EMPTY] = {
 557                .llo_init      = lov_init_empty,
 558                .llo_delete    = lov_delete_empty,
 559                .llo_fini      = lov_fini_empty,
 560                .llo_install   = lov_install_empty,
 561                .llo_print     = lov_print_empty,
 562                .llo_page_init = lov_page_init_empty,
 563                .llo_lock_init = lov_lock_init_empty,
 564                .llo_io_init   = lov_io_init_empty,
 565                .llo_getattr   = lov_attr_get_empty
 566        },
 567        [LLT_RAID0] = {
 568                .llo_init      = lov_init_raid0,
 569                .llo_delete    = lov_delete_raid0,
 570                .llo_fini      = lov_fini_raid0,
 571                .llo_install   = lov_install_raid0,
 572                .llo_print     = lov_print_raid0,
 573                .llo_page_init = lov_page_init_raid0,
 574                .llo_lock_init = lov_lock_init_raid0,
 575                .llo_io_init   = lov_io_init_raid0,
 576                .llo_getattr   = lov_attr_get_raid0
 577        },
 578        [LLT_RELEASED] = {
 579                .llo_init      = lov_init_released,
 580                .llo_delete    = lov_delete_empty,
 581                .llo_fini      = lov_fini_released,
 582                .llo_install   = lov_install_empty,
 583                .llo_print     = lov_print_released,
 584                .llo_page_init = lov_page_init_empty,
 585                .llo_lock_init = lov_lock_init_empty,
 586                .llo_io_init   = lov_io_init_released,
 587                .llo_getattr   = lov_attr_get_empty
 588        }
 589};
 590
 591/**
 592 * Performs a double-dispatch based on the layout type of an object.
 593 */
 594#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                            \
 595({                                                                    \
 596        struct lov_object                     *__obj = (obj);     \
 597        enum lov_layout_type                __llt;                \
 598                                                                        \
 599        __llt = __obj->lo_type;                                  \
 600        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
 601        lov_dispatch[__llt].op(__VA_ARGS__);                        \
 602})
 603
 604/**
 605 * Return lov_layout_type associated with a given lsm
 606 */
 607static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
 608{
 609        if (!lsm)
 610                return LLT_EMPTY;
 611        if (lsm_is_released(lsm))
 612                return LLT_RELEASED;
 613        return LLT_RAID0;
 614}
 615
 616static inline void lov_conf_freeze(struct lov_object *lov)
 617{
 618        if (lov->lo_owner != current)
 619                down_read(&lov->lo_type_guard);
 620}
 621
 622static inline void lov_conf_thaw(struct lov_object *lov)
 623{
 624        if (lov->lo_owner != current)
 625                up_read(&lov->lo_type_guard);
 626}
 627
 628#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                      \
 629({                                                                    \
 630        struct lov_object                     *__obj = (obj);     \
 631        int                                  __lock = !!(lock);      \
 632        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;              \
 633                                                                        \
 634        if (__lock)                                                  \
 635                lov_conf_freeze(__obj);                                 \
 636        __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);    \
 637        if (__lock)                                                  \
 638                lov_conf_thaw(__obj);                                   \
 639        __result;                                                      \
 640})
 641
 642/**
 643 * Performs a locked double-dispatch based on the layout type of an object.
 644 */
 645#define LOV_2DISPATCH(obj, op, ...)                  \
 646        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
 647
 648#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
 649do {                                                                \
 650        struct lov_object                     *__obj = (obj);     \
 651        enum lov_layout_type                __llt;                \
 652                                                                        \
 653        lov_conf_freeze(__obj);                                         \
 654        __llt = __obj->lo_type;                                  \
 655        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
 656        lov_dispatch[__llt].op(__VA_ARGS__);                        \
 657        lov_conf_thaw(__obj);                                           \
 658} while (0)
 659
 660static void lov_conf_lock(struct lov_object *lov)
 661{
 662        LASSERT(lov->lo_owner != current);
 663        down_write(&lov->lo_type_guard);
 664        LASSERT(!lov->lo_owner);
 665        lov->lo_owner = current;
 666}
 667
 668static void lov_conf_unlock(struct lov_object *lov)
 669{
 670        lov->lo_owner = NULL;
 671        up_write(&lov->lo_type_guard);
 672}
 673
 674static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
 675{
 676        struct l_wait_info lwi = { 0 };
 677
 678        while (atomic_read(&lov->lo_active_ios) > 0) {
 679                CDEBUG(D_INODE, "file:" DFID " wait for active IO, now: %d.\n",
 680                       PFID(lu_object_fid(lov2lu(lov))),
 681                       atomic_read(&lov->lo_active_ios));
 682
 683                l_wait_event(lov->lo_waitq,
 684                             atomic_read(&lov->lo_active_ios) == 0, &lwi);
 685        }
 686        return 0;
 687}
 688
 689static int lov_layout_change(const struct lu_env *unused,
 690                             struct lov_object *lov,
 691                             const struct cl_object_conf *conf)
 692{
 693        int result;
 694        enum lov_layout_type llt = LLT_EMPTY;
 695        union lov_layout_state *state = &lov->u;
 696        const struct lov_layout_operations *old_ops;
 697        const struct lov_layout_operations *new_ops;
 698
 699        void *cookie;
 700        struct lu_env *env;
 701        int refcheck;
 702
 703        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
 704
 705        if (conf->u.coc_md)
 706                llt = lov_type(conf->u.coc_md->lsm);
 707        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
 708
 709        cookie = cl_env_reenter();
 710        env = cl_env_get(&refcheck);
 711        if (IS_ERR(env)) {
 712                cl_env_reexit(cookie);
 713                return PTR_ERR(env);
 714        }
 715
 716        CDEBUG(D_INODE, DFID" from %s to %s\n",
 717               PFID(lu_object_fid(lov2lu(lov))),
 718               llt2str(lov->lo_type), llt2str(llt));
 719
 720        old_ops = &lov_dispatch[lov->lo_type];
 721        new_ops = &lov_dispatch[llt];
 722
 723        result = cl_object_prune(env, &lov->lo_cl);
 724        if (result != 0)
 725                goto out;
 726
 727        result = old_ops->llo_delete(env, lov, &lov->u);
 728        if (result == 0) {
 729                old_ops->llo_fini(env, lov, &lov->u);
 730
 731                LASSERT(atomic_read(&lov->lo_active_ios) == 0);
 732
 733                lov->lo_type = LLT_EMPTY;
 734                /* page bufsize fixup */
 735                cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
 736                        lov_page_slice_fixup(lov, NULL);
 737
 738                result = new_ops->llo_init(env,
 739                                        lu2lov_dev(lov->lo_cl.co_lu.lo_dev),
 740                                        lov, conf, state);
 741                if (result == 0) {
 742                        new_ops->llo_install(env, lov, state);
 743                        lov->lo_type = llt;
 744                } else {
 745                        new_ops->llo_delete(env, lov, state);
 746                        new_ops->llo_fini(env, lov, state);
 747                        /* this file becomes an EMPTY file. */
 748                }
 749        }
 750
 751out:
 752        cl_env_put(env, &refcheck);
 753        cl_env_reexit(cookie);
 754        return result;
 755}
 756
 757/*****************************************************************************
 758 *
 759 * Lov object operations.
 760 *
 761 */
 762int lov_object_init(const struct lu_env *env, struct lu_object *obj,
 763                    const struct lu_object_conf *conf)
 764{
 765        struct lov_device           *dev   = lu2lov_dev(obj->lo_dev);
 766        struct lov_object           *lov   = lu2lov(obj);
 767        const struct cl_object_conf  *cconf = lu2cl_conf(conf);
 768        union  lov_layout_state      *set   = &lov->u;
 769        const struct lov_layout_operations *ops;
 770        int result;
 771
 772        init_rwsem(&lov->lo_type_guard);
 773        atomic_set(&lov->lo_active_ios, 0);
 774        init_waitqueue_head(&lov->lo_waitq);
 775
 776        cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
 777
 778        /* no locking is necessary, as object is being created */
 779        lov->lo_type = lov_type(cconf->u.coc_md->lsm);
 780        ops = &lov_dispatch[lov->lo_type];
 781        result = ops->llo_init(env, dev, lov, cconf, set);
 782        if (result == 0)
 783                ops->llo_install(env, lov, set);
 784        return result;
 785}
 786
 787static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
 788                        const struct cl_object_conf *conf)
 789{
 790        struct lov_stripe_md    *lsm = NULL;
 791        struct lov_object       *lov = cl2lov(obj);
 792        int                      result = 0;
 793
 794        lov_conf_lock(lov);
 795        if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
 796                lov->lo_layout_invalid = true;
 797                result = 0;
 798                goto out;
 799        }
 800
 801        if (conf->coc_opc == OBJECT_CONF_WAIT) {
 802                if (lov->lo_layout_invalid &&
 803                    atomic_read(&lov->lo_active_ios) > 0) {
 804                        lov_conf_unlock(lov);
 805                        result = lov_layout_wait(env, lov);
 806                        lov_conf_lock(lov);
 807                }
 808                goto out;
 809        }
 810
 811        LASSERT(conf->coc_opc == OBJECT_CONF_SET);
 812
 813        if (conf->u.coc_md)
 814                lsm = conf->u.coc_md->lsm;
 815        if ((!lsm && !lov->lo_lsm) ||
 816            ((lsm && lov->lo_lsm) &&
 817             (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
 818             (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
 819                /* same version of layout */
 820                lov->lo_layout_invalid = false;
 821                result = 0;
 822                goto out;
 823        }
 824
 825        /* will change layout - check if there still exists active IO. */
 826        if (atomic_read(&lov->lo_active_ios) > 0) {
 827                lov->lo_layout_invalid = true;
 828                result = -EBUSY;
 829                goto out;
 830        }
 831
 832        result = lov_layout_change(env, lov, conf);
 833        lov->lo_layout_invalid = result != 0;
 834
 835out:
 836        lov_conf_unlock(lov);
 837        CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
 838               PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
 839        return result;
 840}
 841
 842static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
 843{
 844        struct lov_object *lov = lu2lov(obj);
 845
 846        LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
 847}
 848
 849static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
 850{
 851        struct lov_object *lov = lu2lov(obj);
 852
 853        LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
 854        lu_object_fini(obj);
 855        kmem_cache_free(lov_object_kmem, lov);
 856}
 857
 858static int lov_object_print(const struct lu_env *env, void *cookie,
 859                            lu_printer_t p, const struct lu_object *o)
 860{
 861        return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
 862}
 863
 864int lov_page_init(const struct lu_env *env, struct cl_object *obj,
 865                  struct cl_page *page, pgoff_t index)
 866{
 867        return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
 868                                    index);
 869}
 870
 871/**
 872 * Implements cl_object_operations::clo_io_init() method for lov
 873 * layer. Dispatches to the appropriate layout io initialization method.
 874 */
 875int lov_io_init(const struct lu_env *env, struct cl_object *obj,
 876                struct cl_io *io)
 877{
 878        CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
 879        return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
 880                                     !io->ci_ignore_layout, env, obj, io);
 881}
 882
 883/**
 884 * An implementation of cl_object_operations::clo_attr_get() method for lov
 885 * layer. For raid0 layout this collects and merges attributes of all
 886 * sub-objects.
 887 */
 888static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
 889                        struct cl_attr *attr)
 890{
 891        /* do not take lock, as this function is called under a
 892         * spin-lock. Layout is protected from changing by ongoing IO.
 893         */
 894        return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
 895}
 896
 897static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
 898                           const struct cl_attr *attr, unsigned int valid)
 899{
 900        /*
 901         * No dispatch is required here, as no layout implements this.
 902         */
 903        return 0;
 904}
 905
 906int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
 907                  struct cl_lock *lock, const struct cl_io *io)
 908{
 909        /* No need to lock because we've taken one refcount of layout.  */
 910        return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
 911                                    io);
 912}
 913
 914static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
 915                                struct lov_user_md __user *lum)
 916{
 917        struct lov_object *lov = cl2lov(obj);
 918        struct lov_stripe_md *lsm;
 919        int rc = 0;
 920
 921        lsm = lov_lsm_addref(lov);
 922        if (!lsm)
 923                return -ENODATA;
 924
 925        rc = lov_getstripe(cl2lov(obj), lsm, lum);
 926        lov_lsm_put(obj, lsm);
 927        return rc;
 928}
 929
 930static const struct cl_object_operations lov_ops = {
 931        .coo_page_init = lov_page_init,
 932        .coo_lock_init = lov_lock_init,
 933        .coo_io_init   = lov_io_init,
 934        .coo_attr_get  = lov_attr_get,
 935        .coo_attr_update = lov_attr_update,
 936        .coo_conf_set  = lov_conf_set,
 937        .coo_getstripe = lov_object_getstripe
 938};
 939
 940static const struct lu_object_operations lov_lu_obj_ops = {
 941        .loo_object_init      = lov_object_init,
 942        .loo_object_delete    = lov_object_delete,
 943        .loo_object_release   = NULL,
 944        .loo_object_free      = lov_object_free,
 945        .loo_object_print     = lov_object_print,
 946        .loo_object_invariant = NULL
 947};
 948
 949struct lu_object *lov_object_alloc(const struct lu_env *env,
 950                                   const struct lu_object_header *unused,
 951                                   struct lu_device *dev)
 952{
 953        struct lov_object *lov;
 954        struct lu_object  *obj;
 955
 956        lov = kmem_cache_zalloc(lov_object_kmem, GFP_NOFS);
 957        if (lov) {
 958                obj = lov2lu(lov);
 959                lu_object_init(obj, NULL, dev);
 960                lov->lo_cl.co_ops = &lov_ops;
 961                lov->lo_type = -1; /* invalid, to catch uninitialized type */
 962                /*
 963                 * object io operation vector (cl_object::co_iop) is installed
 964                 * later in lov_object_init(), as different vectors are used
 965                 * for object with different layouts.
 966                 */
 967                obj->lo_ops = &lov_lu_obj_ops;
 968        } else {
 969                obj = NULL;
 970        }
 971        return obj;
 972}
 973
 974struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
 975{
 976        struct lov_stripe_md *lsm = NULL;
 977
 978        lov_conf_freeze(lov);
 979        if (lov->lo_lsm) {
 980                lsm = lsm_addref(lov->lo_lsm);
 981                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
 982                       lsm, atomic_read(&lsm->lsm_refc),
 983                       lov->lo_layout_invalid, current);
 984        }
 985        lov_conf_thaw(lov);
 986        return lsm;
 987}
 988
 989struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
 990{
 991        struct lu_object *luobj;
 992        struct lov_stripe_md *lsm = NULL;
 993
 994        if (!clobj)
 995                return NULL;
 996
 997        luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
 998                                 &lov_device_type);
 999        if (luobj)
1000                lsm = lov_lsm_addref(lu2lov(luobj));
1001        return lsm;
1002}
1003EXPORT_SYMBOL(lov_lsm_get);
1004
1005int lov_read_and_clear_async_rc(struct cl_object *clob)
1006{
1007        struct lu_object *luobj;
1008        int rc = 0;
1009
1010        luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1011                                 &lov_device_type);
1012        if (luobj) {
1013                struct lov_object *lov = lu2lov(luobj);
1014
1015                lov_conf_freeze(lov);
1016                switch (lov->lo_type) {
1017                case LLT_RAID0: {
1018                        struct lov_stripe_md *lsm;
1019                        int i;
1020
1021                        lsm = lov->lo_lsm;
1022                        for (i = 0; i < lsm->lsm_stripe_count; i++) {
1023                                struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1024
1025                                if (lov_oinfo_is_dummy(loi))
1026                                        continue;
1027
1028                                if (loi->loi_ar.ar_rc && !rc)
1029                                        rc = loi->loi_ar.ar_rc;
1030                                loi->loi_ar.ar_rc = 0;
1031                        }
1032                }
1033                case LLT_RELEASED:
1034                case LLT_EMPTY:
1035                        break;
1036                default:
1037                        LBUG();
1038                }
1039                lov_conf_thaw(lov);
1040        }
1041        return rc;
1042}
1043EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1044
1045/** @} lov */
1046