linux/fs/dlm/member.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "lowcomms.h"

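/*
 * Note: the low 16 bits of h_version carry the minor protocol version;
 * DLM_HEADER_SLOTS is the minor version from which nodes include slot and
 * generation info in their rcom messages.  Older nodes report 0 here and
 * are treated as having no slot support.
 */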
int dlm_slots_version(struct dlm_header *h)
{
        if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
                return 0;
        return 1;
}

void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
                   struct dlm_member *memb)
{
        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

        if (!dlm_slots_version(&rc->rc_header))
                return;

        memb->slot = le16_to_cpu(rf->rf_our_slot);
        memb->generation = le32_to_cpu(rf->rf_generation);
}

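/*
 * Layout of rc_buf as used below (implied by the pointer arithmetic here
 * and in dlm_slots_copy_in): a struct rcom_config immediately followed by
 * a packed array of struct rcom_slot entries, one per member that has a
 * slot assigned:
 *
 *   rc_buf:  [ rcom_config | rcom_slot 0 | rcom_slot 1 | ... ]
 */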
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
        struct dlm_slot *slot;
        struct rcom_slot *ro;
        int i;

        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

        /* ls_slots array is sparse, but not rcom_slots */

        for (i = 0; i < ls->ls_slots_size; i++) {
                slot = &ls->ls_slots[i];
                if (!slot->nodeid)
                        continue;
                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
                ro->ro_slot = cpu_to_le16(slot->slot);
                ro++;
        }
}

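/*
 * log_slots() prints one "slot:nodeid" pair per entry, taken either from a
 * dlm_slot array or from received rcom_slot entries, e.g. (values are
 * illustrative only):
 *
 *   generation 5 slots 3 1:1 2:2 3:3
 */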
#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
                      struct rcom_slot *ro0, struct dlm_slot *array,
                      int array_size)
{
        char line[SLOT_DEBUG_LINE];
        int len = SLOT_DEBUG_LINE - 1;
        int pos = 0;
        int ret, i;

        memset(line, 0, sizeof(line));

        if (array) {
                for (i = 0; i < array_size; i++) {
                        if (!array[i].nodeid)
                                continue;

                        ret = snprintf(line + pos, len - pos, " %d:%d",
                                       array[i].slot, array[i].nodeid);
                        if (ret >= len - pos)
                                break;
                        pos += ret;
                }
        } else if (ro0) {
                for (i = 0; i < num_slots; i++) {
                        ret = snprintf(line + pos, len - pos, " %d:%d",
                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
                        if (ret >= len - pos)
                                break;
                        pos += ret;
                }
        }

        log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}

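/*
 * Import the slot table received during recovery (normally from the
 * low-nodeid node that ran dlm_slots_assign): convert the rcom_slot
 * entries to host byte order in place, then record each member's slot.
 * Returns 0 on success, -1 if the sender does not support slots or the
 * data is inconsistent (our own slot changing, or a member with no slot).
 */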
int dlm_slots_copy_in(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_rcom *rc = ls->ls_recover_buf;
        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
        struct rcom_slot *ro0, *ro;
        int our_nodeid = dlm_our_nodeid();
        int i, num_slots;
        uint32_t gen;

        if (!dlm_slots_version(&rc->rc_header))
                return -1;

        gen = le32_to_cpu(rf->rf_generation);
        if (gen <= ls->ls_generation) {
                log_error(ls, "dlm_slots_copy_in gen %u old %u",
                          gen, ls->ls_generation);
        }
        ls->ls_generation = gen;

        num_slots = le16_to_cpu(rf->rf_num_slots);
        if (!num_slots)
                return -1;

        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
                ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
                ro->ro_slot = le16_to_cpu(ro->ro_slot);
        }

        log_slots(ls, gen, num_slots, ro0, NULL, 0);

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
                        if (ro->ro_nodeid != memb->nodeid)
                                continue;
                        memb->slot = ro->ro_slot;
                        memb->slot_prev = memb->slot;
                        break;
                }

                if (memb->nodeid == our_nodeid) {
                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
                                log_error(ls, "dlm_slots_copy_in our slot "
                                          "changed %d %d", ls->ls_slot,
                                          memb->slot);
                                return -1;
                        }

                        if (!ls->ls_slot)
                                ls->ls_slot = memb->slot;
                }

                if (!memb->slot) {
                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
                                   memb->nodeid);
                        return -1;
                }
        }

        return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

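/*
 * memb->slot values used below:
 *   -1  the node does not support slots; slot assignment is skipped for
 *       the whole lockspace
 *    0  the node supports slots but has not been assigned one yet
 *   >0  a previously assigned slot, which must never change
 *
 * New members are given the lowest free offsets.  Hypothetical example:
 * existing members hold slots 1 and 3 and one new member joins; the array
 * is sized max(3) + need(1) = 4 entries and the new member is given
 * slot 2, the lowest free offset.
 */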
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
                     struct dlm_slot **slots_out, uint32_t *gen_out)
{
        struct dlm_member *memb;
        struct dlm_slot *array;
        int our_nodeid = dlm_our_nodeid();
        int array_size, max_slots, i;
        int need = 0;
        int max = 0;
        int num = 0;
        uint32_t gen = 0;

        /* our own memb struct will have slot -1 gen 0 */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->nodeid == our_nodeid) {
                        memb->slot = ls->ls_slot;
                        memb->generation = ls->ls_generation;
                        break;
                }
        }

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->generation > gen)
                        gen = memb->generation;

                /* node doesn't support slots */

                if (memb->slot == -1)
                        return -1;

                /* node needs a slot assigned */

                if (!memb->slot)
                        need++;

                /* node has a slot assigned */

                num++;

                if (!max || max < memb->slot)
                        max = memb->slot;

                /* sanity check, once slot is assigned it shouldn't change */

                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
                        log_error(ls, "nodeid %d slot changed %d %d",
                                  memb->nodeid, memb->slot_prev, memb->slot);
                        return -1;
                }
                memb->slot_prev = memb->slot;
        }

        array_size = max + need;

        array = kcalloc(array_size, sizeof(struct dlm_slot), GFP_NOFS);
        if (!array)
                return -ENOMEM;

        num = 0;

        /* fill in slots (offsets) that are used */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (!memb->slot)
                        continue;

                if (memb->slot > array_size) {
                        log_error(ls, "invalid slot number %d", memb->slot);
                        kfree(array);
                        return -1;
                }

                array[memb->slot - 1].nodeid = memb->nodeid;
                array[memb->slot - 1].slot = memb->slot;
                num++;
        }

        /* assign new slots from unused offsets */

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->slot)
                        continue;

                for (i = 0; i < array_size; i++) {
                        if (array[i].nodeid)
                                continue;

                        memb->slot = i + 1;
                        memb->slot_prev = memb->slot;
                        array[i].nodeid = memb->nodeid;
                        array[i].slot = memb->slot;
                        num++;

                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
                                ls->ls_slot = memb->slot;
                        break;
                }

                if (!memb->slot) {
                        log_error(ls, "no free slot found");
                        kfree(array);
                        return -1;
                }
        }

        gen++;

        log_slots(ls, gen, num, NULL, array, array_size);

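        /*
         * The whole slot table has to fit into a single rcom buffer along
         * with the rcom_config header (see dlm_slots_copy_out), so cap the
         * number of slots at what ci_buffer_size can carry.
         */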
        max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

        if (num > max_slots) {
                log_error(ls, "num_slots %d exceeds max_slots %d",
                          num, max_slots);
                kfree(array);
                return -1;
        }

        *gen_out = gen;
        *slots_out = array;
        *slots_size = array_size;
        *num_slots = num;
        return 0;
}

static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
        struct dlm_member *memb = NULL;
        struct list_head *tmp;
        struct list_head *newlist = &new->list;
        struct list_head *head = &ls->ls_nodes;

        list_for_each(tmp, head) {
                memb = list_entry(tmp, struct dlm_member, list);
                if (new->nodeid < memb->nodeid)
                        break;
        }

        /* insert before the first member with a larger nodeid, or at the
           tail if the list is empty or no larger nodeid was found */

        if (!memb)
                list_add_tail(newlist, head);
        else
                list_add_tail(newlist, tmp);
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
        struct dlm_member *memb;
        int error;

        memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
        if (!memb)
                return -ENOMEM;

        error = dlm_lowcomms_connect_node(node->nodeid);
        if (error < 0) {
                kfree(memb);
                return error;
        }

        memb->nodeid = node->nodeid;
        memb->weight = node->weight;
        memb->comm_seq = node->comm_seq;
        add_ordered_member(ls, memb);
        ls->ls_num_nodes++;
        return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
        struct dlm_member *memb;

        list_for_each_entry(memb, head, list) {
                if (memb->nodeid == nodeid)
                        return memb;
        }
        return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
        if (find_memb(&ls->ls_nodes, nodeid))
                return 1;
        return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
        if (find_memb(&ls->ls_nodes_gone, nodeid))
                return 1;
        return 0;
}

static void clear_memb_list(struct list_head *head)
{
        struct dlm_member *memb;

        while (!list_empty(head)) {
                memb = list_entry(head->next, struct dlm_member, list);
                list_del(&memb->list);
                kfree(memb);
        }
}

void dlm_clear_members(struct dlm_ls *ls)
{
        clear_memb_list(&ls->ls_nodes);
        ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
        clear_memb_list(&ls->ls_nodes_gone);
}

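/*
 * Build ls_node_array, in which each nodeid appears once per unit of
 * weight, so that consumers of the array (the resource directory lookup
 * code, outside this file) pick nodes in proportion to their weight.
 * Hypothetical example: members {nodeid 1, weight 2} and {nodeid 2,
 * weight 1} give ls_total_weight 3 and ls_node_array [1, 1, 2].  If every
 * member has weight 0, all of them are treated as weight 1.
 */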
static void make_member_array(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        int i, w, x = 0, total = 0, all_zero = 0, *array;

        kfree(ls->ls_node_array);
        ls->ls_node_array = NULL;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (memb->weight)
                        total += memb->weight;
        }

        /* all nodes revert to weight of 1 if all have weight 0 */

        if (!total) {
                total = ls->ls_num_nodes;
                all_zero = 1;
        }

        ls->ls_total_weight = total;

        array = kmalloc_array(total, sizeof(int), GFP_NOFS);
        if (!array)
                return;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (!all_zero && !memb->weight)
                        continue;

                if (all_zero)
                        w = 1;
                else
                        w = memb->weight;

                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

                for (i = 0; i < w; i++)
                        array[x++] = memb->nodeid;
        }

        ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        int error = 0;

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                error = dlm_recovery_stopped(ls);
                if (error)
                        break;
                error = dlm_rcom_status(ls, memb->nodeid, 0);
                if (error)
                        break;
        }
        if (error)
                log_rinfo(ls, "ping_members aborted %d last nodeid %d",
                          error, ls->ls_recover_nodeid);
        return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
                return;
        ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
        struct dlm_slot slot;
        uint32_t seq;
        int error;

        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
                return;

        /* if there is no comms connection with this node
           or the present comms connection is newer
           than the one when this member was added, then
           we consider the node to have failed (versus
           being removed due to dlm_release_lockspace) */

        error = dlm_comm_seq(memb->nodeid, &seq);

        if (!error && seq == memb->comm_seq)
                return;

        slot.nodeid = memb->nodeid;
        slot.slot = memb->slot;

        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

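/*
 * Called at the end of recovery: build a flat snapshot of the current
 * members (nodeid and slot) and pass it to the lockspace user's
 * recover_done callback together with our own slot and the generation,
 * so a user such as gfs2 can map its per-node state onto slots.
 */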
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
        struct dlm_member *memb;
        struct dlm_slot *slots;
        int i, num;

        if (!ls->ls_ops || !ls->ls_ops->recover_done)
                return;

        num = ls->ls_num_nodes;

        slots = kcalloc(num, sizeof(struct dlm_slot), GFP_KERNEL);
        if (!slots)
                return;

        i = 0;
        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (i == num) {
                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
                        goto out;
                }
                slots[i].nodeid = memb->nodeid;
                slots[i].slot = memb->slot;
                i++;
        }

        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
                                 ls->ls_slot, ls->ls_generation);
 out:
        kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
                                                int nodeid)
{
        int i;

        for (i = 0; i < rv->nodes_count; i++) {
                if (rv->nodes[i].nodeid == nodeid)
                        return &rv->nodes[i];
        }
        return NULL;
}

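/*
 * Reconcile ls_nodes with the new configuration in rv: members that are
 * gone (or were removed and re-added, detected via comm_seq) are moved to
 * ls_nodes_gone and reported through recover_slot; new members are added
 * and each member is pinged to establish comms.  *neg_out returns the
 * number of lost members, which determines whether the "negative"
 * recovery steps need to run.
 */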
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
        struct dlm_member *memb, *safe;
        struct dlm_config_node *node;
        int i, error, neg = 0, low = -1;

        /* previously removed members that we've not finished removing need to
           count as a negative change so the "neg" recovery steps will happen */

        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
                log_rinfo(ls, "prev removed member %d", memb->nodeid);
                neg++;
        }

        /* move departed members from ls_nodes to ls_nodes_gone */

        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
                node = find_config_node(rv, memb->nodeid);
                if (node && !node->new)
                        continue;

                if (!node) {
                        log_rinfo(ls, "remove member %d", memb->nodeid);
                } else {
                        /* removed and re-added */
                        log_rinfo(ls, "remove member %d comm_seq %u %u",
                                  memb->nodeid, memb->comm_seq, node->comm_seq);
                }

                neg++;
                list_move(&memb->list, &ls->ls_nodes_gone);
                ls->ls_num_nodes--;
                dlm_lsop_recover_slot(ls, memb);
        }

        /* add new members to ls_nodes */

        for (i = 0; i < rv->nodes_count; i++) {
                node = &rv->nodes[i];
                if (dlm_is_member(ls, node->nodeid))
                        continue;
                dlm_add_member(ls, node);
                log_rinfo(ls, "add member %d", node->nodeid);
        }

        list_for_each_entry(memb, &ls->ls_nodes, list) {
                if (low == -1 || memb->nodeid < low)
                        low = memb->nodeid;
        }
        ls->ls_low_nodeid = low;

        make_member_array(ls);
        *neg_out = neg;

        error = ping_members(ls);
        if (!error || error == -EPROTO) {
                /* new_lockspace() may be waiting to know if the config
                   is good or bad */
                ls->ls_members_result = error;
                complete(&ls->ls_members_done);
        }

        log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
        return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
        int new;

        /*
         * Prevent dlm_recv from being in the middle of something when we do
         * the stop.  This includes ensuring dlm_recv isn't processing a
         * recovery message (rcom), while dlm_recoverd is aborting and
         * resetting things from an in-progress recovery.  i.e. we want
         * dlm_recoverd to abort its recovery without worrying about dlm_recv
         * processing an rcom at the same time.  Stopping dlm_recv also makes
         * it easy for dlm_receive_message() to check locking stopped and add a
         * message to the requestqueue without races.
         */

        down_write(&ls->ls_recv_active);

        /*
         * Abort any recovery that's in progress (see RECOVER_STOP,
         * dlm_recovery_stopped()) and tell any other threads running in the
         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
         */

        spin_lock(&ls->ls_recover_lock);
        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
        ls->ls_recover_seq++;
        spin_unlock(&ls->ls_recover_lock);

        /*
         * Let dlm_recv run again, now any normal messages will be saved on the
         * requestqueue for later.
         */

        up_write(&ls->ls_recv_active);

        /*
         * This in_recovery lock does two things:
         * 1) Keeps this function from returning until all threads are out
         *    of locking routines and locking is truly stopped.
         * 2) Keeps any new requests from being processed until it's unlocked
         *    when recovery is complete.
         */

        if (new) {
                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
                wake_up_process(ls->ls_recoverd_task);
                wait_event(ls->ls_recover_lock_wait,
                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
        }

        /*
         * The recoverd suspend/resume makes sure that dlm_recoverd (if
         * running) has noticed RECOVER_STOP above and quit processing the
         * previous recovery.
         */

        dlm_recoverd_suspend(ls);

        spin_lock(&ls->ls_recover_lock);
        kfree(ls->ls_slots);
        ls->ls_slots = NULL;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_recover_status = 0;
        spin_unlock(&ls->ls_recover_lock);

        dlm_recoverd_resume(ls);

        if (!ls->ls_recover_begin)
                ls->ls_recover_begin = jiffies;

        dlm_lsop_recover_prep(ls);
        return 0;
}

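/*
 * Start a new recovery: called from userspace (normally dlm_controld)
 * after dlm_ls_stop() with the new node list.  The configuration is
 * snapshotted into a dlm_recover struct and handed to dlm_recoverd,
 * which does the actual recovery work.
 */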
int dlm_ls_start(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL, *rv_old;
        struct dlm_config_node *nodes = NULL;
        int error, count;

        rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
        if (!rv)
                return -ENOMEM;

        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
        if (error < 0)
                goto fail;

        spin_lock(&ls->ls_recover_lock);

        /* the lockspace needs to be stopped before it can be started */

        if (!dlm_locking_stopped(ls)) {
                spin_unlock(&ls->ls_recover_lock);
                log_error(ls, "start ignored: lockspace running");
                error = -EINVAL;
                goto fail;
        }

        rv->nodes = nodes;
        rv->nodes_count = count;
        rv->seq = ++ls->ls_recover_seq;
        rv_old = ls->ls_recover_args;
        ls->ls_recover_args = rv;
        spin_unlock(&ls->ls_recover_lock);

        if (rv_old) {
                log_error(ls, "unused recovery %llx %d",
                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
                kfree(rv_old->nodes);
                kfree(rv_old);
        }

        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
        wake_up_process(ls->ls_recoverd_task);
        return 0;

 fail:
        kfree(rv);
        kfree(nodes);
        return error;
}