linux/drivers/staging/lustre/lustre/obdclass/local_storage.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9
  10 * This program is distributed in the hope that it will be useful,
  11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13 * GNU General Public License version 2 for more details.  A copy is
  14 * included in the COPYING file that accompanied this code.
  15
  16 * You should have received a copy of the GNU General Public License
  17 * along with this program; if not, write to the Free Software
  18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2012, Intel Corporation.
  24 */
  25/*
  26 * lustre/obdclass/local_storage.c
  27 *
  28 * Local storage for file/objects with fid generation. Works on top of OSD.
  29 *
  30 * Author: Mikhail Pershin <mike.pershin@intel.com>
  31 */
  32
  33#define DEBUG_SUBSYSTEM S_CLASS
  34
  35#include "local_storage.h"
  36
/* Global list of every local storage device (struct ls_device) initialized on
 * this node; devices are linked through their ls_linkage member. */
static LIST_HEAD(ls_list_head);
/* Taken around lookups, insertions and removals on ls_list_head. */
static DEFINE_MUTEX(ls_list_mutex);
  40
  41static int ls_object_init(const struct lu_env *env, struct lu_object *o,
  42                          const struct lu_object_conf *unused)
  43{
  44        struct ls_device        *ls;
  45        struct lu_object        *below;
  46        struct lu_device        *under;
  47
  48        ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev);
  49        under = &ls->ls_osd->dd_lu_dev;
  50        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
  51        if (below == NULL)
  52                return -ENOMEM;
  53
  54        lu_object_add(o, below);
  55
  56        return 0;
  57}
  58
  59static void ls_object_free(const struct lu_env *env, struct lu_object *o)
  60{
  61        struct ls_object        *obj = lu2ls_obj(o);
  62        struct lu_object_header *h = o->lo_header;
  63
  64        dt_object_fini(&obj->ls_obj);
  65        lu_object_header_fini(h);
  66        OBD_FREE_PTR(obj);
  67}
  68
/* lu_object operations installed by ls_object_alloc() on every object it
 * creates. */
struct lu_object_operations ls_lu_obj_ops = {
        .loo_object_init  = ls_object_init,
        .loo_object_free  = ls_object_free,
};
  73
  74struct lu_object *ls_object_alloc(const struct lu_env *env,
  75                                  const struct lu_object_header *_h,
  76                                  struct lu_device *d)
  77{
  78        struct lu_object_header *h;
  79        struct ls_object        *o;
  80        struct lu_object        *l;
  81
  82        LASSERT(_h == NULL);
  83
  84        OBD_ALLOC_PTR(o);
  85        if (o != NULL) {
  86                l = &o->ls_obj.do_lu;
  87                h = &o->ls_header;
  88
  89                lu_object_header_init(h);
  90                dt_object_init(&o->ls_obj, h, d);
  91                lu_object_add_top(h, l);
  92
  93                l->lo_ops = &ls_lu_obj_ops;
  94
  95                return l;
  96        } else {
  97                return NULL;
  98        }
  99}
 100
/* Device operations set on the ls top device by ls_device_get(); only the
 * object allocation hook is provided. */
static struct lu_device_operations ls_lu_dev_ops = {
        .ldo_object_alloc =     ls_object_alloc
};
 104
 105static struct ls_device *__ls_find_dev(struct dt_device *dev)
 106{
 107        struct ls_device *ls, *ret = NULL;
 108
 109        list_for_each_entry(ls, &ls_list_head, ls_linkage) {
 110                if (ls->ls_osd == dev) {
 111                        atomic_inc(&ls->ls_refcount);
 112                        ret = ls;
 113                        break;
 114                }
 115        }
 116        return ret;
 117}
 118
 119struct ls_device *ls_find_dev(struct dt_device *dev)
 120{
 121        struct ls_device *ls;
 122
 123        mutex_lock(&ls_list_mutex);
 124        ls = __ls_find_dev(dev);
 125        mutex_unlock(&ls_list_mutex);
 126
 127        return ls;
 128}
 129
/* Device type operations for the local storage type; neither a start nor a
 * stop hook is set. */
static struct lu_device_type_operations ls_device_type_ops = {
        .ldto_start = NULL,
        .ldto_stop  = NULL,
};
 134
/* Device type handed to lu_device_init() for each ls top device created in
 * ls_device_get(). */
static struct lu_device_type ls_lu_type = {
        .ldt_name = "local_storage",
        .ldt_ops  = &ls_device_type_ops,
};
 139
 140struct ls_device *ls_device_get(struct dt_device *dev)
 141{
 142        struct ls_device *ls;
 143
 144        mutex_lock(&ls_list_mutex);
 145        ls = __ls_find_dev(dev);
 146        if (ls)
 147                GOTO(out_ls, ls);
 148
 149        /* not found, then create */
 150        OBD_ALLOC_PTR(ls);
 151        if (ls == NULL)
 152                GOTO(out_ls, ls = ERR_PTR(-ENOMEM));
 153
 154        atomic_set(&ls->ls_refcount, 1);
 155        INIT_LIST_HEAD(&ls->ls_los_list);
 156        mutex_init(&ls->ls_los_mutex);
 157
 158        ls->ls_osd = dev;
 159
 160        LASSERT(dev->dd_lu_dev.ld_site);
 161        lu_device_init(&ls->ls_top_dev.dd_lu_dev, &ls_lu_type);
 162        ls->ls_top_dev.dd_lu_dev.ld_ops = &ls_lu_dev_ops;
 163        ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site;
 164
 165        /* finally add ls to the list */
 166        list_add(&ls->ls_linkage, &ls_list_head);
 167out_ls:
 168        mutex_unlock(&ls_list_mutex);
 169        return ls;
 170}
 171
 172void ls_device_put(const struct lu_env *env, struct ls_device *ls)
 173{
 174        LASSERT(env);
 175        if (!atomic_dec_and_test(&ls->ls_refcount))
 176                return;
 177
 178        mutex_lock(&ls_list_mutex);
 179        if (atomic_read(&ls->ls_refcount) == 0) {
 180                LASSERT(list_empty(&ls->ls_los_list));
 181                list_del(&ls->ls_linkage);
 182                lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0);
 183                lu_device_fini(&ls->ls_top_dev.dd_lu_dev);
 184                OBD_FREE_PTR(ls);
 185        }
 186        mutex_unlock(&ls_list_mutex);
 187}
 188
 189/**
 190 * local file fid generation
 191 */
 192int local_object_fid_generate(const struct lu_env *env,
 193                              struct local_oid_storage *los,
 194                              struct lu_fid *fid)
 195{
 196        LASSERT(los->los_dev);
 197        LASSERT(los->los_obj);
 198
 199        /* take next OID */
 200
 201        /* to make it unique after reboot we store
 202         * the latest generated fid atomically with
 203         * object creation see local_object_create() */
 204
 205        mutex_lock(&los->los_id_lock);
 206        fid->f_seq = los->los_seq;
 207        fid->f_oid = ++los->los_last_oid;
 208        fid->f_ver = 0;
 209        mutex_unlock(&los->los_id_lock);
 210
 211        return 0;
 212}
 213
 214int local_object_declare_create(const struct lu_env *env,
 215                                struct local_oid_storage *los,
 216                                struct dt_object *o, struct lu_attr *attr,
 217                                struct dt_object_format *dof,
 218                                struct thandle *th)
 219{
 220        struct dt_thread_info   *dti = dt_info(env);
 221        int                      rc;
 222
 223        /* update fid generation file */
 224        if (los != NULL) {
 225                LASSERT(dt_object_exists(los->los_obj));
 226                rc = dt_declare_record_write(env, los->los_obj,
 227                                             sizeof(struct los_ondisk), 0, th);
 228                if (rc)
 229                        return rc;
 230        }
 231
 232        rc = dt_declare_create(env, o, attr, NULL, dof, th);
 233        if (rc)
 234                return rc;
 235
 236        dti->dti_lb.lb_buf = NULL;
 237        dti->dti_lb.lb_len = sizeof(dti->dti_lma);
 238        rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th);
 239
 240        return rc;
 241}
 242
 243int local_object_create(const struct lu_env *env,
 244                        struct local_oid_storage *los,
 245                        struct dt_object *o, struct lu_attr *attr,
 246                        struct dt_object_format *dof, struct thandle *th)
 247{
 248        struct dt_thread_info   *dti = dt_info(env);
 249        obd_id                   lastid;
 250        int                      rc;
 251
 252        rc = dt_create(env, o, attr, NULL, dof, th);
 253        if (rc)
 254                return rc;
 255
 256        if (los == NULL)
 257                return rc;
 258
 259        LASSERT(los->los_obj);
 260        LASSERT(dt_object_exists(los->los_obj));
 261
 262        /* many threads can be updated this, serialize
 263         * them here to avoid the race where one thread
 264         * takes the value first, but writes it last */
 265        mutex_lock(&los->los_id_lock);
 266
 267        /* update local oid number on disk so that
 268         * we know the last one used after reboot */
 269        lastid = cpu_to_le64(los->los_last_oid);
 270
 271        dti->dti_off = 0;
 272        dti->dti_lb.lb_buf = &lastid;
 273        dti->dti_lb.lb_len = sizeof(lastid);
 274        rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
 275                             th);
 276        mutex_unlock(&los->los_id_lock);
 277
 278        return rc;
 279}
 280
 281/*
 282 * Create local named object (file, directory or index) in parent directory.
 283 */
 284struct dt_object *__local_file_create(const struct lu_env *env,
 285                                      const struct lu_fid *fid,
 286                                      struct local_oid_storage *los,
 287                                      struct ls_device *ls,
 288                                      struct dt_object *parent,
 289                                      const char *name, struct lu_attr *attr,
 290                                      struct dt_object_format *dof)
 291{
 292        struct dt_thread_info   *dti = dt_info(env);
 293        struct dt_object        *dto;
 294        struct thandle          *th;
 295        int                      rc;
 296
 297        dto = ls_locate(env, ls, fid);
 298        if (unlikely(IS_ERR(dto)))
 299                return dto;
 300
 301        LASSERT(dto != NULL);
 302        if (dt_object_exists(dto))
 303                GOTO(out, rc = -EEXIST);
 304
 305        th = dt_trans_create(env, ls->ls_osd);
 306        if (IS_ERR(th))
 307                GOTO(out, rc = PTR_ERR(th));
 308
 309        rc = local_object_declare_create(env, los, dto, attr, dof, th);
 310        if (rc)
 311                GOTO(trans_stop, rc);
 312
 313        if (dti->dti_dof.dof_type == DFT_DIR) {
 314                dt_declare_ref_add(env, dto, th);
 315                dt_declare_ref_add(env, parent, th);
 316        }
 317
 318        rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th);
 319        if (rc)
 320                GOTO(trans_stop, rc);
 321
 322        rc = dt_trans_start_local(env, ls->ls_osd, th);
 323        if (rc)
 324                GOTO(trans_stop, rc);
 325
 326        dt_write_lock(env, dto, 0);
 327        if (dt_object_exists(dto))
 328                GOTO(unlock, rc = 0);
 329
 330        CDEBUG(D_OTHER, "create new object "DFID"\n",
 331               PFID(lu_object_fid(&dto->do_lu)));
 332        rc = local_object_create(env, los, dto, attr, dof, th);
 333        if (rc)
 334                GOTO(unlock, rc);
 335        LASSERT(dt_object_exists(dto));
 336
 337        if (dti->dti_dof.dof_type == DFT_DIR) {
 338                if (!dt_try_as_dir(env, dto))
 339                        GOTO(destroy, rc = -ENOTDIR);
 340                /* Add "." and ".." for newly created dir */
 341                rc = dt_insert(env, dto, (void *)fid, (void *)".", th,
 342                               BYPASS_CAPA, 1);
 343                if (rc)
 344                        GOTO(destroy, rc);
 345                dt_ref_add(env, dto, th);
 346                rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu),
 347                               (void *)"..", th, BYPASS_CAPA, 1);
 348                if (rc)
 349                        GOTO(destroy, rc);
 350        }
 351
 352        dt_write_lock(env, parent, 0);
 353        rc = dt_insert(env, parent, (const struct dt_rec *)fid,
 354                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
 355        if (dti->dti_dof.dof_type == DFT_DIR)
 356                dt_ref_add(env, parent, th);
 357        dt_write_unlock(env, parent);
 358        if (rc)
 359                GOTO(destroy, rc);
 360destroy:
 361        if (rc)
 362                dt_destroy(env, dto, th);
 363unlock:
 364        dt_write_unlock(env, dto);
 365trans_stop:
 366        dt_trans_stop(env, ls->ls_osd, th);
 367out:
 368        if (rc) {
 369                lu_object_put_nocache(env, &dto->do_lu);
 370                dto = ERR_PTR(rc);
 371        }
 372        return dto;
 373}
 374
 375/*
 376 * Look up and create (if it does not exist) a local named file or directory in
 377 * parent directory.
 378 */
 379struct dt_object *local_file_find_or_create(const struct lu_env *env,
 380                                            struct local_oid_storage *los,
 381                                            struct dt_object *parent,
 382                                            const char *name, __u32 mode)
 383{
 384        struct dt_thread_info   *dti = dt_info(env);
 385        struct dt_object        *dto;
 386        int                      rc;
 387
 388        LASSERT(parent);
 389
 390        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
 391        if (rc == 0)
 392                /* name is found, get the object */
 393                dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
 394        else if (rc != -ENOENT)
 395                dto = ERR_PTR(rc);
 396        else {
 397                rc = local_object_fid_generate(env, los, &dti->dti_fid);
 398                if (rc < 0) {
 399                        dto = ERR_PTR(rc);
 400                } else {
 401                        /* create the object */
 402                        dti->dti_attr.la_valid  = LA_MODE;
 403                        dti->dti_attr.la_mode   = mode;
 404                        dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
 405                        dto = __local_file_create(env, &dti->dti_fid, los,
 406                                                  dt2ls_dev(los->los_dev),
 407                                                  parent, name, &dti->dti_attr,
 408                                                  &dti->dti_dof);
 409                }
 410        }
 411        return dto;
 412}
 413EXPORT_SYMBOL(local_file_find_or_create);
 414
 415struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
 416                                                     struct dt_device *dt,
 417                                                     const struct lu_fid *fid,
 418                                                     struct dt_object *parent,
 419                                                     const char *name,
 420                                                     __u32 mode)
 421{
 422        struct dt_thread_info   *dti = dt_info(env);
 423        struct dt_object        *dto;
 424        int                      rc;
 425
 426        LASSERT(parent);
 427
 428        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
 429        if (rc == 0) {
 430                dto = dt_locate(env, dt, &dti->dti_fid);
 431        } else if (rc != -ENOENT) {
 432                dto = ERR_PTR(rc);
 433        } else {
 434                struct ls_device *ls;
 435
 436                ls = ls_device_get(dt);
 437                if (IS_ERR(ls)) {
 438                        dto = ERR_CAST(ls);
 439                } else {
 440                        /* create the object */
 441                        dti->dti_attr.la_valid  = LA_MODE;
 442                        dti->dti_attr.la_mode   = mode;
 443                        dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
 444                        dto = __local_file_create(env, fid, NULL, ls, parent,
 445                                                  name, &dti->dti_attr,
 446                                                  &dti->dti_dof);
 447                        /* ls_device_put() will finalize the ls device, we
 448                         * have to open the object in other device stack */
 449                        if (!IS_ERR(dto)) {
 450                                dti->dti_fid = dto->do_lu.lo_header->loh_fid;
 451                                lu_object_put_nocache(env, &dto->do_lu);
 452                                dto = dt_locate(env, dt, &dti->dti_fid);
 453                        }
 454                        ls_device_put(env, ls);
 455                }
 456        }
 457        return dto;
 458}
 459EXPORT_SYMBOL(local_file_find_or_create_with_fid);
 460
 461/*
 462 * Look up and create (if it does not exist) a local named index file in parent
 463 * directory.
 464 */
 465struct dt_object *local_index_find_or_create(const struct lu_env *env,
 466                                             struct local_oid_storage *los,
 467                                             struct dt_object *parent,
 468                                             const char *name, __u32 mode,
 469                                             const struct dt_index_features *ft)
 470{
 471        struct dt_thread_info   *dti = dt_info(env);
 472        struct dt_object        *dto;
 473        int                      rc;
 474
 475        LASSERT(parent);
 476
 477        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
 478        if (rc == 0) {
 479                /* name is found, get the object */
 480                dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
 481        } else if (rc != -ENOENT) {
 482                dto = ERR_PTR(rc);
 483        } else {
 484                rc = local_object_fid_generate(env, los, &dti->dti_fid);
 485                if (rc < 0) {
 486                        dto = ERR_PTR(rc);
 487                } else {
 488                        /* create the object */
 489                        dti->dti_attr.la_valid          = LA_MODE;
 490                        dti->dti_attr.la_mode           = mode;
 491                        dti->dti_dof.dof_type           = DFT_INDEX;
 492                        dti->dti_dof.u.dof_idx.di_feat  = ft;
 493                        dto = __local_file_create(env, &dti->dti_fid, los,
 494                                                  dt2ls_dev(los->los_dev),
 495                                                  parent, name, &dti->dti_attr,
 496                                                  &dti->dti_dof);
 497                }
 498        }
 499        return dto;
 500
 501}
 502EXPORT_SYMBOL(local_index_find_or_create);
 503
 504struct dt_object *
 505local_index_find_or_create_with_fid(const struct lu_env *env,
 506                                    struct dt_device *dt,
 507                                    const struct lu_fid *fid,
 508                                    struct dt_object *parent,
 509                                    const char *name, __u32 mode,
 510                                    const struct dt_index_features *ft)
 511{
 512        struct dt_thread_info   *dti = dt_info(env);
 513        struct dt_object        *dto;
 514        int                      rc;
 515
 516        LASSERT(parent);
 517
 518        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
 519        if (rc == 0) {
 520                /* name is found, get the object */
 521                if (!lu_fid_eq(fid, &dti->dti_fid))
 522                        dto = ERR_PTR(-EINVAL);
 523                else
 524                        dto = dt_locate(env, dt, fid);
 525        } else if (rc != -ENOENT) {
 526                dto = ERR_PTR(rc);
 527        } else {
 528                struct ls_device *ls;
 529
 530                ls = ls_device_get(dt);
 531                if (IS_ERR(ls)) {
 532                        dto = ERR_CAST(ls);
 533                } else {
 534                        /* create the object */
 535                        dti->dti_attr.la_valid          = LA_MODE;
 536                        dti->dti_attr.la_mode           = mode;
 537                        dti->dti_dof.dof_type           = DFT_INDEX;
 538                        dti->dti_dof.u.dof_idx.di_feat  = ft;
 539                        dto = __local_file_create(env, fid, NULL, ls, parent,
 540                                                  name, &dti->dti_attr,
 541                                                  &dti->dti_dof);
 542                        /* ls_device_put() will finalize the ls device, we
 543                         * have to open the object in other device stack */
 544                        if (!IS_ERR(dto)) {
 545                                dti->dti_fid = dto->do_lu.lo_header->loh_fid;
 546                                lu_object_put_nocache(env, &dto->do_lu);
 547                                dto = dt_locate(env, dt, &dti->dti_fid);
 548                        }
 549                        ls_device_put(env, ls);
 550                }
 551        }
 552        return dto;
 553}
 554EXPORT_SYMBOL(local_index_find_or_create_with_fid);
 555
 556static int local_object_declare_unlink(const struct lu_env *env,
 557                                       struct dt_device *dt,
 558                                       struct dt_object *p,
 559                                       struct dt_object *c, const char *name,
 560                                       struct thandle *th)
 561{
 562        int rc;
 563
 564        rc = dt_declare_delete(env, p, (const struct dt_key *)name, th);
 565        if (rc < 0)
 566                return rc;
 567
 568        rc = dt_declare_ref_del(env, c, th);
 569        if (rc < 0)
 570                return rc;
 571
 572        return dt_declare_destroy(env, c, th);
 573}
 574
 575int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
 576                        struct dt_object *parent, const char *name)
 577{
 578        struct dt_thread_info   *dti = dt_info(env);
 579        struct dt_object        *dto;
 580        struct thandle          *th;
 581        int                      rc;
 582
 583        rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
 584        if (rc == -ENOENT)
 585                return 0;
 586        else if (rc < 0)
 587                return rc;
 588
 589        dto = dt_locate(env, dt, &dti->dti_fid);
 590        if (unlikely(IS_ERR(dto)))
 591                return PTR_ERR(dto);
 592
 593        th = dt_trans_create(env, dt);
 594        if (IS_ERR(th))
 595                GOTO(out, rc = PTR_ERR(th));
 596
 597        rc = local_object_declare_unlink(env, dt, parent, dto, name, th);
 598        if (rc < 0)
 599                GOTO(stop, rc);
 600
 601        rc = dt_trans_start_local(env, dt, th);
 602        if (rc < 0)
 603                GOTO(stop, rc);
 604
 605        dt_write_lock(env, dto, 0);
 606        rc = dt_delete(env, parent, (struct dt_key *)name, th, BYPASS_CAPA);
 607        if (rc < 0)
 608                GOTO(unlock, rc);
 609
 610        rc = dt_ref_del(env, dto, th);
 611        if (rc < 0) {
 612                rc = dt_insert(env, parent,
 613                               (const struct dt_rec *)&dti->dti_fid,
 614                               (const struct dt_key *)name, th, BYPASS_CAPA, 1);
 615                GOTO(unlock, rc);
 616        }
 617
 618        rc = dt_destroy(env, dto, th);
 619unlock:
 620        dt_write_unlock(env, dto);
 621stop:
 622        dt_trans_stop(env, dt, th);
 623out:
 624        lu_object_put_nocache(env, &dto->do_lu);
 625        return rc;
 626}
 627EXPORT_SYMBOL(local_object_unlink);
 628
 629struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
 630{
 631        struct local_oid_storage *los, *ret = NULL;
 632
 633        list_for_each_entry(los, &ls->ls_los_list, los_list) {
 634                if (los->los_seq == seq) {
 635                        atomic_inc(&los->los_refcount);
 636                        ret = los;
 637                        break;
 638                }
 639        }
 640        return ret;
 641}
 642
 643void dt_los_put(struct local_oid_storage *los)
 644{
 645        if (atomic_dec_and_test(&los->los_refcount))
 646                /* should never happen, only local_oid_storage_fini should
 647                 * drop refcount to zero */
 648                LBUG();
 649        return;
 650}
 651
 652/* after Lustre 2.3 release there may be old file to store last generated FID
 653 * If such file exists then we have to read its content
 654 */
 655int lastid_compat_check(const struct lu_env *env, struct dt_device *dev,
 656                        __u64 lastid_seq, __u32 *first_oid, struct ls_device *ls)
 657{
 658        struct dt_thread_info   *dti = dt_info(env);
 659        struct dt_object        *root = NULL;
 660        struct los_ondisk        losd;
 661        struct dt_object        *o = NULL;
 662        int                      rc = 0;
 663
 664        rc = dt_root_get(env, dev, &dti->dti_fid);
 665        if (rc)
 666                return rc;
 667
 668        root = ls_locate(env, ls, &dti->dti_fid);
 669        if (IS_ERR(root))
 670                return PTR_ERR(root);
 671
 672        /* find old last_id file */
 673        snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-"LPX64"-lastid",
 674                 lastid_seq);
 675        rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
 676        lu_object_put_nocache(env, &root->do_lu);
 677        if (rc == -ENOENT) {
 678                /* old llog lastid accessed by FID only */
 679                if (lastid_seq != FID_SEQ_LLOG)
 680                        return 0;
 681                dti->dti_fid.f_seq = FID_SEQ_LLOG;
 682                dti->dti_fid.f_oid = 1;
 683                dti->dti_fid.f_ver = 0;
 684                o = ls_locate(env, ls, &dti->dti_fid);
 685                if (IS_ERR(o))
 686                        return PTR_ERR(o);
 687
 688                if (!dt_object_exists(o)) {
 689                        lu_object_put_nocache(env, &o->do_lu);
 690                        return 0;
 691                }
 692                CDEBUG(D_INFO, "Found old llog lastid file\n");
 693        } else if (rc < 0) {
 694                return rc;
 695        } else {
 696                CDEBUG(D_INFO, "Found old lastid file for sequence "LPX64"\n",
 697                       lastid_seq);
 698                o = ls_locate(env, ls, &dti->dti_fid);
 699                if (IS_ERR(o))
 700                        return PTR_ERR(o);
 701        }
 702        /* let's read seq-NNNNNN-lastid file value */
 703        LASSERT(dt_object_exists(o));
 704        dti->dti_off = 0;
 705        dti->dti_lb.lb_buf = &losd;
 706        dti->dti_lb.lb_len = sizeof(losd);
 707        dt_read_lock(env, o, 0);
 708        rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
 709        dt_read_unlock(env, o);
 710        lu_object_put_nocache(env, &o->do_lu);
 711        if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
 712                CERROR("%s: wrong content of seq-"LPX64"-lastid file, magic %x\n",
 713                       o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq,
 714                       le32_to_cpu(losd.lso_magic));
 715                return -EINVAL;
 716        } else if (rc < 0) {
 717                CERROR("%s: failed to read seq-"LPX64"-lastid: rc = %d\n",
 718                       o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, rc);
 719                return rc;
 720        }
 721        *first_oid = le32_to_cpu(losd.lso_next_oid);
 722        return rc;
 723}
 724
 725/**
 726 * Initialize local OID storage for required sequence.
 727 * That may be needed for services that uses local files and requires
 728 * dynamic OID allocation for them.
 729 *
 730 * Per each sequence we have an object with 'first_fid' identificator
 731 * containing the counter for OIDs of locally created files with that
 732 * sequence.
 733 *
 734 * It is used now by llog subsystem and MGS for NID tables
 735 *
 736 * Function gets first_fid to create counter object.
 737 * All dynamic fids will be generated with the same sequence and incremented
 738 * OIDs
 739 *
 740 * Returned local_oid_storage is in-memory representaion of OID storage
 741 */
 742int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
 743                           const struct lu_fid *first_fid,
 744                           struct local_oid_storage **los)
 745{
 746        struct dt_thread_info   *dti = dt_info(env);
 747        struct ls_device        *ls;
 748        obd_id                   lastid;
 749        struct dt_object        *o = NULL;
 750        struct thandle          *th;
 751        __u32                    first_oid = fid_oid(first_fid);
 752        int                      rc = 0;
 753
 754        ls = ls_device_get(dev);
 755        if (IS_ERR(ls))
 756                return PTR_ERR(ls);
 757
 758        mutex_lock(&ls->ls_los_mutex);
 759        *los = dt_los_find(ls, fid_seq(first_fid));
 760        if (*los != NULL)
 761                GOTO(out, rc = 0);
 762
 763        /* not found, then create */
 764        OBD_ALLOC_PTR(*los);
 765        if (*los == NULL)
 766                GOTO(out, rc = -ENOMEM);
 767
 768        atomic_set(&(*los)->los_refcount, 1);
 769        mutex_init(&(*los)->los_id_lock);
 770        (*los)->los_dev = &ls->ls_top_dev;
 771        atomic_inc(&ls->ls_refcount);
 772        list_add(&(*los)->los_list, &ls->ls_los_list);
 773
 774        /* Use {seq, 0, 0} to create the LAST_ID file for every
 775         * sequence.  OIDs start at LUSTRE_FID_INIT_OID.
 776         */
 777        dti->dti_fid.f_seq = fid_seq(first_fid);
 778        dti->dti_fid.f_oid = LUSTRE_FID_LASTID_OID;
 779        dti->dti_fid.f_ver = 0;
 780        o = ls_locate(env, ls, &dti->dti_fid);
 781        if (IS_ERR(o))
 782                GOTO(out_los, rc = PTR_ERR(o));
 783
 784        if (!dt_object_exists(o)) {
 785                rc = lastid_compat_check(env, dev, fid_seq(first_fid),
 786                                         &first_oid, ls);
 787                if (rc < 0)
 788                        GOTO(out_los, rc);
 789
 790                th = dt_trans_create(env, dev);
 791                if (IS_ERR(th))
 792                        GOTO(out_los, rc = PTR_ERR(th));
 793
 794                dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
 795                dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
 796                dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG);
 797
 798                rc = dt_declare_create(env, o, &dti->dti_attr, NULL,
 799                                       &dti->dti_dof, th);
 800                if (rc)
 801                        GOTO(out_trans, rc);
 802
 803                rc = dt_declare_record_write(env, o, sizeof(lastid), 0, th);
 804                if (rc)
 805                        GOTO(out_trans, rc);
 806
 807                rc = dt_trans_start_local(env, dev, th);
 808                if (rc)
 809                        GOTO(out_trans, rc);
 810
 811                dt_write_lock(env, o, 0);
 812                if (dt_object_exists(o))
 813                        GOTO(out_lock, rc = 0);
 814
 815                rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof,
 816                               th);
 817                if (rc)
 818                        GOTO(out_lock, rc);
 819
 820                lastid = cpu_to_le64(first_oid);
 821
 822                dti->dti_off = 0;
 823                dti->dti_lb.lb_buf = &lastid;
 824                dti->dti_lb.lb_len = sizeof(lastid);
 825                rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
 826                if (rc)
 827                        GOTO(out_lock, rc);
 828out_lock:
 829                dt_write_unlock(env, o);
 830out_trans:
 831                dt_trans_stop(env, dev, th);
 832        } else {
 833                dti->dti_off = 0;
 834                dti->dti_lb.lb_buf = &lastid;
 835                dti->dti_lb.lb_len = sizeof(lastid);
 836                dt_read_lock(env, o, 0);
 837                rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
 838                dt_read_unlock(env, o);
 839                if (rc == 0 && le64_to_cpu(lastid) > OBIF_MAX_OID) {
 840                        CERROR("%s: bad oid "LPU64" is read from LAST_ID\n",
 841                               o->do_lu.lo_dev->ld_obd->obd_name,
 842                               le64_to_cpu(lastid));
 843                        rc = -EINVAL;
 844                }
 845        }
 846out_los:
 847        if (rc != 0) {
 848                list_del(&(*los)->los_list);
 849                atomic_dec(&ls->ls_refcount);
 850                OBD_FREE_PTR(*los);
 851                *los = NULL;
 852                if (o != NULL && !IS_ERR(o))
 853                        lu_object_put_nocache(env, &o->do_lu);
 854        } else {
 855                (*los)->los_seq = fid_seq(first_fid);
 856                (*los)->los_last_oid = le64_to_cpu(lastid);
 857                (*los)->los_obj = o;
 858                /* read value should not be less than initial one */
 859                LASSERTF((*los)->los_last_oid >= first_oid, "%u < %u\n",
 860                         (*los)->los_last_oid, first_oid);
 861        }
 862out:
 863        mutex_unlock(&ls->ls_los_mutex);
 864        ls_device_put(env, ls);
 865        return rc;
 866}
 867EXPORT_SYMBOL(local_oid_storage_init);
 868
 869void local_oid_storage_fini(const struct lu_env *env,
 870                            struct local_oid_storage *los)
 871{
 872        struct ls_device *ls;
 873
 874        if (!atomic_dec_and_test(&los->los_refcount))
 875                return;
 876
 877        LASSERT(env);
 878        LASSERT(los->los_dev);
 879        ls = dt2ls_dev(los->los_dev);
 880
 881        mutex_lock(&ls->ls_los_mutex);
 882        if (atomic_read(&los->los_refcount) == 0) {
 883                if (los->los_obj)
 884                        lu_object_put_nocache(env, &los->los_obj->do_lu);
 885                list_del(&los->los_list);
 886                OBD_FREE_PTR(los);
 887        }
 888        mutex_unlock(&ls->ls_los_mutex);
 889        ls_device_put(env, ls);
 890}
 891EXPORT_SYMBOL(local_oid_storage_fini);
 892