linux/fs/ocfs2/vote.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * vote.c
   5 *
   6 * Mount/umount vote messaging between cluster nodes
   7 *
   8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  23 * Boston, MA 02111-1307, USA.
  24 */
  25
  26#include <linux/types.h>
  27#include <linux/slab.h>
  28#include <linux/highmem.h>
  29#include <linux/kthread.h>
  30
  31#include <cluster/heartbeat.h>
  32#include <cluster/nodemanager.h>
  33#include <cluster/tcp.h>
  34
  35#include <dlm/dlmapi.h>
  36
  37#define MLOG_MASK_PREFIX ML_VOTE
  38#include <cluster/masklog.h>
  39
  40#include "ocfs2.h"
  41
  42#include "alloc.h"
  43#include "dlmglue.h"
  44#include "extent_map.h"
  45#include "heartbeat.h"
  46#include "inode.h"
  47#include "journal.h"
  48#include "slot_map.h"
  49#include "vote.h"
  50
  51#include "buffer_head_io.h"
  52
  53#define OCFS2_MESSAGE_TYPE_VOTE     (0x1)
  54#define OCFS2_MESSAGE_TYPE_RESPONSE (0x2)
  55struct ocfs2_msg_hdr
  56{
  57        __be32 h_response_id; /* used to lookup message handle on sending
  58                            * node. */
  59        __be32 h_request;
  60        __be64 h_blkno;
  61        __be32 h_generation;
  62        __be32 h_node_num;    /* node sending this particular message. */
  63};
  64
  65struct ocfs2_vote_msg
  66{
  67        struct ocfs2_msg_hdr v_hdr;
  68        __be32 v_reserved1;
  69} __attribute__ ((packed));
  70
  71/* Responses are given these values to maintain backwards
  72 * compatibility with older ocfs2 versions */
  73#define OCFS2_RESPONSE_OK               (0)
  74#define OCFS2_RESPONSE_BUSY             (-16)
  75#define OCFS2_RESPONSE_BAD_MSG          (-22)
  76
  77struct ocfs2_response_msg
  78{
  79        struct ocfs2_msg_hdr r_hdr;
  80        __be32 r_response;
  81} __attribute__ ((packed));
  82
  83struct ocfs2_vote_work {
  84        struct list_head   w_list;
  85        struct ocfs2_vote_msg w_msg;
  86};
  87
  /* Request codes carried in h_request.  OCFS2_VOTE_REQ_INVALID and
   * OCFS2_VOTE_REQ_LAST are sentinels that bracket the valid codes. */
  enum ocfs2_vote_request {
          OCFS2_VOTE_REQ_INVALID = 0,
          OCFS2_VOTE_REQ_MOUNT,
          OCFS2_VOTE_REQ_UMOUNT,
          OCFS2_VOTE_REQ_LAST
  };

  /* Returns 1 when request lies strictly between the INVALID and LAST
   * sentinels, 0 otherwise. */
  static inline int ocfs2_is_valid_vote_request(int request)
  {
          if (request <= OCFS2_VOTE_REQ_INVALID)
                  return 0;

          return request < OCFS2_VOTE_REQ_LAST;
  }
 100
 /* Signature of an optional hook run for each response message that
  * matches a waiting broadcast. */
 typedef void (*ocfs2_net_response_callback)(void *priv,
                                             struct ocfs2_response_msg *resp);

 /* A response hook together with the private pointer handed to it. */
 struct ocfs2_net_response_cb {
         ocfs2_net_response_callback     rc_cb;
         void                            *rc_priv;
 };
 107
 108struct ocfs2_net_wait_ctxt {
 109        struct list_head        n_list;
 110        u32                     n_response_id;
 111        wait_queue_head_t       n_event;
 112        struct ocfs2_node_map   n_node_map;
 113        int                     n_response; /* an agreggate response. 0 if
 114                                             * all nodes are go, < 0 on any
 115                                             * negative response from any
 116                                             * node or network error. */
 117        struct ocfs2_net_response_cb *n_callback;
 118};
 119
 120static void ocfs2_process_mount_request(struct ocfs2_super *osb,
 121                                        unsigned int node_num)
 122{
 123        mlog(0, "MOUNT vote from node %u\n", node_num);
 124        /* The other node only sends us this message when he has an EX
 125         * on the superblock, so our recovery threads (if having been
 126         * launched) are waiting on it.*/
 127        ocfs2_recovery_map_clear(osb, node_num);
 128        ocfs2_node_map_set_bit(osb, &osb->mounted_map, node_num);
 129
 130        /* We clear the umount map here because a node may have been
 131         * previously mounted, safely unmounted but never stopped
 132         * heartbeating - in which case we'd have a stale entry. */
 133        ocfs2_node_map_clear_bit(osb, &osb->umount_map, node_num);
 134}
 135
 136static void ocfs2_process_umount_request(struct ocfs2_super *osb,
 137                                         unsigned int node_num)
 138{
 139        mlog(0, "UMOUNT vote from node %u\n", node_num);
 140        ocfs2_node_map_clear_bit(osb, &osb->mounted_map, node_num);
 141        ocfs2_node_map_set_bit(osb, &osb->umount_map, node_num);
 142}
 143
 144static void ocfs2_process_vote(struct ocfs2_super *osb,
 145                               struct ocfs2_vote_msg *msg)
 146{
 147        int net_status, vote_response;
 148        unsigned int node_num;
 149        u64 blkno;
 150        enum ocfs2_vote_request request;
 151        struct ocfs2_msg_hdr *hdr = &msg->v_hdr;
 152        struct ocfs2_response_msg response;
 153
 154        /* decode the network mumbo jumbo into local variables. */
 155        request = be32_to_cpu(hdr->h_request);
 156        blkno = be64_to_cpu(hdr->h_blkno);
 157        node_num = be32_to_cpu(hdr->h_node_num);
 158
 159        mlog(0, "processing vote: request = %u, blkno = %llu, node_num = %u\n",
 160             request, (unsigned long long)blkno, node_num);
 161
 162        if (!ocfs2_is_valid_vote_request(request)) {
 163                mlog(ML_ERROR, "Invalid vote request %d from node %u\n",
 164                     request, node_num);
 165                vote_response = OCFS2_RESPONSE_BAD_MSG;
 166                goto respond;
 167        }
 168
 169        vote_response = OCFS2_RESPONSE_OK;
 170
 171        switch (request) {
 172        case OCFS2_VOTE_REQ_UMOUNT:
 173                ocfs2_process_umount_request(osb, node_num);
 174                goto respond;
 175        case OCFS2_VOTE_REQ_MOUNT:
 176                ocfs2_process_mount_request(osb, node_num);
 177                goto respond;
 178        default:
 179                /* avoids a gcc warning */
 180                break;
 181        }
 182
 183respond:
 184        /* Response struture is small so we just put it on the stack
 185         * and stuff it inline. */
 186        memset(&response, 0, sizeof(struct ocfs2_response_msg));
 187        response.r_hdr.h_response_id = hdr->h_response_id;
 188        response.r_hdr.h_blkno = hdr->h_blkno;
 189        response.r_hdr.h_generation = hdr->h_generation;
 190        response.r_hdr.h_node_num = cpu_to_be32(osb->node_num);
 191        response.r_response = cpu_to_be32(vote_response);
 192
 193        net_status = o2net_send_message(OCFS2_MESSAGE_TYPE_RESPONSE,
 194                                        osb->net_key,
 195                                        &response,
 196                                        sizeof(struct ocfs2_response_msg),
 197                                        node_num,
 198                                        NULL);
 199        /* We still want to error print for ENOPROTOOPT here. The
 200         * sending node shouldn't have unregistered his net handler
 201         * without sending an unmount vote 1st */
 202        if (net_status < 0
 203            && net_status != -ETIMEDOUT
 204            && net_status != -ENOTCONN)
 205                mlog(ML_ERROR, "message to node %u fails with error %d!\n",
 206                     node_num, net_status);
 207}
 208
 209static void ocfs2_vote_thread_do_work(struct ocfs2_super *osb)
 210{
 211        unsigned long processed;
 212        struct ocfs2_lock_res *lockres;
 213        struct ocfs2_vote_work *work;
 214
 215        mlog_entry_void();
 216
 217        spin_lock(&osb->vote_task_lock);
 218        /* grab this early so we know to try again if a state change and
 219         * wake happens part-way through our work  */
 220        osb->vote_work_sequence = osb->vote_wake_sequence;
 221
 222        processed = osb->blocked_lock_count;
 223        while (processed) {
 224                BUG_ON(list_empty(&osb->blocked_lock_list));
 225
 226                lockres = list_entry(osb->blocked_lock_list.next,
 227                                     struct ocfs2_lock_res, l_blocked_list);
 228                list_del_init(&lockres->l_blocked_list);
 229                osb->blocked_lock_count--;
 230                spin_unlock(&osb->vote_task_lock);
 231
 232                BUG_ON(!processed);
 233                processed--;
 234
 235                ocfs2_process_blocked_lock(osb, lockres);
 236
 237                spin_lock(&osb->vote_task_lock);
 238        }
 239
 240        while (osb->vote_count) {
 241                BUG_ON(list_empty(&osb->vote_list));
 242                work = list_entry(osb->vote_list.next,
 243                                  struct ocfs2_vote_work, w_list);
 244                list_del(&work->w_list);
 245                osb->vote_count--;
 246                spin_unlock(&osb->vote_task_lock);
 247
 248                ocfs2_process_vote(osb, &work->w_msg);
 249                kfree(work);
 250
 251                spin_lock(&osb->vote_task_lock);
 252        }
 253        spin_unlock(&osb->vote_task_lock);
 254
 255        mlog_exit_void();
 256}
 257
 258static int ocfs2_vote_thread_lists_empty(struct ocfs2_super *osb)
 259{
 260        int empty = 0;
 261
 262        spin_lock(&osb->vote_task_lock);
 263        if (list_empty(&osb->blocked_lock_list) &&
 264            list_empty(&osb->vote_list))
 265                empty = 1;
 266
 267        spin_unlock(&osb->vote_task_lock);
 268        return empty;
 269}
 270
 271static int ocfs2_vote_thread_should_wake(struct ocfs2_super *osb)
 272{
 273        int should_wake = 0;
 274
 275        spin_lock(&osb->vote_task_lock);
 276        if (osb->vote_work_sequence != osb->vote_wake_sequence)
 277                should_wake = 1;
 278        spin_unlock(&osb->vote_task_lock);
 279
 280        return should_wake;
 281}
 282
 283int ocfs2_vote_thread(void *arg)
 284{
 285        int status = 0;
 286        struct ocfs2_super *osb = arg;
 287
 288        /* only quit once we've been asked to stop and there is no more
 289         * work available */
 290        while (!(kthread_should_stop() &&
 291                 ocfs2_vote_thread_lists_empty(osb))) {
 292
 293                wait_event_interruptible(osb->vote_event,
 294                                         ocfs2_vote_thread_should_wake(osb) ||
 295                                         kthread_should_stop());
 296
 297                mlog(0, "vote_thread: awoken\n");
 298
 299                ocfs2_vote_thread_do_work(osb);
 300        }
 301
 302        osb->vote_task = NULL;
 303        return status;
 304}
 305
 306static struct ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(unsigned int response_id)
 307{
 308        struct ocfs2_net_wait_ctxt *w;
 309
 310        w = kzalloc(sizeof(*w), GFP_NOFS);
 311        if (!w) {
 312                mlog_errno(-ENOMEM);
 313                goto bail;
 314        }
 315
 316        INIT_LIST_HEAD(&w->n_list);
 317        init_waitqueue_head(&w->n_event);
 318        ocfs2_node_map_init(&w->n_node_map);
 319        w->n_response_id = response_id;
 320        w->n_callback = NULL;
 321bail:
 322        return w;
 323}
 324
 325static unsigned int ocfs2_new_response_id(struct ocfs2_super *osb)
 326{
 327        unsigned int ret;
 328
 329        spin_lock(&osb->net_response_lock);
 330        ret = ++osb->net_response_ids;
 331        spin_unlock(&osb->net_response_lock);
 332
 333        return ret;
 334}
 335
 336static void ocfs2_dequeue_net_wait_ctxt(struct ocfs2_super *osb,
 337                                        struct ocfs2_net_wait_ctxt *w)
 338{
 339        spin_lock(&osb->net_response_lock);
 340        list_del(&w->n_list);
 341        spin_unlock(&osb->net_response_lock);
 342}
 343
 344static void ocfs2_queue_net_wait_ctxt(struct ocfs2_super *osb,
 345                                      struct ocfs2_net_wait_ctxt *w)
 346{
 347        spin_lock(&osb->net_response_lock);
 348        list_add_tail(&w->n_list,
 349                      &osb->net_response_list);
 350        spin_unlock(&osb->net_response_lock);
 351}
 352
 353static void __ocfs2_mark_node_responded(struct ocfs2_super *osb,
 354                                        struct ocfs2_net_wait_ctxt *w,
 355                                        int node_num)
 356{
 357        assert_spin_locked(&osb->net_response_lock);
 358
 359        ocfs2_node_map_clear_bit(osb, &w->n_node_map, node_num);
 360        if (ocfs2_node_map_is_empty(osb, &w->n_node_map))
 361                wake_up(&w->n_event);
 362}
 363
 364/* Intended to be called from the node down callback, we fake remove
 365 * the node from all our response contexts */
 366void ocfs2_remove_node_from_vote_queues(struct ocfs2_super *osb,
 367                                        int node_num)
 368{
 369        struct list_head *p;
 370        struct ocfs2_net_wait_ctxt *w = NULL;
 371
 372        spin_lock(&osb->net_response_lock);
 373
 374        list_for_each(p, &osb->net_response_list) {
 375                w = list_entry(p, struct ocfs2_net_wait_ctxt, n_list);
 376
 377                __ocfs2_mark_node_responded(osb, w, node_num);
 378        }
 379
 380        spin_unlock(&osb->net_response_lock);
 381}
 382
 383static int ocfs2_broadcast_vote(struct ocfs2_super *osb,
 384                                struct ocfs2_vote_msg *request,
 385                                unsigned int response_id,
 386                                int *response,
 387                                struct ocfs2_net_response_cb *callback)
 388{
 389        int status, i, remote_err;
 390        struct ocfs2_net_wait_ctxt *w = NULL;
 391        int dequeued = 0;
 392
 393        mlog_entry_void();
 394
 395        w = ocfs2_new_net_wait_ctxt(response_id);
 396        if (!w) {
 397                status = -ENOMEM;
 398                mlog_errno(status);
 399                goto bail;
 400        }
 401        w->n_callback = callback;
 402
 403        /* we're pretty much ready to go at this point, and this fills
 404         * in n_response which we need anyway... */
 405        ocfs2_queue_net_wait_ctxt(osb, w);
 406
 407        i = ocfs2_node_map_iterate(osb, &osb->mounted_map, 0);
 408
 409        while (i != O2NM_INVALID_NODE_NUM) {
 410                if (i != osb->node_num) {
 411                        mlog(0, "trying to send request to node %i\n", i);
 412                        ocfs2_node_map_set_bit(osb, &w->n_node_map, i);
 413
 414                        remote_err = 0;
 415                        status = o2net_send_message(OCFS2_MESSAGE_TYPE_VOTE,
 416                                                    osb->net_key,
 417                                                    request,
 418                                                    sizeof(*request),
 419                                                    i,
 420                                                    &remote_err);
 421                        if (status == -ETIMEDOUT) {
 422                                mlog(0, "remote node %d timed out!\n", i);
 423                                status = -EAGAIN;
 424                                goto bail;
 425                        }
 426                        if (remote_err < 0) {
 427                                status = remote_err;
 428                                mlog(0, "remote error %d on node %d!\n",
 429                                     remote_err, i);
 430                                mlog_errno(status);
 431                                goto bail;
 432                        }
 433                        if (status < 0) {
 434                                mlog_errno(status);
 435                                goto bail;
 436                        }
 437                }
 438                i++;
 439                i = ocfs2_node_map_iterate(osb, &osb->mounted_map, i);
 440                mlog(0, "next is %d, i am %d\n", i, osb->node_num);
 441        }
 442        mlog(0, "done sending, now waiting on responses...\n");
 443
 444        wait_event(w->n_event, ocfs2_node_map_is_empty(osb, &w->n_node_map));
 445
 446        ocfs2_dequeue_net_wait_ctxt(osb, w);
 447        dequeued = 1;
 448
 449        *response = w->n_response;
 450        status = 0;
 451bail:
 452        if (w) {
 453                if (!dequeued)
 454                        ocfs2_dequeue_net_wait_ctxt(osb, w);
 455                kfree(w);
 456        }
 457
 458        mlog_exit(status);
 459        return status;
 460}
 461
 462static struct ocfs2_vote_msg * ocfs2_new_vote_request(struct ocfs2_super *osb,
 463                                                      u64 blkno,
 464                                                      unsigned int generation,
 465                                                      enum ocfs2_vote_request type)
 466{
 467        struct ocfs2_vote_msg *request;
 468        struct ocfs2_msg_hdr *hdr;
 469
 470        BUG_ON(!ocfs2_is_valid_vote_request(type));
 471
 472        request = kzalloc(sizeof(*request), GFP_NOFS);
 473        if (!request) {
 474                mlog_errno(-ENOMEM);
 475        } else {
 476                hdr = &request->v_hdr;
 477                hdr->h_node_num = cpu_to_be32(osb->node_num);
 478                hdr->h_request = cpu_to_be32(type);
 479                hdr->h_blkno = cpu_to_be64(blkno);
 480                hdr->h_generation = cpu_to_be32(generation);
 481        }
 482
 483        return request;
 484}
 485
 486/* Complete the buildup of a new vote request and process the
 487 * broadcast return value. */
 488static int ocfs2_do_request_vote(struct ocfs2_super *osb,
 489                                 struct ocfs2_vote_msg *request,
 490                                 struct ocfs2_net_response_cb *callback)
 491{
 492        int status, response = -EBUSY;
 493        unsigned int response_id;
 494        struct ocfs2_msg_hdr *hdr;
 495
 496        response_id = ocfs2_new_response_id(osb);
 497
 498        hdr = &request->v_hdr;
 499        hdr->h_response_id = cpu_to_be32(response_id);
 500
 501        status = ocfs2_broadcast_vote(osb, request, response_id, &response,
 502                                      callback);
 503        if (status < 0) {
 504                mlog_errno(status);
 505                goto bail;
 506        }
 507
 508        status = response;
 509bail:
 510
 511        return status;
 512}
 513
 514int ocfs2_request_mount_vote(struct ocfs2_super *osb)
 515{
 516        int status;
 517        struct ocfs2_vote_msg *request = NULL;
 518
 519        request = ocfs2_new_vote_request(osb, 0ULL, 0, OCFS2_VOTE_REQ_MOUNT);
 520        if (!request) {
 521                status = -ENOMEM;
 522                goto bail;
 523        }
 524
 525        status = -EAGAIN;
 526        while (status == -EAGAIN) {
 527                if (!(osb->s_mount_opt & OCFS2_MOUNT_NOINTR) &&
 528                    signal_pending(current)) {
 529                        status = -ERESTARTSYS;
 530                        goto bail;
 531                }
 532
 533                if (ocfs2_node_map_is_only(osb, &osb->mounted_map,
 534                                           osb->node_num)) {
 535                        status = 0;
 536                        goto bail;
 537                }
 538
 539                status = ocfs2_do_request_vote(osb, request, NULL);
 540        }
 541
 542bail:
 543        kfree(request);
 544        return status;
 545}
 546
 547int ocfs2_request_umount_vote(struct ocfs2_super *osb)
 548{
 549        int status;
 550        struct ocfs2_vote_msg *request = NULL;
 551
 552        request = ocfs2_new_vote_request(osb, 0ULL, 0, OCFS2_VOTE_REQ_UMOUNT);
 553        if (!request) {
 554                status = -ENOMEM;
 555                goto bail;
 556        }
 557
 558        status = -EAGAIN;
 559        while (status == -EAGAIN) {
 560                /* Do not check signals on this vote... We really want
 561                 * this one to go all the way through. */
 562
 563                if (ocfs2_node_map_is_only(osb, &osb->mounted_map,
 564                                           osb->node_num)) {
 565                        status = 0;
 566                        goto bail;
 567                }
 568
 569                status = ocfs2_do_request_vote(osb, request, NULL);
 570        }
 571
 572bail:
 573        kfree(request);
 574        return status;
 575}
 576
 577/* TODO: This should eventually be a hash table! */
 578static struct ocfs2_net_wait_ctxt * __ocfs2_find_net_wait_ctxt(struct ocfs2_super *osb,
 579                                                               u32 response_id)
 580{
 581        struct list_head *p;
 582        struct ocfs2_net_wait_ctxt *w = NULL;
 583
 584        list_for_each(p, &osb->net_response_list) {
 585                w = list_entry(p, struct ocfs2_net_wait_ctxt, n_list);
 586                if (response_id == w->n_response_id)
 587                        break;
 588                w = NULL;
 589        }
 590
 591        return w;
 592}
 593
 594/* Translate response codes into local node errno values */
 595static inline int ocfs2_translate_response(int response)
 596{
 597        int ret;
 598
 599        switch (response) {
 600        case OCFS2_RESPONSE_OK:
 601                ret = 0;
 602                break;
 603
 604        case OCFS2_RESPONSE_BUSY:
 605                ret = -EBUSY;
 606                break;
 607
 608        default:
 609                ret = -EINVAL;
 610        }
 611
 612        return ret;
 613}
 614
 615static int ocfs2_handle_response_message(struct o2net_msg *msg,
 616                                         u32 len,
 617                                         void *data, void **ret_data)
 618{
 619        unsigned int response_id, node_num;
 620        int response_status;
 621        struct ocfs2_super *osb = data;
 622        struct ocfs2_response_msg *resp;
 623        struct ocfs2_net_wait_ctxt * w;
 624        struct ocfs2_net_response_cb *resp_cb;
 625
 626        resp = (struct ocfs2_response_msg *) msg->buf;
 627
 628        response_id = be32_to_cpu(resp->r_hdr.h_response_id);
 629        node_num = be32_to_cpu(resp->r_hdr.h_node_num);
 630        response_status = 
 631                ocfs2_translate_response(be32_to_cpu(resp->r_response));
 632
 633        mlog(0, "received response message:\n");
 634        mlog(0, "h_response_id = %u\n", response_id);
 635        mlog(0, "h_request = %u\n", be32_to_cpu(resp->r_hdr.h_request));
 636        mlog(0, "h_blkno = %llu\n",
 637             (unsigned long long)be64_to_cpu(resp->r_hdr.h_blkno));
 638        mlog(0, "h_generation = %u\n", be32_to_cpu(resp->r_hdr.h_generation));
 639        mlog(0, "h_node_num = %u\n", node_num);
 640        mlog(0, "r_response = %d\n", response_status);
 641
 642        spin_lock(&osb->net_response_lock);
 643        w = __ocfs2_find_net_wait_ctxt(osb, response_id);
 644        if (!w) {
 645                mlog(0, "request not found!\n");
 646                goto bail;
 647        }
 648        resp_cb = w->n_callback;
 649
 650        if (response_status && (!w->n_response)) {
 651                /* we only really need one negative response so don't
 652                 * set it twice. */
 653                w->n_response = response_status;
 654        }
 655
 656        if (resp_cb) {
 657                spin_unlock(&osb->net_response_lock);
 658
 659                resp_cb->rc_cb(resp_cb->rc_priv, resp);
 660
 661                spin_lock(&osb->net_response_lock);
 662        }
 663
 664        __ocfs2_mark_node_responded(osb, w, node_num);
 665bail:
 666        spin_unlock(&osb->net_response_lock);
 667
 668        return 0;
 669}
 670
 671static int ocfs2_handle_vote_message(struct o2net_msg *msg,
 672                                     u32 len,
 673                                     void *data, void **ret_data)
 674{
 675        int status;
 676        struct ocfs2_super *osb = data;
 677        struct ocfs2_vote_work *work;
 678
 679        work = kmalloc(sizeof(struct ocfs2_vote_work), GFP_NOFS);
 680        if (!work) {
 681                status = -ENOMEM;
 682                mlog_errno(status);
 683                goto bail;
 684        }
 685
 686        INIT_LIST_HEAD(&work->w_list);
 687        memcpy(&work->w_msg, msg->buf, sizeof(struct ocfs2_vote_msg));
 688
 689        mlog(0, "scheduling vote request:\n");
 690        mlog(0, "h_response_id = %u\n",
 691             be32_to_cpu(work->w_msg.v_hdr.h_response_id));
 692        mlog(0, "h_request = %u\n", be32_to_cpu(work->w_msg.v_hdr.h_request));
 693        mlog(0, "h_blkno = %llu\n",
 694             (unsigned long long)be64_to_cpu(work->w_msg.v_hdr.h_blkno));
 695        mlog(0, "h_generation = %u\n",
 696             be32_to_cpu(work->w_msg.v_hdr.h_generation));
 697        mlog(0, "h_node_num = %u\n",
 698             be32_to_cpu(work->w_msg.v_hdr.h_node_num));
 699
 700        spin_lock(&osb->vote_task_lock);
 701        list_add_tail(&work->w_list, &osb->vote_list);
 702        osb->vote_count++;
 703        spin_unlock(&osb->vote_task_lock);
 704
 705        ocfs2_kick_vote_thread(osb);
 706
 707        status = 0;
 708bail:
 709        return status;
 710}
 711
 712void ocfs2_unregister_net_handlers(struct ocfs2_super *osb)
 713{
 714        if (!osb->net_key)
 715                return;
 716
 717        o2net_unregister_handler_list(&osb->osb_net_handlers);
 718
 719        if (!list_empty(&osb->net_response_list))
 720                mlog(ML_ERROR, "net response list not empty!\n");
 721
 722        osb->net_key = 0;
 723}
 724
 725int ocfs2_register_net_handlers(struct ocfs2_super *osb)
 726{
 727        int status = 0;
 728
 729        if (ocfs2_mount_local(osb))
 730                return 0;
 731
 732        status = o2net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
 733                                        osb->net_key,
 734                                        sizeof(struct ocfs2_response_msg),
 735                                        ocfs2_handle_response_message,
 736                                        osb, NULL, &osb->osb_net_handlers);
 737        if (status) {
 738                mlog_errno(status);
 739                goto bail;
 740        }
 741
 742        status = o2net_register_handler(OCFS2_MESSAGE_TYPE_VOTE,
 743                                        osb->net_key,
 744                                        sizeof(struct ocfs2_vote_msg),
 745                                        ocfs2_handle_vote_message,
 746                                        osb, NULL, &osb->osb_net_handlers);
 747        if (status) {
 748                mlog_errno(status);
 749                goto bail;
 750        }
 751bail:
 752        if (status < 0)
 753                ocfs2_unregister_net_handlers(osb);
 754
 755        return status;
 756}
 757