linux/fs/ocfs2/cluster/heartbeat.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/* -*- mode: c; c-basic-offset: 8; -*-
   3 * vim: noexpandtab sw=8 ts=8 sts=0:
   4 *
   5 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
   6 */
   7
   8#include <linux/kernel.h>
   9#include <linux/sched.h>
  10#include <linux/jiffies.h>
  11#include <linux/module.h>
  12#include <linux/fs.h>
  13#include <linux/bio.h>
  14#include <linux/blkdev.h>
  15#include <linux/delay.h>
  16#include <linux/file.h>
  17#include <linux/kthread.h>
  18#include <linux/configfs.h>
  19#include <linux/random.h>
  20#include <linux/crc32.h>
  21#include <linux/time.h>
  22#include <linux/debugfs.h>
  23#include <linux/slab.h>
  24#include <linux/bitmap.h>
  25#include <linux/ktime.h>
  26#include "heartbeat.h"
  27#include "tcp.h"
  28#include "nodemanager.h"
  29#include "quorum.h"
  30
  31#include "masklog.h"
  32
  33
  34/*
  35 * The first heartbeat pass had one global thread that would serialize all hb
  36 * callback calls.  This global serializing sem should only be removed once
  37 * we've made sure that all callees can deal with being called concurrently
  38 * from multiple hb region threads.
  39 */
  40static DECLARE_RWSEM(o2hb_callback_sem);
  41
  42/*
  43 * multiple hb threads are watching multiple regions.  A node is live
  44 * whenever any of the threads sees activity from the node in its region.
  45 */
  46static DEFINE_SPINLOCK(o2hb_live_lock);
  47static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
  48static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
  49static LIST_HEAD(o2hb_node_events);
  50static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
  51
  52/*
  53 * In global heartbeat, we maintain a series of region bitmaps.
  54 *      - o2hb_region_bitmap allows us to limit the region number to max region.
  55 *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
  56 *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
  57 *              heartbeat on it.
  58 *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
  59 */
  60static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  61static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  62static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  63static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  64
  65#define O2HB_DB_TYPE_LIVENODES          0
  66#define O2HB_DB_TYPE_LIVEREGIONS        1
  67#define O2HB_DB_TYPE_QUORUMREGIONS      2
  68#define O2HB_DB_TYPE_FAILEDREGIONS      3
  69#define O2HB_DB_TYPE_REGION_LIVENODES   4
  70#define O2HB_DB_TYPE_REGION_NUMBER      5
  71#define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
  72#define O2HB_DB_TYPE_REGION_PINNED      7
  73struct o2hb_debug_buf {
  74        int db_type;
  75        int db_size;
  76        int db_len;
  77        void *db_data;
  78};
  79
  80static struct o2hb_debug_buf *o2hb_db_livenodes;
  81static struct o2hb_debug_buf *o2hb_db_liveregions;
  82static struct o2hb_debug_buf *o2hb_db_quorumregions;
  83static struct o2hb_debug_buf *o2hb_db_failedregions;
  84
  85#define O2HB_DEBUG_DIR                  "o2hb"
  86#define O2HB_DEBUG_LIVENODES            "livenodes"
  87#define O2HB_DEBUG_LIVEREGIONS          "live_regions"
  88#define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
  89#define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
  90#define O2HB_DEBUG_REGION_NUMBER        "num"
  91#define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
  92#define O2HB_DEBUG_REGION_PINNED        "pinned"
  93
  94static struct dentry *o2hb_debug_dir;
  95
  96static LIST_HEAD(o2hb_all_regions);
  97
  98static struct o2hb_callback {
  99        struct list_head list;
 100} o2hb_callbacks[O2HB_NUM_CB];
 101
 102static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
 103
 104#define O2HB_DEFAULT_BLOCK_BITS       9
 105
 106enum o2hb_heartbeat_modes {
 107        O2HB_HEARTBEAT_LOCAL            = 0,
 108        O2HB_HEARTBEAT_GLOBAL,
 109        O2HB_HEARTBEAT_NUM_MODES,
 110};
 111
 112static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
 113        "local",        /* O2HB_HEARTBEAT_LOCAL */
 114        "global",       /* O2HB_HEARTBEAT_GLOBAL */
 115};
 116
 117unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
 118static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
 119
 120/*
 121 * o2hb_dependent_users tracks the number of registered callbacks that depend
 122 * on heartbeat. o2net and o2dlm are two entities that register this callback.
 123 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
 124 * to stop while a dlm domain is still active.
 125 */
 126static unsigned int o2hb_dependent_users;
 127
 128/*
 129 * In global heartbeat mode, all regions are pinned if there are one or more
 130 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
 131 * regions are unpinned if the region count exceeds the cut off or the number
 132 * of dependent users falls to zero.
 133 */
 134#define O2HB_PIN_CUT_OFF                3
 135
 136/*
 137 * In local heartbeat mode, we assume the dlm domain name to be the same as
 138 * region uuid. This is true for domains created for the file system but not
 139 * necessarily true for userdlm domains. This is a known limitation.
 140 *
 141 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
 142 * works for both file system and userdlm domains.
 143 */
 144static int o2hb_region_pin(const char *region_uuid);
 145static void o2hb_region_unpin(const char *region_uuid);
 146
 147/* Only sets a new threshold if there are no active regions.
 148 *
 149 * No locking or otherwise interesting code is required for reading
 150 * o2hb_dead_threshold as it can't change once regions are active and
 151 * it's not interesting to anyone until then anyway. */
 152static void o2hb_dead_threshold_set(unsigned int threshold)
 153{
 154        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
 155                spin_lock(&o2hb_live_lock);
 156                if (list_empty(&o2hb_all_regions))
 157                        o2hb_dead_threshold = threshold;
 158                spin_unlock(&o2hb_live_lock);
 159        }
 160}
 161
 162static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
 163{
 164        int ret = -1;
 165
 166        if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
 167                spin_lock(&o2hb_live_lock);
 168                if (list_empty(&o2hb_all_regions)) {
 169                        o2hb_heartbeat_mode = hb_mode;
 170                        ret = 0;
 171                }
 172                spin_unlock(&o2hb_live_lock);
 173        }
 174
 175        return ret;
 176}
 177
 178struct o2hb_node_event {
 179        struct list_head        hn_item;
 180        enum o2hb_callback_type hn_event_type;
 181        struct o2nm_node        *hn_node;
 182        int                     hn_node_num;
 183};
 184
 185struct o2hb_disk_slot {
 186        struct o2hb_disk_heartbeat_block *ds_raw_block;
 187        u8                      ds_node_num;
 188        u64                     ds_last_time;
 189        u64                     ds_last_generation;
 190        u16                     ds_equal_samples;
 191        u16                     ds_changed_samples;
 192        struct list_head        ds_live_item;
 193};
 194
 195/* each thread owns a region.. when we're asked to tear down the region
 196 * we ask the thread to stop, who cleans up the region */
 197struct o2hb_region {
 198        struct config_item      hr_item;
 199
 200        struct list_head        hr_all_item;
 201        unsigned                hr_unclean_stop:1,
 202                                hr_aborted_start:1,
 203                                hr_item_pinned:1,
 204                                hr_item_dropped:1,
 205                                hr_node_deleted:1;
 206
 207        /* protected by the hr_callback_sem */
 208        struct task_struct      *hr_task;
 209
 210        unsigned int            hr_blocks;
 211        unsigned long long      hr_start_block;
 212
 213        unsigned int            hr_block_bits;
 214        unsigned int            hr_block_bytes;
 215
 216        unsigned int            hr_slots_per_page;
 217        unsigned int            hr_num_pages;
 218
 219        struct page             **hr_slot_data;
 220        struct block_device     *hr_bdev;
 221        struct o2hb_disk_slot   *hr_slots;
 222
 223        /* live node map of this region */
 224        unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 225        unsigned int            hr_region_num;
 226
 227        struct dentry           *hr_debug_dir;
 228        struct o2hb_debug_buf   *hr_db_livenodes;
 229        struct o2hb_debug_buf   *hr_db_regnum;
 230        struct o2hb_debug_buf   *hr_db_elapsed_time;
 231        struct o2hb_debug_buf   *hr_db_pinned;
 232
 233        /* let the person setting up hb wait for it to return until it
 234         * has reached a 'steady' state.  This will be fixed when we have
 235         * a more complete api that doesn't lead to this sort of fragility. */
 236        atomic_t                hr_steady_iterations;
 237
 238        /* terminate o2hb thread if it does not reach steady state
 239         * (hr_steady_iterations == 0) within hr_unsteady_iterations */
 240        atomic_t                hr_unsteady_iterations;
 241
 242        char                    hr_dev_name[BDEVNAME_SIZE];
 243
 244        unsigned int            hr_timeout_ms;
 245
 246        /* randomized as the region goes up and down so that a node
 247         * recognizes a node going up and down in one iteration */
 248        u64                     hr_generation;
 249
 250        struct delayed_work     hr_write_timeout_work;
 251        unsigned long           hr_last_timeout_start;
 252
 253        /* negotiate timer, used to negotiate extending hb timeout. */
 254        struct delayed_work     hr_nego_timeout_work;
 255        unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 256
 257        /* Used during o2hb_check_slot to hold a copy of the block
 258         * being checked because we temporarily have to zero out the
 259         * crc field. */
 260        struct o2hb_disk_heartbeat_block *hr_tmp_block;
 261
 262        /* Message key for negotiate timeout message. */
 263        unsigned int            hr_key;
 264        struct list_head        hr_handler_list;
 265
 266        /* last hb status, 0 for success, other value for error. */
 267        int                     hr_last_hb_status;
 268};
 269
 270struct o2hb_bio_wait_ctxt {
 271        atomic_t          wc_num_reqs;
 272        struct completion wc_io_complete;
 273        int               wc_error;
 274};
 275
 276#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
 277
 278enum {
 279        O2HB_NEGO_TIMEOUT_MSG = 1,
 280        O2HB_NEGO_APPROVE_MSG = 2,
 281};
 282
 283struct o2hb_nego_msg {
 284        u8 node_num;
 285};
 286
 287static void o2hb_write_timeout(struct work_struct *work)
 288{
 289        int failed, quorum;
 290        struct o2hb_region *reg =
 291                container_of(work, struct o2hb_region,
 292                             hr_write_timeout_work.work);
 293
 294        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 295             "milliseconds\n", reg->hr_dev_name,
 296             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
 297
 298        if (o2hb_global_heartbeat_active()) {
 299                spin_lock(&o2hb_live_lock);
 300                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 301                        set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 302                failed = bitmap_weight(o2hb_failed_region_bitmap,
 303                                        O2NM_MAX_REGIONS);
 304                quorum = bitmap_weight(o2hb_quorum_region_bitmap,
 305                                        O2NM_MAX_REGIONS);
 306                spin_unlock(&o2hb_live_lock);
 307
 308                mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
 309                     quorum, failed);
 310
 311                /*
 312                 * Fence if the number of failed regions >= half the number
 313                 * of  quorum regions
 314                 */
 315                if ((failed << 1) < quorum)
 316                        return;
 317        }
 318
 319        o2quo_disk_timeout();
 320}
 321
 322static void o2hb_arm_timeout(struct o2hb_region *reg)
 323{
 324        /* Arm writeout only after thread reaches steady state */
 325        if (atomic_read(&reg->hr_steady_iterations) != 0)
 326                return;
 327
 328        mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 329             O2HB_MAX_WRITE_TIMEOUT_MS);
 330
 331        if (o2hb_global_heartbeat_active()) {
 332                spin_lock(&o2hb_live_lock);
 333                clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 334                spin_unlock(&o2hb_live_lock);
 335        }
 336        cancel_delayed_work(&reg->hr_write_timeout_work);
 337        schedule_delayed_work(&reg->hr_write_timeout_work,
 338                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
 339
 340        cancel_delayed_work(&reg->hr_nego_timeout_work);
 341        /* negotiate timeout must be less than write timeout. */
 342        schedule_delayed_work(&reg->hr_nego_timeout_work,
 343                              msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
 344        memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
 345}
 346
 347static void o2hb_disarm_timeout(struct o2hb_region *reg)
 348{
 349        cancel_delayed_work_sync(&reg->hr_write_timeout_work);
 350        cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
 351}
 352
 353static int o2hb_send_nego_msg(int key, int type, u8 target)
 354{
 355        struct o2hb_nego_msg msg;
 356        int status, ret;
 357
 358        msg.node_num = o2nm_this_node();
 359again:
 360        ret = o2net_send_message(type, key, &msg, sizeof(msg),
 361                        target, &status);
 362
 363        if (ret == -EAGAIN || ret == -ENOMEM) {
 364                msleep(100);
 365                goto again;
 366        }
 367
 368        return ret;
 369}
 370
 371static void o2hb_nego_timeout(struct work_struct *work)
 372{
 373        unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 374        int master_node, i, ret;
 375        struct o2hb_region *reg;
 376
 377        reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
 378        /* don't negotiate timeout if last hb failed since it is very
 379         * possible io failed. Should let write timeout fence self.
 380         */
 381        if (reg->hr_last_hb_status)
 382                return;
 383
 384        o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
 385        /* lowest node as master node to make negotiate decision. */
 386        master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
 387
 388        if (master_node == o2nm_this_node()) {
 389                if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
 390                        printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
 391                                o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
 392                                config_item_name(&reg->hr_item), reg->hr_dev_name);
 393                        set_bit(master_node, reg->hr_nego_node_bitmap);
 394                }
 395                if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
 396                                sizeof(reg->hr_nego_node_bitmap))) {
 397                        /* check negotiate bitmap every second to do timeout
 398                         * approve decision.
 399                         */
 400                        schedule_delayed_work(&reg->hr_nego_timeout_work,
 401                                msecs_to_jiffies(1000));
 402
 403                        return;
 404                }
 405
 406                printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
 407                        config_item_name(&reg->hr_item), reg->hr_dev_name);
 408                /* approve negotiate timeout request. */
 409                o2hb_arm_timeout(reg);
 410
 411                i = -1;
 412                while ((i = find_next_bit(live_node_bitmap,
 413                                O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
 414                        if (i == master_node)
 415                                continue;
 416
 417                        mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
 418                        ret = o2hb_send_nego_msg(reg->hr_key,
 419                                        O2HB_NEGO_APPROVE_MSG, i);
 420                        if (ret)
 421                                mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
 422                                        i, ret);
 423                }
 424        } else {
 425                /* negotiate timeout with master node. */
 426                printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
 427                        o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
 428                        reg->hr_dev_name, master_node);
 429                ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
 430                                master_node);
 431                if (ret)
 432                        mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
 433                                master_node, ret);
 434        }
 435}
 436
 437static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
 438                                void **ret_data)
 439{
 440        struct o2hb_region *reg = data;
 441        struct o2hb_nego_msg *nego_msg;
 442
 443        nego_msg = (struct o2hb_nego_msg *)msg->buf;
 444        printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
 445                nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
 446        if (nego_msg->node_num < O2NM_MAX_NODES)
 447                set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
 448        else
 449                mlog(ML_ERROR, "got nego timeout message from bad node.\n");
 450
 451        return 0;
 452}
 453
 454static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
 455                                void **ret_data)
 456{
 457        struct o2hb_region *reg = data;
 458
 459        printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
 460                config_item_name(&reg->hr_item), reg->hr_dev_name);
 461        o2hb_arm_timeout(reg);
 462        return 0;
 463}
 464
 465static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 466{
 467        atomic_set(&wc->wc_num_reqs, 1);
 468        init_completion(&wc->wc_io_complete);
 469        wc->wc_error = 0;
 470}
 471
 472/* Used in error paths too */
 473static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
 474                                     unsigned int num)
 475{
 476        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
 477         * good news is that the fast path only completes one at a time */
 478        while(num--) {
 479                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
 480                        BUG_ON(num > 0);
 481                        complete(&wc->wc_io_complete);
 482                }
 483        }
 484}
 485
 486static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc)
 487{
 488        o2hb_bio_wait_dec(wc, 1);
 489        wait_for_completion(&wc->wc_io_complete);
 490}
 491
 492static void o2hb_bio_end_io(struct bio *bio)
 493{
 494        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
 495
 496        if (bio->bi_status) {
 497                mlog(ML_ERROR, "IO Error %d\n", bio->bi_status);
 498                wc->wc_error = blk_status_to_errno(bio->bi_status);
 499        }
 500
 501        o2hb_bio_wait_dec(wc, 1);
 502        bio_put(bio);
 503}
 504
 505/* Setup a Bio to cover I/O against num_slots slots starting at
 506 * start_slot. */
 507static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
 508                                      struct o2hb_bio_wait_ctxt *wc,
 509                                      unsigned int *current_slot,
 510                                      unsigned int max_slots, int op,
 511                                      int op_flags)
 512{
 513        int len, current_page;
 514        unsigned int vec_len, vec_start;
 515        unsigned int bits = reg->hr_block_bits;
 516        unsigned int spp = reg->hr_slots_per_page;
 517        unsigned int cs = *current_slot;
 518        struct bio *bio;
 519        struct page *page;
 520
 521        /* Testing has shown this allocation to take long enough under
 522         * GFP_KERNEL that the local node can get fenced. It would be
 523         * nicest if we could pre-allocate these bios and avoid this
 524         * all together. */
 525        bio = bio_alloc(GFP_ATOMIC, 16);
 526        if (!bio) {
 527                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
 528                bio = ERR_PTR(-ENOMEM);
 529                goto bail;
 530        }
 531
 532        /* Must put everything in 512 byte sectors for the bio... */
 533        bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
 534        bio_set_dev(bio, reg->hr_bdev);
 535        bio->bi_private = wc;
 536        bio->bi_end_io = o2hb_bio_end_io;
 537        bio_set_op_attrs(bio, op, op_flags);
 538
 539        vec_start = (cs << bits) % PAGE_SIZE;
 540        while(cs < max_slots) {
 541                current_page = cs / spp;
 542                page = reg->hr_slot_data[current_page];
 543
 544                vec_len = min(PAGE_SIZE - vec_start,
 545                              (max_slots-cs) * (PAGE_SIZE/spp) );
 546
 547                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
 548                     current_page, vec_len, vec_start);
 549
 550                len = bio_add_page(bio, page, vec_len, vec_start);
 551                if (len != vec_len) break;
 552
 553                cs += vec_len / (PAGE_SIZE/spp);
 554                vec_start = 0;
 555        }
 556
 557bail:
 558        *current_slot = cs;
 559        return bio;
 560}
 561
 562static int o2hb_read_slots(struct o2hb_region *reg,
 563                           unsigned int begin_slot,
 564                           unsigned int max_slots)
 565{
 566        unsigned int current_slot = begin_slot;
 567        int status;
 568        struct o2hb_bio_wait_ctxt wc;
 569        struct bio *bio;
 570
 571        o2hb_bio_wait_init(&wc);
 572
 573        while(current_slot < max_slots) {
 574                bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
 575                                         REQ_OP_READ, 0);
 576                if (IS_ERR(bio)) {
 577                        status = PTR_ERR(bio);
 578                        mlog_errno(status);
 579                        goto bail_and_wait;
 580                }
 581
 582                atomic_inc(&wc.wc_num_reqs);
 583                submit_bio(bio);
 584        }
 585
 586        status = 0;
 587
 588bail_and_wait:
 589        o2hb_wait_on_io(&wc);
 590        if (wc.wc_error && !status)
 591                status = wc.wc_error;
 592
 593        return status;
 594}
 595
 596static int o2hb_issue_node_write(struct o2hb_region *reg,
 597                                 struct o2hb_bio_wait_ctxt *write_wc)
 598{
 599        int status;
 600        unsigned int slot;
 601        struct bio *bio;
 602
 603        o2hb_bio_wait_init(write_wc);
 604
 605        slot = o2nm_this_node();
 606
 607        bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
 608                                 REQ_SYNC);
 609        if (IS_ERR(bio)) {
 610                status = PTR_ERR(bio);
 611                mlog_errno(status);
 612                goto bail;
 613        }
 614
 615        atomic_inc(&write_wc->wc_num_reqs);
 616        submit_bio(bio);
 617
 618        status = 0;
 619bail:
 620        return status;
 621}
 622
 623static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
 624                                     struct o2hb_disk_heartbeat_block *hb_block)
 625{
 626        __le32 old_cksum;
 627        u32 ret;
 628
 629        /* We want to compute the block crc with a 0 value in the
 630         * hb_cksum field. Save it off here and replace after the
 631         * crc. */
 632        old_cksum = hb_block->hb_cksum;
 633        hb_block->hb_cksum = 0;
 634
 635        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
 636
 637        hb_block->hb_cksum = old_cksum;
 638
 639        return ret;
 640}
 641
 642static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
 643{
 644        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
 645             "cksum = 0x%x, generation 0x%llx\n",
 646             (long long)le64_to_cpu(hb_block->hb_seq),
 647             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
 648             (long long)le64_to_cpu(hb_block->hb_generation));
 649}
 650
 651static int o2hb_verify_crc(struct o2hb_region *reg,
 652                           struct o2hb_disk_heartbeat_block *hb_block)
 653{
 654        u32 read, computed;
 655
 656        read = le32_to_cpu(hb_block->hb_cksum);
 657        computed = o2hb_compute_block_crc_le(reg, hb_block);
 658
 659        return read == computed;
 660}
 661
 662/*
 663 * Compare the slot data with what we wrote in the last iteration.
 664 * If the match fails, print an appropriate error message. This is to
 665 * detect errors like... another node hearting on the same slot,
 666 * flaky device that is losing writes, etc.
 667 * Returns 1 if check succeeds, 0 otherwise.
 668 */
 669static int o2hb_check_own_slot(struct o2hb_region *reg)
 670{
 671        struct o2hb_disk_slot *slot;
 672        struct o2hb_disk_heartbeat_block *hb_block;
 673        char *errstr;
 674
 675        slot = &reg->hr_slots[o2nm_this_node()];
 676        /* Don't check on our 1st timestamp */
 677        if (!slot->ds_last_time)
 678                return 0;
 679
 680        hb_block = slot->ds_raw_block;
 681        if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
 682            le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
 683            hb_block->hb_node == slot->ds_node_num)
 684                return 1;
 685
 686#define ERRSTR1         "Another node is heartbeating on device"
 687#define ERRSTR2         "Heartbeat generation mismatch on device"
 688#define ERRSTR3         "Heartbeat sequence mismatch on device"
 689
 690        if (hb_block->hb_node != slot->ds_node_num)
 691                errstr = ERRSTR1;
 692        else if (le64_to_cpu(hb_block->hb_generation) !=
 693                 slot->ds_last_generation)
 694                errstr = ERRSTR2;
 695        else
 696                errstr = ERRSTR3;
 697
 698        mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
 699             "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
 700             slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
 701             (unsigned long long)slot->ds_last_time, hb_block->hb_node,
 702             (unsigned long long)le64_to_cpu(hb_block->hb_generation),
 703             (unsigned long long)le64_to_cpu(hb_block->hb_seq));
 704
 705        return 0;
 706}
 707
 708static inline void o2hb_prepare_block(struct o2hb_region *reg,
 709                                      u64 generation)
 710{
 711        int node_num;
 712        u64 cputime;
 713        struct o2hb_disk_slot *slot;
 714        struct o2hb_disk_heartbeat_block *hb_block;
 715
 716        node_num = o2nm_this_node();
 717        slot = &reg->hr_slots[node_num];
 718
 719        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
 720        memset(hb_block, 0, reg->hr_block_bytes);
 721        /* TODO: time stuff */
 722        cputime = ktime_get_real_seconds();
 723        if (!cputime)
 724                cputime = 1;
 725
 726        hb_block->hb_seq = cpu_to_le64(cputime);
 727        hb_block->hb_node = node_num;
 728        hb_block->hb_generation = cpu_to_le64(generation);
 729        hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
 730
 731        /* This step must always happen last! */
 732        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
 733                                                                   hb_block));
 734
 735        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
 736             (long long)generation,
 737             le32_to_cpu(hb_block->hb_cksum));
 738}
 739
 740static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
 741                                struct o2nm_node *node,
 742                                int idx)
 743{
 744        struct o2hb_callback_func *f;
 745
 746        list_for_each_entry(f, &hbcall->list, hc_item) {
 747                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
 748                (f->hc_func)(node, idx, f->hc_data);
 749        }
 750}
 751
 752/* Will run the list in order until we process the passed event */
 753static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
 754{
 755        struct o2hb_callback *hbcall;
 756        struct o2hb_node_event *event;
 757
 758        /* Holding callback sem assures we don't alter the callback
 759         * lists when doing this, and serializes ourselves with other
 760         * processes wanting callbacks. */
 761        down_write(&o2hb_callback_sem);
 762
 763        spin_lock(&o2hb_live_lock);
 764        while (!list_empty(&o2hb_node_events)
 765               && !list_empty(&queued_event->hn_item)) {
 766                event = list_entry(o2hb_node_events.next,
 767                                   struct o2hb_node_event,
 768                                   hn_item);
 769                list_del_init(&event->hn_item);
 770                spin_unlock(&o2hb_live_lock);
 771
 772                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
 773                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
 774                     event->hn_node_num);
 775
 776                hbcall = hbcall_from_type(event->hn_event_type);
 777
 778                /* We should *never* have gotten on to the list with a
 779                 * bad type... This isn't something that we should try
 780                 * to recover from. */
 781                BUG_ON(IS_ERR(hbcall));
 782
 783                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
 784
 785                spin_lock(&o2hb_live_lock);
 786        }
 787        spin_unlock(&o2hb_live_lock);
 788
 789        up_write(&o2hb_callback_sem);
 790}
 791
 792static void o2hb_queue_node_event(struct o2hb_node_event *event,
 793                                  enum o2hb_callback_type type,
 794                                  struct o2nm_node *node,
 795                                  int node_num)
 796{
 797        assert_spin_locked(&o2hb_live_lock);
 798
 799        BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
 800
 801        event->hn_event_type = type;
 802        event->hn_node = node;
 803        event->hn_node_num = node_num;
 804
 805        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
 806             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
 807
 808        list_add_tail(&event->hn_item, &o2hb_node_events);
 809}
 810
 811static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
 812{
 813        struct o2hb_node_event event =
 814                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 815        struct o2nm_node *node;
 816        int queued = 0;
 817
 818        node = o2nm_get_node_by_num(slot->ds_node_num);
 819        if (!node)
 820                return;
 821
 822        spin_lock(&o2hb_live_lock);
 823        if (!list_empty(&slot->ds_live_item)) {
 824                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
 825                     slot->ds_node_num);
 826
 827                list_del_init(&slot->ds_live_item);
 828
 829                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 830                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 831
 832                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
 833                                              slot->ds_node_num);
 834                        queued = 1;
 835                }
 836        }
 837        spin_unlock(&o2hb_live_lock);
 838
 839        if (queued)
 840                o2hb_run_event_list(&event);
 841
 842        o2nm_node_put(node);
 843}
 844
 845static void o2hb_set_quorum_device(struct o2hb_region *reg)
 846{
 847        if (!o2hb_global_heartbeat_active())
 848                return;
 849
 850        /* Prevent race with o2hb_heartbeat_group_drop_item() */
 851        if (kthread_should_stop())
 852                return;
 853
 854        /* Tag region as quorum only after thread reaches steady state */
 855        if (atomic_read(&reg->hr_steady_iterations) != 0)
 856                return;
 857
 858        spin_lock(&o2hb_live_lock);
 859
 860        if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 861                goto unlock;
 862
 863        /*
 864         * A region can be added to the quorum only when it sees all
 865         * live nodes heartbeat on it. In other words, the region has been
 866         * added to all nodes.
 867         */
 868        if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
 869                   sizeof(o2hb_live_node_bitmap)))
 870                goto unlock;
 871
 872        printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
 873               config_item_name(&reg->hr_item), reg->hr_dev_name);
 874
 875        set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
 876
 877        /*
 878         * If global heartbeat active, unpin all regions if the
 879         * region count > CUT_OFF
 880         */
 881        if (bitmap_weight(o2hb_quorum_region_bitmap,
 882                           O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
 883                o2hb_region_unpin(NULL);
 884unlock:
 885        spin_unlock(&o2hb_live_lock);
 886}
 887
/*
 * Examine the heartbeat block currently held in slot->ds_raw_block and
 * update the liveness state of @slot within @reg.
 *
 * The block is copied into reg->hr_tmp_block and its CRC is verified.  The
 * slot's changed/equal sample counters and last seen generation are then
 * updated, and the slot is moved on to or off its node's live list once the
 * relevant threshold is crossed.  When the first slot of a node becomes live
 * or the last one goes away, a node UP/DOWN event is queued and run.
 *
 * Returns 1 if a node UP or DOWN event was queued, 0 otherwise.
 */
static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
        unsigned int slot_dead_ms;
        int tmp;
        int queued = 0;

        /* work on a private copy of the slot's raw block */
        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /*
         * If a node is no longer configured but is still in the livemap, we
         * may need to clear that bit from the livemap.
         */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node) {
                spin_lock(&o2hb_live_lock);
                tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
                spin_unlock(&o2hb_live_lock);
                /* unconfigured and not live: nothing to track */
                if (!tmp)
                        return 0;
        }

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc. We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations. We assume this to
         * mean it dropped off but came back before we timed out. We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
                     "to 0x%llx)\n", slot->ds_node_num,
                     (long long)slot->ds_last_generation,
                     (long long)le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
             "seq %llu last %llu changed %u equal %u\n",
             slot->ds_node_num, (long long)slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum),
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

        /* o2hb_live_lock is held from here until the out label */
fire_callbacks:
        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
                     slot->ds_node_num, (long long)slot->ds_last_generation);

                set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
                             "bitmap\n", slot->ds_node_num);
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                        queued = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;

                /* We want to be sure that all nodes agree on the
                 * number of milliseconds before a node will be
                 * considered dead. The self-fencing timeout is
                 * computed from this value, and a discrepancy might
                 * result in heartbeat calling a node dead when it
                 * hasn't self-fenced yet. */
                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
                if (slot_dead_ms && slot_dead_ms != dead_ms) {
                        /* TODO: Perhaps we can fail the region here. */
                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
                             "of %u ms, but our count is %u ms.\n"
                             "Please double check your configuration values "
                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
                             dead_ms);
                }
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples..  reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
                             "nodes bitmap\n", slot->ds_node_num);
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        /* node can be null */
                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
                                              node, slot->ds_node_num);

                        changed = 1;
                        queued = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        /* still live and we saw activity: restart both sample counters */
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        /* run the callbacks only after o2hb_live_lock has been dropped */
        if (queued)
                o2hb_run_event_list(&event);

        if (node)
                o2nm_node_put(node);
        return changed;
}
1069
/* Thin wrapper: result of find_last_bit() over the first @numbits of @nodes. */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
        int highest = find_last_bit(nodes, numbits);

        return highest;
}
1074
/* Thin wrapper: result of find_first_bit() over the first @numbits of @nodes. */
static int o2hb_lowest_node(unsigned long *nodes, int numbits)
{
        int lowest = find_first_bit(nodes, numbits);

        return lowest;
}
1079
1080static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1081{
1082        int i, ret, highest_node, lowest_node;
1083        int membership_change = 0, own_slot_ok = 0;
1084        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1085        unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1086        struct o2hb_bio_wait_ctxt write_wc;
1087
1088        ret = o2nm_configured_node_map(configured_nodes,
1089                                       sizeof(configured_nodes));
1090        if (ret) {
1091                mlog_errno(ret);
1092                goto bail;
1093        }
1094
1095        /*
1096         * If a node is not configured but is in the livemap, we still need
1097         * to read the slot so as to be able to remove it from the livemap.
1098         */
1099        o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1100        i = -1;
1101        while ((i = find_next_bit(live_node_bitmap,
1102                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1103                set_bit(i, configured_nodes);
1104        }
1105
1106        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1107        lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES);
1108        if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) {
1109                mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1110                ret = -EINVAL;
1111                goto bail;
1112        }
1113
1114        /* No sense in reading the slots of nodes that don't exist
1115         * yet. Of course, if the node definitions have holes in them
1116         * then we're reading an empty slot anyway... Consider this
1117         * best-effort. */
1118        ret = o2hb_read_slots(reg, lowest_node, highest_node + 1);
1119        if (ret < 0) {
1120                mlog_errno(ret);
1121                goto bail;
1122        }
1123
1124        /* With an up to date view of the slots, we can check that no
1125         * other node has been improperly configured to heartbeat in
1126         * our slot. */
1127        own_slot_ok = o2hb_check_own_slot(reg);
1128
1129        /* fill in the proper info for our next heartbeat */
1130        o2hb_prepare_block(reg, reg->hr_generation);
1131
1132        ret = o2hb_issue_node_write(reg, &write_wc);
1133        if (ret < 0) {
1134                mlog_errno(ret);
1135                goto bail;
1136        }
1137
1138        i = -1;
1139        while((i = find_next_bit(configured_nodes,
1140                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1141                membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1142        }
1143
1144        /*
1145         * We have to be sure we've advertised ourselves on disk
1146         * before we can go to steady state.  This ensures that
1147         * people we find in our steady state have seen us.
1148         */
1149        o2hb_wait_on_io(&write_wc);
1150        if (write_wc.wc_error) {
1151                /* Do not re-arm the write timeout on I/O error - we
1152                 * can't be sure that the new block ever made it to
1153                 * disk */
1154                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1155                     write_wc.wc_error, reg->hr_dev_name);
1156                ret = write_wc.wc_error;
1157                goto bail;
1158        }
1159
1160        /* Skip disarming the timeout if own slot has stale/bad data */
1161        if (own_slot_ok) {
1162                o2hb_set_quorum_device(reg);
1163                o2hb_arm_timeout(reg);
1164                reg->hr_last_timeout_start = jiffies;
1165        }
1166
1167bail:
1168        /* let the person who launched us know when things are steady */
1169        if (atomic_read(&reg->hr_steady_iterations) != 0) {
1170                if (!ret && own_slot_ok && !membership_change) {
1171                        if (atomic_dec_and_test(&reg->hr_steady_iterations))
1172                                wake_up(&o2hb_steady_queue);
1173                }
1174        }
1175
1176        if (atomic_read(&reg->hr_steady_iterations) != 0) {
1177                if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1178                        printk(KERN_NOTICE "o2hb: Unable to stabilize "
1179                               "heartbeat on region %s (%s)\n",
1180                               config_item_name(&reg->hr_item),
1181                               reg->hr_dev_name);
1182                        atomic_set(&reg->hr_steady_iterations, 0);
1183                        reg->hr_aborted_start = 1;
1184                        wake_up(&o2hb_steady_queue);
1185                        ret = -EIO;
1186                }
1187        }
1188
1189        return ret;
1190}
1191
1192/*
1193 * we ride the region ref that the region dir holds.  before the region
1194 * dir is removed and drops it ref it will wait to tear down this
1195 * thread.
1196 */
1197static int o2hb_thread(void *data)
1198{
1199        int i, ret;
1200        struct o2hb_region *reg = data;
1201        struct o2hb_bio_wait_ctxt write_wc;
1202        ktime_t before_hb, after_hb;
1203        unsigned int elapsed_msec;
1204
1205        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1206
1207        set_user_nice(current, MIN_NICE);
1208
1209        /* Pin node */
1210        ret = o2nm_depend_this_node();
1211        if (ret) {
1212                mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1213                reg->hr_node_deleted = 1;
1214                wake_up(&o2hb_steady_queue);
1215                return 0;
1216        }
1217
1218        while (!kthread_should_stop() &&
1219               !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1220                /* We track the time spent inside
1221                 * o2hb_do_disk_heartbeat so that we avoid more than
1222                 * hr_timeout_ms between disk writes. On busy systems
1223                 * this should result in a heartbeat which is less
1224                 * likely to time itself out. */
1225                before_hb = ktime_get_real();
1226
1227                ret = o2hb_do_disk_heartbeat(reg);
1228                reg->hr_last_hb_status = ret;
1229
1230                after_hb = ktime_get_real();
1231
1232                elapsed_msec = (unsigned int)
1233                                ktime_ms_delta(after_hb, before_hb);
1234
1235                mlog(ML_HEARTBEAT,
1236                     "start = %lld, end = %lld, msec = %u, ret = %d\n",
1237                     before_hb, after_hb, elapsed_msec, ret);
1238
1239                if (!kthread_should_stop() &&
1240                    elapsed_msec < reg->hr_timeout_ms) {
1241                        /* the kthread api has blocked signals for us so no
1242                         * need to record the return value. */
1243                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1244                }
1245        }
1246
1247        o2hb_disarm_timeout(reg);
1248
1249        /* unclean stop is only used in very bad situation */
1250        for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1251                o2hb_shutdown_slot(&reg->hr_slots[i]);
1252
1253        /* Explicit down notification - avoid forcing the other nodes
1254         * to timeout on this region when we could just as easily
1255         * write a clear generation - thus indicating to them that
1256         * this node has left this region.
1257         */
1258        if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1259                o2hb_prepare_block(reg, 0);
1260                ret = o2hb_issue_node_write(reg, &write_wc);
1261                if (ret == 0)
1262                        o2hb_wait_on_io(&write_wc);
1263                else
1264                        mlog_errno(ret);
1265        }
1266
1267        /* Unpin node */
1268        o2nm_undepend_this_node();
1269
1270        mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1271
1272        return 0;
1273}
1274
1275#ifdef CONFIG_DEBUG_FS
1276static int o2hb_debug_open(struct inode *inode, struct file *file)
1277{
1278        struct o2hb_debug_buf *db = inode->i_private;
1279        struct o2hb_region *reg;
1280        unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1281        unsigned long lts;
1282        char *buf = NULL;
1283        int i = -1;
1284        int out = 0;
1285
1286        /* max_nodes should be the largest bitmap we pass here */
1287        BUG_ON(sizeof(map) < db->db_size);
1288
1289        buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1290        if (!buf)
1291                goto bail;
1292
1293        switch (db->db_type) {
1294        case O2HB_DB_TYPE_LIVENODES:
1295        case O2HB_DB_TYPE_LIVEREGIONS:
1296        case O2HB_DB_TYPE_QUORUMREGIONS:
1297        case O2HB_DB_TYPE_FAILEDREGIONS:
1298                spin_lock(&o2hb_live_lock);
1299                memcpy(map, db->db_data, db->db_size);
1300                spin_unlock(&o2hb_live_lock);
1301                break;
1302
1303        case O2HB_DB_TYPE_REGION_LIVENODES:
1304                spin_lock(&o2hb_live_lock);
1305                reg = (struct o2hb_region *)db->db_data;
1306                memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1307                spin_unlock(&o2hb_live_lock);
1308                break;
1309
1310        case O2HB_DB_TYPE_REGION_NUMBER:
1311                reg = (struct o2hb_region *)db->db_data;
1312                out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1313                                reg->hr_region_num);
1314                goto done;
1315
1316        case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1317                reg = (struct o2hb_region *)db->db_data;
1318                lts = reg->hr_last_timeout_start;
1319                /* If 0, it has never been set before */
1320                if (lts)
1321                        lts = jiffies_to_msecs(jiffies - lts);
1322                out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1323                goto done;
1324
1325        case O2HB_DB_TYPE_REGION_PINNED:
1326                reg = (struct o2hb_region *)db->db_data;
1327                out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1328                                !!reg->hr_item_pinned);
1329                goto done;
1330
1331        default:
1332                goto done;
1333        }
1334
1335        while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1336                out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1337        out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1338
1339done:
1340        i_size_write(inode, out);
1341
1342        file->private_data = buf;
1343
1344        return 0;
1345bail:
1346        return -ENOMEM;
1347}
1348
1349static int o2hb_debug_release(struct inode *inode, struct file *file)
1350{
1351        kfree(file->private_data);
1352        return 0;
1353}
1354
1355static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1356                                 size_t nbytes, loff_t *ppos)
1357{
1358        return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1359                                       i_size_read(file->f_mapping->host));
1360}
1361#else
/* Stub used when CONFIG_DEBUG_FS is not set: nothing to prepare. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        return 0;
}
/* Stub used when CONFIG_DEBUG_FS is not set: nothing to free. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        return 0;
}
1370static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1371                               size_t nbytes, loff_t *ppos)
1372{
1373        return 0;
1374}
1375#endif  /* CONFIG_DEBUG_FS */
1376
1377static const struct file_operations o2hb_debug_fops = {
1378        .open =         o2hb_debug_open,
1379        .release =      o2hb_debug_release,
1380        .read =         o2hb_debug_read,
1381        .llseek =       generic_file_llseek,
1382};
1383
1384void o2hb_exit(void)
1385{
1386        debugfs_remove_recursive(o2hb_debug_dir);
1387        kfree(o2hb_db_livenodes);
1388        kfree(o2hb_db_liveregions);
1389        kfree(o2hb_db_quorumregions);
1390        kfree(o2hb_db_failedregions);
1391}
1392
1393static void o2hb_debug_create(const char *name, struct dentry *dir,
1394                              struct o2hb_debug_buf **db, int db_len, int type,
1395                              int size, int len, void *data)
1396{
1397        *db = kmalloc(db_len, GFP_KERNEL);
1398        if (!*db)
1399                return;
1400
1401        (*db)->db_type = type;
1402        (*db)->db_size = size;
1403        (*db)->db_len = len;
1404        (*db)->db_data = data;
1405
1406        debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops);
1407}
1408
1409static void o2hb_debug_init(void)
1410{
1411        o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1412
1413        o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir,
1414                          &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes),
1415                          O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap),
1416                          O2NM_MAX_NODES, o2hb_live_node_bitmap);
1417
1418        o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir,
1419                          &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions),
1420                          O2HB_DB_TYPE_LIVEREGIONS,
1421                          sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS,
1422                          o2hb_live_region_bitmap);
1423
1424        o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir,
1425                          &o2hb_db_quorumregions,
1426                          sizeof(*o2hb_db_quorumregions),
1427                          O2HB_DB_TYPE_QUORUMREGIONS,
1428                          sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS,
1429                          o2hb_quorum_region_bitmap);
1430
1431        o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir,
1432                          &o2hb_db_failedregions,
1433                          sizeof(*o2hb_db_failedregions),
1434                          O2HB_DB_TYPE_FAILEDREGIONS,
1435                          sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS,
1436                          o2hb_failed_region_bitmap);
1437}
1438
1439void o2hb_init(void)
1440{
1441        int i;
1442
1443        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1444                INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1445
1446        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1447                INIT_LIST_HEAD(&o2hb_live_slots[i]);
1448
1449        INIT_LIST_HEAD(&o2hb_node_events);
1450
1451        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1452        memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1453        memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1454        memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1455        memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1456
1457        o2hb_dependent_users = 0;
1458
1459        o2hb_debug_init();
1460}
1461
1462/* if we're already in a callback then we're already serialized by the sem */
1463static void o2hb_fill_node_map_from_callback(unsigned long *map,
1464                                             unsigned bytes)
1465{
1466        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1467
1468        memcpy(map, &o2hb_live_node_bitmap, bytes);
1469}
1470
1471/*
1472 * get a map of all nodes that are heartbeating in any regions
1473 */
1474void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1475{
1476        /* callers want to serialize this map and callbacks so that they
1477         * can trust that they don't miss nodes coming to the party */
1478        down_read(&o2hb_callback_sem);
1479        spin_lock(&o2hb_live_lock);
1480        o2hb_fill_node_map_from_callback(map, bytes);
1481        spin_unlock(&o2hb_live_lock);
1482        up_read(&o2hb_callback_sem);
1483}
1484EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1485
1486/*
1487 * heartbeat configfs bits.  The heartbeat set is a default set under
1488 * the cluster set in nodemanager.c.
1489 */
1490
1491static struct o2hb_region *to_o2hb_region(struct config_item *item)
1492{
1493        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1494}
1495
1496/* drop_item only drops its ref after killing the thread, nothing should
1497 * be using the region anymore.  this has to clean up any state that
1498 * attributes might have built up. */
1499static void o2hb_region_release(struct config_item *item)
1500{
1501        int i;
1502        struct page *page;
1503        struct o2hb_region *reg = to_o2hb_region(item);
1504
1505        mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1506
1507        kfree(reg->hr_tmp_block);
1508
1509        if (reg->hr_slot_data) {
1510                for (i = 0; i < reg->hr_num_pages; i++) {
1511                        page = reg->hr_slot_data[i];
1512                        if (page)
1513                                __free_page(page);
1514                }
1515                kfree(reg->hr_slot_data);
1516        }
1517
1518        if (reg->hr_bdev)
1519                blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1520
1521        kfree(reg->hr_slots);
1522
1523        debugfs_remove_recursive(reg->hr_debug_dir);
1524        kfree(reg->hr_db_livenodes);
1525        kfree(reg->hr_db_regnum);
1526        kfree(reg->hr_db_elapsed_time);
1527        kfree(reg->hr_db_pinned);
1528
1529        spin_lock(&o2hb_live_lock);
1530        list_del(&reg->hr_all_item);
1531        spin_unlock(&o2hb_live_lock);
1532
1533        o2net_unregister_handler_list(&reg->hr_handler_list);
1534        kfree(reg);
1535}
1536
1537static int o2hb_read_block_input(struct o2hb_region *reg,
1538                                 const char *page,
1539                                 unsigned long *ret_bytes,
1540                                 unsigned int *ret_bits)
1541{
1542        unsigned long bytes;
1543        char *p = (char *)page;
1544
1545        bytes = simple_strtoul(p, &p, 0);
1546        if (!p || (*p && (*p != '\n')))
1547                return -EINVAL;
1548
1549        /* Heartbeat and fs min / max block sizes are the same. */
1550        if (bytes > 4096 || bytes < 512)
1551                return -ERANGE;
1552        if (hweight16(bytes) != 1)
1553                return -EINVAL;
1554
1555        if (ret_bytes)
1556                *ret_bytes = bytes;
1557        if (ret_bits)
1558                *ret_bits = ffs(bytes) - 1;
1559
1560        return 0;
1561}
1562
1563static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1564                                            char *page)
1565{
1566        return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1567}
1568
1569static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1570                                             const char *page,
1571                                             size_t count)
1572{
1573        struct o2hb_region *reg = to_o2hb_region(item);
1574        int status;
1575        unsigned long block_bytes;
1576        unsigned int block_bits;
1577
1578        if (reg->hr_bdev)
1579                return -EINVAL;
1580
1581        status = o2hb_read_block_input(reg, page, &block_bytes,
1582                                       &block_bits);
1583        if (status)
1584                return status;
1585
1586        reg->hr_block_bytes = (unsigned int)block_bytes;
1587        reg->hr_block_bits = block_bits;
1588
1589        return count;
1590}
1591
1592static ssize_t o2hb_region_start_block_show(struct config_item *item,
1593                                            char *page)
1594{
1595        return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1596}
1597
1598static ssize_t o2hb_region_start_block_store(struct config_item *item,
1599                                             const char *page,
1600                                             size_t count)
1601{
1602        struct o2hb_region *reg = to_o2hb_region(item);
1603        unsigned long long tmp;
1604        char *p = (char *)page;
1605
1606        if (reg->hr_bdev)
1607                return -EINVAL;
1608
1609        tmp = simple_strtoull(p, &p, 0);
1610        if (!p || (*p && (*p != '\n')))
1611                return -EINVAL;
1612
1613        reg->hr_start_block = tmp;
1614
1615        return count;
1616}
1617
1618static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1619{
1620        return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1621}
1622
1623static ssize_t o2hb_region_blocks_store(struct config_item *item,
1624                                        const char *page,
1625                                        size_t count)
1626{
1627        struct o2hb_region *reg = to_o2hb_region(item);
1628        unsigned long tmp;
1629        char *p = (char *)page;
1630
1631        if (reg->hr_bdev)
1632                return -EINVAL;
1633
1634        tmp = simple_strtoul(p, &p, 0);
1635        if (!p || (*p && (*p != '\n')))
1636                return -EINVAL;
1637
1638        if (tmp > O2NM_MAX_NODES || tmp == 0)
1639                return -ERANGE;
1640
1641        reg->hr_blocks = (unsigned int)tmp;
1642
1643        return count;
1644}
1645
1646static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1647{
1648        unsigned int ret = 0;
1649
1650        if (to_o2hb_region(item)->hr_bdev)
1651                ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1652
1653        return ret;
1654}
1655
1656static void o2hb_init_region_params(struct o2hb_region *reg)
1657{
1658        reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1659        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1660
1661        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1662             reg->hr_start_block, reg->hr_blocks);
1663        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1664             reg->hr_block_bytes, reg->hr_block_bits);
1665        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1666        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1667}
1668
1669static int o2hb_map_slot_data(struct o2hb_region *reg)
1670{
1671        int i, j;
1672        unsigned int last_slot;
1673        unsigned int spp = reg->hr_slots_per_page;
1674        struct page *page;
1675        char *raw;
1676        struct o2hb_disk_slot *slot;
1677
1678        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1679        if (reg->hr_tmp_block == NULL)
1680                return -ENOMEM;
1681
1682        reg->hr_slots = kcalloc(reg->hr_blocks,
1683                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1684        if (reg->hr_slots == NULL)
1685                return -ENOMEM;
1686
1687        for(i = 0; i < reg->hr_blocks; i++) {
1688                slot = &reg->hr_slots[i];
1689                slot->ds_node_num = i;
1690                INIT_LIST_HEAD(&slot->ds_live_item);
1691                slot->ds_raw_block = NULL;
1692        }
1693
1694        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1695        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1696                           "at %u blocks per page\n",
1697             reg->hr_num_pages, reg->hr_blocks, spp);
1698
1699        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1700                                    GFP_KERNEL);
1701        if (!reg->hr_slot_data)
1702                return -ENOMEM;
1703
1704        for(i = 0; i < reg->hr_num_pages; i++) {
1705                page = alloc_page(GFP_KERNEL);
1706                if (!page)
1707                        return -ENOMEM;
1708
1709                reg->hr_slot_data[i] = page;
1710
1711                last_slot = i * spp;
1712                raw = page_address(page);
1713                for (j = 0;
1714                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
1715                     j++) {
1716                        BUG_ON((j + last_slot) >= reg->hr_blocks);
1717
1718                        slot = &reg->hr_slots[j + last_slot];
1719                        slot->ds_raw_block =
1720                                (struct o2hb_disk_heartbeat_block *) raw;
1721
1722                        raw += reg->hr_block_bytes;
1723                }
1724        }
1725
1726        return 0;
1727}
1728
1729/* Read in all the slots available and populate the tracking
1730 * structures so that we can start with a baseline idea of what's
1731 * there. */
1732static int o2hb_populate_slot_data(struct o2hb_region *reg)
1733{
1734        int ret, i;
1735        struct o2hb_disk_slot *slot;
1736        struct o2hb_disk_heartbeat_block *hb_block;
1737
1738        ret = o2hb_read_slots(reg, 0, reg->hr_blocks);
1739        if (ret)
1740                goto out;
1741
1742        /* We only want to get an idea of the values initially in each
1743         * slot, so we do no verification - o2hb_check_slot will
1744         * actually determine if each configured slot is valid and
1745         * whether any values have changed. */
1746        for(i = 0; i < reg->hr_blocks; i++) {
1747                slot = &reg->hr_slots[i];
1748                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1749
1750                /* Only fill the values that o2hb_check_slot uses to
1751                 * determine changing slots */
1752                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1753                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1754        }
1755
1756out:
1757        return ret;
1758}
1759
1760/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1761static ssize_t o2hb_region_dev_store(struct config_item *item,
1762                                     const char *page,
1763                                     size_t count)
1764{
1765        struct o2hb_region *reg = to_o2hb_region(item);
1766        struct task_struct *hb_task;
1767        long fd;
1768        int sectsize;
1769        char *p = (char *)page;
1770        struct fd f;
1771        struct inode *inode;
1772        ssize_t ret = -EINVAL;
1773        int live_threshold;
1774
1775        if (reg->hr_bdev)
1776                goto out;
1777
1778        /* We can't heartbeat without having had our node number
1779         * configured yet. */
1780        if (o2nm_this_node() == O2NM_MAX_NODES)
1781                goto out;
1782
1783        fd = simple_strtol(p, &p, 0);
1784        if (!p || (*p && (*p != '\n')))
1785                goto out;
1786
1787        if (fd < 0 || fd >= INT_MAX)
1788                goto out;
1789
1790        f = fdget(fd);
1791        if (f.file == NULL)
1792                goto out;
1793
1794        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1795            reg->hr_block_bytes == 0)
1796                goto out2;
1797
1798        inode = igrab(f.file->f_mapping->host);
1799        if (inode == NULL)
1800                goto out2;
1801
1802        if (!S_ISBLK(inode->i_mode))
1803                goto out3;
1804
1805        reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1806        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1807        if (ret) {
1808                reg->hr_bdev = NULL;
1809                goto out3;
1810        }
1811        inode = NULL;
1812
1813        bdevname(reg->hr_bdev, reg->hr_dev_name);
1814
1815        sectsize = bdev_logical_block_size(reg->hr_bdev);
1816        if (sectsize != reg->hr_block_bytes) {
1817                mlog(ML_ERROR,
1818                     "blocksize %u incorrect for device, expected %d",
1819                     reg->hr_block_bytes, sectsize);
1820                ret = -EINVAL;
1821                goto out3;
1822        }
1823
1824        o2hb_init_region_params(reg);
1825
1826        /* Generation of zero is invalid */
1827        do {
1828                get_random_bytes(&reg->hr_generation,
1829                                 sizeof(reg->hr_generation));
1830        } while (reg->hr_generation == 0);
1831
1832        ret = o2hb_map_slot_data(reg);
1833        if (ret) {
1834                mlog_errno(ret);
1835                goto out3;
1836        }
1837
1838        ret = o2hb_populate_slot_data(reg);
1839        if (ret) {
1840                mlog_errno(ret);
1841                goto out3;
1842        }
1843
1844        INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1845        INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1846
1847        /*
1848         * A node is considered live after it has beat LIVE_THRESHOLD
1849         * times.  We're not steady until we've given them a chance
1850         * _after_ our first read.
1851         * The default threshold is bare minimum so as to limit the delay
1852         * during mounts. For global heartbeat, the threshold doubled for the
1853         * first region.
1854         */
1855        live_threshold = O2HB_LIVE_THRESHOLD;
1856        if (o2hb_global_heartbeat_active()) {
1857                spin_lock(&o2hb_live_lock);
1858                if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1859                        live_threshold <<= 1;
1860                spin_unlock(&o2hb_live_lock);
1861        }
1862        ++live_threshold;
1863        atomic_set(&reg->hr_steady_iterations, live_threshold);
1864        /* unsteady_iterations is triple the steady_iterations */
1865        atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1866
1867        hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1868                              reg->hr_item.ci_name);
1869        if (IS_ERR(hb_task)) {
1870                ret = PTR_ERR(hb_task);
1871                mlog_errno(ret);
1872                goto out3;
1873        }
1874
1875        spin_lock(&o2hb_live_lock);
1876        reg->hr_task = hb_task;
1877        spin_unlock(&o2hb_live_lock);
1878
1879        ret = wait_event_interruptible(o2hb_steady_queue,
1880                                atomic_read(&reg->hr_steady_iterations) == 0 ||
1881                                reg->hr_node_deleted);
1882        if (ret) {
1883                atomic_set(&reg->hr_steady_iterations, 0);
1884                reg->hr_aborted_start = 1;
1885        }
1886
1887        if (reg->hr_aborted_start) {
1888                ret = -EIO;
1889                goto out3;
1890        }
1891
1892        if (reg->hr_node_deleted) {
1893                ret = -EINVAL;
1894                goto out3;
1895        }
1896
1897        /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1898        spin_lock(&o2hb_live_lock);
1899        hb_task = reg->hr_task;
1900        if (o2hb_global_heartbeat_active())
1901                set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1902        spin_unlock(&o2hb_live_lock);
1903
1904        if (hb_task)
1905                ret = count;
1906        else
1907                ret = -EIO;
1908
1909        if (hb_task && o2hb_global_heartbeat_active())
1910                printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1911                       config_item_name(&reg->hr_item), reg->hr_dev_name);
1912
1913out3:
1914        iput(inode);
1915out2:
1916        fdput(f);
1917out:
1918        if (ret < 0) {
1919                if (reg->hr_bdev) {
1920                        blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1921                        reg->hr_bdev = NULL;
1922                }
1923        }
1924        return ret;
1925}
1926
1927static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1928{
1929        struct o2hb_region *reg = to_o2hb_region(item);
1930        pid_t pid = 0;
1931
1932        spin_lock(&o2hb_live_lock);
1933        if (reg->hr_task)
1934                pid = task_pid_nr(reg->hr_task);
1935        spin_unlock(&o2hb_live_lock);
1936
1937        if (!pid)
1938                return 0;
1939
1940        return sprintf(page, "%u\n", pid);
1941}
1942
1943CONFIGFS_ATTR(o2hb_region_, block_bytes);
1944CONFIGFS_ATTR(o2hb_region_, start_block);
1945CONFIGFS_ATTR(o2hb_region_, blocks);
1946CONFIGFS_ATTR(o2hb_region_, dev);
1947CONFIGFS_ATTR_RO(o2hb_region_, pid);
1948
1949static struct configfs_attribute *o2hb_region_attrs[] = {
1950        &o2hb_region_attr_block_bytes,
1951        &o2hb_region_attr_start_block,
1952        &o2hb_region_attr_blocks,
1953        &o2hb_region_attr_dev,
1954        &o2hb_region_attr_pid,
1955        NULL,
1956};
1957
1958static struct configfs_item_operations o2hb_region_item_ops = {
1959        .release                = o2hb_region_release,
1960};
1961
1962static const struct config_item_type o2hb_region_type = {
1963        .ct_item_ops    = &o2hb_region_item_ops,
1964        .ct_attrs       = o2hb_region_attrs,
1965        .ct_owner       = THIS_MODULE,
1966};
1967
1968/* heartbeat set */
1969
1970struct o2hb_heartbeat_group {
1971        struct config_group hs_group;
1972        /* some stuff? */
1973};
1974
1975static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1976{
1977        return group ?
1978                container_of(group, struct o2hb_heartbeat_group, hs_group)
1979                : NULL;
1980}
1981
1982static void o2hb_debug_region_init(struct o2hb_region *reg,
1983                                   struct dentry *parent)
1984{
1985        struct dentry *dir;
1986
1987        dir = debugfs_create_dir(config_item_name(&reg->hr_item), parent);
1988        reg->hr_debug_dir = dir;
1989
1990        o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes),
1991                          sizeof(*(reg->hr_db_livenodes)),
1992                          O2HB_DB_TYPE_REGION_LIVENODES,
1993                          sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES,
1994                          reg);
1995
1996        o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum),
1997                          sizeof(*(reg->hr_db_regnum)),
1998                          O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg);
1999
2000        o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir,
2001                          &(reg->hr_db_elapsed_time),
2002                          sizeof(*(reg->hr_db_elapsed_time)),
2003                          O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg);
2004
2005        o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned),
2006                          sizeof(*(reg->hr_db_pinned)),
2007                          O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg);
2008
2009}
2010
2011static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2012                                                          const char *name)
2013{
2014        struct o2hb_region *reg = NULL;
2015        int ret;
2016
2017        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2018        if (reg == NULL)
2019                return ERR_PTR(-ENOMEM);
2020
2021        if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2022                ret = -ENAMETOOLONG;
2023                goto free;
2024        }
2025
2026        spin_lock(&o2hb_live_lock);
2027        reg->hr_region_num = 0;
2028        if (o2hb_global_heartbeat_active()) {
2029                reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2030                                                         O2NM_MAX_REGIONS);
2031                if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2032                        spin_unlock(&o2hb_live_lock);
2033                        ret = -EFBIG;
2034                        goto free;
2035                }
2036                set_bit(reg->hr_region_num, o2hb_region_bitmap);
2037        }
2038        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2039        spin_unlock(&o2hb_live_lock);
2040
2041        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2042
2043        /* this is the same way to generate msg key as dlm, for local heartbeat,
2044         * name is also the same, so make initial crc value different to avoid
2045         * message key conflict.
2046         */
2047        reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2048                name, strlen(name));
2049        INIT_LIST_HEAD(&reg->hr_handler_list);
2050        ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2051                        sizeof(struct o2hb_nego_msg),
2052                        o2hb_nego_timeout_handler,
2053                        reg, NULL, &reg->hr_handler_list);
2054        if (ret)
2055                goto free;
2056
2057        ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2058                        sizeof(struct o2hb_nego_msg),
2059                        o2hb_nego_approve_handler,
2060                        reg, NULL, &reg->hr_handler_list);
2061        if (ret)
2062                goto unregister_handler;
2063
2064        o2hb_debug_region_init(reg, o2hb_debug_dir);
2065
2066        return &reg->hr_item;
2067
2068unregister_handler:
2069        o2net_unregister_handler_list(&reg->hr_handler_list);
2070free:
2071        kfree(reg);
2072        return ERR_PTR(ret);
2073}
2074
2075static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2076                                           struct config_item *item)
2077{
2078        struct task_struct *hb_task;
2079        struct o2hb_region *reg = to_o2hb_region(item);
2080        int quorum_region = 0;
2081
2082        /* stop the thread when the user removes the region dir */
2083        spin_lock(&o2hb_live_lock);
2084        hb_task = reg->hr_task;
2085        reg->hr_task = NULL;
2086        reg->hr_item_dropped = 1;
2087        spin_unlock(&o2hb_live_lock);
2088
2089        if (hb_task)
2090                kthread_stop(hb_task);
2091
2092        if (o2hb_global_heartbeat_active()) {
2093                spin_lock(&o2hb_live_lock);
2094                clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2095                clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2096                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2097                        quorum_region = 1;
2098                clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2099                spin_unlock(&o2hb_live_lock);
2100                printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2101                       ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2102                        "stopped" : "start aborted"), config_item_name(item),
2103                       reg->hr_dev_name);
2104        }
2105
2106        /*
2107         * If we're racing a dev_write(), we need to wake them.  They will
2108         * check reg->hr_task
2109         */
2110        if (atomic_read(&reg->hr_steady_iterations) != 0) {
2111                reg->hr_aborted_start = 1;
2112                atomic_set(&reg->hr_steady_iterations, 0);
2113                wake_up(&o2hb_steady_queue);
2114        }
2115
2116        config_item_put(item);
2117
2118        if (!o2hb_global_heartbeat_active() || !quorum_region)
2119                return;
2120
2121        /*
2122         * If global heartbeat active and there are dependent users,
2123         * pin all regions if quorum region count <= CUT_OFF
2124         */
2125        spin_lock(&o2hb_live_lock);
2126
2127        if (!o2hb_dependent_users)
2128                goto unlock;
2129
2130        if (bitmap_weight(o2hb_quorum_region_bitmap,
2131                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2132                o2hb_region_pin(NULL);
2133
2134unlock:
2135        spin_unlock(&o2hb_live_lock);
2136}
2137
2138static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
2139                char *page)
2140{
2141        return sprintf(page, "%u\n", o2hb_dead_threshold);
2142}
2143
2144static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
2145                const char *page, size_t count)
2146{
2147        unsigned long tmp;
2148        char *p = (char *)page;
2149
2150        tmp = simple_strtoul(p, &p, 10);
2151        if (!p || (*p && (*p != '\n')))
2152                return -EINVAL;
2153
2154        /* this will validate ranges for us. */
2155        o2hb_dead_threshold_set((unsigned int) tmp);
2156
2157        return count;
2158}
2159
2160static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2161                char *page)
2162{
2163        return sprintf(page, "%s\n",
2164                       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2165}
2166
2167static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2168                const char *page, size_t count)
2169{
2170        unsigned int i;
2171        int ret;
2172        size_t len;
2173
2174        len = (page[count - 1] == '\n') ? count - 1 : count;
2175        if (!len)
2176                return -EINVAL;
2177
2178        for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2179                if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2180                        continue;
2181
2182                ret = o2hb_global_heartbeat_mode_set(i);
2183                if (!ret)
2184                        printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2185                               o2hb_heartbeat_mode_desc[i]);
2186                return count;
2187        }
2188
2189        return -EINVAL;
2190
2191}
2192
2193CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
2194CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2195
2196static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2197        &o2hb_heartbeat_group_attr_dead_threshold,
2198        &o2hb_heartbeat_group_attr_mode,
2199        NULL,
2200};
2201
2202static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2203        .make_item      = o2hb_heartbeat_group_make_item,
2204        .drop_item      = o2hb_heartbeat_group_drop_item,
2205};
2206
2207static const struct config_item_type o2hb_heartbeat_group_type = {
2208        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
2209        .ct_attrs       = o2hb_heartbeat_group_attrs,
2210        .ct_owner       = THIS_MODULE,
2211};
2212
2213/* this is just here to avoid touching group in heartbeat.h which the
2214 * entire damn world #includes */
2215struct config_group *o2hb_alloc_hb_set(void)
2216{
2217        struct o2hb_heartbeat_group *hs = NULL;
2218        struct config_group *ret = NULL;
2219
2220        hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2221        if (hs == NULL)
2222                goto out;
2223
2224        config_group_init_type_name(&hs->hs_group, "heartbeat",
2225                                    &o2hb_heartbeat_group_type);
2226
2227        ret = &hs->hs_group;
2228out:
2229        if (ret == NULL)
2230                kfree(hs);
2231        return ret;
2232}
2233
/* Free a heartbeat group; a NULL @group is a no-op. */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
2239
2240/* hb callback registration and issuing */
2241
2242static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2243{
2244        if (type == O2HB_NUM_CB)
2245                return ERR_PTR(-EINVAL);
2246
2247        return &o2hb_callbacks[type];
2248}
2249
2250void o2hb_setup_callback(struct o2hb_callback_func *hc,
2251                         enum o2hb_callback_type type,
2252                         o2hb_cb_func *func,
2253                         void *data,
2254                         int priority)
2255{
2256        INIT_LIST_HEAD(&hc->hc_item);
2257        hc->hc_func = func;
2258        hc->hc_data = data;
2259        hc->hc_priority = priority;
2260        hc->hc_type = type;
2261        hc->hc_magic = O2HB_CB_MAGIC;
2262}
2263EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2264
2265/*
2266 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2267 * In global heartbeat mode, region_uuid passed is NULL.
2268 *
2269 * In local, we only pin the matching region. In global we pin all the active
2270 * regions.
2271 */
2272static int o2hb_region_pin(const char *region_uuid)
2273{
2274        int ret = 0, found = 0;
2275        struct o2hb_region *reg;
2276        char *uuid;
2277
2278        assert_spin_locked(&o2hb_live_lock);
2279
2280        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2281                if (reg->hr_item_dropped)
2282                        continue;
2283
2284                uuid = config_item_name(&reg->hr_item);
2285
2286                /* local heartbeat */
2287                if (region_uuid) {
2288                        if (strcmp(region_uuid, uuid))
2289                                continue;
2290                        found = 1;
2291                }
2292
2293                if (reg->hr_item_pinned || reg->hr_item_dropped)
2294                        goto skip_pin;
2295
2296                /* Ignore ENOENT only for local hb (userdlm domain) */
2297                ret = o2nm_depend_item(&reg->hr_item);
2298                if (!ret) {
2299                        mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2300                        reg->hr_item_pinned = 1;
2301                } else {
2302                        if (ret == -ENOENT && found)
2303                                ret = 0;
2304                        else {
2305                                mlog(ML_ERROR, "Pin region %s fails with %d\n",
2306                                     uuid, ret);
2307                                break;
2308                        }
2309                }
2310skip_pin:
2311                if (found)
2312                        break;
2313        }
2314
2315        return ret;
2316}
2317
2318/*
2319 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2320 * In global heartbeat mode, region_uuid passed is NULL.
2321 *
2322 * In local, we only unpin the matching region. In global we unpin all the
2323 * active regions.
2324 */
2325static void o2hb_region_unpin(const char *region_uuid)
2326{
2327        struct o2hb_region *reg;
2328        char *uuid;
2329        int found = 0;
2330
2331        assert_spin_locked(&o2hb_live_lock);
2332
2333        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2334                if (reg->hr_item_dropped)
2335                        continue;
2336
2337                uuid = config_item_name(&reg->hr_item);
2338                if (region_uuid) {
2339                        if (strcmp(region_uuid, uuid))
2340                                continue;
2341                        found = 1;
2342                }
2343
2344                if (reg->hr_item_pinned) {
2345                        mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2346                        o2nm_undepend_item(&reg->hr_item);
2347                        reg->hr_item_pinned = 0;
2348                }
2349                if (found)
2350                        break;
2351        }
2352}
2353
2354static int o2hb_region_inc_user(const char *region_uuid)
2355{
2356        int ret = 0;
2357
2358        spin_lock(&o2hb_live_lock);
2359
2360        /* local heartbeat */
2361        if (!o2hb_global_heartbeat_active()) {
2362            ret = o2hb_region_pin(region_uuid);
2363            goto unlock;
2364        }
2365
2366        /*
2367         * if global heartbeat active and this is the first dependent user,
2368         * pin all regions if quorum region count <= CUT_OFF
2369         */
2370        o2hb_dependent_users++;
2371        if (o2hb_dependent_users > 1)
2372                goto unlock;
2373
2374        if (bitmap_weight(o2hb_quorum_region_bitmap,
2375                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2376                ret = o2hb_region_pin(NULL);
2377
2378unlock:
2379        spin_unlock(&o2hb_live_lock);
2380        return ret;
2381}
2382
2383static void o2hb_region_dec_user(const char *region_uuid)
2384{
2385        spin_lock(&o2hb_live_lock);
2386
2387        /* local heartbeat */
2388        if (!o2hb_global_heartbeat_active()) {
2389            o2hb_region_unpin(region_uuid);
2390            goto unlock;
2391        }
2392
2393        /*
2394         * if global heartbeat active and there are no dependent users,
2395         * unpin all quorum regions
2396         */
2397        o2hb_dependent_users--;
2398        if (!o2hb_dependent_users)
2399                o2hb_region_unpin(NULL);
2400
2401unlock:
2402        spin_unlock(&o2hb_live_lock);
2403}
2404
2405int o2hb_register_callback(const char *region_uuid,
2406                           struct o2hb_callback_func *hc)
2407{
2408        struct o2hb_callback_func *f;
2409        struct o2hb_callback *hbcall;
2410        int ret;
2411
2412        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2413        BUG_ON(!list_empty(&hc->hc_item));
2414
2415        hbcall = hbcall_from_type(hc->hc_type);
2416        if (IS_ERR(hbcall)) {
2417                ret = PTR_ERR(hbcall);
2418                goto out;
2419        }
2420
2421        if (region_uuid) {
2422                ret = o2hb_region_inc_user(region_uuid);
2423                if (ret) {
2424                        mlog_errno(ret);
2425                        goto out;
2426                }
2427        }
2428
2429        down_write(&o2hb_callback_sem);
2430
2431        list_for_each_entry(f, &hbcall->list, hc_item) {
2432                if (hc->hc_priority < f->hc_priority) {
2433                        list_add_tail(&hc->hc_item, &f->hc_item);
2434                        break;
2435                }
2436        }
2437        if (list_empty(&hc->hc_item))
2438                list_add_tail(&hc->hc_item, &hbcall->list);
2439
2440        up_write(&o2hb_callback_sem);
2441        ret = 0;
2442out:
2443        mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2444             ret, __builtin_return_address(0), hc);
2445        return ret;
2446}
2447EXPORT_SYMBOL_GPL(o2hb_register_callback);
2448
2449void o2hb_unregister_callback(const char *region_uuid,
2450                              struct o2hb_callback_func *hc)
2451{
2452        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2453
2454        mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2455             __builtin_return_address(0), hc);
2456
2457        /* XXX Can this happen _with_ a region reference? */
2458        if (list_empty(&hc->hc_item))
2459                return;
2460
2461        if (region_uuid)
2462                o2hb_region_dec_user(region_uuid);
2463
2464        down_write(&o2hb_callback_sem);
2465
2466        list_del_init(&hc->hc_item);
2467
2468        up_write(&o2hb_callback_sem);
2469}
2470EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2471
2472int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2473{
2474        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2475
2476        spin_lock(&o2hb_live_lock);
2477        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2478        spin_unlock(&o2hb_live_lock);
2479        if (!test_bit(node_num, testing_map)) {
2480                mlog(ML_HEARTBEAT,
2481                     "node (%u) does not have heartbeating enabled.\n",
2482                     node_num);
2483                return 0;
2484        }
2485
2486        return 1;
2487}
2488EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2489
2490int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2491{
2492        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2493
2494        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2495        if (!test_bit(node_num, testing_map)) {
2496                mlog(ML_HEARTBEAT,
2497                     "node (%u) does not have heartbeating enabled.\n",
2498                     node_num);
2499                return 0;
2500        }
2501
2502        return 1;
2503}
2504EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2505
2506/*
2507 * this is just a hack until we get the plumbing which flips file systems
2508 * read only and drops the hb ref instead of killing the node dead.
2509 */
2510void o2hb_stop_all_regions(void)
2511{
2512        struct o2hb_region *reg;
2513
2514        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2515
2516        spin_lock(&o2hb_live_lock);
2517
2518        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2519                reg->hr_unclean_stop = 1;
2520
2521        spin_unlock(&o2hb_live_lock);
2522}
2523EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2524
2525int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2526{
2527        struct o2hb_region *reg;
2528        int numregs = 0;
2529        char *p;
2530
2531        spin_lock(&o2hb_live_lock);
2532
2533        p = region_uuids;
2534        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2535                if (reg->hr_item_dropped)
2536                        continue;
2537
2538                mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2539                if (numregs < max_regions) {
2540                        memcpy(p, config_item_name(&reg->hr_item),
2541                               O2HB_MAX_REGION_NAME_LEN);
2542                        p += O2HB_MAX_REGION_NAME_LEN;
2543                }
2544                numregs++;
2545        }
2546
2547        spin_unlock(&o2hb_live_lock);
2548
2549        return numregs;
2550}
2551EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2552
2553int o2hb_global_heartbeat_active(void)
2554{
2555        return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2556}
2557EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2558