linux/fs/ocfs2/dlm/dlmlock.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * dlmlock.c
   4 *
   5 * underlying calls for lock creation
   6 *
   7 * Copyright (C) 2004 Oracle.  All rights reserved.
   8 */
   9
  10
  11#include <linux/module.h>
  12#include <linux/fs.h>
  13#include <linux/types.h>
  14#include <linux/slab.h>
  15#include <linux/highmem.h>
  16#include <linux/init.h>
  17#include <linux/sysctl.h>
  18#include <linux/random.h>
  19#include <linux/blkdev.h>
  20#include <linux/socket.h>
  21#include <linux/inet.h>
  22#include <linux/spinlock.h>
  23#include <linux/delay.h>
  24
  25
  26#include "../cluster/heartbeat.h"
  27#include "../cluster/nodemanager.h"
  28#include "../cluster/tcp.h"
  29
  30#include "dlmapi.h"
  31#include "dlmcommon.h"
  32
  33#include "dlmconvert.h"
  34
  35#define MLOG_MASK_PREFIX ML_DLM
  36#include "../cluster/masklog.h"
  37
  38static struct kmem_cache *dlm_lock_cache;
  39
  40static DEFINE_SPINLOCK(dlm_cookie_lock);
  41static u64 dlm_next_cookie = 1;
  42
  43static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
  44                                               struct dlm_lock_resource *res,
  45                                               struct dlm_lock *lock, int flags);
  46static void dlm_init_lock(struct dlm_lock *newlock, int type,
  47                          u8 node, u64 cookie);
  48static void dlm_lock_release(struct kref *kref);
  49static void dlm_lock_detach_lockres(struct dlm_lock *lock);
  50
  51int dlm_init_lock_cache(void)
  52{
  53        dlm_lock_cache = kmem_cache_create("o2dlm_lock",
  54                                           sizeof(struct dlm_lock),
  55                                           0, SLAB_HWCACHE_ALIGN, NULL);
  56        if (dlm_lock_cache == NULL)
  57                return -ENOMEM;
  58        return 0;
  59}
  60
  61void dlm_destroy_lock_cache(void)
  62{
  63        kmem_cache_destroy(dlm_lock_cache);
  64}
  65
  66/* Tell us whether we can grant a new lock request.
  67 * locking:
  68 *   caller needs:  res->spinlock
  69 *   taken:         none
  70 *   held on exit:  none
  71 * returns: 1 if the lock can be granted, 0 otherwise.
  72 */
  73static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
  74                                  struct dlm_lock *lock)
  75{
  76        struct dlm_lock *tmplock;
  77
  78        list_for_each_entry(tmplock, &res->granted, list) {
  79                if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
  80                        return 0;
  81        }
  82
  83        list_for_each_entry(tmplock, &res->converting, list) {
  84                if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
  85                        return 0;
  86                if (!dlm_lock_compatible(tmplock->ml.convert_type,
  87                                         lock->ml.type))
  88                        return 0;
  89        }
  90
  91        return 1;
  92}
  93
  94/* performs lock creation at the lockres master site
  95 * locking:
  96 *   caller needs:  none
  97 *   taken:         takes and drops res->spinlock
  98 *   held on exit:  none
  99 * returns: DLM_NORMAL, DLM_NOTQUEUED
 100 */
 101static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
 102                                      struct dlm_lock_resource *res,
 103                                      struct dlm_lock *lock, int flags)
 104{
 105        int call_ast = 0, kick_thread = 0;
 106        enum dlm_status status = DLM_NORMAL;
 107
 108        mlog(0, "type=%d\n", lock->ml.type);
 109
 110        spin_lock(&res->spinlock);
 111        /* if called from dlm_create_lock_handler, need to
 112         * ensure it will not sleep in dlm_wait_on_lockres */
 113        status = __dlm_lockres_state_to_status(res);
 114        if (status != DLM_NORMAL &&
 115            lock->ml.node != dlm->node_num) {
 116                /* erf.  state changed after lock was dropped. */
 117                spin_unlock(&res->spinlock);
 118                dlm_error(status);
 119                return status;
 120        }
 121        __dlm_wait_on_lockres(res);
 122        __dlm_lockres_reserve_ast(res);
 123
 124        if (dlm_can_grant_new_lock(res, lock)) {
 125                mlog(0, "I can grant this lock right away\n");
 126                /* got it right away */
 127                lock->lksb->status = DLM_NORMAL;
 128                status = DLM_NORMAL;
 129                dlm_lock_get(lock);
 130                list_add_tail(&lock->list, &res->granted);
 131
 132                /* for the recovery lock, we can't allow the ast
 133                 * to be queued since the dlmthread is already
 134                 * frozen.  but the recovery lock is always locked
 135                 * with LKM_NOQUEUE so we do not need the ast in
 136                 * this special case */
 137                if (!dlm_is_recovery_lock(res->lockname.name,
 138                                          res->lockname.len)) {
 139                        kick_thread = 1;
 140                        call_ast = 1;
 141                } else {
 142                        mlog(0, "%s: returning DLM_NORMAL to "
 143                             "node %u for reco lock\n", dlm->name,
 144                             lock->ml.node);
 145                }
 146        } else {
 147                /* for NOQUEUE request, unless we get the
 148                 * lock right away, return DLM_NOTQUEUED */
 149                if (flags & LKM_NOQUEUE) {
 150                        status = DLM_NOTQUEUED;
 151                        if (dlm_is_recovery_lock(res->lockname.name,
 152                                                 res->lockname.len)) {
 153                                mlog(0, "%s: returning NOTQUEUED to "
 154                                     "node %u for reco lock\n", dlm->name,
 155                                     lock->ml.node);
 156                        }
 157                } else {
 158                        status = DLM_NORMAL;
 159                        dlm_lock_get(lock);
 160                        list_add_tail(&lock->list, &res->blocked);
 161                        kick_thread = 1;
 162                }
 163        }
 164
 165        spin_unlock(&res->spinlock);
 166        wake_up(&res->wq);
 167
 168        /* either queue the ast or release it */
 169        if (call_ast)
 170                dlm_queue_ast(dlm, lock);
 171        else
 172                dlm_lockres_release_ast(dlm, res);
 173
 174        dlm_lockres_calc_usage(dlm, res);
 175        if (kick_thread)
 176                dlm_kick_thread(dlm, res);
 177
 178        return status;
 179}
 180
 181void dlm_revert_pending_lock(struct dlm_lock_resource *res,
 182                             struct dlm_lock *lock)
 183{
 184        /* remove from local queue if it failed */
 185        list_del_init(&lock->list);
 186        lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
 187}
 188
 189
 190/*
 191 * locking:
 192 *   caller needs:  none
 193 *   taken:         takes and drops res->spinlock
 194 *   held on exit:  none
 195 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 196 */
 197static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
 198                                      struct dlm_lock_resource *res,
 199                                      struct dlm_lock *lock, int flags)
 200{
 201        enum dlm_status status = DLM_DENIED;
 202        int lockres_changed = 1;
 203
 204        mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
 205             lock->ml.type, res->lockname.len,
 206             res->lockname.name, flags);
 207
 208        /*
 209         * Wait if resource is getting recovered, remastered, etc.
 210         * If the resource was remastered and new owner is self, then exit.
 211         */
 212        spin_lock(&res->spinlock);
 213        __dlm_wait_on_lockres(res);
 214        if (res->owner == dlm->node_num) {
 215                spin_unlock(&res->spinlock);
 216                return DLM_RECOVERING;
 217        }
 218        res->state |= DLM_LOCK_RES_IN_PROGRESS;
 219
 220        /* add lock to local (secondary) queue */
 221        dlm_lock_get(lock);
 222        list_add_tail(&lock->list, &res->blocked);
 223        lock->lock_pending = 1;
 224        spin_unlock(&res->spinlock);
 225
 226        /* spec seems to say that you will get DLM_NORMAL when the lock
 227         * has been queued, meaning we need to wait for a reply here. */
 228        status = dlm_send_remote_lock_request(dlm, res, lock, flags);
 229
 230        spin_lock(&res->spinlock);
 231        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 232        lock->lock_pending = 0;
 233        if (status != DLM_NORMAL) {
 234                if (status == DLM_RECOVERING &&
 235                    dlm_is_recovery_lock(res->lockname.name,
 236                                         res->lockname.len)) {
 237                        /* recovery lock was mastered by dead node.
 238                         * we need to have calc_usage shoot down this
 239                         * lockres and completely remaster it. */
 240                        mlog(0, "%s: recovery lock was owned by "
 241                             "dead node %u, remaster it now.\n",
 242                             dlm->name, res->owner);
 243                } else if (status != DLM_NOTQUEUED) {
 244                        /*
 245                         * DO NOT call calc_usage, as this would unhash
 246                         * the remote lockres before we ever get to use
 247                         * it.  treat as if we never made any change to
 248                         * the lockres.
 249                         */
 250                        lockres_changed = 0;
 251                        dlm_error(status);
 252                }
 253                dlm_revert_pending_lock(res, lock);
 254                dlm_lock_put(lock);
 255        } else if (dlm_is_recovery_lock(res->lockname.name,
 256                                        res->lockname.len)) {
 257                /* special case for the $RECOVERY lock.
 258                 * there will never be an AST delivered to put
 259                 * this lock on the proper secondary queue
 260                 * (granted), so do it manually. */
 261                mlog(0, "%s: $RECOVERY lock for this node (%u) is "
 262                     "mastered by %u; got lock, manually granting (no ast)\n",
 263                     dlm->name, dlm->node_num, res->owner);
 264                list_move_tail(&lock->list, &res->granted);
 265        }
 266        spin_unlock(&res->spinlock);
 267
 268        if (lockres_changed)
 269                dlm_lockres_calc_usage(dlm, res);
 270
 271        wake_up(&res->wq);
 272        return status;
 273}
 274
 275
 276/* for remote lock creation.
 277 * locking:
 278 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
 279 *   taken:         none
 280 *   held on exit:  none
 281 * returns: DLM_NOLOCKMGR, or net status
 282 */
 283static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
 284                                               struct dlm_lock_resource *res,
 285                                               struct dlm_lock *lock, int flags)
 286{
 287        struct dlm_create_lock create;
 288        int tmpret, status = 0;
 289        enum dlm_status ret;
 290
 291        memset(&create, 0, sizeof(create));
 292        create.node_idx = dlm->node_num;
 293        create.requested_type = lock->ml.type;
 294        create.cookie = lock->ml.cookie;
 295        create.namelen = res->lockname.len;
 296        create.flags = cpu_to_be32(flags);
 297        memcpy(create.name, res->lockname.name, create.namelen);
 298
 299        tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
 300                                    sizeof(create), res->owner, &status);
 301        if (tmpret >= 0) {
 302                ret = status;
 303                if (ret == DLM_REJECTED) {
 304                        mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
 305                             "owned by node %u. That node is coming back up "
 306                             "currently.\n", dlm->name, create.namelen,
 307                             create.name, res->owner);
 308                        dlm_print_one_lock_resource(res);
 309                        BUG();
 310                }
 311        } else {
 312                mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
 313                     "node %u\n", dlm->name, create.namelen, create.name,
 314                     tmpret, res->owner);
 315                if (dlm_is_host_down(tmpret))
 316                        ret = DLM_RECOVERING;
 317                else
 318                        ret = dlm_err_to_dlm_status(tmpret);
 319        }
 320
 321        return ret;
 322}
 323
 324void dlm_lock_get(struct dlm_lock *lock)
 325{
 326        kref_get(&lock->lock_refs);
 327}
 328
 329void dlm_lock_put(struct dlm_lock *lock)
 330{
 331        kref_put(&lock->lock_refs, dlm_lock_release);
 332}
 333
 334static void dlm_lock_release(struct kref *kref)
 335{
 336        struct dlm_lock *lock;
 337
 338        lock = container_of(kref, struct dlm_lock, lock_refs);
 339
 340        BUG_ON(!list_empty(&lock->list));
 341        BUG_ON(!list_empty(&lock->ast_list));
 342        BUG_ON(!list_empty(&lock->bast_list));
 343        BUG_ON(lock->ast_pending);
 344        BUG_ON(lock->bast_pending);
 345
 346        dlm_lock_detach_lockres(lock);
 347
 348        if (lock->lksb_kernel_allocated) {
 349                mlog(0, "freeing kernel-allocated lksb\n");
 350                kfree(lock->lksb);
 351        }
 352        kmem_cache_free(dlm_lock_cache, lock);
 353}
 354
 355/* associate a lock with it's lockres, getting a ref on the lockres */
 356void dlm_lock_attach_lockres(struct dlm_lock *lock,
 357                             struct dlm_lock_resource *res)
 358{
 359        dlm_lockres_get(res);
 360        lock->lockres = res;
 361}
 362
 363/* drop ref on lockres, if there is still one associated with lock */
 364static void dlm_lock_detach_lockres(struct dlm_lock *lock)
 365{
 366        struct dlm_lock_resource *res;
 367
 368        res = lock->lockres;
 369        if (res) {
 370                lock->lockres = NULL;
 371                mlog(0, "removing lock's lockres reference\n");
 372                dlm_lockres_put(res);
 373        }
 374}
 375
 376static void dlm_init_lock(struct dlm_lock *newlock, int type,
 377                          u8 node, u64 cookie)
 378{
 379        INIT_LIST_HEAD(&newlock->list);
 380        INIT_LIST_HEAD(&newlock->ast_list);
 381        INIT_LIST_HEAD(&newlock->bast_list);
 382        spin_lock_init(&newlock->spinlock);
 383        newlock->ml.type = type;
 384        newlock->ml.convert_type = LKM_IVMODE;
 385        newlock->ml.highest_blocked = LKM_IVMODE;
 386        newlock->ml.node = node;
 387        newlock->ml.pad1 = 0;
 388        newlock->ml.list = 0;
 389        newlock->ml.flags = 0;
 390        newlock->ast = NULL;
 391        newlock->bast = NULL;
 392        newlock->astdata = NULL;
 393        newlock->ml.cookie = cpu_to_be64(cookie);
 394        newlock->ast_pending = 0;
 395        newlock->bast_pending = 0;
 396        newlock->convert_pending = 0;
 397        newlock->lock_pending = 0;
 398        newlock->unlock_pending = 0;
 399        newlock->cancel_pending = 0;
 400        newlock->lksb_kernel_allocated = 0;
 401
 402        kref_init(&newlock->lock_refs);
 403}
 404
 405struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
 406                               struct dlm_lockstatus *lksb)
 407{
 408        struct dlm_lock *lock;
 409        int kernel_allocated = 0;
 410
 411        lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
 412        if (!lock)
 413                return NULL;
 414
 415        if (!lksb) {
 416                /* zero memory only if kernel-allocated */
 417                lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
 418                if (!lksb) {
 419                        kmem_cache_free(dlm_lock_cache, lock);
 420                        return NULL;
 421                }
 422                kernel_allocated = 1;
 423        }
 424
 425        dlm_init_lock(lock, type, node, cookie);
 426        if (kernel_allocated)
 427                lock->lksb_kernel_allocated = 1;
 428        lock->lksb = lksb;
 429        lksb->lockid = lock;
 430        return lock;
 431}
 432
 433/* handler for lock creation net message
 434 * locking:
 435 *   caller needs:  none
 436 *   taken:         takes and drops res->spinlock
 437 *   held on exit:  none
 438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
 439 */
 440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
 441                            void **ret_data)
 442{
 443        struct dlm_ctxt *dlm = data;
 444        struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
 445        struct dlm_lock_resource *res = NULL;
 446        struct dlm_lock *newlock = NULL;
 447        struct dlm_lockstatus *lksb = NULL;
 448        enum dlm_status status = DLM_NORMAL;
 449        char *name;
 450        unsigned int namelen;
 451
 452        BUG_ON(!dlm);
 453
 454        if (!dlm_grab(dlm))
 455                return DLM_REJECTED;
 456
 457        name = create->name;
 458        namelen = create->namelen;
 459        status = DLM_REJECTED;
 460        if (!dlm_domain_fully_joined(dlm)) {
 461                mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
 462                     "sending a create_lock message for lock %.*s!\n",
 463                     dlm->name, create->node_idx, namelen, name);
 464                dlm_error(status);
 465                goto leave;
 466        }
 467
 468        status = DLM_IVBUFLEN;
 469        if (namelen > DLM_LOCKID_NAME_MAX) {
 470                dlm_error(status);
 471                goto leave;
 472        }
 473
 474        status = DLM_SYSERR;
 475        newlock = dlm_new_lock(create->requested_type,
 476                               create->node_idx,
 477                               be64_to_cpu(create->cookie), NULL);
 478        if (!newlock) {
 479                dlm_error(status);
 480                goto leave;
 481        }
 482
 483        lksb = newlock->lksb;
 484
 485        if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
 486                lksb->flags |= DLM_LKSB_GET_LVB;
 487                mlog(0, "set DLM_LKSB_GET_LVB flag\n");
 488        }
 489
 490        status = DLM_IVLOCKID;
 491        res = dlm_lookup_lockres(dlm, name, namelen);
 492        if (!res) {
 493                dlm_error(status);
 494                goto leave;
 495        }
 496
 497        spin_lock(&res->spinlock);
 498        status = __dlm_lockres_state_to_status(res);
 499        spin_unlock(&res->spinlock);
 500
 501        if (status != DLM_NORMAL) {
 502                mlog(0, "lockres recovering/migrating/in-progress\n");
 503                goto leave;
 504        }
 505
 506        dlm_lock_attach_lockres(newlock, res);
 507
 508        status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
 509leave:
 510        if (status != DLM_NORMAL)
 511                if (newlock)
 512                        dlm_lock_put(newlock);
 513
 514        if (res)
 515                dlm_lockres_put(res);
 516
 517        dlm_put(dlm);
 518
 519        return status;
 520}
 521
 522
 523/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
 524static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
 525{
 526        u64 tmpnode = node_num;
 527
 528        /* shift single byte of node num into top 8 bits */
 529        tmpnode <<= 56;
 530
 531        spin_lock(&dlm_cookie_lock);
 532        *cookie = (dlm_next_cookie | tmpnode);
 533        if (++dlm_next_cookie & 0xff00000000000000ull) {
 534                mlog(0, "This node's cookie will now wrap!\n");
 535                dlm_next_cookie = 1;
 536        }
 537        spin_unlock(&dlm_cookie_lock);
 538}
 539
 540enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
 541                        struct dlm_lockstatus *lksb, int flags,
 542                        const char *name, int namelen, dlm_astlockfunc_t *ast,
 543                        void *data, dlm_bastlockfunc_t *bast)
 544{
 545        enum dlm_status status;
 546        struct dlm_lock_resource *res = NULL;
 547        struct dlm_lock *lock = NULL;
 548        int convert = 0, recovery = 0;
 549
 550        /* yes this function is a mess.
 551         * TODO: clean this up.  lots of common code in the
 552         *       lock and convert paths, especially in the retry blocks */
 553        if (!lksb) {
 554                dlm_error(DLM_BADARGS);
 555                return DLM_BADARGS;
 556        }
 557
 558        status = DLM_BADPARAM;
 559        if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
 560                dlm_error(status);
 561                goto error;
 562        }
 563
 564        if (flags & ~LKM_VALID_FLAGS) {
 565                dlm_error(status);
 566                goto error;
 567        }
 568
 569        convert = (flags & LKM_CONVERT);
 570        recovery = (flags & LKM_RECOVERY);
 571
 572        if (recovery &&
 573            (!dlm_is_recovery_lock(name, namelen) || convert) ) {
 574                dlm_error(status);
 575                goto error;
 576        }
 577        if (convert && (flags & LKM_LOCAL)) {
 578                mlog(ML_ERROR, "strange LOCAL convert request!\n");
 579                goto error;
 580        }
 581
 582        if (convert) {
 583                /* CONVERT request */
 584
 585                /* if converting, must pass in a valid dlm_lock */
 586                lock = lksb->lockid;
 587                if (!lock) {
 588                        mlog(ML_ERROR, "NULL lock pointer in convert "
 589                             "request\n");
 590                        goto error;
 591                }
 592
 593                res = lock->lockres;
 594                if (!res) {
 595                        mlog(ML_ERROR, "NULL lockres pointer in convert "
 596                             "request\n");
 597                        goto error;
 598                }
 599                dlm_lockres_get(res);
 600
 601                /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
 602                 * static after the original lock call.  convert requests will
 603                 * ensure that everything is the same, or return DLM_BADARGS.
 604                 * this means that DLM_DENIED_NOASTS will never be returned.
 605                 */
 606                if (lock->lksb != lksb || lock->ast != ast ||
 607                    lock->bast != bast || lock->astdata != data) {
 608                        status = DLM_BADARGS;
 609                        mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
 610                             "astdata=%p\n", lksb, ast, bast, data);
 611                        mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
 612                             "astdata=%p\n", lock->lksb, lock->ast,
 613                             lock->bast, lock->astdata);
 614                        goto error;
 615                }
 616retry_convert:
 617                dlm_wait_for_recovery(dlm);
 618
 619                if (res->owner == dlm->node_num)
 620                        status = dlmconvert_master(dlm, res, lock, flags, mode);
 621                else
 622                        status = dlmconvert_remote(dlm, res, lock, flags, mode);
 623                if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
 624                    status == DLM_FORWARD) {
 625                        /* for now, see how this works without sleeping
 626                         * and just retry right away.  I suspect the reco
 627                         * or migration will complete fast enough that
 628                         * no waiting will be necessary */
 629                        mlog(0, "retrying convert with migration/recovery/"
 630                             "in-progress\n");
 631                        msleep(100);
 632                        goto retry_convert;
 633                }
 634        } else {
 635                u64 tmpcookie;
 636
 637                /* LOCK request */
 638                status = DLM_BADARGS;
 639                if (!name) {
 640                        dlm_error(status);
 641                        goto error;
 642                }
 643
 644                status = DLM_IVBUFLEN;
 645                if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
 646                        dlm_error(status);
 647                        goto error;
 648                }
 649
 650                dlm_get_next_cookie(dlm->node_num, &tmpcookie);
 651                lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
 652                if (!lock) {
 653                        dlm_error(status);
 654                        goto error;
 655                }
 656
 657                if (!recovery)
 658                        dlm_wait_for_recovery(dlm);
 659
 660                /* find or create the lock resource */
 661                res = dlm_get_lock_resource(dlm, name, namelen, flags);
 662                if (!res) {
 663                        status = DLM_IVLOCKID;
 664                        dlm_error(status);
 665                        goto error;
 666                }
 667
 668                mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
 669                mlog(0, "creating lock: lock=%p res=%p\n", lock, res);
 670
 671                dlm_lock_attach_lockres(lock, res);
 672                lock->ast = ast;
 673                lock->bast = bast;
 674                lock->astdata = data;
 675
 676retry_lock:
 677                if (flags & LKM_VALBLK) {
 678                        mlog(0, "LKM_VALBLK passed by caller\n");
 679
 680                        /* LVB requests for non PR, PW or EX locks are
 681                         * ignored. */
 682                        if (mode < LKM_PRMODE)
 683                                flags &= ~LKM_VALBLK;
 684                        else {
 685                                flags |= LKM_GET_LVB;
 686                                lock->lksb->flags |= DLM_LKSB_GET_LVB;
 687                        }
 688                }
 689
 690                if (res->owner == dlm->node_num)
 691                        status = dlmlock_master(dlm, res, lock, flags);
 692                else
 693                        status = dlmlock_remote(dlm, res, lock, flags);
 694
 695                if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
 696                    status == DLM_FORWARD) {
 697                        msleep(100);
 698                        if (recovery) {
 699                                if (status != DLM_RECOVERING)
 700                                        goto retry_lock;
 701                                /* wait to see the node go down, then
 702                                 * drop down and allow the lockres to
 703                                 * get cleaned up.  need to remaster. */
 704                                dlm_wait_for_node_death(dlm, res->owner,
 705                                                DLM_NODE_DEATH_WAIT_MAX);
 706                        } else {
 707                                dlm_wait_for_recovery(dlm);
 708                                goto retry_lock;
 709                        }
 710                }
 711
 712                /* Inflight taken in dlm_get_lock_resource() is dropped here */
 713                spin_lock(&res->spinlock);
 714                dlm_lockres_drop_inflight_ref(dlm, res);
 715                spin_unlock(&res->spinlock);
 716
 717                dlm_lockres_calc_usage(dlm, res);
 718                dlm_kick_thread(dlm, res);
 719
 720                if (status != DLM_NORMAL) {
 721                        lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
 722                        if (status != DLM_NOTQUEUED)
 723                                dlm_error(status);
 724                        goto error;
 725                }
 726        }
 727
 728error:
 729        if (status != DLM_NORMAL) {
 730                if (lock && !convert)
 731                        dlm_lock_put(lock);
 732                // this is kind of unnecessary
 733                lksb->status = status;
 734        }
 735
 736        /* put lockres ref from the convert path
 737         * or from dlm_get_lock_resource */
 738        if (res)
 739                dlm_lockres_put(res);
 740
 741        return status;
 742}
 743EXPORT_SYMBOL_GPL(dlmlock);
 744