linux/fs/dlm/member.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13#include "dlm_internal.h"
  14#include "lockspace.h"
  15#include "member.h"
  16#include "recoverd.h"
  17#include "recover.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "lowcomms.h"
  21
  22int dlm_slots_version(struct dlm_header *h)
  23{
  24        if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
  25                return 0;
  26        return 1;
  27}
  28
  29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
  30                   struct dlm_member *memb)
  31{
  32        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
  33
  34        if (!dlm_slots_version(&rc->rc_header))
  35                return;
  36
  37        memb->slot = le16_to_cpu(rf->rf_our_slot);
  38        memb->generation = le32_to_cpu(rf->rf_generation);
  39}
  40
  41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
  42{
  43        struct dlm_slot *slot;
  44        struct rcom_slot *ro;
  45        int i;
  46
  47        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
  48
  49        /* ls_slots array is sparse, but not rcom_slots */
  50
  51        for (i = 0; i < ls->ls_slots_size; i++) {
  52                slot = &ls->ls_slots[i];
  53                if (!slot->nodeid)
  54                        continue;
  55                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
  56                ro->ro_slot = cpu_to_le16(slot->slot);
  57                ro++;
  58        }
  59}
  60
  61#define SLOT_DEBUG_LINE 128
  62
  63static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
  64                      struct rcom_slot *ro0, struct dlm_slot *array,
  65                      int array_size)
  66{
  67        char line[SLOT_DEBUG_LINE];
  68        int len = SLOT_DEBUG_LINE - 1;
  69        int pos = 0;
  70        int ret, i;
  71
  72        memset(line, 0, sizeof(line));
  73
  74        if (array) {
  75                for (i = 0; i < array_size; i++) {
  76                        if (!array[i].nodeid)
  77                                continue;
  78
  79                        ret = snprintf(line + pos, len - pos, " %d:%d",
  80                                       array[i].slot, array[i].nodeid);
  81                        if (ret >= len - pos)
  82                                break;
  83                        pos += ret;
  84                }
  85        } else if (ro0) {
  86                for (i = 0; i < num_slots; i++) {
  87                        ret = snprintf(line + pos, len - pos, " %d:%d",
  88                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
  89                        if (ret >= len - pos)
  90                                break;
  91                        pos += ret;
  92                }
  93        }
  94
  95        log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
  96}
  97
  98int dlm_slots_copy_in(struct dlm_ls *ls)
  99{
 100        struct dlm_member *memb;
 101        struct dlm_rcom *rc = ls->ls_recover_buf;
 102        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 103        struct rcom_slot *ro0, *ro;
 104        int our_nodeid = dlm_our_nodeid();
 105        int i, num_slots;
 106        uint32_t gen;
 107
 108        if (!dlm_slots_version(&rc->rc_header))
 109                return -1;
 110
 111        gen = le32_to_cpu(rf->rf_generation);
 112        if (gen <= ls->ls_generation) {
 113                log_error(ls, "dlm_slots_copy_in gen %u old %u",
 114                          gen, ls->ls_generation);
 115        }
 116        ls->ls_generation = gen;
 117
 118        num_slots = le16_to_cpu(rf->rf_num_slots);
 119        if (!num_slots)
 120                return -1;
 121
 122        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 123
 124        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 125                ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
 126                ro->ro_slot = le16_to_cpu(ro->ro_slot);
 127        }
 128
 129        log_slots(ls, gen, num_slots, ro0, NULL, 0);
 130
 131        list_for_each_entry(memb, &ls->ls_nodes, list) {
 132                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 133                        if (ro->ro_nodeid != memb->nodeid)
 134                                continue;
 135                        memb->slot = ro->ro_slot;
 136                        memb->slot_prev = memb->slot;
 137                        break;
 138                }
 139
 140                if (memb->nodeid == our_nodeid) {
 141                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
 142                                log_error(ls, "dlm_slots_copy_in our slot "
 143                                          "changed %d %d", ls->ls_slot,
 144                                          memb->slot);
 145                                return -1;
 146                        }
 147
 148                        if (!ls->ls_slot)
 149                                ls->ls_slot = memb->slot;
 150                }
 151
 152                if (!memb->slot) {
 153                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
 154                                   memb->nodeid);
 155                        return -1;
 156                }
 157        }
 158
 159        return 0;
 160}
 161
 162/* for any nodes that do not support slots, we will not have set memb->slot
 163   in wait_status_all(), so memb->slot will remain -1, and we will not
 164   assign slots or set ls_num_slots here */
 165
 166int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
 167                     struct dlm_slot **slots_out, uint32_t *gen_out)
 168{
 169        struct dlm_member *memb;
 170        struct dlm_slot *array;
 171        int our_nodeid = dlm_our_nodeid();
 172        int array_size, max_slots, i;
 173        int need = 0;
 174        int max = 0;
 175        int num = 0;
 176        uint32_t gen = 0;
 177
 178        /* our own memb struct will have slot -1 gen 0 */
 179
 180        list_for_each_entry(memb, &ls->ls_nodes, list) {
 181                if (memb->nodeid == our_nodeid) {
 182                        memb->slot = ls->ls_slot;
 183                        memb->generation = ls->ls_generation;
 184                        break;
 185                }
 186        }
 187
 188        list_for_each_entry(memb, &ls->ls_nodes, list) {
 189                if (memb->generation > gen)
 190                        gen = memb->generation;
 191
 192                /* node doesn't support slots */
 193
 194                if (memb->slot == -1)
 195                        return -1;
 196
 197                /* node needs a slot assigned */
 198
 199                if (!memb->slot)
 200                        need++;
 201
 202                /* node has a slot assigned */
 203
 204                num++;
 205
 206                if (!max || max < memb->slot)
 207                        max = memb->slot;
 208
 209                /* sanity check, once slot is assigned it shouldn't change */
 210
 211                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
 212                        log_error(ls, "nodeid %d slot changed %d %d",
 213                                  memb->nodeid, memb->slot_prev, memb->slot);
 214                        return -1;
 215                }
 216                memb->slot_prev = memb->slot;
 217        }
 218
 219        array_size = max + need;
 220        array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
 221        if (!array)
 222                return -ENOMEM;
 223
 224        num = 0;
 225
 226        /* fill in slots (offsets) that are used */
 227
 228        list_for_each_entry(memb, &ls->ls_nodes, list) {
 229                if (!memb->slot)
 230                        continue;
 231
 232                if (memb->slot > array_size) {
 233                        log_error(ls, "invalid slot number %d", memb->slot);
 234                        kfree(array);
 235                        return -1;
 236                }
 237
 238                array[memb->slot - 1].nodeid = memb->nodeid;
 239                array[memb->slot - 1].slot = memb->slot;
 240                num++;
 241        }
 242
 243        /* assign new slots from unused offsets */
 244
 245        list_for_each_entry(memb, &ls->ls_nodes, list) {
 246                if (memb->slot)
 247                        continue;
 248
 249                for (i = 0; i < array_size; i++) {
 250                        if (array[i].nodeid)
 251                                continue;
 252
 253                        memb->slot = i + 1;
 254                        memb->slot_prev = memb->slot;
 255                        array[i].nodeid = memb->nodeid;
 256                        array[i].slot = memb->slot;
 257                        num++;
 258
 259                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
 260                                ls->ls_slot = memb->slot;
 261                        break;
 262                }
 263
 264                if (!memb->slot) {
 265                        log_error(ls, "no free slot found");
 266                        kfree(array);
 267                        return -1;
 268                }
 269        }
 270
 271        gen++;
 272
 273        log_slots(ls, gen, num, NULL, array, array_size);
 274
 275        max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
 276                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
 277
 278        if (num > max_slots) {
 279                log_error(ls, "num_slots %d exceeds max_slots %d",
 280                          num, max_slots);
 281                kfree(array);
 282                return -1;
 283        }
 284
 285        *gen_out = gen;
 286        *slots_out = array;
 287        *slots_size = array_size;
 288        *num_slots = num;
 289        return 0;
 290}
 291
 292static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 293{
 294        struct dlm_member *memb = NULL;
 295        struct list_head *tmp;
 296        struct list_head *newlist = &new->list;
 297        struct list_head *head = &ls->ls_nodes;
 298
 299        list_for_each(tmp, head) {
 300                memb = list_entry(tmp, struct dlm_member, list);
 301                if (new->nodeid < memb->nodeid)
 302                        break;
 303        }
 304
 305        if (!memb)
 306                list_add_tail(newlist, head);
 307        else {
 308                /* FIXME: can use list macro here */
 309                newlist->prev = tmp->prev;
 310                newlist->next = tmp;
 311                tmp->prev->next = newlist;
 312                tmp->prev = newlist;
 313        }
 314}
 315
 316static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 317{
 318        struct dlm_member *memb;
 319        int error;
 320
 321        memb = kzalloc(sizeof(*memb), GFP_NOFS);
 322        if (!memb)
 323                return -ENOMEM;
 324
 325        error = dlm_lowcomms_connect_node(node->nodeid);
 326        if (error < 0) {
 327                kfree(memb);
 328                return error;
 329        }
 330
 331        memb->nodeid = node->nodeid;
 332        memb->weight = node->weight;
 333        memb->comm_seq = node->comm_seq;
 334        add_ordered_member(ls, memb);
 335        ls->ls_num_nodes++;
 336        return 0;
 337}
 338
 339static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 340{
 341        struct dlm_member *memb;
 342
 343        list_for_each_entry(memb, head, list) {
 344                if (memb->nodeid == nodeid)
 345                        return memb;
 346        }
 347        return NULL;
 348}
 349
 350int dlm_is_member(struct dlm_ls *ls, int nodeid)
 351{
 352        if (find_memb(&ls->ls_nodes, nodeid))
 353                return 1;
 354        return 0;
 355}
 356
 357int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 358{
 359        if (find_memb(&ls->ls_nodes_gone, nodeid))
 360                return 1;
 361        return 0;
 362}
 363
 364static void clear_memb_list(struct list_head *head)
 365{
 366        struct dlm_member *memb;
 367
 368        while (!list_empty(head)) {
 369                memb = list_entry(head->next, struct dlm_member, list);
 370                list_del(&memb->list);
 371                kfree(memb);
 372        }
 373}
 374
 375void dlm_clear_members(struct dlm_ls *ls)
 376{
 377        clear_memb_list(&ls->ls_nodes);
 378        ls->ls_num_nodes = 0;
 379}
 380
 381void dlm_clear_members_gone(struct dlm_ls *ls)
 382{
 383        clear_memb_list(&ls->ls_nodes_gone);
 384}
 385
 386static void make_member_array(struct dlm_ls *ls)
 387{
 388        struct dlm_member *memb;
 389        int i, w, x = 0, total = 0, all_zero = 0, *array;
 390
 391        kfree(ls->ls_node_array);
 392        ls->ls_node_array = NULL;
 393
 394        list_for_each_entry(memb, &ls->ls_nodes, list) {
 395                if (memb->weight)
 396                        total += memb->weight;
 397        }
 398
 399        /* all nodes revert to weight of 1 if all have weight 0 */
 400
 401        if (!total) {
 402                total = ls->ls_num_nodes;
 403                all_zero = 1;
 404        }
 405
 406        ls->ls_total_weight = total;
 407        array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
 408        if (!array)
 409                return;
 410
 411        list_for_each_entry(memb, &ls->ls_nodes, list) {
 412                if (!all_zero && !memb->weight)
 413                        continue;
 414
 415                if (all_zero)
 416                        w = 1;
 417                else
 418                        w = memb->weight;
 419
 420                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
 421
 422                for (i = 0; i < w; i++)
 423                        array[x++] = memb->nodeid;
 424        }
 425
 426        ls->ls_node_array = array;
 427}
 428
 429/* send a status request to all members just to establish comms connections */
 430
 431static int ping_members(struct dlm_ls *ls)
 432{
 433        struct dlm_member *memb;
 434        int error = 0;
 435
 436        list_for_each_entry(memb, &ls->ls_nodes, list) {
 437                error = dlm_recovery_stopped(ls);
 438                if (error)
 439                        break;
 440                error = dlm_rcom_status(ls, memb->nodeid, 0);
 441                if (error)
 442                        break;
 443        }
 444        if (error)
 445                log_rinfo(ls, "ping_members aborted %d last nodeid %d",
 446                          error, ls->ls_recover_nodeid);
 447        return error;
 448}
 449
 450static void dlm_lsop_recover_prep(struct dlm_ls *ls)
 451{
 452        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
 453                return;
 454        ls->ls_ops->recover_prep(ls->ls_ops_arg);
 455}
 456
 457static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
 458{
 459        struct dlm_slot slot;
 460        uint32_t seq;
 461        int error;
 462
 463        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
 464                return;
 465
 466        /* if there is no comms connection with this node
 467           or the present comms connection is newer
 468           than the one when this member was added, then
 469           we consider the node to have failed (versus
 470           being removed due to dlm_release_lockspace) */
 471
 472        error = dlm_comm_seq(memb->nodeid, &seq);
 473
 474        if (!error && seq == memb->comm_seq)
 475                return;
 476
 477        slot.nodeid = memb->nodeid;
 478        slot.slot = memb->slot;
 479
 480        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
 481}
 482
 483void dlm_lsop_recover_done(struct dlm_ls *ls)
 484{
 485        struct dlm_member *memb;
 486        struct dlm_slot *slots;
 487        int i, num;
 488
 489        if (!ls->ls_ops || !ls->ls_ops->recover_done)
 490                return;
 491
 492        num = ls->ls_num_nodes;
 493        slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
 494        if (!slots)
 495                return;
 496
 497        i = 0;
 498        list_for_each_entry(memb, &ls->ls_nodes, list) {
 499                if (i == num) {
 500                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
 501                        goto out;
 502                }
 503                slots[i].nodeid = memb->nodeid;
 504                slots[i].slot = memb->slot;
 505                i++;
 506        }
 507
 508        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
 509                                 ls->ls_slot, ls->ls_generation);
 510 out:
 511        kfree(slots);
 512}
 513
 514static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
 515                                                int nodeid)
 516{
 517        int i;
 518
 519        for (i = 0; i < rv->nodes_count; i++) {
 520                if (rv->nodes[i].nodeid == nodeid)
 521                        return &rv->nodes[i];
 522        }
 523        return NULL;
 524}
 525
 526int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 527{
 528        struct dlm_member *memb, *safe;
 529        struct dlm_config_node *node;
 530        int i, error, neg = 0, low = -1;
 531
 532        /* previously removed members that we've not finished removing need to
 533           count as a negative change so the "neg" recovery steps will happen */
 534
 535        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 536                log_rinfo(ls, "prev removed member %d", memb->nodeid);
 537                neg++;
 538        }
 539
 540        /* move departed members from ls_nodes to ls_nodes_gone */
 541
 542        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
 543                node = find_config_node(rv, memb->nodeid);
 544                if (node && !node->new)
 545                        continue;
 546
 547                if (!node) {
 548                        log_rinfo(ls, "remove member %d", memb->nodeid);
 549                } else {
 550                        /* removed and re-added */
 551                        log_rinfo(ls, "remove member %d comm_seq %u %u",
 552                                  memb->nodeid, memb->comm_seq, node->comm_seq);
 553                }
 554
 555                neg++;
 556                list_move(&memb->list, &ls->ls_nodes_gone);
 557                ls->ls_num_nodes--;
 558                dlm_lsop_recover_slot(ls, memb);
 559        }
 560
 561        /* add new members to ls_nodes */
 562
 563        for (i = 0; i < rv->nodes_count; i++) {
 564                node = &rv->nodes[i];
 565                if (dlm_is_member(ls, node->nodeid))
 566                        continue;
 567                dlm_add_member(ls, node);
 568                log_rinfo(ls, "add member %d", node->nodeid);
 569        }
 570
 571        list_for_each_entry(memb, &ls->ls_nodes, list) {
 572                if (low == -1 || memb->nodeid < low)
 573                        low = memb->nodeid;
 574        }
 575        ls->ls_low_nodeid = low;
 576
 577        make_member_array(ls);
 578        *neg_out = neg;
 579
 580        error = ping_members(ls);
 581        if (!error || error == -EPROTO) {
 582                /* new_lockspace() may be waiting to know if the config
 583                   is good or bad */
 584                ls->ls_members_result = error;
 585                complete(&ls->ls_members_done);
 586        }
 587
 588        log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 589        return error;
 590}
 591
 592/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
 593   dlm_ls_start() is called on any of them to start the new recovery. */
 594
 595int dlm_ls_stop(struct dlm_ls *ls)
 596{
 597        int new;
 598
 599        /*
 600         * Prevent dlm_recv from being in the middle of something when we do
 601         * the stop.  This includes ensuring dlm_recv isn't processing a
 602         * recovery message (rcom), while dlm_recoverd is aborting and
 603         * resetting things from an in-progress recovery.  i.e. we want
 604         * dlm_recoverd to abort its recovery without worrying about dlm_recv
 605         * processing an rcom at the same time.  Stopping dlm_recv also makes
 606         * it easy for dlm_receive_message() to check locking stopped and add a
 607         * message to the requestqueue without races.
 608         */
 609
 610        down_write(&ls->ls_recv_active);
 611
 612        /*
 613         * Abort any recovery that's in progress (see RECOVER_STOP,
 614         * dlm_recovery_stopped()) and tell any other threads running in the
 615         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 616         */
 617
 618        spin_lock(&ls->ls_recover_lock);
 619        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 620        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 621        ls->ls_recover_seq++;
 622        spin_unlock(&ls->ls_recover_lock);
 623
 624        /*
 625         * Let dlm_recv run again, now any normal messages will be saved on the
 626         * requestqueue for later.
 627         */
 628
 629        up_write(&ls->ls_recv_active);
 630
 631        /*
 632         * This in_recovery lock does two things:
 633         * 1) Keeps this function from returning until all threads are out
 634         *    of locking routines and locking is truly stopped.
 635         * 2) Keeps any new requests from being processed until it's unlocked
 636         *    when recovery is complete.
 637         */
 638
 639        if (new) {
 640                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
 641                wake_up_process(ls->ls_recoverd_task);
 642                wait_event(ls->ls_recover_lock_wait,
 643                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 644        }
 645
 646        /*
 647         * The recoverd suspend/resume makes sure that dlm_recoverd (if
 648         * running) has noticed RECOVER_STOP above and quit processing the
 649         * previous recovery.
 650         */
 651
 652        dlm_recoverd_suspend(ls);
 653
 654        spin_lock(&ls->ls_recover_lock);
 655        kfree(ls->ls_slots);
 656        ls->ls_slots = NULL;
 657        ls->ls_num_slots = 0;
 658        ls->ls_slots_size = 0;
 659        ls->ls_recover_status = 0;
 660        spin_unlock(&ls->ls_recover_lock);
 661
 662        dlm_recoverd_resume(ls);
 663
 664        if (!ls->ls_recover_begin)
 665                ls->ls_recover_begin = jiffies;
 666
 667        dlm_lsop_recover_prep(ls);
 668        return 0;
 669}
 670
 671int dlm_ls_start(struct dlm_ls *ls)
 672{
 673        struct dlm_recover *rv, *rv_old;
 674        struct dlm_config_node *nodes;
 675        int error, count;
 676
 677        rv = kzalloc(sizeof(*rv), GFP_NOFS);
 678        if (!rv)
 679                return -ENOMEM;
 680
 681        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 682        if (error < 0)
 683                goto fail;
 684
 685        spin_lock(&ls->ls_recover_lock);
 686
 687        /* the lockspace needs to be stopped before it can be started */
 688
 689        if (!dlm_locking_stopped(ls)) {
 690                spin_unlock(&ls->ls_recover_lock);
 691                log_error(ls, "start ignored: lockspace running");
 692                error = -EINVAL;
 693                goto fail;
 694        }
 695
 696        rv->nodes = nodes;
 697        rv->nodes_count = count;
 698        rv->seq = ++ls->ls_recover_seq;
 699        rv_old = ls->ls_recover_args;
 700        ls->ls_recover_args = rv;
 701        spin_unlock(&ls->ls_recover_lock);
 702
 703        if (rv_old) {
 704                log_error(ls, "unused recovery %llx %d",
 705                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
 706                kfree(rv_old->nodes);
 707                kfree(rv_old);
 708        }
 709
 710        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
 711        wake_up_process(ls->ls_recoverd_task);
 712        return 0;
 713
 714 fail:
 715        kfree(rv);
 716        kfree(nodes);
 717        return error;
 718}
 719
 720