linux/drivers/staging/ramster/cluster/nodemanager.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * Copyright (C) 2004, 2005, 2012 Oracle.  All rights reserved.
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This program is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU General Public
  17 * License along with this program; if not, write to the
  18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  19 * Boston, MA 021110-1307, USA.
  20 */
  21
  22#include <linux/slab.h>
  23#include <linux/kernel.h>
  24#include <linux/module.h>
  25#include <linux/configfs.h>
  26
  27#include "tcp.h"
  28#include "nodemanager.h"
  29#include "heartbeat.h"
  30#include "masklog.h"
  31
  32/* for now we operate under the assertion that there can be only one
  33 * cluster active at a time.  Changing this will require trickling
  34 * cluster references throughout where nodes are looked up */
  35struct r2nm_cluster *r2nm_single_cluster;
  36
  37char *r2nm_fence_method_desc[R2NM_FENCE_METHODS] = {
  38                "reset",        /* R2NM_FENCE_RESET */
  39                "panic",        /* R2NM_FENCE_PANIC */
  40};
  41
  42struct r2nm_node *r2nm_get_node_by_num(u8 node_num)
  43{
  44        struct r2nm_node *node = NULL;
  45
  46        if (node_num >= R2NM_MAX_NODES || r2nm_single_cluster == NULL)
  47                goto out;
  48
  49        read_lock(&r2nm_single_cluster->cl_nodes_lock);
  50        node = r2nm_single_cluster->cl_nodes[node_num];
  51        if (node)
  52                config_item_get(&node->nd_item);
  53        read_unlock(&r2nm_single_cluster->cl_nodes_lock);
  54out:
  55        return node;
  56}
  57EXPORT_SYMBOL_GPL(r2nm_get_node_by_num);
  58
  59int r2nm_configured_node_map(unsigned long *map, unsigned bytes)
  60{
  61        struct r2nm_cluster *cluster = r2nm_single_cluster;
  62
  63        BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
  64
  65        if (cluster == NULL)
  66                return -EINVAL;
  67
  68        read_lock(&cluster->cl_nodes_lock);
  69        memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
  70        read_unlock(&cluster->cl_nodes_lock);
  71
  72        return 0;
  73}
  74EXPORT_SYMBOL_GPL(r2nm_configured_node_map);
  75
  76static struct r2nm_node *r2nm_node_ip_tree_lookup(struct r2nm_cluster *cluster,
  77                                                  __be32 ip_needle,
  78                                                  struct rb_node ***ret_p,
  79                                                  struct rb_node **ret_parent)
  80{
  81        struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
  82        struct rb_node *parent = NULL;
  83        struct r2nm_node *node, *ret = NULL;
  84
  85        while (*p) {
  86                int cmp;
  87
  88                parent = *p;
  89                node = rb_entry(parent, struct r2nm_node, nd_ip_node);
  90
  91                cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
  92                                sizeof(ip_needle));
  93                if (cmp < 0)
  94                        p = &(*p)->rb_left;
  95                else if (cmp > 0)
  96                        p = &(*p)->rb_right;
  97                else {
  98                        ret = node;
  99                        break;
 100                }
 101        }
 102
 103        if (ret_p != NULL)
 104                *ret_p = p;
 105        if (ret_parent != NULL)
 106                *ret_parent = parent;
 107
 108        return ret;
 109}
 110
 111struct r2nm_node *r2nm_get_node_by_ip(__be32 addr)
 112{
 113        struct r2nm_node *node = NULL;
 114        struct r2nm_cluster *cluster = r2nm_single_cluster;
 115
 116        if (cluster == NULL)
 117                goto out;
 118
 119        read_lock(&cluster->cl_nodes_lock);
 120        node = r2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
 121        if (node)
 122                config_item_get(&node->nd_item);
 123        read_unlock(&cluster->cl_nodes_lock);
 124
 125out:
 126        return node;
 127}
 128EXPORT_SYMBOL_GPL(r2nm_get_node_by_ip);
 129
 130void r2nm_node_put(struct r2nm_node *node)
 131{
 132        config_item_put(&node->nd_item);
 133}
 134EXPORT_SYMBOL_GPL(r2nm_node_put);
 135
 136void r2nm_node_get(struct r2nm_node *node)
 137{
 138        config_item_get(&node->nd_item);
 139}
 140EXPORT_SYMBOL_GPL(r2nm_node_get);
 141
 142u8 r2nm_this_node(void)
 143{
 144        u8 node_num = R2NM_MAX_NODES;
 145
 146        if (r2nm_single_cluster && r2nm_single_cluster->cl_has_local)
 147                node_num = r2nm_single_cluster->cl_local_node;
 148
 149        return node_num;
 150}
 151EXPORT_SYMBOL_GPL(r2nm_this_node);
 152
 153/* node configfs bits */
 154
 155static struct r2nm_cluster *to_r2nm_cluster(struct config_item *item)
 156{
 157        return item ?
 158                container_of(to_config_group(item), struct r2nm_cluster,
 159                             cl_group)
 160                : NULL;
 161}
 162
 163static struct r2nm_node *to_r2nm_node(struct config_item *item)
 164{
 165        return item ? container_of(item, struct r2nm_node, nd_item) : NULL;
 166}
 167
 168static void r2nm_node_release(struct config_item *item)
 169{
 170        struct r2nm_node *node = to_r2nm_node(item);
 171        kfree(node);
 172}
 173
 174static ssize_t r2nm_node_num_read(struct r2nm_node *node, char *page)
 175{
 176        return sprintf(page, "%d\n", node->nd_num);
 177}
 178
 179static struct r2nm_cluster *to_r2nm_cluster_from_node(struct r2nm_node *node)
 180{
 181        /* through the first node_set .parent
 182         * mycluster/nodes/mynode == r2nm_cluster->r2nm_node_group->r2nm_node */
 183        return to_r2nm_cluster(node->nd_item.ci_parent->ci_parent);
 184}
 185
 186enum {
 187        R2NM_NODE_ATTR_NUM = 0,
 188        R2NM_NODE_ATTR_PORT,
 189        R2NM_NODE_ATTR_ADDRESS,
 190        R2NM_NODE_ATTR_LOCAL,
 191};
 192
 193static ssize_t r2nm_node_num_write(struct r2nm_node *node, const char *page,
 194                                   size_t count)
 195{
 196        struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
 197        unsigned long tmp;
 198        char *p = (char *)page;
 199        int err;
 200
 201        err = kstrtoul(p, 10, &tmp);
 202        if (err)
 203                return err;
 204
 205        if (tmp >= R2NM_MAX_NODES)
 206                return -ERANGE;
 207
 208        /* once we're in the cl_nodes tree networking can look us up by
 209         * node number and try to use our address and port attributes
 210         * to connect to this node.. make sure that they've been set
 211         * before writing the node attribute? */
 212        if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 213            !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 214                return -EINVAL; /* XXX */
 215
 216        write_lock(&cluster->cl_nodes_lock);
 217        if (cluster->cl_nodes[tmp])
 218                p = NULL;
 219        else  {
 220                cluster->cl_nodes[tmp] = node;
 221                node->nd_num = tmp;
 222                set_bit(tmp, cluster->cl_nodes_bitmap);
 223        }
 224        write_unlock(&cluster->cl_nodes_lock);
 225        if (p == NULL)
 226                return -EEXIST;
 227
 228        return count;
 229}
 230static ssize_t r2nm_node_ipv4_port_read(struct r2nm_node *node, char *page)
 231{
 232        return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
 233}
 234
 235static ssize_t r2nm_node_ipv4_port_write(struct r2nm_node *node,
 236                                         const char *page, size_t count)
 237{
 238        unsigned long tmp;
 239        char *p = (char *)page;
 240        int err;
 241
 242        err = kstrtoul(p, 10, &tmp);
 243        if (err)
 244                return err;
 245
 246        if (tmp == 0)
 247                return -EINVAL;
 248        if (tmp >= (u16)-1)
 249                return -ERANGE;
 250
 251        node->nd_ipv4_port = htons(tmp);
 252
 253        return count;
 254}
 255
 256static ssize_t r2nm_node_ipv4_address_read(struct r2nm_node *node, char *page)
 257{
 258        return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
 259}
 260
 261static ssize_t r2nm_node_ipv4_address_write(struct r2nm_node *node,
 262                                            const char *page,
 263                                            size_t count)
 264{
 265        struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
 266        int ret, i;
 267        struct rb_node **p, *parent;
 268        unsigned int octets[4];
 269        __be32 ipv4_addr = 0;
 270
 271        ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
 272                     &octets[1], &octets[0]);
 273        if (ret != 4)
 274                return -EINVAL;
 275
 276        for (i = 0; i < ARRAY_SIZE(octets); i++) {
 277                if (octets[i] > 255)
 278                        return -ERANGE;
 279                be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
 280        }
 281
 282        ret = 0;
 283        write_lock(&cluster->cl_nodes_lock);
 284        if (r2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
 285                ret = -EEXIST;
 286        else {
 287                rb_link_node(&node->nd_ip_node, parent, p);
 288                rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 289        }
 290        write_unlock(&cluster->cl_nodes_lock);
 291        if (ret)
 292                return ret;
 293
 294        memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
 295
 296        return count;
 297}
 298
 299static ssize_t r2nm_node_local_read(struct r2nm_node *node, char *page)
 300{
 301        return sprintf(page, "%d\n", node->nd_local);
 302}
 303
 304static ssize_t r2nm_node_local_write(struct r2nm_node *node, const char *page,
 305                                     size_t count)
 306{
 307        struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
 308        unsigned long tmp;
 309        char *p = (char *)page;
 310        ssize_t ret;
 311        int err;
 312
 313        err = kstrtoul(p, 10, &tmp);
 314        if (err)
 315                return err;
 316
 317        tmp = !!tmp; /* boolean of whether this node wants to be local */
 318
 319        /* setting local turns on networking rx for now so we require having
 320         * set everything else first */
 321        if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
 322            !test_bit(R2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
 323            !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
 324                return -EINVAL; /* XXX */
 325
 326        /* the only failure case is trying to set a new local node
 327         * when a different one is already set */
 328        if (tmp && tmp == cluster->cl_has_local &&
 329            cluster->cl_local_node != node->nd_num)
 330                return -EBUSY;
 331
 332        /* bring up the rx thread if we're setting the new local node. */
 333        if (tmp && !cluster->cl_has_local) {
 334                ret = r2net_start_listening(node);
 335                if (ret)
 336                        return ret;
 337        }
 338
 339        if (!tmp && cluster->cl_has_local &&
 340            cluster->cl_local_node == node->nd_num) {
 341                r2net_stop_listening(node);
 342                cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
 343        }
 344
 345        node->nd_local = tmp;
 346        if (node->nd_local) {
 347                cluster->cl_has_local = tmp;
 348                cluster->cl_local_node = node->nd_num;
 349        }
 350
 351        return count;
 352}
 353
 354struct r2nm_node_attribute {
 355        struct configfs_attribute attr;
 356        ssize_t (*show)(struct r2nm_node *, char *);
 357        ssize_t (*store)(struct r2nm_node *, const char *, size_t);
 358};
 359
 360static struct r2nm_node_attribute r2nm_node_attr_num = {
 361        .attr   = { .ca_owner = THIS_MODULE,
 362                    .ca_name = "num",
 363                    .ca_mode = S_IRUGO | S_IWUSR },
 364        .show   = r2nm_node_num_read,
 365        .store  = r2nm_node_num_write,
 366};
 367
 368static struct r2nm_node_attribute r2nm_node_attr_ipv4_port = {
 369        .attr   = { .ca_owner = THIS_MODULE,
 370                    .ca_name = "ipv4_port",
 371                    .ca_mode = S_IRUGO | S_IWUSR },
 372        .show   = r2nm_node_ipv4_port_read,
 373        .store  = r2nm_node_ipv4_port_write,
 374};
 375
 376static struct r2nm_node_attribute r2nm_node_attr_ipv4_address = {
 377        .attr   = { .ca_owner = THIS_MODULE,
 378                    .ca_name = "ipv4_address",
 379                    .ca_mode = S_IRUGO | S_IWUSR },
 380        .show   = r2nm_node_ipv4_address_read,
 381        .store  = r2nm_node_ipv4_address_write,
 382};
 383
 384static struct r2nm_node_attribute r2nm_node_attr_local = {
 385        .attr   = { .ca_owner = THIS_MODULE,
 386                    .ca_name = "local",
 387                    .ca_mode = S_IRUGO | S_IWUSR },
 388        .show   = r2nm_node_local_read,
 389        .store  = r2nm_node_local_write,
 390};
 391
 392static struct configfs_attribute *r2nm_node_attrs[] = {
 393        [R2NM_NODE_ATTR_NUM] = &r2nm_node_attr_num.attr,
 394        [R2NM_NODE_ATTR_PORT] = &r2nm_node_attr_ipv4_port.attr,
 395        [R2NM_NODE_ATTR_ADDRESS] = &r2nm_node_attr_ipv4_address.attr,
 396        [R2NM_NODE_ATTR_LOCAL] = &r2nm_node_attr_local.attr,
 397        NULL,
 398};
 399
 400static int r2nm_attr_index(struct configfs_attribute *attr)
 401{
 402        int i;
 403        for (i = 0; i < ARRAY_SIZE(r2nm_node_attrs); i++) {
 404                if (attr == r2nm_node_attrs[i])
 405                        return i;
 406        }
 407        BUG();
 408        return 0;
 409}
 410
 411static ssize_t r2nm_node_show(struct config_item *item,
 412                              struct configfs_attribute *attr,
 413                              char *page)
 414{
 415        struct r2nm_node *node = to_r2nm_node(item);
 416        struct r2nm_node_attribute *r2nm_node_attr =
 417                container_of(attr, struct r2nm_node_attribute, attr);
 418        ssize_t ret = 0;
 419
 420        if (r2nm_node_attr->show)
 421                ret = r2nm_node_attr->show(node, page);
 422        return ret;
 423}
 424
 425static ssize_t r2nm_node_store(struct config_item *item,
 426                               struct configfs_attribute *attr,
 427                               const char *page, size_t count)
 428{
 429        struct r2nm_node *node = to_r2nm_node(item);
 430        struct r2nm_node_attribute *r2nm_node_attr =
 431                container_of(attr, struct r2nm_node_attribute, attr);
 432        ssize_t ret;
 433        int attr_index = r2nm_attr_index(attr);
 434
 435        if (r2nm_node_attr->store == NULL) {
 436                ret = -EINVAL;
 437                goto out;
 438        }
 439
 440        if (test_bit(attr_index, &node->nd_set_attributes))
 441                return -EBUSY;
 442
 443        ret = r2nm_node_attr->store(node, page, count);
 444        if (ret < count)
 445                goto out;
 446
 447        set_bit(attr_index, &node->nd_set_attributes);
 448out:
 449        return ret;
 450}
 451
 452static struct configfs_item_operations r2nm_node_item_ops = {
 453        .release                = r2nm_node_release,
 454        .show_attribute         = r2nm_node_show,
 455        .store_attribute        = r2nm_node_store,
 456};
 457
 458static struct config_item_type r2nm_node_type = {
 459        .ct_item_ops    = &r2nm_node_item_ops,
 460        .ct_attrs       = r2nm_node_attrs,
 461        .ct_owner       = THIS_MODULE,
 462};
 463
 464/* node set */
 465
 466struct r2nm_node_group {
 467        struct config_group ns_group;
 468        /* some stuff? */
 469};
 470
 471#if 0
 472static struct r2nm_node_group *to_r2nm_node_group(struct config_group *group)
 473{
 474        return group ?
 475                container_of(group, struct r2nm_node_group, ns_group)
 476                : NULL;
 477}
 478#endif
 479
 480struct r2nm_cluster_attribute {
 481        struct configfs_attribute attr;
 482        ssize_t (*show)(struct r2nm_cluster *, char *);
 483        ssize_t (*store)(struct r2nm_cluster *, const char *, size_t);
 484};
 485
 486static ssize_t r2nm_cluster_attr_write(const char *page, ssize_t count,
 487                                        unsigned int *val)
 488{
 489        unsigned long tmp;
 490        char *p = (char *)page;
 491        int err;
 492
 493        err = kstrtoul(p, 10, &tmp);
 494        if (err)
 495                return err;
 496
 497        if (tmp == 0)
 498                return -EINVAL;
 499        if (tmp >= (u32)-1)
 500                return -ERANGE;
 501
 502        *val = tmp;
 503
 504        return count;
 505}
 506
 507static ssize_t r2nm_cluster_attr_idle_timeout_ms_read(
 508        struct r2nm_cluster *cluster, char *page)
 509{
 510        return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
 511}
 512
 513static ssize_t r2nm_cluster_attr_idle_timeout_ms_write(
 514        struct r2nm_cluster *cluster, const char *page, size_t count)
 515{
 516        ssize_t ret;
 517        unsigned int val = 0;
 518
 519        ret =  r2nm_cluster_attr_write(page, count, &val);
 520
 521        if (ret > 0) {
 522                if (cluster->cl_idle_timeout_ms != val
 523                        && r2net_num_connected_peers()) {
 524                        mlog(ML_NOTICE,
 525                             "r2net: cannot change idle timeout after "
 526                             "the first peer has agreed to it."
 527                             "  %d connected peers\n",
 528                             r2net_num_connected_peers());
 529                        ret = -EINVAL;
 530                } else if (val <= cluster->cl_keepalive_delay_ms) {
 531                        mlog(ML_NOTICE, "r2net: idle timeout must be larger "
 532                             "than keepalive delay\n");
 533                        ret = -EINVAL;
 534                } else {
 535                        cluster->cl_idle_timeout_ms = val;
 536                }
 537        }
 538
 539        return ret;
 540}
 541
 542static ssize_t r2nm_cluster_attr_keepalive_delay_ms_read(
 543        struct r2nm_cluster *cluster, char *page)
 544{
 545        return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
 546}
 547
 548static ssize_t r2nm_cluster_attr_keepalive_delay_ms_write(
 549        struct r2nm_cluster *cluster, const char *page, size_t count)
 550{
 551        ssize_t ret;
 552        unsigned int val = 0;
 553
 554        ret =  r2nm_cluster_attr_write(page, count, &val);
 555
 556        if (ret > 0) {
 557                if (cluster->cl_keepalive_delay_ms != val
 558                    && r2net_num_connected_peers()) {
 559                        mlog(ML_NOTICE,
 560                             "r2net: cannot change keepalive delay after"
 561                             " the first peer has agreed to it."
 562                             "  %d connected peers\n",
 563                             r2net_num_connected_peers());
 564                        ret = -EINVAL;
 565                } else if (val >= cluster->cl_idle_timeout_ms) {
 566                        mlog(ML_NOTICE, "r2net: keepalive delay must be "
 567                             "smaller than idle timeout\n");
 568                        ret = -EINVAL;
 569                } else {
 570                        cluster->cl_keepalive_delay_ms = val;
 571                }
 572        }
 573
 574        return ret;
 575}
 576
 577static ssize_t r2nm_cluster_attr_reconnect_delay_ms_read(
 578        struct r2nm_cluster *cluster, char *page)
 579{
 580        return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
 581}
 582
 583static ssize_t r2nm_cluster_attr_reconnect_delay_ms_write(
 584        struct r2nm_cluster *cluster, const char *page, size_t count)
 585{
 586        return r2nm_cluster_attr_write(page, count,
 587                                        &cluster->cl_reconnect_delay_ms);
 588}
 589
 590static ssize_t r2nm_cluster_attr_fence_method_read(
 591        struct r2nm_cluster *cluster, char *page)
 592{
 593        ssize_t ret = 0;
 594
 595        if (cluster)
 596                ret = sprintf(page, "%s\n",
 597                              r2nm_fence_method_desc[cluster->cl_fence_method]);
 598        return ret;
 599}
 600
 601static ssize_t r2nm_cluster_attr_fence_method_write(
 602        struct r2nm_cluster *cluster, const char *page, size_t count)
 603{
 604        unsigned int i;
 605
 606        if (page[count - 1] != '\n')
 607                goto bail;
 608
 609        for (i = 0; i < R2NM_FENCE_METHODS; ++i) {
 610                if (count != strlen(r2nm_fence_method_desc[i]) + 1)
 611                        continue;
 612                if (strncasecmp(page, r2nm_fence_method_desc[i], count - 1))
 613                        continue;
 614                if (cluster->cl_fence_method != i) {
 615                        printk(KERN_INFO "ramster: Changing fence method to %s\n",
 616                               r2nm_fence_method_desc[i]);
 617                        cluster->cl_fence_method = i;
 618                }
 619                return count;
 620        }
 621
 622bail:
 623        return -EINVAL;
 624}
 625
 626static struct r2nm_cluster_attribute r2nm_cluster_attr_idle_timeout_ms = {
 627        .attr   = { .ca_owner = THIS_MODULE,
 628                    .ca_name = "idle_timeout_ms",
 629                    .ca_mode = S_IRUGO | S_IWUSR },
 630        .show   = r2nm_cluster_attr_idle_timeout_ms_read,
 631        .store  = r2nm_cluster_attr_idle_timeout_ms_write,
 632};
 633
 634static struct r2nm_cluster_attribute r2nm_cluster_attr_keepalive_delay_ms = {
 635        .attr   = { .ca_owner = THIS_MODULE,
 636                    .ca_name = "keepalive_delay_ms",
 637                    .ca_mode = S_IRUGO | S_IWUSR },
 638        .show   = r2nm_cluster_attr_keepalive_delay_ms_read,
 639        .store  = r2nm_cluster_attr_keepalive_delay_ms_write,
 640};
 641
 642static struct r2nm_cluster_attribute r2nm_cluster_attr_reconnect_delay_ms = {
 643        .attr   = { .ca_owner = THIS_MODULE,
 644                    .ca_name = "reconnect_delay_ms",
 645                    .ca_mode = S_IRUGO | S_IWUSR },
 646        .show   = r2nm_cluster_attr_reconnect_delay_ms_read,
 647        .store  = r2nm_cluster_attr_reconnect_delay_ms_write,
 648};
 649
 650static struct r2nm_cluster_attribute r2nm_cluster_attr_fence_method = {
 651        .attr   = { .ca_owner = THIS_MODULE,
 652                    .ca_name = "fence_method",
 653                    .ca_mode = S_IRUGO | S_IWUSR },
 654        .show   = r2nm_cluster_attr_fence_method_read,
 655        .store  = r2nm_cluster_attr_fence_method_write,
 656};
 657
 658static struct configfs_attribute *r2nm_cluster_attrs[] = {
 659        &r2nm_cluster_attr_idle_timeout_ms.attr,
 660        &r2nm_cluster_attr_keepalive_delay_ms.attr,
 661        &r2nm_cluster_attr_reconnect_delay_ms.attr,
 662        &r2nm_cluster_attr_fence_method.attr,
 663        NULL,
 664};
 665static ssize_t r2nm_cluster_show(struct config_item *item,
 666                                        struct configfs_attribute *attr,
 667                                        char *page)
 668{
 669        struct r2nm_cluster *cluster = to_r2nm_cluster(item);
 670        struct r2nm_cluster_attribute *r2nm_cluster_attr =
 671                container_of(attr, struct r2nm_cluster_attribute, attr);
 672        ssize_t ret = 0;
 673
 674        if (r2nm_cluster_attr->show)
 675                ret = r2nm_cluster_attr->show(cluster, page);
 676        return ret;
 677}
 678
 679static ssize_t r2nm_cluster_store(struct config_item *item,
 680                                        struct configfs_attribute *attr,
 681                                        const char *page, size_t count)
 682{
 683        struct r2nm_cluster *cluster = to_r2nm_cluster(item);
 684        struct r2nm_cluster_attribute *r2nm_cluster_attr =
 685                container_of(attr, struct r2nm_cluster_attribute, attr);
 686        ssize_t ret;
 687
 688        if (r2nm_cluster_attr->store == NULL) {
 689                ret = -EINVAL;
 690                goto out;
 691        }
 692
 693        ret = r2nm_cluster_attr->store(cluster, page, count);
 694        if (ret < count)
 695                goto out;
 696out:
 697        return ret;
 698}
 699
 700static struct config_item *r2nm_node_group_make_item(struct config_group *group,
 701                                                     const char *name)
 702{
 703        struct r2nm_node *node = NULL;
 704
 705        if (strlen(name) > R2NM_MAX_NAME_LEN)
 706                return ERR_PTR(-ENAMETOOLONG);
 707
 708        node = kzalloc(sizeof(struct r2nm_node), GFP_KERNEL);
 709        if (node == NULL)
 710                return ERR_PTR(-ENOMEM);
 711
 712        strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
 713        config_item_init_type_name(&node->nd_item, name, &r2nm_node_type);
 714        spin_lock_init(&node->nd_lock);
 715
 716        mlog(ML_CLUSTER, "r2nm: Registering node %s\n", name);
 717
 718        return &node->nd_item;
 719}
 720
 721static void r2nm_node_group_drop_item(struct config_group *group,
 722                                      struct config_item *item)
 723{
 724        struct r2nm_node *node = to_r2nm_node(item);
 725        struct r2nm_cluster *cluster =
 726                                to_r2nm_cluster(group->cg_item.ci_parent);
 727
 728        r2net_disconnect_node(node);
 729
 730        if (cluster->cl_has_local &&
 731            (cluster->cl_local_node == node->nd_num)) {
 732                cluster->cl_has_local = 0;
 733                cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
 734                r2net_stop_listening(node);
 735        }
 736
 737        /* XXX call into net to stop this node from trading messages */
 738
 739        write_lock(&cluster->cl_nodes_lock);
 740
 741        /* XXX sloppy */
 742        if (node->nd_ipv4_address)
 743                rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
 744
 745        /* nd_num might be 0 if the node number hasn't been set.. */
 746        if (cluster->cl_nodes[node->nd_num] == node) {
 747                cluster->cl_nodes[node->nd_num] = NULL;
 748                clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
 749        }
 750        write_unlock(&cluster->cl_nodes_lock);
 751
 752        mlog(ML_CLUSTER, "r2nm: Unregistered node %s\n",
 753             config_item_name(&node->nd_item));
 754
 755        config_item_put(item);
 756}
 757
 758static struct configfs_group_operations r2nm_node_group_group_ops = {
 759        .make_item      = r2nm_node_group_make_item,
 760        .drop_item      = r2nm_node_group_drop_item,
 761};
 762
 763static struct config_item_type r2nm_node_group_type = {
 764        .ct_group_ops   = &r2nm_node_group_group_ops,
 765        .ct_owner       = THIS_MODULE,
 766};
 767
 768/* cluster */
 769
 770static void r2nm_cluster_release(struct config_item *item)
 771{
 772        struct r2nm_cluster *cluster = to_r2nm_cluster(item);
 773
 774        kfree(cluster->cl_group.default_groups);
 775        kfree(cluster);
 776}
 777
 778static struct configfs_item_operations r2nm_cluster_item_ops = {
 779        .release        = r2nm_cluster_release,
 780        .show_attribute         = r2nm_cluster_show,
 781        .store_attribute        = r2nm_cluster_store,
 782};
 783
 784static struct config_item_type r2nm_cluster_type = {
 785        .ct_item_ops    = &r2nm_cluster_item_ops,
 786        .ct_attrs       = r2nm_cluster_attrs,
 787        .ct_owner       = THIS_MODULE,
 788};
 789
 790/* cluster set */
 791
 792struct r2nm_cluster_group {
 793        struct configfs_subsystem cs_subsys;
 794        /* some stuff? */
 795};
 796
 797#if 0
 798static struct r2nm_cluster_group *
 799to_r2nm_cluster_group(struct config_group *group)
 800{
 801        return group ?
 802                container_of(to_configfs_subsystem(group),
 803                                struct r2nm_cluster_group, cs_subsys)
 804               : NULL;
 805}
 806#endif
 807
 808static struct config_group *
 809r2nm_cluster_group_make_group(struct config_group *group,
 810                                                          const char *name)
 811{
 812        struct r2nm_cluster *cluster = NULL;
 813        struct r2nm_node_group *ns = NULL;
 814        struct config_group *r2hb_group = NULL, *ret = NULL;
 815        void *defs = NULL;
 816
 817        /* this runs under the parent dir's i_mutex; there can be only
 818         * one caller in here at a time */
 819        if (r2nm_single_cluster)
 820                return ERR_PTR(-ENOSPC);
 821
 822        cluster = kzalloc(sizeof(struct r2nm_cluster), GFP_KERNEL);
 823        ns = kzalloc(sizeof(struct r2nm_node_group), GFP_KERNEL);
 824        defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
 825        r2hb_group = r2hb_alloc_hb_set();
 826        if (cluster == NULL || ns == NULL || r2hb_group == NULL || defs == NULL)
 827                goto out;
 828
 829        config_group_init_type_name(&cluster->cl_group, name,
 830                                    &r2nm_cluster_type);
 831        config_group_init_type_name(&ns->ns_group, "node",
 832                                    &r2nm_node_group_type);
 833
 834        cluster->cl_group.default_groups = defs;
 835        cluster->cl_group.default_groups[0] = &ns->ns_group;
 836        cluster->cl_group.default_groups[1] = r2hb_group;
 837        cluster->cl_group.default_groups[2] = NULL;
 838        rwlock_init(&cluster->cl_nodes_lock);
 839        cluster->cl_node_ip_tree = RB_ROOT;
 840        cluster->cl_reconnect_delay_ms = R2NET_RECONNECT_DELAY_MS_DEFAULT;
 841        cluster->cl_idle_timeout_ms    = R2NET_IDLE_TIMEOUT_MS_DEFAULT;
 842        cluster->cl_keepalive_delay_ms = R2NET_KEEPALIVE_DELAY_MS_DEFAULT;
 843        cluster->cl_fence_method       = R2NM_FENCE_RESET;
 844
 845        ret = &cluster->cl_group;
 846        r2nm_single_cluster = cluster;
 847
 848out:
 849        if (ret == NULL) {
 850                kfree(cluster);
 851                kfree(ns);
 852                r2hb_free_hb_set(r2hb_group);
 853                kfree(defs);
 854                ret = ERR_PTR(-ENOMEM);
 855        }
 856
 857        return ret;
 858}
 859
 860static void r2nm_cluster_group_drop_item(struct config_group *group,
 861                                                struct config_item *item)
 862{
 863        struct r2nm_cluster *cluster = to_r2nm_cluster(item);
 864        int i;
 865        struct config_item *killme;
 866
 867        BUG_ON(r2nm_single_cluster != cluster);
 868        r2nm_single_cluster = NULL;
 869
 870        for (i = 0; cluster->cl_group.default_groups[i]; i++) {
 871                killme = &cluster->cl_group.default_groups[i]->cg_item;
 872                cluster->cl_group.default_groups[i] = NULL;
 873                config_item_put(killme);
 874        }
 875
 876        config_item_put(item);
 877}
 878
 879static struct configfs_group_operations r2nm_cluster_group_group_ops = {
 880        .make_group     = r2nm_cluster_group_make_group,
 881        .drop_item      = r2nm_cluster_group_drop_item,
 882};
 883
 884static struct config_item_type r2nm_cluster_group_type = {
 885        .ct_group_ops   = &r2nm_cluster_group_group_ops,
 886        .ct_owner       = THIS_MODULE,
 887};
 888
 889static struct r2nm_cluster_group r2nm_cluster_group = {
 890        .cs_subsys = {
 891                .su_group = {
 892                        .cg_item = {
 893                                .ci_namebuf = "cluster",
 894                                .ci_type = &r2nm_cluster_group_type,
 895                        },
 896                },
 897        },
 898};
 899
 900int r2nm_depend_item(struct config_item *item)
 901{
 902        return configfs_depend_item(&r2nm_cluster_group.cs_subsys, item);
 903}
 904
 905void r2nm_undepend_item(struct config_item *item)
 906{
 907        configfs_undepend_item(&r2nm_cluster_group.cs_subsys, item);
 908}
 909
 910int r2nm_depend_this_node(void)
 911{
 912        int ret = 0;
 913        struct r2nm_node *local_node;
 914
 915        local_node = r2nm_get_node_by_num(r2nm_this_node());
 916        if (!local_node) {
 917                ret = -EINVAL;
 918                goto out;
 919        }
 920
 921        ret = r2nm_depend_item(&local_node->nd_item);
 922        r2nm_node_put(local_node);
 923
 924out:
 925        return ret;
 926}
 927
 928void r2nm_undepend_this_node(void)
 929{
 930        struct r2nm_node *local_node;
 931
 932        local_node = r2nm_get_node_by_num(r2nm_this_node());
 933        BUG_ON(!local_node);
 934
 935        r2nm_undepend_item(&local_node->nd_item);
 936        r2nm_node_put(local_node);
 937}
 938
 939
 940static void __exit exit_r2nm(void)
 941{
 942        /* XXX sync with hb callbacks and shut down hb? */
 943        r2net_unregister_hb_callbacks();
 944        configfs_unregister_subsystem(&r2nm_cluster_group.cs_subsys);
 945
 946        r2net_exit();
 947        r2hb_exit();
 948}
 949
 950static int __init init_r2nm(void)
 951{
 952        int ret = -1;
 953
 954        ret = r2hb_init();
 955        if (ret)
 956                goto out;
 957
 958        ret = r2net_init();
 959        if (ret)
 960                goto out_r2hb;
 961
 962        ret = r2net_register_hb_callbacks();
 963        if (ret)
 964                goto out_r2net;
 965
 966        config_group_init(&r2nm_cluster_group.cs_subsys.su_group);
 967        mutex_init(&r2nm_cluster_group.cs_subsys.su_mutex);
 968        ret = configfs_register_subsystem(&r2nm_cluster_group.cs_subsys);
 969        if (ret) {
 970                printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
 971                goto out_callbacks;
 972        }
 973
 974        if (!ret)
 975                goto out;
 976
 977        configfs_unregister_subsystem(&r2nm_cluster_group.cs_subsys);
 978out_callbacks:
 979        r2net_unregister_hb_callbacks();
 980out_r2net:
 981        r2net_exit();
 982out_r2hb:
 983        r2hb_exit();
 984out:
 985        return ret;
 986}
 987
 988MODULE_AUTHOR("Oracle");
 989MODULE_LICENSE("GPL");
 990
 991module_init(init_r2nm)
 992module_exit(exit_r2nm)
 993