/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

static void lovsub_lock_fini(const struct lu_env *env,
                             struct cl_lock_slice *slice)
{
        struct lovsub_lock   *lsl;

        ENTRY;
        lsl = cl2lovsub_lock(slice);
        LASSERT(list_empty(&lsl->lss_parents));
        OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
        EXIT;
}

static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lovsub-parent", current);
        cl_lock_mutex_get(env, parent);
        EXIT;
}

static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_mutex_put(env, parent);
        lu_ref_del(&parent->cll_reference, "lovsub-parent", current);
        cl_lock_put(env, parent);
        EXIT;
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer, which is called whenever a sub-lock state changes. Propagates the
 * state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              enum cl_lock_state state)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock_link *scan;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                struct lov_lock *lov    = scan->lll_super;
                struct cl_lock  *parent = lov->lls_cl.cls_lock;

                if (sub->lss_active != parent) {
                        lovsub_parent_lock(env, lov);
                        cl_lock_signal(env, parent);
                        lovsub_parent_unlock(env, lov);
                }
        }
        EXIT;
}

/**
 * Implementation of the cl_lock_operations::clo_weigh() method, estimating
 * lock weight by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
                                       const struct cl_lock_slice *slice)
{
        struct lovsub_lock *lock = cl2lovsub_lock(slice);
        struct lov_lock    *lov;
        unsigned long       dumbbell;

        ENTRY;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        if (!list_empty(&lock->lss_parents)) {
                /*
                 * It is not clear whether all parents have to be asked and
                 * their estimates summed, or whether asking one is enough.
                 * For current usage, one is always enough.
                 */
                lov = container_of(lock->lss_parents.next,
                                   struct lov_lock_link, lll_list)->lll_super;

                lovsub_parent_lock(env, lov);
                dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
                lovsub_parent_unlock(env, lov);
        } else
                dumbbell = 0;

        RETURN(dumbbell);
}

/**
 * Maps start/end offsets within a stripe to offsets within a file.
 */
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *lov,
                                  int stripe, struct cl_lock_descr *out)
{
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        if (lov->lo_lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size);
                skip = (lov->lo_lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        EXIT;
}

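/*
 * A worked example of the mapping above (illustrative numbers only, not
 * taken from any particular configuration): with lsm_stripe_count == 3 and
 * a stripe size of 256 pages, skip == (3 - 1) * 256 == 512.  An in-stripe
 * page index of 100 in stripe 1 lies in stripe block 100/256 == 0, so it
 * maps to file page index 100 + 0 * 512 + 1 * 256 == 356.  In-stripe index
 * 300 of the same stripe is in block 300/256 == 1 and maps to
 * 300 + 1 * 512 + 1 * 256 == 1068.
 */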

/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two cases:
 *
 *     - as part of the receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that lock mode is not propagated to the parent: i.e., if a CLM_READ
 * top-lock matches a CLM_WRITE sub-lock, the top-lock is still CLM_READ.
 */
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
                       struct lovsub_lock *sublock,
                       const struct cl_lock_descr *d, int idx)
{
        struct cl_lock       *parent;
        struct lovsub_object *subobj;
        struct cl_lock_descr *pd;
        struct cl_lock_descr *parent_descr;
        int                   result;

        parent       = lov->lls_cl.cls_lock;
        parent_descr = &parent->cll_descr;
        LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

        subobj = cl2lovsub(sublock->lss_cl.cls_obj);
        pd     = &lov_env_info(env)->lti_ldescr;

        pd->cld_obj  = parent_descr->cld_obj;
        pd->cld_mode = parent_descr->cld_mode;
        pd->cld_gid  = parent_descr->cld_gid;
        lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
        lov->lls_sub[idx].sub_got = *d;
        /*
         * Notify the top-lock about the modification, if the lock
         * description changes materially.
         */
        if (!cl_lock_ext_match(parent_descr, pd))
                result = cl_lock_modify(env, parent, pd);
        else
                result = 0;
        return result;
}

static int lovsub_lock_modify(const struct lu_env *env,
                              const struct cl_lock_slice *s,
                              const struct cl_lock_descr *d)
{
        struct lovsub_lock   *lock   = cl2lovsub_lock(s);
        struct lov_lock_link *scan;
        struct lov_lock      *lov;
        int                   result = 0;

        ENTRY;

        LASSERT(cl_lock_mode_match(d->cld_mode,
                                   s->cls_lock->cll_descr.cld_mode));
        list_for_each_entry(scan, &lock->lss_parents, lll_list) {
                int rc;

                lov = scan->lll_super;
                lovsub_parent_lock(env, lov);
                rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
                lovsub_parent_unlock(env, lov);
                result = result ?: rc;
        }
        RETURN(result);
}

static int lovsub_lock_closure(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               struct cl_lock_closure *closure)
{
        struct lovsub_lock   *sub;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        int                   result;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        sub    = cl2lovsub_lock(slice);
        result = 0;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                parent = scan->lll_super->lls_cl.cls_lock;
                result = cl_lock_closure_build(env, parent, closure);
                if (result != 0)
                        break;
        }
        RETURN(result);
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int          result;
        ENTRY;

        parent = lov->lls_cl.cls_lock;
        if (parent->cll_error)
                RETURN(0);

        result = 0;
        switch (parent->cll_state) {
        case CLS_ENQUEUED:
                /* See LU-1355 for the case where a glimpse lock is
                 * interrupted by a signal */
                LASSERT(parent->cll_flags & CLF_CANCELLED);
                break;
        case CLS_QUEUING:
        case CLS_FREEING:
                cl_lock_signal(env, parent);
                break;
        case CLS_INTRANSIT:
                /*
                 * Here lies a problem: a sub-lock is canceled while top-lock
                 * is being unlocked. Top-lock cannot be moved into CLS_NEW
                 * state, because unlocking has to succeed eventually by
                 * placing lock into CLS_CACHED (or failing it), see
                 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
                 * state, because lov maintains an invariant that all
                 * sub-locks exist in CLS_CACHED (this allows cached top-lock
                 * to be reused immediately). Nor can we wait for top-lock
                 * state to change, because this can be synchronous to the
                 * current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                if (cl_lock_is_intransit(parent))
                        lov->lls_cancel_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * if a sub-lock is canceled, move its top-lock into CLS_NEW
                 * state to preserve an invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that the next attempt to re-use the
                 * top-lock enqueues the missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /* fall through */
        case CLS_NEW:
                /*
                 * if the last sub-lock is canceled, destroy the top-lock
                 * (which is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutexes
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, the top-lock is destroyed, and the
                         * sub-lock mutex is acquired again. The list of
                         * parents has to be re-scanned from the beginning
                         * after this.
                         *
                         * Only do this if no mutexes other than on @child and
                         * @parent are held by the current thread.
                         *
                         * TODO: The locking model here is too complex,
                         * because the lock may be canceled and deleted
                         * voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *      -> osc_lock_cancel_wait
                         *        -> cl_lock_delete
                         *          -> lovsub_lock_delete
                         *            -> cl_lock_cancel/delete
                         *              -> ...
                         *
                         * The better choice is to spawn a kernel thread for
                         * this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                result = 1;
                        }
                }
                break;
        case CLS_HELD:
                CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
                /* fall through */
        default:
                CERROR("Impossible state: %d\n", parent->cll_state);
                LBUG();
                break;
        }

        RETURN(result);
}

/**
 * An implementation of the cl_lock_operations::clo_delete() method. This is
 * invoked in "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        ENTRY;
        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, the
         * top-lock is canceled proactively, and this requires releasing the
         * sub-lock mutex. Once the sub-lock mutex has been released, the
         * list of its parents has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;
                struct lov_lock_sub  *subdata;

                restart = 0;
                list_for_each_entry_safe(scan, temp,
                                         &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        subdata = &lov->lls_sub[scan->lll_idx];
                        lovsub_parent_lock(env, lov);
                        subdata->sub_got = subdata->sub_descr;
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

static int lovsub_lock_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct lov_lock_link *scan;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                lov = scan->lll_super;
                (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
                if (lov != NULL)
                        cl_lock_descr_print(env, cookie, p,
                                            &lov->lls_cl.cls_lock->cll_descr);
                (*p)(env, cookie, "] ");
        }
        return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};

int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
                     struct cl_lock *lock, const struct cl_io *io)
{
        struct lovsub_lock *lsk;
        int result;

        ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, __GFP_IO);
        if (lsk != NULL) {
                INIT_LIST_HEAD(&lsk->lss_parents);
                cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
                result = 0;
        } else
                result = -ENOMEM;
        RETURN(result);
}

/** @} lov */