linux/fs/dlm/member.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**
   8*******************************************************************************
   9******************************************************************************/
  10
  11#include "dlm_internal.h"
  12#include "lockspace.h"
  13#include "member.h"
  14#include "recoverd.h"
  15#include "recover.h"
  16#include "rcom.h"
  17#include "config.h"
  18#include "midcomms.h"
  19#include "lowcomms.h"
  20
  21int dlm_slots_version(struct dlm_header *h)
  22{
  23        if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
  24                return 0;
  25        return 1;
  26}
  27
  28void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
  29                   struct dlm_member *memb)
  30{
  31        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
  32
  33        if (!dlm_slots_version(&rc->rc_header))
  34                return;
  35
  36        memb->slot = le16_to_cpu(rf->rf_our_slot);
  37        memb->generation = le32_to_cpu(rf->rf_generation);
  38}
  39
  40void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
  41{
  42        struct dlm_slot *slot;
  43        struct rcom_slot *ro;
  44        int i;
  45
  46        ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
  47
  48        /* ls_slots array is sparse, but not rcom_slots */
  49
  50        for (i = 0; i < ls->ls_slots_size; i++) {
  51                slot = &ls->ls_slots[i];
  52                if (!slot->nodeid)
  53                        continue;
  54                ro->ro_nodeid = cpu_to_le32(slot->nodeid);
  55                ro->ro_slot = cpu_to_le16(slot->slot);
  56                ro++;
  57        }
  58}
  59
  60#define SLOT_DEBUG_LINE 128
  61
  62static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
  63                      struct rcom_slot *ro0, struct dlm_slot *array,
  64                      int array_size)
  65{
  66        char line[SLOT_DEBUG_LINE];
  67        int len = SLOT_DEBUG_LINE - 1;
  68        int pos = 0;
  69        int ret, i;
  70
  71        memset(line, 0, sizeof(line));
  72
  73        if (array) {
  74                for (i = 0; i < array_size; i++) {
  75                        if (!array[i].nodeid)
  76                                continue;
  77
  78                        ret = snprintf(line + pos, len - pos, " %d:%d",
  79                                       array[i].slot, array[i].nodeid);
  80                        if (ret >= len - pos)
  81                                break;
  82                        pos += ret;
  83                }
  84        } else if (ro0) {
  85                for (i = 0; i < num_slots; i++) {
  86                        ret = snprintf(line + pos, len - pos, " %d:%d",
  87                                       ro0[i].ro_slot, ro0[i].ro_nodeid);
  88                        if (ret >= len - pos)
  89                                break;
  90                        pos += ret;
  91                }
  92        }
  93
  94        log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
  95}
  96
  97int dlm_slots_copy_in(struct dlm_ls *ls)
  98{
  99        struct dlm_member *memb;
 100        struct dlm_rcom *rc = ls->ls_recover_buf;
 101        struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 102        struct rcom_slot *ro0, *ro;
 103        int our_nodeid = dlm_our_nodeid();
 104        int i, num_slots;
 105        uint32_t gen;
 106
 107        if (!dlm_slots_version(&rc->rc_header))
 108                return -1;
 109
 110        gen = le32_to_cpu(rf->rf_generation);
 111        if (gen <= ls->ls_generation) {
 112                log_error(ls, "dlm_slots_copy_in gen %u old %u",
 113                          gen, ls->ls_generation);
 114        }
 115        ls->ls_generation = gen;
 116
 117        num_slots = le16_to_cpu(rf->rf_num_slots);
 118        if (!num_slots)
 119                return -1;
 120
 121        ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 122
 123        for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 124                ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
 125                ro->ro_slot = le16_to_cpu(ro->ro_slot);
 126        }
 127
 128        log_slots(ls, gen, num_slots, ro0, NULL, 0);
 129
 130        list_for_each_entry(memb, &ls->ls_nodes, list) {
 131                for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
 132                        if (ro->ro_nodeid != memb->nodeid)
 133                                continue;
 134                        memb->slot = ro->ro_slot;
 135                        memb->slot_prev = memb->slot;
 136                        break;
 137                }
 138
 139                if (memb->nodeid == our_nodeid) {
 140                        if (ls->ls_slot && ls->ls_slot != memb->slot) {
 141                                log_error(ls, "dlm_slots_copy_in our slot "
 142                                          "changed %d %d", ls->ls_slot,
 143                                          memb->slot);
 144                                return -1;
 145                        }
 146
 147                        if (!ls->ls_slot)
 148                                ls->ls_slot = memb->slot;
 149                }
 150
 151                if (!memb->slot) {
 152                        log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
 153                                   memb->nodeid);
 154                        return -1;
 155                }
 156        }
 157
 158        return 0;
 159}
 160
 161/* for any nodes that do not support slots, we will not have set memb->slot
 162   in wait_status_all(), so memb->slot will remain -1, and we will not
 163   assign slots or set ls_num_slots here */
 164
 165int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
 166                     struct dlm_slot **slots_out, uint32_t *gen_out)
 167{
 168        struct dlm_member *memb;
 169        struct dlm_slot *array;
 170        int our_nodeid = dlm_our_nodeid();
 171        int array_size, max_slots, i;
 172        int need = 0;
 173        int max = 0;
 174        int num = 0;
 175        uint32_t gen = 0;
 176
 177        /* our own memb struct will have slot -1 gen 0 */
 178
 179        list_for_each_entry(memb, &ls->ls_nodes, list) {
 180                if (memb->nodeid == our_nodeid) {
 181                        memb->slot = ls->ls_slot;
 182                        memb->generation = ls->ls_generation;
 183                        break;
 184                }
 185        }
 186
 187        list_for_each_entry(memb, &ls->ls_nodes, list) {
 188                if (memb->generation > gen)
 189                        gen = memb->generation;
 190
 191                /* node doesn't support slots */
 192
 193                if (memb->slot == -1)
 194                        return -1;
 195
 196                /* node needs a slot assigned */
 197
 198                if (!memb->slot)
 199                        need++;
 200
 201                /* node has a slot assigned */
 202
 203                num++;
 204
 205                if (!max || max < memb->slot)
 206                        max = memb->slot;
 207
 208                /* sanity check, once slot is assigned it shouldn't change */
 209
 210                if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
 211                        log_error(ls, "nodeid %d slot changed %d %d",
 212                                  memb->nodeid, memb->slot_prev, memb->slot);
 213                        return -1;
 214                }
 215                memb->slot_prev = memb->slot;
 216        }
 217
 218        array_size = max + need;
 219        array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
 220        if (!array)
 221                return -ENOMEM;
 222
 223        num = 0;
 224
 225        /* fill in slots (offsets) that are used */
 226
 227        list_for_each_entry(memb, &ls->ls_nodes, list) {
 228                if (!memb->slot)
 229                        continue;
 230
 231                if (memb->slot > array_size) {
 232                        log_error(ls, "invalid slot number %d", memb->slot);
 233                        kfree(array);
 234                        return -1;
 235                }
 236
 237                array[memb->slot - 1].nodeid = memb->nodeid;
 238                array[memb->slot - 1].slot = memb->slot;
 239                num++;
 240        }
 241
 242        /* assign new slots from unused offsets */
 243
 244        list_for_each_entry(memb, &ls->ls_nodes, list) {
 245                if (memb->slot)
 246                        continue;
 247
 248                for (i = 0; i < array_size; i++) {
 249                        if (array[i].nodeid)
 250                                continue;
 251
 252                        memb->slot = i + 1;
 253                        memb->slot_prev = memb->slot;
 254                        array[i].nodeid = memb->nodeid;
 255                        array[i].slot = memb->slot;
 256                        num++;
 257
 258                        if (!ls->ls_slot && memb->nodeid == our_nodeid)
 259                                ls->ls_slot = memb->slot;
 260                        break;
 261                }
 262
 263                if (!memb->slot) {
 264                        log_error(ls, "no free slot found");
 265                        kfree(array);
 266                        return -1;
 267                }
 268        }
 269
 270        gen++;
 271
 272        log_slots(ls, gen, num, NULL, array, array_size);
 273
 274        max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
 275                     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
 276
 277        if (num > max_slots) {
 278                log_error(ls, "num_slots %d exceeds max_slots %d",
 279                          num, max_slots);
 280                kfree(array);
 281                return -1;
 282        }
 283
 284        *gen_out = gen;
 285        *slots_out = array;
 286        *slots_size = array_size;
 287        *num_slots = num;
 288        return 0;
 289}
 290
 291static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 292{
 293        struct dlm_member *memb = NULL;
 294        struct list_head *tmp;
 295        struct list_head *newlist = &new->list;
 296        struct list_head *head = &ls->ls_nodes;
 297
 298        list_for_each(tmp, head) {
 299                memb = list_entry(tmp, struct dlm_member, list);
 300                if (new->nodeid < memb->nodeid)
 301                        break;
 302        }
 303
 304        if (!memb)
 305                list_add_tail(newlist, head);
 306        else {
 307                /* FIXME: can use list macro here */
 308                newlist->prev = tmp->prev;
 309                newlist->next = tmp;
 310                tmp->prev->next = newlist;
 311                tmp->prev = newlist;
 312        }
 313}
 314
 315static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 316{
 317        struct dlm_member *memb;
 318        int error;
 319
 320        memb = kzalloc(sizeof(*memb), GFP_NOFS);
 321        if (!memb)
 322                return -ENOMEM;
 323
 324        error = dlm_lowcomms_connect_node(node->nodeid);
 325        if (error < 0) {
 326                kfree(memb);
 327                return error;
 328        }
 329
 330        memb->nodeid = node->nodeid;
 331        memb->weight = node->weight;
 332        memb->comm_seq = node->comm_seq;
 333        dlm_midcomms_add_member(node->nodeid);
 334        add_ordered_member(ls, memb);
 335        ls->ls_num_nodes++;
 336        return 0;
 337}
 338
 339static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 340{
 341        struct dlm_member *memb;
 342
 343        list_for_each_entry(memb, head, list) {
 344                if (memb->nodeid == nodeid)
 345                        return memb;
 346        }
 347        return NULL;
 348}
 349
 350int dlm_is_member(struct dlm_ls *ls, int nodeid)
 351{
 352        if (find_memb(&ls->ls_nodes, nodeid))
 353                return 1;
 354        return 0;
 355}
 356
 357int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 358{
 359        if (find_memb(&ls->ls_nodes_gone, nodeid))
 360                return 1;
 361        return 0;
 362}
 363
 364static void clear_memb_list(struct list_head *head,
 365                            void (*after_del)(int nodeid))
 366{
 367        struct dlm_member *memb;
 368
 369        while (!list_empty(head)) {
 370                memb = list_entry(head->next, struct dlm_member, list);
 371                list_del(&memb->list);
 372                if (after_del)
 373                        after_del(memb->nodeid);
 374                kfree(memb);
 375        }
 376}
 377
 378static void clear_members_cb(int nodeid)
 379{
 380        dlm_midcomms_remove_member(nodeid);
 381}
 382
 383void dlm_clear_members(struct dlm_ls *ls)
 384{
 385        clear_memb_list(&ls->ls_nodes, clear_members_cb);
 386        ls->ls_num_nodes = 0;
 387}
 388
 389void dlm_clear_members_gone(struct dlm_ls *ls)
 390{
 391        clear_memb_list(&ls->ls_nodes_gone, NULL);
 392}
 393
 394static void make_member_array(struct dlm_ls *ls)
 395{
 396        struct dlm_member *memb;
 397        int i, w, x = 0, total = 0, all_zero = 0, *array;
 398
 399        kfree(ls->ls_node_array);
 400        ls->ls_node_array = NULL;
 401
 402        list_for_each_entry(memb, &ls->ls_nodes, list) {
 403                if (memb->weight)
 404                        total += memb->weight;
 405        }
 406
 407        /* all nodes revert to weight of 1 if all have weight 0 */
 408
 409        if (!total) {
 410                total = ls->ls_num_nodes;
 411                all_zero = 1;
 412        }
 413
 414        ls->ls_total_weight = total;
 415        array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
 416        if (!array)
 417                return;
 418
 419        list_for_each_entry(memb, &ls->ls_nodes, list) {
 420                if (!all_zero && !memb->weight)
 421                        continue;
 422
 423                if (all_zero)
 424                        w = 1;
 425                else
 426                        w = memb->weight;
 427
 428                DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
 429
 430                for (i = 0; i < w; i++)
 431                        array[x++] = memb->nodeid;
 432        }
 433
 434        ls->ls_node_array = array;
 435}
 436
 437/* send a status request to all members just to establish comms connections */
 438
 439static int ping_members(struct dlm_ls *ls)
 440{
 441        struct dlm_member *memb;
 442        int error = 0;
 443
 444        list_for_each_entry(memb, &ls->ls_nodes, list) {
 445                error = dlm_recovery_stopped(ls);
 446                if (error) {
 447                        error = -EINTR;
 448                        break;
 449                }
 450                error = dlm_rcom_status(ls, memb->nodeid, 0);
 451                if (error)
 452                        break;
 453        }
 454        if (error)
 455                log_rinfo(ls, "ping_members aborted %d last nodeid %d",
 456                          error, ls->ls_recover_nodeid);
 457        return error;
 458}
 459
 460static void dlm_lsop_recover_prep(struct dlm_ls *ls)
 461{
 462        if (!ls->ls_ops || !ls->ls_ops->recover_prep)
 463                return;
 464        ls->ls_ops->recover_prep(ls->ls_ops_arg);
 465}
 466
 467static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
 468{
 469        struct dlm_slot slot;
 470        uint32_t seq;
 471        int error;
 472
 473        if (!ls->ls_ops || !ls->ls_ops->recover_slot)
 474                return;
 475
 476        /* if there is no comms connection with this node
 477           or the present comms connection is newer
 478           than the one when this member was added, then
 479           we consider the node to have failed (versus
 480           being removed due to dlm_release_lockspace) */
 481
 482        error = dlm_comm_seq(memb->nodeid, &seq);
 483
 484        if (!error && seq == memb->comm_seq)
 485                return;
 486
 487        slot.nodeid = memb->nodeid;
 488        slot.slot = memb->slot;
 489
 490        ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
 491}
 492
 493void dlm_lsop_recover_done(struct dlm_ls *ls)
 494{
 495        struct dlm_member *memb;
 496        struct dlm_slot *slots;
 497        int i, num;
 498
 499        if (!ls->ls_ops || !ls->ls_ops->recover_done)
 500                return;
 501
 502        num = ls->ls_num_nodes;
 503        slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
 504        if (!slots)
 505                return;
 506
 507        i = 0;
 508        list_for_each_entry(memb, &ls->ls_nodes, list) {
 509                if (i == num) {
 510                        log_error(ls, "dlm_lsop_recover_done bad num %d", num);
 511                        goto out;
 512                }
 513                slots[i].nodeid = memb->nodeid;
 514                slots[i].slot = memb->slot;
 515                i++;
 516        }
 517
 518        ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
 519                                 ls->ls_slot, ls->ls_generation);
 520 out:
 521        kfree(slots);
 522}
 523
 524static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
 525                                                int nodeid)
 526{
 527        int i;
 528
 529        for (i = 0; i < rv->nodes_count; i++) {
 530                if (rv->nodes[i].nodeid == nodeid)
 531                        return &rv->nodes[i];
 532        }
 533        return NULL;
 534}
 535
 536int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 537{
 538        struct dlm_member *memb, *safe;
 539        struct dlm_config_node *node;
 540        int i, error, neg = 0, low = -1;
 541
 542        /* previously removed members that we've not finished removing need to
 543           count as a negative change so the "neg" recovery steps will happen */
 544
 545        list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 546                log_rinfo(ls, "prev removed member %d", memb->nodeid);
 547                neg++;
 548        }
 549
 550        /* move departed members from ls_nodes to ls_nodes_gone */
 551
 552        list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
 553                node = find_config_node(rv, memb->nodeid);
 554                if (node && !node->new)
 555                        continue;
 556
 557                if (!node) {
 558                        log_rinfo(ls, "remove member %d", memb->nodeid);
 559                } else {
 560                        /* removed and re-added */
 561                        log_rinfo(ls, "remove member %d comm_seq %u %u",
 562                                  memb->nodeid, memb->comm_seq, node->comm_seq);
 563                }
 564
 565                neg++;
 566                list_move(&memb->list, &ls->ls_nodes_gone);
 567                dlm_midcomms_remove_member(memb->nodeid);
 568                ls->ls_num_nodes--;
 569                dlm_lsop_recover_slot(ls, memb);
 570        }
 571
 572        /* add new members to ls_nodes */
 573
 574        for (i = 0; i < rv->nodes_count; i++) {
 575                node = &rv->nodes[i];
 576                if (dlm_is_member(ls, node->nodeid))
 577                        continue;
 578                dlm_add_member(ls, node);
 579                log_rinfo(ls, "add member %d", node->nodeid);
 580        }
 581
 582        list_for_each_entry(memb, &ls->ls_nodes, list) {
 583                if (low == -1 || memb->nodeid < low)
 584                        low = memb->nodeid;
 585        }
 586        ls->ls_low_nodeid = low;
 587
 588        make_member_array(ls);
 589        *neg_out = neg;
 590
 591        error = ping_members(ls);
 592        /* error -EINTR means that a new recovery action is triggered.
 593         * We ignore this recovery action and let run the new one which might
 594         * have new member configuration.
 595         */
 596        if (error == -EINTR)
 597                error = 0;
 598
 599        /* new_lockspace() may be waiting to know if the config
 600         * is good or bad
 601         */
 602        ls->ls_members_result = error;
 603        complete(&ls->ls_members_done);
 604
 605        log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 606        return error;
 607}
 608
 609/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
 610   dlm_ls_start() is called on any of them to start the new recovery. */
 611
 612int dlm_ls_stop(struct dlm_ls *ls)
 613{
 614        int new;
 615
 616        /*
 617         * Prevent dlm_recv from being in the middle of something when we do
 618         * the stop.  This includes ensuring dlm_recv isn't processing a
 619         * recovery message (rcom), while dlm_recoverd is aborting and
 620         * resetting things from an in-progress recovery.  i.e. we want
 621         * dlm_recoverd to abort its recovery without worrying about dlm_recv
 622         * processing an rcom at the same time.  Stopping dlm_recv also makes
 623         * it easy for dlm_receive_message() to check locking stopped and add a
 624         * message to the requestqueue without races.
 625         */
 626
 627        down_write(&ls->ls_recv_active);
 628
 629        /*
 630         * Abort any recovery that's in progress (see RECOVER_STOP,
 631         * dlm_recovery_stopped()) and tell any other threads running in the
 632         * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 633         */
 634
 635        spin_lock(&ls->ls_recover_lock);
 636        set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 637        new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 638        ls->ls_recover_seq++;
 639        spin_unlock(&ls->ls_recover_lock);
 640
 641        /*
 642         * Let dlm_recv run again, now any normal messages will be saved on the
 643         * requestqueue for later.
 644         */
 645
 646        up_write(&ls->ls_recv_active);
 647
 648        /*
 649         * This in_recovery lock does two things:
 650         * 1) Keeps this function from returning until all threads are out
 651         *    of locking routines and locking is truly stopped.
 652         * 2) Keeps any new requests from being processed until it's unlocked
 653         *    when recovery is complete.
 654         */
 655
 656        if (new) {
 657                set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
 658                wake_up_process(ls->ls_recoverd_task);
 659                wait_event(ls->ls_recover_lock_wait,
 660                           test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 661        }
 662
 663        /*
 664         * The recoverd suspend/resume makes sure that dlm_recoverd (if
 665         * running) has noticed RECOVER_STOP above and quit processing the
 666         * previous recovery.
 667         */
 668
 669        dlm_recoverd_suspend(ls);
 670
 671        spin_lock(&ls->ls_recover_lock);
 672        kfree(ls->ls_slots);
 673        ls->ls_slots = NULL;
 674        ls->ls_num_slots = 0;
 675        ls->ls_slots_size = 0;
 676        ls->ls_recover_status = 0;
 677        spin_unlock(&ls->ls_recover_lock);
 678
 679        dlm_recoverd_resume(ls);
 680
 681        if (!ls->ls_recover_begin)
 682                ls->ls_recover_begin = jiffies;
 683
 684        dlm_lsop_recover_prep(ls);
 685        return 0;
 686}
 687
 688int dlm_ls_start(struct dlm_ls *ls)
 689{
 690        struct dlm_recover *rv, *rv_old;
 691        struct dlm_config_node *nodes = NULL;
 692        int error, count;
 693
 694        rv = kzalloc(sizeof(*rv), GFP_NOFS);
 695        if (!rv)
 696                return -ENOMEM;
 697
 698        error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 699        if (error < 0)
 700                goto fail_rv;
 701
 702        spin_lock(&ls->ls_recover_lock);
 703
 704        /* the lockspace needs to be stopped before it can be started */
 705
 706        if (!dlm_locking_stopped(ls)) {
 707                spin_unlock(&ls->ls_recover_lock);
 708                log_error(ls, "start ignored: lockspace running");
 709                error = -EINVAL;
 710                goto fail;
 711        }
 712
 713        rv->nodes = nodes;
 714        rv->nodes_count = count;
 715        rv->seq = ++ls->ls_recover_seq;
 716        rv_old = ls->ls_recover_args;
 717        ls->ls_recover_args = rv;
 718        spin_unlock(&ls->ls_recover_lock);
 719
 720        if (rv_old) {
 721                log_error(ls, "unused recovery %llx %d",
 722                          (unsigned long long)rv_old->seq, rv_old->nodes_count);
 723                kfree(rv_old->nodes);
 724                kfree(rv_old);
 725        }
 726
 727        set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
 728        wake_up_process(ls->ls_recoverd_task);
 729        return 0;
 730
 731 fail:
 732        kfree(nodes);
 733 fail_rv:
 734        kfree(rv);
 735        return error;
 736}
 737
 738