linux/fs/ocfs2/cluster/nodemanager.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This program is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU General Public
  17 * License along with this program; if not, write to the
  18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   19 * Boston, MA 02111-1307, USA.
  20 */
  21
  22#include <linux/slab.h>
  23#include <linux/kernel.h>
  24#include <linux/module.h>
  25#include <linux/configfs.h>
  26
  27#include "tcp.h"
  28#include "nodemanager.h"
  29#include "heartbeat.h"
  30#include "masklog.h"
  31#include "sys.h"
  32
  33/* for now we operate under the assertion that there can be only one
  34 * cluster active at a time.  Changing this will require trickling
  35 * cluster references throughout where nodes are looked up */
  36struct o2nm_cluster *o2nm_single_cluster = NULL;
  37
  38char *o2nm_fence_method_desc[O2NM_FENCE_METHODS] = {
  39                "reset",        /* O2NM_FENCE_RESET */
  40                "panic",        /* O2NM_FENCE_PANIC */
  41};
  42
  43struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
  44{
  45        struct o2nm_node *node = NULL;
  46
  47        if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
  48                goto out;
  49
  50        read_lock(&o2nm_single_cluster->cl_nodes_lock);
  51        node = o2nm_single_cluster->cl_nodes[node_num];
  52        if (node)
  53                config_item_get(&node->nd_item);
  54        read_unlock(&o2nm_single_cluster->cl_nodes_lock);
  55out:
  56        return node;
  57}
  58EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);
  59
  60int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
  61{
  62        struct o2nm_cluster *cluster = o2nm_single_cluster;
  63
  64        BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
  65
  66        if (cluster == NULL)
  67                return -EINVAL;
  68
  69        read_lock(&cluster->cl_nodes_lock);
  70        memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
  71        read_unlock(&cluster->cl_nodes_lock);
  72
  73        return 0;
  74}
  75EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
  76
  77static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
  78                                                  __be32 ip_needle,
  79                                                  struct rb_node ***ret_p,
  80                                                  struct rb_node **ret_parent)
  81{
  82        struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
  83        struct rb_node *parent = NULL;
  84        struct o2nm_node *node, *ret = NULL;
  85
  86        while (*p) {
  87                int cmp;
  88
  89                parent = *p;
  90                node = rb_entry(parent, struct o2nm_node, nd_ip_node);
  91
  92                cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
  93                                sizeof(ip_needle));
  94                if (cmp < 0)
  95                        p = &(*p)->rb_left;
  96                else if (cmp > 0)
  97                        p = &(*p)->rb_right;
  98                else {
  99                        ret = node;
 100                        break;
 101                }
 102        }
 103
 104        if (ret_p != NULL)
 105                *ret_p = p;
 106        if (ret_parent != NULL)
 107                *ret_parent = parent;
 108
 109        return ret;
 110}
 111
 112struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
 113{
 114        struct o2nm_node *node = NULL;
 115        struct o2nm_cluster *cluster = o2nm_single_cluster;
 116
 117        if (cluster == NULL)
 118                goto out;
 119
 120        read_lock(&cluster->cl_nodes_lock);
 121        node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
 122        if (node)
 123                config_item_get(&node->nd_item);
 124        read_unlock(&cluster->cl_nodes_lock);
 125
 126out:
 127        return node;
 128}
 129EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);
 130
 131void o2nm_node_put(struct o2nm_node *node)
 132{
 133        config_item_put(&node->nd_item);
 134}
 135EXPORT_SYMBOL_GPL(o2nm_node_put);
 136
 137void o2nm_node_get(struct o2nm_node *node)
 138{
 139        config_item_get(&node->nd_item);
 140}
 141EXPORT_SYMBOL_GPL(o2nm_node_get);
 142
 143u8 o2nm_this_node(void)
 144{
 145        u8 node_num = O2NM_MAX_NODES;
 146
 147        if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
 148                node_num = o2nm_single_cluster->cl_local_node;
 149
 150        return node_num;
 151}
 152EXPORT_SYMBOL_GPL(o2nm_this_node);
 153
 154/* node configfs bits */
 155
 156static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
 157{
 158        return item ?
 159                container_of(to_config_group(item), struct o2nm_cluster,
 160                             cl_group)
 161                : NULL;
 162}
 163
 164static struct o2nm_node *to_o2nm_node(struct config_item *item)
 165{
 166        return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
 167}
 168
 169static void o2nm_node_release(struct config_item *item)
 170{
 171        struct o2nm_node *node = to_o2nm_node(item);
 172        kfree(node);
 173}
 174
 175static ssize_t o2nm_node_num_show(struct config_item *item, char *page)
 176{
 177        return sprintf(page, "%d\n", to_o2nm_node(item)->nd_num);
 178}
 179
 180static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
 181{
 182        /* through the first node_set .parent
 183         * mycluster/nodes/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
 184        return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
 185}
 186
 187enum {
 188        O2NM_NODE_ATTR_NUM = 0,
 189        O2NM_NODE_ATTR_PORT,
 190        O2NM_NODE_ATTR_ADDRESS,
 191};
 192
 193static ssize_t o2nm_node_num_store(struct config_item *item, const char *page,
 194                                   size_t count)
 195{
 196        struct o2nm_node *node = to_o2nm_node(item);
 197        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 198        unsigned long tmp;
 199        char *p = (char *)page;
 200        int ret = 0;
 201
 202        tmp = simple_strtoul(p, &p, 0);
 203        if (!p || (*p && (*p != '\n')))
 204                return -EINVAL;
 205
 206        if (tmp >= O2NM_MAX_NODES)
 207                return -ERANGE;
 208
 209        /* once we're in the cl_nodes tree networking can look us up by
 210         * node number and try to use our address and port attributes
 211         * to connect to this node.. make sure that they've been set
 212         * before writing the node attribute? */
 213        if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 214            !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 215                return -EINVAL; /* XXX */
 216
 217        write_lock(&cluster->cl_nodes_lock);
 218        if (cluster->cl_nodes[tmp])
 219                ret = -EEXIST;
 220        else if (test_and_set_bit(O2NM_NODE_ATTR_NUM,
 221                        &node->nd_set_attributes))
 222                ret = -EBUSY;
 223        else  {
 224                cluster->cl_nodes[tmp] = node;
 225                node->nd_num = tmp;
 226                set_bit(tmp, cluster->cl_nodes_bitmap);
 227        }
 228        write_unlock(&cluster->cl_nodes_lock);
 229        if (ret)
 230                return ret;
 231
 232        return count;
 233}
 234static ssize_t o2nm_node_ipv4_port_show(struct config_item *item, char *page)
 235{
 236        return sprintf(page, "%u\n", ntohs(to_o2nm_node(item)->nd_ipv4_port));
 237}
 238
 239static ssize_t o2nm_node_ipv4_port_store(struct config_item *item,
 240                                         const char *page, size_t count)
 241{
 242        struct o2nm_node *node = to_o2nm_node(item);
 243        unsigned long tmp;
 244        char *p = (char *)page;
 245
 246        tmp = simple_strtoul(p, &p, 0);
 247        if (!p || (*p && (*p != '\n')))
 248                return -EINVAL;
 249
 250        if (tmp == 0)
 251                return -EINVAL;
 252        if (tmp >= (u16)-1)
 253                return -ERANGE;
 254
 255        if (test_and_set_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 256                return -EBUSY;
 257        node->nd_ipv4_port = htons(tmp);
 258
 259        return count;
 260}
 261
 262static ssize_t o2nm_node_ipv4_address_show(struct config_item *item, char *page)
 263{
 264        return sprintf(page, "%pI4\n", &to_o2nm_node(item)->nd_ipv4_address);
 265}
 266
 267static ssize_t o2nm_node_ipv4_address_store(struct config_item *item,
 268                                            const char *page,
 269                                            size_t count)
 270{
 271        struct o2nm_node *node = to_o2nm_node(item);
 272        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 273        int ret, i;
 274        struct rb_node **p, *parent;
 275        unsigned int octets[4];
 276        __be32 ipv4_addr = 0;
 277
 278        ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
 279                     &octets[1], &octets[0]);
 280        if (ret != 4)
 281                return -EINVAL;
 282
 283        for (i = 0; i < ARRAY_SIZE(octets); i++) {
 284                if (octets[i] > 255)
 285                        return -ERANGE;
 286                be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
 287        }
 288
 289        ret = 0;
 290        write_lock(&cluster->cl_nodes_lock);
 291        if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
 292                ret = -EEXIST;
 293        else if (test_and_set_bit(O2NM_NODE_ATTR_ADDRESS,
 294                        &node->nd_set_attributes))
 295                ret = -EBUSY;
 296        else {
 297                rb_link_node(&node->nd_ip_node, parent, p);
 298                rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 299        }
 300        write_unlock(&cluster->cl_nodes_lock);
 301        if (ret)
 302                return ret;
 303
 304        memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
 305
 306        return count;
 307}
 308
 309static ssize_t o2nm_node_local_show(struct config_item *item, char *page)
 310{
 311        return sprintf(page, "%d\n", to_o2nm_node(item)->nd_local);
 312}
 313
 314static ssize_t o2nm_node_local_store(struct config_item *item, const char *page,
 315                                     size_t count)
 316{
 317        struct o2nm_node *node = to_o2nm_node(item);
 318        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 319        unsigned long tmp;
 320        char *p = (char *)page;
 321        ssize_t ret;
 322
 323        tmp = simple_strtoul(p, &p, 0);
 324        if (!p || (*p && (*p != '\n')))
 325                return -EINVAL;
 326
 327        tmp = !!tmp; /* boolean of whether this node wants to be local */
 328
 329        /* setting local turns on networking rx for now so we require having
 330         * set everything else first */
 331        if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 332            !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
 333            !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 334                return -EINVAL; /* XXX */
 335
 336        /* the only failure case is trying to set a new local node
 337         * when a different one is already set */
 338        if (tmp && tmp == cluster->cl_has_local &&
 339            cluster->cl_local_node != node->nd_num)
 340                return -EBUSY;
 341
 342        /* bring up the rx thread if we're setting the new local node. */
 343        if (tmp && !cluster->cl_has_local) {
 344                ret = o2net_start_listening(node);
 345                if (ret)
 346                        return ret;
 347        }
 348
 349        if (!tmp && cluster->cl_has_local &&
 350            cluster->cl_local_node == node->nd_num) {
 351                o2net_stop_listening(node);
 352                cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
 353        }
 354
 355        node->nd_local = tmp;
 356        if (node->nd_local) {
 357                cluster->cl_has_local = tmp;
 358                cluster->cl_local_node = node->nd_num;
 359        }
 360
 361        return count;
 362}
 363
 364CONFIGFS_ATTR(o2nm_node_, num);
 365CONFIGFS_ATTR(o2nm_node_, ipv4_port);
 366CONFIGFS_ATTR(o2nm_node_, ipv4_address);
 367CONFIGFS_ATTR(o2nm_node_, local);
 368
 369static struct configfs_attribute *o2nm_node_attrs[] = {
 370        &o2nm_node_attr_num,
 371        &o2nm_node_attr_ipv4_port,
 372        &o2nm_node_attr_ipv4_address,
 373        &o2nm_node_attr_local,
 374        NULL,
 375};
 376
 377static struct configfs_item_operations o2nm_node_item_ops = {
 378        .release                = o2nm_node_release,
 379};
 380
 381static struct config_item_type o2nm_node_type = {
 382        .ct_item_ops    = &o2nm_node_item_ops,
 383        .ct_attrs       = o2nm_node_attrs,
 384        .ct_owner       = THIS_MODULE,
 385};
 386
 387/* node set */
 388
 389struct o2nm_node_group {
 390        struct config_group ns_group;
 391        /* some stuff? */
 392};
 393
 394#if 0
 395static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
 396{
 397        return group ?
 398                container_of(group, struct o2nm_node_group, ns_group)
 399                : NULL;
 400}
 401#endif
 402
 403static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
 404                                       unsigned int *val)
 405{
 406        unsigned long tmp;
 407        char *p = (char *)page;
 408
 409        tmp = simple_strtoul(p, &p, 0);
 410        if (!p || (*p && (*p != '\n')))
 411                return -EINVAL;
 412
 413        if (tmp == 0)
 414                return -EINVAL;
 415        if (tmp >= (u32)-1)
 416                return -ERANGE;
 417
 418        *val = tmp;
 419
 420        return count;
 421}
 422
 423static ssize_t o2nm_cluster_idle_timeout_ms_show(struct config_item *item,
 424        char *page)
 425{
 426        return sprintf(page, "%u\n", to_o2nm_cluster(item)->cl_idle_timeout_ms);
 427}
 428
 429static ssize_t o2nm_cluster_idle_timeout_ms_store(struct config_item *item,
 430        const char *page, size_t count)
 431{
 432        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 433        ssize_t ret;
 434        unsigned int val;
 435
 436        ret =  o2nm_cluster_attr_write(page, count, &val);
 437
 438        if (ret > 0) {
 439                if (cluster->cl_idle_timeout_ms != val
 440                        && o2net_num_connected_peers()) {
 441                        mlog(ML_NOTICE,
 442                             "o2net: cannot change idle timeout after "
 443                             "the first peer has agreed to it."
 444                             "  %d connected peers\n",
 445                             o2net_num_connected_peers());
 446                        ret = -EINVAL;
 447                } else if (val <= cluster->cl_keepalive_delay_ms) {
 448                        mlog(ML_NOTICE, "o2net: idle timeout must be larger "
 449                             "than keepalive delay\n");
 450                        ret = -EINVAL;
 451                } else {
 452                        cluster->cl_idle_timeout_ms = val;
 453                }
 454        }
 455
 456        return ret;
 457}
 458
 459static ssize_t o2nm_cluster_keepalive_delay_ms_show(
 460        struct config_item *item, char *page)
 461{
 462        return sprintf(page, "%u\n",
 463                        to_o2nm_cluster(item)->cl_keepalive_delay_ms);
 464}
 465
 466static ssize_t o2nm_cluster_keepalive_delay_ms_store(
 467        struct config_item *item, const char *page, size_t count)
 468{
 469        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 470        ssize_t ret;
 471        unsigned int val;
 472
 473        ret =  o2nm_cluster_attr_write(page, count, &val);
 474
 475        if (ret > 0) {
 476                if (cluster->cl_keepalive_delay_ms != val
 477                    && o2net_num_connected_peers()) {
 478                        mlog(ML_NOTICE,
 479                             "o2net: cannot change keepalive delay after"
 480                             " the first peer has agreed to it."
 481                             "  %d connected peers\n",
 482                             o2net_num_connected_peers());
 483                        ret = -EINVAL;
 484                } else if (val >= cluster->cl_idle_timeout_ms) {
 485                        mlog(ML_NOTICE, "o2net: keepalive delay must be "
 486                             "smaller than idle timeout\n");
 487                        ret = -EINVAL;
 488                } else {
 489                        cluster->cl_keepalive_delay_ms = val;
 490                }
 491        }
 492
 493        return ret;
 494}
 495
 496static ssize_t o2nm_cluster_reconnect_delay_ms_show(
 497        struct config_item *item, char *page)
 498{
 499        return sprintf(page, "%u\n",
 500                        to_o2nm_cluster(item)->cl_reconnect_delay_ms);
 501}
 502
 503static ssize_t o2nm_cluster_reconnect_delay_ms_store(
 504        struct config_item *item, const char *page, size_t count)
 505{
 506        return o2nm_cluster_attr_write(page, count,
 507                               &to_o2nm_cluster(item)->cl_reconnect_delay_ms);
 508}
 509
 510static ssize_t o2nm_cluster_fence_method_show(
 511        struct config_item *item, char *page)
 512{
 513        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 514        ssize_t ret = 0;
 515
 516        if (cluster)
 517                ret = sprintf(page, "%s\n",
 518                              o2nm_fence_method_desc[cluster->cl_fence_method]);
 519        return ret;
 520}
 521
 522static ssize_t o2nm_cluster_fence_method_store(
 523        struct config_item *item, const char *page, size_t count)
 524{
 525        unsigned int i;
 526
 527        if (page[count - 1] != '\n')
 528                goto bail;
 529
 530        for (i = 0; i < O2NM_FENCE_METHODS; ++i) {
 531                if (count != strlen(o2nm_fence_method_desc[i]) + 1)
 532                        continue;
 533                if (strncasecmp(page, o2nm_fence_method_desc[i], count - 1))
 534                        continue;
 535                if (to_o2nm_cluster(item)->cl_fence_method != i) {
 536                        printk(KERN_INFO "ocfs2: Changing fence method to %s\n",
 537                               o2nm_fence_method_desc[i]);
 538                        to_o2nm_cluster(item)->cl_fence_method = i;
 539                }
 540                return count;
 541        }
 542
 543bail:
 544        return -EINVAL;
 545}
 546
 547CONFIGFS_ATTR(o2nm_cluster_, idle_timeout_ms);
 548CONFIGFS_ATTR(o2nm_cluster_, keepalive_delay_ms);
 549CONFIGFS_ATTR(o2nm_cluster_, reconnect_delay_ms);
 550CONFIGFS_ATTR(o2nm_cluster_, fence_method);
 551
 552static struct configfs_attribute *o2nm_cluster_attrs[] = {
 553        &o2nm_cluster_attr_idle_timeout_ms,
 554        &o2nm_cluster_attr_keepalive_delay_ms,
 555        &o2nm_cluster_attr_reconnect_delay_ms,
 556        &o2nm_cluster_attr_fence_method,
 557        NULL,
 558};
 559
 560static struct config_item *o2nm_node_group_make_item(struct config_group *group,
 561                                                     const char *name)
 562{
 563        struct o2nm_node *node = NULL;
 564
 565        if (strlen(name) > O2NM_MAX_NAME_LEN)
 566                return ERR_PTR(-ENAMETOOLONG);
 567
 568        node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
 569        if (node == NULL)
 570                return ERR_PTR(-ENOMEM);
 571
 572        strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
 573        config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
 574        spin_lock_init(&node->nd_lock);
 575
 576        mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);
 577
 578        return &node->nd_item;
 579}
 580
 581static void o2nm_node_group_drop_item(struct config_group *group,
 582                                      struct config_item *item)
 583{
 584        struct o2nm_node *node = to_o2nm_node(item);
 585        struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);
 586
 587        o2net_disconnect_node(node);
 588
 589        if (cluster->cl_has_local &&
 590            (cluster->cl_local_node == node->nd_num)) {
 591                cluster->cl_has_local = 0;
 592                cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
 593                o2net_stop_listening(node);
 594        }
 595
 596        /* XXX call into net to stop this node from trading messages */
 597
 598        write_lock(&cluster->cl_nodes_lock);
 599
 600        /* XXX sloppy */
 601        if (node->nd_ipv4_address)
 602                rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 603
 604        /* nd_num might be 0 if the node number hasn't been set.. */
 605        if (cluster->cl_nodes[node->nd_num] == node) {
 606                cluster->cl_nodes[node->nd_num] = NULL;
 607                clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
 608        }
 609        write_unlock(&cluster->cl_nodes_lock);
 610
 611        mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
 612             config_item_name(&node->nd_item));
 613
 614        config_item_put(item);
 615}
 616
 617static struct configfs_group_operations o2nm_node_group_group_ops = {
 618        .make_item      = o2nm_node_group_make_item,
 619        .drop_item      = o2nm_node_group_drop_item,
 620};
 621
 622static struct config_item_type o2nm_node_group_type = {
 623        .ct_group_ops   = &o2nm_node_group_group_ops,
 624        .ct_owner       = THIS_MODULE,
 625};
 626
 627/* cluster */
 628
 629static void o2nm_cluster_release(struct config_item *item)
 630{
 631        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 632
 633        kfree(cluster);
 634}
 635
 636static struct configfs_item_operations o2nm_cluster_item_ops = {
 637        .release        = o2nm_cluster_release,
 638};
 639
 640static struct config_item_type o2nm_cluster_type = {
 641        .ct_item_ops    = &o2nm_cluster_item_ops,
 642        .ct_attrs       = o2nm_cluster_attrs,
 643        .ct_owner       = THIS_MODULE,
 644};
 645
 646/* cluster set */
 647
 648struct o2nm_cluster_group {
 649        struct configfs_subsystem cs_subsys;
 650        /* some stuff? */
 651};
 652
 653#if 0
 654static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
 655{
 656        return group ?
 657                container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
 658               : NULL;
 659}
 660#endif
 661
 662static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
 663                                                          const char *name)
 664{
 665        struct o2nm_cluster *cluster = NULL;
 666        struct o2nm_node_group *ns = NULL;
 667        struct config_group *o2hb_group = NULL, *ret = NULL;
 668
 669        /* this runs under the parent dir's i_mutex; there can be only
 670         * one caller in here at a time */
 671        if (o2nm_single_cluster)
 672                return ERR_PTR(-ENOSPC);
 673
 674        cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
 675        ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
 676        o2hb_group = o2hb_alloc_hb_set();
 677        if (cluster == NULL || ns == NULL || o2hb_group == NULL)
 678                goto out;
 679
 680        config_group_init_type_name(&cluster->cl_group, name,
 681                                    &o2nm_cluster_type);
 682        configfs_add_default_group(&ns->ns_group, &cluster->cl_group);
 683
 684        config_group_init_type_name(&ns->ns_group, "node",
 685                                    &o2nm_node_group_type);
 686        configfs_add_default_group(o2hb_group, &cluster->cl_group);
 687
 688        rwlock_init(&cluster->cl_nodes_lock);
 689        cluster->cl_node_ip_tree = RB_ROOT;
 690        cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
 691        cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
 692        cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
 693        cluster->cl_fence_method       = O2NM_FENCE_RESET;
 694
 695        ret = &cluster->cl_group;
 696        o2nm_single_cluster = cluster;
 697
 698out:
 699        if (ret == NULL) {
 700                kfree(cluster);
 701                kfree(ns);
 702                o2hb_free_hb_set(o2hb_group);
 703                ret = ERR_PTR(-ENOMEM);
 704        }
 705
 706        return ret;
 707}
 708
 709static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
 710{
 711        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 712
 713        BUG_ON(o2nm_single_cluster != cluster);
 714        o2nm_single_cluster = NULL;
 715
 716        configfs_remove_default_groups(&cluster->cl_group);
 717        config_item_put(item);
 718}
 719
 720static struct configfs_group_operations o2nm_cluster_group_group_ops = {
 721        .make_group     = o2nm_cluster_group_make_group,
 722        .drop_item      = o2nm_cluster_group_drop_item,
 723};
 724
 725static struct config_item_type o2nm_cluster_group_type = {
 726        .ct_group_ops   = &o2nm_cluster_group_group_ops,
 727        .ct_owner       = THIS_MODULE,
 728};
 729
 730static struct o2nm_cluster_group o2nm_cluster_group = {
 731        .cs_subsys = {
 732                .su_group = {
 733                        .cg_item = {
 734                                .ci_namebuf = "cluster",
 735                                .ci_type = &o2nm_cluster_group_type,
 736                        },
 737                },
 738        },
 739};
 740
 741int o2nm_depend_item(struct config_item *item)
 742{
 743        return configfs_depend_item(&o2nm_cluster_group.cs_subsys, item);
 744}
 745
 746void o2nm_undepend_item(struct config_item *item)
 747{
 748        configfs_undepend_item(item);
 749}
 750
 751int o2nm_depend_this_node(void)
 752{
 753        int ret = 0;
 754        struct o2nm_node *local_node;
 755
 756        local_node = o2nm_get_node_by_num(o2nm_this_node());
 757        if (!local_node) {
 758                ret = -EINVAL;
 759                goto out;
 760        }
 761
 762        ret = o2nm_depend_item(&local_node->nd_item);
 763        o2nm_node_put(local_node);
 764
 765out:
 766        return ret;
 767}
 768
 769void o2nm_undepend_this_node(void)
 770{
 771        struct o2nm_node *local_node;
 772
 773        local_node = o2nm_get_node_by_num(o2nm_this_node());
 774        BUG_ON(!local_node);
 775
 776        o2nm_undepend_item(&local_node->nd_item);
 777        o2nm_node_put(local_node);
 778}
 779
 780
 781static void __exit exit_o2nm(void)
 782{
 783        /* XXX sync with hb callbacks and shut down hb? */
 784        o2net_unregister_hb_callbacks();
 785        configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
 786        o2cb_sys_shutdown();
 787
 788        o2net_exit();
 789        o2hb_exit();
 790}
 791
 792static int __init init_o2nm(void)
 793{
 794        int ret = -1;
 795
 796        ret = o2hb_init();
 797        if (ret)
 798                goto out;
 799
 800        ret = o2net_init();
 801        if (ret)
 802                goto out_o2hb;
 803
 804        ret = o2net_register_hb_callbacks();
 805        if (ret)
 806                goto out_o2net;
 807
 808        config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
 809        mutex_init(&o2nm_cluster_group.cs_subsys.su_mutex);
 810        ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
 811        if (ret) {
 812                printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
 813                goto out_callbacks;
 814        }
 815
 816        ret = o2cb_sys_init();
 817        if (!ret)
 818                goto out;
 819
 820        configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
 821out_callbacks:
 822        o2net_unregister_hb_callbacks();
 823out_o2net:
 824        o2net_exit();
 825out_o2hb:
 826        o2hb_exit();
 827out:
 828        return ret;
 829}
 830
 831MODULE_AUTHOR("Oracle");
 832MODULE_LICENSE("GPL");
 833MODULE_DESCRIPTION("OCFS2 cluster management");
 834
 835module_init(init_o2nm)
 836module_exit(exit_o2nm)
 837