linux/fs/gfs2/lock_dlm.c
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright 2004-2011 Red Hat, Inc.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/fs.h>
#include <linux/dlm.h>
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/delay.h>
#include <linux/gfs2_ondisk.h>
#include <linux/sched/signal.h>

#include "incore.h"
#include "glock.h"
#include "glops.h"
#include "recovery.h"
#include "util.h"
#include "sys.h"
#include "trace_gfs2.h"

/**
 * gfs2_update_stats - Update time based stats
 * @s: The stats to update (local or global)
 * @index: The index of the array entry containing the smoothed mean value
 * @sample: New data to include
 *
 * @delta is the difference between the current rtt sample and the
 * running average srtt. We add 1/8 of that to the srtt in order to
 * update the current srtt estimate. The variance estimate is a bit
 * more complicated. We subtract the current variance estimate from
 * the abs value of the @delta and add 1/4 of that to the running
 * total.  That's equivalent to 3/4 of the current variance
 * estimate plus 1/4 of the abs of @delta.
 *
 * Note that the index points at the array entry containing the smoothed
 * mean value, and the variance is always in the following entry.
 *
 * Reference: TCP/IP Illustrated, vol 2, p. 831,832
 * All times are in units of integer nanoseconds. Unlike the TCP/IP case,
 * they are not scaled fixed point.
 */
static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
                                     s64 sample)
{
        s64 delta = sample - s->stats[index];
        s->stats[index] += (delta >> 3);
        index++;
        s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
}
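
/*
 * Worked example (illustrative only, not part of the code): with a
 * smoothed mean of 1000ns, a variance estimate of 200ns and a new
 * sample of 1800ns, the update above computes
 *
 *   delta = 1800 - 1000 = 800
 *   srtt  = 1000 + (800 >> 3) = 1100
 *   var   = 200 + ((|800| - 200) >> 2) = 350
 *
 * i.e. the mean moves 1/8 of the way towards the sample, and the new
 * variance is 3/4 * 200 + 1/4 * |800| = 350.
 */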

/**
 * gfs2_update_reply_times - Update locking statistics
 * @gl: The glock to update
 *
 * This assumes that gl->gl_dstamp has been set earlier.
 *
 * The rtt (lock round trip time) is an estimate of the time
 * taken to perform a dlm lock request. We update it on each
 * reply from the dlm.
 *
 * The blocking flag is set on the glock for all dlm requests
 * which may potentially block due to lock requests from other nodes.
 * DLM requests where the current lock state is exclusive, where the
 * requested state is null (or unlocked), or where the TRY or
 * TRY_1CB flags are set are classified as non-blocking. All
 * other DLM requests are counted as (potentially) blocking.
 */
static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
{
        struct gfs2_pcpu_lkstats *lks;
        const unsigned gltype = gl->gl_name.ln_type;
        unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
                         GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
        s64 rtt;

        preempt_disable();
        rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
        lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
        gfs2_update_stats(&gl->gl_stats, index, rtt);           /* Local */
        gfs2_update_stats(&lks->lkstats[gltype], index, rtt);   /* Global */
        preempt_enable();

        trace_gfs2_glock_lock_time(gl, rtt);
}

/**
 * gfs2_update_request_times - Update locking statistics
 * @gl: The glock to update
 *
 * The irt (lock inter-request times) measures the average time
 * between requests to the dlm. It is updated immediately before
 * each dlm call.
 */
static inline void gfs2_update_request_times(struct gfs2_glock *gl)
{
        struct gfs2_pcpu_lkstats *lks;
        const unsigned gltype = gl->gl_name.ln_type;
        ktime_t dstamp;
        s64 irt;

        preempt_disable();
        dstamp = gl->gl_dstamp;
        gl->gl_dstamp = ktime_get_real();
        irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
        lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
        gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);           /* Local */
        gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);   /* Global */
        preempt_enable();
}

static void gdlm_ast(void *arg)
{
        struct gfs2_glock *gl = arg;
        unsigned ret = gl->gl_state;

        gfs2_update_reply_times(gl);
        BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);

        if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
                memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);

        switch (gl->gl_lksb.sb_status) {
        case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
                if (gl->gl_ops->go_free)
                        gl->gl_ops->go_free(gl);
                gfs2_glock_free(gl);
                return;
        case -DLM_ECANCEL: /* Cancel while getting lock */
                ret |= LM_OUT_CANCELED;
                goto out;
        case -EAGAIN: /* Try lock fails */
        case -EDEADLK: /* Deadlock detected */
                goto out;
        case -ETIMEDOUT: /* Canceled due to timeout */
                ret |= LM_OUT_ERROR;
                goto out;
        case 0: /* Success */
                break;
        default: /* Something unexpected */
                BUG();
        }

        ret = gl->gl_req;
        if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
                if (gl->gl_req == LM_ST_SHARED)
                        ret = LM_ST_DEFERRED;
                else if (gl->gl_req == LM_ST_DEFERRED)
                        ret = LM_ST_SHARED;
                else
                        BUG();
        }

        set_bit(GLF_INITIAL, &gl->gl_flags);
        gfs2_glock_complete(gl, ret);
        return;
out:
        if (!test_bit(GLF_INITIAL, &gl->gl_flags))
                gl->gl_lksb.sb_lkid = 0;
        gfs2_glock_complete(gl, ret);
}

static void gdlm_bast(void *arg, int mode)
{
        struct gfs2_glock *gl = arg;

        switch (mode) {
        case DLM_LOCK_EX:
                gfs2_glock_cb(gl, LM_ST_UNLOCKED);
                break;
        case DLM_LOCK_CW:
                gfs2_glock_cb(gl, LM_ST_DEFERRED);
                break;
        case DLM_LOCK_PR:
                gfs2_glock_cb(gl, LM_ST_SHARED);
                break;
        default:
                fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
                BUG();
        }
}

/* convert gfs lock-state to dlm lock-mode */

static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
{
        switch (lmstate) {
        case LM_ST_UNLOCKED:
                return DLM_LOCK_NL;
        case LM_ST_EXCLUSIVE:
                return DLM_LOCK_EX;
        case LM_ST_DEFERRED:
                return DLM_LOCK_CW;
        case LM_ST_SHARED:
                return DLM_LOCK_PR;
        }
        fs_err(sdp, "unknown LM state %d\n", lmstate);
        BUG();
        return -1;
}

static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
                      const int req)
{
        u32 lkf = 0;

        if (gl->gl_lksb.sb_lvbptr)
                lkf |= DLM_LKF_VALBLK;

        if (gfs_flags & LM_FLAG_TRY)
                lkf |= DLM_LKF_NOQUEUE;

        if (gfs_flags & LM_FLAG_TRY_1CB) {
                lkf |= DLM_LKF_NOQUEUE;
                lkf |= DLM_LKF_NOQUEUEBAST;
        }

        if (gfs_flags & LM_FLAG_PRIORITY) {
                lkf |= DLM_LKF_NOORDER;
                lkf |= DLM_LKF_HEADQUE;
        }

        if (gfs_flags & LM_FLAG_ANY) {
                if (req == DLM_LOCK_PR)
                        lkf |= DLM_LKF_ALTCW;
                else if (req == DLM_LOCK_CW)
                        lkf |= DLM_LKF_ALTPR;
                else
                        BUG();
        }

        if (gl->gl_lksb.sb_lkid != 0) {
                lkf |= DLM_LKF_CONVERT;
                if (test_bit(GLF_BLOCKING, &gl->gl_flags))
                        lkf |= DLM_LKF_QUECVT;
        }

        return lkf;
}
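
/*
 * Illustrative mapping (a sketch, not used anywhere in this file): a
 * first-time LM_ST_SHARED request with LM_FLAG_TRY would be translated
 * by the two helpers above as
 *
 *   req = make_mode(sdp, LM_ST_SHARED);      req == DLM_LOCK_PR
 *   lkf = make_flags(gl, LM_FLAG_TRY, req);  lkf == DLM_LKF_NOQUEUE
 *
 * plus DLM_LKF_VALBLK if the glock carries an lvb, and DLM_LKF_CONVERT
 * (and possibly DLM_LKF_QUECVT) if sb_lkid shows an existing dlm lock.
 */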

static void gfs2_reverse_hex(char *c, u64 value)
{
        *c = '0';
        while (value) {
                *c-- = hex_asc[value & 0x0f];
                value >>= 4;
        }
}
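
/*
 * Example (illustrative): gdlm_lock() below builds the dlm resource name
 * as two fixed-width, space-padded hex fields -- the lock type ending at
 * offset 7 and the lock number ending at offset 23.  For an inode-type
 * glock (ln_type 2) on block 0x1234, the 24-character name would be
 *
 *   "       2            1234"
 *
 * gfs2_reverse_hex() writes each value backwards from the end of its
 * field, leaving the leading space padding in place.  sync_lock() below
 * produces the same layout with the snprintf format "%8x%16x".
 */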

static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
                     unsigned int flags)
{
        struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
        int req;
        u32 lkf;
        char strname[GDLM_STRNAME_BYTES] = "";

        req = make_mode(gl->gl_name.ln_sbd, req_state);
        lkf = make_flags(gl, flags, req);
        gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
        if (gl->gl_lksb.sb_lkid) {
                gfs2_update_request_times(gl);
        } else {
                memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
                strname[GDLM_STRNAME_BYTES - 1] = '\0';
                gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
                gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
                gl->gl_dstamp = ktime_get_real();
        }
        /*
         * Submit the actual lock request.
         */

        return dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
                        GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
}

static void gdlm_put_lock(struct gfs2_glock *gl)
{
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int lvb_needs_unlock = 0;
        int error;

        if (gl->gl_lksb.sb_lkid == 0) {
                gfs2_glock_free(gl);
                return;
        }

        clear_bit(GLF_BLOCKING, &gl->gl_flags);
        gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
        gfs2_update_request_times(gl);

        /* don't want to skip dlm_unlock writing the lvb when lock is ex */

        if (gl->gl_lksb.sb_lvbptr && (gl->gl_state == LM_ST_EXCLUSIVE))
                lvb_needs_unlock = 1;

        if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
            !lvb_needs_unlock) {
                gfs2_glock_free(gl);
                return;
        }

        error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
                           NULL, gl);
        if (error) {
                fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
                       gl->gl_name.ln_type,
                       (unsigned long long)gl->gl_name.ln_number, error);
                return;
        }
}

static void gdlm_cancel(struct gfs2_glock *gl)
{
        struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
        dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
}

/*
 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 *
 *  0. gfs2 checks for another cluster node withdraw, needing journal replay
 *  1. dlm_controld sees lockspace members change
 *  2. dlm_controld blocks dlm-kernel locking activity
 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 *  4. dlm_controld starts and finishes its own user level recovery
 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 *  7. dlm_recoverd does its own lock recovery
 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 * 15. gfs2_control unblocks normal locking when all journals are recovered
 *
 * - failures during recovery
 *
 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 * recovering for a prior failure.  gfs2_control needs a way to detect
 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 * the recover_block and recover_start values.
 *
 * recover_done() provides a new lockspace generation number each time it
 * is called (step 9).  This generation number is saved as recover_start.
 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 * recover_block = recover_start.  So, while recover_block is equal to
 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 *
 * - more specific gfs2 steps in sequence above
 *
 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 *  6. recover_slot records any failed jids (maybe none)
 *  9. recover_done sets recover_start = new generation number
 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 * 12. gfs2_recover does journal recoveries for failed jids identified above
 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 *     again); if so, it does nothing, otherwise (recover_start > recover_block)
 *     it clears BLOCK_LOCKS.
 *
 * - parallel recovery steps across all nodes
 *
 * All nodes attempt to update the control_lock lvb with the new generation
 * number and jid bits, but only the first to get the control_lock EX will
 * do so; others will see that it's already done (lvb already contains new
 * generation number.)
 *
 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 * . One node gets control_lock first and writes the lvb, others see it's done
 * . All nodes attempt to recover jids for which they see control_lock bits set
 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 * . All nodes will eventually see all lvb bits clear and unblock locks
 *
 * - is there a problem with clearing an lvb bit that should be set
 *   and missing a journal recovery?
 *
 * 1. jid fails
 * 2. lvb bit set for step 1
 * 3. jid recovered for step 1
 * 4. jid taken again (new mount)
 * 5. jid fails (for step 4)
 * 6. lvb bit set for step 5 (will already be set)
 * 7. lvb bit cleared for step 3
 *
 * This is not a problem because the failure in step 5 does not
 * require recovery, because the mount in step 4 could not have
 * progressed far enough to unblock locks and access the fs.  The
 * control_mount() function waits for all recoveries to be complete
 * for the latest lockspace generation before ever unblocking locks
 * and returning.  The mount in step 4 waits until the recovery in
 * step 1 is done.
 *
 * - special case of first mounter: first node to mount the fs
 *
 * The first node to mount a gfs2 fs needs to check all the journals
 * and recover any that need recovery before other nodes are allowed
 * to mount the fs.  (Others may begin mounting, but they must wait
 * for the first mounter to be done before taking locks on the fs
 * or accessing the fs.)  This has two parts:
 *
 * 1. The mounted_lock tells a node it's the first to mount the fs.
 * Each node holds the mounted_lock in PR while it's mounted.
 * Each node tries to acquire the mounted_lock in EX when it mounts.
 * If a node is granted the mounted_lock EX it means there are no
 * other mounted nodes (no PR locks exist), and it is the first mounter.
 * The mounted_lock is demoted to PR when first recovery is done, so
 * others will fail to get an EX lock, but will get a PR lock.
 *
 * 2. The control_lock blocks others in control_mount() while the first
 * mounter is doing first mount recovery of all journals.
 * A mounting node needs to acquire control_lock in EX mode before
 * it can proceed.  The first mounter holds control_lock in EX while doing
 * the first mount recovery, blocking mounts from other nodes, then demotes
 * control_lock to NL when it's done (others_may_mount/first_done),
 * allowing other nodes to continue mounting.
 *
 * first mounter:
 * control_lock EX/NOQUEUE success
 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 * set first=1
 * do first mounter recovery
 * mounted_lock EX->PR
 * control_lock EX->NL, write lvb generation
 *
 * other mounter:
 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
 * mounted_lock PR/NOQUEUE success
 * read lvb generation
 * control_lock EX->NL
 * set first=0
 *
 * - mount during recovery
 *
 * If a node mounts while others are doing recovery (not first mounter),
 * the mounting node will get its initial recover_done() callback without
 * having seen any previous failures/callbacks.
 *
 * It must wait for all recoveries preceding its mount to be finished
 * before it unblocks locks.  It does this by repeating the "other mounter"
 * steps above until the lvb generation number is >= its mount generation
 * number (from initial recover_done) and all lvb bits are clear.
 *
 * - control_lock lvb format
 *
 * 4 bytes of generation number: the latest dlm lockspace generation number
 * from the recover_done callback.  Indicates the jid bitmap has been updated
 * to reflect all slot failures through that generation.
 * 4 bytes unused.
 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 * that jid N needs recovery.
 */
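
/*
 * A minimal sketch (illustrative only) of the generation bookkeeping
 * described above; the real logic lives in gdlm_recover_prep(),
 * gdlm_recover_done() and gfs2_control_func() below, always under
 * ls_recover_spin:
 *
 *   recover_prep:  recover_block = recover_start; set BLOCK_LOCKS
 *   recover_done:  recover_start = new generation (now > recover_block)
 *   gfs2_control:  if (recover_block == recover_start)
 *                          dlm recovery still in progress, keep BLOCK_LOCKS
 *                  else if (all jid bits clear)
 *                          recovery complete, clear BLOCK_LOCKS
 */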

#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */

static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
                             char *lvb_bits)
{
        __le32 gen;
        memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
        memcpy(&gen, lvb_bits, sizeof(__le32));
        *lvb_gen = le32_to_cpu(gen);
}

static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
                              char *lvb_bits)
{
        __le32 gen;
        memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
        gen = cpu_to_le32(lvb_gen);
        memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
}

static int all_jid_bits_clear(char *lvb)
{
        return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
                        GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
}
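
/*
 * A minimal sketch of how a jid maps onto the bitmap above (illustrative
 * only; jid_bit_is_set() is a hypothetical helper, the code below uses
 * test_bit_le()/__set_bit_le()/__clear_bit_le() on the same layout):
 *
 *   static int jid_bit_is_set(const char *lvb, unsigned int jid)
 *   {
 *           // little-endian bitmap: byte jid / 8, bit jid % 8
 *           return !!(lvb[JID_BITMAP_OFFSET + (jid >> 3)] &
 *                     (1 << (jid & 7)));
 *   }
 */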

static void sync_wait_cb(void *arg)
{
        struct lm_lockstruct *ls = arg;
        complete(&ls->ls_sync_wait);
}

static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;

        error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
        if (error) {
                fs_err(sdp, "%s lkid %x error %d\n",
                       name, lksb->sb_lkid, error);
                return error;
        }

        wait_for_completion(&ls->ls_sync_wait);

        if (lksb->sb_status != -DLM_EUNLOCK) {
                fs_err(sdp, "%s lkid %x status %d\n",
                       name, lksb->sb_lkid, lksb->sb_status);
                return -1;
        }
        return 0;
}

static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
                     unsigned int num, struct dlm_lksb *lksb, char *name)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char strname[GDLM_STRNAME_BYTES];
        int error, status;

        memset(strname, 0, GDLM_STRNAME_BYTES);
        snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);

        error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
                         strname, GDLM_STRNAME_BYTES - 1,
                         0, sync_wait_cb, ls, NULL);
        if (error) {
                fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
                       name, lksb->sb_lkid, flags, mode, error);
                return error;
        }

        wait_for_completion(&ls->ls_sync_wait);

        status = lksb->sb_status;

        if (status && status != -EAGAIN) {
                fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
                       name, lksb->sb_lkid, flags, mode, status);
        }

        return status;
}

static int mounted_unlock(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
}

static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
                         &ls->ls_mounted_lksb, "mounted_lock");
}

static int control_unlock(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
}

static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
                         &ls->ls_control_lksb, "control_lock");
}

/**
 * remote_withdraw - react to a node withdrawing from the file system
 * @sdp: The superblock
 */
static void remote_withdraw(struct gfs2_sbd *sdp)
{
        struct gfs2_jdesc *jd;
        int ret = 0, count = 0;

        list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
                if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
                        continue;
                ret = gfs2_recover_journal(jd, true);
                if (ret)
                        break;
                count++;
        }

        /* Report how many journals were checked and the final status. */
        fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
}

static void gfs2_control_func(struct work_struct *work)
{
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        uint32_t block_gen, start_gen, lvb_gen, flags;
        int recover_set = 0;
        int write_lvb = 0;
        int recover_size;
        int i, error;

        /* First check for other nodes that may have done a withdraw. */
        if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
                remote_withdraw(sdp);
                clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
                return;
        }

        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
         * will set this flag, after which this thread will take over
         * all further clearing of BLOCK_LOCKS.
         *
         * FIRST_MOUNT means this node is doing first mounter recovery,
         * for which recovery control is handled by
         * control_mount()/control_first_done(), not this thread.
         */
        if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
             test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        block_gen = ls->ls_recover_block;
        start_gen = ls->ls_recover_start;
        spin_unlock(&ls->ls_recover_spin);

        /*
         * Equal block_gen and start_gen implies we are between
         * recover_prep and recover_done callbacks, which means
         * dlm recovery is in progress and dlm locking is blocked.
         * There's no point trying to do any work until recover_done.
         */

        if (block_gen == start_gen)
                return;

        /*
         * Propagate recover_submit[] and recover_result[] to lvb:
         * dlm_recoverd adds to recover_submit[] jids needing recovery
         * gfs2_recover adds to recover_result[] journal recovery results
         *
         * set lvb bit for jids in recover_submit[] if the lvb has not
         * yet been updated for the generation of the failure
         *
         * clear lvb bit for jids in recover_result[] if the result of
         * the journal recovery is SUCCESS
         */

        error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
        if (error) {
                fs_err(sdp, "control lock EX error %d\n", error);
                return;
        }

        control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);

        spin_lock(&ls->ls_recover_spin);
        if (block_gen != ls->ls_recover_block ||
            start_gen != ls->ls_recover_start) {
                fs_info(sdp, "recover generation %u block1 %u %u\n",
                        start_gen, block_gen, ls->ls_recover_block);
                spin_unlock(&ls->ls_recover_spin);
                control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
                return;
        }

        recover_size = ls->ls_recover_size;

        if (lvb_gen <= start_gen) {
                /*
                 * Clear lvb bits for jids we've successfully recovered.
                 * Because all nodes attempt to recover failed journals,
                 * a journal can be recovered multiple times successfully
                 * in succession.  Only the first will really do recovery,
                 * the others find it clean, but still report a successful
                 * recovery.  So, another node may have already recovered
                 * the jid and cleared the lvb bit for it.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
                                continue;

                        ls->ls_recover_result[i] = 0;

                        if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
                                continue;

                        __clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
                        write_lvb = 1;
                }
        }

        if (lvb_gen == start_gen) {
                /*
                 * Failed slots before start_gen are already set in lvb.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (!ls->ls_recover_submit[i])
                                continue;
                        if (ls->ls_recover_submit[i] < lvb_gen)
                                ls->ls_recover_submit[i] = 0;
                }
        } else if (lvb_gen < start_gen) {
                /*
                 * Failed slots before start_gen are not yet set in lvb.
                 */
                for (i = 0; i < recover_size; i++) {
                        if (!ls->ls_recover_submit[i])
                                continue;
                        if (ls->ls_recover_submit[i] < start_gen) {
                                ls->ls_recover_submit[i] = 0;
                                __set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
                        }
                }
                /* even if there are no bits to set, we need to write the
                   latest generation to the lvb */
                write_lvb = 1;
        } else {
                /*
                 * we should be getting a recover_done() for lvb_gen soon
                 */
        }
        spin_unlock(&ls->ls_recover_spin);

        if (write_lvb) {
                control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
                flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
        } else {
                flags = DLM_LKF_CONVERT;
        }

        error = control_lock(sdp, DLM_LOCK_NL, flags);
        if (error) {
                fs_err(sdp, "control lock NL error %d\n", error);
                return;
        }

        /*
         * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
         * and clear a jid bit in the lvb if the recovery is a success.
         * Eventually all journals will be recovered, all jid bits will
         * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
         */

        for (i = 0; i < recover_size; i++) {
                if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
                        fs_info(sdp, "recover generation %u jid %d\n",
                                start_gen, i);
                        gfs2_recover_set(sdp, i);
                        recover_set++;
                }
        }
        if (recover_set)
                return;

        /*
         * No more jid bits set in lvb, all recovery is done, unblock locks
         * (unless a new recover_prep callback has occurred blocking locks
         * again while working above)
         */

        spin_lock(&ls->ls_recover_spin);
        if (ls->ls_recover_block == block_gen &&
            ls->ls_recover_start == start_gen) {
                clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "recover generation %u done\n", start_gen);
                gfs2_glock_thaw(sdp);
        } else {
                fs_info(sdp, "recover generation %u block2 %u %u\n",
                        start_gen, block_gen, ls->ls_recover_block);
                spin_unlock(&ls->ls_recover_spin);
        }
}

static int control_mount(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        uint32_t start_gen, block_gen, mount_gen, lvb_gen;
        int mounted_mode;
        int retries = 0;
        int error;

        memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
        memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
        memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
        ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
        init_completion(&ls->ls_sync_wait);

        set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
        if (error) {
                fs_err(sdp, "control_mount control_lock NL error %d\n", error);
                return error;
        }

        error = mounted_lock(sdp, DLM_LOCK_NL, 0);
        if (error) {
                fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
                control_unlock(sdp);
                return error;
        }
        mounted_mode = DLM_LOCK_NL;

restart:
        if (retries++ && signal_pending(current)) {
                error = -EINTR;
                goto fail;
        }

        /*
         * We always start with both locks in NL. control_lock is
         * demoted to NL below so we don't need to do it here.
         */

        if (mounted_mode != DLM_LOCK_NL) {
                error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
                if (error)
                        goto fail;
                mounted_mode = DLM_LOCK_NL;
        }

        /*
         * Other nodes need to do some work in dlm recovery and gfs2_control
         * before the recover_done and control_lock will be ready for us below.
         * A delay here is not required but often avoids having to retry.
         */

        msleep_interruptible(500);

        /*
         * Acquire control_lock in EX and mounted_lock in either EX or PR.
         * control_lock lvb keeps track of any pending journal recoveries.
         * mounted_lock indicates if any other nodes have the fs mounted.
         */

        error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
        if (error == -EAGAIN) {
                goto restart;
        } else if (error) {
                fs_err(sdp, "control_mount control_lock EX error %d\n", error);
                goto fail;
        }

        /*
         * If we're a spectator, we don't want to take the lock in EX because
         * we cannot do the first-mount responsibility it implies: recovery.
         */
        if (sdp->sd_args.ar_spectator)
                goto locks_done;

        error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
        if (!error) {
                mounted_mode = DLM_LOCK_EX;
                goto locks_done;
        } else if (error != -EAGAIN) {
                fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
                goto fail;
        }

        error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
        if (!error) {
                mounted_mode = DLM_LOCK_PR;
                goto locks_done;
        } else {
                /* not even -EAGAIN should happen here */
                fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
                goto fail;
        }

locks_done:
        /*
         * If we got both locks above in EX, then we're the first mounter.
         * If not, then we need to wait for the control_lock lvb to be
         * updated by other mounted nodes to reflect our mount generation.
         *
         * In simple first mounter cases, the first mounter will see a zero
         * lvb_gen, but if all existing nodes leave or fail before any
         * mounting node finishes control_mount, then all nodes will be
         * mounting and lvb_gen will be non-zero.
         */

        control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);

        if (lvb_gen == 0xFFFFFFFF) {
                /* special value to force mount attempts to fail */
                fs_err(sdp, "control_mount control_lock disabled\n");
                error = -EINVAL;
                goto fail;
        }

        if (mounted_mode == DLM_LOCK_EX) {
                /* first mounter, keep both EX while doing first recovery */
                spin_lock(&ls->ls_recover_spin);
                clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
                set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
                set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
                return 0;
        }

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
        if (error)
                goto fail;

        /*
         * We are not first mounter, now we need to wait for the control_lock
         * lvb generation to be >= the generation from our first recover_done
         * and all lvb bits to be clear (no pending journal recoveries.)
         */

        if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
                /* journals need recovery, wait until all are clear */
                fs_info(sdp, "control_mount wait for journal recovery\n");
                goto restart;
        }

        spin_lock(&ls->ls_recover_spin);
        block_gen = ls->ls_recover_block;
        start_gen = ls->ls_recover_start;
        mount_gen = ls->ls_recover_mount;

        if (lvb_gen < mount_gen) {
                /* wait for mounted nodes to update control_lock lvb to our
                   generation, which might include new recovery bits set */
                if (sdp->sd_args.ar_spectator) {
                        fs_info(sdp, "Recovery is required. Waiting for a "
                                "non-spectator to mount.\n");
                        msleep_interruptible(1000);
                } else {
                        fs_info(sdp, "control_mount wait1 block %u start %u "
                                "mount %u lvb %u flags %lx\n", block_gen,
                                start_gen, mount_gen, lvb_gen,
                                ls->ls_recover_flags);
                }
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        if (lvb_gen != start_gen) {
                /* wait for mounted nodes to update control_lock lvb to the
                   latest recovery generation */
                fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
                        "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
                        lvb_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        if (block_gen == start_gen) {
                /* dlm recovery in progress, wait for it to finish */
                fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
                        "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
                        lvb_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                goto restart;
        }

        clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
        set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
        memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
        memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
        spin_unlock(&ls->ls_recover_spin);
        return 0;

fail:
        mounted_unlock(sdp);
        control_unlock(sdp);
        return error;
}

static int control_first_done(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        uint32_t start_gen, block_gen;
        int error;

restart:
        spin_lock(&ls->ls_recover_spin);
        start_gen = ls->ls_recover_start;
        block_gen = ls->ls_recover_block;

        if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
            !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
            !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                /* sanity check, should not happen */
                fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
                       start_gen, block_gen, ls->ls_recover_flags);
                spin_unlock(&ls->ls_recover_spin);
                control_unlock(sdp);
                return -1;
        }

        if (start_gen == block_gen) {
                /*
                 * Wait for the end of a dlm recovery cycle to switch from
                 * first mounter recovery.  We can ignore any recover_slot
                 * callbacks between the recover_prep and next recover_done
                 * because we are still the first mounter and any failed nodes
                 * have not fully mounted, so they don't need recovery.
                 */
                spin_unlock(&ls->ls_recover_spin);
                fs_info(sdp, "control_first_done wait gen %u\n", start_gen);

                wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
                            TASK_UNINTERRUPTIBLE);
                goto restart;
        }

        clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
        set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
        memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
        memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
        spin_unlock(&ls->ls_recover_spin);

        memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
        control_lvb_write(ls, start_gen, ls->ls_lvb_bits);

        error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
        if (error)
                fs_err(sdp, "control_first_done mounted PR error %d\n", error);

        error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
        if (error)
                fs_err(sdp, "control_first_done control NL error %d\n", error);

        return error;
}

/*
 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
 * gfs2 jids start at 0, so jid = slot - 1)
 */

#define RECOVER_SIZE_INC 16

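/*
 * For example (illustrative): with ls_recover_size currently 16 and a
 * slot table containing slots {1, 2, 18}, max_jid below is 17, so
 * new_size is rounded up to 32 and both jid arrays are reallocated and
 * their old contents copied across.
 */
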
static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
                            int num_slots)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        uint32_t *submit = NULL;
        uint32_t *result = NULL;
        uint32_t old_size, new_size;
        int i, max_jid;

        if (!ls->ls_lvb_bits) {
                ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
                if (!ls->ls_lvb_bits)
                        return -ENOMEM;
        }

        max_jid = 0;
        for (i = 0; i < num_slots; i++) {
                if (max_jid < slots[i].slot - 1)
                        max_jid = slots[i].slot - 1;
        }

        old_size = ls->ls_recover_size;
        new_size = old_size;
        while (new_size < max_jid + 1)
                new_size += RECOVER_SIZE_INC;
        if (new_size == old_size)
                return 0;

        submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
        result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
        if (!submit || !result) {
                kfree(submit);
                kfree(result);
                return -ENOMEM;
        }

        spin_lock(&ls->ls_recover_spin);
        memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
        memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
        kfree(ls->ls_recover_submit);
        kfree(ls->ls_recover_result);
        ls->ls_recover_submit = submit;
        ls->ls_recover_result = result;
        ls->ls_recover_size = new_size;
        spin_unlock(&ls->ls_recover_spin);
        return 0;
}

static void free_recover_size(struct lm_lockstruct *ls)
{
        kfree(ls->ls_lvb_bits);
        kfree(ls->ls_recover_submit);
        kfree(ls->ls_recover_result);
        ls->ls_recover_submit = NULL;
        ls->ls_recover_result = NULL;
        ls->ls_recover_size = 0;
        ls->ls_lvb_bits = NULL;
}

/* dlm calls before it does lock recovery */

static void gdlm_recover_prep(void *arg)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (gfs2_withdrawn(sdp)) {
                fs_err(sdp, "recover_prep ignored due to withdraw.\n");
                return;
        }
        spin_lock(&ls->ls_recover_spin);
        ls->ls_recover_block = ls->ls_recover_start;
        set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);

        if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
             test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
        spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_prep has been completed on all lockspace members;
   identifies slot/jid of failed member */

static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int jid = slot->slot - 1;

        if (gfs2_withdrawn(sdp)) {
                fs_err(sdp, "recover_slot jid %d ignored due to withdraw.\n",
                       jid);
                return;
        }
        spin_lock(&ls->ls_recover_spin);
        if (ls->ls_recover_size < jid + 1) {
                fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
                       jid, ls->ls_recover_block, ls->ls_recover_size);
                spin_unlock(&ls->ls_recover_spin);
                return;
        }

        if (ls->ls_recover_submit[jid]) {
                fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
                        jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
        }
        ls->ls_recover_submit[jid] = ls->ls_recover_block;
        spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_slot and after it completes lock recovery */

static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
                              int our_slot, uint32_t generation)
{
        struct gfs2_sbd *sdp = arg;
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (gfs2_withdrawn(sdp)) {
                fs_err(sdp, "recover_done ignored due to withdraw.\n");
                return;
        }
        /* ensure the ls jid arrays are large enough */
        set_recover_size(sdp, slots, num_slots);

        spin_lock(&ls->ls_recover_spin);
        ls->ls_recover_start = generation;

        if (!ls->ls_recover_mount) {
                ls->ls_recover_mount = generation;
                ls->ls_jid = our_slot - 1;
        }

        if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
                queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);

        clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
        smp_mb__after_atomic();
        wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
        spin_unlock(&ls->ls_recover_spin);
}

/* gfs2_recover thread has a journal recovery result */

static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
                                 unsigned int result)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (gfs2_withdrawn(sdp)) {
                fs_err(sdp, "recovery_result jid %d ignored due to withdraw.\n",
                       jid);
                return;
        }
        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                return;

        /* we don't care about the recovery of our own journal during mount */
        if (jid == ls->ls_jid)
                return;

        spin_lock(&ls->ls_recover_spin);
        if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
                spin_unlock(&ls->ls_recover_spin);
                return;
        }
        if (ls->ls_recover_size < jid + 1) {
                fs_err(sdp, "recovery_result jid %d short size %d\n",
                       jid, ls->ls_recover_size);
                spin_unlock(&ls->ls_recover_spin);
                return;
        }

        fs_info(sdp, "recover jid %d result %s\n", jid,
                result == LM_RD_GAVEUP ? "busy" : "success");

        ls->ls_recover_result[jid] = result;

        /* GAVEUP means another node is recovering the journal; delay our
           next attempt to recover it, to give the other node a chance to
           finish before trying again */

        if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
                queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
                                   result == LM_RD_GAVEUP ? HZ : 0);
        spin_unlock(&ls->ls_recover_spin);
}

static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
        .recover_prep = gdlm_recover_prep,
        .recover_slot = gdlm_recover_slot,
        .recover_done = gdlm_recover_done,
};

static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        char cluster[GFS2_LOCKNAME_LEN];
        const char *fsname;
        uint32_t flags;
        int error, ops_result;

        /*
         * initialize everything
         */

        INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
        spin_lock_init(&ls->ls_recover_spin);
        ls->ls_recover_flags = 0;
        ls->ls_recover_mount = 0;
        ls->ls_recover_start = 0;
        ls->ls_recover_block = 0;
        ls->ls_recover_size = 0;
        ls->ls_recover_submit = NULL;
        ls->ls_recover_result = NULL;
        ls->ls_lvb_bits = NULL;

        error = set_recover_size(sdp, NULL, 0);
        if (error)
                goto fail;

        /*
         * prepare dlm_new_lockspace args
         */

        fsname = strchr(table, ':');
        if (!fsname) {
                fs_info(sdp, "no fsname found\n");
                error = -EINVAL;
                goto fail_free;
        }
        memset(cluster, 0, sizeof(cluster));
        memcpy(cluster, table, strlen(table) - strlen(fsname));
        fsname++;

        flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;

        /*
         * create/join lockspace
         */

        error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
                                  &gdlm_lockspace_ops, sdp, &ops_result,
                                  &ls->ls_dlm);
        if (error) {
                fs_err(sdp, "dlm_new_lockspace error %d\n", error);
                goto fail_free;
        }

        if (ops_result < 0) {
                /*
                 * dlm does not support ops callbacks,
                 * old dlm_controld/gfs_controld are used, try without ops.
                 */
                fs_info(sdp, "dlm lockspace ops not used\n");
                free_recover_size(ls);
                set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
                return 0;
        }

        if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
                fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
                error = -EINVAL;
                goto fail_release;
        }

        /*
         * control_mount() uses control_lock to determine first mounter,
         * and for later mounts, waits for any recoveries to be cleared.
         */

        error = control_mount(sdp);
        if (error) {
                fs_err(sdp, "mount control error %d\n", error);
                goto fail_release;
        }

        ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
        clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
        smp_mb__after_atomic();
        wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
        return 0;

fail_release:
        dlm_release_lockspace(ls->ls_dlm, 2);
fail_free:
        free_recover_size(ls);
fail:
        return error;
}
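
/*
 * A minimal sketch of the table-name split done in gdlm_mount() above
 * (illustrative only; the names are hypothetical):
 *
 *   const char *table = "mycluster:myfs";
 *   const char *fsname = strchr(table, ':');   // -> ":myfs"
 *   // cluster gets the first strlen(table) - strlen(fsname) bytes,
 *   // i.e. "mycluster"; fsname++ then points at "myfs".
 *
 * "mycluster" must match the cluster name known to dlm_controld, and
 * "myfs" becomes the dlm lockspace name.
 */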

static void gdlm_first_done(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;

        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                return;

        error = control_first_done(sdp);
        if (error)
                fs_err(sdp, "mount first_done error %d\n", error);
}

static void gdlm_unmount(struct gfs2_sbd *sdp)
{
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;

        if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
                goto release;

        /* wait for gfs2_control_wq to be done with this mount */

        spin_lock(&ls->ls_recover_spin);
        set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
        spin_unlock(&ls->ls_recover_spin);
        flush_delayed_work(&sdp->sd_control_work);

        /* mounted_lock and control_lock will be purged in dlm recovery */
release:
        if (ls->ls_dlm) {
                dlm_release_lockspace(ls->ls_dlm, 2);
                ls->ls_dlm = NULL;
        }

        free_recover_size(ls);
}

static const match_table_t dlm_tokens = {
        { Opt_jid, "jid=%d"},
        { Opt_id, "id=%d"},
        { Opt_first, "first=%d"},
        { Opt_nodir, "nodir=%d"},
        { Opt_err, NULL },
};

const struct lm_lockops gfs2_dlm_ops = {
        .lm_proto_name = "lock_dlm",
        .lm_mount = gdlm_mount,
        .lm_first_done = gdlm_first_done,
        .lm_recovery_result = gdlm_recovery_result,
        .lm_unmount = gdlm_unmount,
        .lm_put_lock = gdlm_put_lock,
        .lm_lock = gdlm_lock,
        .lm_cancel = gdlm_cancel,
        .lm_tokens = &dlm_tokens,
};