linux/drivers/staging/lustre/lustre/ldlm/ldlm_flock.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
  28 * Developed under the sponsorship of the US Government under
  29 * Subcontract No. B514193
  30 *
  31 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  32 * Use is subject to license terms.
  33 *
  34 * Copyright (c) 2010, 2012, Intel Corporation.
  35 */
  36/*
  37 * This file is part of Lustre, http://www.lustre.org/
  38 * Lustre is a trademark of Sun Microsystems, Inc.
  39 */
  40
  41/**
  42 * This file implements POSIX lock type for Lustre.
  43 * Its policy properties are start and end of extent and PID.
  44 *
  45 * These locks are only done through MDS due to POSIX semantics requiring
  46 * e.g. that locks could be only partially released and as such split into
  47 * two parts, and also that two adjacent locks from the same process may be
  48 * merged into a single wider lock.
  49 *
  50 * Lock modes are mapped like this:
  51 * PR and PW for READ and WRITE locks
  52 * NL to request a releasing of a portion of the lock
  53 *
  54 * These flock locks never timeout.
  55 */
  56
  57#define DEBUG_SUBSYSTEM S_LDLM
  58
  59#include "../include/lustre_dlm.h"
  60#include "../include/obd_support.h"
  61#include "../include/obd_class.h"
  62#include "../include/lustre_lib.h"
  63#include <linux/list.h>
  64#include "ldlm_internal.h"
  65
  66int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
  67                            void *data, int flag);
  68
  69/**
  70 * list_for_remaining_safe - iterate over the remaining entries in a list
  71 *            and safeguard against removal of a list entry.
  72 * \param pos   the &struct list_head to use as a loop counter. pos MUST
  73 *            have been initialized prior to using it in this macro.
  74 * \param n     another &struct list_head to use as temporary storage
  75 * \param head  the head for your list.
  76 */
  77#define list_for_remaining_safe(pos, n, head) \
  78        for (n = pos->next; pos != (head); pos = n, n = pos->next)
  79
  80static inline int
  81ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
  82{
  83        return((new->l_policy_data.l_flock.owner ==
  84                lock->l_policy_data.l_flock.owner) &&
  85               (new->l_export == lock->l_export));
  86}
  87
  88static inline int
  89ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
  90{
  91        return((new->l_policy_data.l_flock.start <=
  92                lock->l_policy_data.l_flock.end) &&
  93               (new->l_policy_data.l_flock.end >=
  94                lock->l_policy_data.l_flock.start));
  95}
  96
  97static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
  98                                            struct ldlm_lock *lock)
  99{
 100        /* For server only */
 101        if (req->l_export == NULL)
 102                return;
 103
 104        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
 105
 106        req->l_policy_data.l_flock.blocking_owner =
 107                lock->l_policy_data.l_flock.owner;
 108        req->l_policy_data.l_flock.blocking_export =
 109                lock->l_export;
 110        req->l_policy_data.l_flock.blocking_refs = 0;
 111
 112        cfs_hash_add(req->l_export->exp_flock_hash,
 113                     &req->l_policy_data.l_flock.owner,
 114                     &req->l_exp_flock_hash);
 115}
 116
 117static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
 118{
 119        /* For server only */
 120        if (req->l_export == NULL)
 121                return;
 122
 123        check_res_locked(req->l_resource);
 124        if (req->l_export->exp_flock_hash != NULL &&
 125            !hlist_unhashed(&req->l_exp_flock_hash))
 126                cfs_hash_del(req->l_export->exp_flock_hash,
 127                             &req->l_policy_data.l_flock.owner,
 128                             &req->l_exp_flock_hash);
 129}
 130
 131static inline void
 132ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
 133{
 134        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
 135                   mode, flags);
 136
 137        /* Safe to not lock here, since it should be empty anyway */
 138        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
 139
 140        list_del_init(&lock->l_res_link);
 141        if (flags == LDLM_FL_WAIT_NOREPROC &&
 142            !(lock->l_flags & LDLM_FL_FAILED)) {
 143                /* client side - set a flag to prevent sending a CANCEL */
 144                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
 145
 146                /* when reaching here, it is under lock_res_and_lock(). Thus,
 147                   need call the nolock version of ldlm_lock_decref_internal*/
 148                ldlm_lock_decref_internal_nolock(lock, mode);
 149        }
 150
 151        ldlm_lock_destroy_nolock(lock);
 152}
 153
 154/**
 155 * POSIX locks deadlock detection code.
 156 *
 157 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 158 * with, we need to iterate through all blocked POSIX locks for this
 159 * export and see if there is a deadlock condition arising. (i.e. when
 160 * one client holds a lock on something and want a lock on something
 161 * else and at the same time another client has the opposite situation).
 162 */
 163static int
 164ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
 165{
 166        struct obd_export *req_exp = req->l_export;
 167        struct obd_export *bl_exp = bl_lock->l_export;
 168        __u64 req_owner = req->l_policy_data.l_flock.owner;
 169        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
 170
 171        /* For server only */
 172        if (req_exp == NULL)
 173                return 0;
 174
 175        class_export_get(bl_exp);
 176        while (1) {
 177                struct obd_export *bl_exp_new;
 178                struct ldlm_lock *lock = NULL;
 179                struct ldlm_flock *flock;
 180
 181                if (bl_exp->exp_flock_hash != NULL)
 182                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
 183                                               &bl_owner);
 184                if (lock == NULL)
 185                        break;
 186
 187                LASSERT(req != lock);
 188                flock = &lock->l_policy_data.l_flock;
 189                LASSERT(flock->owner == bl_owner);
 190                bl_owner = flock->blocking_owner;
 191                bl_exp_new = class_export_get(flock->blocking_export);
 192                class_export_put(bl_exp);
 193
 194                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
 195                bl_exp = bl_exp_new;
 196
 197                if (bl_owner == req_owner && bl_exp == req_exp) {
 198                        class_export_put(bl_exp);
 199                        return 1;
 200                }
 201        }
 202        class_export_put(bl_exp);
 203
 204        return 0;
 205}
 206
 207static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
 208                                          struct list_head *work_list)
 209{
 210        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
 211
 212        if ((exp_connect_flags(lock->l_export) &
 213                                OBD_CONNECT_FLOCK_DEAD) == 0) {
 214                CERROR(
 215                      "deadlock found, but client doesn't support flock canceliation\n");
 216        } else {
 217                LASSERT(lock->l_completion_ast);
 218                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
 219                lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
 220                        LDLM_FL_FLOCK_DEADLOCK;
 221                ldlm_flock_blocking_unlink(lock);
 222                ldlm_resource_unlink_lock(lock);
 223                ldlm_add_ast_work_item(lock, NULL, work_list);
 224        }
 225}
 226
 227/**
 228 * Process a granting attempt for flock lock.
 229 * Must be called under ns lock held.
 230 *
 231 * This function looks for any conflicts for \a lock in the granted or
 232 * waiting queues. The lock is granted if no conflicts are found in
 233 * either queue.
 234 *
 235 * It is also responsible for splitting a lock if a portion of the lock
 236 * is released.
 237 *
 238 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
 239 *   - blocking ASTs have already been sent
 240 *
 241 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
 242 *   - blocking ASTs have not been sent yet, so list of conflicting locks
 243 *     would be collected and ASTs sent.
 244 */
 245int
 246ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
 247                        ldlm_error_t *err, struct list_head *work_list)
 248{
 249        struct ldlm_resource *res = req->l_resource;
 250        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
 251        struct list_head *tmp;
 252        struct list_head *ownlocks = NULL;
 253        struct ldlm_lock *lock = NULL;
 254        struct ldlm_lock *new = req;
 255        struct ldlm_lock *new2 = NULL;
 256        ldlm_mode_t mode = req->l_req_mode;
 257        int local = ns_is_client(ns);
 258        int added = (mode == LCK_NL);
 259        int overlaps = 0;
 260        int splitted = 0;
 261        const struct ldlm_callback_suite null_cbs = { NULL };
 262
 263        CDEBUG(D_DLMTRACE,
 264               "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
 265               *flags, new->l_policy_data.l_flock.owner,
 266               new->l_policy_data.l_flock.pid, mode,
 267               req->l_policy_data.l_flock.start,
 268               req->l_policy_data.l_flock.end);
 269
 270        *err = ELDLM_OK;
 271
 272        if (local) {
 273                /* No blocking ASTs are sent to the clients for
 274                 * Posix file & record locks */
 275                req->l_blocking_ast = NULL;
 276        } else {
 277                /* Called on the server for lock cancels. */
 278                req->l_blocking_ast = ldlm_flock_blocking_ast;
 279        }
 280
 281reprocess:
 282        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
 283                /* This loop determines where this processes locks start
 284                 * in the resource lr_granted list. */
 285                list_for_each(tmp, &res->lr_granted) {
 286                        lock = list_entry(tmp, struct ldlm_lock,
 287                                              l_res_link);
 288                        if (ldlm_same_flock_owner(lock, req)) {
 289                                ownlocks = tmp;
 290                                break;
 291                        }
 292                }
 293        } else {
 294                int reprocess_failed = 0;
 295
 296                lockmode_verify(mode);
 297
 298                /* This loop determines if there are existing locks
 299                 * that conflict with the new lock request. */
 300                list_for_each(tmp, &res->lr_granted) {
 301                        lock = list_entry(tmp, struct ldlm_lock,
 302                                              l_res_link);
 303
 304                        if (ldlm_same_flock_owner(lock, req)) {
 305                                if (!ownlocks)
 306                                        ownlocks = tmp;
 307                                continue;
 308                        }
 309
 310                        /* locks are compatible, overlap doesn't matter */
 311                        if (lockmode_compat(lock->l_granted_mode, mode))
 312                                continue;
 313
 314                        if (!ldlm_flocks_overlap(lock, req))
 315                                continue;
 316
 317                        if (!first_enq) {
 318                                reprocess_failed = 1;
 319                                if (ldlm_flock_deadlock(req, lock)) {
 320                                        ldlm_flock_cancel_on_deadlock(req,
 321                                                        work_list);
 322                                        return LDLM_ITER_CONTINUE;
 323                                }
 324                                continue;
 325                        }
 326
 327                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
 328                                ldlm_flock_destroy(req, mode, *flags);
 329                                *err = -EAGAIN;
 330                                return LDLM_ITER_STOP;
 331                        }
 332
 333                        if (*flags & LDLM_FL_TEST_LOCK) {
 334                                ldlm_flock_destroy(req, mode, *flags);
 335                                req->l_req_mode = lock->l_granted_mode;
 336                                req->l_policy_data.l_flock.pid =
 337                                        lock->l_policy_data.l_flock.pid;
 338                                req->l_policy_data.l_flock.start =
 339                                        lock->l_policy_data.l_flock.start;
 340                                req->l_policy_data.l_flock.end =
 341                                        lock->l_policy_data.l_flock.end;
 342                                *flags |= LDLM_FL_LOCK_CHANGED;
 343                                return LDLM_ITER_STOP;
 344                        }
 345
 346                        /* add lock to blocking list before deadlock
 347                         * check to prevent race */
 348                        ldlm_flock_blocking_link(req, lock);
 349
 350                        if (ldlm_flock_deadlock(req, lock)) {
 351                                ldlm_flock_blocking_unlink(req);
 352                                ldlm_flock_destroy(req, mode, *flags);
 353                                *err = -EDEADLK;
 354                                return LDLM_ITER_STOP;
 355                        }
 356
 357                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
 358                        *flags |= LDLM_FL_BLOCK_GRANTED;
 359                        return LDLM_ITER_STOP;
 360                }
 361                if (reprocess_failed)
 362                        return LDLM_ITER_CONTINUE;
 363        }
 364
 365        if (*flags & LDLM_FL_TEST_LOCK) {
 366                ldlm_flock_destroy(req, mode, *flags);
 367                req->l_req_mode = LCK_NL;
 368                *flags |= LDLM_FL_LOCK_CHANGED;
 369                return LDLM_ITER_STOP;
 370        }
 371
 372        /* In case we had slept on this lock request take it off of the
 373         * deadlock detection hash list. */
 374        ldlm_flock_blocking_unlink(req);
 375
 376        /* Scan the locks owned by this process that overlap this request.
 377         * We may have to merge or split existing locks. */
 378
 379        if (!ownlocks)
 380                ownlocks = &res->lr_granted;
 381
 382        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
 383                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
 384
 385                if (!ldlm_same_flock_owner(lock, new))
 386                        break;
 387
 388                if (lock->l_granted_mode == mode) {
 389                        /* If the modes are the same then we need to process
 390                         * locks that overlap OR adjoin the new lock. The extra
 391                         * logic condition is necessary to deal with arithmetic
 392                         * overflow and underflow. */
 393                        if ((new->l_policy_data.l_flock.start >
 394                             (lock->l_policy_data.l_flock.end + 1))
 395                            && (lock->l_policy_data.l_flock.end !=
 396                                OBD_OBJECT_EOF))
 397                                continue;
 398
 399                        if ((new->l_policy_data.l_flock.end <
 400                             (lock->l_policy_data.l_flock.start - 1))
 401                            && (lock->l_policy_data.l_flock.start != 0))
 402                                break;
 403
 404                        if (new->l_policy_data.l_flock.start <
 405                            lock->l_policy_data.l_flock.start) {
 406                                lock->l_policy_data.l_flock.start =
 407                                        new->l_policy_data.l_flock.start;
 408                        } else {
 409                                new->l_policy_data.l_flock.start =
 410                                        lock->l_policy_data.l_flock.start;
 411                        }
 412
 413                        if (new->l_policy_data.l_flock.end >
 414                            lock->l_policy_data.l_flock.end) {
 415                                lock->l_policy_data.l_flock.end =
 416                                        new->l_policy_data.l_flock.end;
 417                        } else {
 418                                new->l_policy_data.l_flock.end =
 419                                        lock->l_policy_data.l_flock.end;
 420                        }
 421
 422                        if (added) {
 423                                ldlm_flock_destroy(lock, mode, *flags);
 424                        } else {
 425                                new = lock;
 426                                added = 1;
 427                        }
 428                        continue;
 429                }
 430
 431                if (new->l_policy_data.l_flock.start >
 432                    lock->l_policy_data.l_flock.end)
 433                        continue;
 434
 435                if (new->l_policy_data.l_flock.end <
 436                    lock->l_policy_data.l_flock.start)
 437                        break;
 438
 439                ++overlaps;
 440
 441                if (new->l_policy_data.l_flock.start <=
 442                    lock->l_policy_data.l_flock.start) {
 443                        if (new->l_policy_data.l_flock.end <
 444                            lock->l_policy_data.l_flock.end) {
 445                                lock->l_policy_data.l_flock.start =
 446                                        new->l_policy_data.l_flock.end + 1;
 447                                break;
 448                        }
 449                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
 450                        continue;
 451                }
 452                if (new->l_policy_data.l_flock.end >=
 453                    lock->l_policy_data.l_flock.end) {
 454                        lock->l_policy_data.l_flock.end =
 455                                new->l_policy_data.l_flock.start - 1;
 456                        continue;
 457                }
 458
 459                /* split the existing lock into two locks */
 460
 461                /* if this is an F_UNLCK operation then we could avoid
 462                 * allocating a new lock and use the req lock passed in
 463                 * with the request but this would complicate the reply
 464                 * processing since updates to req get reflected in the
 465                 * reply. The client side replays the lock request so
 466                 * it must see the original lock data in the reply. */
 467
 468                /* XXX - if ldlm_lock_new() can sleep we should
 469                 * release the lr_lock, allocate the new lock,
 470                 * and restart processing this lock. */
 471                if (!new2) {
 472                        unlock_res_and_lock(req);
 473                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
 474                                                lock->l_granted_mode, &null_cbs,
 475                                                NULL, 0, LVB_T_NONE);
 476                        lock_res_and_lock(req);
 477                        if (!new2) {
 478                                ldlm_flock_destroy(req, lock->l_granted_mode,
 479                                                   *flags);
 480                                *err = -ENOLCK;
 481                                return LDLM_ITER_STOP;
 482                        }
 483                        goto reprocess;
 484                }
 485
 486                splitted = 1;
 487
 488                new2->l_granted_mode = lock->l_granted_mode;
 489                new2->l_policy_data.l_flock.pid =
 490                        new->l_policy_data.l_flock.pid;
 491                new2->l_policy_data.l_flock.owner =
 492                        new->l_policy_data.l_flock.owner;
 493                new2->l_policy_data.l_flock.start =
 494                        lock->l_policy_data.l_flock.start;
 495                new2->l_policy_data.l_flock.end =
 496                        new->l_policy_data.l_flock.start - 1;
 497                lock->l_policy_data.l_flock.start =
 498                        new->l_policy_data.l_flock.end + 1;
 499                new2->l_conn_export = lock->l_conn_export;
 500                if (lock->l_export != NULL) {
 501                        new2->l_export = class_export_lock_get(lock->l_export,
 502                                                               new2);
 503                        if (new2->l_export->exp_lock_hash &&
 504                            hlist_unhashed(&new2->l_exp_hash))
 505                                cfs_hash_add(new2->l_export->exp_lock_hash,
 506                                             &new2->l_remote_handle,
 507                                             &new2->l_exp_hash);
 508                }
 509                if (*flags == LDLM_FL_WAIT_NOREPROC)
 510                        ldlm_lock_addref_internal_nolock(new2,
 511                                                         lock->l_granted_mode);
 512
 513                /* insert new2 at lock */
 514                ldlm_resource_add_lock(res, ownlocks, new2);
 515                LDLM_LOCK_RELEASE(new2);
 516                break;
 517        }
 518
 519        /* if new2 is created but never used, destroy it*/
 520        if (splitted == 0 && new2 != NULL)
 521                ldlm_lock_destroy_nolock(new2);
 522
 523        /* At this point we're granting the lock request. */
 524        req->l_granted_mode = req->l_req_mode;
 525
 526        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
 527        if (!added) {
 528                list_del_init(&req->l_res_link);
 529                /* insert new lock before ownlocks in list. */
 530                ldlm_resource_add_lock(res, ownlocks, req);
 531        }
 532
 533        if (*flags != LDLM_FL_WAIT_NOREPROC) {
 534                /* The only one possible case for client-side calls flock
 535                 * policy function is ldlm_flock_completion_ast inside which
 536                 * carries LDLM_FL_WAIT_NOREPROC flag. */
 537                CERROR("Illegal parameter for client-side-only module.\n");
 538                LBUG();
 539        }
 540
 541        /* In case we're reprocessing the requested lock we can't destroy
 542         * it until after calling ldlm_add_ast_work_item() above so that laawi()
 543         * can bump the reference count on \a req. Otherwise \a req
 544         * could be freed before the completion AST can be sent.  */
 545        if (added)
 546                ldlm_flock_destroy(req, mode, *flags);
 547
 548        ldlm_resource_dump(D_INFO, res);
 549        return LDLM_ITER_CONTINUE;
 550}
 551
 552struct ldlm_flock_wait_data {
 553        struct ldlm_lock *fwd_lock;
 554        int            fwd_generation;
 555};
 556
 557static void
 558ldlm_flock_interrupted_wait(void *data)
 559{
 560        struct ldlm_lock *lock;
 561
 562        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
 563
 564        /* take lock off the deadlock detection hash list. */
 565        lock_res_and_lock(lock);
 566        ldlm_flock_blocking_unlink(lock);
 567
 568        /* client side - set flag to prevent lock from being put on LRU list */
 569        lock->l_flags |= LDLM_FL_CBPENDING;
 570        unlock_res_and_lock(lock);
 571}
 572
 573/**
 574 * Flock completion callback function.
 575 *
 576 * \param lock [in,out]: A lock to be handled
 577 * \param flags    [in]: flags
 578 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 579 *
 580 * \retval 0    : success
 581 * \retval <0   : failure
 582 */
 583int
 584ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 585{
 586        struct file_lock                *getlk = lock->l_ast_data;
 587        struct obd_device             *obd;
 588        struct obd_import             *imp = NULL;
 589        struct ldlm_flock_wait_data     fwd;
 590        struct l_wait_info            lwi;
 591        ldlm_error_t                err;
 592        int                          rc = 0;
 593
 594        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
 595               flags, data, getlk);
 596
 597        /* Import invalidation. We need to actually release the lock
 598         * references being held, so that it can go away. No point in
 599         * holding the lock even if app still believes it has it, since
 600         * server already dropped it anyway. Only for granted locks too. */
 601        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
 602            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
 603                if (lock->l_req_mode == lock->l_granted_mode &&
 604                    lock->l_granted_mode != LCK_NL &&
 605                    NULL == data)
 606                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
 607
 608                /* Need to wake up the waiter if we were evicted */
 609                wake_up(&lock->l_waitq);
 610                return 0;
 611        }
 612
 613        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 614
 615        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
 616                       LDLM_FL_BLOCK_CONV))) {
 617                if (NULL == data)
 618                        /* mds granted the lock in the reply */
 619                        goto granted;
 620                /* CP AST RPC: lock get granted, wake it up */
 621                wake_up(&lock->l_waitq);
 622                return 0;
 623        }
 624
 625        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
 626        fwd.fwd_lock = lock;
 627        obd = class_exp2obd(lock->l_conn_export);
 628
 629        /* if this is a local lock, there is no import */
 630        if (NULL != obd)
 631                imp = obd->u.cli.cl_import;
 632
 633        if (NULL != imp) {
 634                spin_lock(&imp->imp_lock);
 635                fwd.fwd_generation = imp->imp_generation;
 636                spin_unlock(&imp->imp_lock);
 637        }
 638
 639        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
 640
 641        /* Go to sleep until the lock is granted. */
 642        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
 643
 644        if (rc) {
 645                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
 646                           rc);
 647                return rc;
 648        }
 649
 650granted:
 651        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
 652
 653        if (lock->l_flags & LDLM_FL_DESTROYED) {
 654                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
 655                return 0;
 656        }
 657
 658        if (lock->l_flags & LDLM_FL_FAILED) {
 659                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
 660                return -EIO;
 661        }
 662
 663        if (rc) {
 664                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
 665                           rc);
 666                return rc;
 667        }
 668
 669        LDLM_DEBUG(lock, "client-side enqueue granted");
 670
 671        lock_res_and_lock(lock);
 672
 673        /* take lock off the deadlock detection hash list. */
 674        ldlm_flock_blocking_unlink(lock);
 675
 676        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
 677        list_del_init(&lock->l_res_link);
 678
 679        if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
 680                LDLM_DEBUG(lock, "client-side enqueue deadlock received");
 681                rc = -EDEADLK;
 682        } else if (flags & LDLM_FL_TEST_LOCK) {
 683                /* fcntl(F_GETLK) request */
 684                /* The old mode was saved in getlk->fl_type so that if the mode
 685                 * in the lock changes we can decref the appropriate refcount.*/
 686                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
 687                switch (lock->l_granted_mode) {
 688                case LCK_PR:
 689                        getlk->fl_type = F_RDLCK;
 690                        break;
 691                case LCK_PW:
 692                        getlk->fl_type = F_WRLCK;
 693                        break;
 694                default:
 695                        getlk->fl_type = F_UNLCK;
 696                }
 697                getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
 698                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
 699                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
 700        } else {
 701                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
 702
 703                /* We need to reprocess the lock to do merges or splits
 704                 * with existing locks owned by this process. */
 705                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
 706        }
 707        unlock_res_and_lock(lock);
 708        return rc;
 709}
 710EXPORT_SYMBOL(ldlm_flock_completion_ast);
 711
 712int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 713                            void *data, int flag)
 714{
 715        LASSERT(lock);
 716        LASSERT(flag == LDLM_CB_CANCELING);
 717
 718        /* take lock off the deadlock detection hash list. */
 719        lock_res_and_lock(lock);
 720        ldlm_flock_blocking_unlink(lock);
 721        unlock_res_and_lock(lock);
 722        return 0;
 723}
 724
 725void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
 726                                       ldlm_policy_data_t *lpolicy)
 727{
 728        memset(lpolicy, 0, sizeof(*lpolicy));
 729        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
 730        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
 731        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
 732        /* Compat code, old clients had no idea about owner field and
 733         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
 734         * April 2011 */
 735        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
 736}
 737
 738
 739void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
 740                                       ldlm_policy_data_t *lpolicy)
 741{
 742        memset(lpolicy, 0, sizeof(*lpolicy));
 743        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
 744        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
 745        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
 746        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
 747}
 748
 749void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
 750                                     ldlm_wire_policy_data_t *wpolicy)
 751{
 752        memset(wpolicy, 0, sizeof(*wpolicy));
 753        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
 754        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
 755        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
 756        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
 757}
 758
 759/*
 760 * Export handle<->flock hash operations.
 761 */
 762static unsigned
 763ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
 764{
 765        return cfs_hash_u64_hash(*(__u64 *)key, mask);
 766}
 767
 768static void *
 769ldlm_export_flock_key(struct hlist_node *hnode)
 770{
 771        struct ldlm_lock *lock;
 772
 773        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
 774        return &lock->l_policy_data.l_flock.owner;
 775}
 776
 777static int
 778ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
 779{
 780        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
 781}
 782
 783static void *
 784ldlm_export_flock_object(struct hlist_node *hnode)
 785{
 786        return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
 787}
 788
 789static void
 790ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
 791{
 792        struct ldlm_lock *lock;
 793        struct ldlm_flock *flock;
 794
 795        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
 796        LDLM_LOCK_GET(lock);
 797
 798        flock = &lock->l_policy_data.l_flock;
 799        LASSERT(flock->blocking_export != NULL);
 800        class_export_get(flock->blocking_export);
 801        flock->blocking_refs++;
 802}
 803
 804static void
 805ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
 806{
 807        struct ldlm_lock *lock;
 808        struct ldlm_flock *flock;
 809
 810        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
 811        LDLM_LOCK_RELEASE(lock);
 812
 813        flock = &lock->l_policy_data.l_flock;
 814        LASSERT(flock->blocking_export != NULL);
 815        class_export_put(flock->blocking_export);
 816        if (--flock->blocking_refs == 0) {
 817                flock->blocking_owner = 0;
 818                flock->blocking_export = NULL;
 819        }
 820}
 821
 822static cfs_hash_ops_t ldlm_export_flock_ops = {
 823        .hs_hash        = ldlm_export_flock_hash,
 824        .hs_key  = ldlm_export_flock_key,
 825        .hs_keycmp      = ldlm_export_flock_keycmp,
 826        .hs_object      = ldlm_export_flock_object,
 827        .hs_get  = ldlm_export_flock_get,
 828        .hs_put  = ldlm_export_flock_put,
 829        .hs_put_locked  = ldlm_export_flock_put,
 830};
 831
 832int ldlm_init_flock_export(struct obd_export *exp)
 833{
 834        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
 835                return 0;
 836
 837        exp->exp_flock_hash =
 838                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
 839                                HASH_EXP_LOCK_CUR_BITS,
 840                                HASH_EXP_LOCK_MAX_BITS,
 841                                HASH_EXP_LOCK_BKT_BITS, 0,
 842                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
 843                                &ldlm_export_flock_ops,
 844                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
 845        if (!exp->exp_flock_hash)
 846                return -ENOMEM;
 847
 848        return 0;
 849}
 850EXPORT_SYMBOL(ldlm_init_flock_export);
 851
 852void ldlm_destroy_flock_export(struct obd_export *exp)
 853{
 854        if (exp->exp_flock_hash) {
 855                cfs_hash_putref(exp->exp_flock_hash);
 856                exp->exp_flock_hash = NULL;
 857        }
 858}
 859EXPORT_SYMBOL(ldlm_destroy_flock_export);
 860