linux/fs/dlm/recoverd.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "dir.h"
  16#include "ast.h"
  17#include "recover.h"
  18#include "lowcomms.h"
  19#include "lock.h"
  20#include "requestqueue.h"
  21#include "recoverd.h"
  22
  23
  24/* If the start for which we're re-enabling locking (seq) has been superseded
  25   by a newer stop (ls_recover_seq), we need to leave locking disabled.
  26
  27   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
  28   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
  29   enables locking and clears the requestqueue between a and b. */
  30
  31static int enable_locking(struct dlm_ls *ls, uint64_t seq)
  32{
  33        int error = -EINTR;
  34
  35        down_write(&ls->ls_recv_active);
  36
  37        spin_lock(&ls->ls_recover_lock);
  38        if (ls->ls_recover_seq == seq) {
  39                set_bit(LSFL_RUNNING, &ls->ls_flags);
  40                /* unblocks processes waiting to enter the dlm */
  41                up_write(&ls->ls_in_recovery);
  42                clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
  43                error = 0;
  44        }
  45        spin_unlock(&ls->ls_recover_lock);
  46
  47        up_write(&ls->ls_recv_active);
  48        return error;
  49}
  50
  51static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
  52{
  53        unsigned long start;
  54        int error, neg = 0;
  55
  56        log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
  57
  58        mutex_lock(&ls->ls_recoverd_active);
  59
  60        dlm_callback_suspend(ls);
  61
  62        dlm_clear_toss(ls);
  63
  64        /*
  65         * This list of root rsb's will be the basis of most of the recovery
  66         * routines.
  67         */
  68
  69        dlm_create_root_list(ls);
  70
  71        /*
  72         * Add or remove nodes from the lockspace's ls_nodes list.
  73         */
  74
  75        error = dlm_recover_members(ls, rv, &neg);
  76        if (error) {
  77                log_rinfo(ls, "dlm_recover_members error %d", error);
  78                goto fail;
  79        }
  80
  81        dlm_recover_dir_nodeid(ls);
  82
  83        ls->ls_recover_dir_sent_res = 0;
  84        ls->ls_recover_dir_sent_msg = 0;
  85        ls->ls_recover_locks_in = 0;
  86
  87        dlm_set_recover_status(ls, DLM_RS_NODES);
  88
  89        error = dlm_recover_members_wait(ls);
  90        if (error) {
  91                log_rinfo(ls, "dlm_recover_members_wait error %d", error);
  92                goto fail;
  93        }
  94
  95        start = jiffies;
  96
  97        /*
  98         * Rebuild our own share of the directory by collecting from all other
  99         * nodes their master rsb names that hash to us.
 100         */
 101
 102        error = dlm_recover_directory(ls);
 103        if (error) {
 104                log_rinfo(ls, "dlm_recover_directory error %d", error);
 105                goto fail;
 106        }
 107
 108        dlm_set_recover_status(ls, DLM_RS_DIR);
 109
 110        error = dlm_recover_directory_wait(ls);
 111        if (error) {
 112                log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
 113                goto fail;
 114        }
 115
 116        log_rinfo(ls, "dlm_recover_directory %u out %u messages",
 117                  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
 118
 119        /*
 120         * We may have outstanding operations that are waiting for a reply from
 121         * a failed node.  Mark these to be resent after recovery.  Unlock and
 122         * cancel ops can just be completed.
 123         */
 124
 125        dlm_recover_waiters_pre(ls);
 126
 127        error = dlm_recovery_stopped(ls);
 128        if (error)
 129                goto fail;
 130
 131        if (neg || dlm_no_directory(ls)) {
 132                /*
 133                 * Clear lkb's for departed nodes.
 134                 */
 135
 136                dlm_recover_purge(ls);
 137
 138                /*
 139                 * Get new master nodeid's for rsb's that were mastered on
 140                 * departed nodes.
 141                 */
 142
 143                error = dlm_recover_masters(ls);
 144                if (error) {
 145                        log_rinfo(ls, "dlm_recover_masters error %d", error);
 146                        goto fail;
 147                }
 148
 149                /*
 150                 * Send our locks on remastered rsb's to the new masters.
 151                 */
 152
 153                error = dlm_recover_locks(ls);
 154                if (error) {
 155                        log_rinfo(ls, "dlm_recover_locks error %d", error);
 156                        goto fail;
 157                }
 158
 159                dlm_set_recover_status(ls, DLM_RS_LOCKS);
 160
 161                error = dlm_recover_locks_wait(ls);
 162                if (error) {
 163                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
 164                        goto fail;
 165                }
 166
 167                log_rinfo(ls, "dlm_recover_locks %u in",
 168                          ls->ls_recover_locks_in);
 169
 170                /*
 171                 * Finalize state in master rsb's now that all locks can be
 172                 * checked.  This includes conversion resolution and lvb
 173                 * settings.
 174                 */
 175
 176                dlm_recover_rsbs(ls);
 177        } else {
 178                /*
 179                 * Other lockspace members may be going through the "neg" steps
 180                 * while also adding us to the lockspace, in which case they'll
 181                 * be doing the recover_locks (RS_LOCKS) barrier.
 182                 */
 183                dlm_set_recover_status(ls, DLM_RS_LOCKS);
 184
 185                error = dlm_recover_locks_wait(ls);
 186                if (error) {
 187                        log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
 188                        goto fail;
 189                }
 190        }
 191
 192        dlm_release_root_list(ls);
 193
 194        /*
 195         * Purge directory-related requests that are saved in requestqueue.
 196         * All dir requests from before recovery are invalid now due to the dir
 197         * rebuild and will be resent by the requesting nodes.
 198         */
 199
 200        dlm_purge_requestqueue(ls);
 201
 202        dlm_set_recover_status(ls, DLM_RS_DONE);
 203
 204        error = dlm_recover_done_wait(ls);
 205        if (error) {
 206                log_rinfo(ls, "dlm_recover_done_wait error %d", error);
 207                goto fail;
 208        }
 209
 210        dlm_clear_members_gone(ls);
 211
 212        dlm_adjust_timeouts(ls);
 213
 214        dlm_callback_resume(ls);
 215
 216        error = enable_locking(ls, rv->seq);
 217        if (error) {
 218                log_rinfo(ls, "enable_locking error %d", error);
 219                goto fail;
 220        }
 221
 222        error = dlm_process_requestqueue(ls);
 223        if (error) {
 224                log_rinfo(ls, "dlm_process_requestqueue error %d", error);
 225                goto fail;
 226        }
 227
 228        error = dlm_recover_waiters_post(ls);
 229        if (error) {
 230                log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
 231                goto fail;
 232        }
 233
 234        dlm_recover_grant(ls);
 235
 236        log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
 237                  (unsigned long long)rv->seq, ls->ls_generation,
 238                  jiffies_to_msecs(jiffies - start));
 239        mutex_unlock(&ls->ls_recoverd_active);
 240
 241        dlm_lsop_recover_done(ls);
 242        return 0;
 243
 244 fail:
 245        dlm_release_root_list(ls);
 246        log_rinfo(ls, "dlm_recover %llu error %d",
 247                  (unsigned long long)rv->seq, error);
 248        mutex_unlock(&ls->ls_recoverd_active);
 249        return error;
 250}
 251
 252/* The dlm_ls_start() that created the rv we take here may already have been
 253   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
 254   flag set. */
 255
 256static void do_ls_recovery(struct dlm_ls *ls)
 257{
 258        struct dlm_recover *rv = NULL;
 259
 260        spin_lock(&ls->ls_recover_lock);
 261        rv = ls->ls_recover_args;
 262        ls->ls_recover_args = NULL;
 263        if (rv && ls->ls_recover_seq == rv->seq)
 264                clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 265        spin_unlock(&ls->ls_recover_lock);
 266
 267        if (rv) {
 268                ls_recover(ls, rv);
 269                kfree(rv->nodes);
 270                kfree(rv);
 271        }
 272}
 273
 274static int dlm_recoverd(void *arg)
 275{
 276        struct dlm_ls *ls;
 277
 278        ls = dlm_find_lockspace_local(arg);
 279        if (!ls) {
 280                log_print("dlm_recoverd: no lockspace %p", arg);
 281                return -1;
 282        }
 283
 284        down_write(&ls->ls_in_recovery);
 285        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 286        wake_up(&ls->ls_recover_lock_wait);
 287
 288        while (1) {
 289                /*
 290                 * We call kthread_should_stop() after set_current_state().
 291                 * This is because it works correctly if kthread_stop() is
 292                 * called just before set_current_state().
 293                 */
 294                set_current_state(TASK_INTERRUPTIBLE);
 295                if (kthread_should_stop()) {
 296                        set_current_state(TASK_RUNNING);
 297                        break;
 298                }
 299                if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
 300                    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
 301                        if (kthread_should_stop())
 302                                break;
 303                        schedule();
 304                }
 305                set_current_state(TASK_RUNNING);
 306
 307                if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
 308                        down_write(&ls->ls_in_recovery);
 309                        set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 310                        wake_up(&ls->ls_recover_lock_wait);
 311                }
 312
 313                if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
 314                        do_ls_recovery(ls);
 315        }
 316
 317        if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
 318                up_write(&ls->ls_in_recovery);
 319
 320        dlm_put_lockspace(ls);
 321        return 0;
 322}
 323
 324int dlm_recoverd_start(struct dlm_ls *ls)
 325{
 326        struct task_struct *p;
 327        int error = 0;
 328
 329        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
 330        if (IS_ERR(p))
 331                error = PTR_ERR(p);
 332        else
 333                ls->ls_recoverd_task = p;
 334        return error;
 335}
 336
 337void dlm_recoverd_stop(struct dlm_ls *ls)
 338{
 339        kthread_stop(ls->ls_recoverd_task);
 340}
 341
 342void dlm_recoverd_suspend(struct dlm_ls *ls)
 343{
 344        wake_up(&ls->ls_wait_general);
 345        mutex_lock(&ls->ls_recoverd_active);
 346}
 347
 348void dlm_recoverd_resume(struct dlm_ls *ls)
 349{
 350        mutex_unlock(&ls->ls_recoverd_active);
 351}
 352
 353