linux/fs/dlm/member.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2009 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13#include "dlm_internal.h"
  14#include "lockspace.h"
  15#include "member.h"
  16#include "recoverd.h"
  17#include "recover.h"
  18#include "rcom.h"
  19#include "config.h"
  20#include "lowcomms.h"
  21
  22static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
  23{
  24        struct dlm_member *memb = NULL;
  25        struct list_head *tmp;
  26        struct list_head *newlist = &new->list;
  27        struct list_head *head = &ls->ls_nodes;
  28
  29        list_for_each(tmp, head) {
  30                memb = list_entry(tmp, struct dlm_member, list);
  31                if (new->nodeid < memb->nodeid)
  32                        break;
  33        }
  34
  35        if (!memb)
  36                list_add_tail(newlist, head);
  37        else {
  38                /* FIXME: can use list macro here */
  39                newlist->prev = tmp->prev;
  40                newlist->next = tmp;
  41                tmp->prev->next = newlist;
  42                tmp->prev = newlist;
  43        }
  44}
  45
  46static int dlm_add_member(struct dlm_ls *ls, int nodeid)
  47{
  48        struct dlm_member *memb;
  49        int w, error;
  50
  51        memb = kzalloc(sizeof(struct dlm_member), ls->ls_allocation);
  52        if (!memb)
  53                return -ENOMEM;
  54
  55        w = dlm_node_weight(ls->ls_name, nodeid);
  56        if (w < 0) {
  57                kfree(memb);
  58                return w;
  59        }
  60
  61        error = dlm_lowcomms_connect_node(nodeid);
  62        if (error < 0) {
  63                kfree(memb);
  64                return error;
  65        }
  66
  67        memb->nodeid = nodeid;
  68        memb->weight = w;
  69        add_ordered_member(ls, memb);
  70        ls->ls_num_nodes++;
  71        return 0;
  72}
  73
  74static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
  75{
  76        list_move(&memb->list, &ls->ls_nodes_gone);
  77        ls->ls_num_nodes--;
  78}
  79
  80int dlm_is_member(struct dlm_ls *ls, int nodeid)
  81{
  82        struct dlm_member *memb;
  83
  84        list_for_each_entry(memb, &ls->ls_nodes, list) {
  85                if (memb->nodeid == nodeid)
  86                        return 1;
  87        }
  88        return 0;
  89}
  90
  91int dlm_is_removed(struct dlm_ls *ls, int nodeid)
  92{
  93        struct dlm_member *memb;
  94
  95        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
  96                if (memb->nodeid == nodeid)
  97                        return 1;
  98        }
  99        return 0;
 100}
 101
 102static void clear_memb_list(struct list_head *head)
 103{
 104        struct dlm_member *memb;
 105
 106        while (!list_empty(head)) {
 107                memb = list_entry(head->next, struct dlm_member, list);
 108                list_del(&memb->list);
 109                kfree(memb);
 110        }
 111}
 112
 113void dlm_clear_members(struct dlm_ls *ls)
 114{
 115        clear_memb_list(&ls->ls_nodes);
 116        ls->ls_num_nodes = 0;
 117}
 118
 119void dlm_clear_members_gone(struct dlm_ls *ls)
 120{
 121        clear_memb_list(&ls->ls_nodes_gone);
 122}
 123
 124static void make_member_array(struct dlm_ls *ls)
 125{
 126        struct dlm_member *memb;
 127        int i, w, x = 0, total = 0, all_zero = 0, *array;
 128
 129        kfree(ls->ls_node_array);
 130        ls->ls_node_array = NULL;
 131
 132        list_for_each_entry(memb, &ls->ls_nodes, list) {
 133                if (memb->weight)
 134                        total += memb->weight;
 135        }
 136
 137        /* all nodes revert to weight of 1 if all have weight 0 */
 138
 139        if (!total) {
 140                total = ls->ls_num_nodes;
 141                all_zero = 1;
 142        }
 143
 144        ls->ls_total_weight = total;
 145
 146        array = kmalloc(sizeof(int) * total, ls->ls_allocation);
 147        if (!array)
 148                return;
 149
 150        list_for_each_entry(memb, &ls->ls_nodes, list) {
 151                if (!all_zero && !memb->weight)
 152                        continue;
 153
 154                if (all_zero)
 155                        w = 1;
 156                else
 157                        w = memb->weight;
 158
 159                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
 160
 161                for (i = 0; i < w; i++)
 162                        array[x++] = memb->nodeid;
 163        }
 164
 165        ls->ls_node_array = array;
 166}
 167
 168/* send a status request to all members just to establish comms connections */
 169
 170static int ping_members(struct dlm_ls *ls)
 171{
 172        struct dlm_member *memb;
 173        int error = 0;
 174
 175        list_for_each_entry(memb, &ls->ls_nodes, list) {
 176                error = dlm_recovery_stopped(ls);
 177                if (error)
 178                        break;
 179                error = dlm_rcom_status(ls, memb->nodeid);
 180                if (error)
 181                        break;
 182        }
 183        if (error)
 184                log_debug(ls, "ping_members aborted %d last nodeid %d",
 185                          error, ls->ls_recover_nodeid);
 186        return error;
 187}
 188
 189int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 190{
 191        struct dlm_member *memb, *safe;
 192        int i, error, found, pos = 0, neg = 0, low = -1;
 193
 194        /* previously removed members that we've not finished removing need to
 195           count as a negative change so the "neg" recovery steps will happen */
 196
 197        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 198                log_debug(ls, "prev removed member %d", memb->nodeid);
 199                neg++;
 200        }
 201
 202        /* move departed members from ls_nodes to ls_nodes_gone */
 203
 204        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
 205                found = 0;
 206                for (i = 0; i < rv->node_count; i++) {
 207                        if (memb->nodeid == rv->nodeids[i]) {
 208                                found = 1;
 209                                break;
 210                        }
 211                }
 212
 213                if (!found) {
 214                        neg++;
 215                        dlm_remove_member(ls, memb);
 216                        log_debug(ls, "remove member %d", memb->nodeid);
 217                }
 218        }
 219
 220        /* Add an entry to ls_nodes_gone for members that were removed and
 221           then added again, so that previous state for these nodes will be
 222           cleared during recovery. */
 223
 224        for (i = 0; i < rv->new_count; i++) {
 225                if (!dlm_is_member(ls, rv->new[i]))
 226                        continue;
 227                log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
 228
 229                memb = kzalloc(sizeof(struct dlm_member), ls->ls_allocation);
 230                if (!memb)
 231                        return -ENOMEM;
 232                memb->nodeid = rv->new[i];
 233                list_add_tail(&memb->list, &ls->ls_nodes_gone);
 234                neg++;
 235        }
 236
 237        /* add new members to ls_nodes */
 238
 239        for (i = 0; i < rv->node_count; i++) {
 240                if (dlm_is_member(ls, rv->nodeids[i]))
 241                        continue;
 242                dlm_add_member(ls, rv->nodeids[i]);
 243                pos++;
 244                log_debug(ls, "add member %d", rv->nodeids[i]);
 245        }
 246
 247        list_for_each_entry(memb, &ls->ls_nodes, list) {
 248                if (low == -1 || memb->nodeid < low)
 249                        low = memb->nodeid;
 250        }
 251        ls->ls_low_nodeid = low;
 252
 253        make_member_array(ls);
 254        dlm_set_recover_status(ls, DLM_RS_NODES);
 255        *neg_out = neg;
 256
 257        error = ping_members(ls);
 258        if (!error || error == -EPROTO) {
 259                /* new_lockspace() may be waiting to know if the config
 260                   is good or bad */
 261                ls->ls_members_result = error;
 262                complete(&ls->ls_members_done);
 263        }
 264        if (error)
 265                goto out;
 266
 267        error = dlm_recover_members_wait(ls);
 268 out:
 269        log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
 270        return error;
 271}
 272
 273/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
 274   dlm_ls_start() is called on any of them to start the new recovery. */
 275
 276int dlm_ls_stop(struct dlm_ls *ls)
 277{
 278        int new;
 279
 280        /*
 281         * Prevent dlm_recv from being in the middle of something when we do
 282         * the stop.  This includes ensuring dlm_recv isn't processing a
 283         * recovery message (rcom), while dlm_recoverd is aborting and
 284         * resetting things from an in-progress recovery.  i.e. we want
 285         * dlm_recoverd to abort its recovery without worrying about dlm_recv
 286         * processing an rcom at the same time.  Stopping dlm_recv also makes
 287         * it easy for dlm_receive_message() to check locking stopped and add a
 288         * message to the requestqueue without races.
 289         */
 290
 291        down_write(&ls->ls_recv_active);
 292
 293        /*
 294         * Abort any recovery that's in progress (see RECOVERY_STOP,
 295         * dlm_recovery_stopped()) and tell any other threads running in the
 296         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 297         */
 298
 299        spin_lock(&ls->ls_recover_lock);
 300        set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
 301        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 302        ls->ls_recover_seq++;
 303        spin_unlock(&ls->ls_recover_lock);
 304
 305        /*
 306         * Let dlm_recv run again, now any normal messages will be saved on the
 307         * requestqueue for later.
 308         */
 309
 310        up_write(&ls->ls_recv_active);
 311
 312        /*
 313         * This in_recovery lock does two things:
 314         * 1) Keeps this function from returning until all threads are out
 315         *    of locking routines and locking is truely stopped.
 316         * 2) Keeps any new requests from being processed until it's unlocked
 317         *    when recovery is complete.
 318         */
 319
 320        if (new)
 321                down_write(&ls->ls_in_recovery);
 322
 323        /*
 324         * The recoverd suspend/resume makes sure that dlm_recoverd (if
 325         * running) has noticed RECOVERY_STOP above and quit processing the
 326         * previous recovery.
 327         */
 328
 329        dlm_recoverd_suspend(ls);
 330        ls->ls_recover_status = 0;
 331        dlm_recoverd_resume(ls);
 332
 333        if (!ls->ls_recover_begin)
 334                ls->ls_recover_begin = jiffies;
 335        return 0;
 336}
 337
 338int dlm_ls_start(struct dlm_ls *ls)
 339{
 340        struct dlm_recover *rv = NULL, *rv_old;
 341        int *ids = NULL, *new = NULL;
 342        int error, ids_count = 0, new_count = 0;
 343
 344        rv = kzalloc(sizeof(struct dlm_recover), ls->ls_allocation);
 345        if (!rv)
 346                return -ENOMEM;
 347
 348        error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
 349                                &new, &new_count);
 350        if (error < 0)
 351                goto fail;
 352
 353        spin_lock(&ls->ls_recover_lock);
 354
 355        /* the lockspace needs to be stopped before it can be started */
 356
 357        if (!dlm_locking_stopped(ls)) {
 358                spin_unlock(&ls->ls_recover_lock);
 359                log_error(ls, "start ignored: lockspace running");
 360                error = -EINVAL;
 361                goto fail;
 362        }
 363
 364        rv->nodeids = ids;
 365        rv->node_count = ids_count;
 366        rv->new = new;
 367        rv->new_count = new_count;
 368        rv->seq = ++ls->ls_recover_seq;
 369        rv_old = ls->ls_recover_args;
 370        ls->ls_recover_args = rv;
 371        spin_unlock(&ls->ls_recover_lock);
 372
 373        if (rv_old) {
 374                log_error(ls, "unused recovery %llx %d",
 375                          (unsigned long long)rv_old->seq, rv_old->node_count);
 376                kfree(rv_old->nodeids);
 377                kfree(rv_old->new);
 378                kfree(rv_old);
 379        }
 380
 381        dlm_recoverd_kick(ls);
 382        return 0;
 383
 384 fail:
 385        kfree(rv);
 386        kfree(ids);
 387        kfree(new);
 388        return error;
 389}
 390
 391