linux/drivers/staging/lustre/lustre/obdclass/cl_object.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Client Lustre Object.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41/*
  42 * Locking.
  43 *
  44 *  i_mutex
  45 *      PG_locked
  46 *        ->coh_page_guard
  47 *        ->coh_lock_guard
  48 *        ->coh_attr_guard
  49 *        ->ls_guard
  50 */
  51
  52#define DEBUG_SUBSYSTEM S_CLASS
  53
  54#include <linux/libcfs/libcfs.h>
  55/* class_put_type() */
  56#include <obd_class.h>
  57#include <obd_support.h>
  58#include <lustre_fid.h>
  59#include <linux/list.h>
  60#include <linux/libcfs/libcfs_hash.h> /* for cfs_hash stuff */
  61#include <cl_object.h>
  62#include "cl_internal.h"
  63
  64static struct kmem_cache *cl_env_kmem;
  65
  66/** Lock class of cl_object_header::coh_page_guard */
  67static struct lock_class_key cl_page_guard_class;
  68/** Lock class of cl_object_header::coh_lock_guard */
  69static struct lock_class_key cl_lock_guard_class;
  70/** Lock class of cl_object_header::coh_attr_guard */
  71static struct lock_class_key cl_attr_guard_class;
  72
  73extern __u32 lu_context_tags_default;
  74extern __u32 lu_session_tags_default;
  75/**
  76 * Initialize cl_object_header.
  77 */
  78int cl_object_header_init(struct cl_object_header *h)
  79{
  80        int result;
  81
  82        ENTRY;
  83        result = lu_object_header_init(&h->coh_lu);
  84        if (result == 0) {
  85                spin_lock_init(&h->coh_page_guard);
  86                spin_lock_init(&h->coh_lock_guard);
  87                spin_lock_init(&h->coh_attr_guard);
  88                lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
  89                lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
  90                lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
  91                h->coh_pages = 0;
  92                /* XXX hard coded GFP_* mask. */
  93                INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
  94                INIT_LIST_HEAD(&h->coh_locks);
  95                h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
  96        }
  97        RETURN(result);
  98}
  99EXPORT_SYMBOL(cl_object_header_init);
 100
 101/**
 102 * Finalize cl_object_header.
 103 */
 104void cl_object_header_fini(struct cl_object_header *h)
 105{
 106        LASSERT(list_empty(&h->coh_locks));
 107        lu_object_header_fini(&h->coh_lu);
 108}
 109EXPORT_SYMBOL(cl_object_header_fini);
 110
 111/**
 112 * Returns a cl_object with a given \a fid.
 113 *
  115 * Returns either a cached or a newly created object. An additional reference
  116 * on the returned object is acquired.
 116 *
 117 * \see lu_object_find(), cl_page_find(), cl_lock_find()
 118 */
 119struct cl_object *cl_object_find(const struct lu_env *env,
 120                                 struct cl_device *cd, const struct lu_fid *fid,
 121                                 const struct cl_object_conf *c)
 122{
 123        might_sleep();
 124        return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
 125}
 126EXPORT_SYMBOL(cl_object_find);
 127
 128/**
 129 * Releases a reference on \a o.
 130 *
  131 * When the last reference is released the object is returned to the cache, unless
  132 * the lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
 133 *
 134 * \see cl_page_put(), cl_lock_put().
 135 */
 136void cl_object_put(const struct lu_env *env, struct cl_object *o)
 137{
 138        lu_object_put(env, &o->co_lu);
 139}
 140EXPORT_SYMBOL(cl_object_put);
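
/*
 * Usage sketch (illustrative; "env", "cd", "fid" and "conf" are assumed to be
 * set up by the caller): the extra reference returned by cl_object_find() is
 * dropped with cl_object_put() once the caller is done with the object.
 *
 *      struct cl_object *obj;
 *
 *      obj = cl_object_find(env, cd, fid, conf);
 *      if (IS_ERR(obj))
 *              return PTR_ERR(obj);
 *      ... use obj ...
 *      cl_object_put(env, obj);
 */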
 141
 142/**
 143 * Acquire an additional reference to the object \a o.
 144 *
  145 * This can only be used to acquire an _additional_ reference, i.e., the caller
  146 * must already possess at least one reference to \a o before calling this.
 147 *
 148 * \see cl_page_get(), cl_lock_get().
 149 */
 150void cl_object_get(struct cl_object *o)
 151{
 152        lu_object_get(&o->co_lu);
 153}
 154EXPORT_SYMBOL(cl_object_get);
 155
 156/**
 157 * Returns the top-object for a given \a o.
 158 *
 159 * \see cl_page_top(), cl_io_top()
 160 */
 161struct cl_object *cl_object_top(struct cl_object *o)
 162{
 163        struct cl_object_header *hdr = cl_object_header(o);
 164        struct cl_object *top;
 165
 166        while (hdr->coh_parent != NULL)
 167                hdr = hdr->coh_parent;
 168
 169        top = lu2cl(lu_object_top(&hdr->coh_lu));
 170        CDEBUG(D_TRACE, "%p -> %p\n", o, top);
 171        return top;
 172}
 173EXPORT_SYMBOL(cl_object_top);
 174
 175/**
 176 * Returns pointer to the lock protecting data-attributes for the given object
 177 * \a o.
 178 *
 179 * Data-attributes are protected by the cl_object_header::coh_attr_guard
 180 * spin-lock in the top-object.
 181 *
 182 * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
 183 */
 184static spinlock_t *cl_object_attr_guard(struct cl_object *o)
 185{
 186        return &cl_object_header(cl_object_top(o))->coh_attr_guard;
 187}
 188
 189/**
 190 * Locks data-attributes.
 191 *
 192 * Prevents data-attributes from changing, until lock is released by
 193 * cl_object_attr_unlock(). This has to be called before calls to
 194 * cl_object_attr_get(), cl_object_attr_set().
 195 */
 196void cl_object_attr_lock(struct cl_object *o)
 197{
 198        spin_lock(cl_object_attr_guard(o));
 199}
 200EXPORT_SYMBOL(cl_object_attr_lock);
 201
 202/**
 203 * Releases data-attributes lock, acquired by cl_object_attr_lock().
 204 */
 205void cl_object_attr_unlock(struct cl_object *o)
 206{
 207        spin_unlock(cl_object_attr_guard(o));
 208}
 209EXPORT_SYMBOL(cl_object_attr_unlock);
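
/*
 * Usage sketch (illustrative) of the attribute locking protocol described
 * above; "env", "obj" and "attr" are assumed to be set up by the caller:
 *
 *      cl_object_attr_lock(obj);
 *      result = cl_object_attr_get(env, obj, attr);
 *      cl_object_attr_unlock(obj);
 */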
 210
 211/**
 212 * Returns data-attributes of an object \a obj.
 213 *
 214 * Every layer is asked (by calling cl_object_operations::coo_attr_get())
 215 * top-to-bottom to fill in parts of \a attr that this layer is responsible
 216 * for.
 217 */
 218int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
 219                       struct cl_attr *attr)
 220{
 221        struct lu_object_header *top;
 222        int result;
 223
 224        LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
 225        ENTRY;
 226
 227        top = obj->co_lu.lo_header;
 228        result = 0;
 229        list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
 230                if (obj->co_ops->coo_attr_get != NULL) {
 231                        result = obj->co_ops->coo_attr_get(env, obj, attr);
 232                        if (result != 0) {
 233                                if (result > 0)
 234                                        result = 0;
 235                                break;
 236                        }
 237                }
 238        }
 239        RETURN(result);
 240}
 241EXPORT_SYMBOL(cl_object_attr_get);
 242
 243/**
 244 * Updates data-attributes of an object \a obj.
 245 *
  246 * Only attributes mentioned in the validity bit-mask \a v are
 247 * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
 248 * to top.
 249 */
 250int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
 251                       const struct cl_attr *attr, unsigned v)
 252{
 253        struct lu_object_header *top;
 254        int result;
 255
 256        LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
 257        ENTRY;
 258
 259        top = obj->co_lu.lo_header;
 260        result = 0;
 261        list_for_each_entry_reverse(obj, &top->loh_layers,
 262                                        co_lu.lo_linkage) {
 263                if (obj->co_ops->coo_attr_set != NULL) {
 264                        result = obj->co_ops->coo_attr_set(env, obj, attr, v);
 265                        if (result != 0) {
 266                                if (result > 0)
 267                                        result = 0;
 268                                break;
 269                        }
 270                }
 271        }
 272        RETURN(result);
 273}
 274EXPORT_SYMBOL(cl_object_attr_set);
 275
 276/**
  277 * Notifies layers (bottom-to-top) that a glimpse AST was received.
  278 *
  279 * Layers have to fill \a lvb fields with information that will be shipped
  280 * back to the glimpse issuer.
 281 *
 282 * \see cl_lock_operations::clo_glimpse()
 283 */
 284int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
 285                      struct ost_lvb *lvb)
 286{
 287        struct lu_object_header *top;
 288        int result;
 289
 290        ENTRY;
 291        top = obj->co_lu.lo_header;
 292        result = 0;
 293        list_for_each_entry_reverse(obj, &top->loh_layers,
 294                                        co_lu.lo_linkage) {
 295                if (obj->co_ops->coo_glimpse != NULL) {
 296                        result = obj->co_ops->coo_glimpse(env, obj, lvb);
 297                        if (result != 0)
 298                                break;
 299                }
 300        }
 301        LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
 302                         "size: "LPU64" mtime: "LPU64" atime: "LPU64" "
 303                         "ctime: "LPU64" blocks: "LPU64"\n",
 304                         lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
 305                         lvb->lvb_ctime, lvb->lvb_blocks);
 306        RETURN(result);
 307}
 308EXPORT_SYMBOL(cl_object_glimpse);
 309
 310/**
 311 * Updates a configuration of an object \a obj.
 312 */
 313int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
 314                const struct cl_object_conf *conf)
 315{
 316        struct lu_object_header *top;
 317        int result;
 318
 319        ENTRY;
 320        top = obj->co_lu.lo_header;
 321        result = 0;
 322        list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
 323                if (obj->co_ops->coo_conf_set != NULL) {
 324                        result = obj->co_ops->coo_conf_set(env, obj, conf);
 325                        if (result != 0)
 326                                break;
 327                }
 328        }
 329        RETURN(result);
 330}
 331EXPORT_SYMBOL(cl_conf_set);
 332
 333/**
 334 * Helper function removing all object locks, and marking object for
 335 * deletion. All object pages must have been deleted at this point.
 336 *
 337 * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
 338 * and sub- objects respectively.
 339 */
 340void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
 341{
 342        struct cl_object_header *hdr;
 343
 344        hdr = cl_object_header(obj);
 345        LASSERT(hdr->coh_tree.rnode == NULL);
 346        LASSERT(hdr->coh_pages == 0);
 347
 348        set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
 349        /*
 350         * Destroy all locks. Object destruction (including cl_inode_fini())
 351         * cannot cancel the locks, because in the case of a local client,
 352         * where client and server share the same thread running
 353         * prune_icache(), this can dead-lock with ldlm_cancel_handler()
 354         * waiting on __wait_on_freeing_inode().
 355         */
 356        cl_locks_prune(env, obj, 0);
 357}
 358EXPORT_SYMBOL(cl_object_kill);
 359
 360/**
 361 * Prunes caches of pages and locks for this object.
 362 */
 363void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
 364{
 365        ENTRY;
 366        cl_pages_prune(env, obj);
 367        cl_locks_prune(env, obj, 1);
 368        EXIT;
 369}
 370EXPORT_SYMBOL(cl_object_prune);
 371
 372/**
 373 * Check if the object has locks.
 374 */
 375int cl_object_has_locks(struct cl_object *obj)
 376{
 377        struct cl_object_header *head = cl_object_header(obj);
 378        int has;
 379
 380        spin_lock(&head->coh_lock_guard);
 381        has = list_empty(&head->coh_locks);
 382        spin_unlock(&head->coh_lock_guard);
 383
 384        return (has == 0);
 385}
 386EXPORT_SYMBOL(cl_object_has_locks);
 387
 388void cache_stats_init(struct cache_stats *cs, const char *name)
 389{
 390        int i;
 391
 392        cs->cs_name = name;
 393        for (i = 0; i < CS_NR; i++)
 394                atomic_set(&cs->cs_stats[i], 0);
 395}
 396
 397int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
 398{
 399        int i;
 400        /*
 401         *   lookup    hit    total  cached create
 402         * env: ...... ...... ...... ...... ......
 403         */
 404        if (h) {
 405                const char *names[CS_NR] = CS_NAMES;
 406
 407                seq_printf(m, "%6s", " ");
 408                for (i = 0; i < CS_NR; i++)
 409                        seq_printf(m, "%8s", names[i]);
 410                seq_printf(m, "\n");
 411        }
 412
 413        seq_printf(m, "%5.5s:", cs->cs_name);
 414        for (i = 0; i < CS_NR; i++)
 415                seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
 416        return 0;
 417}
 418
 419/**
 420 * Initialize client site.
 421 *
 422 * Perform common initialization (lu_site_init()), and initialize statistical
 423 * counters. Also perform global initializations on the first call.
 424 */
 425int cl_site_init(struct cl_site *s, struct cl_device *d)
 426{
 427        int i;
 428        int result;
 429
 430        result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
 431        if (result == 0) {
 432                cache_stats_init(&s->cs_pages, "pages");
 433                cache_stats_init(&s->cs_locks, "locks");
 434                for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
  435                        atomic_set(&s->cs_pages_state[i], 0);
 436                for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
 437                        atomic_set(&s->cs_locks_state[i], 0);
 438        }
 439        return result;
 440}
 441EXPORT_SYMBOL(cl_site_init);
 442
 443/**
 444 * Finalize client site. Dual to cl_site_init().
 445 */
 446void cl_site_fini(struct cl_site *s)
 447{
 448        lu_site_fini(&s->cs_lu);
 449}
 450EXPORT_SYMBOL(cl_site_fini);
 451
 452static struct cache_stats cl_env_stats = {
 453        .cs_name    = "envs",
 454        .cs_stats = { ATOMIC_INIT(0), }
 455};
 456
 457/**
  458 * Outputs client site statistical counters into a seq_file. Suitable for
 459 * ll_rd_*()-style functions.
 460 */
 461int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
 462{
 463        int i;
 464        static const char *pstate[] = {
 465                [CPS_CACHED]  = "c",
 466                [CPS_OWNED]   = "o",
 467                [CPS_PAGEOUT] = "w",
 468                [CPS_PAGEIN]  = "r",
 469                [CPS_FREEING] = "f"
 470        };
 471        static const char *lstate[] = {
 472                [CLS_NEW]       = "n",
 473                [CLS_QUEUING]   = "q",
 474                [CLS_ENQUEUED]  = "e",
 475                [CLS_HELD]      = "h",
 476                [CLS_INTRANSIT] = "t",
 477                [CLS_CACHED]    = "c",
 478                [CLS_FREEING]   = "f"
 479        };
 480/*
 481       lookup    hit  total   busy create
 482pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
 483locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
 484  env: ...... ...... ...... ...... ......
 485 */
 486        lu_site_stats_print(&site->cs_lu, m);
 487        cache_stats_print(&site->cs_pages, m, 1);
 488        seq_printf(m, " [");
 489        for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
 490                seq_printf(m, "%s: %u ", pstate[i],
 491                                atomic_read(&site->cs_pages_state[i]));
 492        seq_printf(m, "]\n");
 493        cache_stats_print(&site->cs_locks, m, 0);
 494        seq_printf(m, " [");
 495        for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
 496                seq_printf(m, "%s: %u ", lstate[i],
 497                                atomic_read(&site->cs_locks_state[i]));
 498        seq_printf(m, "]\n");
 499        cache_stats_print(&cl_env_stats, m, 0);
 500        seq_printf(m, "\n");
 501        return 0;
 502}
 503EXPORT_SYMBOL(cl_site_stats_print);
 504
 505/*****************************************************************************
 506 *
 507 * lu_env handling on client.
 508 *
 509 */
 510
 511/**
 512 * The most efficient way is to store cl_env pointer in task specific
  513 * structures. On Linux, it won't be easy to use task_struct->journal_info,
  514 * because Lustre code may call into other filesystems, which have their own
  515 * assumptions about journal_info. The following fields in task_struct have
  516 * been identified as usable for this purpose:
  517 *  - cl_env: for liblustre.
  518 *  - tux_info: only on RedHat kernels.
  519 *  - ...
  520 *
  521 * \note As long as we use task_struct to store cl_env, we assume that, once
  522 * called into Lustre, we never call into other parts of the kernel that use
  523 * those fields in task_struct without explicitly exiting Lustre.
  524 *
  525 * If no space in task_struct is available, a hash table is used.
 526 * bz20044, bz22683.
 527 */
 528
 529struct cl_env {
 530        void         *ce_magic;
 531        struct lu_env     ce_lu;
 532        struct lu_context ce_ses;
 533
 534        /**
 535         * This allows cl_env to be entered into cl_env_hash which implements
 536         * the current thread -> client environment lookup.
 537         */
 538        struct hlist_node  ce_node;
 539        /**
 540         * Owner for the current cl_env.
 541         *
  542         * If LL_TASK_CL_ENV is defined, this points to the owning current,
  543         * for debugging purposes only;
  544         * otherwise a hash is used, and this is the key for cfs_hash.
  545         * Currently the pid of the current thread is stored. Note that
  546         * using the thread pointer would lead to an unbalanced hash because
  547         * of its specific allocation locality, which varies between
  548         * platforms, OSes and even OS versions.
 549         */
 550        void         *ce_owner;
 551
 552        /*
 553         * Linkage into global list of all client environments. Used for
 554         * garbage collection.
 555         */
 556        struct list_head        ce_linkage;
  557        /*
  558         * Reference counter.
  559         */
 560        int            ce_ref;
 561        /*
 562         * Debugging field: address of the caller who made original
 563         * allocation.
 564         */
 565        void         *ce_debug;
 566};
 567
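/*
 * CL_ENV_INC()/CL_ENV_DEC() are intentionally empty here, so the cl_env_stats
 * counters they name (lookup, hit, total, busy, create) are not actually
 * updated in this build.
 */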
 568#define CL_ENV_INC(counter)
 569#define CL_ENV_DEC(counter)
 570
 571static void cl_env_init0(struct cl_env *cle, void *debug)
 572{
 573        LASSERT(cle->ce_ref == 0);
 574        LASSERT(cle->ce_magic == &cl_env_init0);
 575        LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);
 576
 577        cle->ce_ref = 1;
 578        cle->ce_debug = debug;
 579        CL_ENV_INC(busy);
 580}
 581
 582
 583/*
  584 * The implementation that uses a hash table to connect cl_env and thread.
 585 */
 586
 587static cfs_hash_t *cl_env_hash;
 588
 589static unsigned cl_env_hops_hash(cfs_hash_t *lh,
 590                                 const void *key, unsigned mask)
 591{
 592#if BITS_PER_LONG == 64
 593        return cfs_hash_u64_hash((__u64)key, mask);
 594#else
 595        return cfs_hash_u32_hash((__u32)key, mask);
 596#endif
 597}
 598
 599static void *cl_env_hops_obj(struct hlist_node *hn)
 600{
 601        struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
 602        LASSERT(cle->ce_magic == &cl_env_init0);
 603        return (void *)cle;
 604}
 605
 606static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
 607{
 608        struct cl_env *cle = cl_env_hops_obj(hn);
 609
 610        LASSERT(cle->ce_owner != NULL);
 611        return (key == cle->ce_owner);
 612}
 613
 614static void cl_env_hops_noop(cfs_hash_t *hs, struct hlist_node *hn)
 615{
 616        struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
 617        LASSERT(cle->ce_magic == &cl_env_init0);
 618}
 619
 620static cfs_hash_ops_t cl_env_hops = {
 621        .hs_hash        = cl_env_hops_hash,
 622        .hs_key  = cl_env_hops_obj,
 623        .hs_keycmp      = cl_env_hops_keycmp,
 624        .hs_object      = cl_env_hops_obj,
 625        .hs_get  = cl_env_hops_noop,
 626        .hs_put_locked  = cl_env_hops_noop,
 627};
 628
 629static inline struct cl_env *cl_env_fetch(void)
 630{
 631        struct cl_env *cle;
 632
 633        cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
 634        LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
 635        return cle;
 636}
 637
 638static inline void cl_env_attach(struct cl_env *cle)
 639{
 640        if (cle) {
 641                int rc;
 642
 643                LASSERT(cle->ce_owner == NULL);
 644                cle->ce_owner = (void *) (long) current->pid;
 645                rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
 646                                         &cle->ce_node);
 647                LASSERT(rc == 0);
 648        }
 649}
 650
 651static inline void cl_env_do_detach(struct cl_env *cle)
 652{
 653        void *cookie;
 654
 655        LASSERT(cle->ce_owner == (void *) (long) current->pid);
 656        cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
 657                              &cle->ce_node);
 658        LASSERT(cookie == cle);
 659        cle->ce_owner = NULL;
 660}
 661
 662static int cl_env_store_init(void) {
 663        cl_env_hash = cfs_hash_create("cl_env",
 664                                      HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
 665                                      HASH_CL_ENV_BKT_BITS, 0,
 666                                      CFS_HASH_MIN_THETA,
 667                                      CFS_HASH_MAX_THETA,
 668                                      &cl_env_hops,
 669                                      CFS_HASH_RW_BKTLOCK);
  670        return cl_env_hash != NULL ? 0 : -ENOMEM;
 671}
 672
 673static void cl_env_store_fini(void) {
 674        cfs_hash_putref(cl_env_hash);
 675}
 676
 677
 678static inline struct cl_env *cl_env_detach(struct cl_env *cle)
 679{
 680        if (cle == NULL)
 681                cle = cl_env_fetch();
 682
 683        if (cle && cle->ce_owner)
 684                cl_env_do_detach(cle);
 685
 686        return cle;
 687}
 688
 689static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
 690{
 691        struct lu_env *env;
 692        struct cl_env *cle;
 693
 694        OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, __GFP_IO);
 695        if (cle != NULL) {
 696                int rc;
 697
 698                INIT_LIST_HEAD(&cle->ce_linkage);
 699                cle->ce_magic = &cl_env_init0;
 700                env = &cle->ce_lu;
 701                rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
 702                if (rc == 0) {
 703                        rc = lu_context_init(&cle->ce_ses,
 704                                             LCT_SESSION | ses_tags);
 705                        if (rc == 0) {
 706                                lu_context_enter(&cle->ce_ses);
 707                                env->le_ses = &cle->ce_ses;
 708                                cl_env_init0(cle, debug);
 709                        } else
 710                                lu_env_fini(env);
 711                }
 712                if (rc != 0) {
 713                        OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
 714                        env = ERR_PTR(rc);
 715                } else {
 716                        CL_ENV_INC(create);
 717                        CL_ENV_INC(total);
 718                }
 719        } else
 720                env = ERR_PTR(-ENOMEM);
 721        return env;
 722}
 723
 724static void cl_env_fini(struct cl_env *cle)
 725{
 726        CL_ENV_DEC(total);
 727        lu_context_fini(&cle->ce_lu.le_ctx);
 728        lu_context_fini(&cle->ce_ses);
 729        OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
 730}
 731
 732static inline struct cl_env *cl_env_container(struct lu_env *env)
 733{
 734        return container_of(env, struct cl_env, ce_lu);
 735}
 736
 737struct lu_env *cl_env_peek(int *refcheck)
 738{
 739        struct lu_env *env;
 740        struct cl_env *cle;
 741
 742        CL_ENV_INC(lookup);
 743
 744        /* check that we don't go far from untrusted pointer */
 745        CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
 746
 747        env = NULL;
 748        cle = cl_env_fetch();
 749        if (cle != NULL) {
 750                CL_ENV_INC(hit);
 751                env = &cle->ce_lu;
 752                *refcheck = ++cle->ce_ref;
 753        }
 754        CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
 755        return env;
 756}
 757EXPORT_SYMBOL(cl_env_peek);
 758
 759/**
 760 * Returns lu_env: if there already is an environment associated with the
  761 * current thread, it is returned; otherwise, a new environment is allocated.
 762 *
 763 * \param refcheck pointer to a counter used to detect environment leaks. In
 764 * the usual case cl_env_get() and cl_env_put() are called in the same lexical
 765 * scope and pointer to the same integer is passed as \a refcheck. This is
 766 * used to detect missed cl_env_put().
 767 *
 768 * \see cl_env_put()
 769 */
 770struct lu_env *cl_env_get(int *refcheck)
 771{
 772        struct lu_env *env;
 773
 774        env = cl_env_peek(refcheck);
 775        if (env == NULL) {
 776                env = cl_env_new(lu_context_tags_default,
 777                                 lu_session_tags_default,
 778                                 __builtin_return_address(0));
 779
 780                if (!IS_ERR(env)) {
 781                        struct cl_env *cle;
 782
 783                        cle = cl_env_container(env);
 784                        cl_env_attach(cle);
 785                        *refcheck = cle->ce_ref;
 786                        CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
 787                }
 788        }
 789        return env;
 790}
 791EXPORT_SYMBOL(cl_env_get);
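
/*
 * Usage sketch (illustrative) of the \a refcheck pattern described above:
 * the same integer is passed to cl_env_get() and to the matching
 * cl_env_put() so that a missed put can be detected.
 *
 *      struct lu_env *env;
 *      int refcheck;
 *
 *      env = cl_env_get(&refcheck);
 *      if (IS_ERR(env))
 *              return PTR_ERR(env);
 *      ... use env ...
 *      cl_env_put(env, &refcheck);
 */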
 792
 793/**
 794 * Forces an allocation of a fresh environment with given tags.
 795 *
 796 * \see cl_env_get()
 797 */
 798struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
 799{
 800        struct lu_env *env;
 801
 802        LASSERT(cl_env_peek(refcheck) == NULL);
 803        env = cl_env_new(tags, tags, __builtin_return_address(0));
 804        if (!IS_ERR(env)) {
 805                struct cl_env *cle;
 806
 807                cle = cl_env_container(env);
 808                *refcheck = cle->ce_ref;
 809                CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
 810        }
 811        return env;
 812}
 813EXPORT_SYMBOL(cl_env_alloc);
 814
 815static void cl_env_exit(struct cl_env *cle)
 816{
 817        LASSERT(cle->ce_owner == NULL);
 818        lu_context_exit(&cle->ce_lu.le_ctx);
 819        lu_context_exit(&cle->ce_ses);
 820}
 821
 822/**
 823 * Release an environment.
 824 *
  825 * Decrements the \a env reference counter. When the counter drops to 0, nothing
  826 * in this thread is using the environment and it is returned to the allocation
  827 * cache, or freed straight away if the cache is large enough.
 828 */
 829void cl_env_put(struct lu_env *env, int *refcheck)
 830{
 831        struct cl_env *cle;
 832
 833        cle = cl_env_container(env);
 834
 835        LASSERT(cle->ce_ref > 0);
 836        LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
 837
 838        CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
 839        if (--cle->ce_ref == 0) {
 840                CL_ENV_DEC(busy);
 841                cl_env_detach(cle);
 842                cle->ce_debug = NULL;
 843                cl_env_exit(cle);
 844                cl_env_fini(cle);
 845        }
 846}
 847EXPORT_SYMBOL(cl_env_put);
 848
 849/**
 850 * Declares a point of re-entrancy.
 851 *
 852 * \see cl_env_reexit()
 853 */
 854void *cl_env_reenter(void)
 855{
 856        return cl_env_detach(NULL);
 857}
 858EXPORT_SYMBOL(cl_env_reenter);
 859
 860/**
 861 * Exits re-entrancy.
 862 */
 863void cl_env_reexit(void *cookie)
 864{
 865        cl_env_detach(NULL);
 866        cl_env_attach(cookie);
 867}
 868EXPORT_SYMBOL(cl_env_reexit);
 869
 870/**
  871 * Sets up a user-supplied \a env as the current environment. This is to be used
  872 * to guarantee that an environment exists even when cl_env_get() fails. It is up
  873 * to the user to ensure proper concurrency control.
 874 *
 875 * \see cl_env_unplant()
 876 */
 877void cl_env_implant(struct lu_env *env, int *refcheck)
 878{
 879        struct cl_env *cle = cl_env_container(env);
 880
 881        LASSERT(cle->ce_ref > 0);
 882
 883        cl_env_attach(cle);
 884        cl_env_get(refcheck);
 885        CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
 886}
 887EXPORT_SYMBOL(cl_env_implant);
 888
 889/**
 890 * Detach environment installed earlier by cl_env_implant().
 891 */
 892void cl_env_unplant(struct lu_env *env, int *refcheck)
 893{
 894        struct cl_env *cle = cl_env_container(env);
 895
 896        LASSERT(cle->ce_ref > 1);
 897
 898        CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
 899
 900        cl_env_detach(cle);
 901        cl_env_put(env, refcheck);
 902}
 903EXPORT_SYMBOL(cl_env_unplant);
 904
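/*
 * Gets an environment usable for a nested call: the current thread's
 * environment is re-used when no IO is going on in it; otherwise the current
 * environment is left via cl_env_reenter() and a fresh one is obtained.
 * Pairs with cl_env_nested_put(). Usage sketch (illustrative):
 *
 *      struct cl_env_nest nest;
 *      struct lu_env *env;
 *
 *      env = cl_env_nested_get(&nest);
 *      if (IS_ERR(env))
 *              return PTR_ERR(env);
 *      ... use env ...
 *      cl_env_nested_put(&nest, env);
 */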
 905struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
 906{
 907        struct lu_env *env;
 908
 909        nest->cen_cookie = NULL;
 910        env = cl_env_peek(&nest->cen_refcheck);
 911        if (env != NULL) {
 912                if (!cl_io_is_going(env))
 913                        return env;
 914                else {
 915                        cl_env_put(env, &nest->cen_refcheck);
 916                        nest->cen_cookie = cl_env_reenter();
 917                }
 918        }
 919        env = cl_env_get(&nest->cen_refcheck);
 920        if (IS_ERR(env)) {
 921                cl_env_reexit(nest->cen_cookie);
 922                return env;
 923        }
 924
 925        LASSERT(!cl_io_is_going(env));
 926        return env;
 927}
 928EXPORT_SYMBOL(cl_env_nested_get);
 929
 930void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
 931{
 932        cl_env_put(env, &nest->cen_refcheck);
 933        cl_env_reexit(nest->cen_cookie);
 934}
 935EXPORT_SYMBOL(cl_env_nested_put);
 936
 937/**
 938 * Converts struct cl_attr to struct ost_lvb.
 939 *
 940 * \see cl_lvb2attr
 941 */
 942void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
 943{
 944        ENTRY;
 945        lvb->lvb_size   = attr->cat_size;
 946        lvb->lvb_mtime  = attr->cat_mtime;
 947        lvb->lvb_atime  = attr->cat_atime;
 948        lvb->lvb_ctime  = attr->cat_ctime;
 949        lvb->lvb_blocks = attr->cat_blocks;
 950        EXIT;
 951}
 952EXPORT_SYMBOL(cl_attr2lvb);
 953
 954/**
 955 * Converts struct ost_lvb to struct cl_attr.
 956 *
 957 * \see cl_attr2lvb
 958 */
 959void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
 960{
 961        ENTRY;
 962        attr->cat_size   = lvb->lvb_size;
 963        attr->cat_mtime  = lvb->lvb_mtime;
 964        attr->cat_atime  = lvb->lvb_atime;
 965        attr->cat_ctime  = lvb->lvb_ctime;
 966        attr->cat_blocks = lvb->lvb_blocks;
 967        EXIT;
 968}
 969EXPORT_SYMBOL(cl_lvb2attr);
 970
 971/*****************************************************************************
 972 *
 973 * Temporary prototype thing: mirror obd-devices into cl devices.
 974 *
 975 */
 976
 977struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
 978                                struct lu_device_type *ldt,
 979                                struct lu_device *next)
 980{
 981        const char       *typename;
 982        struct lu_device *d;
 983
 984        LASSERT(ldt != NULL);
 985
 986        typename = ldt->ldt_name;
 987        d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
 988        if (!IS_ERR(d)) {
 989                int rc;
 990
 991                if (site != NULL)
 992                        d->ld_site = site;
 993                rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
 994                if (rc == 0) {
 995                        lu_device_get(d);
 996                        lu_ref_add(&d->ld_reference,
 997                                   "lu-stack", &lu_site_init);
 998                } else {
 999                        ldt->ldt_ops->ldto_device_free(env, d);
1000                        CERROR("can't init device '%s', %d\n", typename, rc);
1001                        d = ERR_PTR(rc);
1002                }
1003        } else
1004                CERROR("Cannot allocate device: '%s'\n", typename);
1005        return lu2cl_dev(d);
1006}
1007EXPORT_SYMBOL(cl_type_setup);
1008
1009/**
1010 * Finalize device stack by calling lu_stack_fini().
1011 */
1012void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1013{
1014        lu_stack_fini(env, cl2lu_dev(cl));
1015}
1016EXPORT_SYMBOL(cl_stack_fini);
1017
1018int  cl_lock_init(void);
1019void cl_lock_fini(void);
1020
1021int  cl_page_init(void);
1022void cl_page_fini(void);
1023
1024static struct lu_context_key cl_key;
1025
1026struct cl_thread_info *cl_env_info(const struct lu_env *env)
1027{
1028        return lu_context_key_get(&env->le_ctx, &cl_key);
1029}
1030
1031/* defines cl0_key_{init,fini}() */
1032LU_KEY_INIT_FINI(cl0, struct cl_thread_info);
1033
1034static void *cl_key_init(const struct lu_context *ctx,
1035                         struct lu_context_key *key)
1036{
1037        struct cl_thread_info *info;
1038
1039        info = cl0_key_init(ctx, key);
1040        if (!IS_ERR(info)) {
1041                int i;
1042
1043                for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1044                        lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1045        }
1046        return info;
1047}
1048
1049static void cl_key_fini(const struct lu_context *ctx,
1050                        struct lu_context_key *key, void *data)
1051{
1052        struct cl_thread_info *info;
1053        int i;
1054
1055        info = data;
1056        for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1057                lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1058        cl0_key_fini(ctx, key, data);
1059}
1060
1061static void cl_key_exit(const struct lu_context *ctx,
1062                        struct lu_context_key *key, void *data)
1063{
1064        struct cl_thread_info *info = data;
1065        int i;
1066
1067        for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
1068                LASSERT(info->clt_counters[i].ctc_nr_held == 0);
1069                LASSERT(info->clt_counters[i].ctc_nr_used == 0);
1070                LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
1071                LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
1072                lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1073                lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1074        }
1075}
1076
1077static struct lu_context_key cl_key = {
1078        .lct_tags = LCT_CL_THREAD,
1079        .lct_init = cl_key_init,
1080        .lct_fini = cl_key_fini,
1081        .lct_exit = cl_key_exit
1082};
1083
1084static struct lu_kmem_descr cl_object_caches[] = {
1085        {
1086                .ckd_cache = &cl_env_kmem,
1087                .ckd_name  = "cl_env_kmem",
1088                .ckd_size  = sizeof (struct cl_env)
1089        },
1090        {
1091                .ckd_cache = NULL
1092        }
1093};
1094
1095/**
1096 * Global initialization of cl-data. Create kmem caches, register
1097 * lu_context_key's, etc.
1098 *
1099 * \see cl_global_fini()
1100 */
1101int cl_global_init(void)
1102{
1103        int result;
1104
1105        result = cl_env_store_init();
1106        if (result)
1107                return result;
1108
1109        result = lu_kmem_init(cl_object_caches);
1110        if (result)
1111                goto out_store;
1112
1113        LU_CONTEXT_KEY_INIT(&cl_key);
1114        result = lu_context_key_register(&cl_key);
1115        if (result)
1116                goto out_kmem;
1117
1118        result = cl_lock_init();
1119        if (result)
1120                goto out_context;
1121
1122        result = cl_page_init();
1123        if (result)
1124                goto out_lock;
1125
1126        return 0;
1127out_lock:
1128        cl_lock_fini();
1129out_context:
1130        lu_context_key_degister(&cl_key);
1131out_kmem:
1132        lu_kmem_fini(cl_object_caches);
1133out_store:
1134        cl_env_store_fini();
1135        return result;
1136}
1137
1138/**
1139 * Finalization of global cl-data. Dual to cl_global_init().
1140 */
1141void cl_global_fini(void)
1142{
1143        cl_lock_fini();
1144        cl_page_fini();
1145        lu_context_key_degister(&cl_key);
1146        lu_kmem_fini(cl_object_caches);
1147        cl_env_store_fini();
1148}
1149