linux/fs/ocfs2/dlmglue.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmglue.c
   5 *
   6 * Code which implements an OCFS2 specific interface to our DLM.
   7 *
   8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  23 * Boston, MA 02111-1307, USA.
  24 */
  25
  26#include <linux/types.h>
  27#include <linux/slab.h>
  28#include <linux/highmem.h>
  29#include <linux/mm.h>
  30#include <linux/crc32.h>
  31#include <linux/kthread.h>
  32#include <linux/pagemap.h>
  33#include <linux/debugfs.h>
  34#include <linux/seq_file.h>
  35
  36#include <cluster/heartbeat.h>
  37#include <cluster/nodemanager.h>
  38#include <cluster/tcp.h>
  39
  40#include <dlm/dlmapi.h>
  41
  42#define MLOG_MASK_PREFIX ML_DLM_GLUE
  43#include <cluster/masklog.h>
  44
  45#include "ocfs2.h"
  46
  47#include "alloc.h"
  48#include "dcache.h"
  49#include "dlmglue.h"
  50#include "extent_map.h"
  51#include "file.h"
  52#include "heartbeat.h"
  53#include "inode.h"
  54#include "journal.h"
  55#include "slot_map.h"
  56#include "super.h"
  57#include "uptodate.h"
  58#include "vote.h"
  59
  60#include "buffer_head_io.h"
  61
  62struct ocfs2_mask_waiter {
  63        struct list_head        mw_item;
  64        int                     mw_status;
  65        struct completion       mw_complete;
  66        unsigned long           mw_mask;
  67        unsigned long           mw_goal;
  68};
  69
  70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
  71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
  72
  73/*
  74 * Return value from ->downconvert_worker functions.
  75 *
  76 * These control the precise actions of ocfs2_unblock_lock()
  77 * and ocfs2_process_blocked_lock()
  78 *
  79 */
  80enum ocfs2_unblock_action {
  81        UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
  82        UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
  83                                      * ->post_unlock callback */
  84        UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
  85                                      * ->post_unlock() callback. */
  86};
  87
  88struct ocfs2_unblock_ctl {
  89        int requeue;
  90        enum ocfs2_unblock_action unblock_action;
  91};
  92
  93static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
  94                                        int new_level);
  95static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
  96
  97static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
  98                                     int blocking);
  99
 100static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
 101                                       int blocking);
 102
 103static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
 104                                     struct ocfs2_lock_res *lockres);
 105
 106
 107#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
 108
 109/* This aids in debugging situations where a bad LVB might be involved. */
 110static void ocfs2_dump_meta_lvb_info(u64 level,
 111                                     const char *function,
 112                                     unsigned int line,
 113                                     struct ocfs2_lock_res *lockres)
 114{
 115        struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
 116
 117        mlog(level, "LVB information for %s (called from %s:%u):\n",
 118             lockres->l_name, function, line);
 119        mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
 120             lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
 121             be32_to_cpu(lvb->lvb_igeneration));
 122        mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
 123             (unsigned long long)be64_to_cpu(lvb->lvb_isize),
 124             be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
 125             be16_to_cpu(lvb->lvb_imode));
 126        mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
 127             "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
 128             (long long)be64_to_cpu(lvb->lvb_iatime_packed),
 129             (long long)be64_to_cpu(lvb->lvb_ictime_packed),
 130             (long long)be64_to_cpu(lvb->lvb_imtime_packed),
 131             be32_to_cpu(lvb->lvb_iattr));
 132}
 133
 134
 135/*
 136 * OCFS2 Lock Resource Operations
 137 *
 138 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 139 *
 140 * The most basic of lock types can point ->l_priv to their respective
 141 * struct ocfs2_super and allow the default actions to manage things.
 142 *
 143 * Right now, each lock type also needs to implement an init function,
 144 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 145 * should be called when the lock is no longer needed (i.e., object
 146 * destruction time).
 147 */
 148struct ocfs2_lock_res_ops {
 149        /*
 150         * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
 151         * this callback if ->l_priv is not an ocfs2_super pointer
 152         */
 153        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
 154
 155        /*
 156         * Optionally called in the downconvert (or "vote") thread
 157         * after a successful downconvert. The lockres will not be
 158         * referenced after this callback is called, so it is safe to
 159         * free memory, etc.
 160         *
 161         * The exact semantics of when this is called are controlled
 162         * by ->downconvert_worker()
 163         */
 164        void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
 165
 166        /*
 167         * Allow a lock type to add checks to determine whether it is
 168         * safe to downconvert a lock. Return 0 to re-queue the
 169         * downconvert at a later time, nonzero to continue.
 170         *
 171         * For most locks, the default checks that there are no
 172         * incompatible holders are sufficient.
 173         *
 174         * Called with the lockres spinlock held.
 175         */
 176        int (*check_downconvert)(struct ocfs2_lock_res *, int);
 177
 178        /*
 179         * Allows a lock type to populate the lock value block. This
 180         * is called on downconvert, and when we drop a lock.
 181         *
 182         * Locks that want to use this should set LOCK_TYPE_USES_LVB
 183         * in the flags field.
 184         *
 185         * Called with the lockres spinlock held.
 186         */
 187        void (*set_lvb)(struct ocfs2_lock_res *);
 188
 189        /*
 190         * Called from the downconvert thread when it is determined
 191         * that a lock will be downconverted. This is called without
 192         * any locks held so the function can do work that might
 193         * schedule (syncing out data, etc).
 194         *
 195         * This should return any one of the ocfs2_unblock_action
 196         * values, depending on what it wants the thread to do.
 197         */
 198        int (*downconvert_worker)(struct ocfs2_lock_res *, int);
 199
 200        /*
 201         * LOCK_TYPE_* flags which describe the specific requirements
 202         * of a lock type. Descriptions of each individual flag follow.
 203         */
 204        int flags;
 205};
 206
 207/*
 208 * Some locks want to "refresh" potentially stale data when a
 209 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 210 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 211 * individual lockres l_flags member from the ast function. It is
 212 * expected that the locking wrapper will clear the
 213 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 214 */
 215#define LOCK_TYPE_REQUIRES_REFRESH 0x1
 216
 217/*
 218 * Indicate that a lock type makes use of the lock value block. The
 219 * ->set_lvb lock type callback must be defined.
 220 */
 221#define LOCK_TYPE_USES_LVB              0x2
 222
 223static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
 224        .get_osb        = ocfs2_get_inode_osb,
 225        .flags          = 0,
 226};
 227
 228static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
 229        .get_osb        = ocfs2_get_inode_osb,
 230        .check_downconvert = ocfs2_check_meta_downconvert,
 231        .set_lvb        = ocfs2_set_meta_lvb,
 232        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 233};
 234
 235static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
 236        .get_osb        = ocfs2_get_inode_osb,
 237        .downconvert_worker = ocfs2_data_convert_worker,
 238        .flags          = 0,
 239};
 240
 241static struct ocfs2_lock_res_ops ocfs2_super_lops = {
 242        .flags          = LOCK_TYPE_REQUIRES_REFRESH,
 243};
 244
 245static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
 246        .flags          = 0,
 247};
 248
 249static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
 250        .get_osb        = ocfs2_get_dentry_osb,
 251        .post_unlock    = ocfs2_dentry_post_unlock,
 252        .downconvert_worker = ocfs2_dentry_convert_worker,
 253        .flags          = 0,
 254};
 255
 256static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
 257        .get_osb        = ocfs2_get_inode_osb,
 258        .flags          = 0,
 259};
 260
 261static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 262{
 263        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
 264                lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
 265                lockres->l_type == OCFS2_LOCK_TYPE_RW ||
 266                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
 267}
 268
 269static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 270{
 271        BUG_ON(!ocfs2_is_inode_lock(lockres));
 272
 273        return (struct inode *) lockres->l_priv;
 274}
 275
 276static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
 277{
 278        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
 279
 280        return (struct ocfs2_dentry_lock *)lockres->l_priv;
 281}
 282
 283static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
 284{
 285        if (lockres->l_ops->get_osb)
 286                return lockres->l_ops->get_osb(lockres);
 287
 288        return (struct ocfs2_super *)lockres->l_priv;
 289}
 290
 291static int ocfs2_lock_create(struct ocfs2_super *osb,
 292                             struct ocfs2_lock_res *lockres,
 293                             int level,
 294                             int dlm_flags);
 295static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
 296                                                     int wanted);
 297static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
 298                                 struct ocfs2_lock_res *lockres,
 299                                 int level);
 300static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
 301static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
 302static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
 303static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
 304static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
 305                                        struct ocfs2_lock_res *lockres);
 306static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
 307                                                int convert);
 308#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {        \
 309        mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
 310                "resource %s: %s\n", dlm_errname(_stat), _func, \
 311                _lockres->l_name, dlm_errmsg(_stat));           \
 312} while (0)
 313static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
 314                                 struct ocfs2_lock_res *lockres);
 315static int ocfs2_meta_lock_update(struct inode *inode,
 316                                  struct buffer_head **bh);
 317static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 318static inline int ocfs2_highest_compat_lock_level(int level);
 319
 320static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
 321                                  u64 blkno,
 322                                  u32 generation,
 323                                  char *name)
 324{
 325        int len;
 326
 327        mlog_entry_void();
 328
 329        BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
 330
 331        len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
 332                       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
 333                       (long long)blkno, generation);
 334
 335        BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
 336
 337        mlog(0, "built lock resource with name: %s\n", name);
 338
 339        mlog_exit_void();
 340}
 341
 342static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
 343
 344static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
 345                                       struct ocfs2_dlm_debug *dlm_debug)
 346{
 347        mlog(0, "Add tracking for lockres %s\n", res->l_name);
 348
 349        spin_lock(&ocfs2_dlm_tracking_lock);
 350        list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
 351        spin_unlock(&ocfs2_dlm_tracking_lock);
 352}
 353
 354static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
 355{
 356        spin_lock(&ocfs2_dlm_tracking_lock);
 357        if (!list_empty(&res->l_debug_list))
 358                list_del_init(&res->l_debug_list);
 359        spin_unlock(&ocfs2_dlm_tracking_lock);
 360}
 361
 362static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
 363                                       struct ocfs2_lock_res *res,
 364                                       enum ocfs2_lock_type type,
 365                                       struct ocfs2_lock_res_ops *ops,
 366                                       void *priv)
 367{
 368        res->l_type          = type;
 369        res->l_ops           = ops;
 370        res->l_priv          = priv;
 371
 372        res->l_level         = LKM_IVMODE;
 373        res->l_requested     = LKM_IVMODE;
 374        res->l_blocking      = LKM_IVMODE;
 375        res->l_action        = OCFS2_AST_INVALID;
 376        res->l_unlock_action = OCFS2_UNLOCK_INVALID;
 377
 378        res->l_flags         = OCFS2_LOCK_INITIALIZED;
 379
 380        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
 381}
 382
 383void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
 384{
 385        /* This also clears out the lock status block */
 386        memset(res, 0, sizeof(struct ocfs2_lock_res));
 387        spin_lock_init(&res->l_lock);
 388        init_waitqueue_head(&res->l_event);
 389        INIT_LIST_HEAD(&res->l_blocked_list);
 390        INIT_LIST_HEAD(&res->l_mask_waiters);
 391}
 392
 393void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
 394                               enum ocfs2_lock_type type,
 395                               unsigned int generation,
 396                               struct inode *inode)
 397{
 398        struct ocfs2_lock_res_ops *ops;
 399
 400        switch(type) {
 401                case OCFS2_LOCK_TYPE_RW:
 402                        ops = &ocfs2_inode_rw_lops;
 403                        break;
 404                case OCFS2_LOCK_TYPE_META:
 405                        ops = &ocfs2_inode_meta_lops;
 406                        break;
 407                case OCFS2_LOCK_TYPE_DATA:
 408                        ops = &ocfs2_inode_data_lops;
 409                        break;
 410                case OCFS2_LOCK_TYPE_OPEN:
 411                        ops = &ocfs2_inode_open_lops;
 412                        break;
 413                default:
 414                        mlog_bug_on_msg(1, "type: %d\n", type);
 415                        ops = NULL; /* thanks, gcc */
 416                        break;
 417        };
 418
 419        ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
 420                              generation, res->l_name);
 421        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
 422}
 423
 424static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
 425{
 426        struct inode *inode = ocfs2_lock_res_inode(lockres);
 427
 428        return OCFS2_SB(inode->i_sb);
 429}
 430
 431static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
 432{
 433        __be64 inode_blkno_be;
 434
 435        memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
 436               sizeof(__be64));
 437
 438        return be64_to_cpu(inode_blkno_be);
 439}
 440
 441static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
 442{
 443        struct ocfs2_dentry_lock *dl = lockres->l_priv;
 444
 445        return OCFS2_SB(dl->dl_inode->i_sb);
 446}
 447
 448void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
 449                                u64 parent, struct inode *inode)
 450{
 451        int len;
 452        u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
 453        __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
 454        struct ocfs2_lock_res *lockres = &dl->dl_lockres;
 455
 456        ocfs2_lock_res_init_once(lockres);
 457
 458        /*
 459         * Unfortunately, the standard lock naming scheme won't work
 460         * here because we have two 16 byte values to use. Instead,
 461         * we'll stuff the inode number as a binary value. We still
 462         * want error prints to show something without garbling the
 463         * display, so drop a null byte in there before the inode
 464         * number. A future version of OCFS2 will likely use all
 465         * binary lock names. The stringified names have been a
 466         * tremendous aid in debugging, but now that the debugfs
 467         * interface exists, we can mangle things there if need be.
 468         *
 469         * NOTE: We also drop the standard "pad" value (the total lock
 470         * name size stays the same though - the last part is all
 471         * zeros due to the memset in ocfs2_lock_res_init_once()
 472         */
 473        len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
 474                       "%c%016llx",
 475                       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
 476                       (long long)parent);
 477
 478        BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
 479
 480        memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
 481               sizeof(__be64));
 482
 483        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
 484                                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
 485                                   dl);
 486}
 487
 488static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
 489                                      struct ocfs2_super *osb)
 490{
 491        /* Superblock lockres doesn't come from a slab so we call init
 492         * once on it manually.  */
 493        ocfs2_lock_res_init_once(res);
 494        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
 495                              0, res->l_name);
 496        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
 497                                   &ocfs2_super_lops, osb);
 498}
 499
 500static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
 501                                       struct ocfs2_super *osb)
 502{
 503        /* Rename lockres doesn't come from a slab so we call init
 504         * once on it manually.  */
 505        ocfs2_lock_res_init_once(res);
 506        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
 507        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
 508                                   &ocfs2_rename_lops, osb);
 509}
 510
 511void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
 512{
 513        mlog_entry_void();
 514
 515        if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
 516                return;
 517
 518        ocfs2_remove_lockres_tracking(res);
 519
 520        mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
 521                        "Lockres %s is on the blocked list\n",
 522                        res->l_name);
 523        mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
 524                        "Lockres %s has mask waiters pending\n",
 525                        res->l_name);
 526        mlog_bug_on_msg(spin_is_locked(&res->l_lock),
 527                        "Lockres %s is locked\n",
 528                        res->l_name);
 529        mlog_bug_on_msg(res->l_ro_holders,
 530                        "Lockres %s has %u ro holders\n",
 531                        res->l_name, res->l_ro_holders);
 532        mlog_bug_on_msg(res->l_ex_holders,
 533                        "Lockres %s has %u ex holders\n",
 534                        res->l_name, res->l_ex_holders);
 535
 536        /* Need to clear out the lock status block for the dlm */
 537        memset(&res->l_lksb, 0, sizeof(res->l_lksb));
 538
 539        res->l_flags = 0UL;
 540        mlog_exit_void();
 541}
 542
 543static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
 544                                     int level)
 545{
 546        mlog_entry_void();
 547
 548        BUG_ON(!lockres);
 549
 550        switch(level) {
 551        case LKM_EXMODE:
 552                lockres->l_ex_holders++;
 553                break;
 554        case LKM_PRMODE:
 555                lockres->l_ro_holders++;
 556                break;
 557        default:
 558                BUG();
 559        }
 560
 561        mlog_exit_void();
 562}
 563
 564static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
 565                                     int level)
 566{
 567        mlog_entry_void();
 568
 569        BUG_ON(!lockres);
 570
 571        switch(level) {
 572        case LKM_EXMODE:
 573                BUG_ON(!lockres->l_ex_holders);
 574                lockres->l_ex_holders--;
 575                break;
 576        case LKM_PRMODE:
 577                BUG_ON(!lockres->l_ro_holders);
 578                lockres->l_ro_holders--;
 579                break;
 580        default:
 581                BUG();
 582        }
 583        mlog_exit_void();
 584}
 585
 586/* WARNING: This function lives in a world where the only three lock
 587 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 588 * lock types are added. */
 589static inline int ocfs2_highest_compat_lock_level(int level)
 590{
 591        int new_level = LKM_EXMODE;
 592
 593        if (level == LKM_EXMODE)
 594                new_level = LKM_NLMODE;
 595        else if (level == LKM_PRMODE)
 596                new_level = LKM_PRMODE;
 597        return new_level;
 598}
 599
 600static void lockres_set_flags(struct ocfs2_lock_res *lockres,
 601                              unsigned long newflags)
 602{
 603        struct ocfs2_mask_waiter *mw, *tmp;
 604
 605        assert_spin_locked(&lockres->l_lock);
 606
 607        lockres->l_flags = newflags;
 608
 609        list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
 610                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
 611                        continue;
 612
 613                list_del_init(&mw->mw_item);
 614                mw->mw_status = 0;
 615                complete(&mw->mw_complete);
 616        }
 617}
 618static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
 619{
 620        lockres_set_flags(lockres, lockres->l_flags | or);
 621}
 622static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
 623                                unsigned long clear)
 624{
 625        lockres_set_flags(lockres, lockres->l_flags & ~clear);
 626}
 627
 628static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
 629{
 630        mlog_entry_void();
 631
 632        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
 633        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
 634        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
 635        BUG_ON(lockres->l_blocking <= LKM_NLMODE);
 636
 637        lockres->l_level = lockres->l_requested;
 638        if (lockres->l_level <=
 639            ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
 640                lockres->l_blocking = LKM_NLMODE;
 641                lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
 642        }
 643        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 644
 645        mlog_exit_void();
 646}
 647
 648static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
 649{
 650        mlog_entry_void();
 651
 652        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
 653        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
 654
 655        /* Convert from RO to EX doesn't really need anything as our
 656         * information is already up to data. Convert from NL to
 657         * *anything* however should mark ourselves as needing an
 658         * update */
 659        if (lockres->l_level == LKM_NLMODE &&
 660            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 661                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 662
 663        lockres->l_level = lockres->l_requested;
 664        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 665
 666        mlog_exit_void();
 667}
 668
 669static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
 670{
 671        mlog_entry_void();
 672
 673        BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
 674        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 675
 676        if (lockres->l_requested > LKM_NLMODE &&
 677            !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
 678            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 679                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 680
 681        lockres->l_level = lockres->l_requested;
 682        lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
 683        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 684
 685        mlog_exit_void();
 686}
 687
 688static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
 689                                     int level)
 690{
 691        int needs_downconvert = 0;
 692        mlog_entry_void();
 693
 694        assert_spin_locked(&lockres->l_lock);
 695
 696        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
 697
 698        if (level > lockres->l_blocking) {
 699                /* only schedule a downconvert if we haven't already scheduled
 700                 * one that goes low enough to satisfy the level we're
 701                 * blocking.  this also catches the case where we get
 702                 * duplicate BASTs */
 703                if (ocfs2_highest_compat_lock_level(level) <
 704                    ocfs2_highest_compat_lock_level(lockres->l_blocking))
 705                        needs_downconvert = 1;
 706
 707                lockres->l_blocking = level;
 708        }
 709
 710        mlog_exit(needs_downconvert);
 711        return needs_downconvert;
 712}
 713
 714static void ocfs2_blocking_ast(void *opaque, int level)
 715{
 716        struct ocfs2_lock_res *lockres = opaque;
 717        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
 718        int needs_downconvert;
 719        unsigned long flags;
 720
 721        BUG_ON(level <= LKM_NLMODE);
 722
 723        mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
 724             lockres->l_name, level, lockres->l_level,
 725             ocfs2_lock_type_string(lockres->l_type));
 726
 727        spin_lock_irqsave(&lockres->l_lock, flags);
 728        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
 729        if (needs_downconvert)
 730                ocfs2_schedule_blocked_lock(osb, lockres);
 731        spin_unlock_irqrestore(&lockres->l_lock, flags);
 732
 733        wake_up(&lockres->l_event);
 734
 735        ocfs2_kick_vote_thread(osb);
 736}
 737
 738static void ocfs2_locking_ast(void *opaque)
 739{
 740        struct ocfs2_lock_res *lockres = opaque;
 741        struct dlm_lockstatus *lksb = &lockres->l_lksb;
 742        unsigned long flags;
 743
 744        spin_lock_irqsave(&lockres->l_lock, flags);
 745
 746        if (lksb->status != DLM_NORMAL) {
 747                mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
 748                     lockres->l_name, lksb->status);
 749                spin_unlock_irqrestore(&lockres->l_lock, flags);
 750                return;
 751        }
 752
 753        switch(lockres->l_action) {
 754        case OCFS2_AST_ATTACH:
 755                ocfs2_generic_handle_attach_action(lockres);
 756                lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
 757                break;
 758        case OCFS2_AST_CONVERT:
 759                ocfs2_generic_handle_convert_action(lockres);
 760                break;
 761        case OCFS2_AST_DOWNCONVERT:
 762                ocfs2_generic_handle_downconvert_action(lockres);
 763                break;
 764        default:
 765                mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
 766                     "lockres flags = 0x%lx, unlock action: %u\n",
 767                     lockres->l_name, lockres->l_action, lockres->l_flags,
 768                     lockres->l_unlock_action);
 769                BUG();
 770        }
 771
 772        /* set it to something invalid so if we get called again we
 773         * can catch it. */
 774        lockres->l_action = OCFS2_AST_INVALID;
 775
 776        wake_up(&lockres->l_event);
 777        spin_unlock_irqrestore(&lockres->l_lock, flags);
 778}
 779
 780static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
 781                                                int convert)
 782{
 783        unsigned long flags;
 784
 785        mlog_entry_void();
 786        spin_lock_irqsave(&lockres->l_lock, flags);
 787        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 788        if (convert)
 789                lockres->l_action = OCFS2_AST_INVALID;
 790        else
 791                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
 792        spin_unlock_irqrestore(&lockres->l_lock, flags);
 793
 794        wake_up(&lockres->l_event);
 795        mlog_exit_void();
 796}
 797
 798/* Note: If we detect another process working on the lock (i.e.,
 799 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 800 * to do the right thing in that case.
 801 */
 802static int ocfs2_lock_create(struct ocfs2_super *osb,
 803                             struct ocfs2_lock_res *lockres,
 804                             int level,
 805                             int dlm_flags)
 806{
 807        int ret = 0;
 808        enum dlm_status status = DLM_NORMAL;
 809        unsigned long flags;
 810
 811        mlog_entry_void();
 812
 813        mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
 814             dlm_flags);
 815
 816        spin_lock_irqsave(&lockres->l_lock, flags);
 817        if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
 818            (lockres->l_flags & OCFS2_LOCK_BUSY)) {
 819                spin_unlock_irqrestore(&lockres->l_lock, flags);
 820                goto bail;
 821        }
 822
 823        lockres->l_action = OCFS2_AST_ATTACH;
 824        lockres->l_requested = level;
 825        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 826        spin_unlock_irqrestore(&lockres->l_lock, flags);
 827
 828        status = dlmlock(osb->dlm,
 829                         level,
 830                         &lockres->l_lksb,
 831                         dlm_flags,
 832                         lockres->l_name,
 833                         OCFS2_LOCK_ID_MAX_LEN - 1,
 834                         ocfs2_locking_ast,
 835                         lockres,
 836                         ocfs2_blocking_ast);
 837        if (status != DLM_NORMAL) {
 838                ocfs2_log_dlm_error("dlmlock", status, lockres);
 839                ret = -EINVAL;
 840                ocfs2_recover_from_dlm_error(lockres, 1);
 841        }
 842
 843        mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
 844
 845bail:
 846        mlog_exit(ret);
 847        return ret;
 848}
 849
 850static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
 851                                        int flag)
 852{
 853        unsigned long flags;
 854        int ret;
 855
 856        spin_lock_irqsave(&lockres->l_lock, flags);
 857        ret = lockres->l_flags & flag;
 858        spin_unlock_irqrestore(&lockres->l_lock, flags);
 859
 860        return ret;
 861}
 862
 863static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
 864
 865{
 866        wait_event(lockres->l_event,
 867                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
 868}
 869
 870static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
 871
 872{
 873        wait_event(lockres->l_event,
 874                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
 875}
 876
 877/* predict what lock level we'll be dropping down to on behalf
 878 * of another node, and return true if the currently wanted
 879 * level will be compatible with it. */
 880static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
 881                                                     int wanted)
 882{
 883        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
 884
 885        return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
 886}
 887
 888static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
 889{
 890        INIT_LIST_HEAD(&mw->mw_item);
 891        init_completion(&mw->mw_complete);
 892}
 893
 894static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
 895{
 896        wait_for_completion(&mw->mw_complete);
 897        /* Re-arm the completion in case we want to wait on it again */
 898        INIT_COMPLETION(mw->mw_complete);
 899        return mw->mw_status;
 900}
 901
 902static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
 903                                    struct ocfs2_mask_waiter *mw,
 904                                    unsigned long mask,
 905                                    unsigned long goal)
 906{
 907        BUG_ON(!list_empty(&mw->mw_item));
 908
 909        assert_spin_locked(&lockres->l_lock);
 910
 911        list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
 912        mw->mw_mask = mask;
 913        mw->mw_goal = goal;
 914}
 915
 916/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 917 * if the mask still hadn't reached its goal */
 918static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
 919                                      struct ocfs2_mask_waiter *mw)
 920{
 921        unsigned long flags;
 922        int ret = 0;
 923
 924        spin_lock_irqsave(&lockres->l_lock, flags);
 925        if (!list_empty(&mw->mw_item)) {
 926                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
 927                        ret = -EBUSY;
 928
 929                list_del_init(&mw->mw_item);
 930                init_completion(&mw->mw_complete);
 931        }
 932        spin_unlock_irqrestore(&lockres->l_lock, flags);
 933
 934        return ret;
 935
 936}
 937
/*
 * Acquire a hold on @lockres at @level, upgrading or attaching the
 * underlying DLM lock if the currently granted level is too low.
 *
 * The function loops via the "again" label: whenever it has to wait
 * (lock busy, lock blocked, or an upgrade just handed to the DLM) it
 * comes back and re-evaluates the lockres state from scratch.
 *
 * @lkm_flags: flags passed to dlmlock(); LKM_VALBLK is added when the
 *             lock type uses an LVB, and LKM_CONVERT is set or cleared
 *             depending on whether the lock is already attached.
 * @arg_flags: OCFS2_LOCK_NONBLOCK makes the function return -EAGAIN
 *             instead of sleeping on a busy or blocked lock.
 *
 * Returns 0 once the holder count has been incremented, or:
 *   -ERESTARTSYS  a signal is pending and signals are still being
 *                 caught (the mount is not NOINTR and we have not yet
 *                 issued a DLM request),
 *   -EAGAIN       dlmlock() returned DLM_NOTQUEUED with LKM_NOQUEUE
 *                 set, or OCFS2_LOCK_NONBLOCK was given and the lock
 *                 was still busy/blocked,
 *   -EINVAL       any other dlmlock() failure,
 *   or the non-zero status reported by ocfs2_wait_for_mask().
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
                              struct ocfs2_lock_res *lockres,
                              int level,
                              int lkm_flags,
                              int arg_flags)
{
        struct ocfs2_mask_waiter mw;
        enum dlm_status status;
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
        int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
        unsigned long flags;

        mlog_entry_void();

        ocfs2_init_mask_waiter(&mw);

        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
                lkm_flags |= LKM_VALBLK;

again:
        /* Each pass starts out assuming no sleep is needed. */
        wait = 0;

        if (catch_signals && signal_pending(current)) {
                ret = -ERESTARTSYS;
                goto out;
        }

        spin_lock_irqsave(&lockres->l_lock, flags);

        mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
                        "Cluster lock called on freeing lockres %s! flags "
                        "0x%lx\n", lockres->l_name, lockres->l_flags);

        /* We only compare against the currently granted level
         * here. If the lock is blocked waiting on a downconvert,
         * we'll get caught below. */
        if (lockres->l_flags & OCFS2_LOCK_BUSY &&
            level > lockres->l_level) {
                /* is someone sitting in dlm_lock? If so, wait on
                 * them. */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
                wait = 1;
                goto unlock;
        }

        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* the lock is currently blocked on behalf of
                 * another node */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
                wait = 1;
                goto unlock;
        }

        if (level > lockres->l_level) {
                /* The granted level is too low: ask the DLM for more. */
                if (lockres->l_action != OCFS2_AST_INVALID)
                        mlog(ML_ERROR, "lockres %s has action %u pending\n",
                             lockres->l_name, lockres->l_action);

                /* First request on this lockres attaches a new lock;
                 * later ones convert the existing one. */
                if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
                        lockres->l_action = OCFS2_AST_ATTACH;
                        lkm_flags &= ~LKM_CONVERT;
                } else {
                        lockres->l_action = OCFS2_AST_CONVERT;
                        lkm_flags |= LKM_CONVERT;
                }

                lockres->l_requested = level;
                lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                BUG_ON(level == LKM_IVMODE);
                BUG_ON(level == LKM_NLMODE);

                mlog(0, "lock %s, convert from %d to level = %d\n",
                     lockres->l_name, lockres->l_level, level);

                /* call dlm_lock to upgrade lock now */
                status = dlmlock(osb->dlm,
                                 level,
                                 &lockres->l_lksb,
                                 lkm_flags,
                                 lockres->l_name,
                                 OCFS2_LOCK_ID_MAX_LEN - 1,
                                 ocfs2_locking_ast,
                                 lockres,
                                 ocfs2_blocking_ast);
                if (status != DLM_NORMAL) {
                        /* A refused no-queue request is an expected
                         * outcome, so it is not logged as an error. */
                        if ((lkm_flags & LKM_NOQUEUE) &&
                            (status == DLM_NOTQUEUED))
                                ret = -EAGAIN;
                        else {
                                ocfs2_log_dlm_error("dlmlock", status,
                                                    lockres);
                                ret = -EINVAL;
                        }
                        /* Clear BUSY and the pending action again. */
                        ocfs2_recover_from_dlm_error(lockres, 1);
                        goto out;
                }

                mlog(0, "lock %s, successfull return from dlmlock\n",
                     lockres->l_name);

                /* At this point we've gone inside the dlm and need to
                 * complete our work regardless. */
                catch_signals = 0;

                /* wait for busy to clear and carry on */
                goto again;
        }

        /* Ok, if we get here then we're good to go. */
        ocfs2_inc_holders(lockres, level);

        ret = 0;
unlock:
        spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
        /*
         * This is helping work around a lock inversion between the page lock
         * and dlm locks.  One path holds the page lock while calling aops
         * which block acquiring dlm locks.  The voting thread holds dlm
         * locks while acquiring page locks while down converting data locks.
         * This block is helping an aop path notice the inversion and back
         * off to unlock its page lock before trying the dlm lock again.
         */
        if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
            mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
                wait = 0;
                /* Non-zero means the mask had not reached its goal when
                 * the waiter was pulled off the list: give up instead of
                 * sleeping. Otherwise simply retry. */
                if (lockres_remove_mask_waiter(lockres, &mw))
                        ret = -EAGAIN;
                else
                        goto again;
        }
        if (wait) {
                /* Sleep until the waiter completes, then re-evaluate. */
                ret = ocfs2_wait_for_mask(&mw);
                if (ret == 0)
                        goto again;
                mlog_errno(ret);
        }

        mlog_exit(ret);
        return ret;
}
1082
1083static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1084                                 struct ocfs2_lock_res *lockres,
1085                                 int level)
1086{
1087        unsigned long flags;
1088
1089        mlog_entry_void();
1090        spin_lock_irqsave(&lockres->l_lock, flags);
1091        ocfs2_dec_holders(lockres, level);
1092        ocfs2_vote_on_unlock(osb, lockres);
1093        spin_unlock_irqrestore(&lockres->l_lock, flags);
1094        mlog_exit_void();
1095}
1096
1097static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1098                                 struct ocfs2_lock_res *lockres,
1099                                 int ex,
1100                                 int local)
1101{
1102        int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1103        unsigned long flags;
1104        int lkm_flags = local ? LKM_LOCAL : 0;
1105
1106        spin_lock_irqsave(&lockres->l_lock, flags);
1107        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1108        lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1109        spin_unlock_irqrestore(&lockres->l_lock, flags);
1110
1111        return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1112}
1113
1114/* Grants us an EX lock on the data and metadata resources, skipping
1115 * the normal cluster directory lookup. Use this ONLY on newly created
1116 * inodes which other nodes can't possibly see, and which haven't been
1117 * hashed in the inode hash yet. This can give us a good performance
1118 * increase as it'll skip the network broadcast normally associated
1119 * with creating a new lock resource. */
1120int ocfs2_create_new_inode_locks(struct inode *inode)
1121{
1122        int ret;
1123        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1124
1125        BUG_ON(!inode);
1126        BUG_ON(!ocfs2_inode_is_new(inode));
1127
1128        mlog_entry_void();
1129
1130        mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1131
1132        /* NOTE: That we don't increment any of the holder counts, nor
1133         * do we add anything to a journal handle. Since this is
1134         * supposed to be a new inode which the cluster doesn't know
1135         * about yet, there is no need to.  As far as the LVB handling
1136         * is concerned, this is basically like acquiring an EX lock
1137         * on a resource which has an invalid one -- we'll set it
1138         * valid when we release the EX. */
1139
1140        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1141        if (ret) {
1142                mlog_errno(ret);
1143                goto bail;
1144        }
1145
1146        /*
1147         * We don't want to use LKM_LOCAL on a meta data lock as they
1148         * don't use a generation in their lock names.
1149         */
1150        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1151        if (ret) {
1152                mlog_errno(ret);
1153                goto bail;
1154        }
1155
1156        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1157        if (ret) {
1158                mlog_errno(ret);
1159                goto bail;
1160        }
1161
1162        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1163        if (ret) {
1164                mlog_errno(ret);
1165                goto bail;
1166        }
1167
1168bail:
1169        mlog_exit(ret);
1170        return ret;
1171}
1172
1173int ocfs2_rw_lock(struct inode *inode, int write)
1174{
1175        int status, level;
1176        struct ocfs2_lock_res *lockres;
1177        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1178
1179        BUG_ON(!inode);
1180
1181        mlog_entry_void();
1182
1183        mlog(0, "inode %llu take %s RW lock\n",
1184             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1185             write ? "EXMODE" : "PRMODE");
1186
1187        if (ocfs2_mount_local(osb))
1188                return 0;
1189
1190        lockres = &OCFS2_I(inode)->ip_rw_lockres;
1191
1192        level = write ? LKM_EXMODE : LKM_PRMODE;
1193
1194        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1195                                    0);
1196        if (status < 0)
1197                mlog_errno(status);
1198
1199        mlog_exit(status);
1200        return status;
1201}
1202
1203void ocfs2_rw_unlock(struct inode *inode, int write)
1204{
1205        int level = write ? LKM_EXMODE : LKM_PRMODE;
1206        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1207        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1208
1209        mlog_entry_void();
1210
1211        mlog(0, "inode %llu drop %s RW lock\n",
1212             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1213             write ? "EXMODE" : "PRMODE");
1214
1215        if (!ocfs2_mount_local(osb))
1216                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1217
1218        mlog_exit_void();
1219}
1220
1221/*
1222 * ocfs2_open_lock always get PR mode lock.
1223 */
1224int ocfs2_open_lock(struct inode *inode)
1225{
1226        int status = 0;
1227        struct ocfs2_lock_res *lockres;
1228        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1229
1230        BUG_ON(!inode);
1231
1232        mlog_entry_void();
1233
1234        mlog(0, "inode %llu take PRMODE open lock\n",
1235             (unsigned long long)OCFS2_I(inode)->ip_blkno);
1236
1237        if (ocfs2_mount_local(osb))
1238                goto out;
1239
1240        lockres = &OCFS2_I(inode)->ip_open_lockres;
1241
1242        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1243                                    LKM_PRMODE, 0, 0);
1244        if (status < 0)
1245                mlog_errno(status);
1246
1247out:
1248        mlog_exit(status);
1249        return status;
1250}
1251
1252int ocfs2_try_open_lock(struct inode *inode, int write)
1253{
1254        int status = 0, level;
1255        struct ocfs2_lock_res *lockres;
1256        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1257
1258        BUG_ON(!inode);
1259
1260        mlog_entry_void();
1261
1262        mlog(0, "inode %llu try to take %s open lock\n",
1263             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1264             write ? "EXMODE" : "PRMODE");
1265
1266        if (ocfs2_mount_local(osb))
1267                goto out;
1268
1269        lockres = &OCFS2_I(inode)->ip_open_lockres;
1270
1271        level = write ? LKM_EXMODE : LKM_PRMODE;
1272
1273        /*
1274         * The file system may already holding a PRMODE/EXMODE open lock.
1275         * Since we pass LKM_NOQUEUE, the request won't block waiting on
1276         * other nodes and the -EAGAIN will indicate to the caller that
1277         * this inode is still in use.
1278         */
1279        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1280                                    level, LKM_NOQUEUE, 0);
1281
1282out:
1283        mlog_exit(status);
1284        return status;
1285}
1286
1287/*
1288 * ocfs2_open_unlock unlock PR and EX mode open locks.
1289 */
1290void ocfs2_open_unlock(struct inode *inode)
1291{
1292        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1293        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1294
1295        mlog_entry_void();
1296
1297        mlog(0, "inode %llu drop open lock\n",
1298             (unsigned long long)OCFS2_I(inode)->ip_blkno);
1299
1300        if (ocfs2_mount_local(osb))
1301                goto out;
1302
1303        if(lockres->l_ro_holders)
1304                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1305                                     LKM_PRMODE);
1306        if(lockres->l_ex_holders)
1307                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1308                                     LKM_EXMODE);
1309
1310out:
1311        mlog_exit_void();
1312}
1313
1314int ocfs2_data_lock_full(struct inode *inode,
1315                         int write,
1316                         int arg_flags)
1317{
1318        int status = 0, level;
1319        struct ocfs2_lock_res *lockres;
1320        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1321
1322        BUG_ON(!inode);
1323
1324        mlog_entry_void();
1325
1326        mlog(0, "inode %llu take %s DATA lock\n",
1327             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1328             write ? "EXMODE" : "PRMODE");
1329
1330        /* We'll allow faking a readonly data lock for
1331         * rodevices. */
1332        if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1333                if (write) {
1334                        status = -EROFS;
1335                        mlog_errno(status);
1336                }
1337                goto out;
1338        }
1339
1340        if (ocfs2_mount_local(osb))
1341                goto out;
1342
1343        lockres = &OCFS2_I(inode)->ip_data_lockres;
1344
1345        level = write ? LKM_EXMODE : LKM_PRMODE;
1346
1347        status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1348                                    0, arg_flags);
1349        if (status < 0 && status != -EAGAIN)
1350                mlog_errno(status);
1351
1352out:
1353        mlog_exit(status);
1354        return status;
1355}
1356
1357/* see ocfs2_meta_lock_with_page() */
1358int ocfs2_data_lock_with_page(struct inode *inode,
1359                              int write,
1360                              struct page *page)
1361{
1362        int ret;
1363
1364        ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1365        if (ret == -EAGAIN) {
1366                unlock_page(page);
1367                if (ocfs2_data_lock(inode, write) == 0)
1368                        ocfs2_data_unlock(inode, write);
1369                ret = AOP_TRUNCATED_PAGE;
1370        }
1371
1372        return ret;
1373}
1374
1375static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1376                                 struct ocfs2_lock_res *lockres)
1377{
1378        int kick = 0;
1379
1380        mlog_entry_void();
1381
1382        /* If we know that another node is waiting on our lock, kick
1383         * the vote thread * pre-emptively when we reach a release
1384         * condition. */
1385        if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1386                switch(lockres->l_blocking) {
1387                case LKM_EXMODE:
1388                        if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1389                                kick = 1;
1390                        break;
1391                case LKM_PRMODE:
1392                        if (!lockres->l_ex_holders)
1393                                kick = 1;
1394                        break;
1395                default:
1396                        BUG();
1397                }
1398        }
1399
1400        if (kick)
1401                ocfs2_kick_vote_thread(osb);
1402
1403        mlog_exit_void();
1404}
1405
1406void ocfs2_data_unlock(struct inode *inode,
1407                       int write)
1408{
1409        int level = write ? LKM_EXMODE : LKM_PRMODE;
1410        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1411        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1412
1413        mlog_entry_void();
1414
1415        mlog(0, "inode %llu drop %s DATA lock\n",
1416             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1417             write ? "EXMODE" : "PRMODE");
1418
1419        if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1420            !ocfs2_mount_local(osb))
1421                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1422
1423        mlog_exit_void();
1424}
1425
1426#define OCFS2_SEC_BITS   34
1427#define OCFS2_SEC_SHIFT  (64 - 34)
1428#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1429
1430/* LVB only has room for 64 bits of time here so we pack it for
1431 * now. */
1432static u64 ocfs2_pack_timespec(struct timespec *spec)
1433{
1434        u64 res;
1435        u64 sec = spec->tv_sec;
1436        u32 nsec = spec->tv_nsec;
1437
1438        res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1439
1440        return res;
1441}
1442
1443/* Call this with the lockres locked. I am reasonably sure we don't
1444 * need ip_lock in this function as anyone who would be changing those
1445 * values is supposed to be blocked in ocfs2_meta_lock right now. */
1446static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1447{
1448        struct ocfs2_inode_info *oi = OCFS2_I(inode);
1449        struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1450        struct ocfs2_meta_lvb *lvb;
1451
1452        mlog_entry_void();
1453
1454        lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1455
1456        /*
1457         * Invalidate the LVB of a deleted inode - this way other
1458         * nodes are forced to go to disk and discover the new inode
1459         * status.
1460         */
1461        if (oi->ip_flags & OCFS2_INODE_DELETED) {
1462                lvb->lvb_version = 0;
1463                goto out;
1464        }
1465
1466        lvb->lvb_version   = OCFS2_LVB_VERSION;
1467        lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1468        lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1469        lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1470        lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1471        lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1472        lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1473        lvb->lvb_iatime_packed  =
1474                cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1475        lvb->lvb_ictime_packed =
1476                cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1477        lvb->lvb_imtime_packed =
1478                cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1479        lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1480        lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1481        lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1482
1483out:
1484        mlog_meta_lvb(0, lockres);
1485
1486        mlog_exit_void();
1487}
1488
1489static void ocfs2_unpack_timespec(struct timespec *spec,
1490                                  u64 packed_time)
1491{
1492        spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1493        spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1494}
1495
/*
 * Refresh the in-memory inode from the values stored in the meta
 * lock's LVB (the counterpart of __ocfs2_stuff_meta_lvb()). All LVB
 * fields are converted from big-endian. The update is done under
 * oi->ip_lock.
 */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
        struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
        struct ocfs2_meta_lvb *lvb;

        mlog_entry_void();

        mlog_meta_lvb(0, lockres);

        lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

        /* We're safe here without the lockres lock... */
        spin_lock(&oi->ip_lock);
        /* ip_clusters is refreshed before the i_blocks computation
         * below, which tests it. */
        oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
        i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

        oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
        oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
        ocfs2_set_inode_flags(inode);

        /* fast-symlinks are a special case */
        /* NOTE(review): S_ISLNK() is evaluated on the i_mode value from
         * before the LVB's mode is copied in a few lines below. */
        if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
                inode->i_blocks = 0;
        else
                inode->i_blocks = ocfs2_inode_sector_count(inode);

        inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
        inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
        inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
        inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
        ocfs2_unpack_timespec(&inode->i_atime,
                              be64_to_cpu(lvb->lvb_iatime_packed));
        ocfs2_unpack_timespec(&inode->i_mtime,
                              be64_to_cpu(lvb->lvb_imtime_packed));
        ocfs2_unpack_timespec(&inode->i_ctime,
                              be64_to_cpu(lvb->lvb_ictime_packed));
        spin_unlock(&oi->ip_lock);

        mlog_exit_void();
}
1537
1538static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1539                                              struct ocfs2_lock_res *lockres)
1540{
1541        struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1542
1543        if (lvb->lvb_version == OCFS2_LVB_VERSION
1544            && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1545                return 1;
1546        return 0;
1547}
1548
1549/* Determine whether a lock resource needs to be refreshed, and
1550 * arbitrate who gets to refresh it.
1551 *
1552 *   0 means no refresh needed.
1553 *
1554 *   > 0 means you need to refresh this and you MUST call
1555 *   ocfs2_complete_lock_res_refresh afterwards. */
1556static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1557{
1558        unsigned long flags;
1559        int status = 0;
1560
1561        mlog_entry_void();
1562
1563refresh_check:
1564        spin_lock_irqsave(&lockres->l_lock, flags);
1565        if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1566                spin_unlock_irqrestore(&lockres->l_lock, flags);
1567                goto bail;
1568        }
1569
1570        if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1571                spin_unlock_irqrestore(&lockres->l_lock, flags);
1572
1573                ocfs2_wait_on_refreshing_lock(lockres);
1574                goto refresh_check;
1575        }
1576
1577        /* Ok, I'll be the one to refresh this lock. */
1578        lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1579        spin_unlock_irqrestore(&lockres->l_lock, flags);
1580
1581        status = 1;
1582bail:
1583        mlog_exit(status);
1584        return status;
1585}
1586
1587/* If status is non zero, I'll mark it as not being in refresh
1588 * anymroe, but i won't clear the needs refresh flag. */
1589static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1590                                                   int status)
1591{
1592        unsigned long flags;
1593        mlog_entry_void();
1594
1595        spin_lock_irqsave(&lockres->l_lock, flags);
1596        lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1597        if (!status)
1598                lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1599        spin_unlock_irqrestore(&lockres->l_lock, flags);
1600
1601        wake_up(&lockres->l_event);
1602
1603        mlog_exit_void();
1604}
1605
/*
 * Bring the in-memory inode up to date after its meta lock has been
 * acquired, if the lockres says a refresh is needed.
 *
 * May or may not return a bh: *bh is only filled in (by
 * ocfs2_read_block) when the refresh had to go to disk because the
 * LVB could not be trusted.
 *
 * Returns 0 when nothing had to be done (local mount, or no refresh
 * needed) or the refresh succeeded; -ENOENT if the inode is flagged
 * OCFS2_INODE_DELETED; the error from ocfs2_read_block(); or -EIO if
 * the block read from disk is not a valid dinode. Once a refresh has
 * been claimed, ocfs2_complete_lock_res_refresh() is always called
 * with the final status.
 */
static int ocfs2_meta_lock_update(struct inode *inode,
                                  struct buffer_head **bh)
{
        int status = 0;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
        struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
        struct ocfs2_dinode *fe;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

        mlog_entry_void();

        if (ocfs2_mount_local(osb))
                goto bail;

        /* Check the deleted flag under ip_lock before doing any work. */
        spin_lock(&oi->ip_lock);
        if (oi->ip_flags & OCFS2_INODE_DELETED) {
                mlog(0, "Orphaned inode %llu was deleted while we "
                     "were waiting on a lock. ip_flags = 0x%x\n",
                     (unsigned long long)oi->ip_blkno, oi->ip_flags);
                spin_unlock(&oi->ip_lock);
                status = -ENOENT;
                goto bail;
        }
        spin_unlock(&oi->ip_lock);

        /* Zero means no refresh is needed; non-zero means we now own
         * the refresh and must complete it at bail_refresh. */
        if (!ocfs2_should_refresh_lock_res(lockres))
                goto bail;

        /* This will discard any caching information we might have had
         * for the inode metadata. */
        ocfs2_metadata_cache_purge(inode);

        ocfs2_extent_map_trunc(inode, 0);

        if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
                mlog(0, "Trusting LVB on inode %llu\n",
                     (unsigned long long)oi->ip_blkno);
                ocfs2_refresh_inode_from_lvb(inode);
        } else {
                /* Boo, we have to go to disk. */
                /* read bh, cast, ocfs2_refresh_inode */
                status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
                                          bh, OCFS2_BH_CACHED, inode);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail_refresh;
                }
                fe = (struct ocfs2_dinode *) (*bh)->b_data;

                /* This is a good chance to make sure we're not
                 * locking an invalid object.
                 *
                 * We bug on a stale inode here because we checked
                 * above whether it was wiped from disk. The wiping
                 * node provides a guarantee that we receive that
                 * message and can mark the inode before dropping any
                 * locks associated with it. */
                if (!OCFS2_IS_VALID_DINODE(fe)) {
                        OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
                        status = -EIO;
                        goto bail_refresh;
                }
                mlog_bug_on_msg(inode->i_generation !=
                                le32_to_cpu(fe->i_generation),
                                "Invalid dinode %llu disk generation: %u "
                                "inode->i_generation: %u\n",
                                (unsigned long long)oi->ip_blkno,
                                le32_to_cpu(fe->i_generation),
                                inode->i_generation);
                mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
                                !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
                                "Stale dinode %llu dtime: %llu flags: 0x%x\n",
                                (unsigned long long)oi->ip_blkno,
                                (unsigned long long)le64_to_cpu(fe->i_dtime),
                                le32_to_cpu(fe->i_flags));

                ocfs2_refresh_inode(inode, fe);
        }

        status = 0;
bail_refresh:
        /* Pass the final status along with the end of the refresh. */
        ocfs2_complete_lock_res_refresh(lockres, status);
bail:
        mlog_exit(status);
        return status;
}
1693
1694static int ocfs2_assign_bh(struct inode *inode,
1695                           struct buffer_head **ret_bh,
1696                           struct buffer_head *passed_bh)
1697{
1698        int status;
1699
1700        if (passed_bh) {
1701                /* Ok, the update went to disk for us, use the
1702                 * returned bh. */
1703                *ret_bh = passed_bh;
1704                get_bh(*ret_bh);
1705
1706                return 0;
1707        }
1708
1709        status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1710                                  OCFS2_I(inode)->ip_blkno,
1711                                  ret_bh,
1712                                  OCFS2_BH_CACHED,
1713                                  inode);
1714        if (status < 0)
1715                mlog_errno(status);
1716
1717        return status;
1718}
1719
1720/*
1721 * returns < 0 error if the callback will never be called, otherwise
1722 * the result of the lock will be communicated via the callback.
1723 */
1724int ocfs2_meta_lock_full(struct inode *inode,
1725                         struct buffer_head **ret_bh,
1726                         int ex,
1727                         int arg_flags)
1728{
1729        int status, level, dlm_flags, acquired;
1730        struct ocfs2_lock_res *lockres = NULL;
1731        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1732        struct buffer_head *local_bh = NULL;
1733
1734        BUG_ON(!inode);
1735
1736        mlog_entry_void();
1737
1738        mlog(0, "inode %llu, take %s META lock\n",
1739             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1740             ex ? "EXMODE" : "PRMODE");
1741
1742        status = 0;
1743        acquired = 0;
1744        /* We'll allow faking a readonly metadata lock for
1745         * rodevices. */
1746        if (ocfs2_is_hard_readonly(osb)) {
1747                if (ex)
1748                        status = -EROFS;
1749                goto bail;
1750        }
1751
1752        if (ocfs2_mount_local(osb))
1753                goto local;
1754
1755        if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1756                wait_event(osb->recovery_event,
1757                           ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1758
1759        lockres = &OCFS2_I(inode)->ip_meta_lockres;
1760        level = ex ? LKM_EXMODE : LKM_PRMODE;
1761        dlm_flags = 0;
1762        if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1763                dlm_flags |= LKM_NOQUEUE;
1764
1765        status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1766        if (status < 0) {
1767                if (status != -EAGAIN && status != -EIOCBRETRY)
1768                        mlog_errno(status);
1769                goto bail;
1770        }
1771
1772        /* Notify the error cleanup path to drop the cluster lock. */
1773        acquired = 1;
1774
1775        /* We wait twice because a node may have died while we were in
1776         * the lower dlm layers. The second time though, we've
1777         * committed to owning this lock so we don't allow signals to
1778         * abort the operation. */
1779        if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1780                wait_event(osb->recovery_event,
1781                           ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1782
1783local:
1784        /*
1785         * We only see this flag if we're being called from
1786         * ocfs2_read_locked_inode(). It means we're locking an inode
1787         * which hasn't been populated yet, so clear the refresh flag
1788         * and let the caller handle it.
1789         */
1790        if (inode->i_state & I_NEW) {
1791                status = 0;
1792                if (lockres)
1793                        ocfs2_complete_lock_res_refresh(lockres, 0);
1794                goto bail;
1795        }
1796
1797        /* This is fun. The caller may want a bh back, or it may
1798         * not. ocfs2_meta_lock_update definitely wants one in, but
1799         * may or may not read one, depending on what's in the
1800         * LVB. The result of all of this is that we've *only* gone to
1801         * disk if we have to, so the complexity is worthwhile. */
1802        status = ocfs2_meta_lock_update(inode, &local_bh);
1803        if (status < 0) {
1804                if (status != -ENOENT)
1805                        mlog_errno(status);
1806                goto bail;
1807        }
1808
1809        if (ret_bh) {
1810                status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1811                if (status < 0) {
1812                        mlog_errno(status);
1813                        goto bail;
1814                }
1815        }
1816
1817bail:
1818        if (status < 0) {
1819                if (ret_bh && (*ret_bh)) {
1820                        brelse(*ret_bh);
1821                        *ret_bh = NULL;
1822                }
1823                if (acquired)
1824                        ocfs2_meta_unlock(inode, ex);
1825        }
1826
1827        if (local_bh)
1828                brelse(local_bh);
1829
1830        mlog_exit(status);
1831        return status;
1832}
1833
1834/*
1835 * This is working around a lock inversion between tasks acquiring DLM locks
1836 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1837 * while acquiring page locks.
1838 *
1839 * ** These _with_page variantes are only intended to be called from aop
1840 * methods that hold page locks and return a very specific *positive* error
1841 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1842 *
1843 * The DLM is called such that it returns -EAGAIN if it would have blocked
1844 * waiting for the vote thread.  In that case we unlock our page so the vote
1845 * thread can make progress.  Once we've done this we have to return
1846 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1847 * into the VFS who will then immediately retry the aop call.
1848 *
1849 * We do a blocking lock and immediate unlock before returning, though, so that
1850 * the lock has a great chance of being cached on this node by the time the VFS
1851 * calls back to retry the aop.    This has a potential to livelock as nodes
1852 * ping locks back and forth, but that's a risk we're willing to take to avoid
1853 * the lock inversion simply.
1854 */
1855int ocfs2_meta_lock_with_page(struct inode *inode,
1856                              struct buffer_head **ret_bh,
1857                              int ex,
1858                              struct page *page)
1859{
1860        int ret;
1861
1862        ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
1863        if (ret == -EAGAIN) {
1864                unlock_page(page);
1865                if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
1866                        ocfs2_meta_unlock(inode, ex);
1867                ret = AOP_TRUNCATED_PAGE;
1868        }
1869
1870        return ret;
1871}
1872
1873int ocfs2_meta_lock_atime(struct inode *inode,
1874                          struct vfsmount *vfsmnt,
1875                          int *level)
1876{
1877        int ret;
1878
1879        mlog_entry_void();
1880        ret = ocfs2_meta_lock(inode, NULL, 0);
1881        if (ret < 0) {
1882                mlog_errno(ret);
1883                return ret;
1884        }
1885
1886        /*
1887         * If we should update atime, we will get EX lock,
1888         * otherwise we just get PR lock.
1889         */
1890        if (ocfs2_should_update_atime(inode, vfsmnt)) {
1891                struct buffer_head *bh = NULL;
1892
1893                ocfs2_meta_unlock(inode, 0);
1894                ret = ocfs2_meta_lock(inode, &bh, 1);
1895                if (ret < 0) {
1896                        mlog_errno(ret);
1897                        return ret;
1898                }
1899                *level = 1;
1900                if (ocfs2_should_update_atime(inode, vfsmnt))
1901                        ocfs2_update_inode_atime(inode, bh);
1902                if (bh)
1903                        brelse(bh);
1904        } else
1905                *level = 0;
1906
1907        mlog_exit(ret);
1908        return ret;
1909}
1910
1911void ocfs2_meta_unlock(struct inode *inode,
1912                       int ex)
1913{
1914        int level = ex ? LKM_EXMODE : LKM_PRMODE;
1915        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1916        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1917
1918        mlog_entry_void();
1919
1920        mlog(0, "inode %llu drop %s META lock\n",
1921             (unsigned long long)OCFS2_I(inode)->ip_blkno,
1922             ex ? "EXMODE" : "PRMODE");
1923
1924        if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1925            !ocfs2_mount_local(osb))
1926                ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1927
1928        mlog_exit_void();
1929}
1930
1931int ocfs2_super_lock(struct ocfs2_super *osb,
1932                     int ex)
1933{
1934        int status = 0;
1935        int level = ex ? LKM_EXMODE : LKM_PRMODE;
1936        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1937        struct buffer_head *bh;
1938        struct ocfs2_slot_info *si = osb->slot_info;
1939
1940        mlog_entry_void();
1941
1942        if (ocfs2_is_hard_readonly(osb))
1943                return -EROFS;
1944
1945        if (ocfs2_mount_local(osb))
1946                goto bail;
1947
1948        status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1949        if (status < 0) {
1950                mlog_errno(status);
1951                goto bail;
1952        }
1953
1954        /* The super block lock path is really in the best position to
1955         * know when resources covered by the lock need to be
1956         * refreshed, so we do it here. Of course, making sense of
1957         * everything is up to the caller :) */
1958        status = ocfs2_should_refresh_lock_res(lockres);
1959        if (status < 0) {
1960                mlog_errno(status);
1961                goto bail;
1962        }
1963        if (status) {
1964                bh = si->si_bh;
1965                status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1966                                          si->si_inode);
1967                if (status == 0)
1968                        ocfs2_update_slot_info(si);
1969
1970                ocfs2_complete_lock_res_refresh(lockres, status);
1971
1972                if (status < 0)
1973                        mlog_errno(status);
1974        }
1975bail:
1976        mlog_exit(status);
1977        return status;
1978}
1979
1980void ocfs2_super_unlock(struct ocfs2_super *osb,
1981                        int ex)
1982{
1983        int level = ex ? LKM_EXMODE : LKM_PRMODE;
1984        struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1985
1986        if (!ocfs2_mount_local(osb))
1987                ocfs2_cluster_unlock(osb, lockres, level);
1988}
1989
1990int ocfs2_rename_lock(struct ocfs2_super *osb)
1991{
1992        int status;
1993        struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1994
1995        if (ocfs2_is_hard_readonly(osb))
1996                return -EROFS;
1997
1998        if (ocfs2_mount_local(osb))
1999                return 0;
2000
2001        status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
2002        if (status < 0)
2003                mlog_errno(status);
2004
2005        return status;
2006}
2007
2008void ocfs2_rename_unlock(struct ocfs2_super *osb)
2009{
2010        struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2011
2012        if (!ocfs2_mount_local(osb))
2013                ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
2014}
2015
2016int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2017{
2018        int ret;
2019        int level = ex ? LKM_EXMODE : LKM_PRMODE;
2020        struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2021        struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2022
2023        BUG_ON(!dl);
2024
2025        if (ocfs2_is_hard_readonly(osb))
2026                return -EROFS;
2027
2028        if (ocfs2_mount_local(osb))
2029                return 0;
2030
2031        ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2032        if (ret < 0)
2033                mlog_errno(ret);
2034
2035        return ret;
2036}
2037
2038void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2039{
2040        int level = ex ? LKM_EXMODE : LKM_PRMODE;
2041        struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2042        struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2043
2044        if (!ocfs2_mount_local(osb))
2045                ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2046}
2047
2048/* Reference counting of the dlm debug structure. We want this because
2049 * open references on the debug inodes can live on after a mount, so
2050 * we can't rely on the ocfs2_super to always exist. */
2051static void ocfs2_dlm_debug_free(struct kref *kref)
2052{
2053        struct ocfs2_dlm_debug *dlm_debug;
2054
2055        dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2056
2057        kfree(dlm_debug);
2058}
2059
2060void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2061{
2062        if (dlm_debug)
2063                kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2064}
2065
2066static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2067{
2068        kref_get(&debug->d_refcnt);
2069}
2070
2071struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2072{
2073        struct ocfs2_dlm_debug *dlm_debug;
2074
2075        dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2076        if (!dlm_debug) {
2077                mlog_errno(-ENOMEM);
2078                goto out;
2079        }
2080
2081        kref_init(&dlm_debug->d_refcnt);
2082        INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2083        dlm_debug->d_locking_state = NULL;
2084out:
2085        return dlm_debug;
2086}
2087
2088/* Access to this is arbitrated for us via seq_file->sem. */
2089struct ocfs2_dlm_seq_priv {
2090        struct ocfs2_dlm_debug *p_dlm_debug;
2091        struct ocfs2_lock_res p_iter_res;
2092        struct ocfs2_lock_res p_tmp_res;
2093};
2094
2095static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2096                                                 struct ocfs2_dlm_seq_priv *priv)
2097{
2098        struct ocfs2_lock_res *iter, *ret = NULL;
2099        struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2100
2101        assert_spin_locked(&ocfs2_dlm_tracking_lock);
2102
2103        list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2104                /* discover the head of the list */
2105                if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2106                        mlog(0, "End of list found, %p\n", ret);
2107                        break;
2108                }
2109
2110                /* We track our "dummy" iteration lockres' by a NULL
2111                 * l_ops field. */
2112                if (iter->l_ops != NULL) {
2113                        ret = iter;
2114                        break;
2115                }
2116        }
2117
2118        return ret;
2119}
2120
2121static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2122{
2123        struct ocfs2_dlm_seq_priv *priv = m->private;
2124        struct ocfs2_lock_res *iter;
2125
2126        spin_lock(&ocfs2_dlm_tracking_lock);
2127        iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2128        if (iter) {
2129                /* Since lockres' have the lifetime of their container
2130                 * (which can be inodes, ocfs2_supers, etc) we want to
2131                 * copy this out to a temporary lockres while still
2132                 * under the spinlock. Obviously after this we can't
2133                 * trust any pointers on the copy returned, but that's
2134                 * ok as the information we want isn't typically held
2135                 * in them. */
2136                priv->p_tmp_res = *iter;
2137                iter = &priv->p_tmp_res;
2138        }
2139        spin_unlock(&ocfs2_dlm_tracking_lock);
2140
2141        return iter;
2142}
2143
/* seq_file ->stop: intentionally empty. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
        /* ->start and ->next drop the tracking lock before returning,
         * so there is nothing left to undo here. */
}
2147
2148static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2149{
2150        struct ocfs2_dlm_seq_priv *priv = m->private;
2151        struct ocfs2_lock_res *iter = v;
2152        struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2153
2154        spin_lock(&ocfs2_dlm_tracking_lock);
2155        iter = ocfs2_dlm_next_res(iter, priv);
2156        list_del_init(&dummy->l_debug_list);
2157        if (iter) {
2158                list_add(&dummy->l_debug_list, &iter->l_debug_list);
2159                priv->p_tmp_res = *iter;
2160                iter = &priv->p_tmp_res;
2161        }
2162        spin_unlock(&ocfs2_dlm_tracking_lock);
2163
2164        return iter;
2165}
2166
2167/* So that debugfs.ocfs2 can determine which format is being used */
2168#define OCFS2_DLM_DEBUG_STR_VERSION 1
2169static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2170{
2171        int i;
2172        char *lvb;
2173        struct ocfs2_lock_res *lockres = v;
2174
2175        if (!lockres)
2176                return -EINVAL;
2177
2178        seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2179
2180        if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2181                seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2182                           lockres->l_name,
2183                           (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2184        else
2185                seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2186
2187        seq_printf(m, "%d\t"
2188                   "0x%lx\t"
2189                   "0x%x\t"
2190                   "0x%x\t"
2191                   "%u\t"
2192                   "%u\t"
2193                   "%d\t"
2194                   "%d\t",
2195                   lockres->l_level,
2196                   lockres->l_flags,
2197                   lockres->l_action,
2198                   lockres->l_unlock_action,
2199                   lockres->l_ro_holders,
2200                   lockres->l_ex_holders,
2201                   lockres->l_requested,
2202                   lockres->l_blocking);
2203
2204        /* Dump the raw LVB */
2205        lvb = lockres->l_lksb.lvb;
2206        for(i = 0; i < DLM_LVB_LEN; i++)
2207                seq_printf(m, "0x%x\t", lvb[i]);
2208
2209        /* End the line */
2210        seq_printf(m, "\n");
2211        return 0;
2212}
2213
2214static struct seq_operations ocfs2_dlm_seq_ops = {
2215        .start =        ocfs2_dlm_seq_start,
2216        .stop =         ocfs2_dlm_seq_stop,
2217        .next =         ocfs2_dlm_seq_next,
2218        .show =         ocfs2_dlm_seq_show,
2219};
2220
2221static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2222{
2223        struct seq_file *seq = (struct seq_file *) file->private_data;
2224        struct ocfs2_dlm_seq_priv *priv = seq->private;
2225        struct ocfs2_lock_res *res = &priv->p_iter_res;
2226
2227        ocfs2_remove_lockres_tracking(res);
2228        ocfs2_put_dlm_debug(priv->p_dlm_debug);
2229        return seq_release_private(inode, file);
2230}
2231
2232static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2233{
2234        int ret;
2235        struct ocfs2_dlm_seq_priv *priv;
2236        struct seq_file *seq;
2237        struct ocfs2_super *osb;
2238
2239        priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2240        if (!priv) {
2241                ret = -ENOMEM;
2242                mlog_errno(ret);
2243                goto out;
2244        }
2245        osb = inode->i_private;
2246        ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2247        priv->p_dlm_debug = osb->osb_dlm_debug;
2248        INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2249
2250        ret = seq_open(file, &ocfs2_dlm_seq_ops);
2251        if (ret) {
2252                kfree(priv);
2253                mlog_errno(ret);
2254                goto out;
2255        }
2256
2257        seq = (struct seq_file *) file->private_data;
2258        seq->private = priv;
2259
2260        ocfs2_add_lockres_tracking(&priv->p_iter_res,
2261                                   priv->p_dlm_debug);
2262
2263out:
2264        return ret;
2265}
2266
2267static const struct file_operations ocfs2_dlm_debug_fops = {
2268        .open =         ocfs2_dlm_debug_open,
2269        .release =      ocfs2_dlm_debug_release,
2270        .read =         seq_read,
2271        .llseek =       seq_lseek,
2272};
2273
2274static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2275{
2276        int ret = 0;
2277        struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2278
2279        dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2280                                                         S_IFREG|S_IRUSR,
2281                                                         osb->osb_debug_root,
2282                                                         osb,
2283                                                         &ocfs2_dlm_debug_fops);
2284        if (!dlm_debug->d_locking_state) {
2285                ret = -EINVAL;
2286                mlog(ML_ERROR,
2287                     "Unable to create locking state debugfs file.\n");
2288                goto out;
2289        }
2290
2291        ocfs2_get_dlm_debug(dlm_debug);
2292out:
2293        return ret;
2294}
2295
2296static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2297{
2298        struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2299
2300        if (dlm_debug) {
2301                debugfs_remove(dlm_debug->d_locking_state);
2302                ocfs2_put_dlm_debug(dlm_debug);
2303        }
2304}
2305
2306int ocfs2_dlm_init(struct ocfs2_super *osb)
2307{
2308        int status = 0;
2309        u32 dlm_key;
2310        struct dlm_ctxt *dlm = NULL;
2311
2312        mlog_entry_void();
2313
2314        if (ocfs2_mount_local(osb))
2315                goto local;
2316
2317        status = ocfs2_dlm_init_debug(osb);
2318        if (status < 0) {
2319                mlog_errno(status);
2320                goto bail;
2321        }
2322
2323        /* launch vote thread */
2324        osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2325        if (IS_ERR(osb->vote_task)) {
2326                status = PTR_ERR(osb->vote_task);
2327                osb->vote_task = NULL;
2328                mlog_errno(status);
2329                goto bail;
2330        }
2331
2332        /* used by the dlm code to make message headers unique, each
2333         * node in this domain must agree on this. */
2334        dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2335
2336        /* for now, uuid == domain */
2337        dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2338        if (IS_ERR(dlm)) {
2339                status = PTR_ERR(dlm);
2340                mlog_errno(status);
2341                goto bail;
2342        }
2343
2344        dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2345
2346local:
2347        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2348        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2349
2350        osb->dlm = dlm;
2351
2352        status = 0;
2353bail:
2354        if (status < 0) {
2355                ocfs2_dlm_shutdown_debug(osb);
2356                if (osb->vote_task)
2357                        kthread_stop(osb->vote_task);
2358        }
2359
2360        mlog_exit(status);
2361        return status;
2362}
2363
2364void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2365{
2366        mlog_entry_void();
2367
2368        dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2369
2370        ocfs2_drop_osb_locks(osb);
2371
2372        if (osb->vote_task) {
2373                kthread_stop(osb->vote_task);
2374                osb->vote_task = NULL;
2375        }
2376
2377        ocfs2_lock_res_free(&osb->osb_super_lockres);
2378        ocfs2_lock_res_free(&osb->osb_rename_lockres);
2379
2380        dlm_unregister_domain(osb->dlm);
2381        osb->dlm = NULL;
2382
2383        ocfs2_dlm_shutdown_debug(osb);
2384
2385        mlog_exit_void();
2386}
2387
2388static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2389{
2390        struct ocfs2_lock_res *lockres = opaque;
2391        unsigned long flags;
2392
2393        mlog_entry_void();
2394
2395        mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2396             lockres->l_unlock_action);
2397
2398        spin_lock_irqsave(&lockres->l_lock, flags);
2399        /* We tried to cancel a convert request, but it was already
2400         * granted. All we want to do here is clear our unlock
2401         * state. The wake_up call done at the bottom is redundant
2402         * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2403         * hurt anything anyway */
2404        if (status == DLM_CANCELGRANT &&
2405            lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2406                mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2407
2408                /* We don't clear the busy flag in this case as it
2409                 * should have been cleared by the ast which the dlm
2410                 * has called. */
2411                goto complete_unlock;
2412        }
2413
2414        if (status != DLM_NORMAL) {
2415                mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2416                     "unlock_action %d\n", status, lockres->l_name,
2417                     lockres->l_unlock_action);
2418                spin_unlock_irqrestore(&lockres->l_lock, flags);
2419                return;
2420        }
2421
2422        switch(lockres->l_unlock_action) {
2423        case OCFS2_UNLOCK_CANCEL_CONVERT:
2424                mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2425                lockres->l_action = OCFS2_AST_INVALID;
2426                break;
2427        case OCFS2_UNLOCK_DROP_LOCK:
2428                lockres->l_level = LKM_IVMODE;
2429                break;
2430        default:
2431                BUG();
2432        }
2433
2434        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2435complete_unlock:
2436        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2437        spin_unlock_irqrestore(&lockres->l_lock, flags);
2438
2439        wake_up(&lockres->l_event);
2440
2441        mlog_exit_void();
2442}
2443
2444static int ocfs2_drop_lock(struct ocfs2_super *osb,
2445                           struct ocfs2_lock_res *lockres)
2446{
2447        enum dlm_status status;
2448        unsigned long flags;
2449        int lkm_flags = 0;
2450
2451        /* We didn't get anywhere near actually using this lockres. */
2452        if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2453                goto out;
2454
2455        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2456                lkm_flags |= LKM_VALBLK;
2457
2458        spin_lock_irqsave(&lockres->l_lock, flags);
2459
2460        mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2461                        "lockres %s, flags 0x%lx\n",
2462                        lockres->l_name, lockres->l_flags);
2463
2464        while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2465                mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2466                     "%u, unlock_action = %u\n",
2467                     lockres->l_name, lockres->l_flags, lockres->l_action,
2468                     lockres->l_unlock_action);
2469
2470                spin_unlock_irqrestore(&lockres->l_lock, flags);
2471
2472                /* XXX: Today we just wait on any busy
2473                 * locks... Perhaps we need to cancel converts in the
2474                 * future? */
2475                ocfs2_wait_on_busy_lock(lockres);
2476
2477                spin_lock_irqsave(&lockres->l_lock, flags);
2478        }
2479
2480        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2481                if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2482                    lockres->l_level == LKM_EXMODE &&
2483                    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2484                        lockres->l_ops->set_lvb(lockres);
2485        }
2486
2487        if (lockres->l_flags & OCFS2_LOCK_BUSY)
2488                mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2489                     lockres->l_name);
2490        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2491                mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2492
2493        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2494                spin_unlock_irqrestore(&lockres->l_lock, flags);
2495                goto out;
2496        }
2497
2498        lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2499
2500        /* make sure we never get here while waiting for an ast to
2501         * fire. */
2502        BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2503
2504        /* is this necessary? */
2505        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2506        lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2507        spin_unlock_irqrestore(&lockres->l_lock, flags);
2508
2509        mlog(0, "lock %s\n", lockres->l_name);
2510
2511        status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2512                           ocfs2_unlock_ast, lockres);
2513        if (status != DLM_NORMAL) {
2514                ocfs2_log_dlm_error("dlmunlock", status, lockres);
2515                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2516                dlm_print_one_lock(lockres->l_lksb.lockid);
2517                BUG();
2518        }
2519        mlog(0, "lock %s, successfull return from dlmunlock\n",
2520             lockres->l_name);
2521
2522        ocfs2_wait_on_busy_lock(lockres);
2523out:
2524        mlog_exit(0);
2525        return 0;
2526}
2527
2528/* Mark the lockres as being dropped. It will no longer be
2529 * queued if blocking, but we still may have to wait on it
2530 * being dequeued from the vote thread before we can consider
2531 * it safe to drop. 
2532 *
2533 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2534void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2535{
2536        int status;
2537        struct ocfs2_mask_waiter mw;
2538        unsigned long flags;
2539
2540        ocfs2_init_mask_waiter(&mw);
2541
2542        spin_lock_irqsave(&lockres->l_lock, flags);
2543        lockres->l_flags |= OCFS2_LOCK_FREEING;
2544        while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2545                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2546                spin_unlock_irqrestore(&lockres->l_lock, flags);
2547
2548                mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2549
2550                status = ocfs2_wait_for_mask(&mw);
2551                if (status)
2552                        mlog_errno(status);
2553
2554                spin_lock_irqsave(&lockres->l_lock, flags);
2555        }
2556        spin_unlock_irqrestore(&lockres->l_lock, flags);
2557}
2558
/*
 * Mark a lockres as freeing and then drop it. A non-zero result from
 * ocfs2_drop_lock() is only logged; nothing is returned to the caller.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
                               struct ocfs2_lock_res *lockres)
{
        int err;

        ocfs2_mark_lockres_freeing(lockres);

        err = ocfs2_drop_lock(osb, lockres);
        if (err != 0)
                mlog_errno(err);
}
2569
2570static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2571{
2572        ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2573        ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2574}
2575
2576int ocfs2_drop_inode_locks(struct inode *inode)
2577{
2578        int status, err;
2579
2580        mlog_entry_void();
2581
2582        /* No need to call ocfs2_mark_lockres_freeing here -
2583         * ocfs2_clear_inode has done it for us. */
2584
2585        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2586                              &OCFS2_I(inode)->ip_open_lockres);
2587        if (err < 0)
2588                mlog_errno(err);
2589
2590        status = err;
2591
2592        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2593                              &OCFS2_I(inode)->ip_data_lockres);
2594        if (err < 0)
2595                mlog_errno(err);
2596        if (err < 0 && !status)
2597                status = err;
2598
2599        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2600                              &OCFS2_I(inode)->ip_meta_lockres);
2601        if (err < 0)
2602                mlog_errno(err);
2603        if (err < 0 && !status)
2604                status = err;
2605
2606        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2607                              &OCFS2_I(inode)->ip_rw_lockres);
2608        if (err < 0)
2609                mlog_errno(err);
2610        if (err < 0 && !status)
2611                status = err;
2612
2613        mlog_exit(status);
2614        return status;
2615}
2616
2617static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2618                                      int new_level)
2619{
2620        assert_spin_locked(&lockres->l_lock);
2621
2622        BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2623
2624        if (lockres->l_level <= new_level) {
2625                mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2626                     lockres->l_level, new_level);
2627                BUG();
2628        }
2629
2630        mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2631             lockres->l_name, new_level, lockres->l_blocking);
2632
2633        lockres->l_action = OCFS2_AST_DOWNCONVERT;
2634        lockres->l_requested = new_level;
2635        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2636}
2637
2638static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2639                                  struct ocfs2_lock_res *lockres,
2640                                  int new_level,
2641                                  int lvb)
2642{
2643        int ret, dlm_flags = LKM_CONVERT;
2644        enum dlm_status status;
2645
2646        mlog_entry_void();
2647
2648        if (lvb)
2649                dlm_flags |= LKM_VALBLK;
2650
2651        status = dlmlock(osb->dlm,
2652                         new_level,
2653                         &lockres->l_lksb,
2654                         dlm_flags,
2655                         lockres->l_name,
2656                         OCFS2_LOCK_ID_MAX_LEN - 1,
2657                         ocfs2_locking_ast,
2658                         lockres,
2659                         ocfs2_blocking_ast);
2660        if (status != DLM_NORMAL) {
2661                ocfs2_log_dlm_error("dlmlock", status, lockres);
2662                ret = -EINVAL;
2663                ocfs2_recover_from_dlm_error(lockres, 1);
2664                goto bail;
2665        }
2666
2667        ret = 0;
2668bail:
2669        mlog_exit(ret);
2670        return ret;
2671}
2672
2673/* returns 1 when the caller should unlock and call dlmunlock */
2674static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2675                                        struct ocfs2_lock_res *lockres)
2676{
2677        assert_spin_locked(&lockres->l_lock);
2678
2679        mlog_entry_void();
2680        mlog(0, "lock %s\n", lockres->l_name);
2681
2682        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2683                /* If we're already trying to cancel a lock conversion
2684                 * then just drop the spinlock and allow the caller to
2685                 * requeue this lock. */
2686
2687                mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2688                return 0;
2689        }
2690
2691        /* were we in a convert when we got the bast fire? */
2692        BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2693               lockres->l_action != OCFS2_AST_DOWNCONVERT);
2694        /* set things up for the unlockast to know to just
2695         * clear out the ast_action and unset busy, etc. */
2696        lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2697
2698        mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2699                        "lock %s, invalid flags: 0x%lx\n",
2700                        lockres->l_name, lockres->l_flags);
2701
2702        return 1;
2703}
2704
2705static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2706                                struct ocfs2_lock_res *lockres)
2707{
2708        int ret;
2709        enum dlm_status status;
2710
2711        mlog_entry_void();
2712        mlog(0, "lock %s\n", lockres->l_name);
2713
2714        ret = 0;
2715        status = dlmunlock(osb->dlm,
2716                           &lockres->l_lksb,
2717                           LKM_CANCEL,
2718                           ocfs2_unlock_ast,
2719                           lockres);
2720        if (status != DLM_NORMAL) {
2721                ocfs2_log_dlm_error("dlmunlock", status, lockres);
2722                ret = -EINVAL;
2723                ocfs2_recover_from_dlm_error(lockres, 0);
2724        }
2725
2726        mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2727
2728        mlog_exit(ret);
2729        return ret;
2730}
2731
2732static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2733                              struct ocfs2_lock_res *lockres,
2734                              struct ocfs2_unblock_ctl *ctl)
2735{
2736        unsigned long flags;
2737        int blocking;
2738        int new_level;
2739        int ret = 0;
2740        int set_lvb = 0;
2741
2742        mlog_entry_void();
2743
2744        spin_lock_irqsave(&lockres->l_lock, flags);
2745
2746        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2747
2748recheck:
2749        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2750                ctl->requeue = 1;
2751                ret = ocfs2_prepare_cancel_convert(osb, lockres);
2752                spin_unlock_irqrestore(&lockres->l_lock, flags);
2753                if (ret) {
2754                        ret = ocfs2_cancel_convert(osb, lockres);
2755                        if (ret < 0)
2756                                mlog_errno(ret);
2757                }
2758                goto leave;
2759        }
2760
2761        /* if we're blocking an exclusive and we have *any* holders,
2762         * then requeue. */
2763        if ((lockres->l_blocking == LKM_EXMODE)
2764            && (lockres->l_ex_holders || lockres->l_ro_holders))
2765                goto leave_requeue;
2766
2767        /* If it's a PR we're blocking, then only
2768         * requeue if we've got any EX holders */
2769        if (lockres->l_blocking == LKM_PRMODE &&
2770            lockres->l_ex_holders)
2771                goto leave_requeue;
2772
2773        /*
2774         * Can we get a lock in this state if the holder counts are
2775         * zero? The meta data unblock code used to check this.
2776         */
2777        if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2778            && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2779                goto leave_requeue;
2780
2781        new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2782
2783        if (lockres->l_ops->check_downconvert
2784            && !lockres->l_ops->check_downconvert(lockres, new_level))
2785                goto leave_requeue;
2786
2787        /* If we get here, then we know that there are no more
2788         * incompatible holders (and anyone asking for an incompatible
2789         * lock is blocked). We can now downconvert the lock */
2790        if (!lockres->l_ops->downconvert_worker)
2791                goto downconvert;
2792
2793        /* Some lockres types want to do a bit of work before
2794         * downconverting a lock. Allow that here. The worker function
2795         * may sleep, so we save off a copy of what we're blocking as
2796         * it may change while we're not holding the spin lock. */
2797        blocking = lockres->l_blocking;
2798        spin_unlock_irqrestore(&lockres->l_lock, flags);
2799
2800        ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2801
2802        if (ctl->unblock_action == UNBLOCK_STOP_POST)
2803                goto leave;
2804
2805        spin_lock_irqsave(&lockres->l_lock, flags);
2806        if (blocking != lockres->l_blocking) {
2807                /* If this changed underneath us, then we can't drop
2808                 * it just yet. */
2809                goto recheck;
2810        }
2811
2812downconvert:
2813        ctl->requeue = 0;
2814
2815        if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2816                if (lockres->l_level == LKM_EXMODE)
2817                        set_lvb = 1;
2818
2819                /*
2820                 * We only set the lvb if the lock has been fully
2821                 * refreshed - otherwise we risk setting stale
2822                 * data. Otherwise, there's no need to actually clear
2823                 * out the lvb here as it's value is still valid.
2824                 */
2825                if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2826                        lockres->l_ops->set_lvb(lockres);
2827        }
2828
2829        ocfs2_prepare_downconvert(lockres, new_level);
2830        spin_unlock_irqrestore(&lockres->l_lock, flags);
2831        ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2832leave:
2833        mlog_exit(ret);
2834        return ret;
2835
2836leave_requeue:
2837        spin_unlock_irqrestore(&lockres->l_lock, flags);
2838        ctl->requeue = 1;
2839
2840        mlog_exit(0);
2841        return 0;
2842}
2843
2844static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2845                                     int blocking)
2846{
2847        struct inode *inode;
2848        struct address_space *mapping;
2849
2850        inode = ocfs2_lock_res_inode(lockres);
2851        mapping = inode->i_mapping;
2852
2853        /*
2854         * We need this before the filemap_fdatawrite() so that it can
2855         * transfer the dirty bit from the PTE to the
2856         * page. Unfortunately this means that even for EX->PR
2857         * downconverts, we'll lose our mappings and have to build
2858         * them up again.
2859         */
2860        unmap_mapping_range(mapping, 0, 0, 0);
2861
2862        if (filemap_fdatawrite(mapping)) {
2863                mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2864                     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2865        }
2866        sync_mapping_buffers(mapping);
2867        if (blocking == LKM_EXMODE) {
2868                truncate_inode_pages(mapping, 0);
2869        } else {
2870                /* We only need to wait on the I/O if we're not also
2871                 * truncating pages because truncate_inode_pages waits
2872                 * for us above. We don't truncate pages if we're
2873                 * blocking anything < EXMODE because we want to keep
2874                 * them around in that case. */
2875                filemap_fdatawait(mapping);
2876        }
2877
2878        return UNBLOCK_CONTINUE;
2879}
2880
2881static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2882                                        int new_level)
2883{
2884        struct inode *inode = ocfs2_lock_res_inode(lockres);
2885        int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2886
2887        BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2888        BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2889
2890        if (checkpointed)
2891                return 1;
2892
2893        ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2894        return 0;
2895}
2896
/*
 * set_lvb hook for meta locks: hands the inode behind the lockres to
 * __ocfs2_stuff_meta_lvb().
 */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
        __ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2903
/*
 * Does the final reference drop on our dentry lock by calling
 * ocfs2_dentry_lock_put() on it. Right now this happens in the vote
 * thread, but we could choose to simplify the dlmglue API and push
 * these off to the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres)
{
        ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2915
2916/*
2917 * d_delete() matching dentries before the lock downconvert.
2918 *
2919 * At this point, any process waiting to destroy the
2920 * dentry_lock due to last ref count is stopped by the
2921 * OCFS2_LOCK_QUEUED flag.
2922 *
2923 * We have two potential problems
2924 *
2925 * 1) If we do the last reference drop on our dentry_lock (via dput)
2926 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2927 *    the downconvert to finish. Instead we take an elevated
2928 *    reference and push the drop until after we've completed our
2929 *    unblock processing.
2930 *
2931 * 2) There might be another process with a final reference,
2932 *    waiting on us to finish processing. If this is the case, we
2933 *    detect it and exit out - there's no more dentries anyway.
2934 */
2935static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2936                                       int blocking)
2937{
2938        struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2939        struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2940        struct dentry *dentry;
2941        unsigned long flags;
2942        int extra_ref = 0;
2943
2944        /*
2945         * This node is blocking another node from getting a read
2946         * lock. This happens when we've renamed within a
2947         * directory. We've forced the other nodes to d_delete(), but
2948         * we never actually dropped our lock because it's still
2949         * valid. The downconvert code will retain a PR for this node,
2950         * so there's no further work to do.
2951         */
2952        if (blocking == LKM_PRMODE)
2953                return UNBLOCK_CONTINUE;
2954
2955        /*
2956         * Mark this inode as potentially orphaned. The code in
2957         * ocfs2_delete_inode() will figure out whether it actually
2958         * needs to be freed or not.
2959         */
2960        spin_lock(&oi->ip_lock);
2961        oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2962        spin_unlock(&oi->ip_lock);
2963
2964        /*
2965         * Yuck. We need to make sure however that the check of
2966         * OCFS2_LOCK_FREEING and the extra reference are atomic with
2967         * respect to a reference decrement or the setting of that
2968         * flag.
2969         */
2970        spin_lock_irqsave(&lockres->l_lock, flags);
2971        spin_lock(&dentry_attach_lock);
2972        if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2973            && dl->dl_count) {
2974                dl->dl_count++;
2975                extra_ref = 1;
2976        }
2977        spin_unlock(&dentry_attach_lock);
2978        spin_unlock_irqrestore(&lockres->l_lock, flags);
2979
2980        mlog(0, "extra_ref = %d\n", extra_ref);
2981
2982        /*
2983         * We have a process waiting on us in ocfs2_dentry_iput(),
2984         * which means we can't have any more outstanding
2985         * aliases. There's no need to do any more work.
2986         */
2987        if (!extra_ref)
2988                return UNBLOCK_CONTINUE;
2989
2990        spin_lock(&dentry_attach_lock);
2991        while (1) {
2992                dentry = ocfs2_find_local_alias(dl->dl_inode,
2993                                                dl->dl_parent_blkno, 1);
2994                if (!dentry)
2995                        break;
2996                spin_unlock(&dentry_attach_lock);
2997
2998                mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2999                     dentry->d_name.name);
3000
3001                /*
3002                 * The following dcache calls may do an
3003                 * iput(). Normally we don't want that from the
3004                 * downconverting thread, but in this case it's ok
3005                 * because the requesting node already has an
3006                 * exclusive lock on the inode, so it can't be queued
3007                 * for a downconvert.
3008                 */
3009                d_delete(dentry);
3010                dput(dentry);
3011
3012                spin_lock(&dentry_attach_lock);
3013        }
3014        spin_unlock(&dentry_attach_lock);
3015
3016        /*
3017         * If we are the last holder of this dentry lock, there is no
3018         * reason to downconvert so skip straight to the unlock.
3019         */
3020        if (dl->dl_count == 1)
3021                return UNBLOCK_STOP_POST;
3022
3023        return UNBLOCK_CONTINUE_POST;
3024}
3025
3026void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3027                                struct ocfs2_lock_res *lockres)
3028{
3029        int status;
3030        struct ocfs2_unblock_ctl ctl = {0, 0,};
3031        unsigned long flags;
3032
3033        /* Our reference to the lockres in this function can be
3034         * considered valid until we remove the OCFS2_LOCK_QUEUED
3035         * flag. */
3036
3037        mlog_entry_void();
3038
3039        BUG_ON(!lockres);
3040        BUG_ON(!lockres->l_ops);
3041
3042        mlog(0, "lockres %s blocked.\n", lockres->l_name);
3043
3044        /* Detect whether a lock has been marked as going away while
3045         * the vote thread was processing other things. A lock can
3046         * still be marked with OCFS2_LOCK_FREEING after this check,
3047         * but short circuiting here will still save us some
3048         * performance. */
3049        spin_lock_irqsave(&lockres->l_lock, flags);
3050        if (lockres->l_flags & OCFS2_LOCK_FREEING)
3051                goto unqueue;
3052        spin_unlock_irqrestore(&lockres->l_lock, flags);
3053
3054        status = ocfs2_unblock_lock(osb, lockres, &ctl);
3055        if (status < 0)
3056                mlog_errno(status);
3057
3058        spin_lock_irqsave(&lockres->l_lock, flags);
3059unqueue:
3060        if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3061                lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3062        } else
3063                ocfs2_schedule_blocked_lock(osb, lockres);
3064
3065        mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3066             ctl.requeue ? "yes" : "no");
3067        spin_unlock_irqrestore(&lockres->l_lock, flags);
3068
3069        if (ctl.unblock_action != UNBLOCK_CONTINUE
3070            && lockres->l_ops->post_unlock)
3071                lockres->l_ops->post_unlock(osb, lockres);
3072
3073        mlog_exit_void();
3074}
3075
3076static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3077                                        struct ocfs2_lock_res *lockres)
3078{
3079        mlog_entry_void();
3080
3081        assert_spin_locked(&lockres->l_lock);
3082
3083        if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3084                /* Do not schedule a lock for downconvert when it's on
3085                 * the way to destruction - any nodes wanting access
3086                 * to the resource will get it soon. */
3087                mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3088                     lockres->l_name, lockres->l_flags);
3089                return;
3090        }
3091
3092        lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3093
3094        spin_lock(&osb->vote_task_lock);
3095        if (list_empty(&lockres->l_blocked_list)) {
3096                list_add_tail(&lockres->l_blocked_list,
3097                              &osb->blocked_lock_list);
3098                osb->blocked_lock_count++;
3099        }
3100        spin_unlock(&osb->vote_task_lock);
3101
3102        mlog_exit_void();
3103}
3104