linux/fs/dlm/member.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**
   8*******************************************************************************
   9******************************************************************************/
  10
  11#include "dlm_internal.h"
  12#include "lockspace.h"
  13#include "member.h"
  14#include "recoverd.h"
  15#include "recover.h"
  16#include "rcom.h"
  17#include "config.h"
  18#include "lowcomms.h"
  19
  20int dlm_slots_version(struct dlm_header *h)
  21{
  22        if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
  23                return 0;
  24        return 1;
  25}
  26
  27void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
  28                   struct dlm_member *memb)
  29{
  30        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
  31
  32        if (!dlm_slots_version(&rc->rc_header))
  33                return;
  34
  35        memb->slot = le16_to_cpu(rf->rf_our_slot);
  36        memb->generation = le32_to_cpu(rf->rf_generation);
  37}
  38
  39void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
  40{
  41        struct dlm_slot *slot;
  42        struct rcom_slot *ro;
  43        int i;
  44
  45        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
  46
  47        /* ls_slots array is sparse, but not rcom_slots */
  48
  49        for (i = 0; i < ls->ls_slots_size; i++) {
  50                slot = &ls->ls_slots[i];
  51                if (!slot->nodeid)
  52                        continue;
  53                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
  54                ro->ro_slot = cpu_to_le16(slot->slot);
  55                ro++;
  56        }
  57}
  58
  59#define SLOT_DEBUG_LINE 128
  60
  61static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
  62                      struct rcom_slot *ro0, struct dlm_slot *array,
  63                      int array_size)
  64{
  65        char line[SLOT_DEBUG_LINE];
  66        int len = SLOT_DEBUG_LINE - 1;
  67        int pos = 0;
  68        int ret, i;
  69
  70        memset(line, 0, sizeof(line));
  71
  72        if (array) {
  73                for (i = 0; i < array_size; i++) {
  74                        if (!array[i].nodeid)
  75                                continue;
  76
  77                        ret = snprintf(line + pos, len - pos, " %d:%d",
  78                                       array[i].slot, array[i].nodeid);
  79                        if (ret >= len - pos)
  80                                break;
  81                        pos += ret;
  82                }
  83        } else if (ro0) {
  84                for (i = 0; i < num_slots; i++) {
  85                        ret = snprintf(line + pos, len - pos, " %d:%d",
  86                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
  87                        if (ret >= len - pos)
  88                                break;
  89                        pos += ret;
  90                }
  91        }
  92
  93        log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
  94}
  95
  96int dlm_slots_copy_in(struct dlm_ls *ls)
  97{
  98        struct dlm_member *memb;
  99        struct dlm_rcom *rc = ls->ls_recover_buf;
 100        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 101        struct rcom_slot *ro0, *ro;
 102        int our_nodeid = dlm_our_nodeid();
 103        int i, num_slots;
 104        uint32_t gen;
 105
 106        if (!dlm_slots_version(&rc->rc_header))
 107                return -1;
 108
 109        gen = le32_to_cpu(rf->rf_generation);
 110        if (gen <= ls->ls_generation) {
 111                log_error(ls, "dlm_slots_copy_in gen %u old %u",
 112                          gen, ls->ls_generation);
 113        }
 114        ls->ls_generation = gen;
 115
 116        num_slots = le16_to_cpu(rf->rf_num_slots);
 117        if (!num_slots)
 118                return -1;
 119
 120        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 121
 122        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 123                ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
 124                ro->ro_slot = le16_to_cpu(ro->ro_slot);
 125        }
 126
 127        log_slots(ls, gen, num_slots, ro0, NULL, 0);
 128
 129        list_for_each_entry(memb, &ls->ls_nodes, list) {
 130                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 131                        if (ro->ro_nodeid != memb->nodeid)
 132                                continue;
 133                        memb->slot = ro->ro_slot;
 134                        memb->slot_prev = memb->slot;
 135                        break;
 136                }
 137
 138                if (memb->nodeid == our_nodeid) {
 139                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
 140                                log_error(ls, "dlm_slots_copy_in our slot "
 141                                          "changed %d %d", ls->ls_slot,
 142                                          memb->slot);
 143                                return -1;
 144                        }
 145
 146                        if (!ls->ls_slot)
 147                                ls->ls_slot = memb->slot;
 148                }
 149
 150                if (!memb->slot) {
 151                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
 152                                   memb->nodeid);
 153                        return -1;
 154                }
 155        }
 156
 157        return 0;
 158}
 159
 160/* for any nodes that do not support slots, we will not have set memb->slot
 161   in wait_status_all(), so memb->slot will remain -1, and we will not
 162   assign slots or set ls_num_slots here */
 163
 164int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
 165                     struct dlm_slot **slots_out, uint32_t *gen_out)
 166{
 167        struct dlm_member *memb;
 168        struct dlm_slot *array;
 169        int our_nodeid = dlm_our_nodeid();
 170        int array_size, max_slots, i;
 171        int need = 0;
 172        int max = 0;
 173        int num = 0;
 174        uint32_t gen = 0;
 175
 176        /* our own memb struct will have slot -1 gen 0 */
 177
 178        list_for_each_entry(memb, &ls->ls_nodes, list) {
 179                if (memb->nodeid == our_nodeid) {
 180                        memb->slot = ls->ls_slot;
 181                        memb->generation = ls->ls_generation;
 182                        break;
 183                }
 184        }
 185
 186        list_for_each_entry(memb, &ls->ls_nodes, list) {
 187                if (memb->generation > gen)
 188                        gen = memb->generation;
 189
 190                /* node doesn't support slots */
 191
 192                if (memb->slot == -1)
 193                        return -1;
 194
 195                /* node needs a slot assigned */
 196
 197                if (!memb->slot)
 198                        need++;
 199
 200                /* node has a slot assigned */
 201
 202                num++;
 203
 204                if (!max || max < memb->slot)
 205                        max = memb->slot;
 206
 207                /* sanity check, once slot is assigned it shouldn't change */
 208
 209                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
 210                        log_error(ls, "nodeid %d slot changed %d %d",
 211                                  memb->nodeid, memb->slot_prev, memb->slot);
 212                        return -1;
 213                }
 214                memb->slot_prev = memb->slot;
 215        }
 216
 217        array_size = max + need;
 218        array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
 219        if (!array)
 220                return -ENOMEM;
 221
 222        num = 0;
 223
 224        /* fill in slots (offsets) that are used */
 225
 226        list_for_each_entry(memb, &ls->ls_nodes, list) {
 227                if (!memb->slot)
 228                        continue;
 229
 230                if (memb->slot > array_size) {
 231                        log_error(ls, "invalid slot number %d", memb->slot);
 232                        kfree(array);
 233                        return -1;
 234                }
 235
 236                array[memb->slot - 1].nodeid = memb->nodeid;
 237                array[memb->slot - 1].slot = memb->slot;
 238                num++;
 239        }
 240
 241        /* assign new slots from unused offsets */
 242
 243        list_for_each_entry(memb, &ls->ls_nodes, list) {
 244                if (memb->slot)
 245                        continue;
 246
 247                for (i = 0; i < array_size; i++) {
 248                        if (array[i].nodeid)
 249                                continue;
 250
 251                        memb->slot = i + 1;
 252                        memb->slot_prev = memb->slot;
 253                        array[i].nodeid = memb->nodeid;
 254                        array[i].slot = memb->slot;
 255                        num++;
 256
 257                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
 258                                ls->ls_slot = memb->slot;
 259                        break;
 260                }
 261
 262                if (!memb->slot) {
 263                        log_error(ls, "no free slot found");
 264                        kfree(array);
 265                        return -1;
 266                }
 267        }
 268
 269        gen++;
 270
 271        log_slots(ls, gen, num, NULL, array, array_size);
 272
 273        max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
 274                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
 275
 276        if (num > max_slots) {
 277                log_error(ls, "num_slots %d exceeds max_slots %d",
 278                          num, max_slots);
 279                kfree(array);
 280                return -1;
 281        }
 282
 283        *gen_out = gen;
 284        *slots_out = array;
 285        *slots_size = array_size;
 286        *num_slots = num;
 287        return 0;
 288}
 289
 290static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 291{
 292        struct dlm_member *memb = NULL;
 293        struct list_head *tmp;
 294        struct list_head *newlist = &new->list;
 295        struct list_head *head = &ls->ls_nodes;
 296
 297        list_for_each(tmp, head) {
 298                memb = list_entry(tmp, struct dlm_member, list);
 299                if (new->nodeid < memb->nodeid)
 300                        break;
 301        }
 302
 303        if (!memb)
 304                list_add_tail(newlist, head);
 305        else {
 306                /* FIXME: can use list macro here */
 307                newlist->prev = tmp->prev;
 308                newlist->next = tmp;
 309                tmp->prev->next = newlist;
 310                tmp->prev = newlist;
 311        }
 312}
 313
 314static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 315{
 316        struct dlm_member *memb;
 317        int error;
 318
 319        memb = kzalloc(sizeof(*memb), GFP_NOFS);
 320        if (!memb)
 321                return -ENOMEM;
 322
 323        error = dlm_lowcomms_connect_node(node->nodeid);
 324        if (error < 0) {
 325                kfree(memb);
 326                return error;
 327        }
 328
 329        memb->nodeid = node->nodeid;
 330        memb->weight = node->weight;
 331        memb->comm_seq = node->comm_seq;
 332        add_ordered_member(ls, memb);
 333        ls->ls_num_nodes++;
 334        return 0;
 335}
 336
 337static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 338{
 339        struct dlm_member *memb;
 340
 341        list_for_each_entry(memb, head, list) {
 342                if (memb->nodeid == nodeid)
 343                        return memb;
 344        }
 345        return NULL;
 346}
 347
 348int dlm_is_member(struct dlm_ls *ls, int nodeid)
 349{
 350        if (find_memb(&ls->ls_nodes, nodeid))
 351                return 1;
 352        return 0;
 353}
 354
 355int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 356{
 357        if (find_memb(&ls->ls_nodes_gone, nodeid))
 358                return 1;
 359        return 0;
 360}
 361
 362static void clear_memb_list(struct list_head *head)
 363{
 364        struct dlm_member *memb;
 365
 366        while (!list_empty(head)) {
 367                memb = list_entry(head->next, struct dlm_member, list);
 368                list_del(&memb->list);
 369                kfree(memb);
 370        }
 371}
 372
 373void dlm_clear_members(struct dlm_ls *ls)
 374{
 375        clear_memb_list(&ls->ls_nodes);
 376        ls->ls_num_nodes = 0;
 377}
 378
 379void dlm_clear_members_gone(struct dlm_ls *ls)
 380{
 381        clear_memb_list(&ls->ls_nodes_gone);
 382}
 383
 384static void make_member_array(struct dlm_ls *ls)
 385{
 386        struct dlm_member *memb;
 387        int i, w, x = 0, total = 0, all_zero = 0, *array;
 388
 389        kfree(ls->ls_node_array);
 390        ls->ls_node_array = NULL;
 391
 392        list_for_each_entry(memb, &ls->ls_nodes, list) {
 393                if (memb->weight)
 394                        total += memb->weight;
 395        }
 396
 397        /* all nodes revert to weight of 1 if all have weight 0 */
 398
 399        if (!total) {
 400                total = ls->ls_num_nodes;
 401                all_zero = 1;
 402        }
 403
 404        ls->ls_total_weight = total;
 405        array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
 406        if (!array)
 407                return;
 408
 409        list_for_each_entry(memb, &ls->ls_nodes, list) {
 410                if (!all_zero && !memb->weight)
 411                        continue;
 412
 413                if (all_zero)
 414                        w = 1;
 415                else
 416                        w = memb->weight;
 417
 418                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
 419
 420                for (i = 0; i < w; i++)
 421                        array[x++] = memb->nodeid;
 422        }
 423
 424        ls->ls_node_array = array;
 425}
 426
 427/* send a status request to all members just to establish comms connections */
 428
 429static int ping_members(struct dlm_ls *ls)
 430{
 431        struct dlm_member *memb;
 432        int error = 0;
 433
 434        list_for_each_entry(memb, &ls->ls_nodes, list) {
 435                error = dlm_recovery_stopped(ls);
 436                if (error)
 437                        break;
 438                error = dlm_rcom_status(ls, memb->nodeid, 0);
 439                if (error)
 440                        break;
 441        }
 442        if (error)
 443                log_rinfo(ls, "ping_members aborted %d last nodeid %d",
 444                          error, ls->ls_recover_nodeid);
 445        return error;
 446}
 447
 448static void dlm_lsop_recover_prep(struct dlm_ls *ls)
 449{
 450        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
 451                return;
 452        ls->ls_ops->recover_prep(ls->ls_ops_arg);
 453}
 454
 455static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
 456{
 457        struct dlm_slot slot;
 458        uint32_t seq;
 459        int error;
 460
 461        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
 462                return;
 463
 464        /* if there is no comms connection with this node
 465           or the present comms connection is newer
 466           than the one when this member was added, then
 467           we consider the node to have failed (versus
 468           being removed due to dlm_release_lockspace) */
 469
 470        error = dlm_comm_seq(memb->nodeid, &seq);
 471
 472        if (!error && seq == memb->comm_seq)
 473                return;
 474
 475        slot.nodeid = memb->nodeid;
 476        slot.slot = memb->slot;
 477
 478        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
 479}
 480
 481void dlm_lsop_recover_done(struct dlm_ls *ls)
 482{
 483        struct dlm_member *memb;
 484        struct dlm_slot *slots;
 485        int i, num;
 486
 487        if (!ls->ls_ops || !ls->ls_ops->recover_done)
 488                return;
 489
 490        num = ls->ls_num_nodes;
 491        slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
 492        if (!slots)
 493                return;
 494
 495        i = 0;
 496        list_for_each_entry(memb, &ls->ls_nodes, list) {
 497                if (i == num) {
 498                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
 499                        goto out;
 500                }
 501                slots[i].nodeid = memb->nodeid;
 502                slots[i].slot = memb->slot;
 503                i++;
 504        }
 505
 506        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
 507                                 ls->ls_slot, ls->ls_generation);
 508 out:
 509        kfree(slots);
 510}
 511
 512static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
 513                                                int nodeid)
 514{
 515        int i;
 516
 517        for (i = 0; i < rv->nodes_count; i++) {
 518                if (rv->nodes[i].nodeid == nodeid)
 519                        return &rv->nodes[i];
 520        }
 521        return NULL;
 522}
 523
 524int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 525{
 526        struct dlm_member *memb, *safe;
 527        struct dlm_config_node *node;
 528        int i, error, neg = 0, low = -1;
 529
 530        /* previously removed members that we've not finished removing need to
 531           count as a negative change so the "neg" recovery steps will happen */
 532
 533        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 534                log_rinfo(ls, "prev removed member %d", memb->nodeid);
 535                neg++;
 536        }
 537
 538        /* move departed members from ls_nodes to ls_nodes_gone */
 539
 540        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
 541                node = find_config_node(rv, memb->nodeid);
 542                if (node && !node->new)
 543                        continue;
 544
 545                if (!node) {
 546                        log_rinfo(ls, "remove member %d", memb->nodeid);
 547                } else {
 548                        /* removed and re-added */
 549                        log_rinfo(ls, "remove member %d comm_seq %u %u",
 550                                  memb->nodeid, memb->comm_seq, node->comm_seq);
 551                }
 552
 553                neg++;
 554                list_move(&memb->list, &ls->ls_nodes_gone);
 555                ls->ls_num_nodes--;
 556                dlm_lsop_recover_slot(ls, memb);
 557        }
 558
 559        /* add new members to ls_nodes */
 560
 561        for (i = 0; i < rv->nodes_count; i++) {
 562                node = &rv->nodes[i];
 563                if (dlm_is_member(ls, node->nodeid))
 564                        continue;
 565                dlm_add_member(ls, node);
 566                log_rinfo(ls, "add member %d", node->nodeid);
 567        }
 568
 569        list_for_each_entry(memb, &ls->ls_nodes, list) {
 570                if (low == -1 || memb->nodeid < low)
 571                        low = memb->nodeid;
 572        }
 573        ls->ls_low_nodeid = low;
 574
 575        make_member_array(ls);
 576        *neg_out = neg;
 577
 578        error = ping_members(ls);
 579        if (!error || error == -EPROTO) {
 580                /* new_lockspace() may be waiting to know if the config
 581                   is good or bad */
 582                ls->ls_members_result = error;
 583                complete(&ls->ls_members_done);
 584        }
 585
 586        log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 587        return error;
 588}
 589
 590/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
 591   dlm_ls_start() is called on any of them to start the new recovery. */
 592
 593int dlm_ls_stop(struct dlm_ls *ls)
 594{
 595        int new;
 596
 597        /*
 598         * Prevent dlm_recv from being in the middle of something when we do
 599         * the stop.  This includes ensuring dlm_recv isn't processing a
 600         * recovery message (rcom), while dlm_recoverd is aborting and
 601         * resetting things from an in-progress recovery.  i.e. we want
 602         * dlm_recoverd to abort its recovery without worrying about dlm_recv
 603         * processing an rcom at the same time.  Stopping dlm_recv also makes
 604         * it easy for dlm_receive_message() to check locking stopped and add a
 605         * message to the requestqueue without races.
 606         */
 607
 608        down_write(&ls->ls_recv_active);
 609
 610        /*
 611         * Abort any recovery that's in progress (see RECOVER_STOP,
 612         * dlm_recovery_stopped()) and tell any other threads running in the
 613         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 614         */
 615
 616        spin_lock(&ls->ls_recover_lock);
 617        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 618        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 619        ls->ls_recover_seq++;
 620        spin_unlock(&ls->ls_recover_lock);
 621
 622        /*
 623         * Let dlm_recv run again, now any normal messages will be saved on the
 624         * requestqueue for later.
 625         */
 626
 627        up_write(&ls->ls_recv_active);
 628
 629        /*
 630         * This in_recovery lock does two things:
 631         * 1) Keeps this function from returning until all threads are out
 632         *    of locking routines and locking is truly stopped.
 633         * 2) Keeps any new requests from being processed until it's unlocked
 634         *    when recovery is complete.
 635         */
 636
 637        if (new) {
 638                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
 639                wake_up_process(ls->ls_recoverd_task);
 640                wait_event(ls->ls_recover_lock_wait,
 641                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 642        }
 643
 644        /*
 645         * The recoverd suspend/resume makes sure that dlm_recoverd (if
 646         * running) has noticed RECOVER_STOP above and quit processing the
 647         * previous recovery.
 648         */
 649
 650        dlm_recoverd_suspend(ls);
 651
 652        spin_lock(&ls->ls_recover_lock);
 653        kfree(ls->ls_slots);
 654        ls->ls_slots = NULL;
 655        ls->ls_num_slots = 0;
 656        ls->ls_slots_size = 0;
 657        ls->ls_recover_status = 0;
 658        spin_unlock(&ls->ls_recover_lock);
 659
 660        dlm_recoverd_resume(ls);
 661
 662        if (!ls->ls_recover_begin)
 663                ls->ls_recover_begin = jiffies;
 664
 665        dlm_lsop_recover_prep(ls);
 666        return 0;
 667}
 668
 669int dlm_ls_start(struct dlm_ls *ls)
 670{
 671        struct dlm_recover *rv, *rv_old;
 672        struct dlm_config_node *nodes = NULL;
 673        int error, count;
 674
 675        rv = kzalloc(sizeof(*rv), GFP_NOFS);
 676        if (!rv)
 677                return -ENOMEM;
 678
 679        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 680        if (error < 0)
 681                goto fail_rv;
 682
 683        spin_lock(&ls->ls_recover_lock);
 684
 685        /* the lockspace needs to be stopped before it can be started */
 686
 687        if (!dlm_locking_stopped(ls)) {
 688                spin_unlock(&ls->ls_recover_lock);
 689                log_error(ls, "start ignored: lockspace running");
 690                error = -EINVAL;
 691                goto fail;
 692        }
 693
 694        rv->nodes = nodes;
 695        rv->nodes_count = count;
 696        rv->seq = ++ls->ls_recover_seq;
 697        rv_old = ls->ls_recover_args;
 698        ls->ls_recover_args = rv;
 699        spin_unlock(&ls->ls_recover_lock);
 700
 701        if (rv_old) {
 702                log_error(ls, "unused recovery %llx %d",
 703                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
 704                kfree(rv_old->nodes);
 705                kfree(rv_old);
 706        }
 707
 708        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
 709        wake_up_process(ls->ls_recoverd_task);
 710        return 0;
 711
 712 fail:
 713        kfree(nodes);
 714 fail_rv:
 715        kfree(rv);
 716        return error;
 717}
 718
 719