linux/fs/dlm/recover.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"


/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They
 * need to abort if the lockspace is stopped, indicating that a node has
 * failed (perhaps the one being waited for).
 */

/*
 * Wait until the given function returns non-zero or the lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, it should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can periodically check whether the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
        int error = 0;
        int rv;

        while (1) {
                rv = wait_event_timeout(ls->ls_wait_general,
                                        testfn(ls) || dlm_recovery_stopped(ls),
                                        dlm_config.ci_recover_timer * HZ);
                if (rv)
                        break;
                if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
                        log_debug(ls, "dlm_wait_function timed out");
                        return -ETIMEDOUT;
                }
        }

        if (dlm_recovery_stopped(ls)) {
                log_debug(ls, "dlm_wait_function aborted");
                error = -EINTR;
        }
        return error;
}
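
/*
 * Typical use (see dlm_recover_masters() and dlm_recover_locks() below):
 *
 *	error = dlm_wait_function(ls, &recover_idr_empty);
 */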

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */
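
/*
 * For example, with ls_nodes = {1, 2, 3}, the low nodeid is 1: nodes 2 and 3
 * set flag X locally and answer node 1's status polls; once node 1 has seen
 * X from every member (wait_status_all) it sets X_ALL, which nodes 2 and 3
 * then observe by polling node 1 (wait_status_low).  (A walk-through of the
 * scheme described above, not additional protocol.)
 */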

uint32_t dlm_recover_status(struct dlm_ls *ls)
{
        uint32_t status;
        spin_lock(&ls->ls_recover_lock);
        status = ls->ls_recover_status;
        spin_unlock(&ls->ls_recover_lock);
        return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        spin_lock(&ls->ls_recover_lock);
        _set_recover_status(ls, status);
        spin_unlock(&ls->ls_recover_lock);
}

static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
                           int save_slots)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct dlm_member *memb;
        int error = 0, delay;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                delay = 0;
                for (;;) {
                        if (dlm_recovery_stopped(ls)) {
                                error = -EINTR;
                                goto out;
                        }

                        error = dlm_rcom_status(ls, memb->nodeid, 0);
                        if (error)
                                goto out;

                        if (save_slots)
                                dlm_slot_save(ls, rc, memb);

                        if (rc->rc_result & wait_status)
                                break;
                        if (delay < 1000)
                                delay += 20;
                        msleep(delay);
                }
        }
 out:
        return error;
}

static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
                           uint32_t status_flags)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

        for (;;) {
                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        goto out;
                }

                error = dlm_rcom_status(ls, nodeid, status_flags);
                if (error)
                        break;

                if (rc->rc_result & wait_status)
                        break;
                if (delay < 1000)
                        delay += 20;
                msleep(delay);
        }
 out:
        return error;
}

static int wait_status(struct dlm_ls *ls, uint32_t status)
{
        uint32_t status_all = status << 1;
        int error;

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, status, 0);
                if (!error)
                        dlm_set_recover_status(ls, status_all);
        } else
                error = wait_status_low(ls, status_all, 0);

        return error;
}
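
/*
 * The status_all = status << 1 above relies on the DLM_RS_* flag layout in
 * dlm_internal.h, where each X_ALL flag is defined as the corresponding X
 * flag shifted left by one bit; e.g. (assuming the usual values)
 * DLM_RS_DIR 0x00000004 -> DLM_RS_DIR_ALL 0x00000008.
 */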

int dlm_recover_members_wait(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_slot *slots;
        int num_slots, slots_size;
        int error, rv;
        uint32_t gen;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                memb->slot = -1;
                memb->generation = 0;
        }

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, DLM_RS_NODES, 1);
                if (error)
                        goto out;

                /* slots array is sparse, slots_size may be > num_slots */

                rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
                if (!rv) {
                        spin_lock(&ls->ls_recover_lock);
                        _set_recover_status(ls, DLM_RS_NODES_ALL);
                        ls->ls_num_slots = num_slots;
                        ls->ls_slots_size = slots_size;
                        ls->ls_slots = slots;
                        ls->ls_generation = gen;
                        spin_unlock(&ls->ls_recover_lock);
                } else {
                        dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
                }
        } else {
                error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
                if (error)
                        goto out;

                dlm_slots_copy_in(ls);
        }
 out:
        return error;
}

int dlm_recover_directory_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_idr tracks the rsb's for which we've requested the new master
 * nodeid.  As replies are returned from the resource directories the rsb's
 * are removed from it.  When it is empty we're done.
 *
 * The recover_list is similarly used for all rsb's for which we've sent new
 * lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for; for the
 * directory lookups, a small idr id stored in res_id serves the same purpose.
 */

static int recover_list_empty(struct dlm_ls *ls)
{
        int empty;

        spin_lock(&ls->ls_recover_list_lock);
        empty = list_empty(&ls->ls_recover_list);
        spin_unlock(&ls->ls_recover_list_lock);

        return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        if (list_empty(&r->res_recover_list)) {
                list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
                ls->ls_recover_list_count++;
                dlm_hold_rsb(r);
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        list_del_init(&r->res_recover_list);
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_list_lock);

        dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *s;

        spin_lock(&ls->ls_recover_list_lock);
        list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
                list_del_init(&r->res_recover_list);
                r->res_recover_locks_count = 0;
                dlm_put_rsb(r);
                ls->ls_recover_list_count--;
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static int recover_idr_empty(struct dlm_ls *ls)
{
        int empty = 1;

        spin_lock(&ls->ls_recover_idr_lock);
        if (ls->ls_recover_list_count)
                empty = 0;
        spin_unlock(&ls->ls_recover_idr_lock);

        return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        int rv;

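        /*
         * The idr_preload()/idr_preload_end() pair preallocates with GFP_NOFS
         * while sleeping is still allowed, so the idr_alloc(..., GFP_NOWAIT)
         * below cannot block while ls_recover_idr_lock is held.
         */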
        idr_preload(GFP_NOFS);
        spin_lock(&ls->ls_recover_idr_lock);
        if (r->res_id) {
                rv = -1;
                goto out_unlock;
        }
        rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
        if (rv < 0)
                goto out_unlock;

        r->res_id = rv;
        ls->ls_recover_list_count++;
        dlm_hold_rsb(r);
        rv = 0;
out_unlock:
        spin_unlock(&ls->ls_recover_idr_lock);
        idr_preload_end();
        return rv;
}

static void recover_idr_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_idr_lock);
        idr_remove(&ls->ls_recover_idr, r->res_id);
        r->res_id = 0;
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_idr_lock);

        dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
        struct dlm_rsb *r;

        spin_lock(&ls->ls_recover_idr_lock);
        r = idr_find(&ls->ls_recover_idr, (int)id);
        spin_unlock(&ls->ls_recover_idr_lock);
        return r;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int id;

        spin_lock(&ls->ls_recover_idr_lock);

        idr_for_each_entry(&ls->ls_recover_idr, r, id) {
                idr_remove(&ls->ls_recover_idr, id);
                r->res_id = 0;
                r->res_recover_locks_count = 0;
                ls->ls_recover_list_count--;

                dlm_put_rsb(r);
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_idr_lock);
}


/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_dir_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/

/*
 * Set the lock master for all LKBs in a lock queue.
 * If we are the new master of the rsb, we may have already received new
 * MSTCPY locks from other nodes, which we need to ignore when setting the
 * new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
        struct dlm_lkb *lkb;

        list_for_each_entry(lkb, queue, lkb_statequeue) {
                if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
                        lkb->lkb_remid = 0;
                }
        }
}

static void set_master_lkbs(struct dlm_rsb *r)
{
        set_lock_master(&r->res_grantqueue, r->res_nodeid);
        set_lock_master(&r->res_convertqueue, r->res_nodeid);
        set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept in the recover_idr.
 *
 * Another node recovering the master may have sent us a rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
        struct dlm_ls *ls = r->res_ls;
        int our_nodeid, dir_nodeid;
        int is_removed = 0;
        int error;

        if (is_master(r))
                return 0;

        is_removed = dlm_is_removed(ls, r->res_nodeid);

        if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
                return 0;

        our_nodeid = dlm_our_nodeid();
        dir_nodeid = dlm_dir_nodeid(r);

        if (dir_nodeid == our_nodeid) {
                if (is_removed) {
                        r->res_master_nodeid = our_nodeid;
                        r->res_nodeid = 0;
                }

                /* set master of lkbs to ourself when is_removed, or to
                   another new master which we set along with NEW_MASTER
                   in dlm_master_lookup */
                set_new_master(r);
                error = 0;
        } else {
                recover_idr_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }

        (*count)++;
        return error;
}

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node
 * with an outstanding request on the waiters list and a request reply saved
 * in the requestqueue cannot know whether it should ignore the reply and
 * resend the request, or accept the reply and complete the request.  It must
 * do the former if the remote node purged MSTCPY locks, and the latter if it
 * did not.  This is solved by always purging MSTCPY locks, in which case the
 * request reply is always ignored and the request resent.
 */
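
/*
 * As a concrete walk-through of the above: node A has a request to node B on
 * its waiters list and B's reply saved in its requestqueue when recovery
 * aborts and restarts with B still master.  A cannot tell whether B purged
 * and rebuilt its MSTCPY lock during the aborted pass; because B always
 * purges, A can always ignore the saved reply and resend the request.
 */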

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
        int dir_nodeid = dlm_dir_nodeid(r);
        int new_master = dir_nodeid;

        if (dir_nodeid == dlm_our_nodeid())
                new_master = 0;

        dlm_purge_mstcpy_locks(r);
        r->res_master_nodeid = dir_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        (*count)++;
        return 0;
}

/*
 * Go through local root resources and for each rsb whose master has
 * departed, get the new master nodeid from the directory.  The dir will
 * assign mastery to the first node to look up the new master.  That means
 * we'll discover in this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

int dlm_recover_masters(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int total = 0;
        unsigned int count = 0;
        int nodir = dlm_no_directory(ls);
        int error;

        log_rinfo(ls, "dlm_recover_masters");

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (dlm_recovery_stopped(ls)) {
                        up_read(&ls->ls_root_sem);
                        error = -EINTR;
                        goto out;
                }

                lock_rsb(r);
                if (nodir)
                        error = recover_master_static(r, &count);
                else
                        error = recover_master(r, &count);
                unlock_rsb(r);
                cond_resched();
                total++;

                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);

        error = dlm_wait_function(ls, &recover_idr_empty);
 out:
        if (error)
                recover_idr_clear(ls);
        return error;
}

int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct dlm_rsb *r;
        int ret_nodeid, new_master;

        r = recover_idr_find(ls, rc->rc_id);
        if (!r) {
                log_error(ls, "dlm_recover_master_reply no id %llx",
                          (unsigned long long)rc->rc_id);
                goto out;
        }

        ret_nodeid = rc->rc_result;

        if (ret_nodeid == dlm_our_nodeid())
                new_master = 0;
        else
                new_master = ret_nodeid;

        lock_rsb(r);
        r->res_master_nodeid = ret_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        unlock_rsb(r);
        recover_idr_del(r);

        if (recover_idr_empty(ls))
                wake_up(&ls->ls_wait_general);
 out:
        return 0;
}


/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/


/*
 * Keep a count of the number of lkb's we send to the new master; when we get
 * an equal number of replies, recovery for the rsb is done.
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
        struct dlm_lkb *lkb;
        int error = 0;

        list_for_each_entry(lkb, head, lkb_statequeue) {
                error = dlm_send_rcom_lock(r, lkb);
                if (error)
                        break;
                r->res_recover_locks_count++;
        }

        return error;
}

static int recover_locks(struct dlm_rsb *r)
{
        int error = 0;

        lock_rsb(r);

        DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

        error = recover_locks_queue(r, &r->res_grantqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_convertqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_waitqueue);
        if (error)
                goto out;

        if (r->res_recover_locks_count)
                recover_list_add(r);
        else
                rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
        unlock_rsb(r);
        return error;
}

int dlm_recover_locks(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int error, count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
                        rsb_clear_flag(r, RSB_NEW_MASTER);
                        continue;
                }

                if (!rsb_flag(r, RSB_NEW_MASTER))
                        continue;

                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                error = recover_locks(r);
                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                count += r->res_recover_locks_count;
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_locks %d out", count);

        error = dlm_wait_function(ls, &recover_list_empty);
 out:
        if (error)
                recover_list_clear(ls);
        return error;
}

void dlm_recovered_lock(struct dlm_rsb *r)
{
        DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

        r->res_recover_locks_count--;
        if (!r->res_recover_locks_count) {
                rsb_clear_flag(r, RSB_NEW_MASTER);
                recover_list_del(r);
        }

        if (recover_list_empty(r->res_ls))
                wake_up(&r->res_ls->ls_wait_general);
}

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */

static void recover_lvb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *high_lkb = NULL;
        uint32_t high_seq = 0;
        int lock_lvb_exists = 0;
        int big_lock_exists = 0;
        int lvblen = r->res_ls->ls_lvblen;

        if (!rsb_flag(r, RSB_NEW_MASTER2) &&
            rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
                /* case 1 above */
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!rsb_flag(r, RSB_NEW_MASTER2))
                return;

        /* we are the new master, so figure out if VALNOTVALID should
           be set, and set the rsb lvb from the best lkb available. */

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

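                /*
                 * The signed subtraction below is (presumably) a serial-number
                 * style comparison, so an lkb_lvbseq that has recently wrapped
                 * past zero still compares as newer than high_seq; e.g. with
                 * high_seq 0xffffffff and lvbseq 1: 1 - (-1) = 2 >= 0.
                 */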
                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

 setflag:
        if (!lock_lvb_exists)
                goto out;

        /* lvb is invalidated if only NL/CR locks remain */
        if (!big_lock_exists)
                rsb_set_flag(r, RSB_VALNOTVALID);

        if (!r->res_lvbptr) {
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
                if (!r->res_lvbptr)
                        goto out;
        }

        if (big_lock_exists) {
                r->res_lvbseq = lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
        } else if (high_lkb) {
                r->res_lvbseq = high_lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
        } else {
                r->res_lvbseq = 0;
                memset(r->res_lvbptr, 0, lvblen);
        }
 out:
        return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */
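
/*
 * An explanatory aside: PR and CW are incompatible with each other, so for a
 * conversion between them the granted mode cannot be carried across recovery;
 * the new master receives such in-flight conversions with grmode IV (set in
 * receive_rcom_lock_args() in lock.c, which also sets RSB_RECOVER_CONVERT),
 * and recover_conversion() below reconstructs the grmode from the other
 * granted locks.
 */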

static void recover_conversion(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb;
        int grmode = -1;

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (lkb->lkb_grmode == DLM_LOCK_PR ||
                    lkb->lkb_grmode == DLM_LOCK_CW) {
                        grmode = lkb->lkb_grmode;
                        break;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (lkb->lkb_grmode != DLM_LOCK_IV)
                        continue;
                if (grmode == -1) {
                        log_debug(ls, "recover_conversion %x set gr to rq %d",
                                  lkb->lkb_id, lkb->lkb_rqmode);
                        lkb->lkb_grmode = lkb->lkb_rqmode;
                } else {
                        log_debug(ls, "recover_conversion %x set gr %d",
                                  lkb->lkb_id, grmode);
                        lkb->lkb_grmode = grmode;
                }
        }
}

/* We've become the new master for this rsb, and waiting/converting locks may
   need to be granted in dlm_recover_grant() because locks from a removed
   node may no longer be blocking them. */

static void recover_grant(struct dlm_rsb *r)
{
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
                rsb_set_flag(r, RSB_RECOVER_GRANT);
}

void dlm_recover_rsbs(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                lock_rsb(r);
                if (is_master(r)) {
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);

                        /* recover lvb before granting locks so the updated
                           lvb/VALNOTVALID is presented in the completion */
                        recover_lvb(r);

                        if (rsb_flag(r, RSB_NEW_MASTER2))
                                recover_grant(r);
                        count++;
                } else {
                        rsb_clear_flag(r, RSB_VALNOTVALID);
                }
                rsb_clear_flag(r, RSB_RECOVER_CONVERT);
                rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
                rsb_clear_flag(r, RSB_NEW_MASTER2);
                unlock_rsb(r);
        }
        up_read(&ls->ls_root_sem);

        if (count)
                log_rinfo(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

int dlm_create_root_list(struct dlm_ls *ls)
{
        struct rb_node *n;
        struct dlm_rsb *r;
        int i, error = 0;

        down_write(&ls->ls_root_sem);
        if (!list_empty(&ls->ls_root_list)) {
                log_error(ls, "root list not empty");
                error = -EINVAL;
                goto out;
        }

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }

                if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
                        log_error(ls, "dlm_create_root_list toss not empty");
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
 out:
        up_write(&ls->ls_root_sem);
        return error;
}

void dlm_release_root_list(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *safe;

        down_write(&ls->ls_root_sem);
        list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
                list_del_init(&r->res_root_list);
                dlm_put_rsb(r);
        }
        up_write(&ls->ls_root_sem);
}

void dlm_clear_toss(struct dlm_ls *ls)
{
        struct rb_node *n, *next;
        struct dlm_rsb *r;
        unsigned int count = 0;
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
                        next = rb_next(n);
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(r);
                        count++;
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }

        if (count)
                log_rinfo(ls, "dlm_clear_toss %u done", count);
}