linux/drivers/staging/lustre/lustre/lov/lov_lock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOV layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lov lock operations.
 *
 */

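/**
 * Returns the environment and IO context that should be used to call
 * operations on the sub-lock of stripe lls->sub_stripe. When the current
 * IO targets the same object as the parent lock, the per-stripe sub-IO
 * environment is used; otherwise the parent's environment is borrowed
 * (see the FIXME below). On failure an ERR_PTR() is returned.
 */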
static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                                                   const struct cl_lock *parent,
                                                   struct lov_lock_sub *lls)
{
        struct lov_sublock_env *subenv;
        struct lov_io *lio = lov_env_io(env);
        struct cl_io *io = lio->lis_cl.cis_io;
        struct lov_io_sub *sub;

        subenv = &lov_env_session(env)->ls_subenv;

        /*
         * FIXME: We tend to use the sub-IO's env & io to call the sub-lock
         * operations because an osc lock sometimes stores control variables
         * in the thread's IO information (currently only lockless
         * information). However, if the lock's host object differs from the
         * object of the current IO, there is no way to get the subenv and
         * subio, because they are not initialized at all. As a temporary
         * fix, in this case we still borrow the parent's env to call
         * sub-lock operations.
         */
        if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
                subenv->lse_env = env;
                subenv->lse_io  = io;
                subenv->lse_sub = NULL;
        } else {
                sub = lov_sub_get(env, lio, lls->sub_stripe);
                if (!IS_ERR(sub)) {
                        subenv->lse_env = sub->sub_env;
                        subenv->lse_io  = sub->sub_io;
                        subenv->lse_sub = sub;
                } else {
                        subenv = (void *)sub;
                }
        }
        return subenv;
}

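/**
 * Releases the per-stripe sub-IO reference taken by lov_sublock_env_get(),
 * if any.
 */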
static void lov_sublock_env_put(struct lov_sublock_env *subenv)
{
        if (subenv && subenv->lse_sub)
                lov_sub_put(subenv->lse_sub);
}

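/**
 * Initializes the cl_lock embedded in sub-lock lls against the
 * corresponding sub-object, using the environment and IO context returned
 * by lov_sublock_env_get(). Returns 0 on success or a negative errno.
 */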
static int lov_sublock_init(const struct lu_env *env,
                            const struct cl_lock *parent,
                            struct lov_lock_sub *lls)
{
        struct lov_sublock_env *subenv;
        int result;

        subenv = lov_sublock_env_get(env, parent, lls);
        if (!IS_ERR(subenv)) {
                result = cl_lock_init(subenv->lse_env, &lls->sub_lock,
                                      subenv->lse_io);
                lov_sublock_env_put(subenv);
        } else {
                /* Getting the sub-lock environment failed; propagate it. */
                result = PTR_ERR(subenv);
        }
        return result;
}

/**
 * Creates sub-locks for a given lov_lock for the first time.
 *
 * Goes through all sub-objects of top-object, and creates sub-locks on every
 * sub-object intersecting with top-lock extent. This is complicated by the
 * fact that top-lock (that is being created) can be accessed concurrently
 * through already created sub-locks (possibly shared with other top-locks).
 */
static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
                                          const struct cl_object *obj,
                                          struct cl_lock *lock)
{
        int result = 0;
        int i;
        int nr;
        u64 start;
        u64 end;
        u64 file_start;
        u64 file_end;

        struct lov_object       *loo    = cl2lov(obj);
        struct lov_layout_raid0 *r0     = lov_r0(loo);
        struct lov_lock         *lovlck;

        file_start = cl_offset(lov2cl(loo), lock->cll_descr.cld_start);
        file_end   = cl_offset(lov2cl(loo), lock->cll_descr.cld_end + 1) - 1;

        /* First pass: count the stripes overlapping the lock extent. */
        for (i = 0, nr = 0; i < r0->lo_nr; i++) {
                /*
                 * XXX for wide striping a smarter algorithm that breaks out
                 * of the loop early is desirable.
                 */
                if (likely(r0->lo_sub[i]) && /* sparse layout */
                    lov_stripe_intersects(loo->lo_lsm, i,
                                          file_start, file_end, &start, &end))
                        nr++;
        }
        LASSERT(nr > 0);
        lovlck = libcfs_kvzalloc(offsetof(struct lov_lock, lls_sub[nr]),
                                 GFP_NOFS);
        if (!lovlck)
                return ERR_PTR(-ENOMEM);

        /* Second pass: initialize a sub-lock for every overlapping stripe. */
        lovlck->lls_nr = nr;
        for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
                if (likely(r0->lo_sub[i]) &&
                    lov_stripe_intersects(loo->lo_lsm, i,
                                          file_start, file_end, &start, &end)) {
                        struct lov_lock_sub *lls = &lovlck->lls_sub[nr];
                        struct cl_lock_descr *descr;

                        descr = &lls->sub_lock.cll_descr;

                        LASSERT(!descr->cld_obj);
                        descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
                        descr->cld_start = cl_index(descr->cld_obj, start);
                        descr->cld_end   = cl_index(descr->cld_obj, end);
                        descr->cld_mode  = lock->cll_descr.cld_mode;
                        descr->cld_gid   = lock->cll_descr.cld_gid;
                        descr->cld_enq_flags = lock->cll_descr.cld_enq_flags;
                        lls->sub_stripe = i;

                        /* initialize sub lock */
                        result = lov_sublock_init(env, lock, lls);
                        if (result < 0)
                                break;

                        lls->sub_initialized = 1;
                        nr++;
                }
        }
        LASSERT(ergo(result == 0, nr == lovlck->lls_nr));

        if (result != 0) {
                /* Tear down the sub-locks initialized so far. */
                for (i = 0; i < nr; ++i) {
                        if (!lovlck->lls_sub[i].sub_initialized)
                                break;

                        cl_lock_fini(env, &lovlck->lls_sub[i].sub_lock);
                }
                kvfree(lovlck);
                lovlck = ERR_PTR(result);
        }

        return lovlck;
}
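/**
 * Implementation of cl_lock_operations::clo_fini() for the lov layer:
 * finalizes all initialized sub-locks and frees the top-lock.
 */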
static void lov_lock_fini(const struct lu_env *env,
                          struct cl_lock_slice *slice)
{
        struct lov_lock *lovlck;
        int i;

        lovlck = cl2lov_lock(slice);
        for (i = 0; i < lovlck->lls_nr; ++i) {
                LASSERT(!lovlck->lls_sub[i].sub_is_enqueued);
                if (lovlck->lls_sub[i].sub_initialized)
                        cl_lock_fini(env, &lovlck->lls_sub[i].sub_lock);
        }
        kvfree(lovlck);
}

/**
 * Implementation of cl_lock_operations::clo_enqueue() for the lov layer.
 * This function is rather subtle, as it enqueues the top-lock (i.e.,
 * advances the top-lock state machine from CLS_QUEUING to CLS_ENQUEUED) by
 * juggling sub-lock state machines in the face of sub-lock sharing (by
 * multiple top-locks) and concurrent sub-lock cancellations.
 */
static int lov_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *io, struct cl_sync_io *anchor)
{
        struct cl_lock *lock = slice->cls_lock;
        struct lov_lock *lovlck = cl2lov_lock(slice);
        int i;
        int rc = 0;

        for (i = 0; i < lovlck->lls_nr; ++i) {
                struct lov_lock_sub *lls = &lovlck->lls_sub[i];
                struct lov_sublock_env *subenv;

                subenv = lov_sublock_env_get(env, lock, lls);
                if (IS_ERR(subenv)) {
                        rc = PTR_ERR(subenv);
                        break;
                }
                rc = cl_lock_enqueue(subenv->lse_env, subenv->lse_io,
                                     &lls->sub_lock, anchor);
                lov_sublock_env_put(subenv);
                if (rc != 0)
                        break;

                lls->sub_is_enqueued = 1;
        }
        return rc;
}

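/**
 * Implementation of cl_lock_operations::clo_cancel() for the lov layer:
 * cancels every sub-lock that was successfully enqueued. A failure to get
 * the sub-lock environment is logged, and cancellation continues with the
 * remaining sub-locks.
 */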
static void lov_lock_cancel(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
        struct cl_lock *lock = slice->cls_lock;
        struct lov_lock *lovlck = cl2lov_lock(slice);
        int i;

        for (i = 0; i < lovlck->lls_nr; ++i) {
                struct lov_lock_sub *lls = &lovlck->lls_sub[i];
                struct cl_lock *sublock = &lls->sub_lock;
                struct lov_sublock_env *subenv;

                if (!lls->sub_is_enqueued)
                        continue;

                lls->sub_is_enqueued = 0;
                subenv = lov_sublock_env_get(env, lock, lls);
                if (!IS_ERR(subenv)) {
                        cl_lock_cancel(subenv->lse_env, sublock);
                        lov_sublock_env_put(subenv);
                } else {
                        CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
                                      "lov_lock_cancel fails with %ld.\n",
                                      PTR_ERR(subenv));
                }
        }
}

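/**
 * Implementation of cl_lock_operations::clo_print() for the lov layer:
 * prints the number of sub-locks, then each sub-lock's enqueued flag
 * followed by its own description.
 */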
static int lov_lock_print(const struct lu_env *env, void *cookie,
                          lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lov_lock *lck = cl2lov_lock(slice);
        int i;

        (*p)(env, cookie, "%d\n", lck->lls_nr);
        for (i = 0; i < lck->lls_nr; ++i) {
                struct lov_lock_sub *sub;

                sub = &lck->lls_sub[i];
                (*p)(env, cookie, "    %d %x: ", i, sub->sub_is_enqueued);
                cl_lock_print(env, cookie, p, &sub->sub_lock);
        }
        return 0;
}

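/* Lock operations vector for striped (raid0) lov objects. */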
static const struct cl_lock_operations lov_lock_ops = {
        .clo_fini    = lov_lock_fini,
        .clo_enqueue = lov_lock_enqueue,
        .clo_cancel  = lov_lock_cancel,
        .clo_print   = lov_lock_print
};

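/**
 * Top-lock initializer for raid0 (striped) layouts: allocates a lov_lock
 * with one sub-lock per stripe overlapping the lock extent, and attaches it
 * to the passed lock as a cl_lock slice. Returns 0 on success or a negative
 * errno.
 */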
int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
                        struct cl_lock *lock, const struct cl_io *io)
{
        struct lov_lock *lck;
        int result = 0;

        lck = lov_lock_sub_init(env, obj, lock);
        if (!IS_ERR(lck))
                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
        else
                result = PTR_ERR(lck);
        return result;
}

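/**
 * Finalizer for locks on files with an empty layout: there are no sub-locks,
 * so only the lov_lock itself needs to be freed.
 */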
static void lov_empty_lock_fini(const struct lu_env *env,
                                struct cl_lock_slice *slice)
{
        struct lov_lock *lck = cl2lov_lock(slice);

        kmem_cache_free(lov_lock_kmem, lck);
}

static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
                                lu_printer_t p,
                                const struct cl_lock_slice *slice)
{
        (*p)(env, cookie, "empty\n");
        return 0;
}

/* XXX: more methods will be added later. */
static const struct cl_lock_operations lov_empty_lock_ops = {
        .clo_fini  = lov_empty_lock_fini,
        .clo_print = lov_empty_lock_print
};

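/**
 * Top-lock initializer for files with an empty layout (no stripes): attaches
 * a sub-lock-less lov_lock to the passed lock. Returns 0 on success or
 * -ENOMEM if the slab allocation fails.
 */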
int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
                        struct cl_lock *lock, const struct cl_io *io)
{
        struct lov_lock *lck;
        int result = -ENOMEM;

        lck = kmem_cache_zalloc(lov_lock_kmem, GFP_NOFS);
        if (lck) {
                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
                result = 0;
        }
        return result;
}

/** @} lov */