linux/fs/ocfs2/dlm/dlmdomain.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmdomain.c
   5 *
   6 * defines domain join / leave apis
   7 *
   8 * Copyright (C) 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   23 * Boston, MA 02111-1307, USA.
  24 *
  25 */
  26
  27#include <linux/module.h>
  28#include <linux/types.h>
  29#include <linux/slab.h>
  30#include <linux/highmem.h>
  31#include <linux/init.h>
  32#include <linux/spinlock.h>
  33#include <linux/delay.h>
  34#include <linux/err.h>
  35#include <linux/debugfs.h>
  36
  37#include "cluster/heartbeat.h"
  38#include "cluster/nodemanager.h"
  39#include "cluster/tcp.h"
  40
  41#include "dlmapi.h"
  42#include "dlmcommon.h"
  43#include "dlmdomain.h"
  44#include "dlmdebug.h"
  45
  46#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
  47#include "cluster/masklog.h"
  48
  49/*
   50 * ocfs2 node maps are arrays of long int, which cannot be sent freely
   51 * across the wire due to endianness issues. To work around this, we convert
   52 * the long ints to byte arrays. The following 3 routines are helper functions
   53 * to set/test/copy bits within those arrays of bytes.
  54 */
  55static inline void byte_set_bit(u8 nr, u8 map[])
  56{
  57        map[nr >> 3] |= (1UL << (nr & 7));
  58}
  59
  60static inline int byte_test_bit(u8 nr, u8 map[])
  61{
  62        return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
  63}
  64
  65static inline void byte_copymap(u8 dmap[], unsigned long smap[],
  66                        unsigned int sz)
  67{
  68        unsigned int nn;
  69
  70        if (!sz)
  71                return;
  72
  73        memset(dmap, 0, ((sz + 7) >> 3));
  74        for (nn = 0 ; nn < sz; nn++)
  75                if (test_bit(nn, smap))
  76                        byte_set_bit(nn, dmap);
  77}
  78
  79static void dlm_free_pagevec(void **vec, int pages)
  80{
  81        while (pages--)
  82                free_page((unsigned long)vec[pages]);
  83        kfree(vec);
  84}
  85
  86static void **dlm_alloc_pagevec(int pages)
  87{
  88        void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
  89        int i;
  90
  91        if (!vec)
  92                return NULL;
  93
  94        for (i = 0; i < pages; i++)
  95                if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
  96                        goto out_free;
  97
  98        mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
  99             pages, (unsigned long)DLM_HASH_PAGES,
 100             (unsigned long)DLM_BUCKETS_PER_PAGE);
 101        return vec;
 102out_free:
 103        dlm_free_pagevec(vec, i);
 104        return NULL;
 105}
 106
 107/*
 108 *
 109 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 110 *    dlm_domain_lock
 111 *    struct dlm_ctxt->spinlock
 112 *    struct dlm_lock_resource->spinlock
 113 *    struct dlm_ctxt->master_lock
 114 *    struct dlm_ctxt->ast_lock
 115 *    dlm_master_list_entry->spinlock
 116 *    dlm_lock->spinlock
 117 *
 118 */
 119
  120DEFINE_SPINLOCK(dlm_domain_lock);	/* held in this file around dlm_domains walks and dlm_state / num_joins accesses */
  121LIST_HEAD(dlm_domains);	/* list of struct dlm_ctxt, linked through dlm_ctxt->list */
  122static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);	/* woken when a domain is released or taken off dlm_domains */
 123
 124/*
 125 * The supported protocol version for DLM communication.  Running domains
 126 * will have a negotiated version with the same major number and a minor
 127 * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
 128 * be used to determine what a running domain is actually using.
 129 *
 130 * New in version 1.1:
 131 *      - Message DLM_QUERY_REGION added to support global heartbeat
 132 *      - Message DLM_QUERY_NODEINFO added to allow online node removes
 133 * New in version 1.2:
 134 *      - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain
 135 * New in version 1.3:
 136 *      - Message DLM_DEREF_LOCKRES_DONE added to inform non-master that the
 137 *        refmap is cleared
 138 */
  139static const struct dlm_protocol_version dlm_protocol = {
  140        .pv_major = 1,	/* major version supported by this code */
  141        .pv_minor = 3,	/* newest supported minor; per-version changes are listed above */
  142};
 143
  144#define DLM_DOMAIN_BACKOFF_MS 200	/* msleep() delay before dlm_leave_domain() retries a failed exit send */
 145
 146static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 147                                  void **ret_data);
 148static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 149                                     void **ret_data);
 150static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 151                                   void **ret_data);
 152static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
 153                                    void *data, void **ret_data);
 154static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 155                                   void **ret_data);
 156static int dlm_protocol_compare(struct dlm_protocol_version *existing,
 157                                struct dlm_protocol_version *request);
 158
 159static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 160
 161void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 162{
 163        if (hlist_unhashed(&res->hash_node))
 164                return;
 165
 166        mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
 167             res->lockname.name);
 168        hlist_del_init(&res->hash_node);
 169        dlm_lockres_put(res);
 170}
 171
 172void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 173{
 174        struct hlist_head *bucket;
 175        struct qstr *q;
 176
 177        assert_spin_locked(&dlm->spinlock);
 178
 179        q = &res->lockname;
 180        bucket = dlm_lockres_hash(dlm, q->hash);
 181
 182        /* get a reference for our hashtable */
 183        dlm_lockres_get(res);
 184
 185        hlist_add_head(&res->hash_node, bucket);
 186
 187        mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
 188             res->lockname.name);
 189}
 190
 191struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
 192                                                     const char *name,
 193                                                     unsigned int len,
 194                                                     unsigned int hash)
 195{
 196        struct hlist_head *bucket;
 197        struct dlm_lock_resource *res;
 198
 199        mlog(0, "%.*s\n", len, name);
 200
 201        assert_spin_locked(&dlm->spinlock);
 202
 203        bucket = dlm_lockres_hash(dlm, hash);
 204
 205        hlist_for_each_entry(res, bucket, hash_node) {
 206                if (res->lockname.name[0] != name[0])
 207                        continue;
 208                if (unlikely(res->lockname.len != len))
 209                        continue;
 210                if (memcmp(res->lockname.name + 1, name + 1, len - 1))
 211                        continue;
 212                dlm_lockres_get(res);
 213                return res;
 214        }
 215        return NULL;
 216}
 217
 218/* intended to be called by functions which do not care about lock
 219 * resources which are being purged (most net _handler functions).
 220 * this will return NULL for any lock resource which is found but
 221 * currently in the process of dropping its mastery reference.
 222 * use __dlm_lookup_lockres_full when you need the lock resource
 223 * regardless (e.g. dlm_get_lock_resource) */
 224struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 225                                                const char *name,
 226                                                unsigned int len,
 227                                                unsigned int hash)
 228{
 229        struct dlm_lock_resource *res = NULL;
 230
 231        mlog(0, "%.*s\n", len, name);
 232
 233        assert_spin_locked(&dlm->spinlock);
 234
 235        res = __dlm_lookup_lockres_full(dlm, name, len, hash);
 236        if (res) {
 237                spin_lock(&res->spinlock);
 238                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 239                        spin_unlock(&res->spinlock);
 240                        dlm_lockres_put(res);
 241                        return NULL;
 242                }
 243                spin_unlock(&res->spinlock);
 244        }
 245
 246        return res;
 247}
 248
 249struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 250                                    const char *name,
 251                                    unsigned int len)
 252{
 253        struct dlm_lock_resource *res;
 254        unsigned int hash = dlm_lockid_hash(name, len);
 255
 256        spin_lock(&dlm->spinlock);
 257        res = __dlm_lookup_lockres(dlm, name, len, hash);
 258        spin_unlock(&dlm->spinlock);
 259        return res;
 260}
 261
 262static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 263{
 264        struct dlm_ctxt *tmp;
 265
 266        assert_spin_locked(&dlm_domain_lock);
 267
 268        /* tmp->name here is always NULL terminated,
 269         * but domain may not be! */
 270        list_for_each_entry(tmp, &dlm_domains, list) {
 271                if (strlen(tmp->name) == len &&
 272                    memcmp(tmp->name, domain, len)==0)
 273                        return tmp;
 274        }
 275
 276        return NULL;
 277}
 278
 279/* For null terminated domain strings ONLY */
 280static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
 281{
 282        assert_spin_locked(&dlm_domain_lock);
 283
 284        return __dlm_lookup_domain_full(domain, strlen(domain));
 285}
 286
 287
 288/* returns true on one of two conditions:
 289 * 1) the domain does not exist
 290 * 2) the domain exists and it's state is "joined" */
 291static int dlm_wait_on_domain_helper(const char *domain)
 292{
 293        int ret = 0;
 294        struct dlm_ctxt *tmp = NULL;
 295
 296        spin_lock(&dlm_domain_lock);
 297
 298        tmp = __dlm_lookup_domain(domain);
 299        if (!tmp)
 300                ret = 1;
 301        else if (tmp->dlm_state == DLM_CTXT_JOINED)
 302                ret = 1;
 303
 304        spin_unlock(&dlm_domain_lock);
 305        return ret;
 306}
 307
 308static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 309{
 310        dlm_destroy_debugfs_subroot(dlm);
 311
 312        if (dlm->lockres_hash)
 313                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 314
 315        if (dlm->master_hash)
 316                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
 317
 318        kfree(dlm->name);
 319        kfree(dlm);
 320}
 321
 322/* A little strange - this function will be called while holding
 323 * dlm_domain_lock and is expected to be holding it on the way out. We
 324 * will however drop and reacquire it multiple times */
 325static void dlm_ctxt_release(struct kref *kref)
 326{
 327        struct dlm_ctxt *dlm;
 328
 329        dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
 330
 331        BUG_ON(dlm->num_joins);
 332        BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
 333
 334        /* we may still be in the list if we hit an error during join. */
 335        list_del_init(&dlm->list);
 336
 337        spin_unlock(&dlm_domain_lock);
 338
 339        mlog(0, "freeing memory from domain %s\n", dlm->name);
 340
 341        wake_up(&dlm_domain_events);
 342
 343        dlm_free_ctxt_mem(dlm);
 344
 345        spin_lock(&dlm_domain_lock);
 346}
 347
 348void dlm_put(struct dlm_ctxt *dlm)
 349{
 350        spin_lock(&dlm_domain_lock);
 351        kref_put(&dlm->dlm_refs, dlm_ctxt_release);
 352        spin_unlock(&dlm_domain_lock);
 353}
 354
 355static void __dlm_get(struct dlm_ctxt *dlm)
 356{
 357        kref_get(&dlm->dlm_refs);
 358}
 359
 360/* given a questionable reference to a dlm object, gets a reference if
 361 * it can find it in the list, otherwise returns NULL in which case
 362 * you shouldn't trust your pointer. */
 363struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
 364{
 365        struct dlm_ctxt *target;
 366        struct dlm_ctxt *ret = NULL;
 367
 368        spin_lock(&dlm_domain_lock);
 369
 370        list_for_each_entry(target, &dlm_domains, list) {
 371                if (target == dlm) {
 372                        __dlm_get(target);
 373                        ret = target;
 374                        break;
 375                }
 376        }
 377
 378        spin_unlock(&dlm_domain_lock);
 379
 380        return ret;
 381}
 382
 383int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 384{
 385        int ret;
 386
 387        spin_lock(&dlm_domain_lock);
 388        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
 389                (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
 390        spin_unlock(&dlm_domain_lock);
 391
 392        return ret;
 393}
 394
 395static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
 396{
 397        if (dlm->dlm_worker) {
 398                flush_workqueue(dlm->dlm_worker);
 399                destroy_workqueue(dlm->dlm_worker);
 400                dlm->dlm_worker = NULL;
 401        }
 402}
 403
 404static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 405{
 406        dlm_unregister_domain_handlers(dlm);
 407        dlm_debug_shutdown(dlm);
 408        dlm_complete_thread(dlm);
 409        dlm_complete_recovery_thread(dlm);
 410        dlm_destroy_dlm_worker(dlm);
 411
 412        /* We've left the domain. Now we can take ourselves out of the
 413         * list and allow the kref stuff to help us free the
 414         * memory. */
 415        spin_lock(&dlm_domain_lock);
 416        list_del_init(&dlm->list);
 417        spin_unlock(&dlm_domain_lock);
 418
 419        /* Wake up anyone waiting for us to remove this domain */
 420        wake_up(&dlm_domain_events);
 421}
 422
 423static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 424{
 425        int i, num, n, ret = 0;
 426        struct dlm_lock_resource *res;
 427        struct hlist_node *iter;
 428        struct hlist_head *bucket;
 429        int dropped;
 430
 431        mlog(0, "Migrating locks from domain %s\n", dlm->name);
 432
 433        num = 0;
 434        spin_lock(&dlm->spinlock);
 435        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 436redo_bucket:
 437                n = 0;
 438                bucket = dlm_lockres_hash(dlm, i);
 439                iter = bucket->first;
 440                while (iter) {
 441                        n++;
 442                        res = hlist_entry(iter, struct dlm_lock_resource,
 443                                          hash_node);
 444                        dlm_lockres_get(res);
 445                        /* migrate, if necessary.  this will drop the dlm
 446                         * spinlock and retake it if it does migration. */
 447                        dropped = dlm_empty_lockres(dlm, res);
 448
 449                        spin_lock(&res->spinlock);
 450                        if (dropped)
 451                                __dlm_lockres_calc_usage(dlm, res);
 452                        else
 453                                iter = res->hash_node.next;
 454                        spin_unlock(&res->spinlock);
 455
 456                        dlm_lockres_put(res);
 457
 458                        if (dropped) {
 459                                cond_resched_lock(&dlm->spinlock);
 460                                goto redo_bucket;
 461                        }
 462                }
 463                cond_resched_lock(&dlm->spinlock);
 464                num += n;
 465        }
 466        spin_unlock(&dlm->spinlock);
 467        wake_up(&dlm->dlm_thread_wq);
 468
 469        /* let the dlm thread take care of purging, keep scanning until
 470         * nothing remains in the hash */
 471        if (num) {
 472                mlog(0, "%s: %d lock resources in hash last pass\n",
 473                     dlm->name, num);
 474                ret = -EAGAIN;
 475        }
 476        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
 477        return ret;
 478}
 479
 480static int dlm_no_joining_node(struct dlm_ctxt *dlm)
 481{
 482        int ret;
 483
 484        spin_lock(&dlm->spinlock);
 485        ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
 486        spin_unlock(&dlm->spinlock);
 487
 488        return ret;
 489}
 490
 491static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len,
 492                                         void *data, void **ret_data)
 493{
 494        struct dlm_ctxt *dlm = data;
 495        unsigned int node;
 496        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 497
 498        if (!dlm_grab(dlm))
 499                return 0;
 500
 501        node = exit_msg->node_idx;
 502        mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node);
 503
 504        spin_lock(&dlm->spinlock);
 505        set_bit(node, dlm->exit_domain_map);
 506        spin_unlock(&dlm->spinlock);
 507
 508        dlm_put(dlm);
 509
 510        return 0;
 511}
 512
 513static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
 514{
 515        /* Yikes, a double spinlock! I need domain_lock for the dlm
 516         * state and the dlm spinlock for join state... Sorry! */
 517again:
 518        spin_lock(&dlm_domain_lock);
 519        spin_lock(&dlm->spinlock);
 520
 521        if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 522                mlog(0, "Node %d is joining, we wait on it.\n",
 523                          dlm->joining_node);
 524                spin_unlock(&dlm->spinlock);
 525                spin_unlock(&dlm_domain_lock);
 526
 527                wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
 528                goto again;
 529        }
 530
 531        dlm->dlm_state = DLM_CTXT_LEAVING;
 532        spin_unlock(&dlm->spinlock);
 533        spin_unlock(&dlm_domain_lock);
 534}
 535
 536static void __dlm_print_nodes(struct dlm_ctxt *dlm)
 537{
 538        int node = -1, num = 0;
 539
 540        assert_spin_locked(&dlm->spinlock);
 541
 542        printk("( ");
 543        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 544                                     node + 1)) < O2NM_MAX_NODES) {
 545                printk("%d ", node);
 546                ++num;
 547        }
 548        printk(") %u nodes\n", num);
 549}
 550
 551static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 552                                   void **ret_data)
 553{
 554        struct dlm_ctxt *dlm = data;
 555        unsigned int node;
 556        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 557
 558        mlog(0, "%p %u %p", msg, len, data);
 559
 560        if (!dlm_grab(dlm))
 561                return 0;
 562
 563        node = exit_msg->node_idx;
 564
 565        spin_lock(&dlm->spinlock);
 566        clear_bit(node, dlm->domain_map);
 567        clear_bit(node, dlm->exit_domain_map);
 568        printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
 569        __dlm_print_nodes(dlm);
 570
 571        /* notify anything attached to the heartbeat events */
 572        dlm_hb_event_notify_attached(dlm, node, 0);
 573
 574        spin_unlock(&dlm->spinlock);
 575
 576        dlm_put(dlm);
 577
 578        return 0;
 579}
 580
 581static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type,
 582                                    unsigned int node)
 583{
 584        int status;
 585        struct dlm_exit_domain leave_msg;
 586
 587        mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name,
 588             msg_type, node);
 589
 590        memset(&leave_msg, 0, sizeof(leave_msg));
 591        leave_msg.node_idx = dlm->node_num;
 592
 593        status = o2net_send_message(msg_type, dlm->key, &leave_msg,
 594                                    sizeof(leave_msg), node, NULL);
 595        if (status < 0)
 596                mlog(ML_ERROR, "Error %d sending domain exit message %u "
 597                     "to node %u on domain %s\n", status, msg_type, node,
 598                     dlm->name);
 599
 600        return status;
 601}
 602
 603static void dlm_begin_exit_domain(struct dlm_ctxt *dlm)
 604{
 605        int node = -1;
 606
 607        /* Support for begin exit domain was added in 1.2 */
 608        if (dlm->dlm_locking_proto.pv_major == 1 &&
 609            dlm->dlm_locking_proto.pv_minor < 2)
 610                return;
 611
 612        /*
 613         * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely
 614         * informational. Meaning if a node does not receive the message,
 615         * so be it.
 616         */
 617        spin_lock(&dlm->spinlock);
 618        while (1) {
 619                node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1);
 620                if (node >= O2NM_MAX_NODES)
 621                        break;
 622                if (node == dlm->node_num)
 623                        continue;
 624
 625                spin_unlock(&dlm->spinlock);
 626                dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node);
 627                spin_lock(&dlm->spinlock);
 628        }
 629        spin_unlock(&dlm->spinlock);
 630}
 631
 632static void dlm_leave_domain(struct dlm_ctxt *dlm)
 633{
 634        int node, clear_node, status;
 635
 636        /* At this point we've migrated away all our locks and won't
 637         * accept mastership of new ones. The dlm is responsible for
 638         * almost nothing now. We make sure not to confuse any joining
 639         * nodes and then commence shutdown procedure. */
 640
 641        spin_lock(&dlm->spinlock);
 642        /* Clear ourselves from the domain map */
 643        clear_bit(dlm->node_num, dlm->domain_map);
 644        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 645                                     0)) < O2NM_MAX_NODES) {
 646                /* Drop the dlm spinlock. This is safe wrt the domain_map.
 647                 * -nodes cannot be added now as the
 648                 *   query_join_handlers knows to respond with OK_NO_MAP
 649                 * -we catch the right network errors if a node is
 650                 *   removed from the map while we're sending him the
 651                 *   exit message. */
 652                spin_unlock(&dlm->spinlock);
 653
 654                clear_node = 1;
 655
 656                status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG,
 657                                                  node);
 658                if (status < 0 &&
 659                    status != -ENOPROTOOPT &&
 660                    status != -ENOTCONN) {
 661                        mlog(ML_NOTICE, "Error %d sending domain exit message "
 662                             "to node %d\n", status, node);
 663
 664                        /* Not sure what to do here but lets sleep for
 665                         * a bit in case this was a transient
 666                         * error... */
 667                        msleep(DLM_DOMAIN_BACKOFF_MS);
 668                        clear_node = 0;
 669                }
 670
 671                spin_lock(&dlm->spinlock);
 672                /* If we're not clearing the node bit then we intend
 673                 * to loop back around to try again. */
 674                if (clear_node)
 675                        clear_bit(node, dlm->domain_map);
 676        }
 677        spin_unlock(&dlm->spinlock);
 678}
 679
 680int dlm_shutting_down(struct dlm_ctxt *dlm)
 681{
 682        int ret = 0;
 683
 684        spin_lock(&dlm_domain_lock);
 685
 686        if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
 687                ret = 1;
 688
 689        spin_unlock(&dlm_domain_lock);
 690
 691        return ret;
 692}
 693
 694void dlm_unregister_domain(struct dlm_ctxt *dlm)
 695{
 696        int leave = 0;
 697        struct dlm_lock_resource *res;
 698
 699        spin_lock(&dlm_domain_lock);
 700        BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
 701        BUG_ON(!dlm->num_joins);
 702
 703        dlm->num_joins--;
 704        if (!dlm->num_joins) {
 705                /* We mark it "in shutdown" now so new register
 706                 * requests wait until we've completely left the
 707                 * domain. Don't use DLM_CTXT_LEAVING yet as we still
 708                 * want new domain joins to communicate with us at
 709                 * least until we've completed migration of our
 710                 * resources. */
 711                dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
 712                leave = 1;
 713        }
 714        spin_unlock(&dlm_domain_lock);
 715
 716        if (leave) {
 717                mlog(0, "shutting down domain %s\n", dlm->name);
 718                dlm_begin_exit_domain(dlm);
 719
 720                /* We changed dlm state, notify the thread */
 721                dlm_kick_thread(dlm, NULL);
 722
 723                while (dlm_migrate_all_locks(dlm)) {
 724                        /* Give dlm_thread time to purge the lockres' */
 725                        msleep(500);
 726                        mlog(0, "%s: more migration to do\n", dlm->name);
 727                }
 728
 729                /* This list should be empty. If not, print remaining lockres */
 730                if (!list_empty(&dlm->tracking_list)) {
 731                        mlog(ML_ERROR, "Following lockres' are still on the "
 732                             "tracking list:\n");
 733                        list_for_each_entry(res, &dlm->tracking_list, tracking)
 734                                dlm_print_one_lock_resource(res);
 735                }
 736
 737                dlm_mark_domain_leaving(dlm);
 738                dlm_leave_domain(dlm);
 739                printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
 740                dlm_force_free_mles(dlm);
 741                dlm_complete_dlm_shutdown(dlm);
 742        }
 743        dlm_put(dlm);
 744}
 745EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 746
 747static int dlm_query_join_proto_check(char *proto_type, int node,
 748                                      struct dlm_protocol_version *ours,
 749                                      struct dlm_protocol_version *request)
 750{
 751        int rc;
 752        struct dlm_protocol_version proto = *request;
 753
 754        if (!dlm_protocol_compare(ours, &proto)) {
 755                mlog(0,
 756                     "node %u wanted to join with %s locking protocol "
 757                     "%u.%u, we respond with %u.%u\n",
 758                     node, proto_type,
 759                     request->pv_major,
 760                     request->pv_minor,
 761                     proto.pv_major, proto.pv_minor);
 762                request->pv_minor = proto.pv_minor;
 763                rc = 0;
 764        } else {
 765                mlog(ML_NOTICE,
 766                     "Node %u wanted to join with %s locking "
 767                     "protocol %u.%u, but we have %u.%u, disallowing\n",
 768                     node, proto_type,
 769                     request->pv_major,
 770                     request->pv_minor,
 771                     ours->pv_major,
 772                     ours->pv_minor);
 773                rc = 1;
 774        }
 775
 776        return rc;
 777}
 778
 779/*
 780 * struct dlm_query_join_packet is made up of four one-byte fields.  They
 781 * are effectively in big-endian order already.  However, little-endian
 782 * machines swap them before putting the packet on the wire (because
 783 * query_join's response is a status, and that status is treated as a u32
  784 * on the wire).  Thus, big-endian and little-endian machines will treat
 785 * this structure differently.
 786 *
 787 * The solution is to have little-endian machines swap the structure when
 788 * converting from the structure to the u32 representation.  This will
 789 * result in the structure having the correct format on the wire no matter
 790 * the host endian format.
 791 */
 792static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
 793                                          u32 *wire)
 794{
 795        union dlm_query_join_response response;
 796
 797        response.packet = *packet;
 798        *wire = be32_to_cpu(response.intval);
 799}
 800
 801static void dlm_query_join_wire_to_packet(u32 wire,
 802                                          struct dlm_query_join_packet *packet)
 803{
 804        union dlm_query_join_response response;
 805
 806        response.intval = cpu_to_be32(wire);
 807        *packet = response.packet;
 808}
 809
 810static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 811                                  void **ret_data)
 812{
 813        struct dlm_query_join_request *query;
 814        struct dlm_query_join_packet packet = {
 815                .code = JOIN_DISALLOW,
 816        };
 817        struct dlm_ctxt *dlm = NULL;
 818        u32 response;
 819        u8 nodenum;
 820
 821        query = (struct dlm_query_join_request *) msg->buf;
 822
 823        mlog(0, "node %u wants to join domain %s\n", query->node_idx,
 824                  query->domain);
 825
 826        /*
 827         * If heartbeat doesn't consider the node live, tell it
 828         * to back off and try again.  This gives heartbeat a chance
 829         * to catch up.
 830         */
 831        if (!o2hb_check_node_heartbeating_no_sem(query->node_idx)) {
 832                mlog(0, "node %u is not in our live map yet\n",
 833                     query->node_idx);
 834
 835                packet.code = JOIN_DISALLOW;
 836                goto respond;
 837        }
 838
 839        packet.code = JOIN_OK_NO_MAP;
 840
 841        spin_lock(&dlm_domain_lock);
 842        dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 843        if (!dlm)
 844                goto unlock_respond;
 845
 846        /*
 847         * There is a small window where the joining node may not see the
 848         * node(s) that just left but still part of the cluster. DISALLOW
 849         * join request if joining node has different node map.
 850         */
 851        nodenum=0;
 852        while (nodenum < O2NM_MAX_NODES) {
 853                if (test_bit(nodenum, dlm->domain_map)) {
 854                        if (!byte_test_bit(nodenum, query->node_map)) {
 855                                mlog(0, "disallow join as node %u does not "
 856                                     "have node %u in its nodemap\n",
 857                                     query->node_idx, nodenum);
 858                                packet.code = JOIN_DISALLOW;
 859                                goto unlock_respond;
 860                        }
 861                }
 862                nodenum++;
 863        }
 864
 865        /* Once the dlm ctxt is marked as leaving then we don't want
 866         * to be put in someone's domain map.
 867         * Also, explicitly disallow joining at certain troublesome
 868         * times (ie. during recovery). */
 869        if (dlm->dlm_state != DLM_CTXT_LEAVING) {
 870                int bit = query->node_idx;
 871                spin_lock(&dlm->spinlock);
 872
 873                if (dlm->dlm_state == DLM_CTXT_NEW &&
 874                    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
 875                        /*If this is a brand new context and we
 876                         * haven't started our join process yet, then
 877                         * the other node won the race. */
 878                        packet.code = JOIN_OK_NO_MAP;
 879                } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 880                        /* Disallow parallel joins. */
 881                        packet.code = JOIN_DISALLOW;
 882                } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 883                        mlog(0, "node %u trying to join, but recovery "
 884                             "is ongoing.\n", bit);
 885                        packet.code = JOIN_DISALLOW;
 886                } else if (test_bit(bit, dlm->recovery_map)) {
 887                        mlog(0, "node %u trying to join, but it "
 888                             "still needs recovery.\n", bit);
 889                        packet.code = JOIN_DISALLOW;
 890                } else if (test_bit(bit, dlm->domain_map)) {
 891                        mlog(0, "node %u trying to join, but it "
 892                             "is still in the domain! needs recovery?\n",
 893                             bit);
 894                        packet.code = JOIN_DISALLOW;
 895                } else {
 896                        /* Alright we're fully a part of this domain
 897                         * so we keep some state as to who's joining
 898                         * and indicate to him that needs to be fixed
 899                         * up. */
 900
 901                        /* Make sure we speak compatible locking protocols.  */
 902                        if (dlm_query_join_proto_check("DLM", bit,
 903                                                       &dlm->dlm_locking_proto,
 904                                                       &query->dlm_proto)) {
 905                                packet.code = JOIN_PROTOCOL_MISMATCH;
 906                        } else if (dlm_query_join_proto_check("fs", bit,
 907                                                              &dlm->fs_locking_proto,
 908                                                              &query->fs_proto)) {
 909                                packet.code = JOIN_PROTOCOL_MISMATCH;
 910                        } else {
 911                                packet.dlm_minor = query->dlm_proto.pv_minor;
 912                                packet.fs_minor = query->fs_proto.pv_minor;
 913                                packet.code = JOIN_OK;
 914                                __dlm_set_joining_node(dlm, query->node_idx);
 915                        }
 916                }
 917
 918                spin_unlock(&dlm->spinlock);
 919        }
 920unlock_respond:
 921        spin_unlock(&dlm_domain_lock);
 922
 923respond:
 924        mlog(0, "We respond with %u\n", packet.code);
 925
 926        dlm_query_join_packet_to_wire(&packet, &response);
 927        return response;
 928}
 929
 930static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 931                                     void **ret_data)
 932{
 933        struct dlm_assert_joined *assert;
 934        struct dlm_ctxt *dlm = NULL;
 935
 936        assert = (struct dlm_assert_joined *) msg->buf;
 937
 938        mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
 939                  assert->domain);
 940
 941        spin_lock(&dlm_domain_lock);
 942        dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
 943        /* XXX should we consider no dlm ctxt an error? */
 944        if (dlm) {
 945                spin_lock(&dlm->spinlock);
 946
 947                /* Alright, this node has officially joined our
 948                 * domain. Set him in the map and clean up our
 949                 * leftover join state. */
 950                BUG_ON(dlm->joining_node != assert->node_idx);
 951
 952                if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 953                        mlog(0, "dlm recovery is ongoing, disallow join\n");
 954                        spin_unlock(&dlm->spinlock);
 955                        spin_unlock(&dlm_domain_lock);
 956                        return -EAGAIN;
 957                }
 958
 959                set_bit(assert->node_idx, dlm->domain_map);
 960                clear_bit(assert->node_idx, dlm->exit_domain_map);
 961                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 962
 963                printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
 964                       assert->node_idx, dlm->name);
 965                __dlm_print_nodes(dlm);
 966
 967                /* notify anything attached to the heartbeat events */
 968                dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
 969
 970                spin_unlock(&dlm->spinlock);
 971        }
 972        spin_unlock(&dlm_domain_lock);
 973
 974        return 0;
 975}
 976
 977static int dlm_match_regions(struct dlm_ctxt *dlm,
 978                             struct dlm_query_region *qr,
 979                             char *local, int locallen)
 980{
 981        char *remote = qr->qr_regions;
 982        char *l, *r;
 983        int localnr, i, j, foundit;
 984        int status = 0;
 985
 986        if (!o2hb_global_heartbeat_active()) {
 987                if (qr->qr_numregions) {
 988                        mlog(ML_ERROR, "Domain %s: Joining node %d has global "
 989                             "heartbeat enabled but local node %d does not\n",
 990                             qr->qr_domain, qr->qr_node, dlm->node_num);
 991                        status = -EINVAL;
 992                }
 993                goto bail;
 994        }
 995
 996        if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
 997                mlog(ML_ERROR, "Domain %s: Local node %d has global "
 998                     "heartbeat enabled but joining node %d does not\n",
 999                     qr->qr_domain, dlm->node_num, qr->qr_node);
1000                status = -EINVAL;
1001                goto bail;
1002        }
1003
1004        r = remote;
1005        for (i = 0; i < qr->qr_numregions; ++i) {
1006                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
1007                r += O2HB_MAX_REGION_NAME_LEN;
1008        }
1009
1010        localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN);
1011        localnr = o2hb_get_all_regions(local, (u8)localnr);
1012
1013        /* compare local regions with remote */
1014        l = local;
1015        for (i = 0; i < localnr; ++i) {
1016                foundit = 0;
1017                r = remote;
1018                for (j = 0; j <= qr->qr_numregions; ++j) {
1019                        if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
1020                                foundit = 1;
1021                                break;
1022                        }
1023                        r += O2HB_MAX_REGION_NAME_LEN;
1024                }
1025                if (!foundit) {
1026                        status = -EINVAL;
1027                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1028                             "in local node %d but not in joining node %d\n",
1029                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
1030                             dlm->node_num, qr->qr_node);
1031                        goto bail;
1032                }
1033                l += O2HB_MAX_REGION_NAME_LEN;
1034        }
1035
1036        /* compare remote with local regions */
1037        r = remote;
1038        for (i = 0; i < qr->qr_numregions; ++i) {
1039                foundit = 0;
1040                l = local;
1041                for (j = 0; j < localnr; ++j) {
1042                        if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1043                                foundit = 1;
1044                                break;
1045                        }
1046                        l += O2HB_MAX_REGION_NAME_LEN;
1047                }
1048                if (!foundit) {
1049                        status = -EINVAL;
1050                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1051                             "in joining node %d but not in local node %d\n",
1052                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1053                             qr->qr_node, dlm->node_num);
1054                        goto bail;
1055                }
1056                r += O2HB_MAX_REGION_NAME_LEN;
1057        }
1058
1059bail:
1060        return status;
1061}
1062
1063static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1064{
1065        struct dlm_query_region *qr = NULL;
1066        int status, ret = 0, i;
1067        char *p;
1068
1069        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1070                goto bail;
1071
1072        qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1073        if (!qr) {
1074                ret = -ENOMEM;
1075                mlog_errno(ret);
1076                goto bail;
1077        }
1078
1079        qr->qr_node = dlm->node_num;
1080        qr->qr_namelen = strlen(dlm->name);
1081        memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1082        /* if local hb, the numregions will be zero */
1083        if (o2hb_global_heartbeat_active())
1084                qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1085                                                         O2NM_MAX_REGIONS);
1086
1087        p = qr->qr_regions;
1088        for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1089                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1090
1091        i = -1;
1092        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1093                                  i + 1)) < O2NM_MAX_NODES) {
1094                if (i == dlm->node_num)
1095                        continue;
1096
1097                mlog(0, "Sending regions to node %d\n", i);
1098
1099                ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1100                                         sizeof(struct dlm_query_region),
1101                                         i, &status);
1102                if (ret >= 0)
1103                        ret = status;
1104                if (ret) {
1105                        mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1106                             ret, i);
1107                        break;
1108                }
1109        }
1110
1111bail:
1112        kfree(qr);
1113        return ret;
1114}
1115
1116static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1117                                    void *data, void **ret_data)
1118{
1119        struct dlm_query_region *qr;
1120        struct dlm_ctxt *dlm = NULL;
1121        char *local = NULL;
1122        int status = 0;
1123
1124        qr = (struct dlm_query_region *) msg->buf;
1125
1126        mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1127             qr->qr_domain);
1128
1129        /* buffer used in dlm_mast_regions() */
1130        local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
1131        if (!local)
1132                return -ENOMEM;
1133
1134        status = -EINVAL;
1135
1136        spin_lock(&dlm_domain_lock);
1137        dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1138        if (!dlm) {
1139                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1140                     "before join domain\n", qr->qr_node, qr->qr_domain);
1141                goto out_domain_lock;
1142        }
1143
1144        spin_lock(&dlm->spinlock);
1145        if (dlm->joining_node != qr->qr_node) {
1146                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1147                     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1148                     dlm->joining_node);
1149                goto out_dlm_lock;
1150        }
1151
1152        /* Support for global heartbeat was added in 1.1 */
1153        if (dlm->dlm_locking_proto.pv_major == 1 &&
1154            dlm->dlm_locking_proto.pv_minor == 0) {
1155                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1156                     "but active dlm protocol is %d.%d\n", qr->qr_node,
1157                     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1158                     dlm->dlm_locking_proto.pv_minor);
1159                goto out_dlm_lock;
1160        }
1161
1162        status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions));
1163
1164out_dlm_lock:
1165        spin_unlock(&dlm->spinlock);
1166
1167out_domain_lock:
1168        spin_unlock(&dlm_domain_lock);
1169
1170        kfree(local);
1171
1172        return status;
1173}
1174
1175static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1176{
1177        struct o2nm_node *local;
1178        struct dlm_node_info *remote;
1179        int i, j;
1180        int status = 0;
1181
1182        for (j = 0; j < qn->qn_numnodes; ++j)
1183                mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1184                     &(qn->qn_nodes[j].ni_ipv4_address),
1185                     ntohs(qn->qn_nodes[j].ni_ipv4_port));
1186
1187        for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1188                local = o2nm_get_node_by_num(i);
1189                remote = NULL;
1190                for (j = 0; j < qn->qn_numnodes; ++j) {
1191                        if (qn->qn_nodes[j].ni_nodenum == i) {
1192                                remote = &(qn->qn_nodes[j]);
1193                                break;
1194                        }
1195                }
1196
1197                if (!local && !remote)
1198                        continue;
1199
1200                if ((local && !remote) || (!local && remote))
1201                        status = -EINVAL;
1202
1203                if (!status &&
1204                    ((remote->ni_nodenum != local->nd_num) ||
1205                     (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1206                     (remote->ni_ipv4_address != local->nd_ipv4_address)))
1207                        status = -EINVAL;
1208
1209                if (status) {
1210                        if (remote && !local)
1211                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1212                                     "registered in joining node %d but not in "
1213                                     "local node %d\n", qn->qn_domain,
1214                                     remote->ni_nodenum,
1215                                     &(remote->ni_ipv4_address),
1216                                     ntohs(remote->ni_ipv4_port),
1217                                     qn->qn_nodenum, dlm->node_num);
1218                        if (local && !remote)
1219                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1220                                     "registered in local node %d but not in "
1221                                     "joining node %d\n", qn->qn_domain,
1222                                     local->nd_num, &(local->nd_ipv4_address),
1223                                     ntohs(local->nd_ipv4_port),
1224                                     dlm->node_num, qn->qn_nodenum);
1225                        BUG_ON((!local && !remote));
1226                }
1227
1228                if (local)
1229                        o2nm_node_put(local);
1230        }
1231
1232        return status;
1233}
1234
1235static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1236{
1237        struct dlm_query_nodeinfo *qn = NULL;
1238        struct o2nm_node *node;
1239        int ret = 0, status, count, i;
1240
1241        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1242                goto bail;
1243
1244        qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1245        if (!qn) {
1246                ret = -ENOMEM;
1247                mlog_errno(ret);
1248                goto bail;
1249        }
1250
1251        for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1252                node = o2nm_get_node_by_num(i);
1253                if (!node)
1254                        continue;
1255                qn->qn_nodes[count].ni_nodenum = node->nd_num;
1256                qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1257                qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1258                mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1259                     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1260                ++count;
1261                o2nm_node_put(node);
1262        }
1263
1264        qn->qn_nodenum = dlm->node_num;
1265        qn->qn_numnodes = count;
1266        qn->qn_namelen = strlen(dlm->name);
1267        memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1268
1269        i = -1;
1270        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1271                                  i + 1)) < O2NM_MAX_NODES) {
1272                if (i == dlm->node_num)
1273                        continue;
1274
1275                mlog(0, "Sending nodeinfo to node %d\n", i);
1276
1277                ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1278                                         qn, sizeof(struct dlm_query_nodeinfo),
1279                                         i, &status);
1280                if (ret >= 0)
1281                        ret = status;
1282                if (ret) {
1283                        mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1284                        break;
1285                }
1286        }
1287
1288bail:
1289        kfree(qn);
1290        return ret;
1291}
1292
1293static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1294                                      void *data, void **ret_data)
1295{
1296        struct dlm_query_nodeinfo *qn;
1297        struct dlm_ctxt *dlm = NULL;
1298        int locked = 0, status = -EINVAL;
1299
1300        qn = (struct dlm_query_nodeinfo *) msg->buf;
1301
1302        mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1303             qn->qn_domain);
1304
1305        spin_lock(&dlm_domain_lock);
1306        dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1307        if (!dlm) {
1308                mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1309                     "join domain\n", qn->qn_nodenum, qn->qn_domain);
1310                goto bail;
1311        }
1312
1313        spin_lock(&dlm->spinlock);
1314        locked = 1;
1315        if (dlm->joining_node != qn->qn_nodenum) {
1316                mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1317                     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1318                     dlm->joining_node);
1319                goto bail;
1320        }
1321
1322        /* Support for node query was added in 1.1 */
1323        if (dlm->dlm_locking_proto.pv_major == 1 &&
1324            dlm->dlm_locking_proto.pv_minor == 0) {
1325                mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1326                     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1327                     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1328                     dlm->dlm_locking_proto.pv_minor);
1329                goto bail;
1330        }
1331
1332        status = dlm_match_nodes(dlm, qn);
1333
1334bail:
1335        if (locked)
1336                spin_unlock(&dlm->spinlock);
1337        spin_unlock(&dlm_domain_lock);
1338
1339        return status;
1340}
1341
1342static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1343                                   void **ret_data)
1344{
1345        struct dlm_cancel_join *cancel;
1346        struct dlm_ctxt *dlm = NULL;
1347
1348        cancel = (struct dlm_cancel_join *) msg->buf;
1349
1350        mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1351                  cancel->domain);
1352
1353        spin_lock(&dlm_domain_lock);
1354        dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1355
1356        if (dlm) {
1357                spin_lock(&dlm->spinlock);
1358
1359                /* Yikes, this guy wants to cancel his join. No
1360                 * problem, we simply cleanup our join state. */
1361                BUG_ON(dlm->joining_node != cancel->node_idx);
1362                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1363
1364                spin_unlock(&dlm->spinlock);
1365        }
1366        spin_unlock(&dlm_domain_lock);
1367
1368        return 0;
1369}
1370
1371static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1372                                    unsigned int node)
1373{
1374        int status;
1375        struct dlm_cancel_join cancel_msg;
1376
1377        memset(&cancel_msg, 0, sizeof(cancel_msg));
1378        cancel_msg.node_idx = dlm->node_num;
1379        cancel_msg.name_len = strlen(dlm->name);
1380        memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1381
1382        status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1383                                    &cancel_msg, sizeof(cancel_msg), node,
1384                                    NULL);
1385        if (status < 0) {
1386                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1387                     "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1388                     node);
1389                goto bail;
1390        }
1391
1392bail:
1393        return status;
1394}
1395
1396/* map_size should be in bytes. */
1397static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1398                                 unsigned long *node_map,
1399                                 unsigned int map_size)
1400{
1401        int status, tmpstat;
1402        int node;
1403
1404        if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1405                         sizeof(unsigned long))) {
1406                mlog(ML_ERROR,
1407                     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1408                     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1409                return -EINVAL;
1410        }
1411
1412        status = 0;
1413        node = -1;
1414        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1415                                     node + 1)) < O2NM_MAX_NODES) {
1416                if (node == dlm->node_num)
1417                        continue;
1418
1419                tmpstat = dlm_send_one_join_cancel(dlm, node);
1420                if (tmpstat) {
1421                        mlog(ML_ERROR, "Error return %d cancelling join on "
1422                             "node %d\n", tmpstat, node);
1423                        if (!status)
1424                                status = tmpstat;
1425                }
1426        }
1427
1428        if (status)
1429                mlog_errno(status);
1430        return status;
1431}
1432
1433static int dlm_request_join(struct dlm_ctxt *dlm,
1434                            int node,
1435                            enum dlm_query_join_response_code *response)
1436{
1437        int status;
1438        struct dlm_query_join_request join_msg;
1439        struct dlm_query_join_packet packet;
1440        u32 join_resp;
1441
1442        mlog(0, "querying node %d\n", node);
1443
1444        memset(&join_msg, 0, sizeof(join_msg));
1445        join_msg.node_idx = dlm->node_num;
1446        join_msg.name_len = strlen(dlm->name);
1447        memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1448        join_msg.dlm_proto = dlm->dlm_locking_proto;
1449        join_msg.fs_proto = dlm->fs_locking_proto;
1450
1451        /* copy live node map to join message */
1452        byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1453
1454        status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1455                                    sizeof(join_msg), node, &join_resp);
1456        if (status < 0 && status != -ENOPROTOOPT) {
1457                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1458                     "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1459                     node);
1460                goto bail;
1461        }
1462        dlm_query_join_wire_to_packet(join_resp, &packet);
1463
1464        /* -ENOPROTOOPT from the net code means the other side isn't
1465            listening for our message type -- that's fine, it means
1466            his dlm isn't up, so we can consider him a 'yes' but not
1467            joined into the domain.  */
1468        if (status == -ENOPROTOOPT) {
1469                status = 0;
1470                *response = JOIN_OK_NO_MAP;
1471        } else {
1472                *response = packet.code;
1473                switch (packet.code) {
1474                case JOIN_DISALLOW:
1475                case JOIN_OK_NO_MAP:
1476                        break;
1477                case JOIN_PROTOCOL_MISMATCH:
1478                        mlog(ML_NOTICE,
1479                             "This node requested DLM locking protocol %u.%u and "
1480                             "filesystem locking protocol %u.%u.  At least one of "
1481                             "the protocol versions on node %d is not compatible, "
1482                             "disconnecting\n",
1483                             dlm->dlm_locking_proto.pv_major,
1484                             dlm->dlm_locking_proto.pv_minor,
1485                             dlm->fs_locking_proto.pv_major,
1486                             dlm->fs_locking_proto.pv_minor,
1487                             node);
1488                        status = -EPROTO;
1489                        break;
1490                case JOIN_OK:
1491                        /* Use the same locking protocol as the remote node */
1492                        dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1493                        dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1494                        mlog(0,
1495                             "Node %d responds JOIN_OK with DLM locking protocol "
1496                             "%u.%u and fs locking protocol %u.%u\n",
1497                             node,
1498                             dlm->dlm_locking_proto.pv_major,
1499                             dlm->dlm_locking_proto.pv_minor,
1500                             dlm->fs_locking_proto.pv_major,
1501                             dlm->fs_locking_proto.pv_minor);
1502                        break;
1503                default:
1504                        status = -EINVAL;
1505                        mlog(ML_ERROR, "invalid response %d from node %u\n",
1506                             packet.code, node);
1507                        /* Reset response to JOIN_DISALLOW */
1508                        *response = JOIN_DISALLOW;
1509                        break;
1510                }
1511        }
1512
1513        mlog(0, "status %d, node %d response is %d\n", status, node,
1514             *response);
1515
1516bail:
1517        return status;
1518}
1519
1520static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1521                                    unsigned int node)
1522{
1523        int status;
1524        int ret;
1525        struct dlm_assert_joined assert_msg;
1526
1527        mlog(0, "Sending join assert to node %u\n", node);
1528
1529        memset(&assert_msg, 0, sizeof(assert_msg));
1530        assert_msg.node_idx = dlm->node_num;
1531        assert_msg.name_len = strlen(dlm->name);
1532        memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1533
1534        status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1535                                    &assert_msg, sizeof(assert_msg), node,
1536                                    &ret);
1537        if (status < 0)
1538                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1539                     "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1540                     node);
1541        else
1542                status = ret;
1543
1544        return status;
1545}
1546
1547static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1548                                  unsigned long *node_map)
1549{
1550        int status, node, live;
1551
1552        status = 0;
1553        node = -1;
1554        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1555                                     node + 1)) < O2NM_MAX_NODES) {
1556                if (node == dlm->node_num)
1557                        continue;
1558
1559                do {
1560                        /* It is very important that this message be
1561                         * received so we spin until either the node
1562                         * has died or it gets the message. */
1563                        status = dlm_send_one_join_assert(dlm, node);
1564
1565                        spin_lock(&dlm->spinlock);
1566                        live = test_bit(node, dlm->live_nodes_map);
1567                        spin_unlock(&dlm->spinlock);
1568
1569                        if (status) {
1570                                mlog(ML_ERROR, "Error return %d asserting "
1571                                     "join on node %d\n", status, node);
1572
1573                                /* give us some time between errors... */
1574                                if (live)
1575                                        msleep(DLM_DOMAIN_BACKOFF_MS);
1576                        }
1577                } while (status && live);
1578        }
1579}
1580
/*
 * Scratch state for one attempt at joining a domain.  Both members are node
 * bitmaps with one bit per possible node number (O2NM_MAX_NODES bits).
 */
struct domain_join_ctxt {
	/* copy of dlm->live_nodes_map taken when the attempt starts; later
	 * compared against the current map to detect membership changes */
	unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	/* nodes that answered our join query with JOIN_OK */
	unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
1585
1586static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1587                                   struct domain_join_ctxt *ctxt,
1588                                   enum dlm_query_join_response_code response)
1589{
1590        int ret;
1591
1592        if (response == JOIN_DISALLOW) {
1593                mlog(0, "Latest response of disallow -- should restart\n");
1594                return 1;
1595        }
1596
1597        spin_lock(&dlm->spinlock);
1598        /* For now, we restart the process if the node maps have
1599         * changed at all */
1600        ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1601                     sizeof(dlm->live_nodes_map));
1602        spin_unlock(&dlm->spinlock);
1603
1604        if (ret)
1605                mlog(0, "Node maps changed -- should restart\n");
1606
1607        return ret;
1608}
1609
1610static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1611{
1612        int status = 0, tmpstat, node;
1613        struct domain_join_ctxt *ctxt;
1614        enum dlm_query_join_response_code response = JOIN_DISALLOW;
1615
1616        mlog(0, "%p", dlm);
1617
1618        ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1619        if (!ctxt) {
1620                status = -ENOMEM;
1621                mlog_errno(status);
1622                goto bail;
1623        }
1624
1625        /* group sem locking should work for us here -- we're already
1626         * registered for heartbeat events so filling this should be
1627         * atomic wrt getting those handlers called. */
1628        o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1629
1630        spin_lock(&dlm->spinlock);
1631        memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1632
1633        __dlm_set_joining_node(dlm, dlm->node_num);
1634
1635        spin_unlock(&dlm->spinlock);
1636
1637        node = -1;
1638        while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1639                                     node + 1)) < O2NM_MAX_NODES) {
1640                if (node == dlm->node_num)
1641                        continue;
1642
1643                status = dlm_request_join(dlm, node, &response);
1644                if (status < 0) {
1645                        mlog_errno(status);
1646                        goto bail;
1647                }
1648
1649                /* Ok, either we got a response or the node doesn't have a
1650                 * dlm up. */
1651                if (response == JOIN_OK)
1652                        set_bit(node, ctxt->yes_resp_map);
1653
1654                if (dlm_should_restart_join(dlm, ctxt, response)) {
1655                        status = -EAGAIN;
1656                        goto bail;
1657                }
1658        }
1659
1660        mlog(0, "Yay, done querying nodes!\n");
1661
1662        /* Yay, everyone agree's we can join the domain. My domain is
1663         * comprised of all nodes who were put in the
1664         * yes_resp_map. Copy that into our domain map and send a join
1665         * assert message to clean up everyone elses state. */
1666        spin_lock(&dlm->spinlock);
1667        memcpy(dlm->domain_map, ctxt->yes_resp_map,
1668               sizeof(ctxt->yes_resp_map));
1669        set_bit(dlm->node_num, dlm->domain_map);
1670        spin_unlock(&dlm->spinlock);
1671
1672        /* Support for global heartbeat and node info was added in 1.1 */
1673        if (dlm->dlm_locking_proto.pv_major > 1 ||
1674            dlm->dlm_locking_proto.pv_minor > 0) {
1675                status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1676                if (status) {
1677                        mlog_errno(status);
1678                        goto bail;
1679                }
1680                status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1681                if (status) {
1682                        mlog_errno(status);
1683                        goto bail;
1684                }
1685        }
1686
1687        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1688
1689        /* Joined state *must* be set before the joining node
1690         * information, otherwise the query_join handler may read no
1691         * current joiner but a state of NEW and tell joining nodes
1692         * we're not in the domain. */
1693        spin_lock(&dlm_domain_lock);
1694        dlm->dlm_state = DLM_CTXT_JOINED;
1695        dlm->num_joins++;
1696        spin_unlock(&dlm_domain_lock);
1697
1698bail:
1699        spin_lock(&dlm->spinlock);
1700        __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1701        if (!status) {
1702                printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1703                __dlm_print_nodes(dlm);
1704        }
1705        spin_unlock(&dlm->spinlock);
1706
1707        if (ctxt) {
1708                /* Do we need to send a cancel message to any nodes? */
1709                if (status < 0) {
1710                        tmpstat = dlm_send_join_cancels(dlm,
1711                                                        ctxt->yes_resp_map,
1712                                                        sizeof(ctxt->yes_resp_map));
1713                        if (tmpstat < 0)
1714                                mlog_errno(tmpstat);
1715                }
1716                kfree(ctxt);
1717        }
1718
1719        mlog(0, "returning %d\n", status);
1720        return status;
1721}
1722
1723static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1724{
1725        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up);
1726        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down);
1727        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1728}
1729
1730static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1731{
1732        int status;
1733
1734        mlog(0, "registering handlers.\n");
1735
1736        o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1737                            dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1738        o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1739                            dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1740
1741        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1742        if (status)
1743                goto bail;
1744
1745        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1746        if (status)
1747                goto bail;
1748
1749        status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1750                                        sizeof(struct dlm_master_request),
1751                                        dlm_master_request_handler,
1752                                        dlm, NULL, &dlm->dlm_domain_handlers);
1753        if (status)
1754                goto bail;
1755
1756        status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1757                                        sizeof(struct dlm_assert_master),
1758                                        dlm_assert_master_handler,
1759                                        dlm, dlm_assert_master_post_handler,
1760                                        &dlm->dlm_domain_handlers);
1761        if (status)
1762                goto bail;
1763
1764        status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1765                                        sizeof(struct dlm_create_lock),
1766                                        dlm_create_lock_handler,
1767                                        dlm, NULL, &dlm->dlm_domain_handlers);
1768        if (status)
1769                goto bail;
1770
1771        status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1772                                        DLM_CONVERT_LOCK_MAX_LEN,
1773                                        dlm_convert_lock_handler,
1774                                        dlm, NULL, &dlm->dlm_domain_handlers);
1775        if (status)
1776                goto bail;
1777
1778        status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1779                                        DLM_UNLOCK_LOCK_MAX_LEN,
1780                                        dlm_unlock_lock_handler,
1781                                        dlm, NULL, &dlm->dlm_domain_handlers);
1782        if (status)
1783                goto bail;
1784
1785        status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1786                                        DLM_PROXY_AST_MAX_LEN,
1787                                        dlm_proxy_ast_handler,
1788                                        dlm, NULL, &dlm->dlm_domain_handlers);
1789        if (status)
1790                goto bail;
1791
1792        status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1793                                        sizeof(struct dlm_exit_domain),
1794                                        dlm_exit_domain_handler,
1795                                        dlm, NULL, &dlm->dlm_domain_handlers);
1796        if (status)
1797                goto bail;
1798
1799        status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1800                                        sizeof(struct dlm_deref_lockres),
1801                                        dlm_deref_lockres_handler,
1802                                        dlm, NULL, &dlm->dlm_domain_handlers);
1803        if (status)
1804                goto bail;
1805
1806        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1807                                        sizeof(struct dlm_migrate_request),
1808                                        dlm_migrate_request_handler,
1809                                        dlm, NULL, &dlm->dlm_domain_handlers);
1810        if (status)
1811                goto bail;
1812
1813        status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1814                                        DLM_MIG_LOCKRES_MAX_LEN,
1815                                        dlm_mig_lockres_handler,
1816                                        dlm, NULL, &dlm->dlm_domain_handlers);
1817        if (status)
1818                goto bail;
1819
1820        status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1821                                        sizeof(struct dlm_master_requery),
1822                                        dlm_master_requery_handler,
1823                                        dlm, NULL, &dlm->dlm_domain_handlers);
1824        if (status)
1825                goto bail;
1826
1827        status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1828                                        sizeof(struct dlm_lock_request),
1829                                        dlm_request_all_locks_handler,
1830                                        dlm, NULL, &dlm->dlm_domain_handlers);
1831        if (status)
1832                goto bail;
1833
1834        status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1835                                        sizeof(struct dlm_reco_data_done),
1836                                        dlm_reco_data_done_handler,
1837                                        dlm, NULL, &dlm->dlm_domain_handlers);
1838        if (status)
1839                goto bail;
1840
1841        status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1842                                        sizeof(struct dlm_begin_reco),
1843                                        dlm_begin_reco_handler,
1844                                        dlm, NULL, &dlm->dlm_domain_handlers);
1845        if (status)
1846                goto bail;
1847
1848        status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1849                                        sizeof(struct dlm_finalize_reco),
1850                                        dlm_finalize_reco_handler,
1851                                        dlm, NULL, &dlm->dlm_domain_handlers);
1852        if (status)
1853                goto bail;
1854
1855        status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key,
1856                                        sizeof(struct dlm_exit_domain),
1857                                        dlm_begin_exit_domain_handler,
1858                                        dlm, NULL, &dlm->dlm_domain_handlers);
1859        if (status)
1860                goto bail;
1861
1862        status = o2net_register_handler(DLM_DEREF_LOCKRES_DONE, dlm->key,
1863                                        sizeof(struct dlm_deref_lockres_done),
1864                                        dlm_deref_lockres_done_handler,
1865                                        dlm, NULL, &dlm->dlm_domain_handlers);
1866bail:
1867        if (status)
1868                dlm_unregister_domain_handlers(dlm);
1869
1870        return status;
1871}
1872
1873static int dlm_join_domain(struct dlm_ctxt *dlm)
1874{
1875        int status;
1876        unsigned int backoff;
1877        unsigned int total_backoff = 0;
1878        char wq_name[O2NM_MAX_NAME_LEN];
1879
1880        BUG_ON(!dlm);
1881
1882        mlog(0, "Join domain %s\n", dlm->name);
1883
1884        status = dlm_register_domain_handlers(dlm);
1885        if (status) {
1886                mlog_errno(status);
1887                goto bail;
1888        }
1889
1890        status = dlm_launch_thread(dlm);
1891        if (status < 0) {
1892                mlog_errno(status);
1893                goto bail;
1894        }
1895
1896        status = dlm_launch_recovery_thread(dlm);
1897        if (status < 0) {
1898                mlog_errno(status);
1899                goto bail;
1900        }
1901
1902        status = dlm_debug_init(dlm);
1903        if (status < 0) {
1904                mlog_errno(status);
1905                goto bail;
1906        }
1907
1908        snprintf(wq_name, O2NM_MAX_NAME_LEN, "dlm_wq-%s", dlm->name);
1909        dlm->dlm_worker = create_singlethread_workqueue(wq_name);
1910        if (!dlm->dlm_worker) {
1911                status = -ENOMEM;
1912                mlog_errno(status);
1913                goto bail;
1914        }
1915
1916        do {
1917                status = dlm_try_to_join_domain(dlm);
1918
1919                /* If we're racing another node to the join, then we
1920                 * need to back off temporarily and let them
1921                 * complete. */
1922#define DLM_JOIN_TIMEOUT_MSECS  90000
1923                if (status == -EAGAIN) {
1924                        if (signal_pending(current)) {
1925                                status = -ERESTARTSYS;
1926                                goto bail;
1927                        }
1928
1929                        if (total_backoff > DLM_JOIN_TIMEOUT_MSECS) {
1930                                status = -ERESTARTSYS;
1931                                mlog(ML_NOTICE, "Timed out joining dlm domain "
1932                                     "%s after %u msecs\n", dlm->name,
1933                                     total_backoff);
1934                                goto bail;
1935                        }
1936
1937                        /*
1938                         * <chip> After you!
1939                         * <dale> No, after you!
1940                         * <chip> I insist!
1941                         * <dale> But you first!
1942                         * ...
1943                         */
1944                        backoff = (unsigned int)(jiffies & 0x3);
1945                        backoff *= DLM_DOMAIN_BACKOFF_MS;
1946                        total_backoff += backoff;
1947                        mlog(0, "backoff %d\n", backoff);
1948                        msleep(backoff);
1949                }
1950        } while (status == -EAGAIN);
1951
1952        if (status < 0) {
1953                mlog_errno(status);
1954                goto bail;
1955        }
1956
1957        status = 0;
1958bail:
1959        wake_up(&dlm_domain_events);
1960
1961        if (status) {
1962                dlm_unregister_domain_handlers(dlm);
1963                dlm_debug_shutdown(dlm);
1964                dlm_complete_thread(dlm);
1965                dlm_complete_recovery_thread(dlm);
1966                dlm_destroy_dlm_worker(dlm);
1967        }
1968
1969        return status;
1970}
1971
1972static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1973                                u32 key)
1974{
1975        int i;
1976        int ret;
1977        struct dlm_ctxt *dlm = NULL;
1978
1979        dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1980        if (!dlm) {
1981                ret = -ENOMEM;
1982                mlog_errno(ret);
1983                goto leave;
1984        }
1985
1986        dlm->name = kstrdup(domain, GFP_KERNEL);
1987        if (dlm->name == NULL) {
1988                ret = -ENOMEM;
1989                mlog_errno(ret);
1990                goto leave;
1991        }
1992
1993        dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1994        if (!dlm->lockres_hash) {
1995                ret = -ENOMEM;
1996                mlog_errno(ret);
1997                goto leave;
1998        }
1999
2000        for (i = 0; i < DLM_HASH_BUCKETS; i++)
2001                INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
2002
2003        dlm->master_hash = (struct hlist_head **)
2004                                dlm_alloc_pagevec(DLM_HASH_PAGES);
2005        if (!dlm->master_hash) {
2006                ret = -ENOMEM;
2007                mlog_errno(ret);
2008                goto leave;
2009        }
2010
2011        for (i = 0; i < DLM_HASH_BUCKETS; i++)
2012                INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
2013
2014        dlm->key = key;
2015        dlm->node_num = o2nm_this_node();
2016
2017        ret = dlm_create_debugfs_subroot(dlm);
2018        if (ret < 0)
2019                goto leave;
2020
2021        spin_lock_init(&dlm->spinlock);
2022        spin_lock_init(&dlm->master_lock);
2023        spin_lock_init(&dlm->ast_lock);
2024        spin_lock_init(&dlm->track_lock);
2025        INIT_LIST_HEAD(&dlm->list);
2026        INIT_LIST_HEAD(&dlm->dirty_list);
2027        INIT_LIST_HEAD(&dlm->reco.resources);
2028        INIT_LIST_HEAD(&dlm->reco.node_data);
2029        INIT_LIST_HEAD(&dlm->purge_list);
2030        INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
2031        INIT_LIST_HEAD(&dlm->tracking_list);
2032        dlm->reco.state = 0;
2033
2034        INIT_LIST_HEAD(&dlm->pending_asts);
2035        INIT_LIST_HEAD(&dlm->pending_basts);
2036
2037        mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
2038                  dlm->recovery_map, &(dlm->recovery_map[0]));
2039
2040        memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
2041        memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
2042        memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
2043
2044        dlm->dlm_thread_task = NULL;
2045        dlm->dlm_reco_thread_task = NULL;
2046        dlm->dlm_worker = NULL;
2047        init_waitqueue_head(&dlm->dlm_thread_wq);
2048        init_waitqueue_head(&dlm->dlm_reco_thread_wq);
2049        init_waitqueue_head(&dlm->reco.event);
2050        init_waitqueue_head(&dlm->ast_wq);
2051        init_waitqueue_head(&dlm->migration_wq);
2052        INIT_LIST_HEAD(&dlm->mle_hb_events);
2053
2054        dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
2055        init_waitqueue_head(&dlm->dlm_join_events);
2056
2057        dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
2058        dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
2059
2060        atomic_set(&dlm->res_tot_count, 0);
2061        atomic_set(&dlm->res_cur_count, 0);
2062        for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2063                atomic_set(&dlm->mle_tot_count[i], 0);
2064                atomic_set(&dlm->mle_cur_count[i], 0);
2065        }
2066
2067        spin_lock_init(&dlm->work_lock);
2068        INIT_LIST_HEAD(&dlm->work_list);
2069        INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2070
2071        kref_init(&dlm->dlm_refs);
2072        dlm->dlm_state = DLM_CTXT_NEW;
2073
2074        INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2075
2076        mlog(0, "context init: refcount %u\n",
2077                  atomic_read(&dlm->dlm_refs.refcount));
2078
2079leave:
2080        if (ret < 0 && dlm) {
2081                if (dlm->master_hash)
2082                        dlm_free_pagevec((void **)dlm->master_hash,
2083                                        DLM_HASH_PAGES);
2084
2085                if (dlm->lockres_hash)
2086                        dlm_free_pagevec((void **)dlm->lockres_hash,
2087                                        DLM_HASH_PAGES);
2088
2089                kfree(dlm->name);
2090                kfree(dlm);
2091                dlm = NULL;
2092        }
2093        return dlm;
2094}
2095
2096/*
2097 * Compare a requested locking protocol version against the current one.
2098 *
2099 * If the major numbers are different, they are incompatible.
2100 * If the current minor is greater than the request, they are incompatible.
2101 * If the current minor is less than or equal to the request, they are
2102 * compatible, and the requester should run at the current minor version.
2103 */
2104static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2105                                struct dlm_protocol_version *request)
2106{
2107        if (existing->pv_major != request->pv_major)
2108                return 1;
2109
2110        if (existing->pv_minor > request->pv_minor)
2111                return 1;
2112
2113        if (existing->pv_minor < request->pv_minor)
2114                request->pv_minor = existing->pv_minor;
2115
2116        return 0;
2117}
2118
2119/*
2120 * dlm_register_domain: one-time setup per "domain".
2121 *
2122 * The filesystem passes in the requested locking version via proto.
2123 * If registration was successful, proto will contain the negotiated
2124 * locking protocol.
2125 */
2126struct dlm_ctxt * dlm_register_domain(const char *domain,
2127                               u32 key,
2128                               struct dlm_protocol_version *fs_proto)
2129{
2130        int ret;
2131        struct dlm_ctxt *dlm = NULL;
2132        struct dlm_ctxt *new_ctxt = NULL;
2133
2134        if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2135                ret = -ENAMETOOLONG;
2136                mlog(ML_ERROR, "domain name length too long\n");
2137                goto leave;
2138        }
2139
2140        mlog(0, "register called for domain \"%s\"\n", domain);
2141
2142retry:
2143        dlm = NULL;
2144        if (signal_pending(current)) {
2145                ret = -ERESTARTSYS;
2146                mlog_errno(ret);
2147                goto leave;
2148        }
2149
2150        spin_lock(&dlm_domain_lock);
2151
2152        dlm = __dlm_lookup_domain(domain);
2153        if (dlm) {
2154                if (dlm->dlm_state != DLM_CTXT_JOINED) {
2155                        spin_unlock(&dlm_domain_lock);
2156
2157                        mlog(0, "This ctxt is not joined yet!\n");
2158                        wait_event_interruptible(dlm_domain_events,
2159                                                 dlm_wait_on_domain_helper(
2160                                                         domain));
2161                        goto retry;
2162                }
2163
2164                if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2165                        spin_unlock(&dlm_domain_lock);
2166                        mlog(ML_ERROR,
2167                             "Requested locking protocol version is not "
2168                             "compatible with already registered domain "
2169                             "\"%s\"\n", domain);
2170                        ret = -EPROTO;
2171                        goto leave;
2172                }
2173
2174                __dlm_get(dlm);
2175                dlm->num_joins++;
2176
2177                spin_unlock(&dlm_domain_lock);
2178
2179                ret = 0;
2180                goto leave;
2181        }
2182
2183        /* doesn't exist */
2184        if (!new_ctxt) {
2185                spin_unlock(&dlm_domain_lock);
2186
2187                new_ctxt = dlm_alloc_ctxt(domain, key);
2188                if (new_ctxt)
2189                        goto retry;
2190
2191                ret = -ENOMEM;
2192                mlog_errno(ret);
2193                goto leave;
2194        }
2195
2196        /* a little variable switch-a-roo here... */
2197        dlm = new_ctxt;
2198        new_ctxt = NULL;
2199
2200        /* add the new domain */
2201        list_add_tail(&dlm->list, &dlm_domains);
2202        spin_unlock(&dlm_domain_lock);
2203
2204        /*
2205         * Pass the locking protocol version into the join.  If the join
2206         * succeeds, it will have the negotiated protocol set.
2207         */
2208        dlm->dlm_locking_proto = dlm_protocol;
2209        dlm->fs_locking_proto = *fs_proto;
2210
2211        ret = dlm_join_domain(dlm);
2212        if (ret) {
2213                mlog_errno(ret);
2214                dlm_put(dlm);
2215                goto leave;
2216        }
2217
2218        /* Tell the caller what locking protocol we negotiated */
2219        *fs_proto = dlm->fs_locking_proto;
2220
2221        ret = 0;
2222leave:
2223        if (new_ctxt)
2224                dlm_free_ctxt_mem(new_ctxt);
2225
2226        if (ret < 0)
2227                dlm = ERR_PTR(ret);
2228
2229        return dlm;
2230}
2231EXPORT_SYMBOL_GPL(dlm_register_domain);
2232
2233static LIST_HEAD(dlm_join_handlers);
2234
2235static void dlm_unregister_net_handlers(void)
2236{
2237        o2net_unregister_handler_list(&dlm_join_handlers);
2238}
2239
2240static int dlm_register_net_handlers(void)
2241{
2242        int status = 0;
2243
2244        status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2245                                        sizeof(struct dlm_query_join_request),
2246                                        dlm_query_join_handler,
2247                                        NULL, NULL, &dlm_join_handlers);
2248        if (status)
2249                goto bail;
2250
2251        status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2252                                        sizeof(struct dlm_assert_joined),
2253                                        dlm_assert_joined_handler,
2254                                        NULL, NULL, &dlm_join_handlers);
2255        if (status)
2256                goto bail;
2257
2258        status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2259                                        sizeof(struct dlm_cancel_join),
2260                                        dlm_cancel_join_handler,
2261                                        NULL, NULL, &dlm_join_handlers);
2262        if (status)
2263                goto bail;
2264
2265        status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2266                                        sizeof(struct dlm_query_region),
2267                                        dlm_query_region_handler,
2268                                        NULL, NULL, &dlm_join_handlers);
2269
2270        if (status)
2271                goto bail;
2272
2273        status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2274                                        sizeof(struct dlm_query_nodeinfo),
2275                                        dlm_query_nodeinfo_handler,
2276                                        NULL, NULL, &dlm_join_handlers);
2277bail:
2278        if (status < 0)
2279                dlm_unregister_net_handlers();
2280
2281        return status;
2282}
2283
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
2291
2292/* Eviction is not expected to happen often, so a per-domain lock is
2293 * not necessary. Eviction callbacks are allowed to sleep for short
2294 * periods of time. */
2295static DECLARE_RWSEM(dlm_callback_sem);
2296
2297void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2298                                        int node_num)
2299{
2300        struct dlm_eviction_cb *cb;
2301
2302        down_read(&dlm_callback_sem);
2303        list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) {
2304                cb->ec_func(node_num, cb->ec_data);
2305        }
2306        up_read(&dlm_callback_sem);
2307}
2308
2309void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2310                           dlm_eviction_func *f,
2311                           void *data)
2312{
2313        INIT_LIST_HEAD(&cb->ec_item);
2314        cb->ec_func = f;
2315        cb->ec_data = data;
2316}
2317EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2318
2319void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2320                              struct dlm_eviction_cb *cb)
2321{
2322        down_write(&dlm_callback_sem);
2323        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2324        up_write(&dlm_callback_sem);
2325}
2326EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2327
2328void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2329{
2330        down_write(&dlm_callback_sem);
2331        list_del_init(&cb->ec_item);
2332        up_write(&dlm_callback_sem);
2333}
2334EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
2335
2336static int __init dlm_init(void)
2337{
2338        int status;
2339
2340        status = dlm_init_mle_cache();
2341        if (status) {
2342                mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2343                goto error;
2344        }
2345
2346        status = dlm_init_master_caches();
2347        if (status) {
2348                mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2349                     "o2dlm_lockname slabcaches\n");
2350                goto error;
2351        }
2352
2353        status = dlm_init_lock_cache();
2354        if (status) {
2355                mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2356                goto error;
2357        }
2358
2359        status = dlm_register_net_handlers();
2360        if (status) {
2361                mlog(ML_ERROR, "Unable to register network handlers\n");
2362                goto error;
2363        }
2364
2365        status = dlm_create_debugfs_root();
2366        if (status)
2367                goto error;
2368
2369        return 0;
2370error:
2371        dlm_unregister_net_handlers();
2372        dlm_destroy_lock_cache();
2373        dlm_destroy_master_caches();
2374        dlm_destroy_mle_cache();
2375        return -1;
2376}
2377
2378static void __exit dlm_exit (void)
2379{
2380        dlm_destroy_debugfs_root();
2381        dlm_unregister_net_handlers();
2382        dlm_destroy_lock_cache();
2383        dlm_destroy_master_caches();
2384        dlm_destroy_mle_cache();
2385}
2386
2387MODULE_AUTHOR("Oracle");
2388MODULE_LICENSE("GPL");
2389MODULE_DESCRIPTION("OCFS2 Distributed Lock Management");
2390
2391module_init(dlm_init);
2392module_exit(dlm_exit);
2393