linux/fs/dlm/recoverd.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
        int error = -EINTR;

        down_write(&ls->ls_recv_active);

        spin_lock(&ls->ls_recover_lock);
        if (ls->ls_recover_seq == seq) {
                set_bit(LSFL_RUNNING, &ls->ls_flags);
                /* unblocks processes waiting to enter the dlm */
                up_write(&ls->ls_in_recovery);
                clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                error = 0;
        }
        spin_unlock(&ls->ls_recover_lock);

        up_write(&ls->ls_recv_active);
        return error;
}

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
        unsigned long start;
        int error, neg = 0;

        log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

        mutex_lock(&ls->ls_recoverd_active);

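        /* Suspend callback (ast) delivery to users while lock state is being
           rebuilt; dlm_callback_resume() below restarts it. */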
        dlm_callback_suspend(ls);

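        /* Free unused rsbs sitting on the toss lists; recovery only rebuilds
           state for the in-use rsbs collected on the root list below. */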
        dlm_clear_toss(ls);

        /*
         * This list of root rsb's will be the basis of most of the recovery
         * routines.
         */

        dlm_create_root_list(ls);

        /*
         * Add or remove nodes from the lockspace's ls_nodes list.
         */

        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
                log_rinfo(ls, "dlm_recover_members error %d", error);
                goto fail;
        }

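        /* Recompute each root rsb's directory nodeid against the new member
           list. */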
        dlm_recover_dir_nodeid(ls);

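        /* Reset the per-recovery counters reported by the log messages
           below. */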
        ls->ls_recover_dir_sent_res = 0;
        ls->ls_recover_dir_sent_msg = 0;
        ls->ls_recover_locks_in = 0;

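        /* Barrier: publish our status and wait for the other members to
           finish the membership step before rebuilding the directory. */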
        dlm_set_recover_status(ls, DLM_RS_NODES);

        error = dlm_recover_members_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
                goto fail;
        }

        start = jiffies;

        /*
         * Rebuild our own share of the directory by collecting from all other
         * nodes their master rsb names that hash to us.
         */

        error = dlm_recover_directory(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory error %d", error);
                goto fail;
        }

        dlm_set_recover_status(ls, DLM_RS_DIR);

        error = dlm_recover_directory_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
                goto fail;
        }

        log_rinfo(ls, "dlm_recover_directory %u out %u messages",
                  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);

        /*
         * We may have outstanding operations that are waiting for a reply from
         * a failed node.  Mark these to be resent after recovery.  Unlock and
         * cancel ops can just be completed.
         */

        dlm_recover_waiters_pre(ls);

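        /* Abort this pass if another stop has arrived in the meantime; the
           next start will trigger a fresh recovery. */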
        error = dlm_recovery_stopped(ls);
        if (error) {
                error = -EINTR;
                goto fail;
        }

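        /* The purge/remaster/relocate sequence is only needed when nodes have
           left (neg) or when the lockspace runs without a resource directory;
           otherwise we only take part in the RS_LOCKS barrier below. */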
        if (neg || dlm_no_directory(ls)) {
                /*
                 * Clear lkb's for departed nodes.
                 */

                dlm_recover_purge(ls);

                /*
                 * Get new master nodeid's for rsb's that were mastered on
                 * departed nodes.
                 */

                error = dlm_recover_masters(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_masters error %d", error);
                        goto fail;
                }

                /*
                 * Send our locks on remastered rsb's to the new masters.
                 */

                error = dlm_recover_locks(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks error %d", error);
                        goto fail;
                }

                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }

                log_rinfo(ls, "dlm_recover_locks %u in",
                          ls->ls_recover_locks_in);

                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
                 * settings.
                 */

                dlm_recover_rsbs(ls);
        } else {
                /*
                 * Other lockspace members may be going through the "neg" steps
                 * while also adding us to the lockspace, in which case they'll
                 * be doing the recover_locks (RS_LOCKS) barrier.
                 */
                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }
        }

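        /* Done with the root rsb list that dlm_create_root_list() set up
           above. */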
        dlm_release_root_list(ls);

        /*
         * Purge directory-related requests that are saved in requestqueue.
         * All dir requests from before recovery are invalid now due to the dir
         * rebuild and will be resent by the requesting nodes.
         */

        dlm_purge_requestqueue(ls);

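        /* Final barrier: wait until every node reports that its recovery is
           done. */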
        dlm_set_recover_status(ls, DLM_RS_DONE);

        error = dlm_recover_done_wait(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_done_wait error %d", error);
                goto fail;
        }

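        /* Free the list of departed members now that recovery no longer
           needs them. */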
        dlm_clear_members_gone(ls);

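        /* Adjust lock timeout bookkeeping so that time spent in recovery is
           not counted against waiting locks. */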
        dlm_adjust_timeouts(ls);

        dlm_callback_resume(ls);

        error = enable_locking(ls, rv->seq);
        if (error) {
                log_rinfo(ls, "enable_locking error %d", error);
                goto fail;
        }

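        /* Replay the messages that dlm_recv queued while locking was
           stopped. */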
        error = dlm_process_requestqueue(ls);
        if (error) {
                log_rinfo(ls, "dlm_process_requestqueue error %d", error);
                goto fail;
        }

        error = dlm_recover_waiters_post(ls);
        if (error) {
                log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
                goto fail;
        }

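        /* Locks that were blocked by locks from departed nodes may now be
           grantable. */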
        dlm_recover_grant(ls);

        log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);

        dlm_lsop_recover_done(ls);
        return 0;

 fail:
        dlm_release_root_list(ls);
        log_rinfo(ls, "dlm_recover %llu error %d",
                  (unsigned long long)rv->seq, error);
        mutex_unlock(&ls->ls_recoverd_active);
        return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the
   LSFL_RECOVER_STOP flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL;

        spin_lock(&ls->ls_recover_lock);
        rv = ls->ls_recover_args;
        ls->ls_recover_args = NULL;
        if (rv && ls->ls_recover_seq == rv->seq)
                clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        spin_unlock(&ls->ls_recover_lock);

        if (rv) {
                ls_recover(ls, rv);
                kfree(rv->nodes);
                kfree(rv);
        }
}

static int dlm_recoverd(void *arg)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(arg);
        if (!ls) {
                log_print("dlm_recoverd: no lockspace %p", arg);
                return -1;
        }

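        /* Start out with locking blocked: hold ls_in_recovery and set
           LSFL_RECOVER_LOCK until enable_locking() releases them at the end
           of the first successful recovery. */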
        down_write(&ls->ls_in_recovery);
        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
        wake_up(&ls->ls_recover_lock_wait);

        while (1) {
                /*
                 * We call kthread_should_stop() after set_current_state().
                 * This is because it works correctly if kthread_stop() is
                 * called just before set_current_state().
                 */
                set_current_state(TASK_INTERRUPTIBLE);
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        break;
                }
                if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
                    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        if (kthread_should_stop())
                                break;
                        schedule();
                }
                set_current_state(TASK_RUNNING);

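                /* LSFL_RECOVER_DOWN is set when the lockspace is stopped
                   (dlm_ls_stop): block locking again, as in the initial state
                   above.  LSFL_RECOVER_WORK is set by a new start
                   (dlm_ls_start) and triggers a full recovery pass. */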
                if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
                        down_write(&ls->ls_in_recovery);
                        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
                        wake_up(&ls->ls_recover_lock_wait);
                }

                if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
                        do_ls_recovery(ls);
        }

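        /* The thread may be stopped while locking is still blocked; release
           ls_in_recovery before exiting. */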
        if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
                up_write(&ls->ls_in_recovery);

        dlm_put_lockspace(ls);
        return 0;
}

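/* Create the per-lockspace "dlm_recoverd" kthread; kthread_run() returns an
   ERR_PTR on failure, hence the IS_ERR() check. */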
int dlm_recoverd_start(struct dlm_ls *ls)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                ls->ls_recoverd_task = p;
        return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
        kthread_stop(ls->ls_recoverd_task);
}

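/* Keep dlm_recoverd from running a pass: the wake_up nudges recovery out of
   any wait on ls_wait_general, and taking ls_recoverd_active excludes
   ls_recover(), which holds it for the duration of a pass. */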
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
        wake_up(&ls->ls_wait_general);
        mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
        mutex_unlock(&ls->ls_recoverd_active);
}
