linux/fs/ocfs2/cluster/heartbeat.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>
#include <linux/debugfs.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

#define O2HB_DEBUG_DIR                  "o2hb"
#define O2HB_DEBUG_LIVENODES            "livenodes"
static struct dentry *o2hb_debug_dir;
static struct dentry *o2hb_debug_livenodes;

static LIST_HEAD(o2hb_all_regions);

static struct o2hb_callback {
        struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS       9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
                spin_lock(&o2hb_live_lock);
                if (list_empty(&o2hb_all_regions))
                        o2hb_dead_threshold = threshold;
                spin_unlock(&o2hb_live_lock);
        }
}
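
/*
 * For a sense of scale (a rough sketch, assuming the defaults in
 * heartbeat.h: O2HB_DEFAULT_DEAD_THRESHOLD of 31 iterations and an
 * O2HB_REGION_TIMEOUT_MS of 2000ms): a node is only declared dead
 * after o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS, i.e. roughly
 * 31 * 2000ms = 62 seconds without a changed heartbeat block.
 */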

struct o2hb_node_event {
        struct list_head        hn_item;
        enum o2hb_callback_type hn_event_type;
        struct o2nm_node        *hn_node;
        int                     hn_node_num;
};

struct o2hb_disk_slot {
        struct o2hb_disk_heartbeat_block *ds_raw_block;
        u8                      ds_node_num;
        u64                     ds_last_time;
        u64                     ds_last_generation;
        u16                     ds_equal_samples;
        u16                     ds_changed_samples;
        struct list_head        ds_live_item;
};

/* Each thread owns a region.  When we're asked to tear down a region
 * we ask its thread to stop; the thread then cleans the region up. */
struct o2hb_region {
        struct config_item      hr_item;

        struct list_head        hr_all_item;
        unsigned                hr_unclean_stop:1;

        /* protected by the hr_callback_sem */
        struct task_struct      *hr_task;

        unsigned int            hr_blocks;
        unsigned long long      hr_start_block;

        unsigned int            hr_block_bits;
        unsigned int            hr_block_bytes;

        unsigned int            hr_slots_per_page;
        unsigned int            hr_num_pages;

        struct page             **hr_slot_data;
        struct block_device     *hr_bdev;
        struct o2hb_disk_slot   *hr_slots;

        /* lets the process setting up hb block until the region has
         * reached a 'steady' state.  This will be fixed when we have
         * a more complete api that doesn't lead to this sort of fragility. */
        atomic_t                hr_steady_iterations;

        char                    hr_dev_name[BDEVNAME_SIZE];

        unsigned int            hr_timeout_ms;

        /* randomized as the region goes up and down so that a peer
         * recognizes that this node went down and came back up within
         * a single iteration */
        u64                     hr_generation;

        struct delayed_work     hr_write_timeout_work;
        unsigned long           hr_last_timeout_start;

        /* Used during o2hb_check_slot to hold a copy of the block
         * being checked because we temporarily have to zero out the
         * crc field. */
        struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

struct o2hb_bio_wait_ctxt {
        atomic_t          wc_num_reqs;
        struct completion wc_io_complete;
        int               wc_error;
};
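
/*
 * A rough sketch of the wait context protocol, as implemented by the
 * helpers below: o2hb_bio_wait_init() biases wc_num_reqs to 1 so the
 * count can't hit zero while bios are still being submitted.  Each
 * submitted bio takes a reference (atomic_inc), each completion drops
 * one (o2hb_bio_wait_dec), and the waiter finally drops the initial
 * bias before sleeping:
 *
 *	o2hb_bio_wait_init(&wc);		wc_num_reqs == 1
 *	atomic_inc(&wc.wc_num_reqs);		one per submit_bio()
 *	...
 *	o2hb_bio_wait_dec(&wc, 1);		drop the initial bias
 *	wait_for_completion(&wc.wc_io_complete);
 */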

static void o2hb_write_timeout(struct work_struct *work)
{
        struct o2hb_region *reg =
                container_of(work, struct o2hb_region,
                             hr_write_timeout_work.work);

        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
             "milliseconds\n", reg->hr_dev_name,
             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
        o2quo_disk_timeout();
}

static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
        mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

        cancel_delayed_work(&reg->hr_write_timeout_work);
        reg->hr_last_timeout_start = jiffies;
        schedule_delayed_work(&reg->hr_write_timeout_work,
                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
        cancel_delayed_work(&reg->hr_write_timeout_work);
        flush_scheduled_work();
}

static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
{
        atomic_set(&wc->wc_num_reqs, 1);
        init_completion(&wc->wc_io_complete);
        wc->wc_error = 0;
}

/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
                                     unsigned int num)
{
        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
         * good news is that the fast path only completes one at a time */
        while (num--) {
                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
                        BUG_ON(num > 0);
                        complete(&wc->wc_io_complete);
                }
        }
}

static void o2hb_wait_on_io(struct o2hb_region *reg,
                            struct o2hb_bio_wait_ctxt *wc)
{
        struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

        blk_run_address_space(mapping);
        o2hb_bio_wait_dec(wc, 1);

        wait_for_completion(&wc->wc_io_complete);
}

static void o2hb_bio_end_io(struct bio *bio,
                           int error)
{
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

        if (error) {
                mlog(ML_ERROR, "IO Error %d\n", error);
                wc->wc_error = error;
        }

        o2hb_bio_wait_dec(wc, 1);
        bio_put(bio);
}

/* Set up a bio to cover I/O against the slots from *current_slot up
 * to (but not including) max_slots. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
                                      struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int *current_slot,
                                      unsigned int max_slots)
{
        int len, current_page;
        unsigned int vec_len, vec_start;
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
        unsigned int cs = *current_slot;
        struct bio *bio;
        struct page *page;

        /* Testing has shown this allocation to take long enough under
         * GFP_KERNEL that the local node can get fenced. It would be
         * nicest if we could pre-allocate these bios and avoid this
         * altogether. */
        bio = bio_alloc(GFP_ATOMIC, 16);
        if (!bio) {
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
                bio = ERR_PTR(-ENOMEM);
                goto bail;
        }

        /* Must put everything in 512 byte sectors for the bio... */
        bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
        bio->bi_bdev = reg->hr_bdev;
        bio->bi_private = wc;
        bio->bi_end_io = o2hb_bio_end_io;

        vec_start = (cs << bits) % PAGE_CACHE_SIZE;
        while (cs < max_slots) {
                current_page = cs / spp;
                page = reg->hr_slot_data[current_page];

                vec_len = min(PAGE_CACHE_SIZE - vec_start,
                              (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );

                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
                     current_page, vec_len, vec_start);

                len = bio_add_page(bio, page, vec_len, vec_start);
                if (len != vec_len)
                        break;

                cs += vec_len / (PAGE_CACHE_SIZE/spp);
                vec_start = 0;
        }

bail:
        *current_slot = cs;
        return bio;
}
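
/*
 * A worked example of the sector arithmetic above (illustrative,
 * assuming 512-byte heartbeat blocks, i.e. hr_block_bits == 9, and a
 * 4096-byte PAGE_CACHE_SIZE): with hr_start_block = 1024 and
 * *current_slot = 3, bi_sector = (1024 + 3) << (9 - 9) = 1027, the
 * slot lives on page 3 / 8 = 0, and the bio vector starts at byte
 * offset (3 << 9) % 4096 = 1536 within that page.
 */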

static int o2hb_read_slots(struct o2hb_region *reg,
                           unsigned int max_slots)
{
        unsigned int current_slot = 0;
        int status;
        struct o2hb_bio_wait_ctxt wc;
        struct bio *bio;

        o2hb_bio_wait_init(&wc);

        while (current_slot < max_slots) {
                bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
                if (IS_ERR(bio)) {
                        status = PTR_ERR(bio);
                        mlog_errno(status);
                        goto bail_and_wait;
                }

                atomic_inc(&wc.wc_num_reqs);
                submit_bio(READ, bio);
        }

        status = 0;

bail_and_wait:
        o2hb_wait_on_io(reg, &wc);
        if (wc.wc_error && !status)
                status = wc.wc_error;

        return status;
}

static int o2hb_issue_node_write(struct o2hb_region *reg,
                                 struct o2hb_bio_wait_ctxt *write_wc)
{
        int status;
        unsigned int slot;
        struct bio *bio;

        o2hb_bio_wait_init(write_wc);

        slot = o2nm_this_node();

        bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
        if (IS_ERR(bio)) {
                status = PTR_ERR(bio);
                mlog_errno(status);
                goto bail;
        }

        atomic_inc(&write_wc->wc_num_reqs);
        submit_bio(WRITE, bio);

        status = 0;
bail:
        return status;
}

static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
                                     struct o2hb_disk_heartbeat_block *hb_block)
{
        __le32 old_cksum;
        u32 ret;

        /* We want to compute the block crc with a 0 value in the
         * hb_cksum field. Save it off here and replace after the
         * crc. */
        old_cksum = hb_block->hb_cksum;
        hb_block->hb_cksum = 0;

        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);

        hb_block->hb_cksum = old_cksum;

        return ret;
}
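
/*
 * For reference when reading the checksum and dump helpers: the
 * on-disk heartbeat block (declared in ocfs2_heartbeat.h, sketched
 * here from that header) carries a sequence number, the writer's node
 * number, the crc computed above, a per-mount generation and the
 * writer's idea of the dead window:
 *
 *	struct o2hb_disk_heartbeat_block {
 *		__le64 hb_seq;
 *		__u8  hb_node;
 *		__u8  hb_pad1[3];
 *		__le32 hb_cksum;
 *		__le64 hb_generation;
 *		__le32 hb_dead_ms;
 *	};
 */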

static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
             "cksum = 0x%x, generation 0x%llx\n",
             (long long)le64_to_cpu(hb_block->hb_seq),
             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
             (long long)le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
                           struct o2hb_disk_heartbeat_block *hb_block)
{
        u32 read, computed;

        read = le32_to_cpu(hb_block->hb_cksum);
        computed = o2hb_compute_block_crc_le(reg, hb_block);

        return read == computed;
}

/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
        int node_num, ret;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();

        ret = 1;
        slot = &reg->hr_slots[node_num];
        /* Don't check on our 1st timestamp */
        if (slot->ds_last_time) {
                hb_block = slot->ds_raw_block;

                if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
                        ret = 0;
        }

        return ret;
}

static inline void o2hb_prepare_block(struct o2hb_region *reg,
                                      u64 generation)
{
        int node_num;
        u64 cputime;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();
        slot = &reg->hr_slots[node_num];

        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
        memset(hb_block, 0, reg->hr_block_bytes);
        /* TODO: time stuff */
        cputime = CURRENT_TIME.tv_sec;
        if (!cputime)
                cputime = 1;

        hb_block->hb_seq = cpu_to_le64(cputime);
        hb_block->hb_node = node_num;
        hb_block->hb_generation = cpu_to_le64(generation);
        hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);

        /* This step must always happen last! */
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
                                                                   hb_block));

        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
             (long long)generation,
             le32_to_cpu(hb_block->hb_cksum));
}

static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
                                struct o2nm_node *node,
                                int idx)
{
        struct list_head *iter;
        struct o2hb_callback_func *f;

        list_for_each(iter, &hbcall->list) {
                f = list_entry(iter, struct o2hb_callback_func, hc_item);
                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
                (f->hc_func)(node, idx, f->hc_data);
        }
}

/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
        int empty;
        struct o2hb_callback *hbcall;
        struct o2hb_node_event *event;

        spin_lock(&o2hb_live_lock);
        empty = list_empty(&queued_event->hn_item);
        spin_unlock(&o2hb_live_lock);
        if (empty)
                return;

        /* Holding callback sem assures we don't alter the callback
         * lists when doing this, and serializes ourselves with other
         * processes wanting callbacks. */
        down_write(&o2hb_callback_sem);

        spin_lock(&o2hb_live_lock);
        while (!list_empty(&o2hb_node_events)
               && !list_empty(&queued_event->hn_item)) {
                event = list_entry(o2hb_node_events.next,
                                   struct o2hb_node_event,
                                   hn_item);
                list_del_init(&event->hn_item);
                spin_unlock(&o2hb_live_lock);

                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
                     event->hn_node_num);

                hbcall = hbcall_from_type(event->hn_event_type);

                /* We should *never* have gotten on to the list with a
                 * bad type... This isn't something that we should try
                 * to recover from. */
                BUG_ON(IS_ERR(hbcall));

                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

                spin_lock(&o2hb_live_lock);
        }
        spin_unlock(&o2hb_live_lock);

        up_write(&o2hb_callback_sem);
}

static void o2hb_queue_node_event(struct o2hb_node_event *event,
                                  enum o2hb_callback_type type,
                                  struct o2nm_node *node,
                                  int node_num)
{
        assert_spin_locked(&o2hb_live_lock);

        event->hn_event_type = type;
        event->hn_node = node;
        event->hn_node_num = node_num;

        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

        list_add_tail(&event->hn_item, &o2hb_node_events);
}

static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;

        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return;

        spin_lock(&o2hb_live_lock);
        if (!list_empty(&slot->ds_live_item)) {
                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
                     slot->ds_node_num);

                list_del_init(&slot->ds_live_item);

                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);
                }
        }
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
}

static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
        unsigned int slot_dead_ms;

        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /* Is this correct? Do we assume that the node doesn't exist
         * if we're not configured for it? */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return 0;

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc. We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations. We assume this to
         * mean it dropped off but came back before we timed out. We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
                     "to 0x%llx)\n", slot->ds_node_num,
                     (long long)slot->ds_last_generation,
                     (long long)le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
             "seq %llu last %llu changed %u equal %u\n",
             slot->ds_node_num, (long long)slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum),
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

fire_callbacks:
        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
                     slot->ds_node_num, (long long)slot->ds_last_generation);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;

                /* We want to be sure that all nodes agree on the
                 * number of milliseconds before a node will be
                 * considered dead. The self-fencing timeout is
                 * computed from this value, and a discrepancy might
                 * result in heartbeat calling a node dead when it
                 * hasn't self-fenced yet. */
                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
                if (slot_dead_ms && slot_dead_ms != dead_ms) {
                        /* TODO: Perhaps we can fail the region here. */
                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
                             "of %u ms, but our count is %u ms.\n"
                             "Please double check your configuration values "
                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
                             dead_ms);
                }
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples..  reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
        return changed;
}

/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
                             int numbits)
{
        int highest, node;

        highest = numbits;
        node = -1;
        while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
                if (node >= numbits)
                        break;

                highest = node;
        }

        return highest;
}
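
/*
 * For example (illustrative): with numbits == 8 and a bitmap with
 * bits 1 and 5 set, the loop visits nodes 1 and 5 and returns 5.
 * With no bits set, find_next_bit() immediately returns numbits and
 * the initial value of 'highest' (numbits itself) is returned, which
 * is why the caller below treats 'highest_node >= O2NM_MAX_NODES' as
 * "no configured nodes".
 */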

static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct o2hb_bio_wait_ctxt write_wc;

        ret = o2nm_configured_node_map(configured_nodes,
                                       sizeof(configured_nodes));
        if (ret) {
                mlog_errno(ret);
                return ret;
        }

        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
                return -EINVAL;
        }

        /* No sense in reading the slots of nodes that don't exist
         * yet. Of course, if the node definitions have holes in them
         * then we're reading an empty slot anyway... Consider this
         * best-effort. */
        ret = o2hb_read_slots(reg, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
        }

        /* With an up to date view of the slots, we can check that no
         * other node has been improperly configured to heartbeat in
         * our slot. */
        if (!o2hb_check_last_timestamp(reg))
                mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
                     "in our slot!\n", reg->hr_dev_name);

        /* fill in the proper info for our next heartbeat */
        o2hb_prepare_block(reg, reg->hr_generation);

        /* And fire off the write. Note that we don't wait on this I/O
         * until later. */
        ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                return ret;
        }

        i = -1;
        while ((i = find_next_bit(configured_nodes,
                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
                change |= o2hb_check_slot(reg, &reg->hr_slots[i]);

        /*
         * We have to be sure we've advertised ourselves on disk
         * before we can go to steady state.  This ensures that
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(reg, &write_wc);
        if (write_wc.wc_error) {
                /* Do not re-arm the write timeout on I/O error - we
                 * can't be sure that the new block ever made it to
                 * disk */
                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
                     write_wc.wc_error, reg->hr_dev_name);
                return write_wc.wc_error;
        }

        o2hb_arm_write_timeout(reg);

        /* let the person who launched us know when things are steady */
        if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
                if (atomic_dec_and_test(&reg->hr_steady_iterations))
                        wake_up(&o2hb_steady_queue);
        }

        return 0;
}

/* Subtract b from a, storing the result in a.  If a is earlier than
 * b, the result is clamped to zero. */
static void o2hb_tv_subtract(struct timeval *a,
                             struct timeval *b)
{
        /* just return 0 when a is before b */
        if (a->tv_sec < b->tv_sec ||
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
                a->tv_sec = 0;
                a->tv_usec = 0;
                return;
        }

        a->tv_sec -= b->tv_sec;
        a->tv_usec -= b->tv_usec;
        while (a->tv_usec < 0) {
                a->tv_sec--;
                a->tv_usec += 1000000;
        }
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
                                       struct timeval *end)
{
        struct timeval res = *end;

        o2hb_tv_subtract(&res, start);

        return res.tv_sec * 1000 + res.tv_usec / 1000;
}
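
/*
 * A quick worked example (illustrative): start = 10s 900000us and
 * end = 12s 100000us.  The subtraction borrows: 12 - 10 = 2s and
 * 100000 - 900000 = -800000us, which normalizes to 1s 200000us, so
 * o2hb_elapsed_msecs() returns 1 * 1000 + 200000 / 1000 = 1200ms.
 */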

/*
 * We ride the region ref that the region dir holds.  Before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
        int i, ret;
        struct o2hb_region *reg = data;
        struct o2hb_bio_wait_ctxt write_wc;
        struct timeval before_hb, after_hb;
        unsigned int elapsed_msec;

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

        set_user_nice(current, -20);

        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
                /* We track the time spent inside
                 * o2hb_do_disk_heartbeat so that we avoid more than
                 * hr_timeout_ms between disk writes. On busy systems
                 * this should result in a heartbeat which is less
                 * likely to time itself out. */
                do_gettimeofday(&before_hb);

                i = 0;
                do {
                        ret = o2hb_do_disk_heartbeat(reg);
                } while (ret && ++i < 2);

                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

                mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
                     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
                     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
                     elapsed_msec);

                if (elapsed_msec < reg->hr_timeout_ms) {
                        /* the kthread api has blocked signals for us so no
                         * need to record the return value. */
                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
                }
        }

        o2hb_disarm_write_timeout(reg);

        /* unclean stop is only used in very bad situations */
        for (i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
                o2hb_shutdown_slot(&reg->hr_slots[i]);

        /* Explicit down notification - avoid forcing the other nodes
         * to timeout on this region when we could just as easily
         * write a clear generation - thus indicating to them that
         * this node has left this region.
         *
         * XXX: Should we skip this on unclean_stop? */
        o2hb_prepare_block(reg, 0);
        ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret == 0)
                o2hb_wait_on_io(reg, &write_wc);
        else
                mlog_errno(ret);

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

        return 0;
}

#ifdef CONFIG_DEBUG_FS
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        char *buf = NULL;
        int i = -1;
        int out = 0;

        buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
        if (!buf)
                goto bail;

        o2hb_fill_node_map(map, sizeof(map));

        while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
                out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
        out += snprintf(buf + out, PAGE_SIZE - out, "\n");

        i_size_write(inode, out);

        file->private_data = buf;

        return 0;
bail:
        return -ENOMEM;
}

static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        kfree(file->private_data);
        return 0;
}

static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
                               size_t nbytes, loff_t *ppos)
{
        return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
                                       i_size_read(file->f_mapping->host));
}
#else
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        return 0;
}
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        return 0;
}
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
                               size_t nbytes, loff_t *ppos)
{
        return 0;
}
#endif  /* CONFIG_DEBUG_FS */

static const struct file_operations o2hb_debug_fops = {
        .open =         o2hb_debug_open,
        .release =      o2hb_debug_release,
        .read =         o2hb_debug_read,
        .llseek =       generic_file_llseek,
};

void o2hb_exit(void)
{
        if (o2hb_debug_livenodes)
                debugfs_remove(o2hb_debug_livenodes);
        if (o2hb_debug_dir)
                debugfs_remove(o2hb_debug_dir);
}

int o2hb_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
                INIT_LIST_HEAD(&o2hb_callbacks[i].list);

        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
                INIT_LIST_HEAD(&o2hb_live_slots[i]);

        INIT_LIST_HEAD(&o2hb_node_events);

        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));

        o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
        if (!o2hb_debug_dir) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        o2hb_debug_livenodes = debugfs_create_file(O2HB_DEBUG_LIVENODES,
                                                   S_IFREG|S_IRUSR,
                                                   o2hb_debug_dir, NULL,
                                                   &o2hb_debug_fops);
        if (!o2hb_debug_livenodes) {
                mlog_errno(-ENOMEM);
                debugfs_remove(o2hb_debug_dir);
                return -ENOMEM;
        }

        return 0;
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
                                             unsigned bytes)
{
        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));

        memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
        /* callers want to serialize this map and callbacks so that they
         * can trust that they don't miss nodes coming to the party */
        down_read(&o2hb_callback_sem);
        spin_lock(&o2hb_live_lock);
        o2hb_fill_node_map_from_callback(map, bytes);
        spin_unlock(&o2hb_live_lock);
        up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
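
/*
 * A usage sketch (illustrative, not part of this file): a caller that
 * wants a stable snapshot of the live nodes might do
 *
 *	unsigned long live[BITS_TO_LONGS(O2NM_MAX_NODES)];
 *
 *	o2hb_fill_node_map(live, sizeof(live));
 *	if (test_bit(node_num, live))
 *		... node_num was heartbeating in some region ...
 *
 * which is roughly how o2hb_debug_open() above builds the debugfs
 * "livenodes" listing.
 */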

/*
 * heartbeat configfs bits.  The heartbeat set is a default set under
 * the cluster set in nodemanager.c.
 */

static struct o2hb_region *to_o2hb_region(struct config_item *item)
{
        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
}

/* drop_item only drops its ref after killing the thread, so nothing
 * should be using the region anymore.  This has to clean up any state
 * that attributes might have built up. */
static void o2hb_region_release(struct config_item *item)
{
        int i;
        struct page *page;
        struct o2hb_region *reg = to_o2hb_region(item);

        kfree(reg->hr_tmp_block);

        if (reg->hr_slot_data) {
                for (i = 0; i < reg->hr_num_pages; i++) {
                        page = reg->hr_slot_data[i];
                        if (page)
                                __free_page(page);
                }
                kfree(reg->hr_slot_data);
        }

        if (reg->hr_bdev)
                blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);

        kfree(reg->hr_slots);

        spin_lock(&o2hb_live_lock);
        list_del(&reg->hr_all_item);
        spin_unlock(&o2hb_live_lock);

        kfree(reg);
}

static int o2hb_read_block_input(struct o2hb_region *reg,
                                 const char *page,
                                 size_t count,
                                 unsigned long *ret_bytes,
                                 unsigned int *ret_bits)
{
        unsigned long bytes;
        char *p = (char *)page;

        bytes = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* Heartbeat and fs min / max block sizes are the same. */
        if (bytes > 4096 || bytes < 512)
                return -ERANGE;
        if (hweight16(bytes) != 1)
                return -EINVAL;

        if (ret_bytes)
                *ret_bytes = bytes;
        if (ret_bits)
                *ret_bits = ffs(bytes) - 1;

        return 0;
}
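
/*
 * For example (illustrative): writing "512" yields *ret_bytes == 512
 * and *ret_bits == ffs(512) - 1 == 9; "4096" yields 12.  "1000" is
 * rejected with -EINVAL because it is not a power of two
 * (hweight16(1000) != 1), and "8192" is rejected with -ERANGE.
 */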

static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%u\n", reg->hr_block_bytes);
}

static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        int status;
        unsigned long block_bytes;
        unsigned int block_bits;

        if (reg->hr_bdev)
                return -EINVAL;

        status = o2hb_read_block_input(reg, page, count,
                                       &block_bytes, &block_bits);
        if (status)
                return status;

        reg->hr_block_bytes = (unsigned int)block_bytes;
        reg->hr_block_bits = block_bits;

        return count;
}

static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%llu\n", reg->hr_start_block);
}

static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        unsigned long long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoull(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        reg->hr_start_block = tmp;

        return count;
}

static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
                                       char *page)
{
        return sprintf(page, "%u\n", reg->hr_blocks);
}

static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
                                        const char *page,
                                        size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        if (tmp > O2NM_MAX_NODES || tmp == 0)
                return -ERANGE;

        reg->hr_blocks = (unsigned int)tmp;

        return count;
}

static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
                                    char *page)
{
        unsigned int ret = 0;

        if (reg->hr_bdev)
                ret = sprintf(page, "%s\n", reg->hr_dev_name);

        return ret;
}

static void o2hb_init_region_params(struct o2hb_region *reg)
{
        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;

        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
             reg->hr_start_block, reg->hr_blocks);
        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
             reg->hr_block_bytes, reg->hr_block_bits);
        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
}

static int o2hb_map_slot_data(struct o2hb_region *reg)
{
        int i, j;
        unsigned int last_slot;
        unsigned int spp = reg->hr_slots_per_page;
        struct page *page;
        char *raw;
        struct o2hb_disk_slot *slot;

        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
        if (reg->hr_tmp_block == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        reg->hr_slots = kcalloc(reg->hr_blocks,
                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
        if (reg->hr_slots == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for (i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                slot->ds_node_num = i;
                INIT_LIST_HEAD(&slot->ds_live_item);
                slot->ds_raw_block = NULL;
        }

        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
                           "at %u blocks per page\n",
             reg->hr_num_pages, reg->hr_blocks, spp);

        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
                                    GFP_KERNEL);
        if (!reg->hr_slot_data) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for (i = 0; i < reg->hr_num_pages; i++) {
                page = alloc_page(GFP_KERNEL);
                if (!page) {
                        mlog_errno(-ENOMEM);
                        return -ENOMEM;
                }

                reg->hr_slot_data[i] = page;

                /* first slot covered by this page */
                last_slot = i * spp;
                raw = page_address(page);
                for (j = 0;
                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
                     j++) {
                        BUG_ON((j + last_slot) >= reg->hr_blocks);

                        slot = &reg->hr_slots[j + last_slot];
                        slot->ds_raw_block =
                                (struct o2hb_disk_heartbeat_block *) raw;

                        raw += reg->hr_block_bytes;
                }
        }

        return 0;
}
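
/*
 * A worked sizing example (illustrative, assuming O2NM_MAX_NODES of
 * 255 from ocfs2_nodemanager.h): with 512-byte blocks and a 4096-byte
 * PAGE_CACHE_SIZE, hr_slots_per_page is 8, so a region sized for 255
 * slots needs (255 + 8 - 1) / 8 == 32 pages of slot data.
 */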

/* Read in all the slots available and populate the tracking
 * structures so that we can start with a baseline idea of what's
 * there. */
static int o2hb_populate_slot_data(struct o2hb_region *reg)
{
        int ret, i;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        mlog_entry_void();

        ret = o2hb_read_slots(reg, reg->hr_blocks);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        /* We only want to get an idea of the values initially in each
         * slot, so we do no verification - o2hb_check_slot will
         * actually determine if each configured slot is valid and
         * whether any values have changed. */
        for (i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;

                /* Only fill the values that o2hb_check_slot uses to
                 * determine changing slots */
                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
        }

out:
        mlog_exit(ret);
        return ret;
}

/* This is acting as commit; we set up all of hr_bdev and hr_task or nothing */
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                                     const char *page,
                                     size_t count)
{
        struct task_struct *hb_task;
        long fd;
        int sectsize;
        char *p = (char *)page;
        struct file *filp = NULL;
        struct inode *inode = NULL;
        ssize_t ret = -EINVAL;

        if (reg->hr_bdev)
                goto out;

        /* We can't heartbeat without having had our node number
         * configured yet. */
        if (o2nm_this_node() == O2NM_MAX_NODES)
                goto out;

        fd = simple_strtol(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                goto out;

        if (fd < 0 || fd >= INT_MAX)
                goto out;

        filp = fget(fd);
        if (filp == NULL)
                goto out;

        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
            reg->hr_block_bytes == 0)
                goto out;

        inode = igrab(filp->f_mapping->host);
        if (inode == NULL)
                goto out;

        if (!S_ISBLK(inode->i_mode))
                goto out;

        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ);
        if (ret) {
                reg->hr_bdev = NULL;
                goto out;
        }
        inode = NULL;

        bdevname(reg->hr_bdev, reg->hr_dev_name);

        sectsize = bdev_logical_block_size(reg->hr_bdev);
        if (sectsize != reg->hr_block_bytes) {
                mlog(ML_ERROR,
                     "blocksize %u incorrect for device, expected %d\n",
                     reg->hr_block_bytes, sectsize);
                ret = -EINVAL;
                goto out;
        }

        o2hb_init_region_params(reg);

        /* Generation of zero is invalid */
        do {
                get_random_bytes(&reg->hr_generation,
                                 sizeof(reg->hr_generation));
        } while (reg->hr_generation == 0);

        ret = o2hb_map_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        ret = o2hb_populate_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);

        /*
         * A node is considered live after it has beat LIVE_THRESHOLD
         * times.  We're not steady until we've given them a chance
         * _after_ our first read.
         */
        atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);

        hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
                              reg->hr_item.ci_name);
        if (IS_ERR(hb_task)) {
                ret = PTR_ERR(hb_task);
                mlog_errno(ret);
                goto out;
        }

        spin_lock(&o2hb_live_lock);
        reg->hr_task = hb_task;
        spin_unlock(&o2hb_live_lock);

        ret = wait_event_interruptible(o2hb_steady_queue,
                                atomic_read(&reg->hr_steady_iterations) == 0);
        if (ret) {
                /* We got interrupted (hello ptrace!).  Clean up */
                spin_lock(&o2hb_live_lock);
                hb_task = reg->hr_task;
                reg->hr_task = NULL;
                spin_unlock(&o2hb_live_lock);

                if (hb_task)
                        kthread_stop(hb_task);
                goto out;
        }

        /* Ok, we were woken.  Make sure it wasn't by drop_item() */
        spin_lock(&o2hb_live_lock);
        hb_task = reg->hr_task;
        spin_unlock(&o2hb_live_lock);

        if (hb_task)
                ret = count;
        else
                ret = -EIO;

out:
        if (filp)
                fput(filp);
        if (inode)
                iput(inode);
        if (ret < 0) {
                if (reg->hr_bdev) {
                        blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
                        reg->hr_bdev = NULL;
                }
        }
        return ret;
}
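
/*
 * A configuration sketch (illustrative; in practice the o2cb tools do
 * this, not a bare shell): userspace brings a region up by creating
 * the configfs directory and writing the attributes defined below,
 * with "dev" last since it acts as the commit point.  Note that "dev"
 * takes the number of a file descriptor the writing process holds
 * open on the block device, not a device path:
 *
 *	mkdir /sys/kernel/config/cluster/<cluster>/heartbeat/<region>
 *	write "512"  to .../block_bytes
 *	write "1024" to .../start_block
 *	write "255"  to .../blocks
 *	write "<fd>" to .../dev
 */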

static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
                                      char *page)
{
        pid_t pid = 0;

        spin_lock(&o2hb_live_lock);
        if (reg->hr_task)
                pid = task_pid_nr(reg->hr_task);
        spin_unlock(&o2hb_live_lock);

        if (!pid)
                return 0;

        return sprintf(page, "%u\n", pid);
}

struct o2hb_region_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_region *, char *);
        ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};

static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "block_bytes",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_block_bytes_read,
        .store  = o2hb_region_block_bytes_write,
};

static struct o2hb_region_attribute o2hb_region_attr_start_block = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "start_block",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_start_block_read,
        .store  = o2hb_region_start_block_write,
};

static struct o2hb_region_attribute o2hb_region_attr_blocks = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "blocks",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_blocks_read,
        .store  = o2hb_region_blocks_write,
};

static struct o2hb_region_attribute o2hb_region_attr_dev = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dev",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_dev_read,
        .store  = o2hb_region_dev_write,
};

static struct o2hb_region_attribute o2hb_region_attr_pid = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "pid",
                    .ca_mode = S_IRUGO | S_IRUSR },
        .show   = o2hb_region_pid_read,
};

static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes.attr,
        &o2hb_region_attr_start_block.attr,
        &o2hb_region_attr_blocks.attr,
        &o2hb_region_attr_dev.attr,
        &o2hb_region_attr_pid.attr,
        NULL,
};
1531
1532static ssize_t o2hb_region_show(struct config_item *item,
1533                                struct configfs_attribute *attr,
1534                                char *page)
1535{
1536        struct o2hb_region *reg = to_o2hb_region(item);
1537        struct o2hb_region_attribute *o2hb_region_attr =
1538                container_of(attr, struct o2hb_region_attribute, attr);
1539        ssize_t ret = 0;
1540
1541        if (o2hb_region_attr->show)
1542                ret = o2hb_region_attr->show(reg, page);
1543        return ret;
1544}
1545
1546static ssize_t o2hb_region_store(struct config_item *item,
1547                                 struct configfs_attribute *attr,
1548                                 const char *page, size_t count)
1549{
1550        struct o2hb_region *reg = to_o2hb_region(item);
1551        struct o2hb_region_attribute *o2hb_region_attr =
1552                container_of(attr, struct o2hb_region_attribute, attr);
1553        ssize_t ret = -EINVAL;
1554
1555        if (o2hb_region_attr->store)
1556                ret = o2hb_region_attr->store(reg, page, count);
1557        return ret;
1558}
1559
1560static struct configfs_item_operations o2hb_region_item_ops = {
1561        .release                = o2hb_region_release,
1562        .show_attribute         = o2hb_region_show,
1563        .store_attribute        = o2hb_region_store,
1564};
1565
1566static struct config_item_type o2hb_region_type = {
1567        .ct_item_ops    = &o2hb_region_item_ops,
1568        .ct_attrs       = o2hb_region_attrs,
1569        .ct_owner       = THIS_MODULE,
1570};
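/*
 * With the item type above in place, every region directory exposes the
 * five attribute files listed in o2hb_region_attrs[].  Setup (normally
 * done by the o2cb tools) is roughly: create the region dir, write
 * block_bytes, start_block and blocks, then write the number of an open
 * file descriptor for the block device into "dev"; that last write is
 * what starts the heartbeat thread whose pid then shows up in "pid".
 */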
1571
1572/* heartbeat set */
1573
1574struct o2hb_heartbeat_group {
1575        struct config_group hs_group;
1576        /* no per-group state yet; this anchors the region directories */
1577};
1578
1579static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1580{
1581        return group ?
1582                container_of(group, struct o2hb_heartbeat_group, hs_group)
1583                : NULL;
1584}
1585
1586static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1587                                                          const char *name)
1588{
1589        struct o2hb_region *reg = NULL;
1590
1591        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
1592        if (reg == NULL)
1593                return ERR_PTR(-ENOMEM);
1594
1595        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1596
1597        spin_lock(&o2hb_live_lock);
1598        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1599        spin_unlock(&o2hb_live_lock);
1600
1601        return &reg->hr_item;
1602}
1603
1604static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1605                                           struct config_item *item)
1606{
1607        struct task_struct *hb_task;
1608        struct o2hb_region *reg = to_o2hb_region(item);
1609
1610        /* stop the thread when the user removes the region dir */
1611        spin_lock(&o2hb_live_lock);
1612        hb_task = reg->hr_task;
1613        reg->hr_task = NULL;
1614        spin_unlock(&o2hb_live_lock);
1615
1616        if (hb_task)
1617                kthread_stop(hb_task);
1618
1619        /*
1620         * If we're racing a dev_write(), we need to wake it; it will
1621         * re-check reg->hr_task, find it NULL and fail with -EIO.
1622         */
1623        if (atomic_read(&reg->hr_steady_iterations) != 0) {
1624                atomic_set(&reg->hr_steady_iterations, 0);
1625                wake_up(&o2hb_steady_queue);
1626        }
1627
1628        config_item_put(item);
1629}
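/*
 * For reference, the waiting side of this handshake in dev_write()
 * earlier in this file looks roughly like (paraphrased):
 *
 *      wait_event_interruptible(o2hb_steady_queue,
 *                      atomic_read(&reg->hr_steady_iterations) == 0);
 *
 * Zeroing hr_steady_iterations before the wake_up() above guarantees
 * the waiter's condition holds when it re-checks.
 */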
1630
1631struct o2hb_heartbeat_group_attribute {
1632        struct configfs_attribute attr;
1633        ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1634        ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1635};
1636
1637static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1638                                         struct configfs_attribute *attr,
1639                                         char *page)
1640{
1641        struct o2hb_heartbeat_group *group = to_o2hb_heartbeat_group(to_config_group(item));
1642        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1643                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1644        ssize_t ret = 0;
1645
1646        if (o2hb_heartbeat_group_attr->show)
1647                ret = o2hb_heartbeat_group_attr->show(group, page);
1648        return ret;
1649}
1650
1651static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1652                                          struct configfs_attribute *attr,
1653                                          const char *page, size_t count)
1654{
1655        struct o2hb_heartbeat_group *group = to_o2hb_heartbeat_group(to_config_group(item));
1656        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1657                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1658        ssize_t ret = -EINVAL;
1659
1660        if (o2hb_heartbeat_group_attr->store)
1661                ret = o2hb_heartbeat_group_attr->store(group, page, count);
1662        return ret;
1663}
1664
1665static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1666                                                     char *page)
1667{
1668        return sprintf(page, "%u\n", o2hb_dead_threshold);
1669}
1670
1671static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1672                                                    const char *page,
1673                                                    size_t count)
1674{
1675        unsigned long tmp;
1676        char *p = (char *)page;
1677
1678        tmp = simple_strtoul(p, &p, 10);
1679        if (!p || (*p && (*p != '\n')))
1680                return -EINVAL;
1681
1682        /* o2hb_dead_threshold_set() validates the range for us. */
1683        o2hb_dead_threshold_set((unsigned int) tmp);
1684
1685        return count;
1686}
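/*
 * Accepts a decimal value with an optional trailing newline, so e.g.
 * "echo 31 > dead_threshold" parses cleanly; anything else fails with
 * -EINVAL before the threshold is touched.
 */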
1687
1688static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1689        .attr   = { .ca_owner = THIS_MODULE,
1690                    .ca_name = "dead_threshold",
1691                    .ca_mode = S_IRUGO | S_IWUSR },
1692        .show   = o2hb_heartbeat_group_threshold_show,
1693        .store  = o2hb_heartbeat_group_threshold_store,
1694};
1695
1696static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1697        &o2hb_heartbeat_group_attr_threshold.attr,
1698        NULL,
1699};
1700
1701static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
1702        .show_attribute         = o2hb_heartbeat_group_show,
1703        .store_attribute        = o2hb_heartbeat_group_store,
1704};
1705
1706static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1707        .make_item      = o2hb_heartbeat_group_make_item,
1708        .drop_item      = o2hb_heartbeat_group_drop_item,
1709};
1710
1711static struct config_item_type o2hb_heartbeat_group_type = {
1712        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
1713        .ct_item_ops    = &o2hb_heartbeat_group_item_ops,
1714        .ct_attrs       = o2hb_heartbeat_group_attrs,
1715        .ct_owner       = THIS_MODULE,
1716};
1717
1718/* this is here only to avoid exposing the config_group in heartbeat.h,
1719 * which the entire world #includes */
1720struct config_group *o2hb_alloc_hb_set(void)
1721{
1722        struct o2hb_heartbeat_group *hs = NULL;
1723        struct config_group *ret = NULL;
1724
1725        hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1726        if (hs == NULL)
1727                goto out;
1728
1729        config_group_init_type_name(&hs->hs_group, "heartbeat",
1730                                    &o2hb_heartbeat_group_type);
1731
1732        ret = &hs->hs_group;
1733out:
1734        if (ret == NULL)
1735                kfree(hs);
1736        return ret;
1737}
1738
1739void o2hb_free_hb_set(struct config_group *group)
1740{
1741        struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1742        kfree(hs);
1743}
1744
1745/* hb callback registration and issuing */
1746
1747static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1748{
1749        if (type == O2HB_NUM_CB)
1750                return ERR_PTR(-EINVAL);
1751
1752        return &o2hb_callbacks[type];
1753}
1754
1755void o2hb_setup_callback(struct o2hb_callback_func *hc,
1756                         enum o2hb_callback_type type,
1757                         o2hb_cb_func *func,
1758                         void *data,
1759                         int priority)
1760{
1761        INIT_LIST_HEAD(&hc->hc_item);
1762        hc->hc_func = func;
1763        hc->hc_data = data;
1764        hc->hc_priority = priority;
1765        hc->hc_type = type;
1766        hc->hc_magic = O2HB_CB_MAGIC;
1767}
1768EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1769
1770static struct o2hb_region *o2hb_find_region(const char *region_uuid)
1771{
1772        struct o2hb_region *p, *reg = NULL;
1773
1774        assert_spin_locked(&o2hb_live_lock);
1775
1776        list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
1777                if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
1778                        reg = p;
1779                        break;
1780                }
1781        }
1782
1783        return reg;
1784}
1785
1786static int o2hb_region_get(const char *region_uuid)
1787{
1788        int ret = 0;
1789        struct o2hb_region *reg;
1790
1791        spin_lock(&o2hb_live_lock);
1792
1793        reg = o2hb_find_region(region_uuid);
1794        if (!reg)
1795                ret = -ENOENT;
1796        spin_unlock(&o2hb_live_lock);
1797
1798        if (ret)
1799                goto out;
1800
1801        ret = o2nm_depend_this_node();
1802        if (ret)
1803                goto out;
1804
1805        ret = o2nm_depend_item(&reg->hr_item);
1806        if (ret)
1807                o2nm_undepend_this_node();
1808
1809out:
1810        return ret;
1811}
1812
1813static void o2hb_region_put(const char *region_uuid)
1814{
1815        struct o2hb_region *reg;
1816
1817        spin_lock(&o2hb_live_lock);
1818
1819        reg = o2hb_find_region(region_uuid);
1820
1821        spin_unlock(&o2hb_live_lock);
1822
1823        if (reg) {
1824                o2nm_undepend_item(&reg->hr_item);
1825                o2nm_undepend_this_node();
1826        }
1827}
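/*
 * The get/put pair above pins our local node and the region's configfs
 * item via the o2nm depend calls, so neither can be removed from
 * userspace while a registered callback still refers to the region.
 */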
1828
1829int o2hb_register_callback(const char *region_uuid,
1830                           struct o2hb_callback_func *hc)
1831{
1832        struct o2hb_callback_func *tmp;
1833        struct list_head *iter;
1834        struct o2hb_callback *hbcall;
1835        int ret;
1836
1837        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1838        BUG_ON(!list_empty(&hc->hc_item));
1839
1840        hbcall = hbcall_from_type(hc->hc_type);
1841        if (IS_ERR(hbcall)) {
1842                ret = PTR_ERR(hbcall);
1843                goto out;
1844        }
1845
1846        if (region_uuid) {
1847                ret = o2hb_region_get(region_uuid);
1848                if (ret)
1849                        goto out;
1850        }
1851
1852        down_write(&o2hb_callback_sem);
1853
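        /* callbacks are kept sorted by ascending hc_priority: insert
         * before the first entry with a higher priority, or append
         * below if every existing entry has priority <= ours */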
1854        list_for_each(iter, &hbcall->list) {
1855                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1856                if (hc->hc_priority < tmp->hc_priority) {
1857                        list_add_tail(&hc->hc_item, iter);
1858                        break;
1859                }
1860        }
1861        if (list_empty(&hc->hc_item))
1862                list_add_tail(&hc->hc_item, &hbcall->list);
1863
1864        up_write(&o2hb_callback_sem);
1865        ret = 0;
1866out:
1867        mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1868             ret, __builtin_return_address(0), hc);
1869        return ret;
1870}
1871EXPORT_SYMBOL_GPL(o2hb_register_callback);
1872
1873void o2hb_unregister_callback(const char *region_uuid,
1874                              struct o2hb_callback_func *hc)
1875{
1876        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1877
1878        mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1879             __builtin_return_address(0), hc);
1880
1881        /* XXX Can this happen _with_ a region reference? */
1882        if (list_empty(&hc->hc_item))
1883                return;
1884
1885        if (region_uuid)
1886                o2hb_region_put(region_uuid);
1887
1888        down_write(&o2hb_callback_sem);
1889
1890        list_del_init(&hc->hc_item);
1891
1892        up_write(&o2hb_callback_sem);
1893}
1894EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
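/*
 * A minimal lifecycle sketch for the API above (compiled out; not part
 * of this file).  my_node_down, my_hb_down, my_attach/my_detach and
 * MY_HB_PRI are illustrative names only.  A NULL region uuid skips the
 * region get/put, as the code above shows.
 */
#if 0
static struct o2hb_callback_func my_hb_down;

static void my_node_down(struct o2nm_node *node, int node_num, void *data)
{
        /* runs from an hb region thread, serialized by o2hb_callback_sem */
}

static int my_attach(const char *region_uuid)
{
        o2hb_setup_callback(&my_hb_down, O2HB_NODE_DOWN_CB,
                            my_node_down, NULL, MY_HB_PRI);
        /* takes a region reference when region_uuid is non-NULL */
        return o2hb_register_callback(region_uuid, &my_hb_down);
}

static void my_detach(const char *region_uuid)
{
        /* drops the region reference taken at registration */
        o2hb_unregister_callback(region_uuid, &my_hb_down);
}
#endif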
1895
1896int o2hb_check_node_heartbeating(u8 node_num)
1897{
1898        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1899
1900        o2hb_fill_node_map(testing_map, sizeof(testing_map));
1901        if (!test_bit(node_num, testing_map)) {
1902                mlog(ML_HEARTBEAT,
1903                     "node (%u) does not have heartbeating enabled.\n",
1904                     node_num);
1905                return 0;
1906        }
1907
1908        return 1;
1909}
1910EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1911
1912int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1913{
1914        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1915
1916        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1917        if (!test_bit(node_num, testing_map)) {
1918                mlog(ML_HEARTBEAT,
1919                     "node (%u) does not have heartbeating enabled.\n",
1920                     node_num);
1921                return 0;
1922        }
1923
1924        return 1;
1925}
1926EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
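/*
 * The _from_callback variant exists because the plain node-map fill
 * serializes against the callback list (it takes o2hb_callback_sem,
 * which a running heartbeat callback already holds); callbacks must
 * therefore sample the live map through this entry point instead.
 */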
1927
1928/* Makes sure our local node is configured with a node number, and is
1929 * heartbeating. */
1930int o2hb_check_local_node_heartbeating(void)
1931{
1932        u8 node_num;
1933
1934        /* if this node's number was set then networking is configured */
1935        node_num = o2nm_this_node();
1936        if (node_num == O2NM_MAX_NODES) {
1937                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1938                return 0;
1939        }
1940
1941        return o2hb_check_node_heartbeating(node_num);
1942}
1943EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1944
1945/*
1946 * this is just a hack until we get the plumbing which flips file systems
1947 * read-only and drops the hb ref instead of killing the node dead.
1948 */
1949void o2hb_stop_all_regions(void)
1950{
1951        struct o2hb_region *reg;
1952
1953        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1954
1955        spin_lock(&o2hb_live_lock);
1956
1957        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1958                reg->hr_unclean_stop = 1;
1959
1960        spin_unlock(&o2hb_live_lock);
1961}
1962EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
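/*
 * Note: the region threads poll hr_unclean_stop and, when it is set,
 * exit without their usual clean-shutdown writes, so peers learn of
 * our departure only by timing out on the dead threshold.
 */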
1963