linux/drivers/staging/zcache/ramster/heartbeat.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * Copyright (C) 2004, 2005, 2012 Oracle.  All rights reserved.
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This program is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU General Public
  17 * License along with this program; if not, write to the
  18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  19 * Boston, MA 02111-1307, USA.
  20 */
  21
  22#include <linux/kernel.h>
  23#include <linux/module.h>
  24#include <linux/configfs.h>
  25
  26#include "heartbeat.h"
  27#include "tcp.h"
  28#include "nodemanager.h"
  29
  30#include "masklog.h"
  31
  32/*
  33 * The first heartbeat pass had one global thread that would serialize all hb
  34 * callback calls.  This global serializing sem should only be removed once
  35 * we've made sure that all callees can deal with being called concurrently
  36 * from multiple hb region threads.
  37 */
  38static DECLARE_RWSEM(r2hb_callback_sem);
  39
  40/*
  41 * multiple hb threads are watching multiple regions.  A node is live
  42 * whenever any of the threads sees activity from the node in its region.
  43 */
  44static DEFINE_SPINLOCK(r2hb_live_lock);
  45static unsigned long r2hb_live_node_bitmap[BITS_TO_LONGS(R2NM_MAX_NODES)];
  46
  47static struct r2hb_callback {
  48        struct list_head list;
  49} r2hb_callbacks[R2HB_NUM_CB];
  50
  51enum r2hb_heartbeat_modes {
  52        R2HB_HEARTBEAT_LOCAL            = 0,
  53        R2HB_HEARTBEAT_GLOBAL,
  54        R2HB_HEARTBEAT_NUM_MODES,
  55};
  56
  57char *r2hb_heartbeat_mode_desc[R2HB_HEARTBEAT_NUM_MODES] = {
  58                "local",        /* R2HB_HEARTBEAT_LOCAL */
  59                "global",       /* R2HB_HEARTBEAT_GLOBAL */
  60};
  61
  62unsigned int r2hb_dead_threshold = R2HB_DEFAULT_DEAD_THRESHOLD;
  63unsigned int r2hb_heartbeat_mode = R2HB_HEARTBEAT_LOCAL;
  64
  65/* Only sets a new threshold if there are no active regions.
  66 *
  67 * No locking or otherwise interesting code is required for reading
  68 * r2hb_dead_threshold as it can't change once regions are active and
  69 * it's not interesting to anyone until then anyway. */
  70static void r2hb_dead_threshold_set(unsigned int threshold)
  71{
  72        if (threshold > R2HB_MIN_DEAD_THRESHOLD) {
  73                spin_lock(&r2hb_live_lock);
  74                r2hb_dead_threshold = threshold;
  75                spin_unlock(&r2hb_live_lock);
  76        }
  77}
  78
  79static int r2hb_global_hearbeat_mode_set(unsigned int hb_mode)
  80{
  81        int ret = -1;
  82
  83        if (hb_mode < R2HB_HEARTBEAT_NUM_MODES) {
  84                spin_lock(&r2hb_live_lock);
  85                r2hb_heartbeat_mode = hb_mode;
  86                ret = 0;
  87                spin_unlock(&r2hb_live_lock);
  88        }
  89
  90        return ret;
  91}
  92
  93void r2hb_exit(void)
  94{
  95}
  96
  97int r2hb_init(void)
  98{
  99        int i;
 100
 101        for (i = 0; i < ARRAY_SIZE(r2hb_callbacks); i++)
 102                INIT_LIST_HEAD(&r2hb_callbacks[i].list);
 103
 104        memset(r2hb_live_node_bitmap, 0, sizeof(r2hb_live_node_bitmap));
 105
 106        return 0;
 107}
 108
 109/* if we're already in a callback then we're already serialized by the sem */
 110static void r2hb_fill_node_map_from_callback(unsigned long *map,
 111                                             unsigned bytes)
 112{
 113        BUG_ON(bytes < (BITS_TO_LONGS(R2NM_MAX_NODES) * sizeof(unsigned long)));
 114
 115        memcpy(map, &r2hb_live_node_bitmap, bytes);
 116}
 117
 118/*
 119 * get a map of all nodes that are heartbeating in any regions
 120 */
 121void r2hb_fill_node_map(unsigned long *map, unsigned bytes)
 122{
 123        /* callers want to serialize this map and callbacks so that they
 124         * can trust that they don't miss nodes coming to the party */
 125        down_read(&r2hb_callback_sem);
 126        spin_lock(&r2hb_live_lock);
 127        r2hb_fill_node_map_from_callback(map, bytes);
 128        spin_unlock(&r2hb_live_lock);
 129        up_read(&r2hb_callback_sem);
 130}
 131EXPORT_SYMBOL_GPL(r2hb_fill_node_map);
 132
 133/*
 134 * heartbeat configfs bits.  The heartbeat set is a default set under
 135 * the cluster set in nodemanager.c.
 136 */
 137
 138/* heartbeat set */
 139
 140struct r2hb_hb_group {
 141        struct config_group hs_group;
 142        /* some stuff? */
 143};
 144
 145static struct r2hb_hb_group *to_r2hb_hb_group(struct config_group *group)
 146{
 147        return group ?
 148                container_of(group, struct r2hb_hb_group, hs_group)
 149                : NULL;
 150}
 151
 152static struct config_item r2hb_config_item;
 153
 154static struct config_item *r2hb_hb_group_make_item(struct config_group *group,
 155                                                          const char *name)
 156{
 157        int ret;
 158
 159        if (strlen(name) > R2HB_MAX_REGION_NAME_LEN) {
 160                ret = -ENAMETOOLONG;
 161                goto free;
 162        }
 163
 164        config_item_put(&r2hb_config_item);
 165
 166        return &r2hb_config_item;
 167free:
 168        return ERR_PTR(ret);
 169}
 170
 171static void r2hb_hb_group_drop_item(struct config_group *group,
 172                                           struct config_item *item)
 173{
 174        if (r2hb_global_heartbeat_active()) {
 175                pr_notice("ramster: Heartbeat %s on region %s (%s)\n",
 176                        "stopped/aborted", config_item_name(item),
 177                        "no region");
 178        }
 179
 180        config_item_put(item);
 181}
 182
 183struct r2hb_hb_group_attribute {
 184        struct configfs_attribute attr;
 185        ssize_t (*show)(struct r2hb_hb_group *, char *);
 186        ssize_t (*store)(struct r2hb_hb_group *, const char *, size_t);
 187};
 188
 189static ssize_t r2hb_hb_group_show(struct config_item *item,
 190                                         struct configfs_attribute *attr,
 191                                         char *page)
 192{
 193        struct r2hb_hb_group *reg = to_r2hb_hb_group(to_config_group(item));
 194        struct r2hb_hb_group_attribute *r2hb_hb_group_attr =
 195                container_of(attr, struct r2hb_hb_group_attribute, attr);
 196        ssize_t ret = 0;
 197
 198        if (r2hb_hb_group_attr->show)
 199                ret = r2hb_hb_group_attr->show(reg, page);
 200        return ret;
 201}
 202
 203static ssize_t r2hb_hb_group_store(struct config_item *item,
 204                                          struct configfs_attribute *attr,
 205                                          const char *page, size_t count)
 206{
 207        struct r2hb_hb_group *reg = to_r2hb_hb_group(to_config_group(item));
 208        struct r2hb_hb_group_attribute *r2hb_hb_group_attr =
 209                container_of(attr, struct r2hb_hb_group_attribute, attr);
 210        ssize_t ret = -EINVAL;
 211
 212        if (r2hb_hb_group_attr->store)
 213                ret = r2hb_hb_group_attr->store(reg, page, count);
 214        return ret;
 215}
 216
 217static ssize_t r2hb_hb_group_threshold_show(struct r2hb_hb_group *group,
 218                                                     char *page)
 219{
 220        return sprintf(page, "%u\n", r2hb_dead_threshold);
 221}
 222
 223static ssize_t r2hb_hb_group_threshold_store(struct r2hb_hb_group *group,
 224                                                    const char *page,
 225                                                    size_t count)
 226{
 227        unsigned long tmp;
 228        char *p = (char *)page;
 229        int err;
 230
 231        err = kstrtoul(p, 10, &tmp);
 232        if (err)
 233                return err;
 234
 235        /* this will validate ranges for us. */
 236        r2hb_dead_threshold_set((unsigned int) tmp);
 237
 238        return count;
 239}
 240
 241static
 242ssize_t r2hb_hb_group_mode_show(struct r2hb_hb_group *group,
 243                                       char *page)
 244{
 245        return sprintf(page, "%s\n",
 246                       r2hb_heartbeat_mode_desc[r2hb_heartbeat_mode]);
 247}
 248
 249static
 250ssize_t r2hb_hb_group_mode_store(struct r2hb_hb_group *group,
 251                                        const char *page, size_t count)
 252{
 253        unsigned int i;
 254        int ret;
 255        size_t len;
 256
 257        len = (page[count - 1] == '\n') ? count - 1 : count;
 258        if (!len)
 259                return -EINVAL;
 260
 261        for (i = 0; i < R2HB_HEARTBEAT_NUM_MODES; ++i) {
 262                if (strnicmp(page, r2hb_heartbeat_mode_desc[i], len))
 263                        continue;
 264
 265                ret = r2hb_global_hearbeat_mode_set(i);
 266                if (!ret)
 267                        pr_notice("ramster: Heartbeat mode set to %s\n",
 268                               r2hb_heartbeat_mode_desc[i]);
 269                return count;
 270        }
 271
 272        return -EINVAL;
 273
 274}
 275
 276static struct r2hb_hb_group_attribute r2hb_hb_group_attr_threshold = {
 277        .attr   = { .ca_owner = THIS_MODULE,
 278                    .ca_name = "dead_threshold",
 279                    .ca_mode = S_IRUGO | S_IWUSR },
 280        .show   = r2hb_hb_group_threshold_show,
 281        .store  = r2hb_hb_group_threshold_store,
 282};
 283
 284static struct r2hb_hb_group_attribute r2hb_hb_group_attr_mode = {
 285        .attr   = { .ca_owner = THIS_MODULE,
 286                .ca_name = "mode",
 287                .ca_mode = S_IRUGO | S_IWUSR },
 288        .show   = r2hb_hb_group_mode_show,
 289        .store  = r2hb_hb_group_mode_store,
 290};
 291
 292static struct configfs_attribute *r2hb_hb_group_attrs[] = {
 293        &r2hb_hb_group_attr_threshold.attr,
 294        &r2hb_hb_group_attr_mode.attr,
 295        NULL,
 296};
 297
 298static struct configfs_item_operations r2hb_hearbeat_group_item_ops = {
 299        .show_attribute         = r2hb_hb_group_show,
 300        .store_attribute        = r2hb_hb_group_store,
 301};
 302
 303static struct configfs_group_operations r2hb_hb_group_group_ops = {
 304        .make_item      = r2hb_hb_group_make_item,
 305        .drop_item      = r2hb_hb_group_drop_item,
 306};
 307
 308static struct config_item_type r2hb_hb_group_type = {
 309        .ct_group_ops   = &r2hb_hb_group_group_ops,
 310        .ct_item_ops    = &r2hb_hearbeat_group_item_ops,
 311        .ct_attrs       = r2hb_hb_group_attrs,
 312        .ct_owner       = THIS_MODULE,
 313};
 314
 315/* this is just here to avoid touching group in heartbeat.h which the
 316 * entire damn world #includes */
 317struct config_group *r2hb_alloc_hb_set(void)
 318{
 319        struct r2hb_hb_group *hs = NULL;
 320        struct config_group *ret = NULL;
 321
 322        hs = kzalloc(sizeof(struct r2hb_hb_group), GFP_KERNEL);
 323        if (hs == NULL)
 324                goto out;
 325
 326        config_group_init_type_name(&hs->hs_group, "heartbeat",
 327                                    &r2hb_hb_group_type);
 328
 329        ret = &hs->hs_group;
 330out:
 331        if (ret == NULL)
 332                kfree(hs);
 333        return ret;
 334}
 335
 336void r2hb_free_hb_set(struct config_group *group)
 337{
 338        struct r2hb_hb_group *hs = to_r2hb_hb_group(group);
 339        kfree(hs);
 340}
 341
 342/* hb callback registration and issuing */
 343
 344static struct r2hb_callback *hbcall_from_type(enum r2hb_callback_type type)
 345{
 346        if (type == R2HB_NUM_CB)
 347                return ERR_PTR(-EINVAL);
 348
 349        return &r2hb_callbacks[type];
 350}
 351
 352void r2hb_setup_callback(struct r2hb_callback_func *hc,
 353                         enum r2hb_callback_type type,
 354                         r2hb_cb_func *func,
 355                         void *data,
 356                         int priority)
 357{
 358        INIT_LIST_HEAD(&hc->hc_item);
 359        hc->hc_func = func;
 360        hc->hc_data = data;
 361        hc->hc_priority = priority;
 362        hc->hc_type = type;
 363        hc->hc_magic = R2HB_CB_MAGIC;
 364}
 365EXPORT_SYMBOL_GPL(r2hb_setup_callback);
 366
 367int r2hb_register_callback(const char *region_uuid,
 368                           struct r2hb_callback_func *hc)
 369{
 370        struct r2hb_callback_func *tmp;
 371        struct list_head *iter;
 372        struct r2hb_callback *hbcall;
 373        int ret;
 374
 375        BUG_ON(hc->hc_magic != R2HB_CB_MAGIC);
 376        BUG_ON(!list_empty(&hc->hc_item));
 377
 378        hbcall = hbcall_from_type(hc->hc_type);
 379        if (IS_ERR(hbcall)) {
 380                ret = PTR_ERR(hbcall);
 381                goto out;
 382        }
 383
 384        down_write(&r2hb_callback_sem);
 385
 386        list_for_each(iter, &hbcall->list) {
 387                tmp = list_entry(iter, struct r2hb_callback_func, hc_item);
 388                if (hc->hc_priority < tmp->hc_priority) {
 389                        list_add_tail(&hc->hc_item, iter);
 390                        break;
 391                }
 392        }
 393        if (list_empty(&hc->hc_item))
 394                list_add_tail(&hc->hc_item, &hbcall->list);
 395
 396        up_write(&r2hb_callback_sem);
 397        ret = 0;
 398out:
 399        mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
 400             ret, __builtin_return_address(0), hc);
 401        return ret;
 402}
 403EXPORT_SYMBOL_GPL(r2hb_register_callback);
 404
 405void r2hb_unregister_callback(const char *region_uuid,
 406                              struct r2hb_callback_func *hc)
 407{
 408        BUG_ON(hc->hc_magic != R2HB_CB_MAGIC);
 409
 410        mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
 411             __builtin_return_address(0), hc);
 412
 413        /* XXX Can this happen _with_ a region reference? */
 414        if (list_empty(&hc->hc_item))
 415                return;
 416
 417        down_write(&r2hb_callback_sem);
 418
 419        list_del_init(&hc->hc_item);
 420
 421        up_write(&r2hb_callback_sem);
 422}
 423EXPORT_SYMBOL_GPL(r2hb_unregister_callback);
 424
 425int r2hb_check_node_heartbeating_from_callback(u8 node_num)
 426{
 427        unsigned long testing_map[BITS_TO_LONGS(R2NM_MAX_NODES)];
 428
 429        r2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
 430        if (!test_bit(node_num, testing_map)) {
 431                mlog(ML_HEARTBEAT,
 432                     "node (%u) does not have heartbeating enabled.\n",
 433                     node_num);
 434                return 0;
 435        }
 436
 437        return 1;
 438}
 439EXPORT_SYMBOL_GPL(r2hb_check_node_heartbeating_from_callback);
 440
 441void r2hb_stop_all_regions(void)
 442{
 443}
 444EXPORT_SYMBOL_GPL(r2hb_stop_all_regions);
 445
 446/*
 447 * this is just a hack until we get the plumbing which flips file systems
 448 * read only and drops the hb ref instead of killing the node dead.
 449 */
 450int r2hb_global_heartbeat_active(void)
 451{
 452        return (r2hb_heartbeat_mode == R2HB_HEARTBEAT_GLOBAL);
 453}
 454EXPORT_SYMBOL(r2hb_global_heartbeat_active);
 455
 456/* added for RAMster */
 457void r2hb_manual_set_node_heartbeating(int node_num)
 458{
 459        if (node_num < R2NM_MAX_NODES)
 460                set_bit(node_num, r2hb_live_node_bitmap);
 461}
 462EXPORT_SYMBOL(r2hb_manual_set_node_heartbeating);
 463