linux/fs/ocfs2/dlm/dlmdomain.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * dlmdomain.c
   4 *
   5 * defines domain join / leave apis
   6 *
   7 * Copyright (C) 2004 Oracle.  All rights reserved.
   8 */
   9
  10#include <linux/module.h>
  11#include <linux/types.h>
  12#include <linux/slab.h>
  13#include <linux/highmem.h>
  14#include <linux/init.h>
  15#include <linux/spinlock.h>
  16#include <linux/delay.h>
  17#include <linux/err.h>
  18#include <linux/debugfs.h>
  19#include <linux/sched/signal.h>
  20
  21#include "../cluster/heartbeat.h"
  22#include "../cluster/nodemanager.h"
  23#include "../cluster/tcp.h"
  24
  25#include "dlmapi.h"
  26#include "dlmcommon.h"
  27#include "dlmdomain.h"
  28#include "dlmdebug.h"
  29
  30#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
  31#include "../cluster/masklog.h"
  32
/*
 * ocfs2 node maps are arrays of long int, which cannot be sent freely
 * across the wire due to endianness issues. To work around this, we convert
 * the long ints to byte arrays. The following 3 routines are helper
 * functions to set/test/copy bits within those arrays of bytes.
 */
  39static inline void byte_set_bit(u8 nr, u8 map[])
  40{
  41        map[nr >> 3] |= (1UL << (nr & 7));
  42}
  43
  44static inline int byte_test_bit(u8 nr, u8 map[])
  45{
  46        return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
  47}
  48
  49static inline void byte_copymap(u8 dmap[], unsigned long smap[],
  50                        unsigned int sz)
  51{
  52        unsigned int nn;
  53
  54        if (!sz)
  55                return;
  56
  57        memset(dmap, 0, ((sz + 7) >> 3));
  58        for (nn = 0 ; nn < sz; nn++)
  59                if (test_bit(nn, smap))
  60                        byte_set_bit(nn, dmap);
  61}
  62
  63static void dlm_free_pagevec(void **vec, int pages)
  64{
  65        while (pages--)
  66                free_page((unsigned long)vec[pages]);
  67        kfree(vec);
  68}
  69
  70static void **dlm_alloc_pagevec(int pages)
  71{
  72        void **vec = kmalloc_array(pages, sizeof(void *), GFP_KERNEL);
  73        int i;
  74
  75        if (!vec)
  76                return NULL;
  77
  78        for (i = 0; i < pages; i++)
  79                if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
  80                        goto out_free;
  81
  82        mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
  83             pages, (unsigned long)DLM_HASH_PAGES,
  84             (unsigned long)DLM_BUCKETS_PER_PAGE);
  85        return vec;
  86out_free:
  87        dlm_free_pagevec(vec, i);
  88        return NULL;
  89}
  90
  91/*
  92 *
  93 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
  94 *    dlm_domain_lock
  95 *    struct dlm_ctxt->spinlock
  96 *    struct dlm_lock_resource->spinlock
  97 *    struct dlm_ctxt->master_lock
  98 *    struct dlm_ctxt->ast_lock
  99 *    dlm_master_list_entry->spinlock
 100 *    dlm_lock->spinlock
 101 *
 102 */
 103
 104DEFINE_SPINLOCK(dlm_domain_lock);
 105LIST_HEAD(dlm_domains);
 106static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 107
 108/*
 109 * The supported protocol version for DLM communication.  Running domains
 110 * will have a negotiated version with the same major number and a minor
 111 * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
 112 * be used to determine what a running domain is actually using.
 113 *
 114 * New in version 1.1:
 115 *      - Message DLM_QUERY_REGION added to support global heartbeat
 116 *      - Message DLM_QUERY_NODEINFO added to allow online node removes
 117 * New in version 1.2:
 118 *      - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain
 119 * New in version 1.3:
 120 *      - Message DLM_DEREF_LOCKRES_DONE added to inform non-master that the
 121 *        refmap is cleared
 122 */
 123static const struct dlm_protocol_version dlm_protocol = {
 124        .pv_major = 1,
 125        .pv_minor = 3,
 126};
 127
 128#define DLM_DOMAIN_BACKOFF_MS 200
 129
 130static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 131                                  void **ret_data);
 132static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 133                                     void **ret_data);
 134static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 135                                   void **ret_data);
 136static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
 137                                    void *data, void **ret_data);
 138static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 139                                   void **ret_data);
 140static int dlm_protocol_compare(struct dlm_protocol_version *existing,
 141                                struct dlm_protocol_version *request);
 142
 143static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 144
 145void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 146{
 147        if (hlist_unhashed(&res->hash_node))
 148                return;
 149
 150        mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
 151             res->lockname.name);
 152        hlist_del_init(&res->hash_node);
 153        dlm_lockres_put(res);
 154}
 155
 156void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 157{
 158        struct hlist_head *bucket;
 159
 160        assert_spin_locked(&dlm->spinlock);
 161
 162        bucket = dlm_lockres_hash(dlm, res->lockname.hash);
 163
 164        /* get a reference for our hashtable */
 165        dlm_lockres_get(res);
 166
 167        hlist_add_head(&res->hash_node, bucket);
 168
 169        mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
 170             res->lockname.name);
 171}
 172
 173struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
 174                                                     const char *name,
 175                                                     unsigned int len,
 176                                                     unsigned int hash)
 177{
 178        struct hlist_head *bucket;
 179        struct dlm_lock_resource *res;
 180
 181        mlog(0, "%.*s\n", len, name);
 182
 183        assert_spin_locked(&dlm->spinlock);
 184
 185        bucket = dlm_lockres_hash(dlm, hash);
 186
 187        hlist_for_each_entry(res, bucket, hash_node) {
 188                if (res->lockname.name[0] != name[0])
 189                        continue;
 190                if (unlikely(res->lockname.len != len))
 191                        continue;
 192                if (memcmp(res->lockname.name + 1, name + 1, len - 1))
 193                        continue;
 194                dlm_lockres_get(res);
 195                return res;
 196        }
 197        return NULL;
 198}
 199
 200/* intended to be called by functions which do not care about lock
 201 * resources which are being purged (most net _handler functions).
 202 * this will return NULL for any lock resource which is found but
 203 * currently in the process of dropping its mastery reference.
 204 * use __dlm_lookup_lockres_full when you need the lock resource
 205 * regardless (e.g. dlm_get_lock_resource) */
 206struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 207                                                const char *name,
 208                                                unsigned int len,
 209                                                unsigned int hash)
 210{
 211        struct dlm_lock_resource *res = NULL;
 212
 213        mlog(0, "%.*s\n", len, name);
 214
 215        assert_spin_locked(&dlm->spinlock);
 216
 217        res = __dlm_lookup_lockres_full(dlm, name, len, hash);
 218        if (res) {
 219                spin_lock(&res->spinlock);
 220                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 221                        spin_unlock(&res->spinlock);
 222                        dlm_lockres_put(res);
 223                        return NULL;
 224                }
 225                spin_unlock(&res->spinlock);
 226        }
 227
 228        return res;
 229}
 230
 231struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 232                                    const char *name,
 233                                    unsigned int len)
 234{
 235        struct dlm_lock_resource *res;
 236        unsigned int hash = dlm_lockid_hash(name, len);
 237
 238        spin_lock(&dlm->spinlock);
 239        res = __dlm_lookup_lockres(dlm, name, len, hash);
 240        spin_unlock(&dlm->spinlock);
 241        return res;
 242}
 243
 244static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 245{
 246        struct dlm_ctxt *tmp;
 247
 248        assert_spin_locked(&dlm_domain_lock);
 249
 250        /* tmp->name here is always NULL terminated,
 251         * but domain may not be! */
 252        list_for_each_entry(tmp, &dlm_domains, list) {
 253                if (strlen(tmp->name) == len &&
 254                    memcmp(tmp->name, domain, len)==0)
 255                        return tmp;
 256        }
 257
 258        return NULL;
 259}
 260
 261/* For null terminated domain strings ONLY */
 262static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
 263{
 264        assert_spin_locked(&dlm_domain_lock);
 265
 266        return __dlm_lookup_domain_full(domain, strlen(domain));
 267}
 268
 269
 270/* returns true on one of two conditions:
 271 * 1) the domain does not exist
 272 * 2) the domain exists and it's state is "joined" */
 273static int dlm_wait_on_domain_helper(const char *domain)
 274{
 275        int ret = 0;
 276        struct dlm_ctxt *tmp = NULL;
 277
 278        spin_lock(&dlm_domain_lock);
 279
 280        tmp = __dlm_lookup_domain(domain);
 281        if (!tmp)
 282                ret = 1;
 283        else if (tmp->dlm_state == DLM_CTXT_JOINED)
 284                ret = 1;
 285
 286        spin_unlock(&dlm_domain_lock);
 287        return ret;
 288}
 289
 290static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 291{
 292        dlm_destroy_debugfs_subroot(dlm);
 293
 294        if (dlm->lockres_hash)
 295                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 296
 297        if (dlm->master_hash)
 298                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
 299
 300        kfree(dlm->name);
 301        kfree(dlm);
 302}
 303
 304/* A little strange - this function will be called while holding
 305 * dlm_domain_lock and is expected to be holding it on the way out. We
 306 * will however drop and reacquire it multiple times */
 307static void dlm_ctxt_release(struct kref *kref)
 308{
 309        struct dlm_ctxt *dlm;
 310
 311        dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
 312
 313        BUG_ON(dlm->num_joins);
 314        BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
 315
 316        /* we may still be in the list if we hit an error during join. */
 317        list_del_init(&dlm->list);
 318
 319        spin_unlock(&dlm_domain_lock);
 320
 321        mlog(0, "freeing memory from domain %s\n", dlm->name);
 322
 323        wake_up(&dlm_domain_events);
 324
 325        dlm_free_ctxt_mem(dlm);
 326
 327        spin_lock(&dlm_domain_lock);
 328}
 329
 330void dlm_put(struct dlm_ctxt *dlm)
 331{
 332        spin_lock(&dlm_domain_lock);
 333        kref_put(&dlm->dlm_refs, dlm_ctxt_release);
 334        spin_unlock(&dlm_domain_lock);
 335}
 336
 337static void __dlm_get(struct dlm_ctxt *dlm)
 338{
 339        kref_get(&dlm->dlm_refs);
 340}
 341
 342/* given a questionable reference to a dlm object, gets a reference if
 343 * it can find it in the list, otherwise returns NULL in which case
 344 * you shouldn't trust your pointer. */
 345struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
 346{
 347        struct dlm_ctxt *target;
 348        struct dlm_ctxt *ret = NULL;
 349
 350        spin_lock(&dlm_domain_lock);
 351
 352        list_for_each_entry(target, &dlm_domains, list) {
 353                if (target == dlm) {
 354                        __dlm_get(target);
 355                        ret = target;
 356                        break;
 357                }
 358        }
 359
 360        spin_unlock(&dlm_domain_lock);
 361
 362        return ret;
 363}
 364
 365int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 366{
 367        int ret;
 368
 369        spin_lock(&dlm_domain_lock);
 370        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
 371                (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
 372        spin_unlock(&dlm_domain_lock);
 373
 374        return ret;
 375}
 376
 377static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
 378{
 379        if (dlm->dlm_worker) {
 380                destroy_workqueue(dlm->dlm_worker);
 381                dlm->dlm_worker = NULL;
 382        }
 383}
 384
 385static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 386{
 387        dlm_unregister_domain_handlers(dlm);
 388        dlm_complete_thread(dlm);
 389        dlm_complete_recovery_thread(dlm);
 390        dlm_destroy_dlm_worker(dlm);
 391
 392        /* We've left the domain. Now we can take ourselves out of the
 393         * list and allow the kref stuff to help us free the
 394         * memory. */
 395        spin_lock(&dlm_domain_lock);
 396        list_del_init(&dlm->list);
 397        spin_unlock(&dlm_domain_lock);
 398
 399        /* Wake up anyone waiting for us to remove this domain */
 400        wake_up(&dlm_domain_events);
 401}
 402
 403static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 404{
 405        int i, num, n, ret = 0;
 406        struct dlm_lock_resource *res;
 407        struct hlist_node *iter;
 408        struct hlist_head *bucket;
 409        int dropped;
 410
 411        mlog(0, "Migrating locks from domain %s\n", dlm->name);
 412
 413        num = 0;
 414        spin_lock(&dlm->spinlock);
 415        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 416redo_bucket:
 417                n = 0;
 418                bucket = dlm_lockres_hash(dlm, i);
 419                iter = bucket->first;
 420                while (iter) {
 421                        n++;
 422                        res = hlist_entry(iter, struct dlm_lock_resource,
 423                                          hash_node);
 424                        dlm_lockres_get(res);
 425                        /* migrate, if necessary.  this will drop the dlm
 426                         * spinlock and retake it if it does migration. */
 427                        dropped = dlm_empty_lockres(dlm, res);
 428
 429                        spin_lock(&res->spinlock);
 430                        if (dropped)
 431                                __dlm_lockres_calc_usage(dlm, res);
 432                        else
 433                                iter = res->hash_node.next;
 434                        spin_unlock(&res->spinlock);
 435
 436                        dlm_lockres_put(res);
 437
 438                        if (dropped) {
 439                                cond_resched_lock(&dlm->spinlock);
 440                                goto redo_bucket;
 441                        }
 442                }
 443                cond_resched_lock(&dlm->spinlock);
 444                num += n;
 445        }
 446
 447        if (!num) {
 448                if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 449                        mlog(0, "%s: perhaps there are more lock resources "
 450                             "need to be migrated after dlm recovery\n", dlm->name);
 451                        ret = -EAGAIN;
 452                } else {
 453                        mlog(0, "%s: we won't do dlm recovery after migrating "
 454                             "all lock resources\n", dlm->name);
 455                        dlm->migrate_done = 1;
 456                }
 457        }
 458
 459        spin_unlock(&dlm->spinlock);
 460        wake_up(&dlm->dlm_thread_wq);
 461
 462        /* let the dlm thread take care of purging, keep scanning until
 463         * nothing remains in the hash */
 464        if (num) {
 465                mlog(0, "%s: %d lock resources in hash last pass\n",
 466                     dlm->name, num);
 467                ret = -EAGAIN;
 468        }
 469        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
 470        return ret;
 471}
 472
 473static int dlm_no_joining_node(struct dlm_ctxt *dlm)
 474{
 475        int ret;
 476
 477        spin_lock(&dlm->spinlock);
 478        ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
 479        spin_unlock(&dlm->spinlock);
 480
 481        return ret;
 482}
 483
 484static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len,
 485                                         void *data, void **ret_data)
 486{
 487        struct dlm_ctxt *dlm = data;
 488        unsigned int node;
 489        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 490
 491        if (!dlm_grab(dlm))
 492                return 0;
 493
 494        node = exit_msg->node_idx;
 495        mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node);
 496
 497        spin_lock(&dlm->spinlock);
 498        set_bit(node, dlm->exit_domain_map);
 499        spin_unlock(&dlm->spinlock);
 500
 501        dlm_put(dlm);
 502
 503        return 0;
 504}
 505
 506static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
 507{
 508        /* Yikes, a double spinlock! I need domain_lock for the dlm
 509         * state and the dlm spinlock for join state... Sorry! */
 510again:
 511        spin_lock(&dlm_domain_lock);
 512        spin_lock(&dlm->spinlock);
 513
 514        if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 515                mlog(0, "Node %d is joining, we wait on it.\n",
 516                          dlm->joining_node);
 517                spin_unlock(&dlm->spinlock);
 518                spin_unlock(&dlm_domain_lock);
 519
 520                wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
 521                goto again;
 522        }
 523
 524        dlm->dlm_state = DLM_CTXT_LEAVING;
 525        spin_unlock(&dlm->spinlock);
 526        spin_unlock(&dlm_domain_lock);
 527}
 528
 529static void __dlm_print_nodes(struct dlm_ctxt *dlm)
 530{
 531        int node = -1, num = 0;
 532
 533        assert_spin_locked(&dlm->spinlock);
 534
 535        printk("( ");
 536        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 537                                     node + 1)) < O2NM_MAX_NODES) {
 538                printk("%d ", node);
 539                ++num;
 540        }
 541        printk(") %u nodes\n", num);
 542}
 543
 544static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 545                                   void **ret_data)
 546{
 547        struct dlm_ctxt *dlm = data;
 548        unsigned int node;
 549        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 550
 551        mlog(0, "%p %u %p", msg, len, data);
 552
 553        if (!dlm_grab(dlm))
 554                return 0;
 555
 556        node = exit_msg->node_idx;
 557
 558        spin_lock(&dlm->spinlock);
 559        clear_bit(node, dlm->domain_map);
 560        clear_bit(node, dlm->exit_domain_map);
 561        printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
 562        __dlm_print_nodes(dlm);
 563
 564        /* notify anything attached to the heartbeat events */
 565        dlm_hb_event_notify_attached(dlm, node, 0);
 566
 567        spin_unlock(&dlm->spinlock);
 568
 569        dlm_put(dlm);
 570
 571        return 0;
 572}
 573
 574static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type,
 575                                    unsigned int node)
 576{
 577        int status;
 578        struct dlm_exit_domain leave_msg;
 579
 580        mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name,
 581             msg_type, node);
 582
 583        memset(&leave_msg, 0, sizeof(leave_msg));
 584        leave_msg.node_idx = dlm->node_num;
 585
 586        status = o2net_send_message(msg_type, dlm->key, &leave_msg,
 587                                    sizeof(leave_msg), node, NULL);
 588        if (status < 0)
 589                mlog(ML_ERROR, "Error %d sending domain exit message %u "
 590                     "to node %u on domain %s\n", status, msg_type, node,
 591                     dlm->name);
 592
 593        return status;
 594}
 595
 596static void dlm_begin_exit_domain(struct dlm_ctxt *dlm)
 597{
 598        int node = -1;
 599
 600        /* Support for begin exit domain was added in 1.2 */
 601        if (dlm->dlm_locking_proto.pv_major == 1 &&
 602            dlm->dlm_locking_proto.pv_minor < 2)
 603                return;
 604
 605        /*
 606         * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely
 607         * informational. Meaning if a node does not receive the message,
 608         * so be it.
 609         */
 610        spin_lock(&dlm->spinlock);
 611        while (1) {
 612                node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1);
 613                if (node >= O2NM_MAX_NODES)
 614                        break;
 615                if (node == dlm->node_num)
 616                        continue;
 617
 618                spin_unlock(&dlm->spinlock);
 619                dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node);
 620                spin_lock(&dlm->spinlock);
 621        }
 622        spin_unlock(&dlm->spinlock);
 623}
 624
 625static void dlm_leave_domain(struct dlm_ctxt *dlm)
 626{
 627        int node, clear_node, status;
 628
 629        /* At this point we've migrated away all our locks and won't
 630         * accept mastership of new ones. The dlm is responsible for
 631         * almost nothing now. We make sure not to confuse any joining
 632         * nodes and then commence shutdown procedure. */
 633
 634        spin_lock(&dlm->spinlock);
 635        /* Clear ourselves from the domain map */
 636        clear_bit(dlm->node_num, dlm->domain_map);
 637        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 638                                     0)) < O2NM_MAX_NODES) {
 639                /* Drop the dlm spinlock. This is safe wrt the domain_map.
 640                 * -nodes cannot be added now as the
 641                 *   query_join_handlers knows to respond with OK_NO_MAP
 642                 * -we catch the right network errors if a node is
 643                 *   removed from the map while we're sending him the
 644                 *   exit message. */
 645                spin_unlock(&dlm->spinlock);
 646
 647                clear_node = 1;
 648
 649                status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG,
 650                                                  node);
 651                if (status < 0 &&
 652                    status != -ENOPROTOOPT &&
 653                    status != -ENOTCONN) {
 654                        mlog(ML_NOTICE, "Error %d sending domain exit message "
 655                             "to node %d\n", status, node);
 656
 657                        /* Not sure what to do here but lets sleep for
 658                         * a bit in case this was a transient
 659                         * error... */
 660                        msleep(DLM_DOMAIN_BACKOFF_MS);
 661                        clear_node = 0;
 662                }
 663
 664                spin_lock(&dlm->spinlock);
 665                /* If we're not clearing the node bit then we intend
 666                 * to loop back around to try again. */
 667                if (clear_node)
 668                        clear_bit(node, dlm->domain_map);
 669        }
 670        spin_unlock(&dlm->spinlock);
 671}
 672
 673void dlm_unregister_domain(struct dlm_ctxt *dlm)
 674{
 675        int leave = 0;
 676        struct dlm_lock_resource *res;
 677
 678        spin_lock(&dlm_domain_lock);
 679        BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
 680        BUG_ON(!dlm->num_joins);
 681
 682        dlm->num_joins--;
 683        if (!dlm->num_joins) {
 684                /* We mark it "in shutdown" now so new register
 685                 * requests wait until we've completely left the
 686                 * domain. Don't use DLM_CTXT_LEAVING yet as we still
 687                 * want new domain joins to communicate with us at
 688                 * least until we've completed migration of our
 689                 * resources. */
 690                dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
 691                leave = 1;
 692        }
 693        spin_unlock(&dlm_domain_lock);
 694
 695        if (leave) {
 696                mlog(0, "shutting down domain %s\n", dlm->name);
 697                dlm_begin_exit_domain(dlm);
 698
 699                /* We changed dlm state, notify the thread */
 700                dlm_kick_thread(dlm, NULL);
 701
 702                while (dlm_migrate_all_locks(dlm)) {
 703                        /* Give dlm_thread time to purge the lockres' */
 704                        msleep(500);
 705                        mlog(0, "%s: more migration to do\n", dlm->name);
 706                }
 707
 708                /* This list should be empty. If not, print remaining lockres */
 709                if (!list_empty(&dlm->tracking_list)) {
 710                        mlog(ML_ERROR, "Following lockres' are still on the "
 711                             "tracking list:\n");
 712                        list_for_each_entry(res, &dlm->tracking_list, tracking)
 713                                dlm_print_one_lock_resource(res);
 714                }
 715
 716                dlm_mark_domain_leaving(dlm);
 717                dlm_leave_domain(dlm);
 718                printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
 719                dlm_force_free_mles(dlm);
 720                dlm_complete_dlm_shutdown(dlm);
 721        }
 722        dlm_put(dlm);
 723}
 724EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 725
 726static int dlm_query_join_proto_check(char *proto_type, int node,
 727                                      struct dlm_protocol_version *ours,
 728                                      struct dlm_protocol_version *request)
 729{
 730        int rc;
 731        struct dlm_protocol_version proto = *request;
 732
 733        if (!dlm_protocol_compare(ours, &proto)) {
 734                mlog(0,
 735                     "node %u wanted to join with %s locking protocol "
 736                     "%u.%u, we respond with %u.%u\n",
 737                     node, proto_type,
 738                     request->pv_major,
 739                     request->pv_minor,
 740                     proto.pv_major, proto.pv_minor);
 741                request->pv_minor = proto.pv_minor;
 742                rc = 0;
 743        } else {
 744                mlog(ML_NOTICE,
 745                     "Node %u wanted to join with %s locking "
 746                     "protocol %u.%u, but we have %u.%u, disallowing\n",
 747                     node, proto_type,
 748                     request->pv_major,
 749                     request->pv_minor,
 750                     ours->pv_major,
 751                     ours->pv_minor);
 752                rc = 1;
 753        }
 754
 755        return rc;
 756}
 757
 758/*
 759 * struct dlm_query_join_packet is made up of four one-byte fields.  They
 760 * are effectively in big-endian order already.  However, little-endian
 761 * machines swap them before putting the packet on the wire (because
 762 * query_join's response is a status, and that status is treated as a u32
 763 * on the wire).  Thus, a big-endian and little-endian machines will treat
 764 * this structure differently.
 765 *
 766 * The solution is to have little-endian machines swap the structure when
 767 * converting from the structure to the u32 representation.  This will
 768 * result in the structure having the correct format on the wire no matter
 769 * the host endian format.
 770 */
 771static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
 772                                          u32 *wire)
 773{
 774        union dlm_query_join_response response;
 775
 776        response.packet = *packet;
 777        *wire = be32_to_cpu(response.intval);
 778}
 779
 780static void dlm_query_join_wire_to_packet(u32 wire,
 781                                          struct dlm_query_join_packet *packet)
 782{
 783        union dlm_query_join_response response;
 784
 785        response.intval = cpu_to_be32(wire);
 786        *packet = response.packet;
 787}
 788
 789static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 790                                  void **ret_data)
 791{
 792        struct dlm_query_join_request *query;
 793        struct dlm_query_join_packet packet = {
 794                .code = JOIN_DISALLOW,
 795        };
 796        struct dlm_ctxt *dlm = NULL;
 797        u32 response;
 798        u8 nodenum;
 799
 800        query = (struct dlm_query_join_request *) msg->buf;
 801
 802        mlog(0, "node %u wants to join domain %s\n", query->node_idx,
 803                  query->domain);
 804
 805        /*
 806         * If heartbeat doesn't consider the node live, tell it
 807         * to back off and try again.  This gives heartbeat a chance
 808         * to catch up.
 809         */
 810        if (!o2hb_check_node_heartbeating_no_sem(query->node_idx)) {
 811                mlog(0, "node %u is not in our live map yet\n",
 812                     query->node_idx);
 813
 814                packet.code = JOIN_DISALLOW;
 815                goto respond;
 816        }
 817
 818        packet.code = JOIN_OK_NO_MAP;
 819
 820        spin_lock(&dlm_domain_lock);
 821        dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 822        if (!dlm)
 823                goto unlock_respond;
 824
 825        /*
 826         * There is a small window where the joining node may not see the
 827         * node(s) that just left but still part of the cluster. DISALLOW
 828         * join request if joining node has different node map.
 829         */
 830        nodenum=0;
 831        while (nodenum < O2NM_MAX_NODES) {
 832                if (test_bit(nodenum, dlm->domain_map)) {
 833                        if (!byte_test_bit(nodenum, query->node_map)) {
 834                                mlog(0, "disallow join as node %u does not "
 835                                     "have node %u in its nodemap\n",
 836                                     query->node_idx, nodenum);
 837                                packet.code = JOIN_DISALLOW;
 838                                goto unlock_respond;
 839                        }
 840                }
 841                nodenum++;
 842        }
 843
 844        /* Once the dlm ctxt is marked as leaving then we don't want
 845         * to be put in someone's domain map.
 846         * Also, explicitly disallow joining at certain troublesome
 847         * times (ie. during recovery). */
 848        if (dlm->dlm_state != DLM_CTXT_LEAVING) {
 849                int bit = query->node_idx;
 850                spin_lock(&dlm->spinlock);
 851
 852                if (dlm->dlm_state == DLM_CTXT_NEW &&
 853                    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
 854                        /*If this is a brand new context and we
 855                         * haven't started our join process yet, then
 856                         * the other node won the race. */
 857                        packet.code = JOIN_OK_NO_MAP;
 858                } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 859                        /* Disallow parallel joins. */
 860                        packet.code = JOIN_DISALLOW;
 861                } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 862                        mlog(0, "node %u trying to join, but recovery "
 863                             "is ongoing.\n", bit);
 864                        packet.code = JOIN_DISALLOW;
 865                } else if (test_bit(bit, dlm->recovery_map)) {
 866                        mlog(0, "node %u trying to join, but it "
 867                             "still needs recovery.\n", bit);
 868                        packet.code = JOIN_DISALLOW;
 869                } else if (test_bit(bit, dlm->domain_map)) {
 870                        mlog(0, "node %u trying to join, but it "
 871                             "is still in the domain! needs recovery?\n",
 872                             bit);
 873                        packet.code = JOIN_DISALLOW;
 874                } else {
 875                        /* Alright we're fully a part of this domain
 876                         * so we keep some state as to who's joining
 877                         * and indicate to him that needs to be fixed
 878                         * up. */
 879
 880                        /* Make sure we speak compatible locking protocols.  */
 881                        if (dlm_query_join_proto_check("DLM", bit,
 882                                                       &dlm->dlm_locking_proto,
 883                                                       &query->dlm_proto)) {
 884                                packet.code = JOIN_PROTOCOL_MISMATCH;
 885                        } else if (dlm_query_join_proto_check("fs", bit,
 886                                                              &dlm->fs_locking_proto,
 887                                                              &query->fs_proto)) {
 888                                packet.code = JOIN_PROTOCOL_MISMATCH;
 889                        } else {
 890                                packet.dlm_minor = query->dlm_proto.pv_minor;
 891                                packet.fs_minor = query->fs_proto.pv_minor;
 892                                packet.code = JOIN_OK;
 893                                __dlm_set_joining_node(dlm, query->node_idx);
 894                        }
 895                }
 896
 897                spin_unlock(&dlm->spinlock);
 898        }
 899unlock_respond:
 900        spin_unlock(&dlm_domain_lock);
 901
 902respond:
 903        mlog(0, "We respond with %u\n", packet.code);
 904
 905        dlm_query_join_packet_to_wire(&packet, &response);
 906        return response;
 907}
 908
 909static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 910                                     void **ret_data)
 911{
 912        struct dlm_assert_joined *assert;
 913        struct dlm_ctxt *dlm = NULL;
 914
 915        assert = (struct dlm_assert_joined *) msg->buf;
 916
 917        mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
 918                  assert->domain);
 919
 920        spin_lock(&dlm_domain_lock);
 921        dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
 922        /* XXX should we consider no dlm ctxt an error? */
 923        if (dlm) {
 924                spin_lock(&dlm->spinlock);
 925
 926                /* Alright, this node has officially joined our
 927                 * domain. Set him in the map and clean up our
 928                 * leftover join state. */
 929                BUG_ON(dlm->joining_node != assert->node_idx);
 930
 931                if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 932                        mlog(0, "dlm recovery is ongoing, disallow join\n");
 933                        spin_unlock(&dlm->spinlock);
 934                        spin_unlock(&dlm_domain_lock);
 935                        return -EAGAIN;
 936                }
 937
 938                set_bit(assert->node_idx, dlm->domain_map);
 939                clear_bit(assert->node_idx, dlm->exit_domain_map);
 940                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 941
 942                printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
 943                       assert->node_idx, dlm->name);
 944                __dlm_print_nodes(dlm);
 945
 946                /* notify anything attached to the heartbeat events */
 947                dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
 948
 949                spin_unlock(&dlm->spinlock);
 950        }
 951        spin_unlock(&dlm_domain_lock);
 952
 953        return 0;
 954}
 955
 956static int dlm_match_regions(struct dlm_ctxt *dlm,
 957                             struct dlm_query_region *qr,
 958                             char *local, int locallen)
 959{
 960        char *remote = qr->qr_regions;
 961        char *l, *r;
 962        int localnr, i, j, foundit;
 963        int status = 0;
 964
 965        if (!o2hb_global_heartbeat_active()) {
 966                if (qr->qr_numregions) {
 967                        mlog(ML_ERROR, "Domain %s: Joining node %d has global "
 968                             "heartbeat enabled but local node %d does not\n",
 969                             qr->qr_domain, qr->qr_node, dlm->node_num);
 970                        status = -EINVAL;
 971                }
 972                goto bail;
 973        }
 974
 975        if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
 976                mlog(ML_ERROR, "Domain %s: Local node %d has global "
 977                     "heartbeat enabled but joining node %d does not\n",
 978                     qr->qr_domain, dlm->node_num, qr->qr_node);
 979                status = -EINVAL;
 980                goto bail;
 981        }
 982
 983        r = remote;
 984        for (i = 0; i < qr->qr_numregions; ++i) {
 985                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
 986                r += O2HB_MAX_REGION_NAME_LEN;
 987        }
 988
 989        localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN);
 990        localnr = o2hb_get_all_regions(local, (u8)localnr);
 991
 992        /* compare local regions with remote */
 993        l = local;
 994        for (i = 0; i < localnr; ++i) {
 995                foundit = 0;
 996                r = remote;
 997                for (j = 0; j <= qr->qr_numregions; ++j) {
 998                        if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
 999                                foundit = 1;
1000                                break;
1001                        }
1002                        r += O2HB_MAX_REGION_NAME_LEN;
1003                }
1004                if (!foundit) {
1005                        status = -EINVAL;
1006                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1007                             "in local node %d but not in joining node %d\n",
1008                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
1009                             dlm->node_num, qr->qr_node);
1010                        goto bail;
1011                }
1012                l += O2HB_MAX_REGION_NAME_LEN;
1013        }
1014
1015        /* compare remote with local regions */
1016        r = remote;
1017        for (i = 0; i < qr->qr_numregions; ++i) {
1018                foundit = 0;
1019                l = local;
1020                for (j = 0; j < localnr; ++j) {
1021                        if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1022                                foundit = 1;
1023                                break;
1024                        }
1025                        l += O2HB_MAX_REGION_NAME_LEN;
1026                }
1027                if (!foundit) {
1028                        status = -EINVAL;
1029                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1030                             "in joining node %d but not in local node %d\n",
1031                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1032                             qr->qr_node, dlm->node_num);
1033                        goto bail;
1034                }
1035                r += O2HB_MAX_REGION_NAME_LEN;
1036        }
1037
1038bail:
1039        return status;
1040}
1041
1042static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1043{
1044        struct dlm_query_region *qr = NULL;
1045        int status, ret = 0, i;
1046        char *p;
1047
1048        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1049                goto bail;
1050
1051        qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1052        if (!qr) {
1053                ret = -ENOMEM;
1054                mlog_errno(ret);
1055                goto bail;
1056        }
1057
1058        qr->qr_node = dlm->node_num;
1059        qr->qr_namelen = strlen(dlm->name);
1060        memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1061        /* if local hb, the numregions will be zero */
1062        if (o2hb_global_heartbeat_active())
1063                qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1064                                                         O2NM_MAX_REGIONS);
1065
1066        p = qr->qr_regions;
1067        for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1068                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1069
1070        i = -1;
1071        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1072                                  i + 1)) < O2NM_MAX_NODES) {
1073                if (i == dlm->node_num)
1074                        continue;
1075
1076                mlog(0, "Sending regions to node %d\n", i);
1077
1078                ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1079                                         sizeof(struct dlm_query_region),
1080                                         i, &status);
1081                if (ret >= 0)
1082                        ret = status;
1083                if (ret) {
1084                        mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1085                             ret, i);
1086                        break;
1087                }
1088        }
1089
1090bail:
1091        kfree(qr);
1092        return ret;
1093}
1094
1095static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1096                                    void *data, void **ret_data)
1097{
1098        struct dlm_query_region *qr;
1099        struct dlm_ctxt *dlm = NULL;
1100        char *local = NULL;
1101        int status = 0;
1102
1103        qr = (struct dlm_query_region *) msg->buf;
1104
1105        mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1106             qr->qr_domain);
1107
1108        /* buffer used in dlm_mast_regions() */
1109        local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
1110        if (!local)
1111                return -ENOMEM;
1112
1113        status = -EINVAL;
1114
1115        spin_lock(&dlm_domain_lock);
1116        dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1117        if (!dlm) {
1118                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1119                     "before join domain\n", qr->qr_node, qr->qr_domain);
1120                goto out_domain_lock;
1121        }
1122
1123        spin_lock(&dlm->spinlock);
1124        if (dlm->joining_node != qr->qr_node) {
1125                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1126                     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1127                     dlm->joining_node);
1128                goto out_dlm_lock;
1129        }
1130
1131        /* Support for global heartbeat was added in 1.1 */
1132        if (dlm->dlm_locking_proto.pv_major == 1 &&
1133            dlm->dlm_locking_proto.pv_minor == 0) {
1134                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1135                     "but active dlm protocol is %d.%d\n", qr->qr_node,
1136                     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1137                     dlm->dlm_locking_proto.pv_minor);
1138                goto out_dlm_lock;
1139        }
1140
1141        status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions));
1142
1143out_dlm_lock:
1144        spin_unlock(&dlm->spinlock);
1145
1146out_domain_lock:
1147        spin_unlock(&dlm_domain_lock);
1148
1149        kfree(local);
1150
1151        return status;
1152}
1153
1154static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1155{
1156        struct o2nm_node *local;
1157        struct dlm_node_info *remote;
1158        int i, j;
1159        int status = 0;
1160
1161        for (j = 0; j < qn->qn_numnodes; ++j)
1162                mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1163                     &(qn->qn_nodes[j].ni_ipv4_address),
1164                     ntohs(qn->qn_nodes[j].ni_ipv4_port));
1165
1166        for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1167                local = o2nm_get_node_by_num(i);
1168                remote = NULL;
1169                for (j = 0; j < qn->qn_numnodes; ++j) {
1170                        if (qn->qn_nodes[j].ni_nodenum == i) {
1171                                remote = &(qn->qn_nodes[j]);
1172                                break;
1173                        }
1174                }
1175
1176                if (!local && !remote)
1177                        continue;
1178
1179                if ((local && !remote) || (!local && remote))
1180                        status = -EINVAL;
1181
1182                if (!status &&
1183                    ((remote->ni_nodenum != local->nd_num) ||
1184                     (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1185                     (remote->ni_ipv4_address != local->nd_ipv4_address)))
1186                        status = -EINVAL;
1187
1188                if (status) {
1189                        if (remote && !local)
1190                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1191                                     "registered in joining node %d but not in "
1192                                     "local node %d\n", qn->qn_domain,
1193                                     remote->ni_nodenum,
1194                                     &(remote->ni_ipv4_address),
1195                                     ntohs(remote->ni_ipv4_port),
1196                                     qn->qn_nodenum, dlm->node_num);
1197                        if (local && !remote)
1198                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1199                                     "registered in local node %d but not in "
1200                                     "joining node %d\n", qn->qn_domain,
1201                                     local->nd_num, &(local->nd_ipv4_address),
1202                                     ntohs(local->nd_ipv4_port),
1203                                     dlm->node_num, qn->qn_nodenum);
1204                        BUG_ON((!local && !remote));
1205                }
1206
1207                if (local)
1208                        o2nm_node_put(local);
1209        }
1210
1211        return status;
1212}
1213
1214static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1215{
1216        struct dlm_query_nodeinfo *qn = NULL;
1217        struct o2nm_node *node;
1218        int ret = 0, status, count, i;
1219
1220        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1221                goto bail;
1222
1223        qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1224        if (!qn) {
1225                ret = -ENOMEM;
1226                mlog_errno(ret);
1227                goto bail;
1228        }
1229
1230        for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1231                node = o2nm_get_node_by_num(i);
1232                if (!node)
1233                        continue;
1234                qn->qn_nodes[count].ni_nodenum = node->nd_num;
1235                qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1236                qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1237                mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1238                     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1239                ++count;
1240                o2nm_node_put(node);
1241        }
1242
1243        qn->qn_nodenum = dlm->node_num;
1244        qn->qn_numnodes = count;
1245        qn->qn_namelen = strlen(dlm->name);
1246        memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1247
1248        i = -1;
1249        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1250                                  i + 1)) < O2NM_MAX_NODES) {
1251                if (i == dlm->node_num)
1252                        continue;
1253
1254                mlog(0, "Sending nodeinfo to node %d\n", i);
1255
1256                ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1257                                         qn, sizeof(struct dlm_query_nodeinfo),
1258                                         i, &status);
1259                if (ret >= 0)
1260                        ret = status;
1261                if (ret) {
1262                        mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1263                        break;
1264                }
1265        }
1266
1267bail:
1268        kfree(qn);
1269        return ret;
1270}
1271
1272static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1273                                      void *data, void **ret_data)
1274{
1275        struct dlm_query_nodeinfo *qn;
1276        struct dlm_ctxt *dlm = NULL;
1277        int locked = 0, status = -EINVAL;
1278
1279        qn = (struct dlm_query_nodeinfo *) msg->buf;
1280
1281        mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1282             qn->qn_domain);
1283
1284        spin_lock(&dlm_domain_lock);
1285        dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1286        if (!dlm) {
1287                mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1288                     "join domain\n", qn->qn_nodenum, qn->qn_domain);
1289                goto bail;
1290        }
1291
1292        spin_lock(&dlm->spinlock);
1293        locked = 1;
1294        if (dlm->joining_node != qn->qn_nodenum) {
1295                mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1296                     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1297                     dlm->joining_node);
1298                goto bail;
1299        }
1300
1301        /* Support for node query was added in 1.1 */
1302        if (dlm->dlm_locking_proto.pv_major == 1 &&
1303            dlm->dlm_locking_proto.pv_minor == 0) {
1304                mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1305                     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1306                     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1307                     dlm->dlm_locking_proto.pv_minor);
1308                goto bail;
1309        }
1310
1311        status = dlm_match_nodes(dlm, qn);
1312
1313bail:
1314        if (locked)
1315                spin_unlock(&dlm->spinlock);
1316        spin_unlock(&dlm_domain_lock);
1317
1318        return status;
1319}
1320
1321static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1322                                   void **ret_data)
1323{
1324        struct dlm_cancel_join *cancel;
1325        struct dlm_ctxt *dlm = NULL;
1326
1327        cancel = (struct dlm_cancel_join *) msg->buf;
1328
1329        mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1330                  cancel->domain);
1331
1332        spin_lock(&dlm_domain_lock);
1333        dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1334
1335        if (dlm) {
1336                spin_lock(&dlm->spinlock);
1337
1338                /* Yikes, this guy wants to cancel his join. No
1339                 * problem, we simply cleanup our join state. */
1340                BUG_ON(dlm->joining_node != cancel->node_idx);
1341                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1342
1343                spin_unlock(&dlm->spinlock);
1344        }
1345        spin_unlock(&dlm_domain_lock);
1346
1347        return 0;
1348}
1349
1350static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1351                                    unsigned int node)
1352{
1353        int status;
1354        struct dlm_cancel_join cancel_msg;
1355
1356        memset(&cancel_msg, 0, sizeof(cancel_msg));
1357        cancel_msg.node_idx = dlm->node_num;
1358        cancel_msg.name_len = strlen(dlm->name);
1359        memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1360
1361        status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1362                                    &cancel_msg, sizeof(cancel_msg), node,
1363                                    NULL);
1364        if (status < 0) {
1365                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1366                     "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1367                     node);
1368                goto bail;
1369        }
1370
1371bail:
1372        return status;
1373}
1374
1375/* map_size should be in bytes. */
1376static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1377                                 unsigned long *node_map,
1378                                 unsigned int map_size)
1379{
1380        int status, tmpstat;
1381        int node;
1382
1383        if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1384                         sizeof(unsigned long))) {
1385                mlog(ML_ERROR,
1386                     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1387                     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1388                return -EINVAL;
1389        }
1390
1391        status = 0;
1392        node = -1;
1393        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1394                                     node + 1)) < O2NM_MAX_NODES) {
1395                if (node == dlm->node_num)
1396                        continue;
1397
1398                tmpstat = dlm_send_one_join_cancel(dlm, node);
1399                if (tmpstat) {
1400                        mlog(ML_ERROR, "Error return %d cancelling join on "
1401                             "node %d\n", tmpstat, node);
1402                        if (!status)
1403                                status = tmpstat;
1404                }
1405        }
1406
1407        if (status)
1408                mlog_errno(status);
1409        return status;
1410}
1411
1412static int dlm_request_join(struct dlm_ctxt *dlm,
1413                            int node,
1414                            enum dlm_query_join_response_code *response)
1415{
1416        int status;
1417        struct dlm_query_join_request join_msg;
1418        struct dlm_query_join_packet packet;
1419        u32 join_resp;
1420
1421        mlog(0, "querying node %d\n", node);
1422
1423        memset(&join_msg, 0, sizeof(join_msg));
1424        join_msg.node_idx = dlm->node_num;
1425        join_msg.name_len = strlen(dlm->name);
1426        memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1427        join_msg.dlm_proto = dlm->dlm_locking_proto;
1428        join_msg.fs_proto = dlm->fs_locking_proto;
1429
1430        /* copy live node map to join message */
1431        byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1432
1433        status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1434                                    sizeof(join_msg), node, &join_resp);
1435        if (status < 0 && status != -ENOPROTOOPT) {
1436                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1437                     "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1438                     node);
1439                goto bail;
1440        }
1441        dlm_query_join_wire_to_packet(join_resp, &packet);
1442
1443        /* -ENOPROTOOPT from the net code means the other side isn't
1444            listening for our message type -- that's fine, it means
1445            his dlm isn't up, so we can consider him a 'yes' but not
1446            joined into the domain.  */
1447        if (status == -ENOPROTOOPT) {
1448                status = 0;
1449                *response = JOIN_OK_NO_MAP;
1450        } else {
1451                *response = packet.code;
1452                switch (packet.code) {
1453                case JOIN_DISALLOW:
1454                case JOIN_OK_NO_MAP:
1455                        break;
1456                case JOIN_PROTOCOL_MISMATCH:
1457                        mlog(ML_NOTICE,
1458                             "This node requested DLM locking protocol %u.%u and "
1459                             "filesystem locking protocol %u.%u.  At least one of "
1460                             "the protocol versions on node %d is not compatible, "
1461                             "disconnecting\n",
1462                             dlm->dlm_locking_proto.pv_major,
1463                             dlm->dlm_locking_proto.pv_minor,
1464                             dlm->fs_locking_proto.pv_major,
1465                             dlm->fs_locking_proto.pv_minor,
1466                             node);
1467                        status = -EPROTO;
1468                        break;
1469                case JOIN_OK:
1470                        /* Use the same locking protocol as the remote node */
1471                        dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1472                        dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1473                        mlog(0,
1474                             "Node %d responds JOIN_OK with DLM locking protocol "
1475                             "%u.%u and fs locking protocol %u.%u\n",
1476                             node,
1477                             dlm->dlm_locking_proto.pv_major,
1478                             dlm->dlm_locking_proto.pv_minor,
1479                             dlm->fs_locking_proto.pv_major,
1480                             dlm->fs_locking_proto.pv_minor);
1481                        break;
1482                default:
1483                        status = -EINVAL;
1484                        mlog(ML_ERROR, "invalid response %d from node %u\n",
1485                             packet.code, node);
1486                        /* Reset response to JOIN_DISALLOW */
1487                        *response = JOIN_DISALLOW;
1488                        break;
1489                }
1490        }
1491
1492        mlog(0, "status %d, node %d response is %d\n", status, node,
1493             *response);
1494
1495bail:
1496        return status;
1497}
1498
1499static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1500                                    unsigned int node)
1501{
1502        int status;
1503        int ret;
1504        struct dlm_assert_joined assert_msg;
1505
1506        mlog(0, "Sending join assert to node %u\n", node);
1507
1508        memset(&assert_msg, 0, sizeof(assert_msg));
1509        assert_msg.node_idx = dlm->node_num;
1510        assert_msg.name_len = strlen(dlm->name);
1511        memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1512
1513        status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1514                                    &assert_msg, sizeof(assert_msg), node,
1515                                    &ret);
1516        if (status < 0)
1517                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1518                     "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1519                     node);
1520        else
1521                status = ret;
1522
1523        return status;
1524}
1525
1526static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1527                                  unsigned long *node_map)
1528{
1529        int status, node, live;
1530
1531        status = 0;
1532        node = -1;
1533        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1534                                     node + 1)) < O2NM_MAX_NODES) {
1535                if (node == dlm->node_num)
1536                        continue;
1537
1538                do {
1539                        /* It is very important that this message be
1540                         * received so we spin until either the node
1541                         * has died or it gets the message. */
1542                        status = dlm_send_one_join_assert(dlm, node);
1543
1544                        spin_lock(&dlm->spinlock);
1545                        live = test_bit(node, dlm->live_nodes_map);
1546                        spin_unlock(&dlm->spinlock);
1547
1548                        if (status) {
1549                                mlog(ML_ERROR, "Error return %d asserting "
1550                                     "join on node %d\n", status, node);
1551
1552                                /* give us some time between errors... */
1553                                if (live)
1554                                        msleep(DLM_DOMAIN_BACKOFF_MS);
1555                        }
1556                } while (status && live);
1557        }
1558}
1559
1560struct domain_join_ctxt {
1561        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1562        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1563};
1564
1565static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1566                                   struct domain_join_ctxt *ctxt,
1567                                   enum dlm_query_join_response_code response)
1568{
1569        int ret;
1570
1571        if (response == JOIN_DISALLOW) {
1572                mlog(0, "Latest response of disallow -- should restart\n");
1573                return 1;
1574        }
1575
1576        spin_lock(&dlm->spinlock);
1577        /* For now, we restart the process if the node maps have
1578         * changed at all */
1579        ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1580                     sizeof(dlm->live_nodes_map));
1581        spin_unlock(&dlm->spinlock);
1582
1583        if (ret)
1584                mlog(0, "Node maps changed -- should restart\n");
1585
1586        return ret;
1587}
1588
1589static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1590{
1591        int status = 0, tmpstat, node;
1592        struct domain_join_ctxt *ctxt;
1593        enum dlm_query_join_response_code response = JOIN_DISALLOW;
1594
1595        mlog(0, "%p", dlm);
1596
1597        ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1598        if (!ctxt) {
1599                status = -ENOMEM;
1600                mlog_errno(status);
1601                goto bail;
1602        }
1603
1604        /* group sem locking should work for us here -- we're already
1605         * registered for heartbeat events so filling this should be
1606         * atomic wrt getting those handlers called. */
1607        o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1608
1609        spin_lock(&dlm->spinlock);
1610        memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1611
1612        __dlm_set_joining_node(dlm, dlm->node_num);
1613
1614        spin_unlock(&dlm->spinlock);
1615
1616        node = -1;
1617        while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1618                                     node + 1)) < O2NM_MAX_NODES) {
1619                if (node == dlm->node_num)
1620                        continue;
1621
1622                status = dlm_request_join(dlm, node, &response);
1623                if (status < 0) {
1624                        mlog_errno(status);
1625                        goto bail;
1626                }
1627
1628                /* Ok, either we got a response or the node doesn't have a
1629                 * dlm up. */
1630                if (response == JOIN_OK)
1631                        set_bit(node, ctxt->yes_resp_map);
1632
1633                if (dlm_should_restart_join(dlm, ctxt, response)) {
1634                        status = -EAGAIN;
1635                        goto bail;
1636                }
1637        }
1638
1639        mlog(0, "Yay, done querying nodes!\n");
1640
1641        /* Yay, everyone agree's we can join the domain. My domain is
1642         * comprised of all nodes who were put in the
1643         * yes_resp_map. Copy that into our domain map and send a join
1644         * assert message to clean up everyone elses state. */
1645        spin_lock(&dlm->spinlock);
1646        memcpy(dlm->domain_map, ctxt->yes_resp_map,
1647               sizeof(ctxt->yes_resp_map));
1648        set_bit(dlm->node_num, dlm->domain_map);
1649        spin_unlock(&dlm->spinlock);
1650
1651        /* Support for global heartbeat and node info was added in 1.1 */
1652        if (dlm->dlm_locking_proto.pv_major > 1 ||
1653            dlm->dlm_locking_proto.pv_minor > 0) {
1654                status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1655                if (status) {
1656                        mlog_errno(status);
1657                        goto bail;
1658                }
1659                status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1660                if (status) {
1661                        mlog_errno(status);
1662                        goto bail;
1663                }
1664        }
1665
1666        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1667
1668        /* Joined state *must* be set before the joining node
1669         * information, otherwise the query_join handler may read no
1670         * current joiner but a state of NEW and tell joining nodes
1671         * we're not in the domain. */
1672        spin_lock(&dlm_domain_lock);
1673        dlm->dlm_state = DLM_CTXT_JOINED;
1674        dlm->num_joins++;
1675        spin_unlock(&dlm_domain_lock);
1676
1677bail:
1678        spin_lock(&dlm->spinlock);
1679        __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1680        if (!status) {
1681                printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1682                __dlm_print_nodes(dlm);
1683        }
1684        spin_unlock(&dlm->spinlock);
1685
1686        if (ctxt) {
1687                /* Do we need to send a cancel message to any nodes? */
1688                if (status < 0) {
1689                        tmpstat = dlm_send_join_cancels(dlm,
1690                                                        ctxt->yes_resp_map,
1691                                                        sizeof(ctxt->yes_resp_map));
1692                        if (tmpstat < 0)
1693                                mlog_errno(tmpstat);
1694                }
1695                kfree(ctxt);
1696        }
1697
1698        mlog(0, "returning %d\n", status);
1699        return status;
1700}
1701
1702static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1703{
1704        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up);
1705        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down);
1706        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1707}
1708
1709static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1710{
1711        int status;
1712
1713        mlog(0, "registering handlers.\n");
1714
1715        o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1716                            dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1717        o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1718                            dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1719
1720        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1721        if (status)
1722                goto bail;
1723
1724        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1725        if (status)
1726                goto bail;
1727
1728        status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1729                                        sizeof(struct dlm_master_request),
1730                                        dlm_master_request_handler,
1731                                        dlm, NULL, &dlm->dlm_domain_handlers);
1732        if (status)
1733                goto bail;
1734
1735        status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1736                                        sizeof(struct dlm_assert_master),
1737                                        dlm_assert_master_handler,
1738                                        dlm, dlm_assert_master_post_handler,
1739                                        &dlm->dlm_domain_handlers);
1740        if (status)
1741                goto bail;
1742
1743        status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1744                                        sizeof(struct dlm_create_lock),
1745                                        dlm_create_lock_handler,
1746                                        dlm, NULL, &dlm->dlm_domain_handlers);
1747        if (status)
1748                goto bail;
1749
1750        status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1751                                        DLM_CONVERT_LOCK_MAX_LEN,
1752                                        dlm_convert_lock_handler,
1753                                        dlm, NULL, &dlm->dlm_domain_handlers);
1754        if (status)
1755                goto bail;
1756
1757        status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1758                                        DLM_UNLOCK_LOCK_MAX_LEN,
1759                                        dlm_unlock_lock_handler,
1760                                        dlm, NULL, &dlm->dlm_domain_handlers);
1761        if (status)
1762                goto bail;
1763
1764        status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1765                                        DLM_PROXY_AST_MAX_LEN,
1766                                        dlm_proxy_ast_handler,
1767                                        dlm, NULL, &dlm->dlm_domain_handlers);
1768        if (status)
1769                goto bail;
1770
1771        status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1772                                        sizeof(struct dlm_exit_domain),
1773                                        dlm_exit_domain_handler,
1774                                        dlm, NULL, &dlm->dlm_domain_handlers);
1775        if (status)
1776                goto bail;
1777
1778        status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1779                                        sizeof(struct dlm_deref_lockres),
1780                                        dlm_deref_lockres_handler,
1781                                        dlm, NULL, &dlm->dlm_domain_handlers);
1782        if (status)
1783                goto bail;
1784
1785        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1786                                        sizeof(struct dlm_migrate_request),
1787                                        dlm_migrate_request_handler,
1788                                        dlm, NULL, &dlm->dlm_domain_handlers);
1789        if (status)
1790                goto bail;
1791
1792        status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1793                                        DLM_MIG_LOCKRES_MAX_LEN,
1794                                        dlm_mig_lockres_handler,
1795                                        dlm, NULL, &dlm->dlm_domain_handlers);
1796        if (status)
1797                goto bail;
1798
1799        status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1800                                        sizeof(struct dlm_master_requery),
1801                                        dlm_master_requery_handler,
1802                                        dlm, NULL, &dlm->dlm_domain_handlers);
1803        if (status)
1804                goto bail;
1805
1806        status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1807                                        sizeof(struct dlm_lock_request),
1808                                        dlm_request_all_locks_handler,
1809                                        dlm, NULL, &dlm->dlm_domain_handlers);
1810        if (status)
1811                goto bail;
1812
1813        status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1814                                        sizeof(struct dlm_reco_data_done),
1815                                        dlm_reco_data_done_handler,
1816                                        dlm, NULL, &dlm->dlm_domain_handlers);
1817        if (status)
1818                goto bail;
1819
1820        status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1821                                        sizeof(struct dlm_begin_reco),
1822                                        dlm_begin_reco_handler,
1823                                        dlm, NULL, &dlm->dlm_domain_handlers);
1824        if (status)
1825                goto bail;
1826
1827        status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1828                                        sizeof(struct dlm_finalize_reco),
1829                                        dlm_finalize_reco_handler,
1830                                        dlm, NULL, &dlm->dlm_domain_handlers);
1831        if (status)
1832                goto bail;
1833
1834        status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key,
1835                                        sizeof(struct dlm_exit_domain),
1836                                        dlm_begin_exit_domain_handler,
1837                                        dlm, NULL, &dlm->dlm_domain_handlers);
1838        if (status)
1839                goto bail;
1840
1841        status = o2net_register_handler(DLM_DEREF_LOCKRES_DONE, dlm->key,
1842                                        sizeof(struct dlm_deref_lockres_done),
1843                                        dlm_deref_lockres_done_handler,
1844                                        dlm, NULL, &dlm->dlm_domain_handlers);
1845bail:
1846        if (status)
1847                dlm_unregister_domain_handlers(dlm);
1848
1849        return status;
1850}
1851
1852static int dlm_join_domain(struct dlm_ctxt *dlm)
1853{
1854        int status;
1855        unsigned int backoff;
1856        unsigned int total_backoff = 0;
1857        char wq_name[O2NM_MAX_NAME_LEN];
1858
1859        BUG_ON(!dlm);
1860
1861        mlog(0, "Join domain %s\n", dlm->name);
1862
1863        status = dlm_register_domain_handlers(dlm);
1864        if (status) {
1865                mlog_errno(status);
1866                goto bail;
1867        }
1868
1869        status = dlm_launch_thread(dlm);
1870        if (status < 0) {
1871                mlog_errno(status);
1872                goto bail;
1873        }
1874
1875        status = dlm_launch_recovery_thread(dlm);
1876        if (status < 0) {
1877                mlog_errno(status);
1878                goto bail;
1879        }
1880
1881        dlm_debug_init(dlm);
1882
1883        snprintf(wq_name, O2NM_MAX_NAME_LEN, "dlm_wq-%s", dlm->name);
1884        dlm->dlm_worker = alloc_workqueue(wq_name, WQ_MEM_RECLAIM, 0);
1885        if (!dlm->dlm_worker) {
1886                status = -ENOMEM;
1887                mlog_errno(status);
1888                goto bail;
1889        }
1890
1891        do {
1892                status = dlm_try_to_join_domain(dlm);
1893
1894                /* If we're racing another node to the join, then we
1895                 * need to back off temporarily and let them
1896                 * complete. */
1897#define DLM_JOIN_TIMEOUT_MSECS  90000
1898                if (status == -EAGAIN) {
1899                        if (signal_pending(current)) {
1900                                status = -ERESTARTSYS;
1901                                goto bail;
1902                        }
1903
1904                        if (total_backoff > DLM_JOIN_TIMEOUT_MSECS) {
1905                                status = -ERESTARTSYS;
1906                                mlog(ML_NOTICE, "Timed out joining dlm domain "
1907                                     "%s after %u msecs\n", dlm->name,
1908                                     total_backoff);
1909                                goto bail;
1910                        }
1911
1912                        /*
1913                         * <chip> After you!
1914                         * <dale> No, after you!
1915                         * <chip> I insist!
1916                         * <dale> But you first!
1917                         * ...
1918                         */
1919                        backoff = (unsigned int)(jiffies & 0x3);
1920                        backoff *= DLM_DOMAIN_BACKOFF_MS;
1921                        total_backoff += backoff;
1922                        mlog(0, "backoff %d\n", backoff);
1923                        msleep(backoff);
1924                }
1925        } while (status == -EAGAIN);
1926
1927        if (status < 0) {
1928                mlog_errno(status);
1929                goto bail;
1930        }
1931
1932        status = 0;
1933bail:
1934        wake_up(&dlm_domain_events);
1935
1936        if (status) {
1937                dlm_unregister_domain_handlers(dlm);
1938                dlm_complete_thread(dlm);
1939                dlm_complete_recovery_thread(dlm);
1940                dlm_destroy_dlm_worker(dlm);
1941        }
1942
1943        return status;
1944}
1945
1946static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1947                                u32 key)
1948{
1949        int i;
1950        int ret;
1951        struct dlm_ctxt *dlm = NULL;
1952
1953        dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1954        if (!dlm) {
1955                ret = -ENOMEM;
1956                mlog_errno(ret);
1957                goto leave;
1958        }
1959
1960        dlm->name = kstrdup(domain, GFP_KERNEL);
1961        if (dlm->name == NULL) {
1962                ret = -ENOMEM;
1963                mlog_errno(ret);
1964                goto leave;
1965        }
1966
1967        dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1968        if (!dlm->lockres_hash) {
1969                ret = -ENOMEM;
1970                mlog_errno(ret);
1971                goto leave;
1972        }
1973
1974        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1975                INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1976
1977        dlm->master_hash = (struct hlist_head **)
1978                                dlm_alloc_pagevec(DLM_HASH_PAGES);
1979        if (!dlm->master_hash) {
1980                ret = -ENOMEM;
1981                mlog_errno(ret);
1982                goto leave;
1983        }
1984
1985        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1986                INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1987
1988        dlm->key = key;
1989        dlm->node_num = o2nm_this_node();
1990
1991        dlm_create_debugfs_subroot(dlm);
1992
1993        spin_lock_init(&dlm->spinlock);
1994        spin_lock_init(&dlm->master_lock);
1995        spin_lock_init(&dlm->ast_lock);
1996        spin_lock_init(&dlm->track_lock);
1997        INIT_LIST_HEAD(&dlm->list);
1998        INIT_LIST_HEAD(&dlm->dirty_list);
1999        INIT_LIST_HEAD(&dlm->reco.resources);
2000        INIT_LIST_HEAD(&dlm->reco.node_data);
2001        INIT_LIST_HEAD(&dlm->purge_list);
2002        INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
2003        INIT_LIST_HEAD(&dlm->tracking_list);
2004        dlm->reco.state = 0;
2005
2006        INIT_LIST_HEAD(&dlm->pending_asts);
2007        INIT_LIST_HEAD(&dlm->pending_basts);
2008
2009        mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
2010                  dlm->recovery_map, &(dlm->recovery_map[0]));
2011
2012        memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
2013        memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
2014        memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
2015
2016        dlm->dlm_thread_task = NULL;
2017        dlm->dlm_reco_thread_task = NULL;
2018        dlm->dlm_worker = NULL;
2019        init_waitqueue_head(&dlm->dlm_thread_wq);
2020        init_waitqueue_head(&dlm->dlm_reco_thread_wq);
2021        init_waitqueue_head(&dlm->reco.event);
2022        init_waitqueue_head(&dlm->ast_wq);
2023        init_waitqueue_head(&dlm->migration_wq);
2024        INIT_LIST_HEAD(&dlm->mle_hb_events);
2025
2026        dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
2027        init_waitqueue_head(&dlm->dlm_join_events);
2028
2029        dlm->migrate_done = 0;
2030
2031        dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
2032        dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
2033
2034        atomic_set(&dlm->res_tot_count, 0);
2035        atomic_set(&dlm->res_cur_count, 0);
2036        for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2037                atomic_set(&dlm->mle_tot_count[i], 0);
2038                atomic_set(&dlm->mle_cur_count[i], 0);
2039        }
2040
2041        spin_lock_init(&dlm->work_lock);
2042        INIT_LIST_HEAD(&dlm->work_list);
2043        INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2044
2045        kref_init(&dlm->dlm_refs);
2046        dlm->dlm_state = DLM_CTXT_NEW;
2047
2048        INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2049
2050        mlog(0, "context init: refcount %u\n",
2051                  kref_read(&dlm->dlm_refs));
2052
2053        ret = 0;
2054leave:
2055        if (ret < 0 && dlm) {
2056                if (dlm->master_hash)
2057                        dlm_free_pagevec((void **)dlm->master_hash,
2058                                        DLM_HASH_PAGES);
2059
2060                if (dlm->lockres_hash)
2061                        dlm_free_pagevec((void **)dlm->lockres_hash,
2062                                        DLM_HASH_PAGES);
2063
2064                kfree(dlm->name);
2065                kfree(dlm);
2066                dlm = NULL;
2067        }
2068        return dlm;
2069}
2070
2071/*
2072 * Compare a requested locking protocol version against the current one.
2073 *
2074 * If the major numbers are different, they are incompatible.
2075 * If the current minor is greater than the request, they are incompatible.
2076 * If the current minor is less than or equal to the request, they are
2077 * compatible, and the requester should run at the current minor version.
2078 */
2079static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2080                                struct dlm_protocol_version *request)
2081{
2082        if (existing->pv_major != request->pv_major)
2083                return 1;
2084
2085        if (existing->pv_minor > request->pv_minor)
2086                return 1;
2087
2088        if (existing->pv_minor < request->pv_minor)
2089                request->pv_minor = existing->pv_minor;
2090
2091        return 0;
2092}
2093
2094/*
2095 * dlm_register_domain: one-time setup per "domain".
2096 *
2097 * The filesystem passes in the requested locking version via proto.
2098 * If registration was successful, proto will contain the negotiated
2099 * locking protocol.
2100 */
2101struct dlm_ctxt * dlm_register_domain(const char *domain,
2102                               u32 key,
2103                               struct dlm_protocol_version *fs_proto)
2104{
2105        int ret;
2106        struct dlm_ctxt *dlm = NULL;
2107        struct dlm_ctxt *new_ctxt = NULL;
2108
2109        if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2110                ret = -ENAMETOOLONG;
2111                mlog(ML_ERROR, "domain name length too long\n");
2112                goto leave;
2113        }
2114
2115        mlog(0, "register called for domain \"%s\"\n", domain);
2116
2117retry:
2118        dlm = NULL;
2119        if (signal_pending(current)) {
2120                ret = -ERESTARTSYS;
2121                mlog_errno(ret);
2122                goto leave;
2123        }
2124
2125        spin_lock(&dlm_domain_lock);
2126
2127        dlm = __dlm_lookup_domain(domain);
2128        if (dlm) {
2129                if (dlm->dlm_state != DLM_CTXT_JOINED) {
2130                        spin_unlock(&dlm_domain_lock);
2131
2132                        mlog(0, "This ctxt is not joined yet!\n");
2133                        wait_event_interruptible(dlm_domain_events,
2134                                                 dlm_wait_on_domain_helper(
2135                                                         domain));
2136                        goto retry;
2137                }
2138
2139                if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2140                        spin_unlock(&dlm_domain_lock);
2141                        mlog(ML_ERROR,
2142                             "Requested locking protocol version is not "
2143                             "compatible with already registered domain "
2144                             "\"%s\"\n", domain);
2145                        ret = -EPROTO;
2146                        goto leave;
2147                }
2148
2149                __dlm_get(dlm);
2150                dlm->num_joins++;
2151
2152                spin_unlock(&dlm_domain_lock);
2153
2154                ret = 0;
2155                goto leave;
2156        }
2157
2158        /* doesn't exist */
2159        if (!new_ctxt) {
2160                spin_unlock(&dlm_domain_lock);
2161
2162                new_ctxt = dlm_alloc_ctxt(domain, key);
2163                if (new_ctxt)
2164                        goto retry;
2165
2166                ret = -ENOMEM;
2167                mlog_errno(ret);
2168                goto leave;
2169        }
2170
2171        /* a little variable switch-a-roo here... */
2172        dlm = new_ctxt;
2173        new_ctxt = NULL;
2174
2175        /* add the new domain */
2176        list_add_tail(&dlm->list, &dlm_domains);
2177        spin_unlock(&dlm_domain_lock);
2178
2179        /*
2180         * Pass the locking protocol version into the join.  If the join
2181         * succeeds, it will have the negotiated protocol set.
2182         */
2183        dlm->dlm_locking_proto = dlm_protocol;
2184        dlm->fs_locking_proto = *fs_proto;
2185
2186        ret = dlm_join_domain(dlm);
2187        if (ret) {
2188                mlog_errno(ret);
2189                dlm_put(dlm);
2190                goto leave;
2191        }
2192
2193        /* Tell the caller what locking protocol we negotiated */
2194        *fs_proto = dlm->fs_locking_proto;
2195
2196        ret = 0;
2197leave:
2198        if (new_ctxt)
2199                dlm_free_ctxt_mem(new_ctxt);
2200
2201        if (ret < 0)
2202                dlm = ERR_PTR(ret);
2203
2204        return dlm;
2205}
2206EXPORT_SYMBOL_GPL(dlm_register_domain);
2207
2208static LIST_HEAD(dlm_join_handlers);
2209
2210static void dlm_unregister_net_handlers(void)
2211{
2212        o2net_unregister_handler_list(&dlm_join_handlers);
2213}
2214
2215static int dlm_register_net_handlers(void)
2216{
2217        int status = 0;
2218
2219        status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2220                                        sizeof(struct dlm_query_join_request),
2221                                        dlm_query_join_handler,
2222                                        NULL, NULL, &dlm_join_handlers);
2223        if (status)
2224                goto bail;
2225
2226        status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2227                                        sizeof(struct dlm_assert_joined),
2228                                        dlm_assert_joined_handler,
2229                                        NULL, NULL, &dlm_join_handlers);
2230        if (status)
2231                goto bail;
2232
2233        status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2234                                        sizeof(struct dlm_cancel_join),
2235                                        dlm_cancel_join_handler,
2236                                        NULL, NULL, &dlm_join_handlers);
2237        if (status)
2238                goto bail;
2239
2240        status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2241                                        sizeof(struct dlm_query_region),
2242                                        dlm_query_region_handler,
2243                                        NULL, NULL, &dlm_join_handlers);
2244
2245        if (status)
2246                goto bail;
2247
2248        status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2249                                        sizeof(struct dlm_query_nodeinfo),
2250                                        dlm_query_nodeinfo_handler,
2251                                        NULL, NULL, &dlm_join_handlers);
2252bail:
2253        if (status < 0)
2254                dlm_unregister_net_handlers();
2255
2256        return status;
2257}
2258
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
2266
2267/* Eviction is not expected to happen often, so a per-domain lock is
2268 * not necessary. Eviction callbacks are allowed to sleep for short
2269 * periods of time. */
2270static DECLARE_RWSEM(dlm_callback_sem);
2271
2272void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2273                                        int node_num)
2274{
2275        struct dlm_eviction_cb *cb;
2276
2277        down_read(&dlm_callback_sem);
2278        list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) {
2279                cb->ec_func(node_num, cb->ec_data);
2280        }
2281        up_read(&dlm_callback_sem);
2282}
2283
2284void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2285                           dlm_eviction_func *f,
2286                           void *data)
2287{
2288        INIT_LIST_HEAD(&cb->ec_item);
2289        cb->ec_func = f;
2290        cb->ec_data = data;
2291}
2292EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2293
2294void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2295                              struct dlm_eviction_cb *cb)
2296{
2297        down_write(&dlm_callback_sem);
2298        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2299        up_write(&dlm_callback_sem);
2300}
2301EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2302
2303void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2304{
2305        down_write(&dlm_callback_sem);
2306        list_del_init(&cb->ec_item);
2307        up_write(&dlm_callback_sem);
2308}
2309EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
2310
2311static int __init dlm_init(void)
2312{
2313        int status;
2314
2315        status = dlm_init_mle_cache();
2316        if (status) {
2317                mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2318                goto error;
2319        }
2320
2321        status = dlm_init_master_caches();
2322        if (status) {
2323                mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2324                     "o2dlm_lockname slabcaches\n");
2325                goto error;
2326        }
2327
2328        status = dlm_init_lock_cache();
2329        if (status) {
2330                mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2331                goto error;
2332        }
2333
2334        status = dlm_register_net_handlers();
2335        if (status) {
2336                mlog(ML_ERROR, "Unable to register network handlers\n");
2337                goto error;
2338        }
2339
2340        dlm_create_debugfs_root();
2341
2342        return 0;
2343error:
2344        dlm_unregister_net_handlers();
2345        dlm_destroy_lock_cache();
2346        dlm_destroy_master_caches();
2347        dlm_destroy_mle_cache();
2348        return -1;
2349}
2350
2351static void __exit dlm_exit (void)
2352{
2353        dlm_destroy_debugfs_root();
2354        dlm_unregister_net_handlers();
2355        dlm_destroy_lock_cache();
2356        dlm_destroy_master_caches();
2357        dlm_destroy_mle_cache();
2358}
2359
2360MODULE_AUTHOR("Oracle");
2361MODULE_LICENSE("GPL");
2362MODULE_DESCRIPTION("OCFS2 Distributed Lock Management");
2363
2364module_init(dlm_init);
2365module_exit(dlm_exit);
2366