linux/drivers/staging/lustre/lustre/ldlm/ldlm_lock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/libcfs/libcfs.h>
#include <linux/lustre_intent.h>

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from the local format to the on-the-wire lock_desc
 * format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from the on-the-wire lock_desc format to the local
 * format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
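
/*
 * Illustrative sketch (not part of the original file): how the dispatch
 * tables above are typically exercised.  A local extent policy is packed
 * into its on-the-wire form before being sent inside a lock_desc.  The
 * example values are made up for illustration.
 */
#if 0
static void example_policy_conversion(void)
{
        ldlm_policy_data_t lpolicy;
        ldlm_wire_policy_data_t wpolicy;

        lpolicy.l_extent.start = 0;
        lpolicy.l_extent.end = OBD_OBJECT_EOF;
        lpolicy.l_extent.gid = 0;

        /* dispatches to ldlm_extent_policy_local_to_wire() */
        ldlm_convert_policy_to_wire(LDLM_EXTENT, &lpolicy, &wpolicy);
}
#endif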

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);
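
/*
 * Illustrative sketch (not part of the original file): intent bits may be
 * combined, and ldlm_it2str() has a dedicated case for the common
 * IT_OPEN | IT_CREAT combination.
 */
#if 0
static void example_it2str(void)
{
        CDEBUG(D_INFO, "intent: %s\n", ldlm_it2str(IT_OPEN | IT_CREAT));
        /* prints "intent: open|creat" */
}
#endif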

extern struct kmem_cache *ldlm_lock_slab;

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */

/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release lock reference.
 *
 * Also frees the lock if it was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
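
/*
 * Illustrative sketch (not part of the original file): every
 * ldlm_lock_get()/LDLM_LOCK_GET() must be balanced by a matching
 * ldlm_lock_put()/LDLM_LOCK_PUT(); the final put on a destroyed lock is
 * what actually frees it.
 */
#if 0
static void example_get_put(struct ldlm_lock *lock)
{
        struct ldlm_lock *extra = LDLM_LOCK_GET(lock);  /* +1 ref */

        /* ... use "extra" while the reference is held ... */

        LDLM_LOCK_PUT(extra);                           /* -1 ref */
}
#endif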

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;

        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the l_destroyed field in the lock to 1.  Destroys
 * the handle->lock association too, so that the lock can no longer be
 * found, and removes the lock from the LRU list.  Actual lock freeing
 * occurs when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * usage: pass in a resource on which you have done ldlm_resource_get();
 *        the new lock will take over the refcount.
 * returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, __GFP_IO);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
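
/*
 * Illustrative sketch (not part of the original file): the contract of
 * ldlm_lock_new() is that the caller passes in a referenced resource and
 * the new lock takes that reference over; on allocation failure the caller
 * still owns the resource reference and must drop it itself.
 */
#if 0
static struct ldlm_lock *example_lock_new(struct ldlm_namespace *ns,
                                          const struct ldlm_res_id *res_id,
                                          ldlm_type_t type)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                return NULL;

        lock = ldlm_lock_new(res);      /* takes over the resource ref */
        if (lock == NULL)
                ldlm_resource_putref(res);
        return lock;
}
#endif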

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns some other lock than
 * requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof(oldres->lr_name)) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is nonzero: atomically get the lock and set the flags;
 * return NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */
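
/*
 * Illustrative sketch (not part of the original file): handles let callers
 * refer to a lock without holding its address.  A handle is filled from a
 * lock with ldlm_lock2handle() (no reference taken) and resolved back with
 * ldlm_handle2lock(), which does take a reference that must be dropped.
 */
#if 0
static void example_handle_roundtrip(struct ldlm_lock *lock)
{
        struct lustre_handle lockh;
        struct ldlm_lock *found;

        ldlm_lock2handle(lock, &lockh);

        found = ldlm_handle2lock(&lockh);       /* +1 ref on success */
        if (found != NULL) {
                LASSERT(found == lock);
                LDLM_LOCK_PUT(found);
        }
}
#endif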

/**
 * Fill in the "on the wire" representation for given LDLM lock into the
 * supplied lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;

        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor. */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp_connect_flags(exp) & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something an old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to the list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to the list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * The r/w reference type is determined by \a mode.
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * The r/w reference type is determined by \a mode.
 * Removes the lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if the lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);
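
/*
 * Illustrative sketch (not part of the original file): unlike
 * ldlm_lock_addref(), the _try variant refuses to pin a lock that is
 * already being canceled, so callers must be prepared to see -EAGAIN and
 * fall back to enqueueing a fresh lock.
 */
#if 0
static int example_addref_try(struct lustre_handle *lockh)
{
        int rc = ldlm_lock_addref_try(lockh, LCK_PR);

        if (rc == -EAGAIN)
                return rc;      /* lock is CBPENDING; enqueue a new one */

        /* ... read data covered by the lock ... */

        ldlm_lock_decref(lockh, LCK_PR);
        return 0;
}
#endif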

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks the LDLM lock and calls ldlm_lock_addref_internal_nolock to do the
 * work. Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes the LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add the lock to the LRU if no r/w references are left, in order
 * to accommodate flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks the LDLM lock first.
 * If the lock is determined to be a client lock on a client and the r/w
 * refcount drops to zero and the lock is not blocked, the lock is added to
 * the LRU of the namespace.
 * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
 * called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server; otherwise it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh.
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
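
/*
 * Illustrative sketch (not part of the original file): a plain
 * reader/writer reference cycle.  On a client namespace the final decref
 * of an unblocked lock parks it in the namespace LRU rather than canceling
 * it, so a later ldlm_lock_match() can reuse it cheaply.
 */
#if 0
static void example_addref_decref(struct lustre_handle *lockh)
{
        ldlm_lock_addref(lockh, LCK_PW);        /* pin as a writer */

        /* ... modify data covered by the lock ... */

        ldlm_lock_decref(lockh, LCK_PW);        /* may move lock to LRU */
}
#endif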

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once the r/w refcount
 * drops to zero, instead of putting it into the LRU.
 *
 * Typical usage is for GROUP locks, which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
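
/*
 * Illustrative sketch (not part of the original file): GROUP locks must
 * not linger in the LRU, so they are released through
 * ldlm_lock_decref_and_cancel(), which sets LDLM_FL_CBPENDING before
 * dropping the reference and thereby forces cancellation instead of
 * caching.
 */
#if 0
static void example_group_release(struct lustre_handle *lockh)
{
        ldlm_lock_decref_and_cancel(lockh, LCK_GROUP);
}
#endif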

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                      struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                   struct ldlm_lock,
                                                   l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
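
/*
 * Illustrative diagram (not part of the original file): the granted list
 * is ordered into mode groups, and IBITS mode groups are further split
 * into policy groups.  l_sl_mode links the first and last lock of a mode
 * group and l_sl_policy the first and last lock of a policy group, which
 * is what lets search_granted_lock() skip over whole groups:
 *
 *   lr_granted -> [PR,a][PR,a][PR,b] -> [PW,c][PW,c] -> ...
 *                 |________________|    |__________|
 *                   mode group (PR)     mode group (PW)
 *                 |_________| |___|
 *                  policy a    policy b
 */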

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is the first lock of the group.
         * Don't re-add it to itself to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to the granted list on a resource, maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * This is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                   wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged); the context failure will be discovered by
 * the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;

                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_destroyed || lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
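
/*
 * Illustrative sketch (not part of the original file): probing the
 * namespace for a cached PR or PW extent lock.  With LDLM_FL_TEST_LOCK no
 * reference would be taken; without it, a successful match must
 * eventually be balanced by ldlm_lock_decref() with the returned mode.
 */
#if 0
static int example_match(struct ldlm_namespace *ns,
                         const struct ldlm_res_id *res_id)
{
        ldlm_policy_data_t policy = {
                .l_extent = { .start = 0, .end = OBD_OBJECT_EOF }
        };
        struct lustre_handle lockh;
        ldlm_mode_t mode;

        mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
                               &policy, LCK_PR | LCK_PW, &lockh, 0);
        if (mode == 0)
                return -ENODATA;        /* nothing cached; enqueue instead */

        /* ... use the matched lock ... */

        ldlm_lock_decref(&lockh, mode);
        return 0;
}
#endif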
1423
1424ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1425                                        __u64 *bits)
1426{
1427        struct ldlm_lock *lock;
1428        ldlm_mode_t mode = 0;
1429        ENTRY;
1430
1431        lock = ldlm_handle2lock(lockh);
1432        if (lock != NULL) {
1433                lock_res_and_lock(lock);
1434                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
1435                    lock->l_failed)
1436                        GOTO(out, mode);
1437
1438                if (lock->l_flags & LDLM_FL_CBPENDING &&
1439                    lock->l_readers == 0 && lock->l_writers == 0)
1440                        GOTO(out, mode);
1441
1442                if (bits)
1443                        *bits = lock->l_policy_data.l_inodebits.bits;
1444                mode = lock->l_granted_mode;
1445                ldlm_lock_addref_internal_nolock(lock, mode);
1446        }
1447
1448        EXIT;
1449
1450out:
1451        if (lock != NULL) {
1452                unlock_res_and_lock(lock);
1453                LDLM_LOCK_PUT(lock);
1454        }
1455        return mode;
1456}
1457EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1458
1459/** The caller must guarantee that the buffer is large enough. */
1460int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1461                  enum req_location loc, void *data, int size)
1462{
1463        void *lvb;
1464        ENTRY;
1465
1466        LASSERT(data != NULL);
1467        LASSERT(size >= 0);
1468
1469        switch (lock->l_lvb_type) {
1470        case LVB_T_OST:
1471                if (size == sizeof(struct ost_lvb)) {
1472                        if (loc == RCL_CLIENT)
1473                                lvb = req_capsule_client_swab_get(pill,
1474                                                &RMF_DLM_LVB,
1475                                                lustre_swab_ost_lvb);
1476                        else
1477                                lvb = req_capsule_server_swab_get(pill,
1478                                                &RMF_DLM_LVB,
1479                                                lustre_swab_ost_lvb);
1480                        if (unlikely(lvb == NULL)) {
1481                                LDLM_ERROR(lock, "no LVB");
1482                                RETURN(-EPROTO);
1483                        }
1484
1485                        memcpy(data, lvb, size);
1486                } else if (size == sizeof(struct ost_lvb_v1)) {
1487                        struct ost_lvb *olvb = data;
1488
1489                        if (loc == RCL_CLIENT)
1490                                lvb = req_capsule_client_swab_get(pill,
1491                                                &RMF_DLM_LVB,
1492                                                lustre_swab_ost_lvb_v1);
1493                        else
1494                                lvb = req_capsule_server_sized_swab_get(pill,
1495                                                &RMF_DLM_LVB, size,
1496                                                lustre_swab_ost_lvb_v1);
1497                        if (unlikely(lvb == NULL)) {
1498                                LDLM_ERROR(lock, "no LVB");
1499                                RETURN(-EPROTO);
1500                        }
1501
1502                        memcpy(data, lvb, size);
1503                        olvb->lvb_mtime_ns = 0;
1504                        olvb->lvb_atime_ns = 0;
1505                        olvb->lvb_ctime_ns = 0;
1506                } else {
1507                        LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
1508                                   size);
1509                        RETURN(-EINVAL);
1510                }
1511                break;
1512        case LVB_T_LQUOTA:
1513                if (size == sizeof(struct lquota_lvb)) {
1514                        if (loc == RCL_CLIENT)
1515                                lvb = req_capsule_client_swab_get(pill,
1516                                                &RMF_DLM_LVB,
1517                                                lustre_swab_lquota_lvb);
1518                        else
1519                                lvb = req_capsule_server_swab_get(pill,
1520                                                &RMF_DLM_LVB,
1521                                                lustre_swab_lquota_lvb);
1522                        if (unlikely(lvb == NULL)) {
1523                                LDLM_ERROR(lock, "no LVB");
1524                                RETURN(-EPROTO);
1525                        }
1526
1527                        memcpy(data, lvb, size);
1528                } else {
1529                        LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
1530                                   size);
1531                        RETURN(-EINVAL);
1532                }
1533                break;
1534        case LVB_T_LAYOUT:
1535                if (size == 0)
1536                        break;
1537
1538                if (loc == RCL_CLIENT)
1539                        lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1540                else
1541                        lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1542                if (unlikely(lvb == NULL)) {
1543                        LDLM_ERROR(lock, "no LVB");
1544                        RETURN(-EPROTO);
1545                }
1546
1547                memcpy(data, lvb, size);
1548                break;
1549        default:
1550                LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
1551                libcfs_debug_dumpstack(NULL);
1552                RETURN(-EINVAL);
1553        }
1554
1555        RETURN(0);
1556}
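
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * a client unpacking an OST LVB from a server reply could call this as
 * below; "req" is a hypothetical ptlrpc request whose reply has already
 * been unpacked, and RCL_SERVER selects the reply side of the capsule.
 *
 *      struct ost_lvb lvb;
 *      int rc;
 *
 *      rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 *                         &lvb, sizeof(lvb));
 *      if (rc != 0)
 *              ... -EPROTO means no LVB, -EINVAL an unexpected size ...
 */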
1557
1558/**
1559 * Create and fill in a new LDLM lock with the specified properties.
1560 * Returns a referenced lock.
1561 */
1562struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1563                                   const struct ldlm_res_id *res_id,
1564                                   ldlm_type_t type,
1565                                   ldlm_mode_t mode,
1566                                   const struct ldlm_callback_suite *cbs,
1567                                   void *data, __u32 lvb_len,
1568                                   enum lvb_type lvb_type)
1569{
1570        struct ldlm_lock *lock;
1571        struct ldlm_resource *res;
1572        ENTRY;
1573
1574        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1575        if (res == NULL)
1576                RETURN(NULL);
1577
1578        lock = ldlm_lock_new(res);
1579
1580        if (lock == NULL)
1581                RETURN(NULL);
1582
1583        lock->l_req_mode = mode;
1584        lock->l_ast_data = data;
1585        lock->l_pid = current_pid();
1586        lock->l_ns_srv = !!ns_is_server(ns);
1587        if (cbs) {
1588                lock->l_blocking_ast = cbs->lcs_blocking;
1589                lock->l_completion_ast = cbs->lcs_completion;
1590                lock->l_glimpse_ast = cbs->lcs_glimpse;
1591                lock->l_weigh_ast = cbs->lcs_weigh;
1592        }
1593
1594        lock->l_tree_node = NULL;
1595        /* if this is an extent lock, allocate the interval tree node */
1596        if (type == LDLM_EXTENT) {
1597                if (ldlm_interval_alloc(lock) == NULL)
1598                        GOTO(out, 0);
1599        }
1600
1601        if (lvb_len) {
1602                lock->l_lvb_len = lvb_len;
1603                OBD_ALLOC(lock->l_lvb_data, lvb_len);
1604                if (lock->l_lvb_data == NULL)
1605                        GOTO(out, 0);
1606        }
1607
1608        lock->l_lvb_type = lvb_type;
1609        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1610                GOTO(out, 0);
1611
1612        RETURN(lock);
1613
1614out:
1615        ldlm_lock_destroy(lock);
1616        LDLM_LOCK_RELEASE(lock);
1617        return NULL;
1618}
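
/*
 * Illustrative sketch (editorial addition, not in the original source):
 * creating a local inodebits lock with a blocking callback.  The
 * my_blocking_ast callback and the error handling are hypothetical.
 *
 *      const struct ldlm_callback_suite cbs = {
 *              .lcs_blocking = my_blocking_ast,
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, &res_id, LDLM_IBITS, LCK_PR, &cbs,
 *                              NULL, 0, LVB_T_NONE);
 *      if (lock == NULL)
 *              return -ENOMEM;
 */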
1619
1620/**
1621 * Enqueue (request) a lock.
1622 *
1623 * Does not block. As a result of the enqueue, the lock is placed on the
1624 * granted or waiting list.
1625 *
1626 * If the namespace has an intent policy set and the lock has the
1627 * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
1628 * processing to the intent policy function.
1629 */
1630ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1631                               struct ldlm_lock **lockp,
1632                               void *cookie, __u64 *flags)
1633{
1634        struct ldlm_lock *lock = *lockp;
1635        struct ldlm_resource *res = lock->l_resource;
1636        int local = ns_is_client(ldlm_res_to_ns(res));
1637        ldlm_error_t rc = ELDLM_OK;
1638        struct ldlm_interval *node = NULL;
1639        ENTRY;
1640
1641        lock->l_last_activity = cfs_time_current_sec();
1642        /* policies are not executed on the client or during replay */
1643        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1644            && !local && ns->ns_policy) {
1645                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1646                                   NULL);
1647                if (rc == ELDLM_LOCK_REPLACED) {
1648                        /* The lock that was returned has already been granted,
1649                         * and placed into lockp.  If it's not the same as the
1650                         * one we passed in, then destroy the old one and our
1651                         * work here is done. */
1652                        if (lock != *lockp) {
1653                                ldlm_lock_destroy(lock);
1654                                LDLM_LOCK_RELEASE(lock);
1655                        }
1656                        *flags |= LDLM_FL_LOCK_CHANGED;
1657                        RETURN(0);
1658                } else if (rc != ELDLM_OK ||
1659                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1660                        ldlm_lock_destroy(lock);
1661                        RETURN(rc);
1662                }
1663        }
1664
1665        /* A replaying lock might already be in the granted list, in which
1666         * case unlinking it would free its interval node.  Allocate the
1667         * interval node early; otherwise we cannot regrant this lock in
1668         * the future. - jay */
1669        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1670                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, __GFP_IO);
1671
1672        lock_res_and_lock(lock);
1673        if (local && lock->l_req_mode == lock->l_granted_mode) {
1674                /* The server returned a blocked lock, but it was granted
1675                 * before we got a chance to actually enqueue it.  We don't
1676                 * need to do anything else. */
1677                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1678                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1679                GOTO(out, ELDLM_OK);
1680        }
1681
1682        ldlm_resource_unlink_lock(lock);
1683        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1684                if (node == NULL) {
1685                        ldlm_lock_destroy_nolock(lock);
1686                        GOTO(out, rc = -ENOMEM);
1687                }
1688
1689                INIT_LIST_HEAD(&node->li_group);
1690                ldlm_interval_attach(node, lock);
1691                node = NULL;
1692        }
1693
1694        /* Some flags from the enqueue want to make it into the AST, via the
1695         * lock's l_flags. */
1696        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1697
1698        /* This distinction between local lock trees is very important; a client
1699         * namespace only has information about locks taken by that client, and
1700         * thus doesn't have enough information to decide for itself if it can
1701         * be granted (below).  In this case, we do exactly what the server
1702         * tells us to do, as dictated by the 'flags'.
1703         *
1704         * We do exactly the same thing during recovery, when the server is
1705         * more or less trusting the clients not to lie.
1706         *
1707         * FIXME (bug 268): Detect obvious lies by checking compatibility in
1708         * granted/converting queues. */
1709        if (local) {
1710                if (*flags & LDLM_FL_BLOCK_CONV)
1711                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
1712                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1713                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1714                else
1715                        ldlm_grant_lock(lock, NULL);
1716                GOTO(out, ELDLM_OK);
1717        } else {
1718                CERROR("This is client-side-only module, cannot handle "
1719                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
1720                LBUG();
1721        }
1722
1723out:
1724        unlock_res_and_lock(lock);
1725        if (node)
1726                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1727        return rc;
1728}
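
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * the usual client-side sequence is create-then-enqueue; on the client the
 * blocking flags returned by the server decide which list the lock lands
 * on.  The flags value below is only an example server verdict.
 *
 *      __u64 flags = LDLM_FL_BLOCK_GRANTED;
 *      ldlm_error_t err;
 *
 *      err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
 *      if (err != ELDLM_OK)
 *              ... the lock has been destroyed on the error paths ...
 */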
1729
1730
1731/**
1732 * Process a call to blocking AST callback for a lock in ast_work list
1733 */
1734static int
1735ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1736{
1737        struct ldlm_cb_set_arg *arg = opaq;
1738        struct ldlm_lock_desc   d;
1739        int                  rc;
1740        struct ldlm_lock       *lock;
1741        ENTRY;
1742
1743        if (list_empty(arg->list))
1744                RETURN(-ENOENT);
1745
1746        lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1747
1748        /* nobody should touch l_bl_ast */
1749        lock_res_and_lock(lock);
1750        list_del_init(&lock->l_bl_ast);
1751
1752        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1753        LASSERT(lock->l_bl_ast_run == 0);
1754        LASSERT(lock->l_blocking_lock);
1755        lock->l_bl_ast_run++;
1756        unlock_res_and_lock(lock);
1757
1758        ldlm_lock2desc(lock->l_blocking_lock, &d);
1759
1760        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1761        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1762        lock->l_blocking_lock = NULL;
1763        LDLM_LOCK_RELEASE(lock);
1764
1765        RETURN(rc);
1766}
1767
1768/**
1769 * Process a call to completion AST callback for a lock in ast_work list
1770 */
1771static int
1772ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1773{
1774        struct ldlm_cb_set_arg  *arg = opaq;
1775        int                   rc = 0;
1776        struct ldlm_lock        *lock;
1777        ldlm_completion_callback completion_callback;
1778        ENTRY;
1779
1780        if (list_empty(arg->list))
1781                RETURN(-ENOENT);
1782
1783        lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1784
1785        /* It's possible to receive a completion AST before we've set
1786         * the l_completion_ast pointer: either because the AST arrived
1787         * before the reply, or simply because there's a small race
1788         * window between receiving the reply and finishing the local
1789         * enqueue. (bug 842)
1790         *
1791         * This can't happen with the blocking_ast, however, because we
1792         * will never call the local blocking_ast until we drop our
1793         * reader/writer reference, which we won't do until we get the
1794         * reply and finish enqueueing. */
1795
1796        /* nobody should touch l_cp_ast */
1797        lock_res_and_lock(lock);
1798        list_del_init(&lock->l_cp_ast);
1799        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1800        /* save l_completion_ast since it can be changed by
1801         * mds_intent_policy(), see bug 14225 */
1802        completion_callback = lock->l_completion_ast;
1803        lock->l_flags &= ~LDLM_FL_CP_REQD;
1804        unlock_res_and_lock(lock);
1805
1806        if (completion_callback != NULL)
1807                rc = completion_callback(lock, 0, (void *)arg);
1808        LDLM_LOCK_RELEASE(lock);
1809
1810        RETURN(rc);
1811}
1812
1813/**
1814 * Process a call to revocation AST callback for a lock in ast_work list
1815 */
1816static int
1817ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1818{
1819        struct ldlm_cb_set_arg *arg = opaq;
1820        struct ldlm_lock_desc   desc;
1821        int                  rc;
1822        struct ldlm_lock       *lock;
1823        ENTRY;
1824
1825        if (list_empty(arg->list))
1826                RETURN(-ENOENT);
1827
1828        lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1829        list_del_init(&lock->l_rk_ast);
1830
1831        /* the desc just pretends the lock is exclusive */
1832        ldlm_lock2desc(lock, &desc);
1833        desc.l_req_mode = LCK_EX;
1834        desc.l_granted_mode = 0;
1835
1836        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1837        LDLM_LOCK_RELEASE(lock);
1838
1839        RETURN(rc);
1840}
1841
1842/**
1843 * Process a call to glimpse AST callback for a lock in ast_work list
1844 */
1845int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1846{
1847        struct ldlm_cb_set_arg          *arg = opaq;
1848        struct ldlm_glimpse_work        *gl_work;
1849        struct ldlm_lock                *lock;
1850        int                              rc = 0;
1851        ENTRY;
1852
1853        if (list_empty(arg->list))
1854                RETURN(-ENOENT);
1855
1856        gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1857                                 gl_list);
1858        list_del_init(&gl_work->gl_list);
1859
1860        lock = gl_work->gl_lock;
1861
1862        /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1863        arg->gl_desc = gl_work->gl_desc;
1864
1865        /* invoke the actual glimpse callback */
1866        if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1867                rc = 1;
1868
1869        LDLM_LOCK_RELEASE(lock);
1870
1871        if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1872                OBD_FREE_PTR(gl_work);
1873
1874        RETURN(rc);
1875}
1876
1877/**
1878 * Process list of locks in need of ASTs being sent.
1879 *
1880 * Used on server to send multiple ASTs together instead of sending one by
1881 * one.
1882 */
1883int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1884                      ldlm_desc_ast_t ast_type)
1885{
1886        struct ldlm_cb_set_arg *arg;
1887        set_producer_func       work_ast_lock;
1888        int                  rc;
1889
1890        if (list_empty(rpc_list))
1891                RETURN(0);
1892
1893        OBD_ALLOC_PTR(arg);
1894        if (arg == NULL)
1895                RETURN(-ENOMEM);
1896
1897        atomic_set(&arg->restart, 0);
1898        arg->list = rpc_list;
1899
1900        switch (ast_type) {
1901                case LDLM_WORK_BL_AST:
1902                        arg->type = LDLM_BL_CALLBACK;
1903                        work_ast_lock = ldlm_work_bl_ast_lock;
1904                        break;
1905                case LDLM_WORK_CP_AST:
1906                        arg->type = LDLM_CP_CALLBACK;
1907                        work_ast_lock = ldlm_work_cp_ast_lock;
1908                        break;
1909                case LDLM_WORK_REVOKE_AST:
1910                        arg->type = LDLM_BL_CALLBACK;
1911                        work_ast_lock = ldlm_work_revoke_ast_lock;
1912                        break;
1913                case LDLM_WORK_GL_AST:
1914                        arg->type = LDLM_GL_CALLBACK;
1915                        work_ast_lock = ldlm_work_gl_ast_lock;
1916                        break;
1917                default:
1918                        LBUG();
1919        }
1920
1921        /* We create a ptlrpc request set with the flow control extension.
1922         * This request set will use the work_ast_lock function to produce new
1923         * requests and will send a new request each time one completes, keeping
1924         * the number of requests in flight at ns_max_parallel_ast. */
1925        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1926                                     work_ast_lock, arg);
1927        if (arg->set == NULL)
1928                GOTO(out, rc = -ENOMEM);
1929
1930        ptlrpc_set_wait(arg->set);
1931        ptlrpc_set_destroy(arg->set);
1932
1933        rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1934        GOTO(out, rc);
1935out:
1936        OBD_FREE_PTR(arg);
1937        return rc;
1938}
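
/*
 * Illustrative sketch (editorial addition, not in the original source): a
 * caller gathers locks on a private list and fires all their completion
 * ASTs as one flow-controlled batch; -ERESTART signals that the pass
 * should be retried.
 *
 *      LIST_HEAD(rpc_list);
 *
 *      ... link locks onto rpc_list via their l_cp_ast field ...
 *      rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
 *      if (rc == -ERESTART)
 *              ... reprocess the resource and try again ...
 */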
1939
1940static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1941{
1942        ldlm_reprocess_all(res);
1943        return LDLM_ITER_CONTINUE;
1944}
1945
1946static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1947                              struct hlist_node *hnode, void *arg)
1948{
1949        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1950        int    rc;
1951
1952        rc = reprocess_one_queue(res, arg);
1953
1954        return rc == LDLM_ITER_STOP;
1955}
1956
1957/**
1958 * Iterate through all resources on a namespace attempting to grant waiting
1959 * locks.
1960 */
1961void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1962{
1963        ENTRY;
1964
1965        if (ns != NULL) {
1966                cfs_hash_for_each_nolock(ns->ns_rs_hash,
1967                                         ldlm_reprocess_res, NULL);
1968        }
1969        EXIT;
1970}
1971EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1972
1973/**
1974 * Try to grant all waiting locks on a resource.
1975 *
1976 * Calls ldlm_reprocess_queue on converting and waiting queues.
1977 *
1978 * Typically called after some resource locks are cancelled to see
1979 * if anything could be granted as a result of the cancellation.
1980 */
1981void ldlm_reprocess_all(struct ldlm_resource *res)
1982{
1983        LIST_HEAD(rpc_list);
1984
1985        ENTRY;
1986        if (!ns_is_client(ldlm_res_to_ns(res))) {
1987                CERROR("This is client-side-only module, cannot handle "
1988                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
1989                LBUG();
1990        }
1991        EXIT;
1992}
1993
1994/**
1995 * Helper function to call blocking AST for LDLM lock \a lock in a
1996 * "cancelling" mode.
1997 */
1998void ldlm_cancel_callback(struct ldlm_lock *lock)
1999{
2000        check_res_locked(lock->l_resource);
2001        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
2002                lock->l_flags |= LDLM_FL_CANCEL;
2003                if (lock->l_blocking_ast) {
2004                        unlock_res_and_lock(lock);
2005                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2006                                             LDLM_CB_CANCELING);
2007                        lock_res_and_lock(lock);
2008                } else {
2009                        LDLM_DEBUG(lock, "no blocking ast");
2010                }
2011        }
2012        lock->l_flags |= LDLM_FL_BL_DONE;
2013}
2014
2015/**
2016 * Remove skiplist-enabled LDLM lock \a req from granted list
2017 */
2018void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2019{
2020        if (req->l_resource->lr_type != LDLM_PLAIN &&
2021            req->l_resource->lr_type != LDLM_IBITS)
2022                return;
2023
2024        list_del_init(&req->l_sl_policy);
2025        list_del_init(&req->l_sl_mode);
2026}
2027
2028/**
2029 * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2030 */
2031void ldlm_lock_cancel(struct ldlm_lock *lock)
2032{
2033        struct ldlm_resource *res;
2034        struct ldlm_namespace *ns;
2035        ENTRY;
2036
2037        lock_res_and_lock(lock);
2038
2039        res = lock->l_resource;
2040        ns  = ldlm_res_to_ns(res);
2041
2042        /* Please do not, no matter how tempting, remove this LBUG without
2043         * talking to me first. -phik */
2044        if (lock->l_readers || lock->l_writers) {
2045                LDLM_ERROR(lock, "lock still has references");
2046                LBUG();
2047        }
2048
2049        if (lock->l_waited)
2050                ldlm_del_waiting_lock(lock);
2051
2052        /* Releases cancel callback. */
2053        ldlm_cancel_callback(lock);
2054
2055        /* Yes, check a second time, in case the lock was re-added while we
2056           were running without the res lock in ldlm_cancel_callback */
2057        if (lock->l_waited)
2058                ldlm_del_waiting_lock(lock);
2059
2060        ldlm_resource_unlink_lock(lock);
2061        ldlm_lock_destroy_nolock(lock);
2062
2063        if (lock->l_granted_mode == lock->l_req_mode)
2064                ldlm_pool_del(&ns->ns_pool, lock);
2065
2066        /* Make sure we will not be called again for the same lock, which is
2067         * possible if we do not zero out lock->l_granted_mode */
2068        lock->l_granted_mode = LCK_MINMODE;
2069        unlock_res_and_lock(lock);
2070
2071        EXIT;
2072}
2073EXPORT_SYMBOL(ldlm_lock_cancel);
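
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * because ldlm_lock_cancel() LBUGs on remaining reader/writer references,
 * callers follow a decref-then-cancel pattern similar to this.
 *
 *      ldlm_lock_decref(&lockh, mode);
 *      lock = ldlm_handle2lock(&lockh);
 *      if (lock != NULL) {
 *              ldlm_lock_cancel(lock);
 *              LDLM_LOCK_PUT(lock);
 *      }
 */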
2074
2075/**
2076 * Set opaque data into the lock that only makes sense to upper layer.
2077 */
2078int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2079{
2080        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2081        int rc = -EINVAL;
2082        ENTRY;
2083
2084        if (lock) {
2085                if (lock->l_ast_data == NULL)
2086                        lock->l_ast_data = data;
2087                if (lock->l_ast_data == data)
2088                        rc = 0;
2089                LDLM_LOCK_PUT(lock);
2090        }
2091        RETURN(rc);
2092}
2093EXPORT_SYMBOL(ldlm_lock_set_data);
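
/*
 * Illustrative sketch (editorial addition, not in the original source):
 * setting is first-wins, so a zero return means l_ast_data now holds the
 * caller's pointer (it was either unset or already equal).  "inode" is a
 * hypothetical upper-layer object.
 *
 *      if (ldlm_lock_set_data(&lockh, inode) != 0)
 *              ... the lock is already owned by another object ...
 */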
2094
2095struct export_cl_data {
2096        struct obd_export       *ecl_exp;
2097        int                     ecl_loop;
2098};
2099
2100/**
2101 * Iterator function for ldlm_cancel_locks_for_export.
2102 * Cancels passed locks.
2103 */
2104int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2105                                    struct hlist_node *hnode, void *data)
2107{
2108        struct export_cl_data   *ecl = (struct export_cl_data *)data;
2109        struct obd_export       *exp  = ecl->ecl_exp;
2110        struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2111        struct ldlm_resource *res;
2112
2113        res = ldlm_resource_getref(lock->l_resource);
2114        LDLM_LOCK_GET(lock);
2115
2116        LDLM_DEBUG(lock, "export %p", exp);
2117        ldlm_res_lvbo_update(res, NULL, 1);
2118        ldlm_lock_cancel(lock);
2119        ldlm_reprocess_all(res);
2120        ldlm_resource_putref(res);
2121        LDLM_LOCK_RELEASE(lock);
2122
2123        ecl->ecl_loop++;
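        /* (ecl_loop & -ecl_loop) isolates the lowest set bit, so the test
         * below fires only when ecl_loop is a power of two; progress is
         * logged at iterations 1, 2, 4, 8, ... to avoid flooding the log
         * for exports that hold many locks. */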
2124        if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2125                CDEBUG(D_INFO,
2126                       "Cancel lock %p for export %p (loop %d), still have "
2127                       "%d locks left on hash table.\n",
2128                       lock, exp, ecl->ecl_loop,
2129                       atomic_read(&hs->hs_count));
2130        }
2131
2132        return 0;
2133}
2134
2135/**
2136 * Cancel all locks for given export.
2137 *
2138 * Typically called on client disconnection/eviction
2139 */
2140void ldlm_cancel_locks_for_export(struct obd_export *exp)
2141{
2142        struct export_cl_data   ecl = {
2143                .ecl_exp        = exp,
2144                .ecl_loop       = 0,
2145        };
2146
2147        cfs_hash_for_each_empty(exp->exp_lock_hash,
2148                                ldlm_cancel_locks_for_export_cb, &ecl);
2149}
2150
2151/**
2152 * Downgrade an exclusive lock.
2153 *
2154 * A fast variant of ldlm_lock_convert for conversion of exclusive
2155 * locks. The conversion is always successful.
2156 * Used by Commit on Sharing (COS) code.
2157 *
2158 * \param lock A lock to downgrade
2159 * \param new_mode new lock mode
2160 */
2161void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2162{
2163        ENTRY;
2164
2165        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2166        LASSERT(new_mode == LCK_COS);
2167
2168        lock_res_and_lock(lock);
2169        ldlm_resource_unlink_lock(lock);
2170        /*
2171         * Remove the lock from pool as it will be added again in
2172         * ldlm_grant_lock() called below.
2173         */
2174        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2175
2176        lock->l_req_mode = new_mode;
2177        ldlm_grant_lock(lock, NULL);
2178        unlock_res_and_lock(lock);
2179        ldlm_reprocess_all(lock->l_resource);
2180
2181        EXIT;
2182}
2183EXPORT_SYMBOL(ldlm_lock_downgrade);
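
/*
 * Illustrative sketch (editorial addition, not part of the original file):
 * the COS code downgrades a granted PW/EX lock in place instead of
 * cancelling it; per the LASSERTs above, LCK_COS is the only valid target
 * mode.
 *
 *      if (lock->l_granted_mode == LCK_EX)
 *              ldlm_lock_downgrade(lock, LCK_COS);
 */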
2184
2185/**
2186 * Attempt to convert already granted lock to a different mode.
2187 *
2188 * While lock conversion is not currently used, future client-side
2189 * optimizations could take advantage of it to avoid discarding cached
2190 * pages on a file.
2191 */
2192struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2193                                        __u32 *flags)
2194{
2195        LIST_HEAD(rpc_list);
2196        struct ldlm_resource *res;
2197        struct ldlm_namespace *ns;
2198        int granted = 0;
2199        struct ldlm_interval *node;
2200        ENTRY;
2201
2202        /* Just return if mode is unchanged. */
2203        if (new_mode == lock->l_granted_mode) {
2204                *flags |= LDLM_FL_BLOCK_GRANTED;
2205                RETURN(lock->l_resource);
2206        }
2207
2208        /* I can't check the type of lock here because the lock's bitlock
2209         * is not held here, so do the allocation blindly. -jay */
2210        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, __GFP_IO);
2211        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2212                RETURN(NULL);
2213
2214        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2215                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2216
2217        lock_res_and_lock(lock);
2218
2219        res = lock->l_resource;
2220        ns  = ldlm_res_to_ns(res);
2221
2222        lock->l_req_mode = new_mode;
2223        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2224                ldlm_resource_unlink_lock(lock);
2225        } else {
2226                ldlm_resource_unlink_lock(lock);
2227                if (res->lr_type == LDLM_EXTENT) {
2228                        /* FIXME: ugly code, I have to attach the lock to an
2229                         * interval node again since perhaps it will be granted
2230                         * soon */
2231                        INIT_LIST_HEAD(&node->li_group);
2232                        ldlm_interval_attach(node, lock);
2233                        node = NULL;
2234                }
2235        }
2236
2237        /*
2238         * Remove old lock from the pool before adding the lock with new
2239         * mode below in ->policy()
2240         */
2241        ldlm_pool_del(&ns->ns_pool, lock);
2242
2243        /* If this is a local resource, put it on the appropriate list. */
2244        if (ns_is_client(ldlm_res_to_ns(res))) {
2245                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2246                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
2247                } else {
2248                        /* This should never happen, because of the way the
2249                         * server handles conversions. */
2250                        LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2251                                   *flags);
2252                        LBUG();
2253
2254                        ldlm_grant_lock(lock, &rpc_list);
2255                        granted = 1;
2256                        /* FIXME: completion handling not with lr_lock held ! */
2257                        if (lock->l_completion_ast)
2258                                lock->l_completion_ast(lock, 0, NULL);
2259                }
2260        } else {
2261                CERROR("This is client-side-only module, cannot handle "
2262                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
2263                LBUG();
2264        }
2265        unlock_res_and_lock(lock);
2266
2267        if (granted)
2268                ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2269        if (node)
2270                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2271        RETURN(res);
2272}
2273EXPORT_SYMBOL(ldlm_lock_convert);
2274
2275/**
2276 * Print lock with lock handle \a lockh description into debug log.
2277 *
2278 * Used when printing all locks on a resource for debug purposes.
2279 */
2280void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2281{
2282        struct ldlm_lock *lock;
2283
2284        if (!((libcfs_debug | D_ERROR) & level))
2285                return;
2286
2287        lock = ldlm_handle2lock(lockh);
2288        if (lock == NULL)
2289                return;
2290
2291        LDLM_DEBUG_LIMIT(level, lock, "###");
2292
2293        LDLM_LOCK_PUT(lock);
2294}
2295EXPORT_SYMBOL(ldlm_lock_dump_handle);
2296
2297/**
2298 * Print lock information with custom message into debug log.
2299 * Helper function.
2300 */
2301void _ldlm_lock_debug(struct ldlm_lock *lock,
2302                      struct libcfs_debug_msg_data *msgdata,
2303                      const char *fmt, ...)
2304{
2305        va_list args;
2306        struct obd_export *exp = lock->l_export;
2307        struct ldlm_resource *resource = lock->l_resource;
2308        char *nid = "local";
2309
2310        va_start(args, fmt);
2311
2312        if (exp && exp->exp_connection) {
2313                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2314        } else if (exp && exp->exp_obd != NULL) {
2315                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2316                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2317        }
2318
2319        if (resource == NULL) {
2320                libcfs_debug_vmsg2(msgdata, fmt, args,
2321                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2322                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2323                       "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2324                       "lvb_type: %d\n",
2325                       lock,
2326                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2327                       lock->l_readers, lock->l_writers,
2328                       ldlm_lockname[lock->l_granted_mode],
2329                       ldlm_lockname[lock->l_req_mode],
2330                       lock->l_flags, nid, lock->l_remote_handle.cookie,
2331                       exp ? atomic_read(&exp->exp_refcount) : -99,
2332                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2333                va_end(args);
2334                return;
2335        }
2336
2337        switch (resource->lr_type) {
2338        case LDLM_EXTENT:
2339                libcfs_debug_vmsg2(msgdata, fmt, args,
2340                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2341                       "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
2342                       "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
2343                       " "LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2344                       ldlm_lock_to_ns_name(lock), lock,
2345                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2346                       lock->l_readers, lock->l_writers,
2347                       ldlm_lockname[lock->l_granted_mode],
2348                       ldlm_lockname[lock->l_req_mode],
2349                       resource->lr_name.name[0],
2350                       resource->lr_name.name[1],
2351                       atomic_read(&resource->lr_refcount),
2352                       ldlm_typename[resource->lr_type],
2353                       lock->l_policy_data.l_extent.start,
2354                       lock->l_policy_data.l_extent.end,
2355                       lock->l_req_extent.start, lock->l_req_extent.end,
2356                       lock->l_flags, nid, lock->l_remote_handle.cookie,
2357                       exp ? atomic_read(&exp->exp_refcount) : -99,
2358                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2359                break;
2360
2361        case LDLM_FLOCK:
2362                libcfs_debug_vmsg2(msgdata, fmt, args,
2363                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2364                       "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2365                       "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
2366                       " expref: %d pid: %u timeout: %lu\n",
2367                       ldlm_lock_to_ns_name(lock), lock,
2368                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2369                       lock->l_readers, lock->l_writers,
2370                       ldlm_lockname[lock->l_granted_mode],
2371                       ldlm_lockname[lock->l_req_mode],
2372                       resource->lr_name.name[0],
2373                       resource->lr_name.name[1],
2374                       atomic_read(&resource->lr_refcount),
2375                       ldlm_typename[resource->lr_type],
2376                       lock->l_policy_data.l_flock.pid,
2377                       lock->l_policy_data.l_flock.start,
2378                       lock->l_policy_data.l_flock.end,
2379                       lock->l_flags, nid, lock->l_remote_handle.cookie,
2380                       exp ? atomic_read(&exp->exp_refcount) : -99,
2381                       lock->l_pid, lock->l_callback_timeout);
2382                break;
2383
2384        case LDLM_IBITS:
2385                libcfs_debug_vmsg2(msgdata, fmt, args,
2386                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2387                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2388                       "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2389                       "pid: %u timeout: %lu lvb_type: %d\n",
2390                       ldlm_lock_to_ns_name(lock),
2391                       lock, lock->l_handle.h_cookie,
2392                       atomic_read(&lock->l_refc),
2393                       lock->l_readers, lock->l_writers,
2394                       ldlm_lockname[lock->l_granted_mode],
2395                       ldlm_lockname[lock->l_req_mode],
2396                       resource->lr_name.name[0],
2397                       resource->lr_name.name[1],
2398                       lock->l_policy_data.l_inodebits.bits,
2399                       atomic_read(&resource->lr_refcount),
2400                       ldlm_typename[resource->lr_type],
2401                       lock->l_flags, nid, lock->l_remote_handle.cookie,
2402                       exp ? atomic_read(&exp->exp_refcount) : -99,
2403                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2404                break;
2405
2406        default:
2407                libcfs_debug_vmsg2(msgdata, fmt, args,
2408                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2409                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2410                       "nid: %s remote: "LPX64" expref: %d pid: %u timeout: %lu"
2411                       "lvb_type: %d\n",
2412                       ldlm_lock_to_ns_name(lock),
2413                       lock, lock->l_handle.h_cookie,
2414                       atomic_read(&lock->l_refc),
2415                       lock->l_readers, lock->l_writers,
2416                       ldlm_lockname[lock->l_granted_mode],
2417                       ldlm_lockname[lock->l_req_mode],
2418                       resource->lr_name.name[0],
2419                       resource->lr_name.name[1],
2420                       atomic_read(&resource->lr_refcount),
2421                       ldlm_typename[resource->lr_type],
2422                       lock->l_flags, nid, lock->l_remote_handle.cookie,
2423                       exp ? atomic_read(&exp->exp_refcount) : -99,
2424                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2425                break;
2426        }
2427        va_end(args);
2428}
2429EXPORT_SYMBOL(_ldlm_lock_debug);
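
/*
 * Note (editorial addition, not in the original source): _ldlm_lock_debug()
 * is normally reached through the LDLM_DEBUG()/LDLM_ERROR() macros, which
 * fill in the libcfs_debug_msg_data for the current file and line, e.g.:
 *
 *      LDLM_DEBUG(lock, "client-side enqueue START");
 */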
2430