linux/fs/ocfs2/dlmfs/userdlm.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "ocfs2_lockingver.h"
#include "stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"


static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
        return container_of(lksb, struct user_lock_res, l_lksb);
}

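/* Read a lockres flag under l_lock so the wait_event() helpers below
 * always see a consistent value. */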
static inline int user_check_wait_flag(struct user_lock_res *lockres,
                                       int flag)
{
        int ret;

        spin_lock(&lockres->l_lock);
        ret = lockres->l_flags & flag;
        spin_unlock(&lockres->l_lock);

        return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
        struct dlmfs_inode_private *ip;

        ip = container_of(lockres,
                          struct dlmfs_inode_private,
                          ip_lockres);
        return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
        struct dlmfs_inode_private *ip;

        ip = container_of(lockres,
                          struct dlmfs_inode_private,
                          ip_lockres);
        return &ip->ip_vfs_inode;
}

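/* Clear the busy state we set optimistically before a DLM call that
 * subsequently failed. */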
static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
        spin_lock(&lockres->l_lock);
        lockres->l_flags &= ~USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {                 \
        mlog(ML_ERROR, "Dlm error %d while calling %s on "              \
                "resource %.*s\n", _stat, _func,                        \
                _lockres->l_namelen, _lockres->l_name);                 \
} while (0)

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
        int new_level = DLM_LOCK_EX;

        if (level == DLM_LOCK_EX)
                new_level = DLM_LOCK_NL;
        else if (level == DLM_LOCK_PR)
                new_level = DLM_LOCK_PR;
        return new_level;
}

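/* Lock AST: the DLM calls this once a lock or convert request has
 * completed. On success we record the newly granted level, clear
 * USER_LOCK_BUSY and wake anybody waiting on the lockres. */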
static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
        int status;

        mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
             lockres->l_namelen, lockres->l_name, lockres->l_level,
             lockres->l_requested);

        spin_lock(&lockres->l_lock);

        status = ocfs2_dlm_lock_status(&lockres->l_lksb);
        if (status) {
                mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
                     status, lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                return;
        }

        mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
                        "Lockres %.*s, requested ivmode. flags 0x%x\n",
                        lockres->l_namelen, lockres->l_name, lockres->l_flags);

        /* we're downconverting. */
        if (lockres->l_requested < lockres->l_level) {
                if (lockres->l_requested <=
                    user_highest_compat_lock_level(lockres->l_blocking)) {
                        lockres->l_blocking = DLM_LOCK_NL;
                        lockres->l_flags &= ~USER_LOCK_BLOCKED;
                }
        }

        lockres->l_level = lockres->l_requested;
        lockres->l_requested = DLM_LOCK_IV;
        lockres->l_flags |= USER_LOCK_ATTACHED;
        lockres->l_flags &= ~USER_LOCK_BUSY;

        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
        struct inode *inode;
        inode = user_dlm_inode_from_user_lockres(lockres);
        if (!igrab(inode))
                BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);

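/* Queue the lockres for the unblock worker. Called with l_lock held.
 * An inode reference is taken so the lockres can't disappear while the
 * work is pending; user_dlm_unblock_lock() drops it when done. */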
static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
        if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
                user_dlm_grab_inode_ref(lockres);

                INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

                queue_work(user_dlm_worker, &lockres->l_work);
                lockres->l_flags |= USER_LOCK_QUEUED;
        }
}

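/* Queue a blocked lockres only once the conflicting holders are gone:
 * a blocking EX request must wait for all local holders, a blocking PR
 * request only for EX holders. Called with l_lock held. */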
static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
        int queue = 0;

        if (!(lockres->l_flags & USER_LOCK_BLOCKED))
                return;

        switch (lockres->l_blocking) {
        case DLM_LOCK_EX:
                if (!lockres->l_ex_holders && !lockres->l_ro_holders)
                        queue = 1;
                break;
        case DLM_LOCK_PR:
                if (!lockres->l_ex_holders)
                        queue = 1;
                break;
        default:
                BUG();
        }

        if (queue)
                __user_dlm_queue_lockres(lockres);
}

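/* Blocking AST: another node wants this lock at a level that conflicts
 * with ours. Record the highest blocking level and kick the unblock
 * worker so we downconvert once our holders drain. */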
static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

        mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
             lockres->l_namelen, lockres->l_name, level, lockres->l_level);

        spin_lock(&lockres->l_lock);
        lockres->l_flags |= USER_LOCK_BLOCKED;
        if (level > lockres->l_blocking)
                lockres->l_blocking = level;

        __user_dlm_queue_lockres(lockres);
        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}

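/* Unlock AST: completion of either a real unlock or the cancel of an
 * in-progress convert. The teardown/cancel flags and the status passed
 * in by the DLM tell us which case we're handling. */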
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

        mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
             lockres->l_namelen, lockres->l_name, lockres->l_flags);

        if (status)
                mlog(ML_ERROR, "dlm returns status %d\n", status);

        spin_lock(&lockres->l_lock);
        /* The teardown flag gets set early during the unlock process,
         * so test the cancel flag to make sure that this ast isn't
         * for a concurrent cancel. */
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
            && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
                lockres->l_level = DLM_LOCK_IV;
        } else if (status == DLM_CANCELGRANT) {
                /* We tried to cancel a convert request, but it was
                 * already granted. Don't clear the busy flag - the
                 * ast should've done this already. */
                BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
                lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
                goto out_noclear;
        } else {
                BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
                /* Cancel succeeded, we want to re-queue */
                lockres->l_requested = DLM_LOCK_IV; /* cancel an
                                                    * upconvert
                                                    * request. */
                lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
                /* we want the unblock thread to look at it again
                 * now. */
                if (lockres->l_flags & USER_LOCK_BLOCKED)
                        __user_dlm_queue_lockres(lockres);
        }

        lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}

/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
        .lp_max_version = {
                .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
                .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
        },
        .lp_lock_ast            = user_ast,
        .lp_blocking_ast        = user_bast,
        .lp_unlock_ast          = user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
        struct inode *inode;
        inode = user_dlm_inode_from_user_lockres(lockres);
        iput(inode);
}

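/* Worker queued by the blocking AST. Either cancel an in-flight convert
 * (if the lockres is busy) or, once no incompatible holders remain,
 * issue the downconvert the other node is waiting for. The inode
 * reference taken at queue time is dropped on the way out. */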
static void user_dlm_unblock_lock(struct work_struct *work)
{
        int new_level, status;
        struct user_lock_res *lockres =
                container_of(work, struct user_lock_res, l_work);
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

        spin_lock(&lockres->l_lock);

        mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
                        "Lockres %.*s, flags 0x%x\n",
                        lockres->l_namelen, lockres->l_name, lockres->l_flags);

        /* Notice that we don't clear USER_LOCK_BLOCKED here. If it's
         * set, we want user_ast to clear it. */
        lockres->l_flags &= ~USER_LOCK_QUEUED;

        /* It's valid to get here and no longer be blocked - if we get
         * several basts in a row, we might be queued by the first
         * one, the unblock thread might run and clear the queued
         * flag, and finally we might get another bast which re-queues
         * us before our ast for the downconvert is called. */
        if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
                mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
                     lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }

        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
                mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
                     lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }

        if (lockres->l_flags & USER_LOCK_BUSY) {
                if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
                        mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
                             lockres->l_namelen, lockres->l_name);
                        spin_unlock(&lockres->l_lock);
                        goto drop_ref;
                }

                lockres->l_flags |= USER_LOCK_IN_CANCEL;
                spin_unlock(&lockres->l_lock);

                status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
                                          DLM_LKF_CANCEL);
                if (status)
                        user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
                goto drop_ref;
        }

        /* If there are still incompatible holders, we can exit safely
         * without worrying about re-queueing this lock as that will
         * happen on the last call to user_dlm_cluster_unlock. */
        if ((lockres->l_blocking == DLM_LOCK_EX)
            && (lockres->l_ex_holders || lockres->l_ro_holders)) {
                spin_unlock(&lockres->l_lock);
                mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
                     lockres->l_namelen, lockres->l_name,
                     lockres->l_ex_holders, lockres->l_ro_holders);
                goto drop_ref;
        }

        if ((lockres->l_blocking == DLM_LOCK_PR)
            && lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
                mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
                     lockres->l_namelen, lockres->l_name,
                     lockres->l_ex_holders);
                goto drop_ref;
        }

        /* yay, we can downconvert now. */
        new_level = user_highest_compat_lock_level(lockres->l_blocking);
        lockres->l_requested = new_level;
        lockres->l_flags |= USER_LOCK_BUSY;
        mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
             lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
        spin_unlock(&lockres->l_lock);

        /* need lock downconvert request now... */
        status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
                                DLM_LKF_CONVERT|DLM_LKF_VALBLK,
                                lockres->l_name,
                                lockres->l_namelen);
        if (status) {
                user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
                user_recover_from_dlm_error(lockres);
        }

drop_ref:
        user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
                                        int level)
{
        switch(level) {
        case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
        case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
                BUG();
        }
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
                                  int wanted)
{
        BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

        return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}

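/*
 * Take a cluster lock at the given level (DLM_LOCK_EX or DLM_LOCK_PR),
 * sleeping until it is granted or a signal is caught. Holders are
 * counted, so each successful call must be balanced by a call to
 * user_dlm_cluster_unlock() at the same level. A minimal sketch of how
 * a caller might use the pair (illustrative only, not code taken from
 * this file):
 *
 *      status = user_dlm_cluster_lock(lockres, DLM_LOCK_PR, 0);
 *      if (status < 0)
 *              return status;
 *      ... read the state protected by the lock ...
 *      user_dlm_cluster_unlock(lockres, DLM_LOCK_PR);
 */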
int user_dlm_cluster_lock(struct user_lock_res *lockres,
                          int level,
                          int lkm_flags)
{
        int status, local_flags;
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        if (level != DLM_LOCK_EX &&
            level != DLM_LOCK_PR) {
                mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
                     lockres->l_namelen, lockres->l_name);
                status = -EINVAL;
                goto bail;
        }

        mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
             lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
        if (signal_pending(current)) {
                status = -ERESTARTSYS;
                goto bail;
        }

        spin_lock(&lockres->l_lock);

        /* We only compare against the currently granted level
         * here. If the lock is blocked waiting on a downconvert,
         * we'll get caught below. */
        if ((lockres->l_flags & USER_LOCK_BUSY) &&
            (level > lockres->l_level)) {
                /* is someone sitting in dlm_lock? If so, wait on
                 * them. */
                spin_unlock(&lockres->l_lock);

                user_wait_on_busy_lock(lockres);
                goto again;
        }

        if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
            (!user_may_continue_on_blocked_lock(lockres, level))) {
                /* The lock is currently blocked on behalf of
                 * another node. */
                spin_unlock(&lockres->l_lock);

                user_wait_on_blocked_lock(lockres);
                goto again;
        }

        if (level > lockres->l_level) {
                local_flags = lkm_flags | DLM_LKF_VALBLK;
                if (lockres->l_level != DLM_LOCK_IV)
                        local_flags |= DLM_LKF_CONVERT;

                lockres->l_requested = level;
                lockres->l_flags |= USER_LOCK_BUSY;
                spin_unlock(&lockres->l_lock);

                BUG_ON(level == DLM_LOCK_IV);
                BUG_ON(level == DLM_LOCK_NL);

                /* call dlm_lock to upgrade lock now */
                status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
                                        local_flags, lockres->l_name,
                                        lockres->l_namelen);
                if (status) {
                        if ((lkm_flags & DLM_LKF_NOQUEUE) &&
                            (status != -EAGAIN))
                                user_log_dlm_error("ocfs2_dlm_lock",
                                                   status, lockres);
                        user_recover_from_dlm_error(lockres);
                        goto bail;
                }

                user_wait_on_busy_lock(lockres);
                goto again;
        }

        user_dlm_inc_holders(lockres, level);
        spin_unlock(&lockres->l_lock);

        status = 0;
bail:
        return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
                                        int level)
{
        switch(level) {
        case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
        case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
        default:
                BUG();
        }
}

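/* Drop one holder at the given level and, if another node is blocked
 * on us, let the unblock worker run once no conflicting holders are
 * left. */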
void user_dlm_cluster_unlock(struct user_lock_res *lockres,
                             int level)
{
        if (level != DLM_LOCK_EX &&
            level != DLM_LOCK_PR) {
                mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
                     lockres->l_namelen, lockres->l_name);
                return;
        }

        spin_lock(&lockres->l_lock);
        user_dlm_dec_holders(lockres, level);
        __user_dlm_cond_queue_lockres(lockres);
        spin_unlock(&lockres->l_lock);
}

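/* Lock value block (LVB) helpers. The LVB is a small blob of data
 * (at most DLM_LVB_LEN bytes) carried along with the lock. Writers
 * must hold the lock at DLM_LOCK_EX; readers need at least DLM_LOCK_PR
 * and get back nothing if the DLM reports the LVB as invalid. */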
void user_dlm_write_lvb(struct inode *inode,
                        const char *val,
                        unsigned int len)
{
        struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
        char *lvb;

        BUG_ON(len > DLM_LVB_LEN);

        spin_lock(&lockres->l_lock);

        BUG_ON(lockres->l_level < DLM_LOCK_EX);
        lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
        memcpy(lvb, val, len);

        spin_unlock(&lockres->l_lock);
}

ssize_t user_dlm_read_lvb(struct inode *inode,
                          char *val,
                          unsigned int len)
{
        struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
        char *lvb;
        ssize_t ret = len;

        BUG_ON(len > DLM_LVB_LEN);

        spin_lock(&lockres->l_lock);

        BUG_ON(lockres->l_level < DLM_LOCK_PR);
        if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
                lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
                memcpy(val, lvb, len);
        } else
                ret = 0;

        spin_unlock(&lockres->l_lock);
        return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
                            struct dentry *dentry)
{
        memset(lockres, 0, sizeof(*lockres));

        spin_lock_init(&lockres->l_lock);
        init_waitqueue_head(&lockres->l_event);
        lockres->l_level = DLM_LOCK_IV;
        lockres->l_requested = DLM_LOCK_IV;
        lockres->l_blocking = DLM_LOCK_IV;

        /* should have been checked before getting here. */
        BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

        memcpy(lockres->l_name,
               dentry->d_name.name,
               dentry->d_name.len);
        lockres->l_namelen = dentry->d_name.len;
}

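/* Tear down a lockres before its dlmfs inode goes away. Returns -EBUSY
 * if the lock still has holders; otherwise the DLM-level lock is
 * dropped and we wait for the unlock AST. The teardown flag makes a
 * second call a no-op. */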
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
        int status = -EBUSY;
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

        spin_lock(&lockres->l_lock);
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
                spin_unlock(&lockres->l_lock);
                return 0;
        }

        lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

        while (lockres->l_flags & USER_LOCK_BUSY) {
                spin_unlock(&lockres->l_lock);

                user_wait_on_busy_lock(lockres);

                spin_lock(&lockres->l_lock);
        }

        if (lockres->l_ro_holders || lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        status = 0;
        if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        lockres->l_flags &= ~USER_LOCK_ATTACHED;
        lockres->l_flags |= USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);

        status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
        if (status) {
                user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
                goto bail;
        }

        user_wait_on_busy_lock(lockres);

        status = 0;
bail:
        return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
                                           void *recovery_data)
{
        /* We ignore recovery events */
        return;
}

void user_dlm_set_locking_protocol(void)
{
        ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

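/* Connect this lock domain to the cluster stack via the stack-agnostic
 * stackglue interface. Recovery events are deliberately ignored (see
 * the noop handler above). Returns the connection, or an ERR_PTR on
 * failure. */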
struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
        int rc;
        struct ocfs2_cluster_connection *conn;

        rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
                                            &user_dlm_lproto,
                                            user_dlm_recovery_handler_noop,
                                            NULL, &conn);
        if (rc)
                mlog_errno(rc);

        return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
        ocfs2_cluster_disconnect(conn, 0);
}
