linux/fs/dlm/recover.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"

/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They
 * need to abort if the lockspace is stopped, indicating that a node (perhaps
 * the one being waited for) has failed.
 */

/*
 * Wait until the given function returns non-zero or the lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, it should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can check periodically if the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
        int error = 0;
        int rv;

        while (1) {
                rv = wait_event_timeout(ls->ls_wait_general,
                                        testfn(ls) || dlm_recovery_stopped(ls),
                                        dlm_config.ci_recover_timer * HZ);
                if (rv)
                        break;
        }

        if (dlm_recovery_stopped(ls)) {
                log_debug(ls, "dlm_wait_function aborted");
                error = -EINTR;
        }
        return error;
}
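
/*
 * Example usage (taken from the callers below): dlm_recover_masters() and
 * dlm_recover_locks() wait for their outstanding rcom replies with
 *
 *	error = dlm_wait_function(ls, &recover_idr_empty);
 *
 * and the corresponding reply handlers wake up ls_wait_general when the
 * last reply arrives.
 */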

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */
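
/*
 * For example, with members 1, 2 and 3: node 1 (the low nodeid) polls nodes
 * 2 and 3 until both report status X, then sets X_ALL on itself; nodes 2 and
 * 3 poll node 1 until they see X_ALL set.
 */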

uint32_t dlm_recover_status(struct dlm_ls *ls)
{
        uint32_t status;
        spin_lock(&ls->ls_recover_lock);
        status = ls->ls_recover_status;
        spin_unlock(&ls->ls_recover_lock);
        return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        spin_lock(&ls->ls_recover_lock);
        _set_recover_status(ls, status);
        spin_unlock(&ls->ls_recover_lock);
}

static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
                           int save_slots)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct dlm_member *memb;
        int error = 0, delay;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                delay = 0;
                for (;;) {
                        if (dlm_recovery_stopped(ls)) {
                                error = -EINTR;
                                goto out;
                        }

                        error = dlm_rcom_status(ls, memb->nodeid, 0);
                        if (error)
                                goto out;

                        if (save_slots)
                                dlm_slot_save(ls, rc, memb);

                        if (rc->rc_result & wait_status)
                                break;
                        if (delay < 1000)
                                delay += 20;
                        msleep(delay);
                }
        }
 out:
        return error;
}

static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
                           uint32_t status_flags)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

        for (;;) {
                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        goto out;
                }

                error = dlm_rcom_status(ls, nodeid, status_flags);
                if (error)
                        break;

                if (rc->rc_result & wait_status)
                        break;
                if (delay < 1000)
                        delay += 20;
                msleep(delay);
        }
 out:
        return error;
}

static int wait_status(struct dlm_ls *ls, uint32_t status)
{
        uint32_t status_all = status << 1;
        int error;

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, status, 0);
                if (!error)
                        dlm_set_recover_status(ls, status_all);
        } else
                error = wait_status_low(ls, status_all, 0);

        return error;
}
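
/*
 * Note: each X_ALL flag is the base status flag shifted left by one (e.g.
 * DLM_RS_DIR_ALL == DLM_RS_DIR << 1 in the dlm_internal.h definitions),
 * which is what lets wait_status() derive status_all with a shift.
 */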

int dlm_recover_members_wait(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_slot *slots;
        int num_slots, slots_size;
        int error, rv;
        uint32_t gen;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                memb->slot = -1;
                memb->generation = 0;
        }

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, DLM_RS_NODES, 1);
                if (error)
                        goto out;

                /* slots array is sparse, slots_size may be > num_slots */

                rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
                if (!rv) {
                        spin_lock(&ls->ls_recover_lock);
                        _set_recover_status(ls, DLM_RS_NODES_ALL);
                        ls->ls_num_slots = num_slots;
                        ls->ls_slots_size = slots_size;
                        ls->ls_slots = slots;
                        ls->ls_generation = gen;
                        spin_unlock(&ls->ls_recover_lock);
                } else {
                        dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
                }
        } else {
                error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
                if (error)
                        goto out;

                dlm_slots_copy_in(ls);
        }
 out:
        return error;
}

int dlm_recover_directory_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_list contains all the rsb's for which we've requested the new
 * master nodeid.  As replies are returned from the resource directories, the
 * rsb's are removed from the list.  When the list is empty we're done.
 *
 * The recover_list is later similarly used for all rsb's for which we've sent
 * new lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for.
 */

static int recover_list_empty(struct dlm_ls *ls)
{
        int empty;

        spin_lock(&ls->ls_recover_list_lock);
        empty = list_empty(&ls->ls_recover_list);
        spin_unlock(&ls->ls_recover_list_lock);

        return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        if (list_empty(&r->res_recover_list)) {
                list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
                ls->ls_recover_list_count++;
                dlm_hold_rsb(r);
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        list_del_init(&r->res_recover_list);
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_list_lock);

        dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *s;

        spin_lock(&ls->ls_recover_list_lock);
        list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
                list_del_init(&r->res_recover_list);
                r->res_recover_locks_count = 0;
                dlm_put_rsb(r);
                ls->ls_recover_list_count--;
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static int recover_idr_empty(struct dlm_ls *ls)
{
        int empty = 1;

        spin_lock(&ls->ls_recover_idr_lock);
        if (ls->ls_recover_list_count)
                empty = 0;
        spin_unlock(&ls->ls_recover_idr_lock);

        return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        int rv;

        idr_preload(GFP_NOFS);
        spin_lock(&ls->ls_recover_idr_lock);
        if (r->res_id) {
                rv = -1;
                goto out_unlock;
        }
        rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
        if (rv < 0)
                goto out_unlock;

        r->res_id = rv;
        ls->ls_recover_list_count++;
        dlm_hold_rsb(r);
        rv = 0;
out_unlock:
        spin_unlock(&ls->ls_recover_idr_lock);
        idr_preload_end();
        return rv;
}
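
/*
 * recover_idr_add() uses the usual idr preload pattern: memory is
 * preallocated outside the spinlock by idr_preload(GFP_NOFS) so that the
 * idr_alloc() under ls_recover_idr_lock can safely use GFP_NOWAIT.  Ids
 * start at 1, so a zero res_id means the rsb is not being tracked.
 */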

static void recover_idr_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_idr_lock);
        idr_remove(&ls->ls_recover_idr, r->res_id);
        r->res_id = 0;
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_idr_lock);

        dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
        struct dlm_rsb *r;

        spin_lock(&ls->ls_recover_idr_lock);
        r = idr_find(&ls->ls_recover_idr, (int)id);
        spin_unlock(&ls->ls_recover_idr_lock);
        return r;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int id;

        spin_lock(&ls->ls_recover_idr_lock);

        idr_for_each_entry(&ls->ls_recover_idr, r, id) {
                idr_remove(&ls->ls_recover_idr, id);
                r->res_id = 0;
                r->res_recover_locks_count = 0;
                ls->ls_recover_list_count--;

                dlm_put_rsb(r);
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_idr_lock);
}


/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_dir_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/
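
/*
 * In the diagram above, the left column runs on the node recovering the rsb
 * and the right column runs on the directory node that services the lookup.
 */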

/*
 * Set the lock master for all LKBs in a lock queue.
 * If we are the new master of the rsb, we may have already received new
 * MSTCPY locks from other nodes, which we need to ignore when setting the
 * new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
        struct dlm_lkb *lkb;

        list_for_each_entry(lkb, queue, lkb_statequeue) {
                if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
                        lkb->lkb_remid = 0;
                }
        }
}

static void set_master_lkbs(struct dlm_rsb *r)
{
        set_lock_master(&r->res_grantqueue, r->res_nodeid);
        set_lock_master(&r->res_convertqueue, r->res_nodeid);
        set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks.
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept on the recover_list.
 *
 * Another node recovering the master may have sent us a rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
        struct dlm_ls *ls = r->res_ls;
        int our_nodeid, dir_nodeid;
        int is_removed = 0;
        int error;

        if (is_master(r))
                return 0;

        is_removed = dlm_is_removed(ls, r->res_nodeid);

        if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
                return 0;

        our_nodeid = dlm_our_nodeid();
        dir_nodeid = dlm_dir_nodeid(r);

        if (dir_nodeid == our_nodeid) {
                if (is_removed) {
                        r->res_master_nodeid = our_nodeid;
                        r->res_nodeid = 0;
                }

                /* set master of lkbs to ourself when is_removed, or to
                   another new master which we set along with NEW_MASTER
                   in dlm_master_lookup */
                set_new_master(r);
                error = 0;
        } else {
                recover_idr_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }

        (*count)++;
        return error;
}

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node,
 * with an outstanding request on its waiters list and a request reply saved
 * in the requestqueue, cannot know whether it should ignore the reply and
 * resend the request, or accept the reply and complete the request.  It must
 * do the former if the remote node purged MSTCPY locks, and the latter if it
 * did not.  This is solved by always purging MSTCPY locks, in which case the
 * request reply is always ignored and the request resent.
 */
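
/*
 * Illustrative timeline: node A has a request to master B on its waiters
 * list, and B's reply sits in A's requestqueue when recovery starts.  After
 * an aborted and restarted recovery, A cannot tell whether B still holds the
 * MSTCPY lock (accept the saved reply) or purged it (ignore the reply and
 * resend).  Unconditional purging makes "ignore and resend" always correct.
 */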

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
        int dir_nodeid = dlm_dir_nodeid(r);
        int new_master = dir_nodeid;

        if (dir_nodeid == dlm_our_nodeid())
                new_master = 0;

        dlm_purge_mstcpy_locks(r);
        r->res_master_nodeid = dir_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        (*count)++;
        return 0;
}

/*
 * Go through local root resources and for each rsb whose master has
 * departed, get the new master nodeid from the directory.  The dir will
 * assign mastery to the first node to look up the new master.  That means
 * we'll discover in this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

int dlm_recover_masters(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int total = 0;
        unsigned int count = 0;
        int nodir = dlm_no_directory(ls);
        int error;

        log_rinfo(ls, "dlm_recover_masters");

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (dlm_recovery_stopped(ls)) {
                        up_read(&ls->ls_root_sem);
                        error = -EINTR;
                        goto out;
                }

                lock_rsb(r);
                if (nodir)
                        error = recover_master_static(r, &count);
                else
                        error = recover_master(r, &count);
                unlock_rsb(r);
                cond_resched();
                total++;

                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);

        error = dlm_wait_function(ls, &recover_idr_empty);
 out:
        if (error)
                recover_idr_clear(ls);
        return error;
}
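
/*
 * Context: dlm_recoverd drives the steps in this file in sequence, roughly
 * dlm_recover_members() -> dlm_recover_directory() -> dlm_recover_masters()
 * -> dlm_recover_locks() -> dlm_recover_rsbs() (see recoverd.c), with the
 * *_wait() helpers above synchronizing all nodes between steps.
 */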

int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct dlm_rsb *r;
        int ret_nodeid, new_master;

        r = recover_idr_find(ls, rc->rc_id);
        if (!r) {
                log_error(ls, "dlm_recover_master_reply no id %llx",
                          (unsigned long long)rc->rc_id);
                goto out;
        }

        ret_nodeid = rc->rc_result;

        if (ret_nodeid == dlm_our_nodeid())
                new_master = 0;
        else
                new_master = ret_nodeid;

        lock_rsb(r);
        r->res_master_nodeid = ret_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        unlock_rsb(r);
        recover_idr_del(r);

        if (recover_idr_empty(ls))
                wake_up(&ls->ls_wait_general);
 out:
        return 0;
}


/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/


/*
 * keep a count of the number of lkb's we send to the new master; when we get
 * an equal number of replies then recovery for the rsb is done
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
        struct dlm_lkb *lkb;
        int error = 0;

        list_for_each_entry(lkb, head, lkb_statequeue) {
                error = dlm_send_rcom_lock(r, lkb);
                if (error)
                        break;
                r->res_recover_locks_count++;
        }

        return error;
}

static int recover_locks(struct dlm_rsb *r)
{
        int error = 0;

        lock_rsb(r);

        DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

        error = recover_locks_queue(r, &r->res_grantqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_convertqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_waitqueue);
        if (error)
                goto out;

        if (r->res_recover_locks_count)
                recover_list_add(r);
        else
                rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
        unlock_rsb(r);
        return error;
}

int dlm_recover_locks(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int error, count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
                        rsb_clear_flag(r, RSB_NEW_MASTER);
                        continue;
                }

                if (!rsb_flag(r, RSB_NEW_MASTER))
                        continue;

                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                error = recover_locks(r);
                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                count += r->res_recover_locks_count;
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_locks %d out", count);

        error = dlm_wait_function(ls, &recover_list_empty);
 out:
        if (error)
                recover_list_clear(ls);
        return error;
}

void dlm_recovered_lock(struct dlm_rsb *r)
{
        DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

        r->res_recover_locks_count--;
        if (!r->res_recover_locks_count) {
                rsb_clear_flag(r, RSB_NEW_MASTER);
                recover_list_del(r);
        }

        if (recover_list_empty(r->res_ls))
                wake_up(&r->res_ls->ls_wait_general);
}
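
/*
 * dlm_recovered_lock() is called once per rcom lock reply (from
 * dlm_recover_process_copy(), see the diagram above), so the count reaches
 * zero once the new master has acked every lkb we sent for this rsb.
 */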

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */
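
/*
 * Example: if the grant queue holds a PW lock requested with DLM_LKF_VALBLK,
 * the rsb lvb is copied from that lkb and stays valid.  If only NL/CR VALBLK
 * locks remain, no holder could have written the lvb, so VALNOTVALID is set
 * and the lvb is taken from the lkb with the highest lkb_lvbseq as the best
 * available guess.
 */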

static void recover_lvb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *high_lkb = NULL;
        uint32_t high_seq = 0;
        int lock_lvb_exists = 0;
        int big_lock_exists = 0;
        int lvblen = r->res_ls->ls_lvblen;

        if (!rsb_flag(r, RSB_NEW_MASTER2) &&
            rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
                /* case 1 above */
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!rsb_flag(r, RSB_NEW_MASTER2))
                return;

        /* we are the new master, so figure out if VALNOTVALID should
           be set, and set the rsb lvb from the best lkb available. */

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

 setflag:
        if (!lock_lvb_exists)
                goto out;

        /* lvb is invalidated if only NL/CR locks remain */
        if (!big_lock_exists)
                rsb_set_flag(r, RSB_VALNOTVALID);

        if (!r->res_lvbptr) {
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
                if (!r->res_lvbptr)
                        goto out;
        }

        if (big_lock_exists) {
                r->res_lvbseq = lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
        } else if (high_lkb) {
                r->res_lvbseq = high_lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
        } else {
                r->res_lvbseq = 0;
                memset(r->res_lvbptr, 0, lvblen);
        }
 out:
        return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */

static void recover_conversion(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb;
        int grmode = -1;

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (lkb->lkb_grmode == DLM_LOCK_PR ||
                    lkb->lkb_grmode == DLM_LOCK_CW) {
                        grmode = lkb->lkb_grmode;
                        break;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (lkb->lkb_grmode != DLM_LOCK_IV)
                        continue;
                if (grmode == -1) {
                        log_debug(ls, "recover_conversion %x set gr to rq %d",
                                  lkb->lkb_id, lkb->lkb_rqmode);
                        lkb->lkb_grmode = lkb->lkb_rqmode;
                } else {
                        log_debug(ls, "recover_conversion %x set gr %d",
                                  lkb->lkb_id, grmode);
                        lkb->lkb_grmode = grmode;
                }
        }
}
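
/*
 * Background (see the master copy receive path in lock.c): PR and CW are
 * incompatible with each other, so the granted mode of a lock converting
 * between them cannot be trusted when its master copy is rebuilt; such
 * locks arrive here with lkb_grmode == DLM_LOCK_IV and are fixed up above.
 */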

/* We've become the new master for this rsb, and waiting/converting locks may
   need to be granted in dlm_recover_grant() because blocking locks from a
   removed node may now be gone. */

static void recover_grant(struct dlm_rsb *r)
{
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
                rsb_set_flag(r, RSB_RECOVER_GRANT);
}

void dlm_recover_rsbs(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                lock_rsb(r);
                if (is_master(r)) {
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);

                        /* recover lvb before granting locks so the updated
                           lvb/VALNOTVALID is presented in the completion */
                        recover_lvb(r);

                        if (rsb_flag(r, RSB_NEW_MASTER2))
                                recover_grant(r);
                        count++;
                } else {
                        rsb_clear_flag(r, RSB_VALNOTVALID);
                }
                rsb_clear_flag(r, RSB_RECOVER_CONVERT);
                rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
                rsb_clear_flag(r, RSB_NEW_MASTER2);
                unlock_rsb(r);
        }
        up_read(&ls->ls_root_sem);

        if (count)
                log_rinfo(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

int dlm_create_root_list(struct dlm_ls *ls)
{
        struct rb_node *n;
        struct dlm_rsb *r;
        int i, error = 0;

        down_write(&ls->ls_root_sem);
        if (!list_empty(&ls->ls_root_list)) {
                log_error(ls, "root list not empty");
                error = -EINVAL;
                goto out;
        }

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }

                if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
                        log_error(ls, "dlm_create_root_list toss not empty");
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
 out:
        up_write(&ls->ls_root_sem);
        return error;
}

void dlm_release_root_list(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *safe;

        down_write(&ls->ls_root_sem);
        list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
                list_del_init(&r->res_root_list);
                dlm_put_rsb(r);
        }
        up_write(&ls->ls_root_sem);
}

void dlm_clear_toss(struct dlm_ls *ls)
{
        struct rb_node *n, *next;
        struct dlm_rsb *r;
        unsigned int count = 0;
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
                        next = rb_next(n);
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(r);
                        count++;
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }

        if (count)
                log_rinfo(ls, "dlm_clear_toss %u done", count);
}