linux/drivers/staging/lustre/lustre/osc/osc_lock.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Implementation of cl_lock for OSC layer.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_OSC
  42
  43# include <linux/libcfs/libcfs.h>
  44/* fid_build_reg_res_name() */
  45#include <lustre_fid.h>
  46
  47#include "osc_cl_internal.h"
  48
  49/** \addtogroup osc
  50 *  @{
  51 */
  52
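    /*
     * _PAGEREF_MAGIC is presumably a sentinel for osc_lock::ols_pageref
     * debugging: osc_lock_fini() below only accepts a page reference count
     * of zero or this magic value.
     */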
  53#define _PAGEREF_MAGIC  (-10000000)
  54
  55/*****************************************************************************
  56 *
  57 * Type conversions.
  58 *
  59 */
  60
  61static const struct cl_lock_operations osc_lock_ops;
  62static const struct cl_lock_operations osc_lock_lockless_ops;
  63static void osc_lock_to_lockless(const struct lu_env *env,
  64                                 struct osc_lock *ols, int force);
  65static int osc_lock_has_pages(struct osc_lock *olck);
  66
  67int osc_lock_is_lockless(const struct osc_lock *olck)
  68{
  69        return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
  70}
  71
  72/**
  73 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
  74 * pointer cannot be dereferenced, as lock is not protected from concurrent
  75 * reclaim. This function is a helper for osc_lock_invariant().
  76 */
  77static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
  78{
  79        struct ldlm_lock *lock;
  80
  81        lock = ldlm_handle2lock(handle);
  82        if (lock != NULL)
  83                LDLM_LOCK_PUT(lock);
  84        return lock;
  85}
  86
  87/**
  88 * Invariant that has to be true all of the time.
  89 */
  90static int osc_lock_invariant(struct osc_lock *ols)
  91{
  92        struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
  93        struct ldlm_lock *olock       = ols->ols_lock;
  94        int               handle_used = lustre_handle_is_used(&ols->ols_handle);
  95
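            /* Note: ergo(a, b) is logical implication, i.e. !(a) || (b). */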
  96        if (ergo(osc_lock_is_lockless(ols),
  97                 ols->ols_locklessable && ols->ols_lock == NULL))
  98                return 1;
  99
 100        /*
 101         * If all of the following "ergo"s are true, return 1; otherwise 0.
 102         */
 103        if (! ergo(olock != NULL, handle_used))
 104                return 0;
 105
 106        if (! ergo(olock != NULL,
 107                   olock->l_handle.h_cookie == ols->ols_handle.cookie))
 108                return 0;
 109
 110        if (! ergo(handle_used,
 111                   ergo(lock != NULL && olock != NULL, lock == olock) &&
 112                   ergo(lock == NULL, olock == NULL)))
 113                return 0;
 114        /*
 115         * Check that ->ols_handle and ->ols_lock are consistent, but
 116         * take into account that they are set at different times.
 117         */
 118        if (! ergo(ols->ols_state == OLS_CANCELLED,
 119                   olock == NULL && !handle_used))
 120                return 0;
 121        /*
 122         * DLM lock is destroyed only after we have seen cancellation
 123         * ast.
 124         */
 125        if (! ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
 126                   ((olock->l_flags & LDLM_FL_DESTROYED) == 0)))
 127                return 0;
 128
 129        if (! ergo(ols->ols_state == OLS_GRANTED,
 130                   olock != NULL &&
 131                   olock->l_req_mode == olock->l_granted_mode &&
 132                   ols->ols_hold))
 133                return 0;
 134        return 1;
 135}
 136
 137/*****************************************************************************
 138 *
 139 * Lock operations.
 140 *
 141 */
 142
 143/**
 144 * Breaks a link between osc_lock and dlm_lock.
 145 */
 146static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
 147{
 148        struct ldlm_lock *dlmlock;
 149
 150        spin_lock(&osc_ast_guard);
 151        dlmlock = olck->ols_lock;
 152        if (dlmlock == NULL) {
 153                spin_unlock(&osc_ast_guard);
 154                return;
 155        }
 156
 157        olck->ols_lock = NULL;
 158        /* wb(); --- for everyone who checks (ols->ols_lock != NULL) before
 159         * calling osc_lock_detach() */
 160        dlmlock->l_ast_data = NULL;
 161        olck->ols_handle.cookie = 0ULL;
 162        spin_unlock(&osc_ast_guard);
 163
 164        lock_res_and_lock(dlmlock);
 165        if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
 166                struct cl_object *obj = olck->ols_cl.cls_obj;
 167                struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
 168                __u64 old_kms;
 169
 170                cl_object_attr_lock(obj);
 171                /* Must get the value under the lock to avoid possible races. */
 172                old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
 173                /* Update the kms. Need to loop over all granted locks;
 174                 * not a problem for the client. */
 175                attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
 176
 177                cl_object_attr_set(env, obj, attr, CAT_KMS);
 178                cl_object_attr_unlock(obj);
 179        }
 180        unlock_res_and_lock(dlmlock);
 181
 182        /* release a reference taken in osc_lock_upcall0(). */
 183        LASSERT(olck->ols_has_ref);
 184        lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
 185        LDLM_LOCK_RELEASE(dlmlock);
 186        olck->ols_has_ref = 0;
 187}
 188
 189static int osc_lock_unhold(struct osc_lock *ols)
 190{
 191        int result = 0;
 192
 193        if (ols->ols_hold) {
 194                ols->ols_hold = 0;
 195                result = osc_cancel_base(&ols->ols_handle,
 196                                         ols->ols_einfo.ei_mode);
 197        }
 198        return result;
 199}
 200
 201static int osc_lock_unuse(const struct lu_env *env,
 202                          const struct cl_lock_slice *slice)
 203{
 204        struct osc_lock *ols = cl2osc_lock(slice);
 205
 206        LINVRNT(osc_lock_invariant(ols));
 207
 208        switch (ols->ols_state) {
 209        case OLS_NEW:
 210                LASSERT(!ols->ols_hold);
 211                LASSERT(ols->ols_agl);
 212                return 0;
 213        case OLS_UPCALL_RECEIVED:
 214                osc_lock_unhold(ols);
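                    /* fall through */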
 215        case OLS_ENQUEUED:
 216                LASSERT(!ols->ols_hold);
 217                osc_lock_detach(env, ols);
 218                ols->ols_state = OLS_NEW;
 219                return 0;
 220        case OLS_GRANTED:
 221                LASSERT(!ols->ols_glimpse);
 222                LASSERT(ols->ols_hold);
 223                /*
 224                 * Move lock into OLS_RELEASED state before calling
 225                 * osc_cancel_base() so that possible synchronous cancellation
 226                 * (that always happens e.g., for liblustre) sees that lock is
 227                 * released.
 228                 */
 229                ols->ols_state = OLS_RELEASED;
 230                return osc_lock_unhold(ols);
 231        default:
 232                CERROR("Impossible state: %d\n", ols->ols_state);
 233                LBUG();
 234        }
 235}
 236
 237static void osc_lock_fini(const struct lu_env *env,
 238                          struct cl_lock_slice *slice)
 239{
 240        struct osc_lock  *ols = cl2osc_lock(slice);
 241
 242        LINVRNT(osc_lock_invariant(ols));
 243        /*
 244         * ->ols_hold can still be true at this point if, for example, a
 245         * thread that requested a lock was killed (and released a reference
 246         * to the lock) before the reply from the server was received. In
 247         * that case the lock is destroyed immediately after the upcall.
 248         */
 249        osc_lock_unhold(ols);
 250        LASSERT(ols->ols_lock == NULL);
 251        LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
 252                atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
 253
 254        OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 255}
 256
 257static void osc_lock_build_policy(const struct lu_env *env,
 258                                  const struct cl_lock *lock,
 259                                  ldlm_policy_data_t *policy)
 260{
 261        const struct cl_lock_descr *d = &lock->cll_descr;
 262
 263        osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
 264        policy->l_extent.gid = d->cld_gid;
 265}
 266
 267static __u64 osc_enq2ldlm_flags(__u32 enqflags)
 268{
 269        __u64 result = 0;
 270
 271        LASSERT((enqflags & ~CEF_MASK) == 0);
 272
 273        if (enqflags & CEF_NONBLOCK)
 274                result |= LDLM_FL_BLOCK_NOWAIT;
 275        if (enqflags & CEF_ASYNC)
 276                result |= LDLM_FL_HAS_INTENT;
 277        if (enqflags & CEF_DISCARD_DATA)
 278                result |= LDLM_FL_AST_DISCARD_DATA;
 279        return result;
 280}
 281
 282/**
 283 * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
 284 * pointers. Initialized in osc_init().
 285 */
 286spinlock_t osc_ast_guard;
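    /*
     * As used in this file, osc_ast_guard nests inside lock_res_and_lock()
     * whenever both are held (see osc_ast_data_get() and osc_lock_upcall0()).
     */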
 287
 288static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
 289{
 290        struct osc_lock *olck;
 291
 292        lock_res_and_lock(dlm_lock);
 293        spin_lock(&osc_ast_guard);
 294        olck = dlm_lock->l_ast_data;
 295        if (olck != NULL) {
 296                struct cl_lock *lock = olck->ols_cl.cls_lock;
 297                /*
 298                 * If osc_lock holds a reference on ldlm lock, return it even
 299                 * when cl_lock is in CLS_FREEING state. This way
 300                 *
 301                 *       osc_ast_data_get(dlmlock) == NULL
 302                 *
 303                 * guarantees that all osc references on dlmlock were
 304                 * released. osc_dlm_blocking_ast0() relies on that.
 305                 */
 306                if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
 307                        cl_lock_get_trust(lock);
 308                        lu_ref_add_atomic(&lock->cll_reference,
 309                                          "ast", current);
 310                } else
 311                        olck = NULL;
 312        }
 313        spin_unlock(&osc_ast_guard);
 314        unlock_res_and_lock(dlm_lock);
 315        return olck;
 316}
 317
 318static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
 319{
 320        struct cl_lock *lock;
 321
 322        lock = olck->ols_cl.cls_lock;
 323        lu_ref_del(&lock->cll_reference, "ast", current);
 324        cl_lock_put(env, lock);
 325}
 326
 327/**
 328 * Updates object attributes from a lock value block (lvb) received together
 329 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
 330 * logic.
 331 *
 332 * This can be optimized to not update attributes when lock is a result of a
 333 * local match.
 334 *
 335 * Called under lock and resource spin-locks.
 336 */
 337static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
 338                                int rc)
 339{
 340        struct ost_lvb    *lvb;
 341        struct cl_object  *obj;
 342        struct lov_oinfo  *oinfo;
 343        struct cl_attr    *attr;
 344        unsigned           valid;
 345
 346        if (!(olck->ols_flags & LDLM_FL_LVB_READY))
 347                return;
 348
 349        lvb   = &olck->ols_lvb;
 350        obj   = olck->ols_cl.cls_obj;
 351        oinfo = cl2osc(obj)->oo_oinfo;
 352        attr  = &osc_env_info(env)->oti_attr;
 353        valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
 354        cl_lvb2attr(attr, lvb);
 355
 356        cl_object_attr_lock(obj);
 357        if (rc == 0) {
 358                struct ldlm_lock  *dlmlock;
 359                __u64 size;
 360
 361                dlmlock = olck->ols_lock;
 362                LASSERT(dlmlock != NULL);
 363
 364                /* re-grab LVB from a dlm lock under DLM spin-locks. */
 365                *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
 366                size = lvb->lvb_size;
 367                /* Extend KMS up to the end of this lock and no further.
 368                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
 369                if (size > dlmlock->l_policy_data.l_extent.end)
 370                        size = dlmlock->l_policy_data.l_extent.end + 1;
 371                if (size >= oinfo->loi_kms) {
 372                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
 373                                   ", kms="LPU64, lvb->lvb_size, size);
 374                        valid |= CAT_KMS;
 375                        attr->cat_kms = size;
 376                } else {
 377                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
 378                                   LPU64"; leaving kms="LPU64", end="LPU64,
 379                                   lvb->lvb_size, oinfo->loi_kms,
 380                                   dlmlock->l_policy_data.l_extent.end);
 381                }
 382                ldlm_lock_allow_match_locked(dlmlock);
 383        } else if (rc == -ENAVAIL && olck->ols_glimpse) {
 384                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
 385                       " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
 386        } else
 387                valid = 0;
 388
 389        if (valid != 0)
 390                cl_object_attr_set(env, obj, attr, valid);
 391
 392        cl_object_attr_unlock(obj);
 393}
 394
 395/**
 396 * Called when a lock is granted, from an upcall (when server returned a
 397 * granted lock), or from completion AST, when server returned a blocked lock.
 398 *
 399 * Called under lock and resource spin-locks, that are released temporarily
 400 * here.
 401 */
 402static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
 403                             struct ldlm_lock *dlmlock, int rc)
 404{
 405        struct ldlm_extent   *ext;
 406        struct cl_lock       *lock;
 407        struct cl_lock_descr *descr;
 408
 409        LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
 410
 411        if (olck->ols_state < OLS_GRANTED) {
 412                lock  = olck->ols_cl.cls_lock;
 413                ext   = &dlmlock->l_policy_data.l_extent;
 414                descr = &osc_env_info(env)->oti_descr;
 415                descr->cld_obj = lock->cll_descr.cld_obj;
 416
 417                /* XXX check that ->l_granted_mode is valid. */
 418                descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
 419                descr->cld_start = cl_index(descr->cld_obj, ext->start);
 420                descr->cld_end   = cl_index(descr->cld_obj, ext->end);
 421                descr->cld_gid   = ext->gid;
 422                /*
 423                 * tell upper layers the extent of the lock that was actually
 424                 * granted
 425                 */
 426                olck->ols_state = OLS_GRANTED;
 427                osc_lock_lvb_update(env, olck, rc);
 428
 429                /* release DLM spin-locks to allow cl_lock_{modify,signal}()
 430                 * to take a semaphore on a parent lock. This is safe, because
 431                 * spin-locks are needed to protect consistency of
 432                 * dlmlock->l_*_mode and LVB, and we have finished processing
 433                 * them. */
 434                unlock_res_and_lock(dlmlock);
 435                cl_lock_modify(env, lock, descr);
 436                cl_lock_signal(env, lock);
 437                LINVRNT(osc_lock_invariant(olck));
 438                lock_res_and_lock(dlmlock);
 439        }
 440}
 441
 442static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
 444{
 445        struct ldlm_lock *dlmlock;
 446
 447        dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
 448        LASSERT(dlmlock != NULL);
 449
 450        lock_res_and_lock(dlmlock);
 451        spin_lock(&osc_ast_guard);
 452        LASSERT(dlmlock->l_ast_data == olck);
 453        LASSERT(olck->ols_lock == NULL);
 454        olck->ols_lock = dlmlock;
 455        spin_unlock(&osc_ast_guard);
 456
 457        /*
 458         * The lock might not be granted yet. In this case, the completion
 459         * AST (osc_ldlm_completion_ast()) comes later and finishes lock
 460         * granting.
 461         */
 462        if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
 463                osc_lock_granted(env, olck, dlmlock, 0);
 464        unlock_res_and_lock(dlmlock);
 465
 466        /*
 467         * osc_enqueue_interpret() decrefs asynchronous locks, counter
 468         * this.
 469         */
 470        ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
 471        olck->ols_hold = 1;
 472
 473        /* lock reference taken by ldlm_handle2lock_long() is owned by
 474         * osc_lock and released in osc_lock_detach() */
 475        lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
 476        olck->ols_has_ref = 1;
 477}
 478
 479/**
 480 * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
 481 * received from a server, or after osc_enqueue_base() matched a local DLM
 482 * lock.
 483 */
 484static int osc_lock_upcall(void *cookie, int errcode)
 485{
 486        struct osc_lock  *olck  = cookie;
 487        struct cl_lock_slice    *slice = &olck->ols_cl;
 488        struct cl_lock    *lock  = slice->cls_lock;
 489        struct lu_env      *env;
 490        struct cl_env_nest       nest;
 491
 492        env = cl_env_nested_get(&nest);
 493        if (!IS_ERR(env)) {
 494                int rc;
 495
 496                cl_lock_mutex_get(env, lock);
 497
 498                LASSERT(lock->cll_state >= CLS_QUEUING);
 499                if (olck->ols_state == OLS_ENQUEUED) {
 500                        olck->ols_state = OLS_UPCALL_RECEIVED;
 501                        rc = ldlm_error2errno(errcode);
 502                } else if (olck->ols_state == OLS_CANCELLED) {
 503                        rc = -EIO;
 504                } else {
 505                        CERROR("Impossible state: %d\n", olck->ols_state);
 506                        LBUG();
 507                }
 508                if (rc) {
 509                        struct ldlm_lock *dlmlock;
 510
 511                        dlmlock = ldlm_handle2lock(&olck->ols_handle);
 512                        if (dlmlock != NULL) {
 513                                lock_res_and_lock(dlmlock);
 514                                spin_lock(&osc_ast_guard);
 515                                LASSERT(olck->ols_lock == NULL);
 516                                dlmlock->l_ast_data = NULL;
 517                                olck->ols_handle.cookie = 0ULL;
 518                                spin_unlock(&osc_ast_guard);
 519                                ldlm_lock_fail_match_locked(dlmlock);
 520                                unlock_res_and_lock(dlmlock);
 521                                LDLM_LOCK_PUT(dlmlock);
 522                        }
 523                } else {
 524                        if (olck->ols_glimpse)
 525                                olck->ols_glimpse = 0;
 526                        osc_lock_upcall0(env, olck);
 527                }
 528
 529                /* Error handling, some errors are tolerable. */
 530                if (olck->ols_locklessable && rc == -EUSERS) {
 531                        /* This is a tolerable error, turn this lock into
 532                         * lockless lock.
 533                         */
 534                        osc_object_set_contended(cl2osc(slice->cls_obj));
 535                        LASSERT(slice->cls_ops == &osc_lock_ops);
 536
 537                        /* Change this lock to ldlmlock-less lock. */
 538                        osc_lock_to_lockless(env, olck, 1);
 539                        olck->ols_state = OLS_GRANTED;
 540                        rc = 0;
 541                } else if (olck->ols_glimpse && rc == -ENAVAIL) {
 542                        osc_lock_lvb_update(env, olck, rc);
 543                        cl_lock_delete(env, lock);
 544                        /* Hide the error. */
 545                        rc = 0;
 546                }
 547
 548                if (rc == 0) {
 549                        /* In the AGL case, the RPC sponsor may exit cl_lock
 550                         * processing without wait() being called before the
 551                         * related OSC lock upcall(). So update the lock status
 552                         * according to the enqueue result inside the AGL upcall(). */
 553                        if (olck->ols_agl) {
 554                                lock->cll_flags |= CLF_FROM_UPCALL;
 555                                cl_wait_try(env, lock);
 556                                lock->cll_flags &= ~CLF_FROM_UPCALL;
 557                                if (!olck->ols_glimpse)
 558                                        olck->ols_agl = 0;
 559                        }
 560                        cl_lock_signal(env, lock);
 561                        /* del user for lock upcall cookie */
 562                        cl_unuse_try(env, lock);
 563                } else {
 564                        /* del user for lock upcall cookie */
 565                        cl_lock_user_del(env, lock);
 566                        cl_lock_error(env, lock, rc);
 567                }
 568
 569                /* release cookie reference, acquired by osc_lock_enqueue() */
 570                cl_lock_hold_release(env, lock, "upcall", lock);
 571                cl_lock_mutex_put(env, lock);
 572
 573                lu_ref_del(&lock->cll_reference, "upcall", lock);
 574                /* This may be the last reference, so cl_lock_put() must be
 575                 * called after cl_lock_mutex_put(). */
 576                cl_lock_put(env, lock);
 577
 578                cl_env_nested_put(&nest, env);
 579        } else {
 580                /* should never happen, similar to osc_ldlm_blocking_ast(). */
 581                LBUG();
 582        }
 583        return errcode;
 584}
 585
 586/**
 587 * Core of osc_dlm_blocking_ast() logic.
 588 */
 589static void osc_lock_blocking(const struct lu_env *env,
 590                              struct ldlm_lock *dlmlock,
 591                              struct osc_lock *olck, int blocking)
 592{
 593        struct cl_lock *lock = olck->ols_cl.cls_lock;
 594
 595        LASSERT(olck->ols_lock == dlmlock);
 596        CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
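            /* The compile-time check above matters: the state update below
             * relies on OLS_CANCELLED comparing greater than OLS_BLOCKED, so
             * a recursive entry with OLS_CANCELLED is not downgraded. */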
 597        LASSERT(!osc_lock_is_lockless(olck));
 598
 599        /*
 600         * The lock might still be addref-ed here if, e.g., a blocking AST
 601         * is sent for a failed lock.
 602         */
 603        osc_lock_unhold(olck);
 604
 605        if (blocking && olck->ols_state < OLS_BLOCKED)
 606                /*
 607                 * Move osc_lock into OLS_BLOCKED before canceling the lock,
 608                 * because it recursively re-enters osc_lock_blocking(), with
 609                 * the state set to OLS_CANCELLED.
 610                 */
 611                olck->ols_state = OLS_BLOCKED;
 612        /*
 613         * cancel and destroy lock at least once no matter how blocking ast is
 614         * entered (see comment above osc_ldlm_blocking_ast() for use
 615         * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
 616         */
 617        cl_lock_cancel(env, lock);
 618        cl_lock_delete(env, lock);
 619}
 620
 621/**
 622 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
 623 * and ldlm_lock caches.
 624 */
 625static int osc_dlm_blocking_ast0(const struct lu_env *env,
 626                                 struct ldlm_lock *dlmlock,
 627                                 void *data, int flag)
 628{
 629        struct osc_lock *olck;
 630        struct cl_lock  *lock;
 631        int result;
 632        int cancel;
 633
 634        LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
 635
 636        cancel = 0;
 637        olck = osc_ast_data_get(dlmlock);
 638        if (olck != NULL) {
 639                lock = olck->ols_cl.cls_lock;
 640                cl_lock_mutex_get(env, lock);
 641                LINVRNT(osc_lock_invariant(olck));
 642                if (olck->ols_ast_wait) {
 643                        /* wake up osc_lock_use() */
 644                        cl_lock_signal(env, lock);
 645                        olck->ols_ast_wait = 0;
 646                }
 647                /*
 648                 * Lock might have been canceled while this thread was
 649                 * sleeping on the lock mutex, but olck is pinned in memory.
 650                 */
 651                if (olck == dlmlock->l_ast_data) {
 652                        /*
 653                         * NOTE: DLM sends blocking AST's for failed locks
 654                         *       (that are still in pre-OLS_GRANTED state)
 655                         *       too, and they have to be canceled, otherwise
 656                         *       the DLM lock is never destroyed and remains
 657                         *       stuck in memory.
 658                         *
 659                         *       Alternatively, ldlm_cli_cancel() can be
 660                         *       called here directly for osc_locks with
 661                         *       ols_state < OLS_GRANTED to maintain an
 662                         *       invariant that ->clo_cancel() is only called
 663                         *       for locks that were granted.
 664                         */
 665                        LASSERT(data == olck);
 666                        osc_lock_blocking(env, dlmlock,
 667                                          olck, flag == LDLM_CB_BLOCKING);
 668                } else
 669                        cancel = 1;
 670                cl_lock_mutex_put(env, lock);
 671                osc_ast_data_put(env, olck);
 672        } else
 673                /*
 674                 * DLM lock exists, but there is no cl_lock attached to it.
 675                 * This is a `normal' race. cl_object and its cl_lock's can be
 676                 * removed by memory pressure, together with all pages.
 677                 */
 678                cancel = (flag == LDLM_CB_BLOCKING);
 679
 680        if (cancel) {
 681                struct lustre_handle *lockh;
 682
 683                lockh = &osc_env_info(env)->oti_handle;
 684                ldlm_lock2handle(dlmlock, lockh);
 685                result = ldlm_cli_cancel(lockh, LCF_ASYNC);
 686        } else
 687                result = 0;
 688        return result;
 689}
 690
 691/**
 692 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
 693 * some other lock, or is canceled. This function is installed as a
 694 * ldlm_lock::l_blocking_ast() for client extent locks.
 695 *
 696 * Control flow is tricky, because ldlm uses the same call-back
 697 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
 698 *
 699 * \param dlmlock lock for which ast occurred.
 700 *
 701 * \param new description of a conflicting lock in case of blocking ast.
 702 *
 703 * \param data value of dlmlock->l_ast_data
 704 *
 705 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 706 *           cancellation and blocking ast's.
 707 *
 708 * Possible use cases:
 709 *
 710 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
 711 *       lock due to lock lru pressure, or explicit user request to purge
 712 *       locks.
 713 *
 714 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
 715 *       us that dlmlock conflicts with another lock that some client is
 716 *       enqueuing. The lock is canceled.
 717 *
 718 *         - cl_lock_cancel() is called. osc_lock_cancel() calls
 719 *           ldlm_cli_cancel() that calls
 720 *
 721 *                dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 722 *
 723 *           recursively entering osc_ldlm_blocking_ast().
 724 *
 725 *     - client cancels the lock voluntarily (e.g., as part of early cancellation):
 726 *
 727 *         cl_lock_cancel()->
 728 *           osc_lock_cancel()->
 729 *             ldlm_cli_cancel()->
 730 *               dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 731 *
 732 */
 733static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
 734                                 struct ldlm_lock_desc *new, void *data,
 735                                 int flag)
 736{
 737        struct lu_env     *env;
 738        struct cl_env_nest nest;
 739        int             result;
 740
 741        /*
 742         * This can be called in the context of outer IO, e.g.,
 743         *
 744         *     cl_enqueue()->...
 745         *       ->osc_enqueue_base()->...
 746         *       ->ldlm_prep_elc_req()->...
 747         *         ->ldlm_cancel_callback()->...
 748         *           ->osc_ldlm_blocking_ast()
 749         *
 750         * A new environment has to be created to avoid corrupting the outer context.
 751         */
 752        env = cl_env_nested_get(&nest);
 753        if (!IS_ERR(env)) {
 754                result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
 755                cl_env_nested_put(&nest, env);
 756        } else {
 757                result = PTR_ERR(env);
 758                /*
 759                 * XXX This should never happen, as cl_lock is
 760                 * stuck. Pre-allocated environment a la vvp_inode_fini_env
 761                 * should be used.
 762                 */
 763                LBUG();
 764        }
 765        if (result != 0) {
 766                if (result == -ENODATA)
 767                        result = 0;
 768                else
 769                        CERROR("BAST failed: %d\n", result);
 770        }
 771        return result;
 772}
 773
 774static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
 775                                   __u64 flags, void *data)
 776{
 777        struct cl_env_nest nest;
 778        struct lu_env     *env;
 779        struct osc_lock   *olck;
 780        struct cl_lock    *lock;
 781        int result;
 782        int dlmrc;
 783
 784        /* first, do dlm part of the work */
 785        dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
 786        /* then, notify cl_lock */
 787        env = cl_env_nested_get(&nest);
 788        if (!IS_ERR(env)) {
 789                olck = osc_ast_data_get(dlmlock);
 790                if (olck != NULL) {
 791                        lock = olck->ols_cl.cls_lock;
 792                        cl_lock_mutex_get(env, lock);
 793                        /*
 794                         * ldlm_handle_cp_callback() copied LVB from request
 795                         * to lock->l_lvb_data, store it in osc_lock.
 796                         */
 797                        LASSERT(dlmlock->l_lvb_data != NULL);
 798                        lock_res_and_lock(dlmlock);
 799                        olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
 800                        if (olck->ols_lock == NULL) {
 801                                /*
 802                                 * upcall (osc_lock_upcall()) hasn't yet been
 803                                 * called. Do nothing now, upcall will bind
 804                                 * olck to dlmlock and signal the waiters.
 805                                 *
 806                                 * This maintains an invariant that osc_lock
 807                                 * and ldlm_lock are always bound when
 808                                 * osc_lock is in OLS_GRANTED state.
 809                                 */
 810                        } else if (dlmlock->l_granted_mode ==
 811                                   dlmlock->l_req_mode) {
 812                                osc_lock_granted(env, olck, dlmlock, dlmrc);
 813                        }
 814                        unlock_res_and_lock(dlmlock);
 815
 816                        if (dlmrc != 0) {
 817                                CL_LOCK_DEBUG(D_ERROR, env, lock,
 818                                              "dlmlock returned %d\n", dlmrc);
 819                                cl_lock_error(env, lock, dlmrc);
 820                        }
 821                        cl_lock_mutex_put(env, lock);
 822                        osc_ast_data_put(env, olck);
 823                        result = 0;
 824                } else
 825                        result = -ELDLM_NO_LOCK_DATA;
 826                cl_env_nested_put(&nest, env);
 827        } else
 828                result = PTR_ERR(env);
 829        return dlmrc ?: result;
 830}
 831
 832static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 833{
 834        struct ptlrpc_request  *req  = data;
 835        struct osc_lock *olck;
 836        struct cl_lock   *lock;
 837        struct cl_object       *obj;
 838        struct cl_env_nest      nest;
 839        struct lu_env     *env;
 840        struct ost_lvb   *lvb;
 841        struct req_capsule     *cap;
 842        int                  result;
 843
 844        LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
 845
 846        env = cl_env_nested_get(&nest);
 847        if (!IS_ERR(env)) {
 848                /* osc_ast_data_get() has to go after the environment is
 849                 * allocated, because osc_ast_data_get() acquires a
 850                 * reference to a lock, and it can only be released in
 851                 * an environment.
 852                 */
 853                olck = osc_ast_data_get(dlmlock);
 854                if (olck != NULL) {
 855                        lock = olck->ols_cl.cls_lock;
 856                        /* Do not grab the mutex of cl_lock for glimpse.
 857                         * See LU-1274 for details.
 858                         * BTW, it's okay for cl_lock to be cancelled during
 859                         * this period because server can handle this race.
 860                         * See ldlm_server_glimpse_ast() for details.
 861                         * cl_lock_mutex_get(env, lock); */
 862                        cap = &req->rq_pill;
 863                        req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
 864                        req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
 865                                             sizeof(*lvb));
 866                        result = req_capsule_server_pack(cap);
 867                        if (result == 0) {
 868                                lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
 869                                obj = lock->cll_descr.cld_obj;
 870                                result = cl_object_glimpse(env, obj, lvb);
 871                        }
 872                        if (!exp_connect_lvb_type(req->rq_export))
 873                                req_capsule_shrink(&req->rq_pill,
 874                                                   &RMF_DLM_LVB,
 875                                                   sizeof(struct ost_lvb_v1),
 876                                                   RCL_SERVER);
 877                        osc_ast_data_put(env, olck);
 878                } else {
 879                        /*
 880                         * These errors are normal races, so we don't want to
 881                         * fill the console with messages by calling
 882                         * ptlrpc_error()
 883                         */
 884                        lustre_pack_reply(req, 1, NULL, NULL);
 885                        result = -ELDLM_NO_LOCK_DATA;
 886                }
 887                cl_env_nested_put(&nest, env);
 888        } else
 889                result = PTR_ERR(env);
 890        req->rq_status = result;
 891        return result;
 892}
 893
 894static unsigned long osc_lock_weigh(const struct lu_env *env,
 895                                    const struct cl_lock_slice *slice)
 896{
 897        /*
 898         * No need to grab coh_page_guard since we don't care about the
 899         * exact number of pages.
 900         */
 901        return cl_object_header(slice->cls_obj)->coh_pages;
 902}
 903
 904static void osc_lock_build_einfo(const struct lu_env *env,
 905                                 const struct cl_lock *clock,
 906                                 struct osc_lock *lock,
 907                                 struct ldlm_enqueue_info *einfo)
 908{
 909        enum cl_lock_mode mode;
 910
 911        mode = clock->cll_descr.cld_mode;
 912        if (mode == CLM_PHANTOM)
 913                /*
 914                 * For now, enqueue all glimpse locks in read mode. In the
 915                 * future, client might choose to enqueue LCK_PW lock for
 916                 * glimpse on a file opened for write.
 917                 */
 918                mode = CLM_READ;
 919
 920        einfo->ei_type   = LDLM_EXTENT;
 921        einfo->ei_mode   = osc_cl_lock2ldlm(mode);
 922        einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
 923        einfo->ei_cb_cp  = osc_ldlm_completion_ast;
 924        einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
 925        einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
 926}
 927
 928/**
 929 * Determine if the lock should be converted into a lockless lock.
 930 *
 931 * Steps to check:
 932 * - whether the lock has an explicit requirement for a non-lockless lock;
 933 * - the IO lock request type ci_lockreq;
 934 * - send the enqueue RPC to the OST to make the final decision;
 935 * - special treatment of lockless truncate.
 936 *
 937 *  Additional policy can be implemented here, e.g., never do lockless-io
 938 *  for large extents.
 939 */
 940static void osc_lock_to_lockless(const struct lu_env *env,
 941                                 struct osc_lock *ols, int force)
 942{
 943        struct cl_lock_slice *slice = &ols->ols_cl;
 944
 945        LASSERT(ols->ols_state == OLS_NEW ||
 946                ols->ols_state == OLS_UPCALL_RECEIVED);
 947
 948        if (force) {
 949                ols->ols_locklessable = 1;
 950                slice->cls_ops = &osc_lock_lockless_ops;
 951        } else {
 952                struct osc_io *oio     = osc_env_io(env);
 953                struct cl_io  *io      = oio->oi_cl.cis_io;
 954                struct cl_object *obj  = slice->cls_obj;
 955                struct osc_object *oob = cl2osc(obj);
 956                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
 957                struct obd_connect_data *ocd;
 958
 959                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
 960                        io->ci_lockreq == CILR_MAYBE ||
 961                        io->ci_lockreq == CILR_NEVER);
 962
 963                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
 964                ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
 965                                (io->ci_lockreq == CILR_MAYBE) &&
 966                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
 967                if (io->ci_lockreq == CILR_NEVER ||
 968                        /* lockless IO */
 969                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
 970                        /* lockless truncate */
 971                    (cl_io_is_trunc(io) &&
 972                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
 973                      osd->od_lockless_truncate)) {
 974                        ols->ols_locklessable = 1;
 975                        slice->cls_ops = &osc_lock_lockless_ops;
 976                }
 977        }
 978        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
 979}
 980
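    /*
     * A queued glimpse lock is treated as compatible once it has at least
     * received its upcall, or when the enqueuing lock is a read lock;
     * otherwise two locks are compatible only if both are read locks (see
     * the body below).
     */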
 981static int osc_lock_compatible(const struct osc_lock *qing,
 982                               const struct osc_lock *qed)
 983{
 984        enum cl_lock_mode qing_mode;
 985        enum cl_lock_mode qed_mode;
 986
 987        qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
 988        if (qed->ols_glimpse &&
 989            (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
 990                return 1;
 991
 992        qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
 993        return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
 994}
 995
 996/**
 997 * Cancel all conflicting locks and wait for them to be destroyed.
 998 *
 999 * This function is used for two purposes:
1000 *
1001 *     - early cancel all conflicting locks before starting IO, and
1002 *
1003 *     - guarantee that pages added to the page cache by lockless IO are never
1004 *       covered by locks other than lockless IO lock, and, hence, are not
1005 *       visible to other threads.
1006 */
1007static int osc_lock_enqueue_wait(const struct lu_env *env,
1008                                 const struct osc_lock *olck)
1009{
1010        struct cl_lock    *lock    = olck->ols_cl.cls_lock;
1011        struct cl_lock_descr    *descr   = &lock->cll_descr;
1012        struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1013        struct cl_lock    *scan;
1014        struct cl_lock    *conflict = NULL;
1015        int lockless                 = osc_lock_is_lockless(olck);
1016        int rc                     = 0;
1017
1018        LASSERT(cl_lock_is_mutexed(lock));
1019
1020        /* let a glimpse lock enqueue anyway, because we actually
1021         * don't need to cancel any conflicting locks for it. */
1022        if (olck->ols_glimpse)
1023                return 0;
1024
1025        spin_lock(&hdr->coh_lock_guard);
1026        list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
1027                struct cl_lock_descr *cld = &scan->cll_descr;
1028                const struct osc_lock *scan_ols;
1029
1030                if (scan == lock)
1031                        break;
1032
1033                if (scan->cll_state < CLS_QUEUING ||
1034                    scan->cll_state == CLS_FREEING ||
1035                    cld->cld_start > descr->cld_end ||
1036                    cld->cld_end < descr->cld_start)
1037                        continue;
1038
1039                /* overlapped and living locks. */
1040                /* overlapping and live locks. */
1041                /* We're not supposed to give up group lock. */
1042                if (scan->cll_descr.cld_mode == CLM_GROUP) {
1043                        LASSERT(descr->cld_mode != CLM_GROUP ||
1044                                descr->cld_gid != scan->cll_descr.cld_gid);
1045                        continue;
1046                }
1047
1048                scan_ols = osc_lock_at(scan);
1049
1050                /* We need to cancel the compatible locks if we're enqueuing
1051                 * a lockless lock, for example:
1052                 * imagine that client has PR lock on [0, 1000], and thread T0
1053                 * is doing lockless IO in [500, 1500] region. Concurrent
1054                 * thread T1 can see lockless data in [500, 1000], which is
1055                 * wrong, because these data are possibly stale. */
1056                if (!lockless && osc_lock_compatible(olck, scan_ols))
1057                        continue;
1058
1059                cl_lock_get_trust(scan);
1060                conflict = scan;
1061                break;
1062        }
1063        spin_unlock(&hdr->coh_lock_guard);
1064
1065        if (conflict) {
1066                if (lock->cll_descr.cld_mode == CLM_GROUP) {
1067                        /* We want a group lock, but a previous lock request
1068                         * conflicts; do not wait but return 0 so the
1069                         * request is sent to the server
1070                         */
1071                        CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
1072                                           "with %p, no wait, send to server\n",
1073                               lock, conflict);
1074                        cl_lock_put(env, conflict);
1075                        rc = 0;
1076                } else {
1077                        CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
1078                                           "will wait\n",
1079                               lock, conflict);
1080                        LASSERT(lock->cll_conflict == NULL);
1081                        lu_ref_add(&conflict->cll_reference, "cancel-wait",
1082                                   lock);
1083                        lock->cll_conflict = conflict;
1084                        rc = CLO_WAIT;
1085                }
1086        }
1087        return rc;
1088}
1089
1090/**
1091 * Implementation of cl_lock_operations::clo_enqueue() method for osc
1092 * layer. This initiates ldlm enqueue:
1093 *
1094 *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1095 *
1096 *     - calls osc_enqueue_base() to do actual enqueue.
1097 *
1098 * osc_enqueue_base() is supplied with an upcall function that is executed
1099 * when lock is received either after a local cached ldlm lock is matched, or
1100 * when a reply from the server is received.
1101 *
1102 * This function does not wait for the network communication to complete.
1103 */
1104static int osc_lock_enqueue(const struct lu_env *env,
1105                            const struct cl_lock_slice *slice,
1106                            struct cl_io *unused, __u32 enqflags)
1107{
1108        struct osc_lock   *ols     = cl2osc_lock(slice);
1109        struct cl_lock     *lock    = ols->ols_cl.cls_lock;
1110        int result;
1111
1112        LASSERT(cl_lock_is_mutexed(lock));
1113        LASSERTF(ols->ols_state == OLS_NEW,
1114                 "Impossible state: %d\n", ols->ols_state);
1115
1116        LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
1117                "lock = %p, ols = %p\n", lock, ols);
1118
1119        result = osc_lock_enqueue_wait(env, ols);
1120        if (result == 0) {
1121                if (!osc_lock_is_lockless(ols)) {
1122                        struct osc_object       *obj = cl2osc(slice->cls_obj);
1123                        struct osc_thread_info   *info = osc_env_info(env);
1124                        struct ldlm_res_id       *resname = &info->oti_resname;
1125                        ldlm_policy_data_t       *policy = &info->oti_policy;
1126                        struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
1127
1128                        /* lock will be passed as the upcall cookie,
1129                         * hold a ref to prevent it from being released. */
1130                        cl_lock_hold_add(env, lock, "upcall", lock);
1131                        /* a user for lock also */
1132                        cl_lock_user_add(env, lock);
1133                        ols->ols_state = OLS_ENQUEUED;
1134
1135                        /*
1136                         * XXX: this is possible blocking point as
1137                         * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1138                         * LDLM_CP_CALLBACK.
1139                         */
1140                        ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
1141                        osc_lock_build_policy(env, lock, policy);
1142                        result = osc_enqueue_base(osc_export(obj), resname,
1143                                          &ols->ols_flags, policy,
1144                                          &ols->ols_lvb,
1145                                          obj->oo_oinfo->loi_kms_valid,
1146                                          osc_lock_upcall,
1147                                          ols, einfo, &ols->ols_handle,
1148                                          PTLRPCD_SET, 1, ols->ols_agl);
1149                        if (result != 0) {
1150                                cl_lock_user_del(env, lock);
1151                                cl_lock_unhold(env, lock, "upcall", lock);
1152                                if (unlikely(result == -ECANCELED)) {
1153                                        ols->ols_state = OLS_NEW;
1154                                        result = 0;
1155                                }
1156                        }
1157                } else {
1158                        ols->ols_state = OLS_GRANTED;
1159                        ols->ols_owner = osc_env_io(env);
1160                }
1161        }
1162        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1163        return result;
1164}
1165
1166static int osc_lock_wait(const struct lu_env *env,
1167                         const struct cl_lock_slice *slice)
1168{
1169        struct osc_lock *olck = cl2osc_lock(slice);
1170        struct cl_lock  *lock = olck->ols_cl.cls_lock;
1171
1172        LINVRNT(osc_lock_invariant(olck));
1173
1174        if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
1175                if (olck->ols_flags & LDLM_FL_LVB_READY) {
1176                        return 0;
1177                } else if (olck->ols_agl) {
1178                        if (lock->cll_flags & CLF_FROM_UPCALL)
1179                                /* This is from the enqueue RPC reply upcall
1180                                 * updating the state. Do not re-enqueue. */
1181                                return -ENAVAIL;
1182                        else
1183                                olck->ols_state = OLS_NEW;
1184                } else {
1185                        LASSERT(lock->cll_error);
1186                        return lock->cll_error;
1187                }
1188        }
1189
1190        if (olck->ols_state == OLS_NEW) {
1191                int rc;
1192
1193                LASSERT(olck->ols_agl);
1194                olck->ols_agl = 0;
1195                rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
1196                if (rc != 0)
1197                        return rc;
1198                else
1199                        return CLO_REENQUEUED;
1200        }
1201
1202        LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1203                     lock->cll_error == 0, olck->ols_lock != NULL));
1204
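            /* Return a recorded lock error if there is one; otherwise report
             * 0 once the lock has been granted, or CLO_WAIT to keep waiting. */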
1205        return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1206}
1207
1208/**
1209 * An implementation of cl_lock_operations::clo_use() method that pins cached
1210 * lock.
1211 */
1212static int osc_lock_use(const struct lu_env *env,
1213                        const struct cl_lock_slice *slice)
1214{
1215        struct osc_lock *olck = cl2osc_lock(slice);
1216        int rc;
1217
1218        LASSERT(!olck->ols_hold);
1219
1220        /*
1221         * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1222         * flag is not set. This protects us from a concurrent blocking ast.
1223         */
1224        rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1225        if (rc == 0) {
1226                olck->ols_hold = 1;
1227                olck->ols_state = OLS_GRANTED;
1228        } else {
1229                struct cl_lock *lock;
1230
1231                /*
1232                 * Lock is being cancelled somewhere within
1233                 * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1234                 * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1235                 * cl_lock mutex.
1236                 */
1237                lock = slice->cls_lock;
1238                LASSERT(lock->cll_state == CLS_INTRANSIT);
1239                LASSERT(lock->cll_users > 0);
1240                /* set a flag for osc_dlm_blocking_ast0() to signal the
1241                 * lock.*/
1242                olck->ols_ast_wait = 1;
1243                rc = CLO_WAIT;
1244        }
1245        return rc;
1246}
1247
1248static int osc_lock_flush(struct osc_lock *ols, int discard)
1249{
1250        struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1251        struct cl_env_nest    nest;
1252        struct lu_env   *env;
1253        int result = 0;
1254
1255        env = cl_env_nested_get(&nest);
1256        if (!IS_ERR(env)) {
1257                struct osc_object    *obj   = cl2osc(ols->ols_cl.cls_obj);
1258                struct cl_lock_descr *descr = &lock->cll_descr;
1259                int rc = 0;
1260
1261                if (descr->cld_mode >= CLM_WRITE) {
1262                        result = osc_cache_writeback_range(env, obj,
1263                                        descr->cld_start, descr->cld_end,
1264                                        1, discard);
1265                        LDLM_DEBUG(ols->ols_lock,
1266                                "lock %p: %d pages were %s.\n", lock, result,
1267                                discard ? "discarded" : "written");
1268                        if (result > 0)
1269                                result = 0;
1270                }
1271
1272                rc = cl_lock_discard_pages(env, lock);
1273                if (result == 0 && rc < 0)
1274                        result = rc;
1275
1276                cl_env_nested_put(&nest, env);
1277        } else
1278                result = PTR_ERR(env);
1279        if (result == 0) {
1280                ols->ols_flush = 1;
1281                LINVRNT(!osc_lock_has_pages(ols));
1282        }
1283        return result;
1284}
1285
1286/**
1287 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1288 * called (as part of cl_lock_cancel()) when lock is canceled either voluntary
1289 * (LRU pressure, early cancellation, umount, etc.) or due to the conflict
1290 * with some other lock somewhere in the cluster. This function does the
1291 * following:
1292 *
1293 *     - invalidates all pages protected by this lock (after sending dirty
1294 *       ones to the server, as necessary);
1295 *
1296 *     - decref's underlying ldlm lock;
1297 *
1298 *     - cancels ldlm lock (ldlm_cli_cancel()).
1299 */
1300static void osc_lock_cancel(const struct lu_env *env,
1301                            const struct cl_lock_slice *slice)
1302{
1303        struct cl_lock   *lock    = slice->cls_lock;
1304        struct osc_lock  *olck    = cl2osc_lock(slice);
1305        struct ldlm_lock *dlmlock = olck->ols_lock;
1306        int            result  = 0;
1307        int            discard;
1308
1309        LASSERT(cl_lock_is_mutexed(lock));
1310        LINVRNT(osc_lock_invariant(olck));
1311
1312        if (dlmlock != NULL) {
1313                int do_cancel;
1314
1315                discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
1316                if (olck->ols_state >= OLS_GRANTED)
1317                        result = osc_lock_flush(olck, discard);
1318                osc_lock_unhold(olck);
1319
1320                lock_res_and_lock(dlmlock);
1321                /* Now that we're the only user of dlm read/write reference,
1322                 * ->l_readers + ->l_writers should usually be zero.
1323                 * However, there is a corner case.
1324                 * See bug 18829 for details. */
1325                do_cancel = (dlmlock->l_readers == 0 &&
1326                             dlmlock->l_writers == 0);
1327                dlmlock->l_flags |= LDLM_FL_CBPENDING;
1328                unlock_res_and_lock(dlmlock);
1329                if (do_cancel)
1330                        result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
1331                if (result < 0)
1332                        CL_LOCK_DEBUG(D_ERROR, env, lock,
1333                                      "lock %p cancel failure with error(%d)\n",
1334                                      lock, result);
1335        }
1336        olck->ols_state = OLS_CANCELLED;
1337        olck->ols_flags &= ~LDLM_FL_LVB_READY;
1338        osc_lock_detach(env, olck);
1339}
1340
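    /*
     * In this version of the code this is a stub that always reports "no
     * pages", so the LINVRNT(!osc_lock_has_pages(...)) checks in
     * osc_lock_flush() and osc_lock_delete() are trivially satisfied.
     */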
1341static int osc_lock_has_pages(struct osc_lock *olck)
1342{
1343        return 0;
1344}
1345
1346static void osc_lock_delete(const struct lu_env *env,
1347                            const struct cl_lock_slice *slice)
1348{
1349        struct osc_lock *olck;
1350
1351        olck = cl2osc_lock(slice);
1352        if (olck->ols_glimpse) {
1353                LASSERT(!olck->ols_hold);
1354                LASSERT(!olck->ols_lock);
1355                return;
1356        }
1357
1358        LINVRNT(osc_lock_invariant(olck));
1359        LINVRNT(!osc_lock_has_pages(olck));
1360
1361        osc_lock_unhold(olck);
1362        osc_lock_detach(env, olck);
1363}
1364
1365/**
1366 * Implements cl_lock_operations::clo_state() method for osc layer.
1367 *
1368 * Maintains osc_lock::ols_owner field.
1369 *
1370 * This assumes that a lock always enters CLS_HELD (from some other state)
1371 * in the same IO context as the one that requested the lock. This should
1372 * not be a problem, because the context is by definition shared by all
1373 * activity pertaining to the same high-level IO.
1374 */
1375static void osc_lock_state(const struct lu_env *env,
1376                           const struct cl_lock_slice *slice,
1377                           enum cl_lock_state state)
1378{
1379        struct osc_lock *lock = cl2osc_lock(slice);
1380
1381        /*
1382         * XXX multiple io contexts can use the lock at the same time.
1383         */
1384        LINVRNT(osc_lock_invariant(lock));
1385        if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1386                struct osc_io *oio = osc_env_io(env);
1387
1388                LASSERT(lock->ols_owner == NULL);
1389                lock->ols_owner = oio;
1390        } else if (state != CLS_HELD)
1391                lock->ols_owner = NULL;
1392}
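    /*
     * Note that ols_owner is set only on the transition into CLS_HELD and is
     * cleared on any transition to a non-HELD state; the lockless variant,
     * osc_lock_lockless_state() below, keeps the owner and additionally marks
     * the whole io as lockless.
     */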
1393
1394static int osc_lock_print(const struct lu_env *env, void *cookie,
1395                          lu_printer_t p, const struct cl_lock_slice *slice)
1396{
1397        struct osc_lock *lock = cl2osc_lock(slice);
1398
1399        /*
1400         * XXX print ldlm lock and einfo properly.
1401         */
1402        (*p)(env, cookie, "%p %#16llx %#llx %d %p ",
1403             lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1404             lock->ols_state, lock->ols_owner);
1405        osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1406        return 0;
1407}
1408
1409static int osc_lock_fits_into(const struct lu_env *env,
1410                              const struct cl_lock_slice *slice,
1411                              const struct cl_lock_descr *need,
1412                              const struct cl_io *io)
1413{
1414        struct osc_lock *ols = cl2osc_lock(slice);
1415
1416        if (need->cld_enq_flags & CEF_NEVER)
1417                return 0;
1418
1419        if (ols->ols_state >= OLS_CANCELLED)
1420                return 0;
1421
1422        if (need->cld_mode == CLM_PHANTOM) {
1423                if (ols->ols_agl)
1424                        return !(ols->ols_state > OLS_RELEASED);
1425
1426                /*
1427                 * Note: a QUEUED lock can't be matched here, otherwise
1428                 * it might cause deadlocks. Consider a read process:
1429                 * P1: enqueues a read lock, creating sublock1.
1430                 * P2: enqueues a write lock, creating sublock2 (which
1431                 *     conflicts with sublock1).
1432                 * P1: the read lock is granted.
1433                 * P1: enqueues a glimpse lock (while holding sublock1 for
1434                 *     read), which matches sublock2 and waits for
1435                 *     sublock2 to be granted. But sublock2 can never be
1436                 *     granted, because P1 will not release sublock1.
1437                 *     Bang!
1438                 */
1439                if (ols->ols_state < OLS_GRANTED ||
1440                    ols->ols_state > OLS_RELEASED)
1441                        return 0;
1442        } else if (need->cld_enq_flags & CEF_MUST) {
1443                /*
1444                 * If the lock has never been enqueued, it can't be
1445                 * matched, because the enqueue process brings in a lot of
1446                 * information that is used to determine things such as
1447                 * lockless, CEF_MUST, etc.
1448                 */
1449                if (ols->ols_state < OLS_UPCALL_RECEIVED &&
1450                    ols->ols_locklessable)
1451                        return 0;
1452        }
1453        return 1;
1454}
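    /*
     * To summarize the matching policy above: a CEF_NEVER request or a
     * cancelled lock never matches; a glimpse (CLM_PHANTOM) request matches
     * an AGL lock unless it is already past OLS_RELEASED, and any other lock
     * only while it is between OLS_GRANTED and OLS_RELEASED; a CEF_MUST
     * request refuses locklessable locks that have not yet reached
     * OLS_UPCALL_RECEIVED, since only the enqueue settles whether the lock
     * ends up lockless.
     */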
1455
1456static const struct cl_lock_operations osc_lock_ops = {
1457        .clo_fini    = osc_lock_fini,
1458        .clo_enqueue = osc_lock_enqueue,
1459        .clo_wait    = osc_lock_wait,
1460        .clo_unuse   = osc_lock_unuse,
1461        .clo_use     = osc_lock_use,
1462        .clo_delete  = osc_lock_delete,
1463        .clo_state   = osc_lock_state,
1464        .clo_cancel  = osc_lock_cancel,
1465        .clo_weigh   = osc_lock_weigh,
1466        .clo_print   = osc_lock_print,
1467        .clo_fits_into = osc_lock_fits_into,
1468};
1469
1470static int osc_lock_lockless_unuse(const struct lu_env *env,
1471                                   const struct cl_lock_slice *slice)
1472{
1473        struct osc_lock *ols = cl2osc_lock(slice);
1474        struct cl_lock *lock = slice->cls_lock;
1475
1476        LASSERT(ols->ols_state == OLS_GRANTED);
1477        LINVRNT(osc_lock_invariant(ols));
1478
1479        cl_lock_cancel(env, lock);
1480        cl_lock_delete(env, lock);
1481        return 0;
1482}
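    /*
     * Unlike a dlm-backed lock, a lockless lock is not left cached after
     * unuse: there is no ldlm lock behind it, so the code above simply
     * cancels and deletes it.
     */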
1483
1484static void osc_lock_lockless_cancel(const struct lu_env *env,
1485                                     const struct cl_lock_slice *slice)
1486{
1487        struct osc_lock   *ols  = cl2osc_lock(slice);
1488        int result;
1489
1490        result = osc_lock_flush(ols, 0);
1491        if (result)
1492                CERROR("Pages for lockless lock %p were not purged(%d)\n",
1493                       ols, result);
1494        ols->ols_state = OLS_CANCELLED;
1495}
1496
1497static int osc_lock_lockless_wait(const struct lu_env *env,
1498                                  const struct cl_lock_slice *slice)
1499{
1500        struct osc_lock *olck = cl2osc_lock(slice);
1501        struct cl_lock  *lock = olck->ols_cl.cls_lock;
1502
1503        LINVRNT(osc_lock_invariant(olck));
1504        LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1505
1506        return lock->cll_error;
1507}
1508
1509static void osc_lock_lockless_state(const struct lu_env *env,
1510                                    const struct cl_lock_slice *slice,
1511                                    enum cl_lock_state state)
1512{
1513        struct osc_lock *lock = cl2osc_lock(slice);
1514
1515        LINVRNT(osc_lock_invariant(lock));
1516        if (state == CLS_HELD) {
1517                struct osc_io *oio  = osc_env_io(env);
1518
1519                LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
1520                lock->ols_owner = oio;
1521
1522                /* Set the io to be lockless if this lock is for the
1523                 * io's host object. */
1524                if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1525                        oio->oi_lockless = 1;
1526        }
1527}
1528
1529static int osc_lock_lockless_fits_into(const struct lu_env *env,
1530                                       const struct cl_lock_slice *slice,
1531                                       const struct cl_lock_descr *need,
1532                                       const struct cl_io *io)
1533{
1534        struct osc_lock *lock = cl2osc_lock(slice);
1535
1536        if (!(need->cld_enq_flags & CEF_NEVER))
1537                return 0;
1538
1539        /* A lockless lock should only be used by its owning io. b22147 */
1540        return (lock->ols_owner == osc_env_io(env));
1541}
1542
1543static const struct cl_lock_operations osc_lock_lockless_ops = {
1544        .clo_fini      = osc_lock_fini,
1545        .clo_enqueue   = osc_lock_enqueue,
1546        .clo_wait      = osc_lock_lockless_wait,
1547        .clo_unuse     = osc_lock_lockless_unuse,
1548        .clo_state     = osc_lock_lockless_state,
1549        .clo_fits_into = osc_lock_lockless_fits_into,
1550        .clo_cancel    = osc_lock_lockless_cancel,
1551        .clo_print     = osc_lock_print
1552};
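    /*
     * Compared with osc_lock_ops above, the lockless variant provides no
     * clo_use, clo_delete or clo_weigh methods and substitutes lockless
     * implementations of wait, unuse, state, fits_into and cancel; fini,
     * enqueue and print are shared.
     */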
1553
1554int osc_lock_init(const struct lu_env *env,
1555                  struct cl_object *obj, struct cl_lock *lock,
1556                  const struct cl_io *unused)
1557{
1558        struct osc_lock *clk;
1559        int result;
1560
1561        OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, GFP_NOFS);
1562        if (clk != NULL) {
1563                __u32 enqflags = lock->cll_descr.cld_enq_flags;
1564
1565                osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1566                atomic_set(&clk->ols_pageref, 0);
1567                clk->ols_state = OLS_NEW;
1568
1569                clk->ols_flags = osc_enq2ldlm_flags(enqflags);
1570                clk->ols_agl = !!(enqflags & CEF_AGL);
1571                if (clk->ols_agl)
1572                        clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1573                if (clk->ols_flags & LDLM_FL_HAS_INTENT)
1574                        clk->ols_glimpse = 1;
1575
1576                cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1577
1578                if (!(enqflags & CEF_MUST))
1579                        /* try to convert this lock to a lockless lock */
1580                        osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
1581                if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
1582                        clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1583
1584                LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
1585                                lock, clk, clk->ols_flags);
1586
1587                result = 0;
1588        } else
1589                result = -ENOMEM;
1590        return result;
1591}
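    /*
     * To recap the flag handling in osc_lock_init(): CEF_AGL requests get
     * LDLM_FL_BLOCK_NOWAIT so that an asynchronous glimpse never waits for a
     * conflicting lock; LDLM_FL_HAS_INTENT marks the lock as a glimpse;
     * anything not enqueued with CEF_MUST is considered for conversion to a
     * lockless lock; and locklessable locks (unless CEF_DISCARD_DATA is set)
     * are tagged LDLM_FL_DENY_ON_CONTENTION so that the server can refuse
     * them on a contended resource.
     */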
1592
1593int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
1594{
1595        struct osc_lock *olock;
1596        int           rc = 0;
1597
1598        spin_lock(&osc_ast_guard);
1599        olock = dlm->l_ast_data;
1600        /*
1601         * There is a very rare race with osc_page_addref_lock(), but it
1602         * doesn't matter: in the worst case we fail to cancel a lock that
1603         * we actually could, which does no harm.
1604         */
1605        if (olock != NULL &&
1606            atomic_add_return(_PAGEREF_MAGIC,
1607                                  &olock->ols_pageref) != _PAGEREF_MAGIC) {
1608                atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
1609                rc = 1;
1610        }
1611        spin_unlock(&osc_ast_guard);
1612        return rc;
1613}
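    /*
     * The _PAGEREF_MAGIC handling above is a small atomic "bias" trick: by
     * adding a large negative constant in one atomic step and checking
     * whether the result equals that constant alone, we learn whether any
     * page references were held at that instant.  The sketch below only
     * demonstrates the pattern on a bare atomic_t; the helper name is made
     * up and the block is deliberately compiled out.
     */
    #if 0   /* illustrative sketch only */
    static int pageref_example_busy(atomic_t *pageref)
    {
            /*
             * Bias the counter.  If the result is exactly the bias, no page
             * references existed; the bias is left in place, as in
             * osc_dlm_lock_pageref() above, presumably so that later
             * reference attempts can see that the lock is on its way out.
             */
            if (atomic_add_return(_PAGEREF_MAGIC, pageref) == _PAGEREF_MAGIC)
                    return 0;

            /* Page references exist: undo the bias and report busy. */
            atomic_sub(_PAGEREF_MAGIC, pageref);
            return 1;
    }
    #endif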
1614
1615/** @} osc */
1616