linux/fs/ocfs2/cluster/heartbeat.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This program is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU General Public
  17 * License along with this program; if not, write to the
  18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   19 * Boston, MA 02111-1307, USA.
  20 */
  21
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>
#include <linux/debugfs.h>
#include <linux/slab.h>
#include <linux/bitmap.h>
  38
  39#include "heartbeat.h"
  40#include "tcp.h"
  41#include "nodemanager.h"
  42#include "quorum.h"
  43
  44#include "masklog.h"
  45
  46
/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 *
 * o2hb_run_event_list() takes this sem for write around the firing of
 * node up/down callbacks.
 */
static DECLARE_RWSEM(o2hb_callback_sem);
  54
/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 *
 * o2hb_live_lock is held around accesses to the live slot lists, the live
 * node bitmap, the region bitmaps and the pending node event list.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
/* Indexed by node number; disk slots are unlinked via their ds_live_item. */
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
/* One bit per node number; cleared once the node's live slot list is empty. */
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
/* Queued o2hb_node_event entries, consumed by o2hb_run_event_list(). */
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
  64
/*
 * In global heartbeat, we maintain a series of region bitmaps.
 *      - o2hb_region_bitmap allows us to limit the region number to max region.
 *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
 *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
 *              heartbeat on it.
 *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
 *
 * The quorum and failed bitmaps are indexed by a region's hr_region_num.
 */
static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  77
/*
 * Kinds of debug buffer.
 * NOTE(review): presumably these are the values stored in
 * o2hb_debug_buf.db_type -- confirm against the debugfs code.
 */
#define O2HB_DB_TYPE_LIVENODES		0
#define O2HB_DB_TYPE_LIVEREGIONS	1
#define O2HB_DB_TYPE_QUORUMREGIONS	2
#define O2HB_DB_TYPE_FAILEDREGIONS	3
#define O2HB_DB_TYPE_REGION_LIVENODES	4
#define O2HB_DB_TYPE_REGION_NUMBER	5
#define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
#define O2HB_DB_TYPE_REGION_PINNED	7
/*
 * Descriptor for one debug file's backing data.
 * NOTE(review): the exact meaning of db_size/db_len/db_data is not visible
 * here; presumably db_data points at db_size bytes -- confirm.
 */
struct o2hb_debug_buf {
	int db_type;
	int db_size;
	int db_len;
	void *db_data;
};
  92
/* Debug buffer descriptors for the four global debug files. */
static struct o2hb_debug_buf *o2hb_db_livenodes;
static struct o2hb_debug_buf *o2hb_db_liveregions;
static struct o2hb_debug_buf *o2hb_db_quorumregions;
static struct o2hb_debug_buf *o2hb_db_failedregions;

/*
 * Names of the debug directory and of the files inside it.
 * NOTE(review): presumably created under debugfs -- confirm at the call
 * sites, which are not visible here.
 */
#define O2HB_DEBUG_DIR			"o2hb"
#define O2HB_DEBUG_LIVENODES		"livenodes"
#define O2HB_DEBUG_LIVEREGIONS		"live_regions"
#define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
#define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
#define O2HB_DEBUG_REGION_NUMBER	"num"
#define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
#define O2HB_DEBUG_REGION_PINNED	"pinned"

/* Dentries for the global debug directory and its files. */
static struct dentry *o2hb_debug_dir;
static struct dentry *o2hb_debug_livenodes;
static struct dentry *o2hb_debug_liveregions;
static struct dentry *o2hb_debug_quorumregions;
static struct dentry *o2hb_debug_failedregions;
 112
/*
 * List of heartbeat regions.  The threshold and mode setters only take
 * effect while this list is empty, checked under o2hb_live_lock.
 */
static LIST_HEAD(o2hb_all_regions);

/*
 * One callback list per callback type.  o2hb_fire_callbacks() walks the
 * list, whose entries are struct o2hb_callback_func linked via hc_item.
 */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

/* Callers check the result with IS_ERR(). */
static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS       9
 122
 123enum o2hb_heartbeat_modes {
 124        O2HB_HEARTBEAT_LOCAL            = 0,
 125        O2HB_HEARTBEAT_GLOBAL,
 126        O2HB_HEARTBEAT_NUM_MODES,
 127};
 128
 129char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
 130                "local",        /* O2HB_HEARTBEAT_LOCAL */
 131                "global",       /* O2HB_HEARTBEAT_GLOBAL */
 132};
 133
/*
 * Both values are only changed by their setters, and only while
 * o2hb_all_regions is empty (checked under o2hb_live_lock).
 */
unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;

/*
 * o2hb_dependent_users tracks the number of registered callbacks that depend
 * on heartbeat. o2net and o2dlm are two entities that register this callback.
 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
 * to stop while a dlm domain is still active.
 */
unsigned int o2hb_dependent_users;
 144
/*
 * In global heartbeat mode, all regions are pinned if there are one or more
 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
 * regions are unpinned if the region count exceeds the cut off or the number
 * of dependent users falls to zero.
 *
 * o2hb_set_quorum_device() performs the "exceeds the cut off" unpin by
 * calling o2hb_region_unpin(NULL).
 */
#define O2HB_PIN_CUT_OFF		3

/*
 * In local heartbeat mode, we assume the dlm domain name to be the same as
 * region uuid. This is true for domains created for the file system but not
 * necessarily true for userdlm domains. This is a known limitation.
 *
 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
 * works for both file system and userdlm domains.
 */
static int o2hb_region_pin(const char *region_uuid);
static void o2hb_region_unpin(const char *region_uuid);
 163
 164/* Only sets a new threshold if there are no active regions.
 165 *
 166 * No locking or otherwise interesting code is required for reading
 167 * o2hb_dead_threshold as it can't change once regions are active and
 168 * it's not interesting to anyone until then anyway. */
 169static void o2hb_dead_threshold_set(unsigned int threshold)
 170{
 171        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
 172                spin_lock(&o2hb_live_lock);
 173                if (list_empty(&o2hb_all_regions))
 174                        o2hb_dead_threshold = threshold;
 175                spin_unlock(&o2hb_live_lock);
 176        }
 177}
 178
 179static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
 180{
 181        int ret = -1;
 182
 183        if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
 184                spin_lock(&o2hb_live_lock);
 185                if (list_empty(&o2hb_all_regions)) {
 186                        o2hb_heartbeat_mode = hb_mode;
 187                        ret = 0;
 188                }
 189                spin_unlock(&o2hb_live_lock);
 190        }
 191
 192        return ret;
 193}
 194
/*
 * A pending node up/down notification.  Instances live on the caller's
 * stack, are filled in by o2hb_queue_node_event() and are delivered by
 * o2hb_run_event_list().
 */
struct o2hb_node_event {
	/* link on o2hb_node_events; empty when not queued */
	struct list_head        hn_item;
	enum o2hb_callback_type hn_event_type;
	/* may be NULL only for O2HB_NODE_DOWN_CB events */
	struct o2nm_node        *hn_node;
	int                     hn_node_num;
};
 201
/* Per-node heartbeat slot state within one region. */
struct o2hb_disk_slot {
	/* the slot's heartbeat block as read from / written to disk */
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	u8			ds_node_num;
	/* hb_seq and hb_generation values seen on the previous check */
	u64			ds_last_time;
	u64			ds_last_generation;
	/* counts of checks where hb_seq was unchanged / changed */
	u16			ds_equal_samples;
	u16			ds_changed_samples;
	/* non-empty while the node is considered live in this region */
	struct list_head	ds_live_item;
};
 211
/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, who cleans up the region */
struct o2hb_region {
	/* configfs item; its name is used as the region name in messages */
	struct config_item	hr_item;

	/* NOTE(review): presumably the link on o2hb_all_regions -- confirm */
	struct list_head	hr_all_item;
	unsigned		hr_unclean_stop:1,
				hr_item_pinned:1,
				hr_item_dropped:1;

	/* protected by the hr_callback_sem */
	/* NOTE(review): no hr_callback_sem is visible here; presumably
	 * o2hb_callback_sem is meant -- confirm */
	struct task_struct	*hr_task;

	unsigned int		hr_blocks;
	/* first block of the heartbeat area; slot numbers are added to it */
	unsigned long long	hr_start_block;

	/* log2 of the block size, and the block size in bytes */
	unsigned int		hr_block_bits;
	unsigned int		hr_block_bytes;

	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	/* pages backing the slots; page index is slot / hr_slots_per_page */
	struct page		**hr_slot_data;
	/* block device all heartbeat bios are issued against */
	struct block_device	*hr_bdev;
	/* indexed by node number */
	struct o2hb_disk_slot	*hr_slots;

	/* live node map of this region */
	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	/* this region's bit index in the global region bitmaps */
	unsigned int		hr_region_num;

	struct dentry		*hr_debug_dir;
	struct dentry		*hr_debug_livenodes;
	struct dentry		*hr_debug_regnum;
	struct dentry		*hr_debug_elapsed_time;
	struct dentry		*hr_debug_pinned;
	struct o2hb_debug_buf	*hr_db_livenodes;
	struct o2hb_debug_buf	*hr_db_regnum;
	struct o2hb_debug_buf	*hr_db_elapsed_time;
	struct o2hb_debug_buf	*hr_db_pinned;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t		hr_steady_iterations;

	/* device name, used in log messages */
	char			hr_dev_name[BDEVNAME_SIZE];

	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	/* write-timeout work and the jiffies value at which it was armed */
	struct delayed_work	hr_write_timeout_work;
	unsigned long		hr_last_timeout_start;

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;
};
 273
/* Tracks a batch of in-flight heartbeat bios so a caller can wait for them. */
struct o2hb_bio_wait_ctxt {
	/* starts at 1 (the waiter's own reference), +1 per submitted bio */
	atomic_t          wc_num_reqs;
	/* completed when wc_num_reqs drops to zero */
	struct completion wc_io_complete;
	/* 0, or the error reported by a failed bio */
	int               wc_error;
};
 279
 280static int o2hb_pop_count(void *map, int count)
 281{
 282        int i = -1, pop = 0;
 283
 284        while ((i = find_next_bit(map, count, i + 1)) < count)
 285                pop++;
 286        return pop;
 287}
 288
 289static void o2hb_write_timeout(struct work_struct *work)
 290{
 291        int failed, quorum;
 292        unsigned long flags;
 293        struct o2hb_region *reg =
 294                container_of(work, struct o2hb_region,
 295                             hr_write_timeout_work.work);
 296
 297        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 298             "milliseconds\n", reg->hr_dev_name,
 299             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
 300
 301        if (o2hb_global_heartbeat_active()) {
 302                spin_lock_irqsave(&o2hb_live_lock, flags);
 303                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 304                        set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 305                failed = o2hb_pop_count(&o2hb_failed_region_bitmap,
 306                                        O2NM_MAX_REGIONS);
 307                quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap,
 308                                        O2NM_MAX_REGIONS);
 309                spin_unlock_irqrestore(&o2hb_live_lock, flags);
 310
 311                mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
 312                     quorum, failed);
 313
 314                /*
 315                 * Fence if the number of failed regions >= half the number
 316                 * of  quorum regions
 317                 */
 318                if ((failed << 1) < quorum)
 319                        return;
 320        }
 321
 322        o2quo_disk_timeout();
 323}
 324
 325static void o2hb_arm_write_timeout(struct o2hb_region *reg)
 326{
 327        mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 328             O2HB_MAX_WRITE_TIMEOUT_MS);
 329
 330        if (o2hb_global_heartbeat_active()) {
 331                spin_lock(&o2hb_live_lock);
 332                clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 333                spin_unlock(&o2hb_live_lock);
 334        }
 335        cancel_delayed_work(&reg->hr_write_timeout_work);
 336        reg->hr_last_timeout_start = jiffies;
 337        schedule_delayed_work(&reg->hr_write_timeout_work,
 338                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
 339}
 340
 341static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
 342{
 343        cancel_delayed_work_sync(&reg->hr_write_timeout_work);
 344}
 345
 346static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 347{
 348        atomic_set(&wc->wc_num_reqs, 1);
 349        init_completion(&wc->wc_io_complete);
 350        wc->wc_error = 0;
 351}
 352
 353/* Used in error paths too */
 354static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
 355                                     unsigned int num)
 356{
 357        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
 358         * good news is that the fast path only completes one at a time */
 359        while(num--) {
 360                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
 361                        BUG_ON(num > 0);
 362                        complete(&wc->wc_io_complete);
 363                }
 364        }
 365}
 366
 367static void o2hb_wait_on_io(struct o2hb_region *reg,
 368                            struct o2hb_bio_wait_ctxt *wc)
 369{
 370        o2hb_bio_wait_dec(wc, 1);
 371        wait_for_completion(&wc->wc_io_complete);
 372}
 373
 374static void o2hb_bio_end_io(struct bio *bio,
 375                           int error)
 376{
 377        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
 378
 379        if (error) {
 380                mlog(ML_ERROR, "IO Error %d\n", error);
 381                wc->wc_error = error;
 382        }
 383
 384        o2hb_bio_wait_dec(wc, 1);
 385        bio_put(bio);
 386}
 387
 388/* Setup a Bio to cover I/O against num_slots slots starting at
 389 * start_slot. */
 390static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
 391                                      struct o2hb_bio_wait_ctxt *wc,
 392                                      unsigned int *current_slot,
 393                                      unsigned int max_slots)
 394{
 395        int len, current_page;
 396        unsigned int vec_len, vec_start;
 397        unsigned int bits = reg->hr_block_bits;
 398        unsigned int spp = reg->hr_slots_per_page;
 399        unsigned int cs = *current_slot;
 400        struct bio *bio;
 401        struct page *page;
 402
 403        /* Testing has shown this allocation to take long enough under
 404         * GFP_KERNEL that the local node can get fenced. It would be
 405         * nicest if we could pre-allocate these bios and avoid this
 406         * all together. */
 407        bio = bio_alloc(GFP_ATOMIC, 16);
 408        if (!bio) {
 409                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
 410                bio = ERR_PTR(-ENOMEM);
 411                goto bail;
 412        }
 413
 414        /* Must put everything in 512 byte sectors for the bio... */
 415        bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
 416        bio->bi_bdev = reg->hr_bdev;
 417        bio->bi_private = wc;
 418        bio->bi_end_io = o2hb_bio_end_io;
 419
 420        vec_start = (cs << bits) % PAGE_CACHE_SIZE;
 421        while(cs < max_slots) {
 422                current_page = cs / spp;
 423                page = reg->hr_slot_data[current_page];
 424
 425                vec_len = min(PAGE_CACHE_SIZE - vec_start,
 426                              (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
 427
 428                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
 429                     current_page, vec_len, vec_start);
 430
 431                len = bio_add_page(bio, page, vec_len, vec_start);
 432                if (len != vec_len) break;
 433
 434                cs += vec_len / (PAGE_CACHE_SIZE/spp);
 435                vec_start = 0;
 436        }
 437
 438bail:
 439        *current_slot = cs;
 440        return bio;
 441}
 442
 443static int o2hb_read_slots(struct o2hb_region *reg,
 444                           unsigned int max_slots)
 445{
 446        unsigned int current_slot=0;
 447        int status;
 448        struct o2hb_bio_wait_ctxt wc;
 449        struct bio *bio;
 450
 451        o2hb_bio_wait_init(&wc);
 452
 453        while(current_slot < max_slots) {
 454                bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
 455                if (IS_ERR(bio)) {
 456                        status = PTR_ERR(bio);
 457                        mlog_errno(status);
 458                        goto bail_and_wait;
 459                }
 460
 461                atomic_inc(&wc.wc_num_reqs);
 462                submit_bio(READ, bio);
 463        }
 464
 465        status = 0;
 466
 467bail_and_wait:
 468        o2hb_wait_on_io(reg, &wc);
 469        if (wc.wc_error && !status)
 470                status = wc.wc_error;
 471
 472        return status;
 473}
 474
 475static int o2hb_issue_node_write(struct o2hb_region *reg,
 476                                 struct o2hb_bio_wait_ctxt *write_wc)
 477{
 478        int status;
 479        unsigned int slot;
 480        struct bio *bio;
 481
 482        o2hb_bio_wait_init(write_wc);
 483
 484        slot = o2nm_this_node();
 485
 486        bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
 487        if (IS_ERR(bio)) {
 488                status = PTR_ERR(bio);
 489                mlog_errno(status);
 490                goto bail;
 491        }
 492
 493        atomic_inc(&write_wc->wc_num_reqs);
 494        submit_bio(WRITE, bio);
 495
 496        status = 0;
 497bail:
 498        return status;
 499}
 500
 501static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
 502                                     struct o2hb_disk_heartbeat_block *hb_block)
 503{
 504        __le32 old_cksum;
 505        u32 ret;
 506
 507        /* We want to compute the block crc with a 0 value in the
 508         * hb_cksum field. Save it off here and replace after the
 509         * crc. */
 510        old_cksum = hb_block->hb_cksum;
 511        hb_block->hb_cksum = 0;
 512
 513        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
 514
 515        hb_block->hb_cksum = old_cksum;
 516
 517        return ret;
 518}
 519
 520static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
 521{
 522        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
 523             "cksum = 0x%x, generation 0x%llx\n",
 524             (long long)le64_to_cpu(hb_block->hb_seq),
 525             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
 526             (long long)le64_to_cpu(hb_block->hb_generation));
 527}
 528
 529static int o2hb_verify_crc(struct o2hb_region *reg,
 530                           struct o2hb_disk_heartbeat_block *hb_block)
 531{
 532        u32 read, computed;
 533
 534        read = le32_to_cpu(hb_block->hb_cksum);
 535        computed = o2hb_compute_block_crc_le(reg, hb_block);
 536
 537        return read == computed;
 538}
 539
 540/* We want to make sure that nobody is heartbeating on top of us --
 541 * this will help detect an invalid configuration. */
 542static void o2hb_check_last_timestamp(struct o2hb_region *reg)
 543{
 544        struct o2hb_disk_slot *slot;
 545        struct o2hb_disk_heartbeat_block *hb_block;
 546        char *errstr;
 547
 548        slot = &reg->hr_slots[o2nm_this_node()];
 549        /* Don't check on our 1st timestamp */
 550        if (!slot->ds_last_time)
 551                return;
 552
 553        hb_block = slot->ds_raw_block;
 554        if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
 555            le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
 556            hb_block->hb_node == slot->ds_node_num)
 557                return;
 558
 559#define ERRSTR1         "Another node is heartbeating on device"
 560#define ERRSTR2         "Heartbeat generation mismatch on device"
 561#define ERRSTR3         "Heartbeat sequence mismatch on device"
 562
 563        if (hb_block->hb_node != slot->ds_node_num)
 564                errstr = ERRSTR1;
 565        else if (le64_to_cpu(hb_block->hb_generation) !=
 566                 slot->ds_last_generation)
 567                errstr = ERRSTR2;
 568        else
 569                errstr = ERRSTR3;
 570
 571        mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
 572             "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
 573             slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
 574             (unsigned long long)slot->ds_last_time, hb_block->hb_node,
 575             (unsigned long long)le64_to_cpu(hb_block->hb_generation),
 576             (unsigned long long)le64_to_cpu(hb_block->hb_seq));
 577}
 578
 579static inline void o2hb_prepare_block(struct o2hb_region *reg,
 580                                      u64 generation)
 581{
 582        int node_num;
 583        u64 cputime;
 584        struct o2hb_disk_slot *slot;
 585        struct o2hb_disk_heartbeat_block *hb_block;
 586
 587        node_num = o2nm_this_node();
 588        slot = &reg->hr_slots[node_num];
 589
 590        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
 591        memset(hb_block, 0, reg->hr_block_bytes);
 592        /* TODO: time stuff */
 593        cputime = CURRENT_TIME.tv_sec;
 594        if (!cputime)
 595                cputime = 1;
 596
 597        hb_block->hb_seq = cpu_to_le64(cputime);
 598        hb_block->hb_node = node_num;
 599        hb_block->hb_generation = cpu_to_le64(generation);
 600        hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
 601
 602        /* This step must always happen last! */
 603        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
 604                                                                   hb_block));
 605
 606        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
 607             (long long)generation,
 608             le32_to_cpu(hb_block->hb_cksum));
 609}
 610
 611static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
 612                                struct o2nm_node *node,
 613                                int idx)
 614{
 615        struct list_head *iter;
 616        struct o2hb_callback_func *f;
 617
 618        list_for_each(iter, &hbcall->list) {
 619                f = list_entry(iter, struct o2hb_callback_func, hc_item);
 620                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
 621                (f->hc_func)(node, idx, f->hc_data);
 622        }
 623}
 624
 625/* Will run the list in order until we process the passed event */
 626static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
 627{
 628        int empty;
 629        struct o2hb_callback *hbcall;
 630        struct o2hb_node_event *event;
 631
 632        spin_lock(&o2hb_live_lock);
 633        empty = list_empty(&queued_event->hn_item);
 634        spin_unlock(&o2hb_live_lock);
 635        if (empty)
 636                return;
 637
 638        /* Holding callback sem assures we don't alter the callback
 639         * lists when doing this, and serializes ourselves with other
 640         * processes wanting callbacks. */
 641        down_write(&o2hb_callback_sem);
 642
 643        spin_lock(&o2hb_live_lock);
 644        while (!list_empty(&o2hb_node_events)
 645               && !list_empty(&queued_event->hn_item)) {
 646                event = list_entry(o2hb_node_events.next,
 647                                   struct o2hb_node_event,
 648                                   hn_item);
 649                list_del_init(&event->hn_item);
 650                spin_unlock(&o2hb_live_lock);
 651
 652                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
 653                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
 654                     event->hn_node_num);
 655
 656                hbcall = hbcall_from_type(event->hn_event_type);
 657
 658                /* We should *never* have gotten on to the list with a
 659                 * bad type... This isn't something that we should try
 660                 * to recover from. */
 661                BUG_ON(IS_ERR(hbcall));
 662
 663                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
 664
 665                spin_lock(&o2hb_live_lock);
 666        }
 667        spin_unlock(&o2hb_live_lock);
 668
 669        up_write(&o2hb_callback_sem);
 670}
 671
 672static void o2hb_queue_node_event(struct o2hb_node_event *event,
 673                                  enum o2hb_callback_type type,
 674                                  struct o2nm_node *node,
 675                                  int node_num)
 676{
 677        assert_spin_locked(&o2hb_live_lock);
 678
 679        BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
 680
 681        event->hn_event_type = type;
 682        event->hn_node = node;
 683        event->hn_node_num = node_num;
 684
 685        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
 686             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
 687
 688        list_add_tail(&event->hn_item, &o2hb_node_events);
 689}
 690
 691static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
 692{
 693        struct o2hb_node_event event =
 694                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 695        struct o2nm_node *node;
 696
 697        node = o2nm_get_node_by_num(slot->ds_node_num);
 698        if (!node)
 699                return;
 700
 701        spin_lock(&o2hb_live_lock);
 702        if (!list_empty(&slot->ds_live_item)) {
 703                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
 704                     slot->ds_node_num);
 705
 706                list_del_init(&slot->ds_live_item);
 707
 708                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 709                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 710
 711                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
 712                                              slot->ds_node_num);
 713                }
 714        }
 715        spin_unlock(&o2hb_live_lock);
 716
 717        o2hb_run_event_list(&event);
 718
 719        o2nm_node_put(node);
 720}
 721
 722static void o2hb_set_quorum_device(struct o2hb_region *reg,
 723                                   struct o2hb_disk_slot *slot)
 724{
 725        assert_spin_locked(&o2hb_live_lock);
 726
 727        if (!o2hb_global_heartbeat_active())
 728                return;
 729
 730        if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 731                return;
 732
 733        /*
 734         * A region can be added to the quorum only when it sees all
 735         * live nodes heartbeat on it. In other words, the region has been
 736         * added to all nodes.
 737         */
 738        if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
 739                   sizeof(o2hb_live_node_bitmap)))
 740                return;
 741
 742        if (slot->ds_changed_samples < O2HB_LIVE_THRESHOLD)
 743                return;
 744
 745        printk(KERN_NOTICE "o2hb: Region %s is now a quorum device\n",
 746               config_item_name(&reg->hr_item));
 747
 748        set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
 749
 750        /*
 751         * If global heartbeat active, unpin all regions if the
 752         * region count > CUT_OFF
 753         */
 754        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
 755                           O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
 756                o2hb_region_unpin(NULL);
 757}
 758
 759static int o2hb_check_slot(struct o2hb_region *reg,
 760                           struct o2hb_disk_slot *slot)
 761{
 762        int changed = 0, gen_changed = 0;
 763        struct o2hb_node_event event =
 764                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 765        struct o2nm_node *node;
 766        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
 767        u64 cputime;
 768        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
 769        unsigned int slot_dead_ms;
 770        int tmp;
 771
 772        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 773
 774        /*
 775         * If a node is no longer configured but is still in the livemap, we
 776         * may need to clear that bit from the livemap.
 777         */
 778        node = o2nm_get_node_by_num(slot->ds_node_num);
 779        if (!node) {
 780                spin_lock(&o2hb_live_lock);
 781                tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 782                spin_unlock(&o2hb_live_lock);
 783                if (!tmp)
 784                        return 0;
 785        }
 786
 787        if (!o2hb_verify_crc(reg, hb_block)) {
 788                /* all paths from here will drop o2hb_live_lock for
 789                 * us. */
 790                spin_lock(&o2hb_live_lock);
 791
 792                /* Don't print an error on the console in this case -
 793                 * a freshly formatted heartbeat area will not have a
 794                 * crc set on it. */
 795                if (list_empty(&slot->ds_live_item))
 796                        goto out;
 797
 798                /* The node is live but pushed out a bad crc. We
 799                 * consider it a transient miss but don't populate any
 800                 * other values as they may be junk. */
 801                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
 802                     slot->ds_node_num, reg->hr_dev_name);
 803                o2hb_dump_slot(hb_block);
 804
 805                slot->ds_equal_samples++;
 806                goto fire_callbacks;
 807        }
 808
 809        /* we don't care if these wrap.. the state transitions below
 810         * clear at the right places */
 811        cputime = le64_to_cpu(hb_block->hb_seq);
 812        if (slot->ds_last_time != cputime)
 813                slot->ds_changed_samples++;
 814        else
 815                slot->ds_equal_samples++;
 816        slot->ds_last_time = cputime;
 817
 818        /* The node changed heartbeat generations. We assume this to
 819         * mean it dropped off but came back before we timed out. We
 820         * want to consider it down for the time being but don't want
 821         * to lose any changed_samples state we might build up to
 822         * considering it live again. */
 823        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
 824                gen_changed = 1;
 825                slot->ds_equal_samples = 0;
 826                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
 827                     "to 0x%llx)\n", slot->ds_node_num,
 828                     (long long)slot->ds_last_generation,
 829                     (long long)le64_to_cpu(hb_block->hb_generation));
 830        }
 831
 832        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
 833
 834        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
 835             "seq %llu last %llu changed %u equal %u\n",
 836             slot->ds_node_num, (long long)slot->ds_last_generation,
 837             le32_to_cpu(hb_block->hb_cksum),
 838             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
 839             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
 840             slot->ds_equal_samples);
 841
 842        spin_lock(&o2hb_live_lock);
 843
 844fire_callbacks:
 845        /* dead nodes only come to life after some number of
 846         * changes at any time during their dead time */
 847        if (list_empty(&slot->ds_live_item) &&
 848            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
 849                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
 850                     slot->ds_node_num, (long long)slot->ds_last_generation);
 851
 852                set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
 853
 854                /* first on the list generates a callback */
 855                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 856                        mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
 857                             "bitmap\n", slot->ds_node_num);
 858                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 859
 860                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
 861                                              slot->ds_node_num);
 862
 863                        changed = 1;
 864                }
 865
 866                list_add_tail(&slot->ds_live_item,
 867                              &o2hb_live_slots[slot->ds_node_num]);
 868
 869                slot->ds_equal_samples = 0;
 870
 871                /* We want to be sure that all nodes agree on the
 872                 * number of milliseconds before a node will be
 873                 * considered dead. The self-fencing timeout is
 874                 * computed from this value, and a discrepancy might
 875                 * result in heartbeat calling a node dead when it
 876                 * hasn't self-fenced yet. */
 877                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
 878                if (slot_dead_ms && slot_dead_ms != dead_ms) {
 879                        /* TODO: Perhaps we can fail the region here. */
 880                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
 881                             "of %u ms, but our count is %u ms.\n"
 882                             "Please double check your configuration values "
 883                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
 884                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
 885                             dead_ms);
 886                }
 887                goto out;
 888        }
 889
 890        /* if the list is dead, we're done.. */
 891        if (list_empty(&slot->ds_live_item))
 892                goto out;
 893
 894        /* live nodes only go dead after enough consecutive missed
 895         * samples..  reset the missed counter whenever we see
 896         * activity */
 897        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
 898                mlog(ML_HEARTBEAT, "Node %d left my region\n",
 899                     slot->ds_node_num);
 900
 901                clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
 902
 903                /* last off the live_slot generates a callback */
 904                list_del_init(&slot->ds_live_item);
 905                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 906                        mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
 907                             "nodes bitmap\n", slot->ds_node_num);
 908                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 909
 910                        /* node can be null */
 911                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
 912                                              node, slot->ds_node_num);
 913
 914                        changed = 1;
 915                }
 916
 917                /* We don't clear this because the node is still
 918                 * actually writing new blocks. */
 919                if (!gen_changed)
 920                        slot->ds_changed_samples = 0;
 921                goto out;
 922        }
 923        if (slot->ds_changed_samples) {
 924                slot->ds_changed_samples = 0;
 925                slot->ds_equal_samples = 0;
 926        }
 927out:
 928        o2hb_set_quorum_device(reg, slot);
 929
 930        spin_unlock(&o2hb_live_lock);
 931
 932        o2hb_run_event_list(&event);
 933
 934        if (node)
 935                o2nm_node_put(node);
 936        return changed;
 937}
 938
 939/* This could be faster if we just implmented a find_last_bit, but I
 940 * don't think the circumstances warrant it. */
 941static int o2hb_highest_node(unsigned long *nodes,
 942                             int numbits)
 943{
 944        int highest, node;
 945
 946        highest = numbits;
 947        node = -1;
 948        while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
 949                if (node >= numbits)
 950                        break;
 951
 952                highest = node;
 953        }
 954
 955        return highest;
 956}
 957
 958static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 959{
 960        int i, ret, highest_node, change = 0;
 961        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
 962        unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 963        struct o2hb_bio_wait_ctxt write_wc;
 964
 965        ret = o2nm_configured_node_map(configured_nodes,
 966                                       sizeof(configured_nodes));
 967        if (ret) {
 968                mlog_errno(ret);
 969                return ret;
 970        }
 971
 972        /*
 973         * If a node is not configured but is in the livemap, we still need
 974         * to read the slot so as to be able to remove it from the livemap.
 975         */
 976        o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
 977        i = -1;
 978        while ((i = find_next_bit(live_node_bitmap,
 979                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
 980                set_bit(i, configured_nodes);
 981        }
 982
 983        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
 984        if (highest_node >= O2NM_MAX_NODES) {
 985                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
 986                return -EINVAL;
 987        }
 988
 989        /* No sense in reading the slots of nodes that don't exist
 990         * yet. Of course, if the node definitions have holes in them
 991         * then we're reading an empty slot anyway... Consider this
 992         * best-effort. */
 993        ret = o2hb_read_slots(reg, highest_node + 1);
 994        if (ret < 0) {
 995                mlog_errno(ret);
 996                return ret;
 997        }
 998
 999        /* With an up to date view of the slots, we can check that no
1000         * other node has been improperly configured to heartbeat in
1001         * our slot. */
1002        o2hb_check_last_timestamp(reg);
1003
1004        /* fill in the proper info for our next heartbeat */
1005        o2hb_prepare_block(reg, reg->hr_generation);
1006
1007        /* And fire off the write. Note that we don't wait on this I/O
1008         * until later. */
1009        ret = o2hb_issue_node_write(reg, &write_wc);
1010        if (ret < 0) {
1011                mlog_errno(ret);
1012                return ret;
1013        }
1014
1015        i = -1;
1016        while((i = find_next_bit(configured_nodes,
1017                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1018                change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1019        }
1020
1021        /*
1022         * We have to be sure we've advertised ourselves on disk
1023         * before we can go to steady state.  This ensures that
1024         * people we find in our steady state have seen us.
1025         */
1026        o2hb_wait_on_io(reg, &write_wc);
1027        if (write_wc.wc_error) {
1028                /* Do not re-arm the write timeout on I/O error - we
1029                 * can't be sure that the new block ever made it to
1030                 * disk */
1031                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1032                     write_wc.wc_error, reg->hr_dev_name);
1033                return write_wc.wc_error;
1034        }
1035
1036        o2hb_arm_write_timeout(reg);
1037
1038        /* let the person who launched us know when things are steady */
1039        if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
1040                if (atomic_dec_and_test(&reg->hr_steady_iterations))
1041                        wake_up(&o2hb_steady_queue);
1042        }
1043
1044        return 0;
1045}
1046
/*
 * Subtract *b from *a, storing the result in *a.
 *
 * @a: minuend; overwritten with the difference
 * @b: subtrahend; only read
 *
 * Callers are expected to pass the later time in @a.  If @a is in fact
 * earlier than @b the result is clamped to zero instead of going negative.
 */
static void o2hb_tv_subtract(struct timeval *a,
                             const struct timeval *b)
{
        /* a is earlier than b: report a zero interval */
        if (a->tv_sec < b->tv_sec ||
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
                a->tv_sec = 0;
                a->tv_usec = 0;
                return;
        }

        a->tv_sec -= b->tv_sec;
        a->tv_usec -= b->tv_usec;

        /* borrow from the seconds until the microseconds are non-negative */
        while (a->tv_usec < 0) {
                a->tv_sec--;
                a->tv_usec += 1000000;
        }
}
1067
/*
 * Return the time from @start to @end in milliseconds.  The subtraction is
 * carried out by o2hb_tv_subtract() on a private copy of @end.
 */
static unsigned int o2hb_elapsed_msecs(struct timeval *start,
                                       struct timeval *end)
{
        struct timeval delta;

        delta = *end;
        o2hb_tv_subtract(&delta, start);

        return delta.tv_sec * 1000 + delta.tv_usec / 1000;
}
1077
1078/*
1079 * we ride the region ref that the region dir holds.  before the region
1080 * dir is removed and drops it ref it will wait to tear down this
1081 * thread.
1082 */
1083static int o2hb_thread(void *data)
1084{
1085        int i, ret;
1086        struct o2hb_region *reg = data;
1087        struct o2hb_bio_wait_ctxt write_wc;
1088        struct timeval before_hb, after_hb;
1089        unsigned int elapsed_msec;
1090
1091        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1092
1093        set_user_nice(current, -20);
1094
1095        /* Pin node */
1096        o2nm_depend_this_node();
1097
1098        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
1099                /* We track the time spent inside
1100                 * o2hb_do_disk_heartbeat so that we avoid more than
1101                 * hr_timeout_ms between disk writes. On busy systems
1102                 * this should result in a heartbeat which is less
1103                 * likely to time itself out. */
1104                do_gettimeofday(&before_hb);
1105
1106                i = 0;
1107                do {
1108                        ret = o2hb_do_disk_heartbeat(reg);
1109                } while (ret && ++i < 2);
1110
1111                do_gettimeofday(&after_hb);
1112                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
1113
1114                mlog(ML_HEARTBEAT,
1115                     "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
1116                     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
1117                     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
1118                     elapsed_msec);
1119
1120                if (elapsed_msec < reg->hr_timeout_ms) {
1121                        /* the kthread api has blocked signals for us so no
1122                         * need to record the return value. */
1123                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1124                }
1125        }
1126
1127        o2hb_disarm_write_timeout(reg);
1128
1129        /* unclean stop is only used in very bad situation */
1130        for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1131                o2hb_shutdown_slot(&reg->hr_slots[i]);
1132
1133        /* Explicit down notification - avoid forcing the other nodes
1134         * to timeout on this region when we could just as easily
1135         * write a clear generation - thus indicating to them that
1136         * this node has left this region.
1137         *
1138         * XXX: Should we skip this on unclean_stop? */
1139        o2hb_prepare_block(reg, 0);
1140        ret = o2hb_issue_node_write(reg, &write_wc);
1141        if (ret == 0) {
1142                o2hb_wait_on_io(reg, &write_wc);
1143        } else {
1144                mlog_errno(ret);
1145        }
1146
1147        /* Unpin node */
1148        o2nm_undepend_this_node();
1149
1150        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
1151
1152        return 0;
1153}
1154
1155#ifdef CONFIG_DEBUG_FS
1156static int o2hb_debug_open(struct inode *inode, struct file *file)
1157{
1158        struct o2hb_debug_buf *db = inode->i_private;
1159        struct o2hb_region *reg;
1160        unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1161        char *buf = NULL;
1162        int i = -1;
1163        int out = 0;
1164
1165        /* max_nodes should be the largest bitmap we pass here */
1166        BUG_ON(sizeof(map) < db->db_size);
1167
1168        buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1169        if (!buf)
1170                goto bail;
1171
1172        switch (db->db_type) {
1173        case O2HB_DB_TYPE_LIVENODES:
1174        case O2HB_DB_TYPE_LIVEREGIONS:
1175        case O2HB_DB_TYPE_QUORUMREGIONS:
1176        case O2HB_DB_TYPE_FAILEDREGIONS:
1177                spin_lock(&o2hb_live_lock);
1178                memcpy(map, db->db_data, db->db_size);
1179                spin_unlock(&o2hb_live_lock);
1180                break;
1181
1182        case O2HB_DB_TYPE_REGION_LIVENODES:
1183                spin_lock(&o2hb_live_lock);
1184                reg = (struct o2hb_region *)db->db_data;
1185                memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1186                spin_unlock(&o2hb_live_lock);
1187                break;
1188
1189        case O2HB_DB_TYPE_REGION_NUMBER:
1190                reg = (struct o2hb_region *)db->db_data;
1191                out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1192                                reg->hr_region_num);
1193                goto done;
1194
1195        case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1196                reg = (struct o2hb_region *)db->db_data;
1197                out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1198                                jiffies_to_msecs(jiffies -
1199                                                 reg->hr_last_timeout_start));
1200                goto done;
1201
1202        case O2HB_DB_TYPE_REGION_PINNED:
1203                reg = (struct o2hb_region *)db->db_data;
1204                out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1205                                !!reg->hr_item_pinned);
1206                goto done;
1207
1208        default:
1209                goto done;
1210        }
1211
1212        while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1213                out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1214        out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1215
1216done:
1217        i_size_write(inode, out);
1218
1219        file->private_data = buf;
1220
1221        return 0;
1222bail:
1223        return -ENOMEM;
1224}
1225
1226static int o2hb_debug_release(struct inode *inode, struct file *file)
1227{
1228        kfree(file->private_data);
1229        return 0;
1230}
1231
1232static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1233                                 size_t nbytes, loff_t *ppos)
1234{
1235        return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1236                                       i_size_read(file->f_mapping->host));
1237}
1238#else
/* Open handler used when CONFIG_DEBUG_FS is off: does nothing, returns 0. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        return 0;
}
/* Release handler used when CONFIG_DEBUG_FS is off: nothing to free. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        return 0;
}
1247static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1248                               size_t nbytes, loff_t *ppos)
1249{
1250        return 0;
1251}
1252#endif  /* CONFIG_DEBUG_FS */
1253
1254static const struct file_operations o2hb_debug_fops = {
1255        .open =         o2hb_debug_open,
1256        .release =      o2hb_debug_release,
1257        .read =         o2hb_debug_read,
1258        .llseek =       generic_file_llseek,
1259};
1260
1261void o2hb_exit(void)
1262{
1263        kfree(o2hb_db_livenodes);
1264        kfree(o2hb_db_liveregions);
1265        kfree(o2hb_db_quorumregions);
1266        kfree(o2hb_db_failedregions);
1267        debugfs_remove(o2hb_debug_failedregions);
1268        debugfs_remove(o2hb_debug_quorumregions);
1269        debugfs_remove(o2hb_debug_liveregions);
1270        debugfs_remove(o2hb_debug_livenodes);
1271        debugfs_remove(o2hb_debug_dir);
1272}
1273
1274static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1275                                        struct o2hb_debug_buf **db, int db_len,
1276                                        int type, int size, int len, void *data)
1277{
1278        *db = kmalloc(db_len, GFP_KERNEL);
1279        if (!*db)
1280                return NULL;
1281
1282        (*db)->db_type = type;
1283        (*db)->db_size = size;
1284        (*db)->db_len = len;
1285        (*db)->db_data = data;
1286
1287        return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1288                                   &o2hb_debug_fops);
1289}
1290
1291static int o2hb_debug_init(void)
1292{
1293        int ret = -ENOMEM;
1294
1295        o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1296        if (!o2hb_debug_dir) {
1297                mlog_errno(ret);
1298                goto bail;
1299        }
1300
1301        o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1302                                                 o2hb_debug_dir,
1303                                                 &o2hb_db_livenodes,
1304                                                 sizeof(*o2hb_db_livenodes),
1305                                                 O2HB_DB_TYPE_LIVENODES,
1306                                                 sizeof(o2hb_live_node_bitmap),
1307                                                 O2NM_MAX_NODES,
1308                                                 o2hb_live_node_bitmap);
1309        if (!o2hb_debug_livenodes) {
1310                mlog_errno(ret);
1311                goto bail;
1312        }
1313
1314        o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1315                                                   o2hb_debug_dir,
1316                                                   &o2hb_db_liveregions,
1317                                                   sizeof(*o2hb_db_liveregions),
1318                                                   O2HB_DB_TYPE_LIVEREGIONS,
1319                                                   sizeof(o2hb_live_region_bitmap),
1320                                                   O2NM_MAX_REGIONS,
1321                                                   o2hb_live_region_bitmap);
1322        if (!o2hb_debug_liveregions) {
1323                mlog_errno(ret);
1324                goto bail;
1325        }
1326
1327        o2hb_debug_quorumregions =
1328                        o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1329                                          o2hb_debug_dir,
1330                                          &o2hb_db_quorumregions,
1331                                          sizeof(*o2hb_db_quorumregions),
1332                                          O2HB_DB_TYPE_QUORUMREGIONS,
1333                                          sizeof(o2hb_quorum_region_bitmap),
1334                                          O2NM_MAX_REGIONS,
1335                                          o2hb_quorum_region_bitmap);
1336        if (!o2hb_debug_quorumregions) {
1337                mlog_errno(ret);
1338                goto bail;
1339        }
1340
1341        o2hb_debug_failedregions =
1342                        o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1343                                          o2hb_debug_dir,
1344                                          &o2hb_db_failedregions,
1345                                          sizeof(*o2hb_db_failedregions),
1346                                          O2HB_DB_TYPE_FAILEDREGIONS,
1347                                          sizeof(o2hb_failed_region_bitmap),
1348                                          O2NM_MAX_REGIONS,
1349                                          o2hb_failed_region_bitmap);
1350        if (!o2hb_debug_failedregions) {
1351                mlog_errno(ret);
1352                goto bail;
1353        }
1354
1355        ret = 0;
1356bail:
1357        if (ret)
1358                o2hb_exit();
1359
1360        return ret;
1361}
1362
1363int o2hb_init(void)
1364{
1365        int i;
1366
1367        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1368                INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1369
1370        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1371                INIT_LIST_HEAD(&o2hb_live_slots[i]);
1372
1373        INIT_LIST_HEAD(&o2hb_node_events);
1374
1375        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1376        memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1377        memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1378        memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1379        memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1380
1381        o2hb_dependent_users = 0;
1382
1383        return o2hb_debug_init();
1384}
1385
1386/* if we're already in a callback then we're already serialized by the sem */
1387static void o2hb_fill_node_map_from_callback(unsigned long *map,
1388                                             unsigned bytes)
1389{
1390        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1391
1392        memcpy(map, &o2hb_live_node_bitmap, bytes);
1393}
1394
1395/*
1396 * get a map of all nodes that are heartbeating in any regions
1397 */
1398void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1399{
1400        /* callers want to serialize this map and callbacks so that they
1401         * can trust that they don't miss nodes coming to the party */
1402        down_read(&o2hb_callback_sem);
1403        spin_lock(&o2hb_live_lock);
1404        o2hb_fill_node_map_from_callback(map, bytes);
1405        spin_unlock(&o2hb_live_lock);
1406        up_read(&o2hb_callback_sem);
1407}
1408EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1409
1410/*
1411 * heartbeat configfs bits.  The heartbeat set is a default set under
1412 * the cluster set in nodemanager.c.
1413 */
1414
1415static struct o2hb_region *to_o2hb_region(struct config_item *item)
1416{
1417        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1418}
1419
1420/* drop_item only drops its ref after killing the thread, nothing should
1421 * be using the region anymore.  this has to clean up any state that
1422 * attributes might have built up. */
1423static void o2hb_region_release(struct config_item *item)
1424{
1425        int i;
1426        struct page *page;
1427        struct o2hb_region *reg = to_o2hb_region(item);
1428
1429        if (reg->hr_tmp_block)
1430                kfree(reg->hr_tmp_block);
1431
1432        if (reg->hr_slot_data) {
1433                for (i = 0; i < reg->hr_num_pages; i++) {
1434                        page = reg->hr_slot_data[i];
1435                        if (page)
1436                                __free_page(page);
1437                }
1438                kfree(reg->hr_slot_data);
1439        }
1440
1441        if (reg->hr_bdev)
1442                blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1443
1444        if (reg->hr_slots)
1445                kfree(reg->hr_slots);
1446
1447        kfree(reg->hr_db_regnum);
1448        kfree(reg->hr_db_livenodes);
1449        debugfs_remove(reg->hr_debug_livenodes);
1450        debugfs_remove(reg->hr_debug_regnum);
1451        debugfs_remove(reg->hr_debug_elapsed_time);
1452        debugfs_remove(reg->hr_debug_pinned);
1453        debugfs_remove(reg->hr_debug_dir);
1454
1455        spin_lock(&o2hb_live_lock);
1456        list_del(&reg->hr_all_item);
1457        spin_unlock(&o2hb_live_lock);
1458
1459        kfree(reg);
1460}
1461
1462static int o2hb_read_block_input(struct o2hb_region *reg,
1463                                 const char *page,
1464                                 size_t count,
1465                                 unsigned long *ret_bytes,
1466                                 unsigned int *ret_bits)
1467{
1468        unsigned long bytes;
1469        char *p = (char *)page;
1470
1471        bytes = simple_strtoul(p, &p, 0);
1472        if (!p || (*p && (*p != '\n')))
1473                return -EINVAL;
1474
1475        /* Heartbeat and fs min / max block sizes are the same. */
1476        if (bytes > 4096 || bytes < 512)
1477                return -ERANGE;
1478        if (hweight16(bytes) != 1)
1479                return -EINVAL;
1480
1481        if (ret_bytes)
1482                *ret_bytes = bytes;
1483        if (ret_bits)
1484                *ret_bits = ffs(bytes) - 1;
1485
1486        return 0;
1487}
1488
1489static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1490                                            char *page)
1491{
1492        return sprintf(page, "%u\n", reg->hr_block_bytes);
1493}
1494
1495static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1496                                             const char *page,
1497                                             size_t count)
1498{
1499        int status;
1500        unsigned long block_bytes;
1501        unsigned int block_bits;
1502
1503        if (reg->hr_bdev)
1504                return -EINVAL;
1505
1506        status = o2hb_read_block_input(reg, page, count,
1507                                       &block_bytes, &block_bits);
1508        if (status)
1509                return status;
1510
1511        reg->hr_block_bytes = (unsigned int)block_bytes;
1512        reg->hr_block_bits = block_bits;
1513
1514        return count;
1515}
1516
1517static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1518                                            char *page)
1519{
1520        return sprintf(page, "%llu\n", reg->hr_start_block);
1521}
1522
1523static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1524                                             const char *page,
1525                                             size_t count)
1526{
1527        unsigned long long tmp;
1528        char *p = (char *)page;
1529
1530        if (reg->hr_bdev)
1531                return -EINVAL;
1532
1533        tmp = simple_strtoull(p, &p, 0);
1534        if (!p || (*p && (*p != '\n')))
1535                return -EINVAL;
1536
1537        reg->hr_start_block = tmp;
1538
1539        return count;
1540}
1541
1542static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1543                                       char *page)
1544{
1545        return sprintf(page, "%d\n", reg->hr_blocks);
1546}
1547
1548static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1549                                        const char *page,
1550                                        size_t count)
1551{
1552        unsigned long tmp;
1553        char *p = (char *)page;
1554
1555        if (reg->hr_bdev)
1556                return -EINVAL;
1557
1558        tmp = simple_strtoul(p, &p, 0);
1559        if (!p || (*p && (*p != '\n')))
1560                return -EINVAL;
1561
1562        if (tmp > O2NM_MAX_NODES || tmp == 0)
1563                return -ERANGE;
1564
1565        reg->hr_blocks = (unsigned int)tmp;
1566
1567        return count;
1568}
1569
1570static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1571                                    char *page)
1572{
1573        unsigned int ret = 0;
1574
1575        if (reg->hr_bdev)
1576                ret = sprintf(page, "%s\n", reg->hr_dev_name);
1577
1578        return ret;
1579}
1580
1581static void o2hb_init_region_params(struct o2hb_region *reg)
1582{
1583        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1584        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1585
1586        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1587             reg->hr_start_block, reg->hr_blocks);
1588        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1589             reg->hr_block_bytes, reg->hr_block_bits);
1590        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1591        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1592}
1593
1594static int o2hb_map_slot_data(struct o2hb_region *reg)
1595{
1596        int i, j;
1597        unsigned int last_slot;
1598        unsigned int spp = reg->hr_slots_per_page;
1599        struct page *page;
1600        char *raw;
1601        struct o2hb_disk_slot *slot;
1602
1603        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1604        if (reg->hr_tmp_block == NULL) {
1605                mlog_errno(-ENOMEM);
1606                return -ENOMEM;
1607        }
1608
1609        reg->hr_slots = kcalloc(reg->hr_blocks,
1610                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1611        if (reg->hr_slots == NULL) {
1612                mlog_errno(-ENOMEM);
1613                return -ENOMEM;
1614        }
1615
1616        for(i = 0; i < reg->hr_blocks; i++) {
1617                slot = &reg->hr_slots[i];
1618                slot->ds_node_num = i;
1619                INIT_LIST_HEAD(&slot->ds_live_item);
1620                slot->ds_raw_block = NULL;
1621        }
1622
1623        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1624        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1625                           "at %u blocks per page\n",
1626             reg->hr_num_pages, reg->hr_blocks, spp);
1627
1628        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1629                                    GFP_KERNEL);
1630        if (!reg->hr_slot_data) {
1631                mlog_errno(-ENOMEM);
1632                return -ENOMEM;
1633        }
1634
1635        for(i = 0; i < reg->hr_num_pages; i++) {
1636                page = alloc_page(GFP_KERNEL);
1637                if (!page) {
1638                        mlog_errno(-ENOMEM);
1639                        return -ENOMEM;
1640                }
1641
1642                reg->hr_slot_data[i] = page;
1643
1644                last_slot = i * spp;
1645                raw = page_address(page);
1646                for (j = 0;
1647                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
1648                     j++) {
1649                        BUG_ON((j + last_slot) >= reg->hr_blocks);
1650
1651                        slot = &reg->hr_slots[j + last_slot];
1652                        slot->ds_raw_block =
1653                                (struct o2hb_disk_heartbeat_block *) raw;
1654
1655                        raw += reg->hr_block_bytes;
1656                }
1657        }
1658
1659        return 0;
1660}
1661
1662/* Read in all the slots available and populate the tracking
1663 * structures so that we can start with a baseline idea of what's
1664 * there. */
1665static int o2hb_populate_slot_data(struct o2hb_region *reg)
1666{
1667        int ret, i;
1668        struct o2hb_disk_slot *slot;
1669        struct o2hb_disk_heartbeat_block *hb_block;
1670
1671        ret = o2hb_read_slots(reg, reg->hr_blocks);
1672        if (ret) {
1673                mlog_errno(ret);
1674                goto out;
1675        }
1676
1677        /* We only want to get an idea of the values initially in each
1678         * slot, so we do no verification - o2hb_check_slot will
1679         * actually determine if each configured slot is valid and
1680         * whether any values have changed. */
1681        for(i = 0; i < reg->hr_blocks; i++) {
1682                slot = &reg->hr_slots[i];
1683                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1684
1685                /* Only fill the values that o2hb_check_slot uses to
1686                 * determine changing slots */
1687                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1688                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1689        }
1690
1691out:
1692        return ret;
1693}
1694
1695/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1696static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1697                                     const char *page,
1698                                     size_t count)
1699{
1700        struct task_struct *hb_task;
1701        long fd;
1702        int sectsize;
1703        char *p = (char *)page;
1704        struct file *filp = NULL;
1705        struct inode *inode = NULL;
1706        ssize_t ret = -EINVAL;
1707        int live_threshold;
1708
1709        if (reg->hr_bdev)
1710                goto out;
1711
1712        /* We can't heartbeat without having had our node number
1713         * configured yet. */
1714        if (o2nm_this_node() == O2NM_MAX_NODES)
1715                goto out;
1716
1717        fd = simple_strtol(p, &p, 0);
1718        if (!p || (*p && (*p != '\n')))
1719                goto out;
1720
1721        if (fd < 0 || fd >= INT_MAX)
1722                goto out;
1723
1724        filp = fget(fd);
1725        if (filp == NULL)
1726                goto out;
1727
1728        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1729            reg->hr_block_bytes == 0)
1730                goto out;
1731
1732        inode = igrab(filp->f_mapping->host);
1733        if (inode == NULL)
1734                goto out;
1735
1736        if (!S_ISBLK(inode->i_mode))
1737                goto out;
1738
1739        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1740        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1741        if (ret) {
1742                reg->hr_bdev = NULL;
1743                goto out;
1744        }
1745        inode = NULL;
1746
1747        bdevname(reg->hr_bdev, reg->hr_dev_name);
1748
1749        sectsize = bdev_logical_block_size(reg->hr_bdev);
1750        if (sectsize != reg->hr_block_bytes) {
1751                mlog(ML_ERROR,
1752                     "blocksize %u incorrect for device, expected %d",
1753                     reg->hr_block_bytes, sectsize);
1754                ret = -EINVAL;
1755                goto out;
1756        }
1757
1758        o2hb_init_region_params(reg);
1759
1760        /* Generation of zero is invalid */
1761        do {
1762                get_random_bytes(&reg->hr_generation,
1763                                 sizeof(reg->hr_generation));
1764        } while (reg->hr_generation == 0);
1765
1766        ret = o2hb_map_slot_data(reg);
1767        if (ret) {
1768                mlog_errno(ret);
1769                goto out;
1770        }
1771
1772        ret = o2hb_populate_slot_data(reg);
1773        if (ret) {
1774                mlog_errno(ret);
1775                goto out;
1776        }
1777
1778        INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1779
1780        /*
1781         * A node is considered live after it has beat LIVE_THRESHOLD
1782         * times.  We're not steady until we've given them a chance
1783         * _after_ our first read.
1784         * The default threshold is bare minimum so as to limit the delay
1785         * during mounts. For global heartbeat, the threshold doubled for the
1786         * first region.
1787         */
1788        live_threshold = O2HB_LIVE_THRESHOLD;
1789        if (o2hb_global_heartbeat_active()) {
1790                spin_lock(&o2hb_live_lock);
1791                if (o2hb_pop_count(&o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1792                        live_threshold <<= 1;
1793                spin_unlock(&o2hb_live_lock);
1794        }
1795        atomic_set(&reg->hr_steady_iterations, live_threshold + 1);
1796
1797        hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1798                              reg->hr_item.ci_name);
1799        if (IS_ERR(hb_task)) {
1800                ret = PTR_ERR(hb_task);
1801                mlog_errno(ret);
1802                goto out;
1803        }
1804
1805        spin_lock(&o2hb_live_lock);
1806        reg->hr_task = hb_task;
1807        spin_unlock(&o2hb_live_lock);
1808
1809        ret = wait_event_interruptible(o2hb_steady_queue,
1810                                atomic_read(&reg->hr_steady_iterations) == 0);
1811        if (ret) {
1812                /* We got interrupted (hello ptrace!).  Clean up */
1813                spin_lock(&o2hb_live_lock);
1814                hb_task = reg->hr_task;
1815                reg->hr_task = NULL;
1816                spin_unlock(&o2hb_live_lock);
1817
1818                if (hb_task)
1819                        kthread_stop(hb_task);
1820                goto out;
1821        }
1822
1823        /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1824        spin_lock(&o2hb_live_lock);
1825        hb_task = reg->hr_task;
1826        if (o2hb_global_heartbeat_active())
1827                set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1828        spin_unlock(&o2hb_live_lock);
1829
1830        if (hb_task)
1831                ret = count;
1832        else
1833                ret = -EIO;
1834
1835        if (hb_task && o2hb_global_heartbeat_active())
1836                printk(KERN_NOTICE "o2hb: Heartbeat started on region %s\n",
1837                       config_item_name(&reg->hr_item));
1838
1839out:
1840        if (filp)
1841                fput(filp);
1842        if (inode)
1843                iput(inode);
1844        if (ret < 0) {
1845                if (reg->hr_bdev) {
1846                        blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1847                        reg->hr_bdev = NULL;
1848                }
1849        }
1850        return ret;
1851}
1852
1853static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1854                                      char *page)
1855{
1856        pid_t pid = 0;
1857
1858        spin_lock(&o2hb_live_lock);
1859        if (reg->hr_task)
1860                pid = task_pid_nr(reg->hr_task);
1861        spin_unlock(&o2hb_live_lock);
1862
1863        if (!pid)
1864                return 0;
1865
1866        return sprintf(page, "%u\n", pid);
1867}
1868
/*
 * A configfs attribute of a heartbeat region together with its typed
 * handlers.  o2hb_region_show()/o2hb_region_store() recover this structure
 * from the embedded @attr and dispatch to @show/@store; either may be NULL.
 */
struct o2hb_region_attribute {
	struct configfs_attribute attr;
	/* formats the value into the page buffer, returns bytes written */
	ssize_t (*show)(struct o2hb_region *, char *);
	/* parses the written text, returns bytes consumed or -errno */
	ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};
1874
/* Region attribute "block_bytes": readable by all, writable by the owner. */
static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "block_bytes",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_block_bytes_read,
	.store	= o2hb_region_block_bytes_write,
};
1882
/* Region attribute "start_block": readable by all, writable by the owner. */
static struct o2hb_region_attribute o2hb_region_attr_start_block = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "start_block",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_start_block_read,
	.store	= o2hb_region_start_block_write,
};
1890
/* Region attribute "blocks": readable by all, writable by the owner. */
static struct o2hb_region_attribute o2hb_region_attr_blocks = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "blocks",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_blocks_read,
	.store	= o2hb_region_blocks_write,
};
1898
/*
 * Region attribute "dev".  Writing it goes through o2hb_region_dev_write(),
 * which commits the region's configuration and starts its heartbeat thread.
 */
static struct o2hb_region_attribute o2hb_region_attr_dev = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "dev",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_region_dev_read,
	.store	= o2hb_region_dev_write,
};
1906
/*
 * Region attribute "pid".  It has no .store handler, so a write routed
 * through o2hb_region_store() returns -EINVAL.
 */
static struct o2hb_region_attribute o2hb_region_attr_pid = {
       .attr   = { .ca_owner = THIS_MODULE,
                   .ca_name = "pid",
                   .ca_mode = S_IRUGO | S_IRUSR },
       .show   = o2hb_region_pid_read,
};
1913
/* NULL-terminated list of the attributes exposed by every region item. */
static struct configfs_attribute *o2hb_region_attrs[] = {
	&o2hb_region_attr_block_bytes.attr,
	&o2hb_region_attr_start_block.attr,
	&o2hb_region_attr_blocks.attr,
	&o2hb_region_attr_dev.attr,
	&o2hb_region_attr_pid.attr,
	NULL,
};
1922
1923static ssize_t o2hb_region_show(struct config_item *item,
1924                                struct configfs_attribute *attr,
1925                                char *page)
1926{
1927        struct o2hb_region *reg = to_o2hb_region(item);
1928        struct o2hb_region_attribute *o2hb_region_attr =
1929                container_of(attr, struct o2hb_region_attribute, attr);
1930        ssize_t ret = 0;
1931
1932        if (o2hb_region_attr->show)
1933                ret = o2hb_region_attr->show(reg, page);
1934        return ret;
1935}
1936
1937static ssize_t o2hb_region_store(struct config_item *item,
1938                                 struct configfs_attribute *attr,
1939                                 const char *page, size_t count)
1940{
1941        struct o2hb_region *reg = to_o2hb_region(item);
1942        struct o2hb_region_attribute *o2hb_region_attr =
1943                container_of(attr, struct o2hb_region_attribute, attr);
1944        ssize_t ret = -EINVAL;
1945
1946        if (o2hb_region_attr->store)
1947                ret = o2hb_region_attr->store(reg, page, count);
1948        return ret;
1949}
1950
/* Item callbacks for a region: release plus attribute show/store dispatch. */
static struct configfs_item_operations o2hb_region_item_ops = {
	.release		= o2hb_region_release,
	.show_attribute		= o2hb_region_show,
	.store_attribute	= o2hb_region_store,
};
1956
/*
 * configfs item type for a heartbeat region; items are initialised with it
 * in o2hb_heartbeat_group_make_item().
 */
static struct config_item_type o2hb_region_type = {
	.ct_item_ops	= &o2hb_region_item_ops,
	.ct_attrs	= o2hb_region_attrs,
	.ct_owner	= THIS_MODULE,
};
1962
1963/* heartbeat set */
1964
/*
 * The "heartbeat" configfs group.  It currently only wraps the config_group;
 * to_o2hb_heartbeat_group() converts back from the embedded hs_group.
 */
struct o2hb_heartbeat_group {
	struct config_group hs_group;
	/* some stuff? */
};
1969
1970static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1971{
1972        return group ?
1973                container_of(group, struct o2hb_heartbeat_group, hs_group)
1974                : NULL;
1975}
1976
1977static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
1978{
1979        int ret = -ENOMEM;
1980
1981        reg->hr_debug_dir =
1982                debugfs_create_dir(config_item_name(&reg->hr_item), dir);
1983        if (!reg->hr_debug_dir) {
1984                mlog_errno(ret);
1985                goto bail;
1986        }
1987
1988        reg->hr_debug_livenodes =
1989                        o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1990                                          reg->hr_debug_dir,
1991                                          &(reg->hr_db_livenodes),
1992                                          sizeof(*(reg->hr_db_livenodes)),
1993                                          O2HB_DB_TYPE_REGION_LIVENODES,
1994                                          sizeof(reg->hr_live_node_bitmap),
1995                                          O2NM_MAX_NODES, reg);
1996        if (!reg->hr_debug_livenodes) {
1997                mlog_errno(ret);
1998                goto bail;
1999        }
2000
2001        reg->hr_debug_regnum =
2002                        o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2003                                          reg->hr_debug_dir,
2004                                          &(reg->hr_db_regnum),
2005                                          sizeof(*(reg->hr_db_regnum)),
2006                                          O2HB_DB_TYPE_REGION_NUMBER,
2007                                          0, O2NM_MAX_NODES, reg);
2008        if (!reg->hr_debug_regnum) {
2009                mlog_errno(ret);
2010                goto bail;
2011        }
2012
2013        reg->hr_debug_elapsed_time =
2014                        o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2015                                          reg->hr_debug_dir,
2016                                          &(reg->hr_db_elapsed_time),
2017                                          sizeof(*(reg->hr_db_elapsed_time)),
2018                                          O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2019                                          0, 0, reg);
2020        if (!reg->hr_debug_elapsed_time) {
2021                mlog_errno(ret);
2022                goto bail;
2023        }
2024
2025        reg->hr_debug_pinned =
2026                        o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2027                                          reg->hr_debug_dir,
2028                                          &(reg->hr_db_pinned),
2029                                          sizeof(*(reg->hr_db_pinned)),
2030                                          O2HB_DB_TYPE_REGION_PINNED,
2031                                          0, 0, reg);
2032        if (!reg->hr_debug_pinned) {
2033                mlog_errno(ret);
2034                goto bail;
2035        }
2036
2037        ret = 0;
2038bail:
2039        return ret;
2040}
2041
2042static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2043                                                          const char *name)
2044{
2045        struct o2hb_region *reg = NULL;
2046        int ret;
2047
2048        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2049        if (reg == NULL)
2050                return ERR_PTR(-ENOMEM);
2051
2052        if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2053                ret = -ENAMETOOLONG;
2054                goto free;
2055        }
2056
2057        spin_lock(&o2hb_live_lock);
2058        reg->hr_region_num = 0;
2059        if (o2hb_global_heartbeat_active()) {
2060                reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2061                                                         O2NM_MAX_REGIONS);
2062                if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2063                        spin_unlock(&o2hb_live_lock);
2064                        ret = -EFBIG;
2065                        goto free;
2066                }
2067                set_bit(reg->hr_region_num, o2hb_region_bitmap);
2068        }
2069        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2070        spin_unlock(&o2hb_live_lock);
2071
2072        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2073
2074        ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2075        if (ret) {
2076                config_item_put(&reg->hr_item);
2077                goto free;
2078        }
2079
2080        return &reg->hr_item;
2081free:
2082        kfree(reg);
2083        return ERR_PTR(ret);
2084}
2085
2086static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2087                                           struct config_item *item)
2088{
2089        struct task_struct *hb_task;
2090        struct o2hb_region *reg = to_o2hb_region(item);
2091        int quorum_region = 0;
2092
2093        /* stop the thread when the user removes the region dir */
2094        spin_lock(&o2hb_live_lock);
2095        if (o2hb_global_heartbeat_active()) {
2096                clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2097                clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2098                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2099                        quorum_region = 1;
2100                clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2101        }
2102        hb_task = reg->hr_task;
2103        reg->hr_task = NULL;
2104        reg->hr_item_dropped = 1;
2105        spin_unlock(&o2hb_live_lock);
2106
2107        if (hb_task)
2108                kthread_stop(hb_task);
2109
2110        /*
2111         * If we're racing a dev_write(), we need to wake them.  They will
2112         * check reg->hr_task
2113         */
2114        if (atomic_read(&reg->hr_steady_iterations) != 0) {
2115                atomic_set(&reg->hr_steady_iterations, 0);
2116                wake_up(&o2hb_steady_queue);
2117        }
2118
2119        if (o2hb_global_heartbeat_active())
2120                printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n",
2121                       config_item_name(&reg->hr_item));
2122
2123        config_item_put(item);
2124
2125        if (!o2hb_global_heartbeat_active() || !quorum_region)
2126                return;
2127
2128        /*
2129         * If global heartbeat active and there are dependent users,
2130         * pin all regions if quorum region count <= CUT_OFF
2131         */
2132        spin_lock(&o2hb_live_lock);
2133
2134        if (!o2hb_dependent_users)
2135                goto unlock;
2136
2137        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
2138                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2139                o2hb_region_pin(NULL);
2140
2141unlock:
2142        spin_unlock(&o2hb_live_lock);
2143}
2144
/*
 * A configfs attribute of the heartbeat group together with its typed
 * handlers; o2hb_heartbeat_group_show()/_store() dispatch to @show/@store.
 */
struct o2hb_heartbeat_group_attribute {
	struct configfs_attribute attr;
	/* formats the value into the page buffer, returns bytes written */
	ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
	/* parses the written text, returns bytes consumed or -errno */
	ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
};
2150
2151static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
2152                                         struct configfs_attribute *attr,
2153                                         char *page)
2154{
2155        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
2156        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
2157                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
2158        ssize_t ret = 0;
2159
2160        if (o2hb_heartbeat_group_attr->show)
2161                ret = o2hb_heartbeat_group_attr->show(reg, page);
2162        return ret;
2163}
2164
2165static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
2166                                          struct configfs_attribute *attr,
2167                                          const char *page, size_t count)
2168{
2169        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
2170        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
2171                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
2172        ssize_t ret = -EINVAL;
2173
2174        if (o2hb_heartbeat_group_attr->store)
2175                ret = o2hb_heartbeat_group_attr->store(reg, page, count);
2176        return ret;
2177}
2178
2179static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
2180                                                     char *page)
2181{
2182        return sprintf(page, "%u\n", o2hb_dead_threshold);
2183}
2184
2185static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
2186                                                    const char *page,
2187                                                    size_t count)
2188{
2189        unsigned long tmp;
2190        char *p = (char *)page;
2191
2192        tmp = simple_strtoul(p, &p, 10);
2193        if (!p || (*p && (*p != '\n')))
2194                return -EINVAL;
2195
2196        /* this will validate ranges for us. */
2197        o2hb_dead_threshold_set((unsigned int) tmp);
2198
2199        return count;
2200}
2201
2202static
2203ssize_t o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group,
2204                                       char *page)
2205{
2206        return sprintf(page, "%s\n",
2207                       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2208}
2209
2210static
2211ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
2212                                        const char *page, size_t count)
2213{
2214        unsigned int i;
2215        int ret;
2216        size_t len;
2217
2218        len = (page[count - 1] == '\n') ? count - 1 : count;
2219        if (!len)
2220                return -EINVAL;
2221
2222        for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2223                if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len))
2224                        continue;
2225
2226                ret = o2hb_global_hearbeat_mode_set(i);
2227                if (!ret)
2228                        printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2229                               o2hb_heartbeat_mode_desc[i]);
2230                return count;
2231        }
2232
2233        return -EINVAL;
2234
2235}
2236
/* Group attribute "dead_threshold": readable by all, writable by the owner. */
static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "dead_threshold",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2hb_heartbeat_group_threshold_show,
	.store	= o2hb_heartbeat_group_threshold_store,
};
2244
/* Group attribute "mode": readable by all, writable by the owner. */
static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = {
	.attr   = { .ca_owner = THIS_MODULE,
		.ca_name = "mode",
		.ca_mode = S_IRUGO | S_IWUSR },
	.show   = o2hb_heartbeat_group_mode_show,
	.store  = o2hb_heartbeat_group_mode_store,
};
2252
/* NULL-terminated list of the attributes exposed by the heartbeat group. */
static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
	&o2hb_heartbeat_group_attr_threshold.attr,
	&o2hb_heartbeat_group_attr_mode.attr,
	NULL,
};
2258
/* Item callbacks of the heartbeat group: attribute show/store dispatch. */
static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
	.show_attribute		= o2hb_heartbeat_group_show,
	.store_attribute	= o2hb_heartbeat_group_store,
};
2263
/* Group callbacks: regions are created and removed as items of the group. */
static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
	.make_item	= o2hb_heartbeat_group_make_item,
	.drop_item	= o2hb_heartbeat_group_drop_item,
};
2268
/*
 * configfs type of the "heartbeat" group; the group is initialised with it
 * in o2hb_alloc_hb_set().
 */
static struct config_item_type o2hb_heartbeat_group_type = {
	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
	.ct_item_ops	= &o2hb_hearbeat_group_item_ops,
	.ct_attrs	= o2hb_heartbeat_group_attrs,
	.ct_owner	= THIS_MODULE,
};
2275
2276/* this is just here to avoid touching group in heartbeat.h which the
2277 * entire damn world #includes */
2278struct config_group *o2hb_alloc_hb_set(void)
2279{
2280        struct o2hb_heartbeat_group *hs = NULL;
2281        struct config_group *ret = NULL;
2282
2283        hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2284        if (hs == NULL)
2285                goto out;
2286
2287        config_group_init_type_name(&hs->hs_group, "heartbeat",
2288                                    &o2hb_heartbeat_group_type);
2289
2290        ret = &hs->hs_group;
2291out:
2292        if (ret == NULL)
2293                kfree(hs);
2294        return ret;
2295}
2296
/*
 * Free a heartbeat group obtained from o2hb_alloc_hb_set().
 * A NULL @group is accepted and does nothing.
 */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
2302
2303/* hb callback registration and issuing */
2304
2305static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2306{
2307        if (type == O2HB_NUM_CB)
2308                return ERR_PTR(-EINVAL);
2309
2310        return &o2hb_callbacks[type];
2311}
2312
2313void o2hb_setup_callback(struct o2hb_callback_func *hc,
2314                         enum o2hb_callback_type type,
2315                         o2hb_cb_func *func,
2316                         void *data,
2317                         int priority)
2318{
2319        INIT_LIST_HEAD(&hc->hc_item);
2320        hc->hc_func = func;
2321        hc->hc_data = data;
2322        hc->hc_priority = priority;
2323        hc->hc_type = type;
2324        hc->hc_magic = O2HB_CB_MAGIC;
2325}
2326EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2327
2328/*
2329 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2330 * In global heartbeat mode, region_uuid passed is NULL.
2331 *
2332 * In local, we only pin the matching region. In global we pin all the active
2333 * regions.
2334 */
2335static int o2hb_region_pin(const char *region_uuid)
2336{
2337        int ret = 0, found = 0;
2338        struct o2hb_region *reg;
2339        char *uuid;
2340
2341        assert_spin_locked(&o2hb_live_lock);
2342
2343        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2344                uuid = config_item_name(&reg->hr_item);
2345
2346                /* local heartbeat */
2347                if (region_uuid) {
2348                        if (strcmp(region_uuid, uuid))
2349                                continue;
2350                        found = 1;
2351                }
2352
2353                if (reg->hr_item_pinned || reg->hr_item_dropped)
2354                        goto skip_pin;
2355
2356                /* Ignore ENOENT only for local hb (userdlm domain) */
2357                ret = o2nm_depend_item(&reg->hr_item);
2358                if (!ret) {
2359                        mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2360                        reg->hr_item_pinned = 1;
2361                } else {
2362                        if (ret == -ENOENT && found)
2363                                ret = 0;
2364                        else {
2365                                mlog(ML_ERROR, "Pin region %s fails with %d\n",
2366                                     uuid, ret);
2367                                break;
2368                        }
2369                }
2370skip_pin:
2371                if (found)
2372                        break;
2373        }
2374
2375        return ret;
2376}
2377
2378/*
2379 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2380 * In global heartbeat mode, region_uuid passed is NULL.
2381 *
2382 * In local, we only unpin the matching region. In global we unpin all the
2383 * active regions.
2384 */
2385static void o2hb_region_unpin(const char *region_uuid)
2386{
2387        struct o2hb_region *reg;
2388        char *uuid;
2389        int found = 0;
2390
2391        assert_spin_locked(&o2hb_live_lock);
2392
2393        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2394                uuid = config_item_name(&reg->hr_item);
2395                if (region_uuid) {
2396                        if (strcmp(region_uuid, uuid))
2397                                continue;
2398                        found = 1;
2399                }
2400
2401                if (reg->hr_item_pinned) {
2402                        mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2403                        o2nm_undepend_item(&reg->hr_item);
2404                        reg->hr_item_pinned = 0;
2405                }
2406                if (found)
2407                        break;
2408        }
2409}
2410
2411static int o2hb_region_inc_user(const char *region_uuid)
2412{
2413        int ret = 0;
2414
2415        spin_lock(&o2hb_live_lock);
2416
2417        /* local heartbeat */
2418        if (!o2hb_global_heartbeat_active()) {
2419            ret = o2hb_region_pin(region_uuid);
2420            goto unlock;
2421        }
2422
2423        /*
2424         * if global heartbeat active and this is the first dependent user,
2425         * pin all regions if quorum region count <= CUT_OFF
2426         */
2427        o2hb_dependent_users++;
2428        if (o2hb_dependent_users > 1)
2429                goto unlock;
2430
2431        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
2432                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2433                ret = o2hb_region_pin(NULL);
2434
2435unlock:
2436        spin_unlock(&o2hb_live_lock);
2437        return ret;
2438}
2439
2440void o2hb_region_dec_user(const char *region_uuid)
2441{
2442        spin_lock(&o2hb_live_lock);
2443
2444        /* local heartbeat */
2445        if (!o2hb_global_heartbeat_active()) {
2446            o2hb_region_unpin(region_uuid);
2447            goto unlock;
2448        }
2449
2450        /*
2451         * if global heartbeat active and there are no dependent users,
2452         * unpin all quorum regions
2453         */
2454        o2hb_dependent_users--;
2455        if (!o2hb_dependent_users)
2456                o2hb_region_unpin(NULL);
2457
2458unlock:
2459        spin_unlock(&o2hb_live_lock);
2460}
2461
2462int o2hb_register_callback(const char *region_uuid,
2463                           struct o2hb_callback_func *hc)
2464{
2465        struct o2hb_callback_func *tmp;
2466        struct list_head *iter;
2467        struct o2hb_callback *hbcall;
2468        int ret;
2469
2470        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2471        BUG_ON(!list_empty(&hc->hc_item));
2472
2473        hbcall = hbcall_from_type(hc->hc_type);
2474        if (IS_ERR(hbcall)) {
2475                ret = PTR_ERR(hbcall);
2476                goto out;
2477        }
2478
2479        if (region_uuid) {
2480                ret = o2hb_region_inc_user(region_uuid);
2481                if (ret) {
2482                        mlog_errno(ret);
2483                        goto out;
2484                }
2485        }
2486
2487        down_write(&o2hb_callback_sem);
2488
2489        list_for_each(iter, &hbcall->list) {
2490                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
2491                if (hc->hc_priority < tmp->hc_priority) {
2492                        list_add_tail(&hc->hc_item, iter);
2493                        break;
2494                }
2495        }
2496        if (list_empty(&hc->hc_item))
2497                list_add_tail(&hc->hc_item, &hbcall->list);
2498
2499        up_write(&o2hb_callback_sem);
2500        ret = 0;
2501out:
2502        mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2503             ret, __builtin_return_address(0), hc);
2504        return ret;
2505}
2506EXPORT_SYMBOL_GPL(o2hb_register_callback);
2507
2508void o2hb_unregister_callback(const char *region_uuid,
2509                              struct o2hb_callback_func *hc)
2510{
2511        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2512
2513        mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2514             __builtin_return_address(0), hc);
2515
2516        /* XXX Can this happen _with_ a region reference? */
2517        if (list_empty(&hc->hc_item))
2518                return;
2519
2520        if (region_uuid)
2521                o2hb_region_dec_user(region_uuid);
2522
2523        down_write(&o2hb_callback_sem);
2524
2525        list_del_init(&hc->hc_item);
2526
2527        up_write(&o2hb_callback_sem);
2528}
2529EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2530
2531int o2hb_check_node_heartbeating(u8 node_num)
2532{
2533        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2534
2535        o2hb_fill_node_map(testing_map, sizeof(testing_map));
2536        if (!test_bit(node_num, testing_map)) {
2537                mlog(ML_HEARTBEAT,
2538                     "node (%u) does not have heartbeating enabled.\n",
2539                     node_num);
2540                return 0;
2541        }
2542
2543        return 1;
2544}
2545EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2546
2547int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2548{
2549        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2550
2551        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2552        if (!test_bit(node_num, testing_map)) {
2553                mlog(ML_HEARTBEAT,
2554                     "node (%u) does not have heartbeating enabled.\n",
2555                     node_num);
2556                return 0;
2557        }
2558
2559        return 1;
2560}
2561EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2562
2563/* Makes sure our local node is configured with a node number, and is
2564 * heartbeating. */
2565int o2hb_check_local_node_heartbeating(void)
2566{
2567        u8 node_num;
2568
2569        /* if this node was set then we have networking */
2570        node_num = o2nm_this_node();
2571        if (node_num == O2NM_MAX_NODES) {
2572                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2573                return 0;
2574        }
2575
2576        return o2hb_check_node_heartbeating(node_num);
2577}
2578EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2579
2580/*
2581 * this is just a hack until we get the plumbing which flips file systems
2582 * read only and drops the hb ref instead of killing the node dead.
2583 */
2584void o2hb_stop_all_regions(void)
2585{
2586        struct o2hb_region *reg;
2587
2588        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2589
2590        spin_lock(&o2hb_live_lock);
2591
2592        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2593                reg->hr_unclean_stop = 1;
2594
2595        spin_unlock(&o2hb_live_lock);
2596}
2597EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2598
2599int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2600{
2601        struct o2hb_region *reg;
2602        int numregs = 0;
2603        char *p;
2604
2605        spin_lock(&o2hb_live_lock);
2606
2607        p = region_uuids;
2608        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2609                mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2610                if (numregs < max_regions) {
2611                        memcpy(p, config_item_name(&reg->hr_item),
2612                               O2HB_MAX_REGION_NAME_LEN);
2613                        p += O2HB_MAX_REGION_NAME_LEN;
2614                }
2615                numregs++;
2616        }
2617
2618        spin_unlock(&o2hb_live_lock);
2619
2620        return numregs;
2621}
2622EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2623
2624int o2hb_global_heartbeat_active(void)
2625{
2626        return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2627}
2628EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2629