linux/fs/dlm/recover.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"


/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They
 * need to abort if the lockspace is stopped, indicating that a node has
 * failed (perhaps the one being waited for).
 */

/*
 * Wait until the given function returns non-zero or the lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, it should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can check periodically if the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
        int error = 0;
        int rv;

        while (1) {
                rv = wait_event_timeout(ls->ls_wait_general,
                                        testfn(ls) || dlm_recovery_stopped(ls),
                                        dlm_config.ci_recover_timer * HZ);
                if (rv)
                        break;
                if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
                        log_debug(ls, "dlm_wait_function timed out");
                        return -ETIMEDOUT;
                }
        }

        if (dlm_recovery_stopped(ls)) {
                log_debug(ls, "dlm_wait_function aborted");
                error = -EINTR;
        }
        return error;
}
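
/*
 * Usage sketch: testfn is any predicate on the lockspace.  Later in this
 * file the recovery paths pass the list/idr emptiness checks in directly,
 * e.g.
 *
 *      error = dlm_wait_function(ls, &recover_idr_empty);
 *      error = dlm_wait_function(ls, &recover_list_empty);
 */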

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */
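
/*
 * Illustrative note: the X and X_ALL flags are paired by bit position, so
 * wait_status() below derives one from the other with a shift:
 *
 *      uint32_t status_all = status << 1;
 *
 * e.g. DLM_RS_NODES_ALL is DLM_RS_NODES << 1 (the flag values themselves
 * are defined in dlm_internal.h).
 */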

uint32_t dlm_recover_status(struct dlm_ls *ls)
{
        uint32_t status;
        spin_lock(&ls->ls_recover_lock);
        status = ls->ls_recover_status;
        spin_unlock(&ls->ls_recover_lock);
        return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
        spin_lock(&ls->ls_recover_lock);
        _set_recover_status(ls, status);
        spin_unlock(&ls->ls_recover_lock);
}

static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
                           int save_slots)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct dlm_member *memb;
        int error = 0, delay;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                delay = 0;
                for (;;) {
                        if (dlm_recovery_stopped(ls)) {
                                error = -EINTR;
                                goto out;
                        }

                        error = dlm_rcom_status(ls, memb->nodeid, 0);
                        if (error)
                                goto out;

                        if (save_slots)
                                dlm_slot_save(ls, rc, memb);

                        if (rc->rc_result & wait_status)
                                break;
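                        /* status not reached yet: retry, sleeping for a
                           linearly increasing delay capped at one second */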
                        if (delay < 1000)
                                delay += 20;
                        msleep(delay);
                }
        }
 out:
        return error;
}

static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
                           uint32_t status_flags)
{
        struct dlm_rcom *rc = ls->ls_recover_buf;
        int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

        for (;;) {
                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        goto out;
                }

                error = dlm_rcom_status(ls, nodeid, status_flags);
                if (error)
                        break;

                if (rc->rc_result & wait_status)
                        break;
                if (delay < 1000)
                        delay += 20;
                msleep(delay);
        }
 out:
        return error;
}

static int wait_status(struct dlm_ls *ls, uint32_t status)
{
        uint32_t status_all = status << 1;
        int error;

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, status, 0);
                if (!error)
                        dlm_set_recover_status(ls, status_all);
        } else
                error = wait_status_low(ls, status_all, 0);

        return error;
}

int dlm_recover_members_wait(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_slot *slots;
        int num_slots, slots_size;
        int error, rv;
        uint32_t gen;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                memb->slot = -1;
                memb->generation = 0;
        }

        if (ls->ls_low_nodeid == dlm_our_nodeid()) {
                error = wait_status_all(ls, DLM_RS_NODES, 1);
                if (error)
                        goto out;

                /* slots array is sparse, slots_size may be > num_slots */

                rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
                if (!rv) {
                        spin_lock(&ls->ls_recover_lock);
                        _set_recover_status(ls, DLM_RS_NODES_ALL);
                        ls->ls_num_slots = num_slots;
                        ls->ls_slots_size = slots_size;
                        ls->ls_slots = slots;
                        ls->ls_generation = gen;
                        spin_unlock(&ls->ls_recover_lock);
                } else {
                        dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
                }
        } else {
                error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
                if (error)
                        goto out;

                dlm_slots_copy_in(ls);
        }
 out:
        return error;
}

int dlm_recover_directory_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
        return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_list contains all the rsb's for which we've requested the new
 * master nodeid.  As replies are returned from the resource directories the
 * rsb's are removed from the list.  When the list is empty we're done.
 *
 * The recover_list is later similarly used for all rsb's for which we've sent
 * new lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for.
 */
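
/*
 * Matching sketch: for master lookups the identifier is carried in the
 * rcom rc_id field and returned unchanged in the reply, so
 * dlm_recover_master_reply() below finds the rsb again with
 *
 *      r = recover_idr_find(ls, rc->rc_id);
 */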

static int recover_list_empty(struct dlm_ls *ls)
{
        int empty;

        spin_lock(&ls->ls_recover_list_lock);
        empty = list_empty(&ls->ls_recover_list);
        spin_unlock(&ls->ls_recover_list_lock);

        return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        if (list_empty(&r->res_recover_list)) {
                list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
                ls->ls_recover_list_count++;
                dlm_hold_rsb(r);
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_list_lock);
        list_del_init(&r->res_recover_list);
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_list_lock);

        dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *s;

        spin_lock(&ls->ls_recover_list_lock);
        list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
                list_del_init(&r->res_recover_list);
                r->res_recover_locks_count = 0;
                dlm_put_rsb(r);
                ls->ls_recover_list_count--;
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_list_lock);
}

static int recover_idr_empty(struct dlm_ls *ls)
{
        int empty = 1;

        spin_lock(&ls->ls_recover_idr_lock);
        if (ls->ls_recover_list_count)
                empty = 0;
        spin_unlock(&ls->ls_recover_idr_lock);

        return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        int rv;

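        /* preallocate idr nodes outside the spinlock so that idr_alloc()
           below can run under ls_recover_idr_lock with GFP_NOWAIT */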
        idr_preload(GFP_NOFS);
        spin_lock(&ls->ls_recover_idr_lock);
        if (r->res_id) {
                rv = -1;
                goto out_unlock;
        }
        rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT);
        if (rv < 0)
                goto out_unlock;

        r->res_id = rv;
        ls->ls_recover_list_count++;
        dlm_hold_rsb(r);
        rv = 0;
out_unlock:
        spin_unlock(&ls->ls_recover_idr_lock);
        idr_preload_end();
        return rv;
}

static void recover_idr_del(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

        spin_lock(&ls->ls_recover_idr_lock);
        idr_remove(&ls->ls_recover_idr, r->res_id);
        r->res_id = 0;
        ls->ls_recover_list_count--;
        spin_unlock(&ls->ls_recover_idr_lock);

        dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
        struct dlm_rsb *r;

        spin_lock(&ls->ls_recover_idr_lock);
        r = idr_find(&ls->ls_recover_idr, (int)id);
        spin_unlock(&ls->ls_recover_idr_lock);
        return r;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int id;

        spin_lock(&ls->ls_recover_idr_lock);

        idr_for_each_entry(&ls->ls_recover_idr, r, id) {
                idr_remove(&ls->ls_recover_idr, id);
                r->res_id = 0;
                r->res_recover_locks_count = 0;
                ls->ls_recover_list_count--;

                dlm_put_rsb(r);
        }

        if (ls->ls_recover_list_count != 0) {
                log_error(ls, "warning: recover_list_count %d",
                          ls->ls_recover_list_count);
                ls->ls_recover_list_count = 0;
        }
        spin_unlock(&ls->ls_recover_idr_lock);
}


/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_dir_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/

/*
 * Set the lock master for all LKBs in a lock queue.  If we are the new
 * master of the rsb, we may have already received new MSTCPY locks from
 * other nodes, which we need to ignore when setting the new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
        struct dlm_lkb *lkb;

        list_for_each_entry(lkb, queue, lkb_statequeue) {
                if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
                        lkb->lkb_remid = 0;
                }
        }
}

static void set_master_lkbs(struct dlm_rsb *r)
{
        set_lock_master(&r->res_grantqueue, r->res_nodeid);
        set_lock_master(&r->res_convertqueue, r->res_nodeid);
        set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept on the recover_list.
 *
 * Another node recovering the master may have sent us an rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
        struct dlm_ls *ls = r->res_ls;
        int our_nodeid, dir_nodeid;
        int is_removed = 0;
        int error;

        if (is_master(r))
                return 0;

        is_removed = dlm_is_removed(ls, r->res_nodeid);

        if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
                return 0;

        our_nodeid = dlm_our_nodeid();
        dir_nodeid = dlm_dir_nodeid(r);

        if (dir_nodeid == our_nodeid) {
                if (is_removed) {
                        r->res_master_nodeid = our_nodeid;
                        r->res_nodeid = 0;
                }

                /* set master of lkbs to ourself when is_removed, or to
                   another new master which we set along with NEW_MASTER
                   in dlm_master_lookup */
                set_new_master(r);
                error = 0;
        } else {
                recover_idr_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }

        (*count)++;
        return error;
}

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node
 * with an outstanding request on the waiters list and a request reply saved
 * in the requestqueue cannot know whether it should ignore the reply and
 * resend the request, or accept the reply and complete the request.  It must
 * do the former if the remote node purged MSTCPY locks, and it must do the
 * latter if the remote node did not.  This is solved by always purging MSTCPY
 * locks, in which case the request reply would always be ignored and the
 * request resent.
 */

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
        int dir_nodeid = dlm_dir_nodeid(r);
        int new_master = dir_nodeid;

        if (dir_nodeid == dlm_our_nodeid())
                new_master = 0;

        dlm_purge_mstcpy_locks(r);
        r->res_master_nodeid = dir_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        (*count)++;
        return 0;
}

/*
 * Go through local root resources and for each rsb whose master has
 * departed, get the new master nodeid from the directory.  The dir will
 * assign mastery to the first node to look up the new master.  That means
 * we'll discover in this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

int dlm_recover_masters(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int total = 0;
        unsigned int count = 0;
        int nodir = dlm_no_directory(ls);
        int error;

        log_rinfo(ls, "dlm_recover_masters");

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (dlm_recovery_stopped(ls)) {
                        up_read(&ls->ls_root_sem);
                        error = -EINTR;
                        goto out;
                }

                lock_rsb(r);
                if (nodir)
                        error = recover_master_static(r, &count);
                else
                        error = recover_master(r, &count);
                unlock_rsb(r);
                cond_resched();
                total++;

                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);

        error = dlm_wait_function(ls, &recover_idr_empty);
 out:
        if (error)
                recover_idr_clear(ls);
        return error;
}

int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct dlm_rsb *r;
        int ret_nodeid, new_master;

        r = recover_idr_find(ls, rc->rc_id);
        if (!r) {
                log_error(ls, "dlm_recover_master_reply no id %llx",
                          (unsigned long long)rc->rc_id);
                goto out;
        }

        ret_nodeid = rc->rc_result;

        if (ret_nodeid == dlm_our_nodeid())
                new_master = 0;
        else
                new_master = ret_nodeid;

        lock_rsb(r);
        r->res_master_nodeid = ret_nodeid;
        r->res_nodeid = new_master;
        set_new_master(r);
        unlock_rsb(r);
        recover_idr_del(r);

        if (recover_idr_empty(ls))
                wake_up(&ls->ls_wait_general);
 out:
        return 0;
}


/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/


/*
 * Keep a count of the number of lkb's we send to the new master; when we
 * get an equal number of replies, recovery for the rsb is done.
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
        struct dlm_lkb *lkb;
        int error = 0;

        list_for_each_entry(lkb, head, lkb_statequeue) {
                error = dlm_send_rcom_lock(r, lkb);
                if (error)
                        break;
                r->res_recover_locks_count++;
        }

        return error;
}

static int recover_locks(struct dlm_rsb *r)
{
        int error = 0;

        lock_rsb(r);

        DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

        error = recover_locks_queue(r, &r->res_grantqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_convertqueue);
        if (error)
                goto out;
        error = recover_locks_queue(r, &r->res_waitqueue);
        if (error)
                goto out;

        if (r->res_recover_locks_count)
                recover_list_add(r);
        else
                rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
        unlock_rsb(r);
        return error;
}

int dlm_recover_locks(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        int error, count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
                        rsb_clear_flag(r, RSB_NEW_MASTER);
                        continue;
                }

                if (!rsb_flag(r, RSB_NEW_MASTER))
                        continue;

                if (dlm_recovery_stopped(ls)) {
                        error = -EINTR;
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                error = recover_locks(r);
                if (error) {
                        up_read(&ls->ls_root_sem);
                        goto out;
                }

                count += r->res_recover_locks_count;
        }
        up_read(&ls->ls_root_sem);

        log_rinfo(ls, "dlm_recover_locks %d out", count);

        error = dlm_wait_function(ls, &recover_list_empty);
 out:
        if (error)
                recover_list_clear(ls);
        return error;
}

void dlm_recovered_lock(struct dlm_rsb *r)
{
        DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

        r->res_recover_locks_count--;
        if (!r->res_recover_locks_count) {
                rsb_clear_flag(r, RSB_NEW_MASTER);
                recover_list_del(r);
        }

        if (recover_list_empty(r->res_ls))
                wake_up(&r->res_ls->ls_wait_general);
}

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */
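
/*
 * Note on the sequence comparison used below: the uint32_t lvbseq values
 * are compared by signed subtraction, which selects the "largest" sequence
 * number correctly even across wraparound, e.g. lvbseq 1 beats 0xffffffff
 * because (int)1 - (int)0xffffffff == 2 >= 0.
 */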

static void recover_lvb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *high_lkb = NULL;
        uint32_t high_seq = 0;
        int lock_lvb_exists = 0;
        int big_lock_exists = 0;
        int lvblen = r->res_ls->ls_lvblen;

        if (!rsb_flag(r, RSB_NEW_MASTER2) &&
            rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
                /* case 1 above */
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!rsb_flag(r, RSB_NEW_MASTER2))
                return;

        /* we are the new master, so figure out if VALNOTVALID should
           be set, and set the rsb lvb from the best lkb available. */

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        continue;

                lock_lvb_exists = 1;

                if (lkb->lkb_grmode > DLM_LOCK_CR) {
                        big_lock_exists = 1;
                        goto setflag;
                }

                if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
                        high_lkb = lkb;
                        high_seq = lkb->lkb_lvbseq;
                }
        }

 setflag:
        if (!lock_lvb_exists)
                goto out;

        /* lvb is invalidated if only NL/CR locks remain */
        if (!big_lock_exists)
                rsb_set_flag(r, RSB_VALNOTVALID);

        if (!r->res_lvbptr) {
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
                if (!r->res_lvbptr)
                        goto out;
        }

        if (big_lock_exists) {
                r->res_lvbseq = lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
        } else if (high_lkb) {
                r->res_lvbseq = high_lkb->lkb_lvbseq;
                memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
        } else {
                r->res_lvbseq = 0;
                memset(r->res_lvbptr, 0, lvblen);
        }
 out:
        return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */
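
/*
 * Background note (from the DLM mode compatibility matrix): PR and CW are
 * incompatible with each other, so a PR<->CW conversion is neither a pure
 * up- nor down-conversion.  Such an in-flight conversion is rebuilt on the
 * new master with its granted mode unknown (lkb_grmode == DLM_LOCK_IV),
 * which is what recover_conversion() below fixes up.
 */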

static void recover_conversion(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb;
        int grmode = -1;

        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
                if (lkb->lkb_grmode == DLM_LOCK_PR ||
                    lkb->lkb_grmode == DLM_LOCK_CW) {
                        grmode = lkb->lkb_grmode;
                        break;
                }
        }

        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
                if (lkb->lkb_grmode != DLM_LOCK_IV)
                        continue;
                if (grmode == -1) {
                        log_debug(ls, "recover_conversion %x set gr to rq %d",
                                  lkb->lkb_id, lkb->lkb_rqmode);
                        lkb->lkb_grmode = lkb->lkb_rqmode;
                } else {
                        log_debug(ls, "recover_conversion %x set gr %d",
                                  lkb->lkb_id, grmode);
                        lkb->lkb_grmode = grmode;
                }
        }
}

/* We've become the new master of this rsb, and waiting/converting locks may
   need to be granted in dlm_recover_grant() because locks held by a removed
   node may have been blocking them. */

static void recover_grant(struct dlm_rsb *r)
{
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
                rsb_set_flag(r, RSB_RECOVER_GRANT);
}

void dlm_recover_rsbs(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        unsigned int count = 0;

        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                lock_rsb(r);
                if (is_master(r)) {
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);

                        /* recover lvb before granting locks so the updated
                           lvb/VALNOTVALID is presented in the completion */
                        recover_lvb(r);

                        if (rsb_flag(r, RSB_NEW_MASTER2))
                                recover_grant(r);
                        count++;
                } else {
                        rsb_clear_flag(r, RSB_VALNOTVALID);
                }
                rsb_clear_flag(r, RSB_RECOVER_CONVERT);
                rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
                rsb_clear_flag(r, RSB_NEW_MASTER2);
                unlock_rsb(r);
        }
        up_read(&ls->ls_root_sem);

        if (count)
                log_rinfo(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

int dlm_create_root_list(struct dlm_ls *ls)
{
        struct rb_node *n;
        struct dlm_rsb *r;
        int i, error = 0;

        down_write(&ls->ls_root_sem);
        if (!list_empty(&ls->ls_root_list)) {
                log_error(ls, "root list not empty");
                error = -EINVAL;
                goto out;
        }

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        list_add(&r->res_root_list, &ls->ls_root_list);
                        dlm_hold_rsb(r);
                }

                if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
                        log_error(ls, "dlm_create_root_list toss not empty");
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
 out:
        up_write(&ls->ls_root_sem);
        return error;
}

void dlm_release_root_list(struct dlm_ls *ls)
{
        struct dlm_rsb *r, *safe;

        down_write(&ls->ls_root_sem);
        list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
                list_del_init(&r->res_root_list);
                dlm_put_rsb(r);
        }
        up_write(&ls->ls_root_sem);
}

void dlm_clear_toss(struct dlm_ls *ls)
{
        struct rb_node *n, *next;
        struct dlm_rsb *r;
        unsigned int count = 0;
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
                        next = rb_next(n);
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(r);
                        count++;
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }

        if (count)
                log_rinfo(ls, "dlm_clear_toss %u done", count);
}