linux/fs/ocfs2/cluster/nodemanager.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This program is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU General Public
  17 * License along with this program; if not, write to the
  18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  19 * Boston, MA 02111-1307, USA.
  20 */
  21
  22#include <linux/slab.h>
  23#include <linux/kernel.h>
  24#include <linux/module.h>
  25#include <linux/configfs.h>
  26
  27#include "tcp.h"
  28#include "nodemanager.h"
  29#include "heartbeat.h"
  30#include "masklog.h"
  31#include "sys.h"
  32#include "ver.h"
  33
  34/* for now we operate under the assertion that there can be only one
  35 * cluster active at a time.  Changing this will require trickling
  36 * cluster references throughout where nodes are looked up */
  37struct o2nm_cluster *o2nm_single_cluster = NULL;
  38
  39char *o2nm_fence_method_desc[O2NM_FENCE_METHODS] = {
  40                "reset",        /* O2NM_FENCE_RESET */
  41                "panic",        /* O2NM_FENCE_PANIC */
  42};
  43
  44struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
  45{
  46        struct o2nm_node *node = NULL;
  47
  48        if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
  49                goto out;
  50
  51        read_lock(&o2nm_single_cluster->cl_nodes_lock);
  52        node = o2nm_single_cluster->cl_nodes[node_num];
  53        if (node)
  54                config_item_get(&node->nd_item);
  55        read_unlock(&o2nm_single_cluster->cl_nodes_lock);
  56out:
  57        return node;
  58}
  59EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);
  60
  61int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
  62{
  63        struct o2nm_cluster *cluster = o2nm_single_cluster;
  64
  65        BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
  66
  67        if (cluster == NULL)
  68                return -EINVAL;
  69
  70        read_lock(&cluster->cl_nodes_lock);
  71        memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
  72        read_unlock(&cluster->cl_nodes_lock);
  73
  74        return 0;
  75}
  76EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
  77
  78static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
  79                                                  __be32 ip_needle,
  80                                                  struct rb_node ***ret_p,
  81                                                  struct rb_node **ret_parent)
  82{
  83        struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
  84        struct rb_node *parent = NULL;
  85        struct o2nm_node *node, *ret = NULL;
  86
  87        while (*p) {
  88                int cmp;
  89
  90                parent = *p;
  91                node = rb_entry(parent, struct o2nm_node, nd_ip_node);
  92
  93                cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
  94                                sizeof(ip_needle));
  95                if (cmp < 0)
  96                        p = &(*p)->rb_left;
  97                else if (cmp > 0)
  98                        p = &(*p)->rb_right;
  99                else {
 100                        ret = node;
 101                        break;
 102                }
 103        }
 104
 105        if (ret_p != NULL)
 106                *ret_p = p;
 107        if (ret_parent != NULL)
 108                *ret_parent = parent;
 109
 110        return ret;
 111}
 112
 113struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
 114{
 115        struct o2nm_node *node = NULL;
 116        struct o2nm_cluster *cluster = o2nm_single_cluster;
 117
 118        if (cluster == NULL)
 119                goto out;
 120
 121        read_lock(&cluster->cl_nodes_lock);
 122        node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
 123        if (node)
 124                config_item_get(&node->nd_item);
 125        read_unlock(&cluster->cl_nodes_lock);
 126
 127out:
 128        return node;
 129}
 130EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);
 131
 132void o2nm_node_put(struct o2nm_node *node)
 133{
 134        config_item_put(&node->nd_item);
 135}
 136EXPORT_SYMBOL_GPL(o2nm_node_put);
 137
 138void o2nm_node_get(struct o2nm_node *node)
 139{
 140        config_item_get(&node->nd_item);
 141}
 142EXPORT_SYMBOL_GPL(o2nm_node_get);
 143
 144u8 o2nm_this_node(void)
 145{
 146        u8 node_num = O2NM_MAX_NODES;
 147
 148        if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
 149                node_num = o2nm_single_cluster->cl_local_node;
 150
 151        return node_num;
 152}
 153EXPORT_SYMBOL_GPL(o2nm_this_node);
 154
 155/* node configfs bits */
 156
 157static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
 158{
 159        return item ?
 160                container_of(to_config_group(item), struct o2nm_cluster,
 161                             cl_group)
 162                : NULL;
 163}
 164
 165static struct o2nm_node *to_o2nm_node(struct config_item *item)
 166{
 167        return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
 168}
 169
 170static void o2nm_node_release(struct config_item *item)
 171{
 172        struct o2nm_node *node = to_o2nm_node(item);
 173        kfree(node);
 174}
 175
 176static ssize_t o2nm_node_num_read(struct o2nm_node *node, char *page)
 177{
 178        return sprintf(page, "%d\n", node->nd_num);
 179}
 180
 181static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
 182{
 183        /* through the first node_set .parent
 184         * mycluster/nodes/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
 185        return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
 186}
 187
 188enum {
 189        O2NM_NODE_ATTR_NUM = 0,
 190        O2NM_NODE_ATTR_PORT,
 191        O2NM_NODE_ATTR_ADDRESS,
 192        O2NM_NODE_ATTR_LOCAL,
 193};
 194
 195static ssize_t o2nm_node_num_write(struct o2nm_node *node, const char *page,
 196                                   size_t count)
 197{
 198        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 199        unsigned long tmp;
 200        char *p = (char *)page;
 201
 202        tmp = simple_strtoul(p, &p, 0);
 203        if (!p || (*p && (*p != '\n')))
 204                return -EINVAL;
 205
 206        if (tmp >= O2NM_MAX_NODES)
 207                return -ERANGE;
 208
 209        /* once we're in the cl_nodes tree networking can look us up by
 210         * node number and try to use our address and port attributes
 211         * to connect to this node.. make sure that they've been set
 212         * before writing the node attribute? */
 213        if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 214            !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 215                return -EINVAL; /* XXX */
 216
 217        write_lock(&cluster->cl_nodes_lock);
 218        if (cluster->cl_nodes[tmp])
 219                p = NULL;
 220        else  {
 221                cluster->cl_nodes[tmp] = node;
 222                node->nd_num = tmp;
 223                set_bit(tmp, cluster->cl_nodes_bitmap);
 224        }
 225        write_unlock(&cluster->cl_nodes_lock);
 226        if (p == NULL)
 227                return -EEXIST;
 228
 229        return count;
 230}
 231static ssize_t o2nm_node_ipv4_port_read(struct o2nm_node *node, char *page)
 232{
 233        return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
 234}
 235
 236static ssize_t o2nm_node_ipv4_port_write(struct o2nm_node *node,
 237                                         const char *page, size_t count)
 238{
 239        unsigned long tmp;
 240        char *p = (char *)page;
 241
 242        tmp = simple_strtoul(p, &p, 0);
 243        if (!p || (*p && (*p != '\n')))
 244                return -EINVAL;
 245
 246        if (tmp == 0)
 247                return -EINVAL;
 248        if (tmp >= (u16)-1)
 249                return -ERANGE;
 250
 251        node->nd_ipv4_port = htons(tmp);
 252
 253        return count;
 254}
 255
 256static ssize_t o2nm_node_ipv4_address_read(struct o2nm_node *node, char *page)
 257{
 258        return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
 259}
 260
 261static ssize_t o2nm_node_ipv4_address_write(struct o2nm_node *node,
 262                                            const char *page,
 263                                            size_t count)
 264{
 265        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 266        int ret, i;
 267        struct rb_node **p, *parent;
 268        unsigned int octets[4];
 269        __be32 ipv4_addr = 0;
 270
 271        ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
 272                     &octets[1], &octets[0]);
 273        if (ret != 4)
 274                return -EINVAL;
 275
 276        for (i = 0; i < ARRAY_SIZE(octets); i++) {
 277                if (octets[i] > 255)
 278                        return -ERANGE;
 279                be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
 280        }
 281
 282        ret = 0;
 283        write_lock(&cluster->cl_nodes_lock);
 284        if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
 285                ret = -EEXIST;
 286        else {
 287                rb_link_node(&node->nd_ip_node, parent, p);
 288                rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 289        }
 290        write_unlock(&cluster->cl_nodes_lock);
 291        if (ret)
 292                return ret;
 293
 294        memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
 295
 296        return count;
 297}
 298
 299static ssize_t o2nm_node_local_read(struct o2nm_node *node, char *page)
 300{
 301        return sprintf(page, "%d\n", node->nd_local);
 302}
 303
 304static ssize_t o2nm_node_local_write(struct o2nm_node *node, const char *page,
 305                                     size_t count)
 306{
 307        struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
 308        unsigned long tmp;
 309        char *p = (char *)page;
 310        ssize_t ret;
 311
 312        tmp = simple_strtoul(p, &p, 0);
 313        if (!p || (*p && (*p != '\n')))
 314                return -EINVAL;
 315
 316        tmp = !!tmp; /* boolean of whether this node wants to be local */
 317
 318        /* setting local turns on networking rx for now so we require having
 319         * set everything else first */
 320        if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 321            !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
 322            !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 323                return -EINVAL; /* XXX */
 324
 325        /* the only failure case is trying to set a new local node
 326         * when a different one is already set */
 327        if (tmp && tmp == cluster->cl_has_local &&
 328            cluster->cl_local_node != node->nd_num)
 329                return -EBUSY;
 330
 331        /* bring up the rx thread if we're setting the new local node. */
 332        if (tmp && !cluster->cl_has_local) {
 333                ret = o2net_start_listening(node);
 334                if (ret)
 335                        return ret;
 336        }
 337
 338        if (!tmp && cluster->cl_has_local &&
 339            cluster->cl_local_node == node->nd_num) {
 340                o2net_stop_listening(node);
 341                cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
 342        }
 343
 344        node->nd_local = tmp;
 345        if (node->nd_local) {
 346                cluster->cl_has_local = tmp;
 347                cluster->cl_local_node = node->nd_num;
 348        }
 349
 350        return count;
 351}
 352
 353struct o2nm_node_attribute {
 354        struct configfs_attribute attr;
 355        ssize_t (*show)(struct o2nm_node *, char *);
 356        ssize_t (*store)(struct o2nm_node *, const char *, size_t);
 357};
 358
 359static struct o2nm_node_attribute o2nm_node_attr_num = {
 360        .attr   = { .ca_owner = THIS_MODULE,
 361                    .ca_name = "num",
 362                    .ca_mode = S_IRUGO | S_IWUSR },
 363        .show   = o2nm_node_num_read,
 364        .store  = o2nm_node_num_write,
 365};
 366
 367static struct o2nm_node_attribute o2nm_node_attr_ipv4_port = {
 368        .attr   = { .ca_owner = THIS_MODULE,
 369                    .ca_name = "ipv4_port",
 370                    .ca_mode = S_IRUGO | S_IWUSR },
 371        .show   = o2nm_node_ipv4_port_read,
 372        .store  = o2nm_node_ipv4_port_write,
 373};
 374
 375static struct o2nm_node_attribute o2nm_node_attr_ipv4_address = {
 376        .attr   = { .ca_owner = THIS_MODULE,
 377                    .ca_name = "ipv4_address",
 378                    .ca_mode = S_IRUGO | S_IWUSR },
 379        .show   = o2nm_node_ipv4_address_read,
 380        .store  = o2nm_node_ipv4_address_write,
 381};
 382
 383static struct o2nm_node_attribute o2nm_node_attr_local = {
 384        .attr   = { .ca_owner = THIS_MODULE,
 385                    .ca_name = "local",
 386                    .ca_mode = S_IRUGO | S_IWUSR },
 387        .show   = o2nm_node_local_read,
 388        .store  = o2nm_node_local_write,
 389};
 390
 391static struct configfs_attribute *o2nm_node_attrs[] = {
 392        [O2NM_NODE_ATTR_NUM] = &o2nm_node_attr_num.attr,
 393        [O2NM_NODE_ATTR_PORT] = &o2nm_node_attr_ipv4_port.attr,
 394        [O2NM_NODE_ATTR_ADDRESS] = &o2nm_node_attr_ipv4_address.attr,
 395        [O2NM_NODE_ATTR_LOCAL] = &o2nm_node_attr_local.attr,
 396        NULL,
 397};
 398
 399static int o2nm_attr_index(struct configfs_attribute *attr)
 400{
 401        int i;
 402        for (i = 0; i < ARRAY_SIZE(o2nm_node_attrs); i++) {
 403                if (attr == o2nm_node_attrs[i])
 404                        return i;
 405        }
 406        BUG();
 407        return 0;
 408}
 409
 410static ssize_t o2nm_node_show(struct config_item *item,
 411                              struct configfs_attribute *attr,
 412                              char *page)
 413{
 414        struct o2nm_node *node = to_o2nm_node(item);
 415        struct o2nm_node_attribute *o2nm_node_attr =
 416                container_of(attr, struct o2nm_node_attribute, attr);
 417        ssize_t ret = 0;
 418
 419        if (o2nm_node_attr->show)
 420                ret = o2nm_node_attr->show(node, page);
 421        return ret;
 422}
 423
 424static ssize_t o2nm_node_store(struct config_item *item,
 425                               struct configfs_attribute *attr,
 426                               const char *page, size_t count)
 427{
 428        struct o2nm_node *node = to_o2nm_node(item);
 429        struct o2nm_node_attribute *o2nm_node_attr =
 430                container_of(attr, struct o2nm_node_attribute, attr);
 431        ssize_t ret;
 432        int attr_index = o2nm_attr_index(attr);
 433
 434        if (o2nm_node_attr->store == NULL) {
 435                ret = -EINVAL;
 436                goto out;
 437        }
 438
 439        if (test_bit(attr_index, &node->nd_set_attributes))
 440                return -EBUSY;
 441
 442        ret = o2nm_node_attr->store(node, page, count);
 443        if (ret < count)
 444                goto out;
 445
 446        set_bit(attr_index, &node->nd_set_attributes);
 447out:
 448        return ret;
 449}
 450
 451static struct configfs_item_operations o2nm_node_item_ops = {
 452        .release                = o2nm_node_release,
 453        .show_attribute         = o2nm_node_show,
 454        .store_attribute        = o2nm_node_store,
 455};
 456
 457static struct config_item_type o2nm_node_type = {
 458        .ct_item_ops    = &o2nm_node_item_ops,
 459        .ct_attrs       = o2nm_node_attrs,
 460        .ct_owner       = THIS_MODULE,
 461};
 462
 463/* node set */
 464
 465struct o2nm_node_group {
 466        struct config_group ns_group;
 467        /* some stuff? */
 468};
 469
 470#if 0
 471static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
 472{
 473        return group ?
 474                container_of(group, struct o2nm_node_group, ns_group)
 475                : NULL;
 476}
 477#endif
 478
 479struct o2nm_cluster_attribute {
 480        struct configfs_attribute attr;
 481        ssize_t (*show)(struct o2nm_cluster *, char *);
 482        ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
 483};
 484
 485static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
 486                                       unsigned int *val)
 487{
 488        unsigned long tmp;
 489        char *p = (char *)page;
 490
 491        tmp = simple_strtoul(p, &p, 0);
 492        if (!p || (*p && (*p != '\n')))
 493                return -EINVAL;
 494
 495        if (tmp == 0)
 496                return -EINVAL;
 497        if (tmp >= (u32)-1)
 498                return -ERANGE;
 499
 500        *val = tmp;
 501
 502        return count;
 503}
 504
 505static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
 506        struct o2nm_cluster *cluster, char *page)
 507{
 508        return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
 509}
 510
 511static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
 512        struct o2nm_cluster *cluster, const char *page, size_t count)
 513{
 514        ssize_t ret;
 515        unsigned int val;
 516
 517        ret =  o2nm_cluster_attr_write(page, count, &val);
 518
 519        if (ret > 0) {
 520                if (cluster->cl_idle_timeout_ms != val
 521                        && o2net_num_connected_peers()) {
 522                        mlog(ML_NOTICE,
 523                             "o2net: cannot change idle timeout after "
 524                             "the first peer has agreed to it."
 525                             "  %d connected peers\n",
 526                             o2net_num_connected_peers());
 527                        ret = -EINVAL;
 528                } else if (val <= cluster->cl_keepalive_delay_ms) {
 529                        mlog(ML_NOTICE, "o2net: idle timeout must be larger "
 530                             "than keepalive delay\n");
 531                        ret = -EINVAL;
 532                } else {
 533                        cluster->cl_idle_timeout_ms = val;
 534                }
 535        }
 536
 537        return ret;
 538}
 539
 540static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
 541        struct o2nm_cluster *cluster, char *page)
 542{
 543        return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
 544}
 545
 546static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
 547        struct o2nm_cluster *cluster, const char *page, size_t count)
 548{
 549        ssize_t ret;
 550        unsigned int val;
 551
 552        ret =  o2nm_cluster_attr_write(page, count, &val);
 553
 554        if (ret > 0) {
 555                if (cluster->cl_keepalive_delay_ms != val
 556                    && o2net_num_connected_peers()) {
 557                        mlog(ML_NOTICE,
 558                             "o2net: cannot change keepalive delay after"
 559                             " the first peer has agreed to it."
 560                             "  %d connected peers\n",
 561                             o2net_num_connected_peers());
 562                        ret = -EINVAL;
 563                } else if (val >= cluster->cl_idle_timeout_ms) {
 564                        mlog(ML_NOTICE, "o2net: keepalive delay must be "
 565                             "smaller than idle timeout\n");
 566                        ret = -EINVAL;
 567                } else {
 568                        cluster->cl_keepalive_delay_ms = val;
 569                }
 570        }
 571
 572        return ret;
 573}
 574
 575static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
 576        struct o2nm_cluster *cluster, char *page)
 577{
 578        return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
 579}
 580
 581static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
 582        struct o2nm_cluster *cluster, const char *page, size_t count)
 583{
 584        return o2nm_cluster_attr_write(page, count,
 585                                       &cluster->cl_reconnect_delay_ms);
 586}
 587
 588static ssize_t o2nm_cluster_attr_fence_method_read(
 589        struct o2nm_cluster *cluster, char *page)
 590{
 591        ssize_t ret = 0;
 592
 593        if (cluster)
 594                ret = sprintf(page, "%s\n",
 595                              o2nm_fence_method_desc[cluster->cl_fence_method]);
 596        return ret;
 597}
 598
 599static ssize_t o2nm_cluster_attr_fence_method_write(
 600        struct o2nm_cluster *cluster, const char *page, size_t count)
 601{
 602        unsigned int i;
 603
 604        if (page[count - 1] != '\n')
 605                goto bail;
 606
 607        for (i = 0; i < O2NM_FENCE_METHODS; ++i) {
 608                if (count != strlen(o2nm_fence_method_desc[i]) + 1)
 609                        continue;
 610                if (strncasecmp(page, o2nm_fence_method_desc[i], count - 1))
 611                        continue;
 612                if (cluster->cl_fence_method != i) {
 613                        printk(KERN_INFO "ocfs2: Changing fence method to %s\n",
 614                               o2nm_fence_method_desc[i]);
 615                        cluster->cl_fence_method = i;
 616                }
 617                return count;
 618        }
 619
 620bail:
 621        return -EINVAL;
 622}
 623
 624static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
 625        .attr   = { .ca_owner = THIS_MODULE,
 626                    .ca_name = "idle_timeout_ms",
 627                    .ca_mode = S_IRUGO | S_IWUSR },
 628        .show   = o2nm_cluster_attr_idle_timeout_ms_read,
 629        .store  = o2nm_cluster_attr_idle_timeout_ms_write,
 630};
 631
 632static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
 633        .attr   = { .ca_owner = THIS_MODULE,
 634                    .ca_name = "keepalive_delay_ms",
 635                    .ca_mode = S_IRUGO | S_IWUSR },
 636        .show   = o2nm_cluster_attr_keepalive_delay_ms_read,
 637        .store  = o2nm_cluster_attr_keepalive_delay_ms_write,
 638};
 639
 640static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
 641        .attr   = { .ca_owner = THIS_MODULE,
 642                    .ca_name = "reconnect_delay_ms",
 643                    .ca_mode = S_IRUGO | S_IWUSR },
 644        .show   = o2nm_cluster_attr_reconnect_delay_ms_read,
 645        .store  = o2nm_cluster_attr_reconnect_delay_ms_write,
 646};
 647
 648static struct o2nm_cluster_attribute o2nm_cluster_attr_fence_method = {
 649        .attr   = { .ca_owner = THIS_MODULE,
 650                    .ca_name = "fence_method",
 651                    .ca_mode = S_IRUGO | S_IWUSR },
 652        .show   = o2nm_cluster_attr_fence_method_read,
 653        .store  = o2nm_cluster_attr_fence_method_write,
 654};
 655
 656static struct configfs_attribute *o2nm_cluster_attrs[] = {
 657        &o2nm_cluster_attr_idle_timeout_ms.attr,
 658        &o2nm_cluster_attr_keepalive_delay_ms.attr,
 659        &o2nm_cluster_attr_reconnect_delay_ms.attr,
 660        &o2nm_cluster_attr_fence_method.attr,
 661        NULL,
 662};
 663static ssize_t o2nm_cluster_show(struct config_item *item,
 664                                 struct configfs_attribute *attr,
 665                                 char *page)
 666{
 667        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 668        struct o2nm_cluster_attribute *o2nm_cluster_attr =
 669                container_of(attr, struct o2nm_cluster_attribute, attr);
 670        ssize_t ret = 0;
 671
 672        if (o2nm_cluster_attr->show)
 673                ret = o2nm_cluster_attr->show(cluster, page);
 674        return ret;
 675}
 676
 677static ssize_t o2nm_cluster_store(struct config_item *item,
 678                                  struct configfs_attribute *attr,
 679                                  const char *page, size_t count)
 680{
 681        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 682        struct o2nm_cluster_attribute *o2nm_cluster_attr =
 683                container_of(attr, struct o2nm_cluster_attribute, attr);
 684        ssize_t ret;
 685
 686        if (o2nm_cluster_attr->store == NULL) {
 687                ret = -EINVAL;
 688                goto out;
 689        }
 690
 691        ret = o2nm_cluster_attr->store(cluster, page, count);
 692        if (ret < count)
 693                goto out;
 694out:
 695        return ret;
 696}
 697
 698static struct config_item *o2nm_node_group_make_item(struct config_group *group,
 699                                                     const char *name)
 700{
 701        struct o2nm_node *node = NULL;
 702
 703        if (strlen(name) > O2NM_MAX_NAME_LEN)
 704                return ERR_PTR(-ENAMETOOLONG);
 705
 706        node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
 707        if (node == NULL)
 708                return ERR_PTR(-ENOMEM);
 709
 710        strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
 711        config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
 712        spin_lock_init(&node->nd_lock);
 713
 714        mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);
 715
 716        return &node->nd_item;
 717}
 718
 719static void o2nm_node_group_drop_item(struct config_group *group,
 720                                      struct config_item *item)
 721{
 722        struct o2nm_node *node = to_o2nm_node(item);
 723        struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);
 724
 725        o2net_disconnect_node(node);
 726
 727        if (cluster->cl_has_local &&
 728            (cluster->cl_local_node == node->nd_num)) {
 729                cluster->cl_has_local = 0;
 730                cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
 731                o2net_stop_listening(node);
 732        }
 733
 734        /* XXX call into net to stop this node from trading messages */
 735
 736        write_lock(&cluster->cl_nodes_lock);
 737
 738        /* XXX sloppy */
 739        if (node->nd_ipv4_address)
 740                rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 741
 742        /* nd_num might be 0 if the node number hasn't been set.. */
 743        if (cluster->cl_nodes[node->nd_num] == node) {
 744                cluster->cl_nodes[node->nd_num] = NULL;
 745                clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
 746        }
 747        write_unlock(&cluster->cl_nodes_lock);
 748
 749        mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
 750             config_item_name(&node->nd_item));
 751
 752        config_item_put(item);
 753}
 754
 755static struct configfs_group_operations o2nm_node_group_group_ops = {
 756        .make_item      = o2nm_node_group_make_item,
 757        .drop_item      = o2nm_node_group_drop_item,
 758};
 759
 760static struct config_item_type o2nm_node_group_type = {
 761        .ct_group_ops   = &o2nm_node_group_group_ops,
 762        .ct_owner       = THIS_MODULE,
 763};
 764
 765/* cluster */
 766
 767static void o2nm_cluster_release(struct config_item *item)
 768{
 769        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 770
 771        kfree(cluster->cl_group.default_groups);
 772        kfree(cluster);
 773}
 774
 775static struct configfs_item_operations o2nm_cluster_item_ops = {
 776        .release        = o2nm_cluster_release,
 777        .show_attribute         = o2nm_cluster_show,
 778        .store_attribute        = o2nm_cluster_store,
 779};
 780
 781static struct config_item_type o2nm_cluster_type = {
 782        .ct_item_ops    = &o2nm_cluster_item_ops,
 783        .ct_attrs       = o2nm_cluster_attrs,
 784        .ct_owner       = THIS_MODULE,
 785};
 786
 787/* cluster set */
 788
 789struct o2nm_cluster_group {
 790        struct configfs_subsystem cs_subsys;
 791        /* some stuff? */
 792};
 793
 794#if 0
 795static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
 796{
 797        return group ?
 798                container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
 799               : NULL;
 800}
 801#endif
 802
 803static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
 804                                                          const char *name)
 805{
 806        struct o2nm_cluster *cluster = NULL;
 807        struct o2nm_node_group *ns = NULL;
 808        struct config_group *o2hb_group = NULL, *ret = NULL;
 809        void *defs = NULL;
 810
 811        /* this runs under the parent dir's i_mutex; there can be only
 812         * one caller in here at a time */
 813        if (o2nm_single_cluster)
 814                return ERR_PTR(-ENOSPC);
 815
 816        cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
 817        ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
 818        defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
 819        o2hb_group = o2hb_alloc_hb_set();
 820        if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
 821                goto out;
 822
 823        config_group_init_type_name(&cluster->cl_group, name,
 824                                    &o2nm_cluster_type);
 825        config_group_init_type_name(&ns->ns_group, "node",
 826                                    &o2nm_node_group_type);
 827
 828        cluster->cl_group.default_groups = defs;
 829        cluster->cl_group.default_groups[0] = &ns->ns_group;
 830        cluster->cl_group.default_groups[1] = o2hb_group;
 831        cluster->cl_group.default_groups[2] = NULL;
 832        rwlock_init(&cluster->cl_nodes_lock);
 833        cluster->cl_node_ip_tree = RB_ROOT;
 834        cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
 835        cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
 836        cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
 837        cluster->cl_fence_method       = O2NM_FENCE_RESET;
 838
 839        ret = &cluster->cl_group;
 840        o2nm_single_cluster = cluster;
 841
 842out:
 843        if (ret == NULL) {
 844                kfree(cluster);
 845                kfree(ns);
 846                o2hb_free_hb_set(o2hb_group);
 847                kfree(defs);
 848                ret = ERR_PTR(-ENOMEM);
 849        }
 850
 851        return ret;
 852}
 853
 854static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
 855{
 856        struct o2nm_cluster *cluster = to_o2nm_cluster(item);
 857        int i;
 858        struct config_item *killme;
 859
 860        BUG_ON(o2nm_single_cluster != cluster);
 861        o2nm_single_cluster = NULL;
 862
 863        for (i = 0; cluster->cl_group.default_groups[i]; i++) {
 864                killme = &cluster->cl_group.default_groups[i]->cg_item;
 865                cluster->cl_group.default_groups[i] = NULL;
 866                config_item_put(killme);
 867        }
 868
 869        config_item_put(item);
 870}
 871
 872static struct configfs_group_operations o2nm_cluster_group_group_ops = {
 873        .make_group     = o2nm_cluster_group_make_group,
 874        .drop_item      = o2nm_cluster_group_drop_item,
 875};
 876
 877static struct config_item_type o2nm_cluster_group_type = {
 878        .ct_group_ops   = &o2nm_cluster_group_group_ops,
 879        .ct_owner       = THIS_MODULE,
 880};
 881
 882static struct o2nm_cluster_group o2nm_cluster_group = {
 883        .cs_subsys = {
 884                .su_group = {
 885                        .cg_item = {
 886                                .ci_namebuf = "cluster",
 887                                .ci_type = &o2nm_cluster_group_type,
 888                        },
 889                },
 890        },
 891};
 892
 893int o2nm_depend_item(struct config_item *item)
 894{
 895        return configfs_depend_item(&o2nm_cluster_group.cs_subsys, item);
 896}
 897
 898void o2nm_undepend_item(struct config_item *item)
 899{
 900        configfs_undepend_item(&o2nm_cluster_group.cs_subsys, item);
 901}
 902
 903int o2nm_depend_this_node(void)
 904{
 905        int ret = 0;
 906        struct o2nm_node *local_node;
 907
 908        local_node = o2nm_get_node_by_num(o2nm_this_node());
 909        if (!local_node) {
 910                ret = -EINVAL;
 911                goto out;
 912        }
 913
 914        ret = o2nm_depend_item(&local_node->nd_item);
 915        o2nm_node_put(local_node);
 916
 917out:
 918        return ret;
 919}
 920
 921void o2nm_undepend_this_node(void)
 922{
 923        struct o2nm_node *local_node;
 924
 925        local_node = o2nm_get_node_by_num(o2nm_this_node());
 926        BUG_ON(!local_node);
 927
 928        o2nm_undepend_item(&local_node->nd_item);
 929        o2nm_node_put(local_node);
 930}
 931
 932
 933static void __exit exit_o2nm(void)
 934{
 935        /* XXX sync with hb callbacks and shut down hb? */
 936        o2net_unregister_hb_callbacks();
 937        configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
 938        o2cb_sys_shutdown();
 939
 940        o2net_exit();
 941        o2hb_exit();
 942}
 943
 944static int __init init_o2nm(void)
 945{
 946        int ret = -1;
 947
 948        cluster_print_version();
 949
 950        ret = o2hb_init();
 951        if (ret)
 952                goto out;
 953
 954        ret = o2net_init();
 955        if (ret)
 956                goto out_o2hb;
 957
 958        ret = o2net_register_hb_callbacks();
 959        if (ret)
 960                goto out_o2net;
 961
 962        config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
 963        mutex_init(&o2nm_cluster_group.cs_subsys.su_mutex);
 964        ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
 965        if (ret) {
 966                printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
 967                goto out_callbacks;
 968        }
 969
 970        ret = o2cb_sys_init();
 971        if (!ret)
 972                goto out;
 973
 974        configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
 975out_callbacks:
 976        o2net_unregister_hb_callbacks();
 977out_o2net:
 978        o2net_exit();
 979out_o2hb:
 980        o2hb_exit();
 981out:
 982        return ret;
 983}
 984
 985MODULE_AUTHOR("Oracle");
 986MODULE_LICENSE("GPL");
 987
 988module_init(init_o2nm)
 989module_exit(exit_o2nm)
 990