linux/fs/ocfs2/dlm/dlmdomain.c
<<
>>
Prefs
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmdomain.c
   5 *
   6 * defines domain join / leave apis
   7 *
   8 * Copyright (C) 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
  24 *
  25 */
  26
  27#include <linux/module.h>
  28#include <linux/types.h>
  29#include <linux/slab.h>
  30#include <linux/highmem.h>
  31#include <linux/init.h>
  32#include <linux/spinlock.h>
  33#include <linux/delay.h>
  34#include <linux/err.h>
  35#include <linux/debugfs.h>
  36#include <linux/sched/signal.h>
  37
  38#include "cluster/heartbeat.h"
  39#include "cluster/nodemanager.h"
  40#include "cluster/tcp.h"
  41
  42#include "dlmapi.h"
  43#include "dlmcommon.h"
  44#include "dlmdomain.h"
  45#include "dlmdebug.h"
  46
  47#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
  48#include "cluster/masklog.h"
  49
/*
 * ocfs2 node maps are arrays of long int, which cannot be sent freely
 * across the wire due to endianness issues. To work around this, we convert
 * the long ints to byte arrays. The following 3 routines are helper functions
 * to set/test/copy bits within those arrays of bytes.
 */
  56static inline void byte_set_bit(u8 nr, u8 map[])
  57{
  58        map[nr >> 3] |= (1UL << (nr & 7));
  59}
  60
  61static inline int byte_test_bit(u8 nr, u8 map[])
  62{
  63        return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
  64}
  65
  66static inline void byte_copymap(u8 dmap[], unsigned long smap[],
  67                        unsigned int sz)
  68{
  69        unsigned int nn;
  70
  71        if (!sz)
  72                return;
  73
  74        memset(dmap, 0, ((sz + 7) >> 3));
  75        for (nn = 0 ; nn < sz; nn++)
  76                if (test_bit(nn, smap))
  77                        byte_set_bit(nn, dmap);
  78}
  79
  80static void dlm_free_pagevec(void **vec, int pages)
  81{
  82        while (pages--)
  83                free_page((unsigned long)vec[pages]);
  84        kfree(vec);
  85}
  86
  87static void **dlm_alloc_pagevec(int pages)
  88{
  89        void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
  90        int i;
  91
  92        if (!vec)
  93                return NULL;
  94
  95        for (i = 0; i < pages; i++)
  96                if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
  97                        goto out_free;
  98
  99        mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
 100             pages, (unsigned long)DLM_HASH_PAGES,
 101             (unsigned long)DLM_BUCKETS_PER_PAGE);
 102        return vec;
 103out_free:
 104        dlm_free_pagevec(vec, i);
 105        return NULL;
 106}
 107
 108/*
 109 *
 110 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 111 *    dlm_domain_lock
 112 *    struct dlm_ctxt->spinlock
 113 *    struct dlm_lock_resource->spinlock
 114 *    struct dlm_ctxt->master_lock
 115 *    struct dlm_ctxt->ast_lock
 116 *    dlm_master_list_entry->spinlock
 117 *    dlm_lock->spinlock
 118 *
 119 */
 120
/*
 * dlm_domain_lock is taken around walks and updates of the dlm_domains
 * list, and around reads/writes of a domain's dlm_state and num_joins.
 */
DEFINE_SPINLOCK(dlm_domain_lock);
/* List of dlm_ctxt structures, linked through dlm_ctxt->list. */
LIST_HEAD(dlm_domains);
/*
 * Woken when a domain is taken off dlm_domains or its memory is about
 * to be freed (see dlm_complete_dlm_shutdown() and dlm_ctxt_release()).
 */
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 124
 125/*
 126 * The supported protocol version for DLM communication.  Running domains
 127 * will have a negotiated version with the same major number and a minor
 128 * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
 129 * be used to determine what a running domain is actually using.
 130 *
 131 * New in version 1.1:
 132 *      - Message DLM_QUERY_REGION added to support global heartbeat
 133 *      - Message DLM_QUERY_NODEINFO added to allow online node removes
 134 * New in version 1.2:
 135 *      - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain
 136 * New in version 1.3:
 137 *      - Message DLM_DEREF_LOCKRES_DONE added to inform non-master that the
 138 *        refmap is cleared
 139 */
static const struct dlm_protocol_version dlm_protocol = {
	.pv_major = 1,	/* running domains share this major number */
	.pv_minor = 3,	/* highest minor supported; negotiated value may be lower */
};
 144
/*
 * Delay, in milliseconds, passed to msleep() by dlm_leave_domain() before
 * it retries a domain exit message that failed to send.
 */
#define DLM_DOMAIN_BACKOFF_MS 200
 146
/*
 * Forward declarations for static functions that are referenced before
 * their definitions in this file.  The *_handler functions share the
 * o2net message handler signature.
 */
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data);
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
				     void **ret_data);
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
				    void *data, void **ret_data);
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
				   void **ret_data);
static int dlm_protocol_compare(struct dlm_protocol_version *existing,
				struct dlm_protocol_version *request);

static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 161
 162void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 163{
 164        if (hlist_unhashed(&res->hash_node))
 165                return;
 166
 167        mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
 168             res->lockname.name);
 169        hlist_del_init(&res->hash_node);
 170        dlm_lockres_put(res);
 171}
 172
 173void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 174{
 175        struct hlist_head *bucket;
 176
 177        assert_spin_locked(&dlm->spinlock);
 178
 179        bucket = dlm_lockres_hash(dlm, res->lockname.hash);
 180
 181        /* get a reference for our hashtable */
 182        dlm_lockres_get(res);
 183
 184        hlist_add_head(&res->hash_node, bucket);
 185
 186        mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
 187             res->lockname.name);
 188}
 189
 190struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
 191                                                     const char *name,
 192                                                     unsigned int len,
 193                                                     unsigned int hash)
 194{
 195        struct hlist_head *bucket;
 196        struct dlm_lock_resource *res;
 197
 198        mlog(0, "%.*s\n", len, name);
 199
 200        assert_spin_locked(&dlm->spinlock);
 201
 202        bucket = dlm_lockres_hash(dlm, hash);
 203
 204        hlist_for_each_entry(res, bucket, hash_node) {
 205                if (res->lockname.name[0] != name[0])
 206                        continue;
 207                if (unlikely(res->lockname.len != len))
 208                        continue;
 209                if (memcmp(res->lockname.name + 1, name + 1, len - 1))
 210                        continue;
 211                dlm_lockres_get(res);
 212                return res;
 213        }
 214        return NULL;
 215}
 216
 217/* intended to be called by functions which do not care about lock
 218 * resources which are being purged (most net _handler functions).
 219 * this will return NULL for any lock resource which is found but
 220 * currently in the process of dropping its mastery reference.
 221 * use __dlm_lookup_lockres_full when you need the lock resource
 222 * regardless (e.g. dlm_get_lock_resource) */
 223struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 224                                                const char *name,
 225                                                unsigned int len,
 226                                                unsigned int hash)
 227{
 228        struct dlm_lock_resource *res = NULL;
 229
 230        mlog(0, "%.*s\n", len, name);
 231
 232        assert_spin_locked(&dlm->spinlock);
 233
 234        res = __dlm_lookup_lockres_full(dlm, name, len, hash);
 235        if (res) {
 236                spin_lock(&res->spinlock);
 237                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 238                        spin_unlock(&res->spinlock);
 239                        dlm_lockres_put(res);
 240                        return NULL;
 241                }
 242                spin_unlock(&res->spinlock);
 243        }
 244
 245        return res;
 246}
 247
 248struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 249                                    const char *name,
 250                                    unsigned int len)
 251{
 252        struct dlm_lock_resource *res;
 253        unsigned int hash = dlm_lockid_hash(name, len);
 254
 255        spin_lock(&dlm->spinlock);
 256        res = __dlm_lookup_lockres(dlm, name, len, hash);
 257        spin_unlock(&dlm->spinlock);
 258        return res;
 259}
 260
 261static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 262{
 263        struct dlm_ctxt *tmp;
 264
 265        assert_spin_locked(&dlm_domain_lock);
 266
 267        /* tmp->name here is always NULL terminated,
 268         * but domain may not be! */
 269        list_for_each_entry(tmp, &dlm_domains, list) {
 270                if (strlen(tmp->name) == len &&
 271                    memcmp(tmp->name, domain, len)==0)
 272                        return tmp;
 273        }
 274
 275        return NULL;
 276}
 277
 278/* For null terminated domain strings ONLY */
 279static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
 280{
 281        assert_spin_locked(&dlm_domain_lock);
 282
 283        return __dlm_lookup_domain_full(domain, strlen(domain));
 284}
 285
 286
 287/* returns true on one of two conditions:
 288 * 1) the domain does not exist
 289 * 2) the domain exists and it's state is "joined" */
 290static int dlm_wait_on_domain_helper(const char *domain)
 291{
 292        int ret = 0;
 293        struct dlm_ctxt *tmp = NULL;
 294
 295        spin_lock(&dlm_domain_lock);
 296
 297        tmp = __dlm_lookup_domain(domain);
 298        if (!tmp)
 299                ret = 1;
 300        else if (tmp->dlm_state == DLM_CTXT_JOINED)
 301                ret = 1;
 302
 303        spin_unlock(&dlm_domain_lock);
 304        return ret;
 305}
 306
 307static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 308{
 309        dlm_destroy_debugfs_subroot(dlm);
 310
 311        if (dlm->lockres_hash)
 312                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 313
 314        if (dlm->master_hash)
 315                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
 316
 317        kfree(dlm->name);
 318        kfree(dlm);
 319}
 320
 321/* A little strange - this function will be called while holding
 322 * dlm_domain_lock and is expected to be holding it on the way out. We
 323 * will however drop and reacquire it multiple times */
 324static void dlm_ctxt_release(struct kref *kref)
 325{
 326        struct dlm_ctxt *dlm;
 327
 328        dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
 329
 330        BUG_ON(dlm->num_joins);
 331        BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
 332
 333        /* we may still be in the list if we hit an error during join. */
 334        list_del_init(&dlm->list);
 335
 336        spin_unlock(&dlm_domain_lock);
 337
 338        mlog(0, "freeing memory from domain %s\n", dlm->name);
 339
 340        wake_up(&dlm_domain_events);
 341
 342        dlm_free_ctxt_mem(dlm);
 343
 344        spin_lock(&dlm_domain_lock);
 345}
 346
/*
 * Drop one reference on @dlm.  The kref_put() is done under
 * dlm_domain_lock because the release callback, dlm_ctxt_release(),
 * expects to be entered and to return with that lock held.
 */
void dlm_put(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm_domain_lock);
	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
	spin_unlock(&dlm_domain_lock);
}
 353
/*
 * Take one reference on @dlm.  No locking is done here; the visible
 * caller, dlm_grab(), calls this with dlm_domain_lock held.
 */
static void __dlm_get(struct dlm_ctxt *dlm)
{
	kref_get(&dlm->dlm_refs);
}
 358
 359/* given a questionable reference to a dlm object, gets a reference if
 360 * it can find it in the list, otherwise returns NULL in which case
 361 * you shouldn't trust your pointer. */
 362struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
 363{
 364        struct dlm_ctxt *target;
 365        struct dlm_ctxt *ret = NULL;
 366
 367        spin_lock(&dlm_domain_lock);
 368
 369        list_for_each_entry(target, &dlm_domains, list) {
 370                if (target == dlm) {
 371                        __dlm_get(target);
 372                        ret = target;
 373                        break;
 374                }
 375        }
 376
 377        spin_unlock(&dlm_domain_lock);
 378
 379        return ret;
 380}
 381
 382int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 383{
 384        int ret;
 385
 386        spin_lock(&dlm_domain_lock);
 387        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
 388                (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
 389        spin_unlock(&dlm_domain_lock);
 390
 391        return ret;
 392}
 393
 394static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
 395{
 396        if (dlm->dlm_worker) {
 397                destroy_workqueue(dlm->dlm_worker);
 398                dlm->dlm_worker = NULL;
 399        }
 400}
 401
/*
 * Final stage of leaving a domain: unregister the domain's message
 * handlers, shut down its debug state, dlm thread, recovery thread and
 * workqueue, remove the context from dlm_domains and wake anyone
 * sleeping on dlm_domain_events.
 *
 * The context memory is not freed here; that happens when the last
 * reference is dropped (see dlm_ctxt_release()).
 */
static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
{
	dlm_unregister_domain_handlers(dlm);
	dlm_debug_shutdown(dlm);
	dlm_complete_thread(dlm);
	dlm_complete_recovery_thread(dlm);
	dlm_destroy_dlm_worker(dlm);

	/* We've left the domain. Now we can take ourselves out of the
	 * list and allow the kref stuff to help us free the
	 * memory. */
	spin_lock(&dlm_domain_lock);
	list_del_init(&dlm->list);
	spin_unlock(&dlm_domain_lock);

	/* Wake up anyone waiting for us to remove this domain */
	wake_up(&dlm_domain_events);
}
 420
 421static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 422{
 423        int i, num, n, ret = 0;
 424        struct dlm_lock_resource *res;
 425        struct hlist_node *iter;
 426        struct hlist_head *bucket;
 427        int dropped;
 428
 429        mlog(0, "Migrating locks from domain %s\n", dlm->name);
 430
 431        num = 0;
 432        spin_lock(&dlm->spinlock);
 433        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 434redo_bucket:
 435                n = 0;
 436                bucket = dlm_lockres_hash(dlm, i);
 437                iter = bucket->first;
 438                while (iter) {
 439                        n++;
 440                        res = hlist_entry(iter, struct dlm_lock_resource,
 441                                          hash_node);
 442                        dlm_lockres_get(res);
 443                        /* migrate, if necessary.  this will drop the dlm
 444                         * spinlock and retake it if it does migration. */
 445                        dropped = dlm_empty_lockres(dlm, res);
 446
 447                        spin_lock(&res->spinlock);
 448                        if (dropped)
 449                                __dlm_lockres_calc_usage(dlm, res);
 450                        else
 451                                iter = res->hash_node.next;
 452                        spin_unlock(&res->spinlock);
 453
 454                        dlm_lockres_put(res);
 455
 456                        if (dropped) {
 457                                cond_resched_lock(&dlm->spinlock);
 458                                goto redo_bucket;
 459                        }
 460                }
 461                cond_resched_lock(&dlm->spinlock);
 462                num += n;
 463        }
 464        spin_unlock(&dlm->spinlock);
 465        wake_up(&dlm->dlm_thread_wq);
 466
 467        /* let the dlm thread take care of purging, keep scanning until
 468         * nothing remains in the hash */
 469        if (num) {
 470                mlog(0, "%s: %d lock resources in hash last pass\n",
 471                     dlm->name, num);
 472                ret = -EAGAIN;
 473        }
 474        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
 475        return ret;
 476}
 477
 478static int dlm_no_joining_node(struct dlm_ctxt *dlm)
 479{
 480        int ret;
 481
 482        spin_lock(&dlm->spinlock);
 483        ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
 484        spin_unlock(&dlm->spinlock);
 485
 486        return ret;
 487}
 488
 489static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len,
 490                                         void *data, void **ret_data)
 491{
 492        struct dlm_ctxt *dlm = data;
 493        unsigned int node;
 494        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 495
 496        if (!dlm_grab(dlm))
 497                return 0;
 498
 499        node = exit_msg->node_idx;
 500        mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node);
 501
 502        spin_lock(&dlm->spinlock);
 503        set_bit(node, dlm->exit_domain_map);
 504        spin_unlock(&dlm->spinlock);
 505
 506        dlm_put(dlm);
 507
 508        return 0;
 509}
 510
 511static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
 512{
 513        /* Yikes, a double spinlock! I need domain_lock for the dlm
 514         * state and the dlm spinlock for join state... Sorry! */
 515again:
 516        spin_lock(&dlm_domain_lock);
 517        spin_lock(&dlm->spinlock);
 518
 519        if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 520                mlog(0, "Node %d is joining, we wait on it.\n",
 521                          dlm->joining_node);
 522                spin_unlock(&dlm->spinlock);
 523                spin_unlock(&dlm_domain_lock);
 524
 525                wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
 526                goto again;
 527        }
 528
 529        dlm->dlm_state = DLM_CTXT_LEAVING;
 530        spin_unlock(&dlm->spinlock);
 531        spin_unlock(&dlm_domain_lock);
 532}
 533
 534static void __dlm_print_nodes(struct dlm_ctxt *dlm)
 535{
 536        int node = -1, num = 0;
 537
 538        assert_spin_locked(&dlm->spinlock);
 539
 540        printk("( ");
 541        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 542                                     node + 1)) < O2NM_MAX_NODES) {
 543                printk("%d ", node);
 544                ++num;
 545        }
 546        printk(") %u nodes\n", num);
 547}
 548
 549static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 550                                   void **ret_data)
 551{
 552        struct dlm_ctxt *dlm = data;
 553        unsigned int node;
 554        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 555
 556        mlog(0, "%p %u %p", msg, len, data);
 557
 558        if (!dlm_grab(dlm))
 559                return 0;
 560
 561        node = exit_msg->node_idx;
 562
 563        spin_lock(&dlm->spinlock);
 564        clear_bit(node, dlm->domain_map);
 565        clear_bit(node, dlm->exit_domain_map);
 566        printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
 567        __dlm_print_nodes(dlm);
 568
 569        /* notify anything attached to the heartbeat events */
 570        dlm_hb_event_notify_attached(dlm, node, 0);
 571
 572        spin_unlock(&dlm->spinlock);
 573
 574        dlm_put(dlm);
 575
 576        return 0;
 577}
 578
 579static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type,
 580                                    unsigned int node)
 581{
 582        int status;
 583        struct dlm_exit_domain leave_msg;
 584
 585        mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name,
 586             msg_type, node);
 587
 588        memset(&leave_msg, 0, sizeof(leave_msg));
 589        leave_msg.node_idx = dlm->node_num;
 590
 591        status = o2net_send_message(msg_type, dlm->key, &leave_msg,
 592                                    sizeof(leave_msg), node, NULL);
 593        if (status < 0)
 594                mlog(ML_ERROR, "Error %d sending domain exit message %u "
 595                     "to node %u on domain %s\n", status, msg_type, node,
 596                     dlm->name);
 597
 598        return status;
 599}
 600
 601static void dlm_begin_exit_domain(struct dlm_ctxt *dlm)
 602{
 603        int node = -1;
 604
 605        /* Support for begin exit domain was added in 1.2 */
 606        if (dlm->dlm_locking_proto.pv_major == 1 &&
 607            dlm->dlm_locking_proto.pv_minor < 2)
 608                return;
 609
 610        /*
 611         * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely
 612         * informational. Meaning if a node does not receive the message,
 613         * so be it.
 614         */
 615        spin_lock(&dlm->spinlock);
 616        while (1) {
 617                node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1);
 618                if (node >= O2NM_MAX_NODES)
 619                        break;
 620                if (node == dlm->node_num)
 621                        continue;
 622
 623                spin_unlock(&dlm->spinlock);
 624                dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node);
 625                spin_lock(&dlm->spinlock);
 626        }
 627        spin_unlock(&dlm->spinlock);
 628}
 629
 630static void dlm_leave_domain(struct dlm_ctxt *dlm)
 631{
 632        int node, clear_node, status;
 633
 634        /* At this point we've migrated away all our locks and won't
 635         * accept mastership of new ones. The dlm is responsible for
 636         * almost nothing now. We make sure not to confuse any joining
 637         * nodes and then commence shutdown procedure. */
 638
 639        spin_lock(&dlm->spinlock);
 640        /* Clear ourselves from the domain map */
 641        clear_bit(dlm->node_num, dlm->domain_map);
 642        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 643                                     0)) < O2NM_MAX_NODES) {
 644                /* Drop the dlm spinlock. This is safe wrt the domain_map.
 645                 * -nodes cannot be added now as the
 646                 *   query_join_handlers knows to respond with OK_NO_MAP
 647                 * -we catch the right network errors if a node is
 648                 *   removed from the map while we're sending him the
 649                 *   exit message. */
 650                spin_unlock(&dlm->spinlock);
 651
 652                clear_node = 1;
 653
 654                status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG,
 655                                                  node);
 656                if (status < 0 &&
 657                    status != -ENOPROTOOPT &&
 658                    status != -ENOTCONN) {
 659                        mlog(ML_NOTICE, "Error %d sending domain exit message "
 660                             "to node %d\n", status, node);
 661
 662                        /* Not sure what to do here but lets sleep for
 663                         * a bit in case this was a transient
 664                         * error... */
 665                        msleep(DLM_DOMAIN_BACKOFF_MS);
 666                        clear_node = 0;
 667                }
 668
 669                spin_lock(&dlm->spinlock);
 670                /* If we're not clearing the node bit then we intend
 671                 * to loop back around to try again. */
 672                if (clear_node)
 673                        clear_bit(node, dlm->domain_map);
 674        }
 675        spin_unlock(&dlm->spinlock);
 676}
 677
 678int dlm_shutting_down(struct dlm_ctxt *dlm)
 679{
 680        int ret = 0;
 681
 682        spin_lock(&dlm_domain_lock);
 683
 684        if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
 685                ret = 1;
 686
 687        spin_unlock(&dlm_domain_lock);
 688
 689        return ret;
 690}
 691
 692void dlm_unregister_domain(struct dlm_ctxt *dlm)
 693{
 694        int leave = 0;
 695        struct dlm_lock_resource *res;
 696
 697        spin_lock(&dlm_domain_lock);
 698        BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
 699        BUG_ON(!dlm->num_joins);
 700
 701        dlm->num_joins--;
 702        if (!dlm->num_joins) {
 703                /* We mark it "in shutdown" now so new register
 704                 * requests wait until we've completely left the
 705                 * domain. Don't use DLM_CTXT_LEAVING yet as we still
 706                 * want new domain joins to communicate with us at
 707                 * least until we've completed migration of our
 708                 * resources. */
 709                dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
 710                leave = 1;
 711        }
 712        spin_unlock(&dlm_domain_lock);
 713
 714        if (leave) {
 715                mlog(0, "shutting down domain %s\n", dlm->name);
 716                dlm_begin_exit_domain(dlm);
 717
 718                /* We changed dlm state, notify the thread */
 719                dlm_kick_thread(dlm, NULL);
 720
 721                while (dlm_migrate_all_locks(dlm)) {
 722                        /* Give dlm_thread time to purge the lockres' */
 723                        msleep(500);
 724                        mlog(0, "%s: more migration to do\n", dlm->name);
 725                }
 726
 727                /* This list should be empty. If not, print remaining lockres */
 728                if (!list_empty(&dlm->tracking_list)) {
 729                        mlog(ML_ERROR, "Following lockres' are still on the "
 730                             "tracking list:\n");
 731                        list_for_each_entry(res, &dlm->tracking_list, tracking)
 732                                dlm_print_one_lock_resource(res);
 733                }
 734
 735                dlm_mark_domain_leaving(dlm);
 736                dlm_leave_domain(dlm);
 737                printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
 738                dlm_force_free_mles(dlm);
 739                dlm_complete_dlm_shutdown(dlm);
 740        }
 741        dlm_put(dlm);
 742}
 743EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 744
 745static int dlm_query_join_proto_check(char *proto_type, int node,
 746                                      struct dlm_protocol_version *ours,
 747                                      struct dlm_protocol_version *request)
 748{
 749        int rc;
 750        struct dlm_protocol_version proto = *request;
 751
 752        if (!dlm_protocol_compare(ours, &proto)) {
 753                mlog(0,
 754                     "node %u wanted to join with %s locking protocol "
 755                     "%u.%u, we respond with %u.%u\n",
 756                     node, proto_type,
 757                     request->pv_major,
 758                     request->pv_minor,
 759                     proto.pv_major, proto.pv_minor);
 760                request->pv_minor = proto.pv_minor;
 761                rc = 0;
 762        } else {
 763                mlog(ML_NOTICE,
 764                     "Node %u wanted to join with %s locking "
 765                     "protocol %u.%u, but we have %u.%u, disallowing\n",
 766                     node, proto_type,
 767                     request->pv_major,
 768                     request->pv_minor,
 769                     ours->pv_major,
 770                     ours->pv_minor);
 771                rc = 1;
 772        }
 773
 774        return rc;
 775}
 776
 777/*
 778 * struct dlm_query_join_packet is made up of four one-byte fields.  They
 779 * are effectively in big-endian order already.  However, little-endian
 780 * machines swap them before putting the packet on the wire (because
 781 * query_join's response is a status, and that status is treated as a u32
 * on the wire).  Thus, big-endian and little-endian machines will treat
 * this structure differently.
 784 *
 785 * The solution is to have little-endian machines swap the structure when
 786 * converting from the structure to the u32 representation.  This will
 787 * result in the structure having the correct format on the wire no matter
 788 * the host endian format.
 789 */
 790static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
 791                                          u32 *wire)
 792{
 793        union dlm_query_join_response response;
 794
 795        response.packet = *packet;
 796        *wire = be32_to_cpu(response.intval);
 797}
 798
 799static void dlm_query_join_wire_to_packet(u32 wire,
 800                                          struct dlm_query_join_packet *packet)
 801{
 802        union dlm_query_join_response response;
 803
 804        response.intval = cpu_to_be32(wire);
 805        *packet = response.packet;
 806}
 807
 808static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 809                                  void **ret_data)
 810{
 811        struct dlm_query_join_request *query;
 812        struct dlm_query_join_packet packet = {
 813                .code = JOIN_DISALLOW,
 814        };
 815        struct dlm_ctxt *dlm = NULL;
 816        u32 response;
 817        u8 nodenum;
 818
 819        query = (struct dlm_query_join_request *) msg->buf;
 820
 821        mlog(0, "node %u wants to join domain %s\n", query->node_idx,
 822                  query->domain);
 823
 824        /*
 825         * If heartbeat doesn't consider the node live, tell it
 826         * to back off and try again.  This gives heartbeat a chance
 827         * to catch up.
 828         */
 829        if (!o2hb_check_node_heartbeating_no_sem(query->node_idx)) {
 830                mlog(0, "node %u is not in our live map yet\n",
 831                     query->node_idx);
 832
 833                packet.code = JOIN_DISALLOW;
 834                goto respond;
 835        }
 836
 837        packet.code = JOIN_OK_NO_MAP;
 838
 839        spin_lock(&dlm_domain_lock);
 840        dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 841        if (!dlm)
 842                goto unlock_respond;
 843
 844        /*
 845         * There is a small window where the joining node may not see the
 846         * node(s) that just left but still part of the cluster. DISALLOW
 847         * join request if joining node has different node map.
 848         */
 849        nodenum=0;
 850        while (nodenum < O2NM_MAX_NODES) {
 851                if (test_bit(nodenum, dlm->domain_map)) {
 852                        if (!byte_test_bit(nodenum, query->node_map)) {
 853                                mlog(0, "disallow join as node %u does not "
 854                                     "have node %u in its nodemap\n",
 855                                     query->node_idx, nodenum);
 856                                packet.code = JOIN_DISALLOW;
 857                                goto unlock_respond;
 858                        }
 859                }
 860                nodenum++;
 861        }
 862
 863        /* Once the dlm ctxt is marked as leaving then we don't want
 864         * to be put in someone's domain map.
 865         * Also, explicitly disallow joining at certain troublesome
 866         * times (ie. during recovery). */
 867        if (dlm->dlm_state != DLM_CTXT_LEAVING) {
 868                int bit = query->node_idx;
 869                spin_lock(&dlm->spinlock);
 870
 871                if (dlm->dlm_state == DLM_CTXT_NEW &&
 872                    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
 873                        /*If this is a brand new context and we
 874                         * haven't started our join process yet, then
 875                         * the other node won the race. */
 876                        packet.code = JOIN_OK_NO_MAP;
 877                } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 878                        /* Disallow parallel joins. */
 879                        packet.code = JOIN_DISALLOW;
 880                } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 881                        mlog(0, "node %u trying to join, but recovery "
 882                             "is ongoing.\n", bit);
 883                        packet.code = JOIN_DISALLOW;
 884                } else if (test_bit(bit, dlm->recovery_map)) {
 885                        mlog(0, "node %u trying to join, but it "
 886                             "still needs recovery.\n", bit);
 887                        packet.code = JOIN_DISALLOW;
 888                } else if (test_bit(bit, dlm->domain_map)) {
 889                        mlog(0, "node %u trying to join, but it "
 890                             "is still in the domain! needs recovery?\n",
 891                             bit);
 892                        packet.code = JOIN_DISALLOW;
 893                } else {
 894                        /* Alright we're fully a part of this domain
 895                         * so we keep some state as to who's joining
 896                         * and indicate to him that needs to be fixed
 897                         * up. */
 898
 899                        /* Make sure we speak compatible locking protocols.  */
 900                        if (dlm_query_join_proto_check("DLM", bit,
 901                                                       &dlm->dlm_locking_proto,
 902                                                       &query->dlm_proto)) {
 903                                packet.code = JOIN_PROTOCOL_MISMATCH;
 904                        } else if (dlm_query_join_proto_check("fs", bit,
 905                                                              &dlm->fs_locking_proto,
 906                                                              &query->fs_proto)) {
 907                                packet.code = JOIN_PROTOCOL_MISMATCH;
 908                        } else {
 909                                packet.dlm_minor = query->dlm_proto.pv_minor;
 910                                packet.fs_minor = query->fs_proto.pv_minor;
 911                                packet.code = JOIN_OK;
 912                                __dlm_set_joining_node(dlm, query->node_idx);
 913                        }
 914                }
 915
 916                spin_unlock(&dlm->spinlock);
 917        }
 918unlock_respond:
 919        spin_unlock(&dlm_domain_lock);
 920
 921respond:
 922        mlog(0, "We respond with %u\n", packet.code);
 923
 924        dlm_query_join_packet_to_wire(&packet, &response);
 925        return response;
 926}
 927
 928static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 929                                     void **ret_data)
 930{
 931        struct dlm_assert_joined *assert;
 932        struct dlm_ctxt *dlm = NULL;
 933
 934        assert = (struct dlm_assert_joined *) msg->buf;
 935
 936        mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
 937                  assert->domain);
 938
 939        spin_lock(&dlm_domain_lock);
 940        dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
 941        /* XXX should we consider no dlm ctxt an error? */
 942        if (dlm) {
 943                spin_lock(&dlm->spinlock);
 944
 945                /* Alright, this node has officially joined our
 946                 * domain. Set him in the map and clean up our
 947                 * leftover join state. */
 948                BUG_ON(dlm->joining_node != assert->node_idx);
 949
 950                if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 951                        mlog(0, "dlm recovery is ongoing, disallow join\n");
 952                        spin_unlock(&dlm->spinlock);
 953                        spin_unlock(&dlm_domain_lock);
 954                        return -EAGAIN;
 955                }
 956
 957                set_bit(assert->node_idx, dlm->domain_map);
 958                clear_bit(assert->node_idx, dlm->exit_domain_map);
 959                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 960
 961                printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
 962                       assert->node_idx, dlm->name);
 963                __dlm_print_nodes(dlm);
 964
 965                /* notify anything attached to the heartbeat events */
 966                dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
 967
 968                spin_unlock(&dlm->spinlock);
 969        }
 970        spin_unlock(&dlm_domain_lock);
 971
 972        return 0;
 973}
 974
 975static int dlm_match_regions(struct dlm_ctxt *dlm,
 976                             struct dlm_query_region *qr,
 977                             char *local, int locallen)
 978{
 979        char *remote = qr->qr_regions;
 980        char *l, *r;
 981        int localnr, i, j, foundit;
 982        int status = 0;
 983
 984        if (!o2hb_global_heartbeat_active()) {
 985                if (qr->qr_numregions) {
 986                        mlog(ML_ERROR, "Domain %s: Joining node %d has global "
 987                             "heartbeat enabled but local node %d does not\n",
 988                             qr->qr_domain, qr->qr_node, dlm->node_num);
 989                        status = -EINVAL;
 990                }
 991                goto bail;
 992        }
 993
 994        if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
 995                mlog(ML_ERROR, "Domain %s: Local node %d has global "
 996                     "heartbeat enabled but joining node %d does not\n",
 997                     qr->qr_domain, dlm->node_num, qr->qr_node);
 998                status = -EINVAL;
 999                goto bail;
1000        }
1001
1002        r = remote;
1003        for (i = 0; i < qr->qr_numregions; ++i) {
1004                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
1005                r += O2HB_MAX_REGION_NAME_LEN;
1006        }
1007
1008        localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN);
1009        localnr = o2hb_get_all_regions(local, (u8)localnr);
1010
1011        /* compare local regions with remote */
1012        l = local;
1013        for (i = 0; i < localnr; ++i) {
1014                foundit = 0;
1015                r = remote;
1016                for (j = 0; j <= qr->qr_numregions; ++j) {
1017                        if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
1018                                foundit = 1;
1019                                break;
1020                        }
1021                        r += O2HB_MAX_REGION_NAME_LEN;
1022                }
1023                if (!foundit) {
1024                        status = -EINVAL;
1025                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1026                             "in local node %d but not in joining node %d\n",
1027                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
1028                             dlm->node_num, qr->qr_node);
1029                        goto bail;
1030                }
1031                l += O2HB_MAX_REGION_NAME_LEN;
1032        }
1033
1034        /* compare remote with local regions */
1035        r = remote;
1036        for (i = 0; i < qr->qr_numregions; ++i) {
1037                foundit = 0;
1038                l = local;
1039                for (j = 0; j < localnr; ++j) {
1040                        if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1041                                foundit = 1;
1042                                break;
1043                        }
1044                        l += O2HB_MAX_REGION_NAME_LEN;
1045                }
1046                if (!foundit) {
1047                        status = -EINVAL;
1048                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1049                             "in joining node %d but not in local node %d\n",
1050                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1051                             qr->qr_node, dlm->node_num);
1052                        goto bail;
1053                }
1054                r += O2HB_MAX_REGION_NAME_LEN;
1055        }
1056
1057bail:
1058        return status;
1059}
1060
1061static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1062{
1063        struct dlm_query_region *qr = NULL;
1064        int status, ret = 0, i;
1065        char *p;
1066
1067        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1068                goto bail;
1069
1070        qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1071        if (!qr) {
1072                ret = -ENOMEM;
1073                mlog_errno(ret);
1074                goto bail;
1075        }
1076
1077        qr->qr_node = dlm->node_num;
1078        qr->qr_namelen = strlen(dlm->name);
1079        memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1080        /* if local hb, the numregions will be zero */
1081        if (o2hb_global_heartbeat_active())
1082                qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1083                                                         O2NM_MAX_REGIONS);
1084
1085        p = qr->qr_regions;
1086        for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1087                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1088
1089        i = -1;
1090        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1091                                  i + 1)) < O2NM_MAX_NODES) {
1092                if (i == dlm->node_num)
1093                        continue;
1094
1095                mlog(0, "Sending regions to node %d\n", i);
1096
1097                ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1098                                         sizeof(struct dlm_query_region),
1099                                         i, &status);
1100                if (ret >= 0)
1101                        ret = status;
1102                if (ret) {
1103                        mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1104                             ret, i);
1105                        break;
1106                }
1107        }
1108
1109bail:
1110        kfree(qr);
1111        return ret;
1112}
1113
1114static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1115                                    void *data, void **ret_data)
1116{
1117        struct dlm_query_region *qr;
1118        struct dlm_ctxt *dlm = NULL;
1119        char *local = NULL;
1120        int status = 0;
1121
1122        qr = (struct dlm_query_region *) msg->buf;
1123
1124        mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1125             qr->qr_domain);
1126
1127        /* buffer used in dlm_mast_regions() */
1128        local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
1129        if (!local)
1130                return -ENOMEM;
1131
1132        status = -EINVAL;
1133
1134        spin_lock(&dlm_domain_lock);
1135        dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1136        if (!dlm) {
1137                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1138                     "before join domain\n", qr->qr_node, qr->qr_domain);
1139                goto out_domain_lock;
1140        }
1141
1142        spin_lock(&dlm->spinlock);
1143        if (dlm->joining_node != qr->qr_node) {
1144                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1145                     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1146                     dlm->joining_node);
1147                goto out_dlm_lock;
1148        }
1149
1150        /* Support for global heartbeat was added in 1.1 */
1151        if (dlm->dlm_locking_proto.pv_major == 1 &&
1152            dlm->dlm_locking_proto.pv_minor == 0) {
1153                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1154                     "but active dlm protocol is %d.%d\n", qr->qr_node,
1155                     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1156                     dlm->dlm_locking_proto.pv_minor);
1157                goto out_dlm_lock;
1158        }
1159
1160        status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions));
1161
1162out_dlm_lock:
1163        spin_unlock(&dlm->spinlock);
1164
1165out_domain_lock:
1166        spin_unlock(&dlm_domain_lock);
1167
1168        kfree(local);
1169
1170        return status;
1171}
1172
1173static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1174{
1175        struct o2nm_node *local;
1176        struct dlm_node_info *remote;
1177        int i, j;
1178        int status = 0;
1179
1180        for (j = 0; j < qn->qn_numnodes; ++j)
1181                mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1182                     &(qn->qn_nodes[j].ni_ipv4_address),
1183                     ntohs(qn->qn_nodes[j].ni_ipv4_port));
1184
1185        for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1186                local = o2nm_get_node_by_num(i);
1187                remote = NULL;
1188                for (j = 0; j < qn->qn_numnodes; ++j) {
1189                        if (qn->qn_nodes[j].ni_nodenum == i) {
1190                                remote = &(qn->qn_nodes[j]);
1191                                break;
1192                        }
1193                }
1194
1195                if (!local && !remote)
1196                        continue;
1197
1198                if ((local && !remote) || (!local && remote))
1199                        status = -EINVAL;
1200
1201                if (!status &&
1202                    ((remote->ni_nodenum != local->nd_num) ||
1203                     (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1204                     (remote->ni_ipv4_address != local->nd_ipv4_address)))
1205                        status = -EINVAL;
1206
1207                if (status) {
1208                        if (remote && !local)
1209                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1210                                     "registered in joining node %d but not in "
1211                                     "local node %d\n", qn->qn_domain,
1212                                     remote->ni_nodenum,
1213                                     &(remote->ni_ipv4_address),
1214                                     ntohs(remote->ni_ipv4_port),
1215                                     qn->qn_nodenum, dlm->node_num);
1216                        if (local && !remote)
1217                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1218                                     "registered in local node %d but not in "
1219                                     "joining node %d\n", qn->qn_domain,
1220                                     local->nd_num, &(local->nd_ipv4_address),
1221                                     ntohs(local->nd_ipv4_port),
1222                                     dlm->node_num, qn->qn_nodenum);
1223                        BUG_ON((!local && !remote));
1224                }
1225
1226                if (local)
1227                        o2nm_node_put(local);
1228        }
1229
1230        return status;
1231}
1232
1233static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1234{
1235        struct dlm_query_nodeinfo *qn = NULL;
1236        struct o2nm_node *node;
1237        int ret = 0, status, count, i;
1238
1239        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1240                goto bail;
1241
1242        qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1243        if (!qn) {
1244                ret = -ENOMEM;
1245                mlog_errno(ret);
1246                goto bail;
1247        }
1248
1249        for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1250                node = o2nm_get_node_by_num(i);
1251                if (!node)
1252                        continue;
1253                qn->qn_nodes[count].ni_nodenum = node->nd_num;
1254                qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1255                qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1256                mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1257                     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1258                ++count;
1259                o2nm_node_put(node);
1260        }
1261
1262        qn->qn_nodenum = dlm->node_num;
1263        qn->qn_numnodes = count;
1264        qn->qn_namelen = strlen(dlm->name);
1265        memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1266
1267        i = -1;
1268        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1269                                  i + 1)) < O2NM_MAX_NODES) {
1270                if (i == dlm->node_num)
1271                        continue;
1272
1273                mlog(0, "Sending nodeinfo to node %d\n", i);
1274
1275                ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1276                                         qn, sizeof(struct dlm_query_nodeinfo),
1277                                         i, &status);
1278                if (ret >= 0)
1279                        ret = status;
1280                if (ret) {
1281                        mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1282                        break;
1283                }
1284        }
1285
1286bail:
1287        kfree(qn);
1288        return ret;
1289}
1290
1291static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1292                                      void *data, void **ret_data)
1293{
1294        struct dlm_query_nodeinfo *qn;
1295        struct dlm_ctxt *dlm = NULL;
1296        int locked = 0, status = -EINVAL;
1297
1298        qn = (struct dlm_query_nodeinfo *) msg->buf;
1299
1300        mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1301             qn->qn_domain);
1302
1303        spin_lock(&dlm_domain_lock);
1304        dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1305        if (!dlm) {
1306                mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1307                     "join domain\n", qn->qn_nodenum, qn->qn_domain);
1308                goto bail;
1309        }
1310
1311        spin_lock(&dlm->spinlock);
1312        locked = 1;
1313        if (dlm->joining_node != qn->qn_nodenum) {
1314                mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1315                     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1316                     dlm->joining_node);
1317                goto bail;
1318        }
1319
1320        /* Support for node query was added in 1.1 */
1321        if (dlm->dlm_locking_proto.pv_major == 1 &&
1322            dlm->dlm_locking_proto.pv_minor == 0) {
1323                mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1324                     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1325                     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1326                     dlm->dlm_locking_proto.pv_minor);
1327                goto bail;
1328        }
1329
1330        status = dlm_match_nodes(dlm, qn);
1331
1332bail:
1333        if (locked)
1334                spin_unlock(&dlm->spinlock);
1335        spin_unlock(&dlm_domain_lock);
1336
1337        return status;
1338}
1339
1340static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1341                                   void **ret_data)
1342{
1343        struct dlm_cancel_join *cancel;
1344        struct dlm_ctxt *dlm = NULL;
1345
1346        cancel = (struct dlm_cancel_join *) msg->buf;
1347
1348        mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1349                  cancel->domain);
1350
1351        spin_lock(&dlm_domain_lock);
1352        dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1353
1354        if (dlm) {
1355                spin_lock(&dlm->spinlock);
1356
1357                /* Yikes, this guy wants to cancel his join. No
1358                 * problem, we simply cleanup our join state. */
1359                BUG_ON(dlm->joining_node != cancel->node_idx);
1360                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1361
1362                spin_unlock(&dlm->spinlock);
1363        }
1364        spin_unlock(&dlm_domain_lock);
1365
1366        return 0;
1367}
1368
1369static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1370                                    unsigned int node)
1371{
1372        int status;
1373        struct dlm_cancel_join cancel_msg;
1374
1375        memset(&cancel_msg, 0, sizeof(cancel_msg));
1376        cancel_msg.node_idx = dlm->node_num;
1377        cancel_msg.name_len = strlen(dlm->name);
1378        memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1379
1380        status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1381                                    &cancel_msg, sizeof(cancel_msg), node,
1382                                    NULL);
1383        if (status < 0) {
1384                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1385                     "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1386                     node);
1387                goto bail;
1388        }
1389
1390bail:
1391        return status;
1392}
1393
1394/* map_size should be in bytes. */
1395static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1396                                 unsigned long *node_map,
1397                                 unsigned int map_size)
1398{
1399        int status, tmpstat;
1400        int node;
1401
1402        if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1403                         sizeof(unsigned long))) {
1404                mlog(ML_ERROR,
1405                     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1406                     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1407                return -EINVAL;
1408        }
1409
1410        status = 0;
1411        node = -1;
1412        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1413                                     node + 1)) < O2NM_MAX_NODES) {
1414                if (node == dlm->node_num)
1415                        continue;
1416
1417                tmpstat = dlm_send_one_join_cancel(dlm, node);
1418                if (tmpstat) {
1419                        mlog(ML_ERROR, "Error return %d cancelling join on "
1420                             "node %d\n", tmpstat, node);
1421                        if (!status)
1422                                status = tmpstat;
1423                }
1424        }
1425
1426        if (status)
1427                mlog_errno(status);
1428        return status;
1429}
1430
1431static int dlm_request_join(struct dlm_ctxt *dlm,
1432                            int node,
1433                            enum dlm_query_join_response_code *response)
1434{
1435        int status;
1436        struct dlm_query_join_request join_msg;
1437        struct dlm_query_join_packet packet;
1438        u32 join_resp;
1439
1440        mlog(0, "querying node %d\n", node);
1441
1442        memset(&join_msg, 0, sizeof(join_msg));
1443        join_msg.node_idx = dlm->node_num;
1444        join_msg.name_len = strlen(dlm->name);
1445        memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1446        join_msg.dlm_proto = dlm->dlm_locking_proto;
1447        join_msg.fs_proto = dlm->fs_locking_proto;
1448
1449        /* copy live node map to join message */
1450        byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1451
1452        status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1453                                    sizeof(join_msg), node, &join_resp);
1454        if (status < 0 && status != -ENOPROTOOPT) {
1455                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1456                     "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1457                     node);
1458                goto bail;
1459        }
1460        dlm_query_join_wire_to_packet(join_resp, &packet);
1461
1462        /* -ENOPROTOOPT from the net code means the other side isn't
1463            listening for our message type -- that's fine, it means
1464            his dlm isn't up, so we can consider him a 'yes' but not
1465            joined into the domain.  */
1466        if (status == -ENOPROTOOPT) {
1467                status = 0;
1468                *response = JOIN_OK_NO_MAP;
1469        } else {
1470                *response = packet.code;
1471                switch (packet.code) {
1472                case JOIN_DISALLOW:
1473                case JOIN_OK_NO_MAP:
1474                        break;
1475                case JOIN_PROTOCOL_MISMATCH:
1476                        mlog(ML_NOTICE,
1477                             "This node requested DLM locking protocol %u.%u and "
1478                             "filesystem locking protocol %u.%u.  At least one of "
1479                             "the protocol versions on node %d is not compatible, "
1480                             "disconnecting\n",
1481                             dlm->dlm_locking_proto.pv_major,
1482                             dlm->dlm_locking_proto.pv_minor,
1483                             dlm->fs_locking_proto.pv_major,
1484                             dlm->fs_locking_proto.pv_minor,
1485                             node);
1486                        status = -EPROTO;
1487                        break;
1488                case JOIN_OK:
1489                        /* Use the same locking protocol as the remote node */
1490                        dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1491                        dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1492                        mlog(0,
1493                             "Node %d responds JOIN_OK with DLM locking protocol "
1494                             "%u.%u and fs locking protocol %u.%u\n",
1495                             node,
1496                             dlm->dlm_locking_proto.pv_major,
1497                             dlm->dlm_locking_proto.pv_minor,
1498                             dlm->fs_locking_proto.pv_major,
1499                             dlm->fs_locking_proto.pv_minor);
1500                        break;
1501                default:
1502                        status = -EINVAL;
1503                        mlog(ML_ERROR, "invalid response %d from node %u\n",
1504                             packet.code, node);
1505                        /* Reset response to JOIN_DISALLOW */
1506                        *response = JOIN_DISALLOW;
1507                        break;
1508                }
1509        }
1510
1511        mlog(0, "status %d, node %d response is %d\n", status, node,
1512             *response);
1513
1514bail:
1515        return status;
1516}
1517
1518static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1519                                    unsigned int node)
1520{
1521        int status;
1522        int ret;
1523        struct dlm_assert_joined assert_msg;
1524
1525        mlog(0, "Sending join assert to node %u\n", node);
1526
1527        memset(&assert_msg, 0, sizeof(assert_msg));
1528        assert_msg.node_idx = dlm->node_num;
1529        assert_msg.name_len = strlen(dlm->name);
1530        memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1531
1532        status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1533                                    &assert_msg, sizeof(assert_msg), node,
1534                                    &ret);
1535        if (status < 0)
1536                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1537                     "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1538                     node);
1539        else
1540                status = ret;
1541
1542        return status;
1543}
1544
1545static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1546                                  unsigned long *node_map)
1547{
1548        int status, node, live;
1549
1550        status = 0;
1551        node = -1;
1552        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1553                                     node + 1)) < O2NM_MAX_NODES) {
1554                if (node == dlm->node_num)
1555                        continue;
1556
1557                do {
1558                        /* It is very important that this message be
1559                         * received so we spin until either the node
1560                         * has died or it gets the message. */
1561                        status = dlm_send_one_join_assert(dlm, node);
1562
1563                        spin_lock(&dlm->spinlock);
1564                        live = test_bit(node, dlm->live_nodes_map);
1565                        spin_unlock(&dlm->spinlock);
1566
1567                        if (status) {
1568                                mlog(ML_ERROR, "Error return %d asserting "
1569                                     "join on node %d\n", status, node);
1570
1571                                /* give us some time between errors... */
1572                                if (live)
1573                                        msleep(DLM_DOMAIN_BACKOFF_MS);
1574                        }
1575                } while (status && live);
1576        }
1577}
1578
1579struct domain_join_ctxt {
1580        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1581        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1582};
1583
1584static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1585                                   struct domain_join_ctxt *ctxt,
1586                                   enum dlm_query_join_response_code response)
1587{
1588        int ret;
1589
1590        if (response == JOIN_DISALLOW) {
1591                mlog(0, "Latest response of disallow -- should restart\n");
1592                return 1;
1593        }
1594
1595        spin_lock(&dlm->spinlock);
1596        /* For now, we restart the process if the node maps have
1597         * changed at all */
1598        ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1599                     sizeof(dlm->live_nodes_map));
1600        spin_unlock(&dlm->spinlock);
1601
1602        if (ret)
1603                mlog(0, "Node maps changed -- should restart\n");
1604
1605        return ret;
1606}
1607
1608static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1609{
1610        int status = 0, tmpstat, node;
1611        struct domain_join_ctxt *ctxt;
1612        enum dlm_query_join_response_code response = JOIN_DISALLOW;
1613
1614        mlog(0, "%p", dlm);
1615
1616        ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1617        if (!ctxt) {
1618                status = -ENOMEM;
1619                mlog_errno(status);
1620                goto bail;
1621        }
1622
1623        /* group sem locking should work for us here -- we're already
1624         * registered for heartbeat events so filling this should be
1625         * atomic wrt getting those handlers called. */
1626        o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1627
1628        spin_lock(&dlm->spinlock);
1629        memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1630
1631        __dlm_set_joining_node(dlm, dlm->node_num);
1632
1633        spin_unlock(&dlm->spinlock);
1634
1635        node = -1;
1636        while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1637                                     node + 1)) < O2NM_MAX_NODES) {
1638                if (node == dlm->node_num)
1639                        continue;
1640
1641                status = dlm_request_join(dlm, node, &response);
1642                if (status < 0) {
1643                        mlog_errno(status);
1644                        goto bail;
1645                }
1646
1647                /* Ok, either we got a response or the node doesn't have a
1648                 * dlm up. */
1649                if (response == JOIN_OK)
1650                        set_bit(node, ctxt->yes_resp_map);
1651
1652                if (dlm_should_restart_join(dlm, ctxt, response)) {
1653                        status = -EAGAIN;
1654                        goto bail;
1655                }
1656        }
1657
1658        mlog(0, "Yay, done querying nodes!\n");
1659
1660        /* Yay, everyone agree's we can join the domain. My domain is
1661         * comprised of all nodes who were put in the
1662         * yes_resp_map. Copy that into our domain map and send a join
1663         * assert message to clean up everyone elses state. */
1664        spin_lock(&dlm->spinlock);
1665        memcpy(dlm->domain_map, ctxt->yes_resp_map,
1666               sizeof(ctxt->yes_resp_map));
1667        set_bit(dlm->node_num, dlm->domain_map);
1668        spin_unlock(&dlm->spinlock);
1669
1670        /* Support for global heartbeat and node info was added in 1.1 */
1671        if (dlm->dlm_locking_proto.pv_major > 1 ||
1672            dlm->dlm_locking_proto.pv_minor > 0) {
1673                status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1674                if (status) {
1675                        mlog_errno(status);
1676                        goto bail;
1677                }
1678                status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1679                if (status) {
1680                        mlog_errno(status);
1681                        goto bail;
1682                }
1683        }
1684
1685        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1686
1687        /* Joined state *must* be set before the joining node
1688         * information, otherwise the query_join handler may read no
1689         * current joiner but a state of NEW and tell joining nodes
1690         * we're not in the domain. */
1691        spin_lock(&dlm_domain_lock);
1692        dlm->dlm_state = DLM_CTXT_JOINED;
1693        dlm->num_joins++;
1694        spin_unlock(&dlm_domain_lock);
1695
1696bail:
1697        spin_lock(&dlm->spinlock);
1698        __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1699        if (!status) {
1700                printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1701                __dlm_print_nodes(dlm);
1702        }
1703        spin_unlock(&dlm->spinlock);
1704
1705        if (ctxt) {
1706                /* Do we need to send a cancel message to any nodes? */
1707                if (status < 0) {
1708                        tmpstat = dlm_send_join_cancels(dlm,
1709                                                        ctxt->yes_resp_map,
1710                                                        sizeof(ctxt->yes_resp_map));
1711                        if (tmpstat < 0)
1712                                mlog_errno(tmpstat);
1713                }
1714                kfree(ctxt);
1715        }
1716
1717        mlog(0, "returning %d\n", status);
1718        return status;
1719}
1720
/*
 * Counterpart of dlm_register_domain_handlers(): unregister the node-up
 * and node-down heartbeat callbacks of @dlm and hand its
 * dlm_domain_handlers list to o2net for unregistration.
 */
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
{
        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up);
        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down);
        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
}
1727
1728static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1729{
1730        int status;
1731
1732        mlog(0, "registering handlers.\n");
1733
1734        o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1735                            dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1736        o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1737                            dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1738
1739        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1740        if (status)
1741                goto bail;
1742
1743        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1744        if (status)
1745                goto bail;
1746
1747        status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1748                                        sizeof(struct dlm_master_request),
1749                                        dlm_master_request_handler,
1750                                        dlm, NULL, &dlm->dlm_domain_handlers);
1751        if (status)
1752                goto bail;
1753
1754        status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1755                                        sizeof(struct dlm_assert_master),
1756                                        dlm_assert_master_handler,
1757                                        dlm, dlm_assert_master_post_handler,
1758                                        &dlm->dlm_domain_handlers);
1759        if (status)
1760                goto bail;
1761
1762        status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1763                                        sizeof(struct dlm_create_lock),
1764                                        dlm_create_lock_handler,
1765                                        dlm, NULL, &dlm->dlm_domain_handlers);
1766        if (status)
1767                goto bail;
1768
1769        status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1770                                        DLM_CONVERT_LOCK_MAX_LEN,
1771                                        dlm_convert_lock_handler,
1772                                        dlm, NULL, &dlm->dlm_domain_handlers);
1773        if (status)
1774                goto bail;
1775
1776        status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1777                                        DLM_UNLOCK_LOCK_MAX_LEN,
1778                                        dlm_unlock_lock_handler,
1779                                        dlm, NULL, &dlm->dlm_domain_handlers);
1780        if (status)
1781                goto bail;
1782
1783        status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1784                                        DLM_PROXY_AST_MAX_LEN,
1785                                        dlm_proxy_ast_handler,
1786                                        dlm, NULL, &dlm->dlm_domain_handlers);
1787        if (status)
1788                goto bail;
1789
1790        status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1791                                        sizeof(struct dlm_exit_domain),
1792                                        dlm_exit_domain_handler,
1793                                        dlm, NULL, &dlm->dlm_domain_handlers);
1794        if (status)
1795                goto bail;
1796
1797        status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1798                                        sizeof(struct dlm_deref_lockres),
1799                                        dlm_deref_lockres_handler,
1800                                        dlm, NULL, &dlm->dlm_domain_handlers);
1801        if (status)
1802                goto bail;
1803
1804        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1805                                        sizeof(struct dlm_migrate_request),
1806                                        dlm_migrate_request_handler,
1807                                        dlm, NULL, &dlm->dlm_domain_handlers);
1808        if (status)
1809                goto bail;
1810
1811        status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1812                                        DLM_MIG_LOCKRES_MAX_LEN,
1813                                        dlm_mig_lockres_handler,
1814                                        dlm, NULL, &dlm->dlm_domain_handlers);
1815        if (status)
1816                goto bail;
1817
1818        status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1819                                        sizeof(struct dlm_master_requery),
1820                                        dlm_master_requery_handler,
1821                                        dlm, NULL, &dlm->dlm_domain_handlers);
1822        if (status)
1823                goto bail;
1824
1825        status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1826                                        sizeof(struct dlm_lock_request),
1827                                        dlm_request_all_locks_handler,
1828                                        dlm, NULL, &dlm->dlm_domain_handlers);
1829        if (status)
1830                goto bail;
1831
1832        status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1833                                        sizeof(struct dlm_reco_data_done),
1834                                        dlm_reco_data_done_handler,
1835                                        dlm, NULL, &dlm->dlm_domain_handlers);
1836        if (status)
1837                goto bail;
1838
1839        status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1840                                        sizeof(struct dlm_begin_reco),
1841                                        dlm_begin_reco_handler,
1842                                        dlm, NULL, &dlm->dlm_domain_handlers);
1843        if (status)
1844                goto bail;
1845
1846        status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1847                                        sizeof(struct dlm_finalize_reco),
1848                                        dlm_finalize_reco_handler,
1849                                        dlm, NULL, &dlm->dlm_domain_handlers);
1850        if (status)
1851                goto bail;
1852
1853        status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key,
1854                                        sizeof(struct dlm_exit_domain),
1855                                        dlm_begin_exit_domain_handler,
1856                                        dlm, NULL, &dlm->dlm_domain_handlers);
1857        if (status)
1858                goto bail;
1859
1860        status = o2net_register_handler(DLM_DEREF_LOCKRES_DONE, dlm->key,
1861                                        sizeof(struct dlm_deref_lockres_done),
1862                                        dlm_deref_lockres_done_handler,
1863                                        dlm, NULL, &dlm->dlm_domain_handlers);
1864bail:
1865        if (status)
1866                dlm_unregister_domain_handlers(dlm);
1867
1868        return status;
1869}
1870
1871static int dlm_join_domain(struct dlm_ctxt *dlm)
1872{
1873        int status;
1874        unsigned int backoff;
1875        unsigned int total_backoff = 0;
1876        char wq_name[O2NM_MAX_NAME_LEN];
1877
1878        BUG_ON(!dlm);
1879
1880        mlog(0, "Join domain %s\n", dlm->name);
1881
1882        status = dlm_register_domain_handlers(dlm);
1883        if (status) {
1884                mlog_errno(status);
1885                goto bail;
1886        }
1887
1888        status = dlm_launch_thread(dlm);
1889        if (status < 0) {
1890                mlog_errno(status);
1891                goto bail;
1892        }
1893
1894        status = dlm_launch_recovery_thread(dlm);
1895        if (status < 0) {
1896                mlog_errno(status);
1897                goto bail;
1898        }
1899
1900        status = dlm_debug_init(dlm);
1901        if (status < 0) {
1902                mlog_errno(status);
1903                goto bail;
1904        }
1905
1906        snprintf(wq_name, O2NM_MAX_NAME_LEN, "dlm_wq-%s", dlm->name);
1907        dlm->dlm_worker = alloc_workqueue(wq_name, WQ_MEM_RECLAIM, 0);
1908        if (!dlm->dlm_worker) {
1909                status = -ENOMEM;
1910                mlog_errno(status);
1911                goto bail;
1912        }
1913
1914        do {
1915                status = dlm_try_to_join_domain(dlm);
1916
1917                /* If we're racing another node to the join, then we
1918                 * need to back off temporarily and let them
1919                 * complete. */
1920#define DLM_JOIN_TIMEOUT_MSECS  90000
1921                if (status == -EAGAIN) {
1922                        if (signal_pending(current)) {
1923                                status = -ERESTARTSYS;
1924                                goto bail;
1925                        }
1926
1927                        if (total_backoff > DLM_JOIN_TIMEOUT_MSECS) {
1928                                status = -ERESTARTSYS;
1929                                mlog(ML_NOTICE, "Timed out joining dlm domain "
1930                                     "%s after %u msecs\n", dlm->name,
1931                                     total_backoff);
1932                                goto bail;
1933                        }
1934
1935                        /*
1936                         * <chip> After you!
1937                         * <dale> No, after you!
1938                         * <chip> I insist!
1939                         * <dale> But you first!
1940                         * ...
1941                         */
1942                        backoff = (unsigned int)(jiffies & 0x3);
1943                        backoff *= DLM_DOMAIN_BACKOFF_MS;
1944                        total_backoff += backoff;
1945                        mlog(0, "backoff %d\n", backoff);
1946                        msleep(backoff);
1947                }
1948        } while (status == -EAGAIN);
1949
1950        if (status < 0) {
1951                mlog_errno(status);
1952                goto bail;
1953        }
1954
1955        status = 0;
1956bail:
1957        wake_up(&dlm_domain_events);
1958
1959        if (status) {
1960                dlm_unregister_domain_handlers(dlm);
1961                dlm_debug_shutdown(dlm);
1962                dlm_complete_thread(dlm);
1963                dlm_complete_recovery_thread(dlm);
1964                dlm_destroy_dlm_worker(dlm);
1965        }
1966
1967        return status;
1968}
1969
1970static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1971                                u32 key)
1972{
1973        int i;
1974        int ret;
1975        struct dlm_ctxt *dlm = NULL;
1976
1977        dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1978        if (!dlm) {
1979                ret = -ENOMEM;
1980                mlog_errno(ret);
1981                goto leave;
1982        }
1983
1984        dlm->name = kstrdup(domain, GFP_KERNEL);
1985        if (dlm->name == NULL) {
1986                ret = -ENOMEM;
1987                mlog_errno(ret);
1988                goto leave;
1989        }
1990
1991        dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1992        if (!dlm->lockres_hash) {
1993                ret = -ENOMEM;
1994                mlog_errno(ret);
1995                goto leave;
1996        }
1997
1998        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1999                INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
2000
2001        dlm->master_hash = (struct hlist_head **)
2002                                dlm_alloc_pagevec(DLM_HASH_PAGES);
2003        if (!dlm->master_hash) {
2004                ret = -ENOMEM;
2005                mlog_errno(ret);
2006                goto leave;
2007        }
2008
2009        for (i = 0; i < DLM_HASH_BUCKETS; i++)
2010                INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
2011
2012        dlm->key = key;
2013        dlm->node_num = o2nm_this_node();
2014
2015        ret = dlm_create_debugfs_subroot(dlm);
2016        if (ret < 0)
2017                goto leave;
2018
2019        spin_lock_init(&dlm->spinlock);
2020        spin_lock_init(&dlm->master_lock);
2021        spin_lock_init(&dlm->ast_lock);
2022        spin_lock_init(&dlm->track_lock);
2023        INIT_LIST_HEAD(&dlm->list);
2024        INIT_LIST_HEAD(&dlm->dirty_list);
2025        INIT_LIST_HEAD(&dlm->reco.resources);
2026        INIT_LIST_HEAD(&dlm->reco.node_data);
2027        INIT_LIST_HEAD(&dlm->purge_list);
2028        INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
2029        INIT_LIST_HEAD(&dlm->tracking_list);
2030        dlm->reco.state = 0;
2031
2032        INIT_LIST_HEAD(&dlm->pending_asts);
2033        INIT_LIST_HEAD(&dlm->pending_basts);
2034
2035        mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
2036                  dlm->recovery_map, &(dlm->recovery_map[0]));
2037
2038        memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
2039        memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
2040        memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
2041
2042        dlm->dlm_thread_task = NULL;
2043        dlm->dlm_reco_thread_task = NULL;
2044        dlm->dlm_worker = NULL;
2045        init_waitqueue_head(&dlm->dlm_thread_wq);
2046        init_waitqueue_head(&dlm->dlm_reco_thread_wq);
2047        init_waitqueue_head(&dlm->reco.event);
2048        init_waitqueue_head(&dlm->ast_wq);
2049        init_waitqueue_head(&dlm->migration_wq);
2050        INIT_LIST_HEAD(&dlm->mle_hb_events);
2051
2052        dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
2053        init_waitqueue_head(&dlm->dlm_join_events);
2054
2055        dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
2056        dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
2057
2058        atomic_set(&dlm->res_tot_count, 0);
2059        atomic_set(&dlm->res_cur_count, 0);
2060        for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2061                atomic_set(&dlm->mle_tot_count[i], 0);
2062                atomic_set(&dlm->mle_cur_count[i], 0);
2063        }
2064
2065        spin_lock_init(&dlm->work_lock);
2066        INIT_LIST_HEAD(&dlm->work_list);
2067        INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2068
2069        kref_init(&dlm->dlm_refs);
2070        dlm->dlm_state = DLM_CTXT_NEW;
2071
2072        INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2073
2074        mlog(0, "context init: refcount %u\n",
2075                  kref_read(&dlm->dlm_refs));
2076
2077leave:
2078        if (ret < 0 && dlm) {
2079                if (dlm->master_hash)
2080                        dlm_free_pagevec((void **)dlm->master_hash,
2081                                        DLM_HASH_PAGES);
2082
2083                if (dlm->lockres_hash)
2084                        dlm_free_pagevec((void **)dlm->lockres_hash,
2085                                        DLM_HASH_PAGES);
2086
2087                kfree(dlm->name);
2088                kfree(dlm);
2089                dlm = NULL;
2090        }
2091        return dlm;
2092}
2093
2094/*
2095 * Compare a requested locking protocol version against the current one.
2096 *
2097 * If the major numbers are different, they are incompatible.
2098 * If the current minor is greater than the request, they are incompatible.
2099 * If the current minor is less than or equal to the request, they are
2100 * compatible, and the requester should run at the current minor version.
2101 */
2102static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2103                                struct dlm_protocol_version *request)
2104{
2105        if (existing->pv_major != request->pv_major)
2106                return 1;
2107
2108        if (existing->pv_minor > request->pv_minor)
2109                return 1;
2110
2111        if (existing->pv_minor < request->pv_minor)
2112                request->pv_minor = existing->pv_minor;
2113
2114        return 0;
2115}
2116
/*
 * dlm_register_domain: one-time setup per "domain".
 *
 * The filesystem passes in the requested locking version via proto.
 * If registration was successful, proto will contain the negotiated
 * locking protocol.
 *
 * If the domain is already registered and joined, the existing context
 * is returned with an extra reference and its join count bumped.
 * Otherwise a new context is allocated, added to dlm_domains and joined.
 *
 * Returns the context on success, or an ERR_PTR() carrying
 * -ENAMETOOLONG (domain name too long), -ERESTARTSYS (signal pending),
 * -EPROTO (incompatible locking protocol), -ENOMEM (allocation failed),
 * or the negative status of dlm_join_domain().
 */
struct dlm_ctxt * dlm_register_domain(const char *domain,
                               u32 key,
                               struct dlm_protocol_version *fs_proto)
{
        int ret;
        struct dlm_ctxt *dlm = NULL;
        struct dlm_ctxt *new_ctxt = NULL;

        if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
                ret = -ENAMETOOLONG;
                mlog(ML_ERROR, "domain name length too long\n");
                goto leave;
        }

        mlog(0, "register called for domain \"%s\"\n", domain);

retry:
        /* Each pass starts from scratch; a pending signal ends the loop. */
        dlm = NULL;
        if (signal_pending(current)) {
                ret = -ERESTARTSYS;
                mlog_errno(ret);
                goto leave;
        }

        spin_lock(&dlm_domain_lock);

        dlm = __dlm_lookup_domain(domain);
        if (dlm) {
                if (dlm->dlm_state != DLM_CTXT_JOINED) {
                        spin_unlock(&dlm_domain_lock);

                        /* The context exists but is not joined.  Sleep
                         * on dlm_domain_events and then look it up again.
                         * The return value of the wait is not checked;
                         * an interrupting signal is caught at retry. */
                        mlog(0, "This ctxt is not joined yet!\n");
                        wait_event_interruptible(dlm_domain_events,
                                                 dlm_wait_on_domain_helper(
                                                         domain));
                        goto retry;
                }

                /* May lower fs_proto->pv_minor to the registered minor. */
                if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
                        spin_unlock(&dlm_domain_lock);
                        mlog(ML_ERROR,
                             "Requested locking protocol version is not "
                             "compatible with already registered domain "
                             "\"%s\"\n", domain);
                        ret = -EPROTO;
                        goto leave;
                }

                /* Share the existing, already joined context. */
                __dlm_get(dlm);
                dlm->num_joins++;

                spin_unlock(&dlm_domain_lock);

                ret = 0;
                goto leave;
        }

        /* doesn't exist */
        if (!new_ctxt) {
                /* Drop the lock for the allocation, then redo the lookup
                 * from the top with the new context in hand. */
                spin_unlock(&dlm_domain_lock);

                new_ctxt = dlm_alloc_ctxt(domain, key);
                if (new_ctxt)
                        goto retry;

                ret = -ENOMEM;
                mlog_errno(ret);
                goto leave;
        }

        /* a little variable switch-a-roo here... */
        /* Clearing new_ctxt keeps the cleanup at leave from freeing the
         * context that is about to go on the list. */
        dlm = new_ctxt;
        new_ctxt = NULL;

        /* add the new domain */
        list_add_tail(&dlm->list, &dlm_domains);
        spin_unlock(&dlm_domain_lock);

        /*
         * Pass the locking protocol version into the join.  If the join
         * succeeds, it will have the negotiated protocol set.
         */
        dlm->dlm_locking_proto = dlm_protocol;
        dlm->fs_locking_proto = *fs_proto;

        ret = dlm_join_domain(dlm);
        if (ret) {
                mlog_errno(ret);
                dlm_put(dlm);
                goto leave;
        }

        /* Tell the caller what locking protocol we negotiated */
        *fs_proto = dlm->fs_locking_proto;

        ret = 0;
leave:
        /* Still set only when a context was allocated but the retry found
         * the domain already present (or bailed out on a signal). */
        if (new_ctxt)
                dlm_free_ctxt_mem(new_ctxt);

        if (ret < 0)
                dlm = ERR_PTR(ret);

        return dlm;
}
EXPORT_SYMBOL_GPL(dlm_register_domain);
2230
/* o2net handlers registered by dlm_register_net_handlers() are queued
 * here so that they can be unregistered as a group. */
static LIST_HEAD(dlm_join_handlers);

/* Unregister whatever is currently queued on dlm_join_handlers. */
static void dlm_unregister_net_handlers(void)
{
        o2net_unregister_handler_list(&dlm_join_handlers);
}
2237
2238static int dlm_register_net_handlers(void)
2239{
2240        int status = 0;
2241
2242        status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2243                                        sizeof(struct dlm_query_join_request),
2244                                        dlm_query_join_handler,
2245                                        NULL, NULL, &dlm_join_handlers);
2246        if (status)
2247                goto bail;
2248
2249        status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2250                                        sizeof(struct dlm_assert_joined),
2251                                        dlm_assert_joined_handler,
2252                                        NULL, NULL, &dlm_join_handlers);
2253        if (status)
2254                goto bail;
2255
2256        status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2257                                        sizeof(struct dlm_cancel_join),
2258                                        dlm_cancel_join_handler,
2259                                        NULL, NULL, &dlm_join_handlers);
2260        if (status)
2261                goto bail;
2262
2263        status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2264                                        sizeof(struct dlm_query_region),
2265                                        dlm_query_region_handler,
2266                                        NULL, NULL, &dlm_join_handlers);
2267
2268        if (status)
2269                goto bail;
2270
2271        status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2272                                        sizeof(struct dlm_query_nodeinfo),
2273                                        dlm_query_nodeinfo_handler,
2274                                        NULL, NULL, &dlm_join_handlers);
2275bail:
2276        if (status < 0)
2277                dlm_unregister_net_handlers();
2278
2279        return status;
2280}
2281
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
2289
/* Eviction is not expected to happen often, so a per-domain lock is
 * not necessary. Eviction callbacks are allowed to sleep for short
 * periods of time. */
static DECLARE_RWSEM(dlm_callback_sem);

/*
 * Call every eviction callback registered on @dlm, passing it @node_num
 * and the private data stored with the callback.  The list is walked
 * with dlm_callback_sem held for reading.
 */
void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
                                        int node_num)
{
        struct dlm_eviction_cb *cb;

        down_read(&dlm_callback_sem);
        list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) {
                cb->ec_func(node_num, cb->ec_data);
        }
        up_read(&dlm_callback_sem);
}
2306
/*
 * Prepare @cb for registration: initialise its list linkage and record
 * the function @f and the private @data handed to it when it is called.
 * Nothing is registered here.
 */
void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
                           dlm_eviction_func *f,
                           void *data)
{
        INIT_LIST_HEAD(&cb->ec_item);
        cb->ec_func = f;
        cb->ec_data = data;
}
EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2316
/*
 * Append @cb to the eviction callback list of @dlm, with
 * dlm_callback_sem held for writing.
 */
void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
                              struct dlm_eviction_cb *cb)
{
        down_write(&dlm_callback_sem);
        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
        up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2325
/*
 * Take @cb off the eviction callback list it is linked on, with
 * dlm_callback_sem held for writing.  list_del_init() leaves the entry
 * re-initialised rather than dangling.
 */
void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
{
        down_write(&dlm_callback_sem);
        list_del_init(&cb->ec_item);
        up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
2333
2334static int __init dlm_init(void)
2335{
2336        int status;
2337
2338        status = dlm_init_mle_cache();
2339        if (status) {
2340                mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2341                goto error;
2342        }
2343
2344        status = dlm_init_master_caches();
2345        if (status) {
2346                mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2347                     "o2dlm_lockname slabcaches\n");
2348                goto error;
2349        }
2350
2351        status = dlm_init_lock_cache();
2352        if (status) {
2353                mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2354                goto error;
2355        }
2356
2357        status = dlm_register_net_handlers();
2358        if (status) {
2359                mlog(ML_ERROR, "Unable to register network handlers\n");
2360                goto error;
2361        }
2362
2363        status = dlm_create_debugfs_root();
2364        if (status)
2365                goto error;
2366
2367        return 0;
2368error:
2369        dlm_unregister_net_handlers();
2370        dlm_destroy_lock_cache();
2371        dlm_destroy_master_caches();
2372        dlm_destroy_mle_cache();
2373        return -1;
2374}
2375
/*
 * Module exit point: undo dlm_init() in reverse order -- debugfs root,
 * network handlers, then the lock, master and mle caches.
 */
static void __exit dlm_exit (void)
{
        dlm_destroy_debugfs_root();
        dlm_unregister_net_handlers();
        dlm_destroy_lock_cache();
        dlm_destroy_master_caches();
        dlm_destroy_mle_cache();
}

/* Module metadata and entry/exit hooks. */
MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("OCFS2 Distributed Lock Management");

module_init(dlm_init);
module_exit(dlm_exit);
2391