linux/fs/dlm/recoverd.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
        int error = -EINTR;

        down_write(&ls->ls_recv_active);

        spin_lock(&ls->ls_recover_lock);
        if (ls->ls_recover_seq == seq) {
                set_bit(LSFL_RUNNING, &ls->ls_flags);
                /* unblocks processes waiting to enter the dlm */
                up_write(&ls->ls_in_recovery);
                clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                error = 0;
        }
        spin_unlock(&ls->ls_recover_lock);

        up_write(&ls->ls_recv_active);
        return error;
}

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
        unsigned long start;
        int error, neg = 0;

        log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

        mutex_lock(&ls->ls_recoverd_active);

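        /*
         * Queue rather than deliver lock callbacks while recovery runs;
         * dlm_callback_resume() below flushes whatever accumulated.
         */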
        dlm_callback_suspend(ls);

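        /*
         * Free the unused rsb's sitting on the toss list; the directory
         * is about to be rebuilt, so there is no point carrying them
         * through recovery.
         */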
        dlm_clear_toss(ls);

        /*
         * This list of root rsb's will be the basis of most of the recovery
         * routines.
         */

        dlm_create_root_list(ls);

        /*
         * Add or remove nodes from the lockspace's ls_nodes list.
         */

        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
                log_rinfo(ls, "dlm_recover_members error %d", error);
                goto fail;
        }

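        /*
         * Recompute the directory nodeid for each rsb on the root list;
         * the name-to-nodeid mapping changes with the membership.
         */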
        dlm_recover_dir_nodeid(ls);

        ls->ls_recover_dir_sent_res = 0;
        ls->ls_recover_dir_sent_msg = 0;
        ls->ls_recover_locks_in = 0;

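        /*
         * Recovery proceeds in stages.  dlm_set_recover_status() publishes
         * the stage this node has completed, and each *_wait() call below
         * is a barrier, driven by rcom status messages, that returns once
         * every member has reached the same stage (or fails if recovery
         * is aborted by another stop).
         */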
        dlm_set_recover_status(ls, DLM_RS_NODES);

        error = dlm_recover_members_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
                goto fail;
        }

        start = jiffies;

        /*
         * Rebuild our own share of the directory by collecting from all other
         * nodes their master rsb names that hash to us.
         */

        error = dlm_recover_directory(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory error %d", error);
                goto fail;
        }

        dlm_set_recover_status(ls, DLM_RS_DIR);

        error = dlm_recover_directory_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
                goto fail;
        }

        log_rinfo(ls, "dlm_recover_directory %u out %u messages",
                  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);

        /*
         * We may have outstanding operations that are waiting for a reply from
         * a failed node.  Mark these to be resent after recovery.  Unlock and
         * cancel ops can just be completed.
         */

        dlm_recover_waiters_pre(ls);

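        /*
         * A newer stop may have arrived while we were working; if so,
         * abort this pass and let dlm_recoverd run again with fresh args.
         */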
        error = dlm_recovery_stopped(ls);
        if (error)
                goto fail;

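        /*
         * The full remaster pass (purge, recover masters, send locks) is
         * only needed if members departed (neg) or the lockspace runs
         * without a resource directory.
         */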
        if (neg || dlm_no_directory(ls)) {
                /*
                 * Clear lkb's for departed nodes.
                 */

                dlm_recover_purge(ls);

                /*
                 * Get new master nodeid's for rsb's that were mastered on
                 * departed nodes.
                 */

                error = dlm_recover_masters(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_masters error %d", error);
                        goto fail;
                }

                /*
                 * Send our locks on remastered rsb's to the new masters.
                 */

                error = dlm_recover_locks(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks error %d", error);
                        goto fail;
                }

                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }

                log_rinfo(ls, "dlm_recover_locks %u in",
                          ls->ls_recover_locks_in);

                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
                 * settings.
                 */

                dlm_recover_rsbs(ls);
        } else {
                /*
                 * Other lockspace members may be going through the "neg" steps
                 * while also adding us to the lockspace, in which case they'll
                 * be doing the recover_locks (RS_LOCKS) barrier.
                 */
                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }
        }

        dlm_release_root_list(ls);

        /*
         * Purge directory-related requests that are saved in requestqueue.
         * All dir requests from before recovery are invalid now due to the dir
         * rebuild and will be resent by the requesting nodes.
         */

        dlm_purge_requestqueue(ls);

        dlm_set_recover_status(ls, DLM_RS_DONE);

        error = dlm_recover_done_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_done_wait error %d", error);
                goto fail;
        }

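        /*
         * The departed members saved on ls_nodes_gone are no longer needed
         * now that purging and remastering are done; free them.
         */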
        dlm_clear_members_gone(ls);

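        /*
         * Adjust lock timestamps so the time spent in recovery is not
         * charged against lock timeouts.
         */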
        dlm_adjust_timeouts(ls);

        dlm_callback_resume(ls);

        error = enable_locking(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "enable_locking error %d", error);
                goto fail;
        }

        error = dlm_process_requestqueue(ls);
        if (error) {
                log_rinfo(ls, "dlm_process_requestqueue error %d", error);
                goto fail;
        }

        error = dlm_recover_waiters_post(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
                goto fail;
        }

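        /*
         * With locking re-enabled and the request queue drained, grant
         * any locks that became grantable while the lockspace was stopped.
         */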
        dlm_recover_grant(ls);

        log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);

        dlm_lsop_recover_done(ls);
        return 0;

 fail:
        dlm_release_root_list(ls);
        log_rinfo(ls, "dlm_recover %llu error %d",
                  (unsigned long long)rv->seq, error);
        mutex_unlock(&ls->ls_recoverd_active);
        return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the
   LSFL_RECOVER_STOP flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL;

        spin_lock(&ls->ls_recover_lock);
        rv = ls->ls_recover_args;
        ls->ls_recover_args = NULL;
        if (rv && ls->ls_recover_seq == rv->seq)
                clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        spin_unlock(&ls->ls_recover_lock);

        if (rv) {
                ls_recover(ls, rv);
                kfree(rv->nodes);
                kfree(rv);
        }
}

static int dlm_recoverd(void *arg)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(arg);
        if (!ls) {
                log_print("dlm_recoverd: no lockspace %p", arg);
                return -1;
        }

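        /*
         * A lockspace starts out stopped: hold ls_in_recovery to keep
         * normal locking out until the first recovery completes.
         */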
        down_write(&ls->ls_in_recovery);
        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
        wake_up(&ls->ls_recover_lock_wait);

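        /*
         * LSFL_RECOVER_DOWN tells us to suspend locking (the stop side);
         * LSFL_RECOVER_WORK tells us new recovery args are queued (the
         * start side).  Both are set by the lockspace stop/start paths,
         * which also wake this thread.
         */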
        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
                    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags))
                        schedule();
                set_current_state(TASK_RUNNING);

                if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        down_write(&ls->ls_in_recovery);
                        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                        wake_up(&ls->ls_recover_lock_wait);
                }

                if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
                        do_ls_recovery(ls);
        }

        if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
                up_write(&ls->ls_in_recovery);

        dlm_put_lockspace(ls);
        return 0;
}

int dlm_recoverd_start(struct dlm_ls *ls)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                ls->ls_recoverd_task = p;
        return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
        kthread_stop(ls->ls_recoverd_task);
}

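/*
 * Rough sketch of how these entry points are driven (the authoritative
 * sequence lives in the lockspace stop/start paths, triggered from user
 * space by dlm_controld):
 *
 *	dlm_recoverd_start(ls);    at lockspace creation
 *	...
 *	dlm_ls_stop(ls);           membership change: block locking
 *	dlm_ls_start(ls);          new member list ready: recover
 *	...
 *	dlm_recoverd_stop(ls);     at lockspace release
 */
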
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
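        /*
         * Kicking ls_wait_general breaks recoverd out of any
         * dlm_wait_function() sleep; holding ls_recoverd_active ensures
         * ls_recover() is not running when we return.
         */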
        wake_up(&ls->ls_wait_general);
        mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
        mutex_unlock(&ls->ls_recoverd_active);
}