linux/fs/dlm/member.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13#include "dlm_internal.h"
  14#include "lockspace.h"
  15#include "member.h"
  16#include "recoverd.h"
  17#include "recover.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "lowcomms.h"
  21
  22int dlm_slots_version(struct dlm_header *h)
  23{
  24        if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
  25                return 0;
  26        return 1;
  27}
  28
  29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
  30                   struct dlm_member *memb)
  31{
  32        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
  33
  34        if (!dlm_slots_version(&rc->rc_header))
  35                return;
  36
  37        memb->slot = le16_to_cpu(rf->rf_our_slot);
  38        memb->generation = le32_to_cpu(rf->rf_generation);
  39}
  40
  41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
  42{
  43        struct dlm_slot *slot;
  44        struct rcom_slot *ro;
  45        int i;
  46
  47        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
  48
  49        /* ls_slots array is sparse, but not rcom_slots */
  50
  51        for (i = 0; i < ls->ls_slots_size; i++) {
  52                slot = &ls->ls_slots[i];
  53                if (!slot->nodeid)
  54                        continue;
  55                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
  56                ro->ro_slot = cpu_to_le16(slot->slot);
  57                ro++;
  58        }
  59}
  60
  61#define SLOT_DEBUG_LINE 128
  62
  63static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
  64                            struct rcom_slot *ro0, struct dlm_slot *array,
  65                            int array_size)
  66{
  67        char line[SLOT_DEBUG_LINE];
  68        int len = SLOT_DEBUG_LINE - 1;
  69        int pos = 0;
  70        int ret, i;
  71
  72        if (!dlm_config.ci_log_debug)
  73                return;
  74
  75        memset(line, 0, sizeof(line));
  76
  77        if (array) {
  78                for (i = 0; i < array_size; i++) {
  79                        if (!array[i].nodeid)
  80                                continue;
  81
  82                        ret = snprintf(line + pos, len - pos, " %d:%d",
  83                                       array[i].slot, array[i].nodeid);
  84                        if (ret >= len - pos)
  85                                break;
  86                        pos += ret;
  87                }
  88        } else if (ro0) {
  89                for (i = 0; i < num_slots; i++) {
  90                        ret = snprintf(line + pos, len - pos, " %d:%d",
  91                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
  92                        if (ret >= len - pos)
  93                                break;
  94                        pos += ret;
  95                }
  96        }
  97
  98        log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
  99}
 100
 101int dlm_slots_copy_in(struct dlm_ls *ls)
 102{
 103        struct dlm_member *memb;
 104        struct dlm_rcom *rc = ls->ls_recover_buf;
 105        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 106        struct rcom_slot *ro0, *ro;
 107        int our_nodeid = dlm_our_nodeid();
 108        int i, num_slots;
 109        uint32_t gen;
 110
 111        if (!dlm_slots_version(&rc->rc_header))
 112                return -1;
 113
 114        gen = le32_to_cpu(rf->rf_generation);
 115        if (gen <= ls->ls_generation) {
 116                log_error(ls, "dlm_slots_copy_in gen %u old %u",
 117                          gen, ls->ls_generation);
 118        }
 119        ls->ls_generation = gen;
 120
 121        num_slots = le16_to_cpu(rf->rf_num_slots);
 122        if (!num_slots)
 123                return -1;
 124
 125        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 126
 127        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 128                ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
 129                ro->ro_slot = le16_to_cpu(ro->ro_slot);
 130        }
 131
 132        log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
 133
 134        list_for_each_entry(memb, &ls->ls_nodes, list) {
 135                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 136                        if (ro->ro_nodeid != memb->nodeid)
 137                                continue;
 138                        memb->slot = ro->ro_slot;
 139                        memb->slot_prev = memb->slot;
 140                        break;
 141                }
 142
 143                if (memb->nodeid == our_nodeid) {
 144                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
 145                                log_error(ls, "dlm_slots_copy_in our slot "
 146                                          "changed %d %d", ls->ls_slot,
 147                                          memb->slot);
 148                                return -1;
 149                        }
 150
 151                        if (!ls->ls_slot)
 152                                ls->ls_slot = memb->slot;
 153                }
 154
 155                if (!memb->slot) {
 156                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
 157                                   memb->nodeid);
 158                        return -1;
 159                }
 160        }
 161
 162        return 0;
 163}
 164
 165/* for any nodes that do not support slots, we will not have set memb->slot
 166   in wait_status_all(), so memb->slot will remain -1, and we will not
 167   assign slots or set ls_num_slots here */
 168
 169int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
 170                     struct dlm_slot **slots_out, uint32_t *gen_out)
 171{
 172        struct dlm_member *memb;
 173        struct dlm_slot *array;
 174        int our_nodeid = dlm_our_nodeid();
 175        int array_size, max_slots, i;
 176        int need = 0;
 177        int max = 0;
 178        int num = 0;
 179        uint32_t gen = 0;
 180
 181        /* our own memb struct will have slot -1 gen 0 */
 182
 183        list_for_each_entry(memb, &ls->ls_nodes, list) {
 184                if (memb->nodeid == our_nodeid) {
 185                        memb->slot = ls->ls_slot;
 186                        memb->generation = ls->ls_generation;
 187                        break;
 188                }
 189        }
 190
 191        list_for_each_entry(memb, &ls->ls_nodes, list) {
 192                if (memb->generation > gen)
 193                        gen = memb->generation;
 194
 195                /* node doesn't support slots */
 196
 197                if (memb->slot == -1)
 198                        return -1;
 199
 200                /* node needs a slot assigned */
 201
 202                if (!memb->slot)
 203                        need++;
 204
 205                /* node has a slot assigned */
 206
 207                num++;
 208
 209                if (!max || max < memb->slot)
 210                        max = memb->slot;
 211
 212                /* sanity check, once slot is assigned it shouldn't change */
 213
 214                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
 215                        log_error(ls, "nodeid %d slot changed %d %d",
 216                                  memb->nodeid, memb->slot_prev, memb->slot);
 217                        return -1;
 218                }
 219                memb->slot_prev = memb->slot;
 220        }
 221
 222        array_size = max + need;
 223
 224        array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
 225        if (!array)
 226                return -ENOMEM;
 227
 228        num = 0;
 229
 230        /* fill in slots (offsets) that are used */
 231
 232        list_for_each_entry(memb, &ls->ls_nodes, list) {
 233                if (!memb->slot)
 234                        continue;
 235
 236                if (memb->slot > array_size) {
 237                        log_error(ls, "invalid slot number %d", memb->slot);
 238                        kfree(array);
 239                        return -1;
 240                }
 241
 242                array[memb->slot - 1].nodeid = memb->nodeid;
 243                array[memb->slot - 1].slot = memb->slot;
 244                num++;
 245        }
 246
 247        /* assign new slots from unused offsets */
 248
 249        list_for_each_entry(memb, &ls->ls_nodes, list) {
 250                if (memb->slot)
 251                        continue;
 252
 253                for (i = 0; i < array_size; i++) {
 254                        if (array[i].nodeid)
 255                                continue;
 256
 257                        memb->slot = i + 1;
 258                        memb->slot_prev = memb->slot;
 259                        array[i].nodeid = memb->nodeid;
 260                        array[i].slot = memb->slot;
 261                        num++;
 262
 263                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
 264                                ls->ls_slot = memb->slot;
 265                        break;
 266                }
 267
 268                if (!memb->slot) {
 269                        log_error(ls, "no free slot found");
 270                        kfree(array);
 271                        return -1;
 272                }
 273        }
 274
 275        gen++;
 276
 277        log_debug_slots(ls, gen, num, NULL, array, array_size);
 278
 279        max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
 280                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
 281
 282        if (num > max_slots) {
 283                log_error(ls, "num_slots %d exceeds max_slots %d",
 284                          num, max_slots);
 285                kfree(array);
 286                return -1;
 287        }
 288
 289        *gen_out = gen;
 290        *slots_out = array;
 291        *slots_size = array_size;
 292        *num_slots = num;
 293        return 0;
 294}
 295
 296static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 297{
 298        struct dlm_member *memb = NULL;
 299        struct list_head *tmp;
 300        struct list_head *newlist = &new->list;
 301        struct list_head *head = &ls->ls_nodes;
 302
 303        list_for_each(tmp, head) {
 304                memb = list_entry(tmp, struct dlm_member, list);
 305                if (new->nodeid < memb->nodeid)
 306                        break;
 307        }
 308
 309        if (!memb)
 310                list_add_tail(newlist, head);
 311        else {
 312                /* FIXME: can use list macro here */
 313                newlist->prev = tmp->prev;
 314                newlist->next = tmp;
 315                tmp->prev->next = newlist;
 316                tmp->prev = newlist;
 317        }
 318}
 319
 320static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 321{
 322        struct dlm_member *memb;
 323        int error;
 324
 325        memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
 326        if (!memb)
 327                return -ENOMEM;
 328
 329        error = dlm_lowcomms_connect_node(node->nodeid);
 330        if (error < 0) {
 331                kfree(memb);
 332                return error;
 333        }
 334
 335        memb->nodeid = node->nodeid;
 336        memb->weight = node->weight;
 337        memb->comm_seq = node->comm_seq;
 338        add_ordered_member(ls, memb);
 339        ls->ls_num_nodes++;
 340        return 0;
 341}
 342
 343static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 344{
 345        struct dlm_member *memb;
 346
 347        list_for_each_entry(memb, head, list) {
 348                if (memb->nodeid == nodeid)
 349                        return memb;
 350        }
 351        return NULL;
 352}
 353
 354int dlm_is_member(struct dlm_ls *ls, int nodeid)
 355{
 356        if (find_memb(&ls->ls_nodes, nodeid))
 357                return 1;
 358        return 0;
 359}
 360
 361int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 362{
 363        if (find_memb(&ls->ls_nodes_gone, nodeid))
 364                return 1;
 365        return 0;
 366}
 367
 368static void clear_memb_list(struct list_head *head)
 369{
 370        struct dlm_member *memb;
 371
 372        while (!list_empty(head)) {
 373                memb = list_entry(head->next, struct dlm_member, list);
 374                list_del(&memb->list);
 375                kfree(memb);
 376        }
 377}
 378
 379void dlm_clear_members(struct dlm_ls *ls)
 380{
 381        clear_memb_list(&ls->ls_nodes);
 382        ls->ls_num_nodes = 0;
 383}
 384
 385void dlm_clear_members_gone(struct dlm_ls *ls)
 386{
 387        clear_memb_list(&ls->ls_nodes_gone);
 388}
 389
 390static void make_member_array(struct dlm_ls *ls)
 391{
 392        struct dlm_member *memb;
 393        int i, w, x = 0, total = 0, all_zero = 0, *array;
 394
 395        kfree(ls->ls_node_array);
 396        ls->ls_node_array = NULL;
 397
 398        list_for_each_entry(memb, &ls->ls_nodes, list) {
 399                if (memb->weight)
 400                        total += memb->weight;
 401        }
 402
 403        /* all nodes revert to weight of 1 if all have weight 0 */
 404
 405        if (!total) {
 406                total = ls->ls_num_nodes;
 407                all_zero = 1;
 408        }
 409
 410        ls->ls_total_weight = total;
 411
 412        array = kmalloc(sizeof(int) * total, GFP_NOFS);
 413        if (!array)
 414                return;
 415
 416        list_for_each_entry(memb, &ls->ls_nodes, list) {
 417                if (!all_zero && !memb->weight)
 418                        continue;
 419
 420                if (all_zero)
 421                        w = 1;
 422                else
 423                        w = memb->weight;
 424
 425                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
 426
 427                for (i = 0; i < w; i++)
 428                        array[x++] = memb->nodeid;
 429        }
 430
 431        ls->ls_node_array = array;
 432}
 433
 434/* send a status request to all members just to establish comms connections */
 435
 436static int ping_members(struct dlm_ls *ls)
 437{
 438        struct dlm_member *memb;
 439        int error = 0;
 440
 441        list_for_each_entry(memb, &ls->ls_nodes, list) {
 442                error = dlm_recovery_stopped(ls);
 443                if (error)
 444                        break;
 445                error = dlm_rcom_status(ls, memb->nodeid, 0);
 446                if (error)
 447                        break;
 448        }
 449        if (error)
 450                log_debug(ls, "ping_members aborted %d last nodeid %d",
 451                          error, ls->ls_recover_nodeid);
 452        return error;
 453}
 454
 455static void dlm_lsop_recover_prep(struct dlm_ls *ls)
 456{
 457        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
 458                return;
 459        ls->ls_ops->recover_prep(ls->ls_ops_arg);
 460}
 461
 462static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
 463{
 464        struct dlm_slot slot;
 465        uint32_t seq;
 466        int error;
 467
 468        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
 469                return;
 470
 471        /* if there is no comms connection with this node
 472           or the present comms connection is newer
 473           than the one when this member was added, then
 474           we consider the node to have failed (versus
 475           being removed due to dlm_release_lockspace) */
 476
 477        error = dlm_comm_seq(memb->nodeid, &seq);
 478
 479        if (!error && seq == memb->comm_seq)
 480                return;
 481
 482        slot.nodeid = memb->nodeid;
 483        slot.slot = memb->slot;
 484
 485        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
 486}
 487
 488void dlm_lsop_recover_done(struct dlm_ls *ls)
 489{
 490        struct dlm_member *memb;
 491        struct dlm_slot *slots;
 492        int i, num;
 493
 494        if (!ls->ls_ops || !ls->ls_ops->recover_done)
 495                return;
 496
 497        num = ls->ls_num_nodes;
 498
 499        slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
 500        if (!slots)
 501                return;
 502
 503        i = 0;
 504        list_for_each_entry(memb, &ls->ls_nodes, list) {
 505                if (i == num) {
 506                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
 507                        goto out;
 508                }
 509                slots[i].nodeid = memb->nodeid;
 510                slots[i].slot = memb->slot;
 511                i++;
 512        }
 513
 514        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
 515                                 ls->ls_slot, ls->ls_generation);
 516 out:
 517        kfree(slots);
 518}
 519
 520static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
 521                                                int nodeid)
 522{
 523        int i;
 524
 525        for (i = 0; i < rv->nodes_count; i++) {
 526                if (rv->nodes[i].nodeid == nodeid)
 527                        return &rv->nodes[i];
 528        }
 529        return NULL;
 530}
 531
 532int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 533{
 534        struct dlm_member *memb, *safe;
 535        struct dlm_config_node *node;
 536        int i, error, neg = 0, low = -1;
 537
 538        /* previously removed members that we've not finished removing need to
 539           count as a negative change so the "neg" recovery steps will happen */
 540
 541        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 542                log_debug(ls, "prev removed member %d", memb->nodeid);
 543                neg++;
 544        }
 545
 546        /* move departed members from ls_nodes to ls_nodes_gone */
 547
 548        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
 549                node = find_config_node(rv, memb->nodeid);
 550                if (node && !node->new)
 551                        continue;
 552
 553                if (!node) {
 554                        log_debug(ls, "remove member %d", memb->nodeid);
 555                } else {
 556                        /* removed and re-added */
 557                        log_debug(ls, "remove member %d comm_seq %u %u",
 558                                  memb->nodeid, memb->comm_seq, node->comm_seq);
 559                }
 560
 561                neg++;
 562                list_move(&memb->list, &ls->ls_nodes_gone);
 563                ls->ls_num_nodes--;
 564                dlm_lsop_recover_slot(ls, memb);
 565        }
 566
 567        /* add new members to ls_nodes */
 568
 569        for (i = 0; i < rv->nodes_count; i++) {
 570                node = &rv->nodes[i];
 571                if (dlm_is_member(ls, node->nodeid))
 572                        continue;
 573                dlm_add_member(ls, node);
 574                log_debug(ls, "add member %d", node->nodeid);
 575        }
 576
 577        list_for_each_entry(memb, &ls->ls_nodes, list) {
 578                if (low == -1 || memb->nodeid < low)
 579                        low = memb->nodeid;
 580        }
 581        ls->ls_low_nodeid = low;
 582
 583        make_member_array(ls);
 584        *neg_out = neg;
 585
 586        error = ping_members(ls);
 587        if (!error || error == -EPROTO) {
 588                /* new_lockspace() may be waiting to know if the config
 589                   is good or bad */
 590                ls->ls_members_result = error;
 591                complete(&ls->ls_members_done);
 592        }
 593
 594        log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 595        return error;
 596}
 597
 598/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
 599   dlm_ls_start() is called on any of them to start the new recovery. */
 600
 601int dlm_ls_stop(struct dlm_ls *ls)
 602{
 603        int new;
 604
 605        /*
 606         * Prevent dlm_recv from being in the middle of something when we do
 607         * the stop.  This includes ensuring dlm_recv isn't processing a
 608         * recovery message (rcom), while dlm_recoverd is aborting and
 609         * resetting things from an in-progress recovery.  i.e. we want
 610         * dlm_recoverd to abort its recovery without worrying about dlm_recv
 611         * processing an rcom at the same time.  Stopping dlm_recv also makes
 612         * it easy for dlm_receive_message() to check locking stopped and add a
 613         * message to the requestqueue without races.
 614         */
 615
 616        down_write(&ls->ls_recv_active);
 617
 618        /*
 619         * Abort any recovery that's in progress (see RECOVER_STOP,
 620         * dlm_recovery_stopped()) and tell any other threads running in the
 621         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 622         */
 623
 624        spin_lock(&ls->ls_recover_lock);
 625        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 626        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 627        ls->ls_recover_seq++;
 628        spin_unlock(&ls->ls_recover_lock);
 629
 630        /*
 631         * Let dlm_recv run again, now any normal messages will be saved on the
 632         * requestqueue for later.
 633         */
 634
 635        up_write(&ls->ls_recv_active);
 636
 637        /*
 638         * This in_recovery lock does two things:
 639         * 1) Keeps this function from returning until all threads are out
 640         *    of locking routines and locking is truly stopped.
 641         * 2) Keeps any new requests from being processed until it's unlocked
 642         *    when recovery is complete.
 643         */
 644
 645        if (new) {
 646                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
 647                wake_up_process(ls->ls_recoverd_task);
 648                wait_event(ls->ls_recover_lock_wait,
 649                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 650        }
 651
 652        /*
 653         * The recoverd suspend/resume makes sure that dlm_recoverd (if
 654         * running) has noticed RECOVER_STOP above and quit processing the
 655         * previous recovery.
 656         */
 657
 658        dlm_recoverd_suspend(ls);
 659
 660        spin_lock(&ls->ls_recover_lock);
 661        kfree(ls->ls_slots);
 662        ls->ls_slots = NULL;
 663        ls->ls_num_slots = 0;
 664        ls->ls_slots_size = 0;
 665        ls->ls_recover_status = 0;
 666        spin_unlock(&ls->ls_recover_lock);
 667
 668        dlm_recoverd_resume(ls);
 669
 670        if (!ls->ls_recover_begin)
 671                ls->ls_recover_begin = jiffies;
 672
 673        dlm_lsop_recover_prep(ls);
 674        return 0;
 675}
 676
 677int dlm_ls_start(struct dlm_ls *ls)
 678{
 679        struct dlm_recover *rv = NULL, *rv_old;
 680        struct dlm_config_node *nodes;
 681        int error, count;
 682
 683        rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
 684        if (!rv)
 685                return -ENOMEM;
 686
 687        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 688        if (error < 0)
 689                goto fail;
 690
 691        spin_lock(&ls->ls_recover_lock);
 692
 693        /* the lockspace needs to be stopped before it can be started */
 694
 695        if (!dlm_locking_stopped(ls)) {
 696                spin_unlock(&ls->ls_recover_lock);
 697                log_error(ls, "start ignored: lockspace running");
 698                error = -EINVAL;
 699                goto fail;
 700        }
 701
 702        rv->nodes = nodes;
 703        rv->nodes_count = count;
 704        rv->seq = ++ls->ls_recover_seq;
 705        rv_old = ls->ls_recover_args;
 706        ls->ls_recover_args = rv;
 707        spin_unlock(&ls->ls_recover_lock);
 708
 709        if (rv_old) {
 710                log_error(ls, "unused recovery %llx %d",
 711                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
 712                kfree(rv_old->nodes);
 713                kfree(rv_old);
 714        }
 715
 716        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
 717        wake_up_process(ls->ls_recoverd_task);
 718        return 0;
 719
 720 fail:
 721        kfree(rv);
 722        kfree(nodes);
 723        return error;
 724}
 725
 726