linux/fs/ocfs2/dlm/dlmdomain.c
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmdomain.c
   5 *
   6 * defines domain join / leave apis
   7 *
   8 * Copyright (C) 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   23 * Boston, MA 02111-1307, USA.
  24 *
  25 */
  26
  27#include <linux/module.h>
  28#include <linux/types.h>
  29#include <linux/slab.h>
  30#include <linux/highmem.h>
  31#include <linux/init.h>
  32#include <linux/spinlock.h>
  33#include <linux/delay.h>
  34#include <linux/err.h>
  35#include <linux/debugfs.h>
  36
  37#include "cluster/heartbeat.h"
  38#include "cluster/nodemanager.h"
  39#include "cluster/tcp.h"
  40
  41#include "dlmapi.h"
  42#include "dlmcommon.h"
  43#include "dlmdomain.h"
  44#include "dlmdebug.h"
  45
  46#include "dlmver.h"
  47
  48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
  49#include "cluster/masklog.h"
  50
   51/*
   52 * ocfs2 node maps are arrays of long ints, which cannot safely be sent
   53 * across the wire as-is due to endianness issues. To work around this, we
   54 * convert the long ints to byte arrays. The following three routines are
   55 * helpers that set/test/copy bits within those byte arrays.
   56 */
  57static inline void byte_set_bit(u8 nr, u8 map[])
  58{
  59        map[nr >> 3] |= (1UL << (nr & 7));
  60}
  61
  62static inline int byte_test_bit(u8 nr, u8 map[])
  63{
  64        return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
  65}
  66
  67static inline void byte_copymap(u8 dmap[], unsigned long smap[],
  68                        unsigned int sz)
  69{
  70        unsigned int nn;
  71
  72        if (!sz)
  73                return;
  74
  75        memset(dmap, 0, ((sz + 7) >> 3));
  76        for (nn = 0 ; nn < sz; nn++)
  77                if (test_bit(nn, smap))
  78                        byte_set_bit(nn, dmap);
  79}
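/*
 * Example: bit 10 lives in byte 10 >> 3 == 1 under mask 1 << (10 & 7) == 0x04,
 * so byte_set_bit(10, map) does map[1] |= 0x04.  The resulting byte array has
 * the same layout on big- and little-endian hosts, unlike the unsigned-long
 * bitmaps it is copied from.
 */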
  80
  81static void dlm_free_pagevec(void **vec, int pages)
  82{
  83        while (pages--)
  84                free_page((unsigned long)vec[pages]);
  85        kfree(vec);
  86}
  87
  88static void **dlm_alloc_pagevec(int pages)
  89{
  90        void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
  91        int i;
  92
  93        if (!vec)
  94                return NULL;
  95
  96        for (i = 0; i < pages; i++)
  97                if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
  98                        goto out_free;
  99
 100        mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
 101             pages, (unsigned long)DLM_HASH_PAGES,
 102             (unsigned long)DLM_BUCKETS_PER_PAGE);
 103        return vec;
 104out_free:
 105        dlm_free_pagevec(vec, i);
 106        return NULL;
 107}
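/*
 * The lockres and master hash tables are vectors of DLM_HASH_PAGES whole
 * pages, each page holding DLM_BUCKETS_PER_PAGE hlist_heads (see dlmcommon.h);
 * dlm_lockres_hash() maps a name hash to the right page and bucket.  Building
 * the table page by page avoids one large high-order allocation.
 */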
 108
 109/*
 110 *
 111 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
 112 *    dlm_domain_lock
 113 *    struct dlm_ctxt->spinlock
 114 *    struct dlm_lock_resource->spinlock
 115 *    struct dlm_ctxt->master_lock
 116 *    struct dlm_ctxt->ast_lock
 117 *    dlm_master_list_entry->spinlock
 118 *    dlm_lock->spinlock
 119 *
 120 */
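/*
 * Illustrative only (not lifted from any particular caller): a path needing
 * both the per-domain lock and a lockres lock must nest them in the order
 * above, e.g.
 *
 *	spin_lock(&dlm->spinlock);
 *	spin_lock(&res->spinlock);
 *	...
 *	spin_unlock(&res->spinlock);
 *	spin_unlock(&dlm->spinlock);
 *
 * and must never take dlm->spinlock while already holding res->spinlock.
 */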
 121
 122DEFINE_SPINLOCK(dlm_domain_lock);
 123LIST_HEAD(dlm_domains);
 124static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 125
 126/*
 127 * The supported protocol version for DLM communication.  Running domains
 128 * will have a negotiated version with the same major number and a minor
 129 * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
 130 * be used to determine what a running domain is actually using.
 131 *
 132 * New in version 1.1:
 133 *      - Message DLM_QUERY_REGION added to support global heartbeat
 134 *      - Message DLM_QUERY_NODEINFO added to allow online node removes
 135 */
 136static const struct dlm_protocol_version dlm_protocol = {
 137        .pv_major = 1,
 138        .pv_minor = 1,
 139};
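/*
 * Negotiation example (see dlm_query_join_proto_check() and
 * dlm_protocol_compare() later in this file): a joiner that only speaks 1.0
 * is admitted to a 1.1 domain and is answered with minor 0, which it then
 * runs; a joiner requesting a higher major number gets JOIN_PROTOCOL_MISMATCH.
 */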
 140
 141#define DLM_DOMAIN_BACKOFF_MS 200
 142
 143static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 144                                  void **ret_data);
 145static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 146                                     void **ret_data);
 147static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 148                                   void **ret_data);
 149static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
 150                                    void *data, void **ret_data);
 151static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 152                                   void **ret_data);
 153static int dlm_protocol_compare(struct dlm_protocol_version *existing,
 154                                struct dlm_protocol_version *request);
 155
 156static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 157
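/* Remove the lockres from the domain hash (if hashed) and drop the reference
 * that __dlm_insert_lockres() took on behalf of the hash table. */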
 158void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
 159{
 160        if (!hlist_unhashed(&lockres->hash_node)) {
 161                hlist_del_init(&lockres->hash_node);
 162                dlm_lockres_put(lockres);
 163        }
 164}
 165
 166void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 167                       struct dlm_lock_resource *res)
 168{
 169        struct hlist_head *bucket;
 170        struct qstr *q;
 171
 172        assert_spin_locked(&dlm->spinlock);
 173
 174        q = &res->lockname;
 175        bucket = dlm_lockres_hash(dlm, q->hash);
 176
 177        /* get a reference for our hashtable */
 178        dlm_lockres_get(res);
 179
 180        hlist_add_head(&res->hash_node, bucket);
 181}
 182
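/*
 * The lookup variants below return the lockres with an extra reference held
 * (via dlm_lockres_get()); callers must drop it with dlm_lockres_put() when
 * they are done with it.
 */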
 183struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
 184                                                     const char *name,
 185                                                     unsigned int len,
 186                                                     unsigned int hash)
 187{
 188        struct hlist_head *bucket;
 189        struct hlist_node *list;
 190
 191        mlog_entry("%.*s\n", len, name);
 192
 193        assert_spin_locked(&dlm->spinlock);
 194
 195        bucket = dlm_lockres_hash(dlm, hash);
 196
 197        hlist_for_each(list, bucket) {
 198                struct dlm_lock_resource *res = hlist_entry(list,
 199                        struct dlm_lock_resource, hash_node);
 200                if (res->lockname.name[0] != name[0])
 201                        continue;
 202                if (unlikely(res->lockname.len != len))
 203                        continue;
 204                if (memcmp(res->lockname.name + 1, name + 1, len - 1))
 205                        continue;
 206                dlm_lockres_get(res);
 207                return res;
 208        }
 209        return NULL;
 210}
 211
 212/* intended to be called by functions which do not care about lock
 213 * resources which are being purged (most net _handler functions).
 214 * this will return NULL for any lock resource which is found but
 215 * currently in the process of dropping its mastery reference.
 216 * use __dlm_lookup_lockres_full when you need the lock resource
 217 * regardless (e.g. dlm_get_lock_resource) */
 218struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 219                                                const char *name,
 220                                                unsigned int len,
 221                                                unsigned int hash)
 222{
 223        struct dlm_lock_resource *res = NULL;
 224
 225        mlog_entry("%.*s\n", len, name);
 226
 227        assert_spin_locked(&dlm->spinlock);
 228
 229        res = __dlm_lookup_lockres_full(dlm, name, len, hash);
 230        if (res) {
 231                spin_lock(&res->spinlock);
 232                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 233                        spin_unlock(&res->spinlock);
 234                        dlm_lockres_put(res);
 235                        return NULL;
 236                }
 237                spin_unlock(&res->spinlock);
 238        }
 239
 240        return res;
 241}
 242
 243struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 244                                    const char *name,
 245                                    unsigned int len)
 246{
 247        struct dlm_lock_resource *res;
 248        unsigned int hash = dlm_lockid_hash(name, len);
 249
 250        spin_lock(&dlm->spinlock);
 251        res = __dlm_lookup_lockres(dlm, name, len, hash);
 252        spin_unlock(&dlm->spinlock);
 253        return res;
 254}
 255
 256static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 257{
 258        struct dlm_ctxt *tmp = NULL;
 259        struct list_head *iter;
 260
 261        assert_spin_locked(&dlm_domain_lock);
 262
 263        /* tmp->name here is always NULL terminated,
 264         * but domain may not be! */
 265        list_for_each(iter, &dlm_domains) {
 266                tmp = list_entry (iter, struct dlm_ctxt, list);
 267                if (strlen(tmp->name) == len &&
 268                    memcmp(tmp->name, domain, len)==0)
 269                        break;
 270                tmp = NULL;
 271        }
 272
 273        return tmp;
 274}
 275
 276/* For null terminated domain strings ONLY */
 277static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
 278{
 279        assert_spin_locked(&dlm_domain_lock);
 280
 281        return __dlm_lookup_domain_full(domain, strlen(domain));
 282}
 283
 284
 285/* returns true on one of two conditions:
 286 * 1) the domain does not exist
  287 * 2) the domain exists and its state is "joined" */
 288static int dlm_wait_on_domain_helper(const char *domain)
 289{
 290        int ret = 0;
 291        struct dlm_ctxt *tmp = NULL;
 292
 293        spin_lock(&dlm_domain_lock);
 294
 295        tmp = __dlm_lookup_domain(domain);
 296        if (!tmp)
 297                ret = 1;
 298        else if (tmp->dlm_state == DLM_CTXT_JOINED)
 299                ret = 1;
 300
 301        spin_unlock(&dlm_domain_lock);
 302        return ret;
 303}
 304
 305static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 306{
 307        dlm_destroy_debugfs_subroot(dlm);
 308
 309        if (dlm->lockres_hash)
 310                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 311
 312        if (dlm->master_hash)
 313                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
 314
 315        if (dlm->name)
 316                kfree(dlm->name);
 317
 318        kfree(dlm);
 319}
 320
 321/* A little strange - this function will be called while holding
 322 * dlm_domain_lock and is expected to be holding it on the way out. We
 323 * will however drop and reacquire it multiple times */
 324static void dlm_ctxt_release(struct kref *kref)
 325{
 326        struct dlm_ctxt *dlm;
 327
 328        dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
 329
 330        BUG_ON(dlm->num_joins);
 331        BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
 332
 333        /* we may still be in the list if we hit an error during join. */
 334        list_del_init(&dlm->list);
 335
 336        spin_unlock(&dlm_domain_lock);
 337
 338        mlog(0, "freeing memory from domain %s\n", dlm->name);
 339
 340        wake_up(&dlm_domain_events);
 341
 342        dlm_free_ctxt_mem(dlm);
 343
 344        spin_lock(&dlm_domain_lock);
 345}
 346
 347void dlm_put(struct dlm_ctxt *dlm)
 348{
 349        spin_lock(&dlm_domain_lock);
 350        kref_put(&dlm->dlm_refs, dlm_ctxt_release);
 351        spin_unlock(&dlm_domain_lock);
 352}
 353
 354static void __dlm_get(struct dlm_ctxt *dlm)
 355{
 356        kref_get(&dlm->dlm_refs);
 357}
 358
 359/* given a questionable reference to a dlm object, gets a reference if
 360 * it can find it in the list, otherwise returns NULL in which case
 361 * you shouldn't trust your pointer. */
 362struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
 363{
 364        struct list_head *iter;
 365        struct dlm_ctxt *target = NULL;
 366
 367        spin_lock(&dlm_domain_lock);
 368
 369        list_for_each(iter, &dlm_domains) {
 370                target = list_entry (iter, struct dlm_ctxt, list);
 371
 372                if (target == dlm) {
 373                        __dlm_get(target);
 374                        break;
 375                }
 376
 377                target = NULL;
 378        }
 379
 380        spin_unlock(&dlm_domain_lock);
 381
 382        return target;
 383}
 384
 385int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 386{
 387        int ret;
 388
 389        spin_lock(&dlm_domain_lock);
 390        ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
 391                (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
 392        spin_unlock(&dlm_domain_lock);
 393
 394        return ret;
 395}
 396
 397static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
 398{
 399        if (dlm->dlm_worker) {
 400                flush_workqueue(dlm->dlm_worker);
 401                destroy_workqueue(dlm->dlm_worker);
 402                dlm->dlm_worker = NULL;
 403        }
 404}
 405
 406static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 407{
 408        dlm_unregister_domain_handlers(dlm);
 409        dlm_debug_shutdown(dlm);
 410        dlm_complete_thread(dlm);
 411        dlm_complete_recovery_thread(dlm);
 412        dlm_destroy_dlm_worker(dlm);
 413
 414        /* We've left the domain. Now we can take ourselves out of the
 415         * list and allow the kref stuff to help us free the
 416         * memory. */
 417        spin_lock(&dlm_domain_lock);
 418        list_del_init(&dlm->list);
 419        spin_unlock(&dlm_domain_lock);
 420
 421        /* Wake up anyone waiting for us to remove this domain */
 422        wake_up(&dlm_domain_events);
 423}
 424
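/*
 * Walk every hash bucket and migrate away any lock resource this node still
 * masters.  Returns -EAGAIN if resources were still hashed on this pass, so
 * that dlm_unregister_domain() keeps retrying (sleeping between passes) until
 * dlm_thread has purged everything.
 */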
 425static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 426{
 427        int i, num, n, ret = 0;
 428        struct dlm_lock_resource *res;
 429        struct hlist_node *iter;
 430        struct hlist_head *bucket;
 431        int dropped;
 432
 433        mlog(0, "Migrating locks from domain %s\n", dlm->name);
 434
 435        num = 0;
 436        spin_lock(&dlm->spinlock);
 437        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 438redo_bucket:
 439                n = 0;
 440                bucket = dlm_lockres_hash(dlm, i);
 441                iter = bucket->first;
 442                while (iter) {
 443                        n++;
 444                        res = hlist_entry(iter, struct dlm_lock_resource,
 445                                          hash_node);
 446                        dlm_lockres_get(res);
 447                        /* migrate, if necessary.  this will drop the dlm
 448                         * spinlock and retake it if it does migration. */
 449                        dropped = dlm_empty_lockres(dlm, res);
 450
 451                        spin_lock(&res->spinlock);
 452                        __dlm_lockres_calc_usage(dlm, res);
 453                        iter = res->hash_node.next;
 454                        spin_unlock(&res->spinlock);
 455
 456                        dlm_lockres_put(res);
 457
 458                        if (dropped)
 459                                goto redo_bucket;
 460                }
 461                cond_resched_lock(&dlm->spinlock);
 462                num += n;
 463        }
 464        spin_unlock(&dlm->spinlock);
 465        wake_up(&dlm->dlm_thread_wq);
 466
 467        /* let the dlm thread take care of purging, keep scanning until
 468         * nothing remains in the hash */
 469        if (num) {
 470                mlog(0, "%s: %d lock resources in hash last pass\n",
 471                     dlm->name, num);
 472                ret = -EAGAIN;
 473        }
 474        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
 475        return ret;
 476}
 477
 478static int dlm_no_joining_node(struct dlm_ctxt *dlm)
 479{
 480        int ret;
 481
 482        spin_lock(&dlm->spinlock);
 483        ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
 484        spin_unlock(&dlm->spinlock);
 485
 486        return ret;
 487}
 488
 489static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
 490{
 491        /* Yikes, a double spinlock! I need domain_lock for the dlm
 492         * state and the dlm spinlock for join state... Sorry! */
 493again:
 494        spin_lock(&dlm_domain_lock);
 495        spin_lock(&dlm->spinlock);
 496
 497        if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 498                mlog(0, "Node %d is joining, we wait on it.\n",
 499                          dlm->joining_node);
 500                spin_unlock(&dlm->spinlock);
 501                spin_unlock(&dlm_domain_lock);
 502
 503                wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
 504                goto again;
 505        }
 506
 507        dlm->dlm_state = DLM_CTXT_LEAVING;
 508        spin_unlock(&dlm->spinlock);
 509        spin_unlock(&dlm_domain_lock);
 510}
 511
 512static void __dlm_print_nodes(struct dlm_ctxt *dlm)
 513{
 514        int node = -1;
 515
 516        assert_spin_locked(&dlm->spinlock);
 517
 518        printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name);
 519
 520        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 521                                     node + 1)) < O2NM_MAX_NODES) {
 522                printk("%d ", node);
 523        }
 524        printk("\n");
 525}
 526
 527static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 528                                   void **ret_data)
 529{
 530        struct dlm_ctxt *dlm = data;
 531        unsigned int node;
 532        struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
 533
  534        mlog_entry("%p %u %p\n", msg, len, data);
 535
 536        if (!dlm_grab(dlm))
 537                return 0;
 538
 539        node = exit_msg->node_idx;
 540
 541        printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);
 542
 543        spin_lock(&dlm->spinlock);
 544        clear_bit(node, dlm->domain_map);
 545        __dlm_print_nodes(dlm);
 546
 547        /* notify anything attached to the heartbeat events */
 548        dlm_hb_event_notify_attached(dlm, node, 0);
 549
 550        spin_unlock(&dlm->spinlock);
 551
 552        dlm_put(dlm);
 553
 554        return 0;
 555}
 556
 557static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
 558                                    unsigned int node)
 559{
 560        int status;
 561        struct dlm_exit_domain leave_msg;
 562
 563        mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
 564                  node, dlm->name, dlm->node_num);
 565
 566        memset(&leave_msg, 0, sizeof(leave_msg));
 567        leave_msg.node_idx = dlm->node_num;
 568
 569        status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
 570                                    &leave_msg, sizeof(leave_msg), node,
 571                                    NULL);
 572        if (status < 0)
 573                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
 574                     "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node);
 575        mlog(0, "status return %d from o2net_send_message\n", status);
 576
 577        return status;
 578}
 579
 580
 581static void dlm_leave_domain(struct dlm_ctxt *dlm)
 582{
 583        int node, clear_node, status;
 584
 585        /* At this point we've migrated away all our locks and won't
 586         * accept mastership of new ones. The dlm is responsible for
 587         * almost nothing now. We make sure not to confuse any joining
 588         * nodes and then commence shutdown procedure. */
 589
 590        spin_lock(&dlm->spinlock);
 591        /* Clear ourselves from the domain map */
 592        clear_bit(dlm->node_num, dlm->domain_map);
 593        while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
 594                                     0)) < O2NM_MAX_NODES) {
 595                /* Drop the dlm spinlock. This is safe wrt the domain_map.
 596                 * -nodes cannot be added now as the
  597                 *   query_join_handler knows to respond with OK_NO_MAP
 598                 * -we catch the right network errors if a node is
 599                 *   removed from the map while we're sending him the
 600                 *   exit message. */
 601                spin_unlock(&dlm->spinlock);
 602
 603                clear_node = 1;
 604
 605                status = dlm_send_one_domain_exit(dlm, node);
 606                if (status < 0 &&
 607                    status != -ENOPROTOOPT &&
 608                    status != -ENOTCONN) {
 609                        mlog(ML_NOTICE, "Error %d sending domain exit message "
 610                             "to node %d\n", status, node);
 611
  612                        /* Not sure what to do here but let's sleep for
 613                         * a bit in case this was a transient
 614                         * error... */
 615                        msleep(DLM_DOMAIN_BACKOFF_MS);
 616                        clear_node = 0;
 617                }
 618
 619                spin_lock(&dlm->spinlock);
 620                /* If we're not clearing the node bit then we intend
 621                 * to loop back around to try again. */
 622                if (clear_node)
 623                        clear_bit(node, dlm->domain_map);
 624        }
 625        spin_unlock(&dlm->spinlock);
 626}
 627
 628int dlm_joined(struct dlm_ctxt *dlm)
 629{
 630        int ret = 0;
 631
 632        spin_lock(&dlm_domain_lock);
 633
 634        if (dlm->dlm_state == DLM_CTXT_JOINED)
 635                ret = 1;
 636
 637        spin_unlock(&dlm_domain_lock);
 638
 639        return ret;
 640}
 641
 642int dlm_shutting_down(struct dlm_ctxt *dlm)
 643{
 644        int ret = 0;
 645
 646        spin_lock(&dlm_domain_lock);
 647
 648        if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
 649                ret = 1;
 650
 651        spin_unlock(&dlm_domain_lock);
 652
 653        return ret;
 654}
 655
 656void dlm_unregister_domain(struct dlm_ctxt *dlm)
 657{
 658        int leave = 0;
 659        struct dlm_lock_resource *res;
 660
 661        spin_lock(&dlm_domain_lock);
 662        BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
 663        BUG_ON(!dlm->num_joins);
 664
 665        dlm->num_joins--;
 666        if (!dlm->num_joins) {
 667                /* We mark it "in shutdown" now so new register
 668                 * requests wait until we've completely left the
 669                 * domain. Don't use DLM_CTXT_LEAVING yet as we still
 670                 * want new domain joins to communicate with us at
 671                 * least until we've completed migration of our
 672                 * resources. */
 673                dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
 674                leave = 1;
 675        }
 676        spin_unlock(&dlm_domain_lock);
 677
 678        if (leave) {
 679                mlog(0, "shutting down domain %s\n", dlm->name);
 680
 681                /* We changed dlm state, notify the thread */
 682                dlm_kick_thread(dlm, NULL);
 683
 684                while (dlm_migrate_all_locks(dlm)) {
 685                        /* Give dlm_thread time to purge the lockres' */
 686                        msleep(500);
 687                        mlog(0, "%s: more migration to do\n", dlm->name);
 688                }
 689
 690                /* This list should be empty. If not, print remaining lockres */
 691                if (!list_empty(&dlm->tracking_list)) {
 692                        mlog(ML_ERROR, "Following lockres' are still on the "
 693                             "tracking list:\n");
 694                        list_for_each_entry(res, &dlm->tracking_list, tracking)
 695                                dlm_print_one_lock_resource(res);
 696                }
 697
 698                dlm_mark_domain_leaving(dlm);
 699                dlm_leave_domain(dlm);
 700                dlm_force_free_mles(dlm);
 701                dlm_complete_dlm_shutdown(dlm);
 702        }
 703        dlm_put(dlm);
 704}
 705EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 706
 707static int dlm_query_join_proto_check(char *proto_type, int node,
 708                                      struct dlm_protocol_version *ours,
 709                                      struct dlm_protocol_version *request)
 710{
 711        int rc;
 712        struct dlm_protocol_version proto = *request;
 713
 714        if (!dlm_protocol_compare(ours, &proto)) {
 715                mlog(0,
 716                     "node %u wanted to join with %s locking protocol "
 717                     "%u.%u, we respond with %u.%u\n",
 718                     node, proto_type,
 719                     request->pv_major,
 720                     request->pv_minor,
 721                     proto.pv_major, proto.pv_minor);
 722                request->pv_minor = proto.pv_minor;
 723                rc = 0;
 724        } else {
 725                mlog(ML_NOTICE,
 726                     "Node %u wanted to join with %s locking "
 727                     "protocol %u.%u, but we have %u.%u, disallowing\n",
 728                     node, proto_type,
 729                     request->pv_major,
 730                     request->pv_minor,
 731                     ours->pv_major,
 732                     ours->pv_minor);
 733                rc = 1;
 734        }
 735
 736        return rc;
 737}
 738
 739/*
 740 * struct dlm_query_join_packet is made up of four one-byte fields.  They
 741 * are effectively in big-endian order already.  However, little-endian
 742 * machines swap them before putting the packet on the wire (because
 743 * query_join's response is a status, and that status is treated as a u32
  744 * on the wire).  Thus, big-endian and little-endian machines will treat
 745 * this structure differently.
 746 *
 747 * The solution is to have little-endian machines swap the structure when
 748 * converting from the structure to the u32 representation.  This will
 749 * result in the structure having the correct format on the wire no matter
 750 * the host endian format.
 751 */
 752static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
 753                                          u32 *wire)
 754{
 755        union dlm_query_join_response response;
 756
 757        response.packet = *packet;
 758        *wire = cpu_to_be32(response.intval);
 759}
 760
 761static void dlm_query_join_wire_to_packet(u32 wire,
 762                                          struct dlm_query_join_packet *packet)
 763{
 764        union dlm_query_join_response response;
 765
 766        response.intval = cpu_to_be32(wire);
 767        *packet = response.packet;
 768}
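/*
 * Worked example: whatever the host byte order, the u32 produced above equals
 * (code << 24 | dlm_minor << 16 | fs_minor << 8 | reserved), because
 * cpu_to_be32() is a no-op on big-endian hosts and a byte swap on
 * little-endian ones.  Both endiannesses therefore hand o2net the same status
 * value, and dlm_query_join_wire_to_packet() is its exact inverse.
 */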
 769
 770static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 771                                  void **ret_data)
 772{
 773        struct dlm_query_join_request *query;
 774        struct dlm_query_join_packet packet = {
 775                .code = JOIN_DISALLOW,
 776        };
 777        struct dlm_ctxt *dlm = NULL;
 778        u32 response;
 779        u8 nodenum;
 780
 781        query = (struct dlm_query_join_request *) msg->buf;
 782
 783        mlog(0, "node %u wants to join domain %s\n", query->node_idx,
 784                  query->domain);
 785
 786        /*
 787         * If heartbeat doesn't consider the node live, tell it
 788         * to back off and try again.  This gives heartbeat a chance
 789         * to catch up.
 790         */
 791        if (!o2hb_check_node_heartbeating(query->node_idx)) {
 792                mlog(0, "node %u is not in our live map yet\n",
 793                     query->node_idx);
 794
 795                packet.code = JOIN_DISALLOW;
 796                goto respond;
 797        }
 798
 799        packet.code = JOIN_OK_NO_MAP;
 800
 801        spin_lock(&dlm_domain_lock);
 802        dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 803        if (!dlm)
 804                goto unlock_respond;
 805
  806        /*
  807         * There is a small window where the joining node may not yet see
  808         * node(s) that just left but are still in our domain map. DISALLOW
  809         * the join request if the joining node's node map differs from ours.
  810         */
 811        nodenum=0;
 812        while (nodenum < O2NM_MAX_NODES) {
 813                if (test_bit(nodenum, dlm->domain_map)) {
 814                        if (!byte_test_bit(nodenum, query->node_map)) {
 815                                mlog(0, "disallow join as node %u does not "
 816                                     "have node %u in its nodemap\n",
 817                                     query->node_idx, nodenum);
 818                                packet.code = JOIN_DISALLOW;
 819                                goto unlock_respond;
 820                        }
 821                }
 822                nodenum++;
 823        }
 824
 825        /* Once the dlm ctxt is marked as leaving then we don't want
 826         * to be put in someone's domain map.
 827         * Also, explicitly disallow joining at certain troublesome
 828         * times (ie. during recovery). */
 829        if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
 830                int bit = query->node_idx;
 831                spin_lock(&dlm->spinlock);
 832
 833                if (dlm->dlm_state == DLM_CTXT_NEW &&
 834                    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
  835                        /* If this is a brand new context and we
 836                         * haven't started our join process yet, then
 837                         * the other node won the race. */
 838                        packet.code = JOIN_OK_NO_MAP;
 839                } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 840                        /* Disallow parallel joins. */
 841                        packet.code = JOIN_DISALLOW;
 842                } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 843                        mlog(0, "node %u trying to join, but recovery "
 844                             "is ongoing.\n", bit);
 845                        packet.code = JOIN_DISALLOW;
 846                } else if (test_bit(bit, dlm->recovery_map)) {
 847                        mlog(0, "node %u trying to join, but it "
 848                             "still needs recovery.\n", bit);
 849                        packet.code = JOIN_DISALLOW;
 850                } else if (test_bit(bit, dlm->domain_map)) {
 851                        mlog(0, "node %u trying to join, but it "
 852                             "is still in the domain! needs recovery?\n",
 853                             bit);
 854                        packet.code = JOIN_DISALLOW;
 855                } else {
 856                        /* Alright we're fully a part of this domain
 857                         * so we keep some state as to who's joining
 858                         * and indicate to him that needs to be fixed
  859                         * and indicate to him what needs to be fixed
 860
 861                        /* Make sure we speak compatible locking protocols.  */
 862                        if (dlm_query_join_proto_check("DLM", bit,
 863                                                       &dlm->dlm_locking_proto,
 864                                                       &query->dlm_proto)) {
 865                                packet.code = JOIN_PROTOCOL_MISMATCH;
 866                        } else if (dlm_query_join_proto_check("fs", bit,
 867                                                              &dlm->fs_locking_proto,
 868                                                              &query->fs_proto)) {
 869                                packet.code = JOIN_PROTOCOL_MISMATCH;
 870                        } else {
 871                                packet.dlm_minor = query->dlm_proto.pv_minor;
 872                                packet.fs_minor = query->fs_proto.pv_minor;
 873                                packet.code = JOIN_OK;
 874                                __dlm_set_joining_node(dlm, query->node_idx);
 875                        }
 876                }
 877
 878                spin_unlock(&dlm->spinlock);
 879        }
 880unlock_respond:
 881        spin_unlock(&dlm_domain_lock);
 882
 883respond:
 884        mlog(0, "We respond with %u\n", packet.code);
 885
 886        dlm_query_join_packet_to_wire(&packet, &response);
 887        return response;
 888}
 889
 890static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 891                                     void **ret_data)
 892{
 893        struct dlm_assert_joined *assert;
 894        struct dlm_ctxt *dlm = NULL;
 895
 896        assert = (struct dlm_assert_joined *) msg->buf;
 897
 898        mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
 899                  assert->domain);
 900
 901        spin_lock(&dlm_domain_lock);
 902        dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
 903        /* XXX should we consider no dlm ctxt an error? */
 904        if (dlm) {
 905                spin_lock(&dlm->spinlock);
 906
 907                /* Alright, this node has officially joined our
 908                 * domain. Set him in the map and clean up our
 909                 * leftover join state. */
 910                BUG_ON(dlm->joining_node != assert->node_idx);
 911                set_bit(assert->node_idx, dlm->domain_map);
 912                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
 913
 914                printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n",
 915                       assert->node_idx, dlm->name);
 916                __dlm_print_nodes(dlm);
 917
 918                /* notify anything attached to the heartbeat events */
 919                dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
 920
 921                spin_unlock(&dlm->spinlock);
 922        }
 923        spin_unlock(&dlm_domain_lock);
 924
 925        return 0;
 926}
 927
 928static int dlm_match_regions(struct dlm_ctxt *dlm,
 929                             struct dlm_query_region *qr)
 930{
 931        char *local = NULL, *remote = qr->qr_regions;
 932        char *l, *r;
 933        int localnr, i, j, foundit;
 934        int status = 0;
 935
 936        if (!o2hb_global_heartbeat_active()) {
 937                if (qr->qr_numregions) {
 938                        mlog(ML_ERROR, "Domain %s: Joining node %d has global "
 939                             "heartbeat enabled but local node %d does not\n",
 940                             qr->qr_domain, qr->qr_node, dlm->node_num);
 941                        status = -EINVAL;
 942                }
 943                goto bail;
 944        }
 945
 946        if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
 947                mlog(ML_ERROR, "Domain %s: Local node %d has global "
 948                     "heartbeat enabled but joining node %d does not\n",
 949                     qr->qr_domain, dlm->node_num, qr->qr_node);
 950                status = -EINVAL;
 951                goto bail;
 952        }
 953
 954        r = remote;
 955        for (i = 0; i < qr->qr_numregions; ++i) {
 956                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
 957                r += O2HB_MAX_REGION_NAME_LEN;
 958        }
 959
 960        local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
 961        if (!local) {
 962                status = -ENOMEM;
 963                goto bail;
 964        }
 965
 966        localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
 967
 968        /* compare local regions with remote */
 969        l = local;
 970        for (i = 0; i < localnr; ++i) {
 971                foundit = 0;
 972                r = remote;
  973                for (j = 0; j < qr->qr_numregions; ++j) {
 974                        if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
 975                                foundit = 1;
 976                                break;
 977                        }
 978                        r += O2HB_MAX_REGION_NAME_LEN;
 979                }
 980                if (!foundit) {
 981                        status = -EINVAL;
 982                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
 983                             "in local node %d but not in joining node %d\n",
 984                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
 985                             dlm->node_num, qr->qr_node);
 986                        goto bail;
 987                }
 988                l += O2HB_MAX_REGION_NAME_LEN;
 989        }
 990
 991        /* compare remote with local regions */
 992        r = remote;
 993        for (i = 0; i < qr->qr_numregions; ++i) {
 994                foundit = 0;
 995                l = local;
 996                for (j = 0; j < localnr; ++j) {
 997                        if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
 998                                foundit = 1;
 999                                break;
1000                        }
1001                        l += O2HB_MAX_REGION_NAME_LEN;
1002                }
1003                if (!foundit) {
1004                        status = -EINVAL;
1005                        mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1006                             "in joining node %d but not in local node %d\n",
1007                             qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1008                             qr->qr_node, dlm->node_num);
1009                        goto bail;
1010                }
1011                r += O2HB_MAX_REGION_NAME_LEN;
1012        }
1013
1014bail:
1015        kfree(local);
1016
1017        return status;
1018}
1019
1020static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1021{
1022        struct dlm_query_region *qr = NULL;
1023        int status, ret = 0, i;
1024        char *p;
1025
1026        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1027                goto bail;
1028
1029        qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1030        if (!qr) {
1031                ret = -ENOMEM;
1032                mlog_errno(ret);
1033                goto bail;
1034        }
1035
1036        qr->qr_node = dlm->node_num;
1037        qr->qr_namelen = strlen(dlm->name);
1038        memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1039        /* if local hb, the numregions will be zero */
1040        if (o2hb_global_heartbeat_active())
1041                qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1042                                                         O2NM_MAX_REGIONS);
1043
1044        p = qr->qr_regions;
1045        for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1046                mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1047
1048        i = -1;
1049        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1050                                  i + 1)) < O2NM_MAX_NODES) {
1051                if (i == dlm->node_num)
1052                        continue;
1053
1054                mlog(0, "Sending regions to node %d\n", i);
1055
1056                ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1057                                         sizeof(struct dlm_query_region),
1058                                         i, &status);
1059                if (ret >= 0)
1060                        ret = status;
1061                if (ret) {
1062                        mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1063                             ret, i);
1064                        break;
1065                }
1066        }
1067
1068bail:
1069        kfree(qr);
1070        return ret;
1071}
1072
1073static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1074                                    void *data, void **ret_data)
1075{
1076        struct dlm_query_region *qr;
1077        struct dlm_ctxt *dlm = NULL;
1078        int status = 0;
1079        int locked = 0;
1080
1081        qr = (struct dlm_query_region *) msg->buf;
1082
1083        mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1084             qr->qr_domain);
1085
1086        status = -EINVAL;
1087
1088        spin_lock(&dlm_domain_lock);
1089        dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1090        if (!dlm) {
1091                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1092                     "before join domain\n", qr->qr_node, qr->qr_domain);
1093                goto bail;
1094        }
1095
1096        spin_lock(&dlm->spinlock);
1097        locked = 1;
1098        if (dlm->joining_node != qr->qr_node) {
1099                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1100                     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1101                     dlm->joining_node);
1102                goto bail;
1103        }
1104
1105        /* Support for global heartbeat was added in 1.1 */
1106        if (dlm->dlm_locking_proto.pv_major == 1 &&
1107            dlm->dlm_locking_proto.pv_minor == 0) {
1108                mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1109                     "but active dlm protocol is %d.%d\n", qr->qr_node,
1110                     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1111                     dlm->dlm_locking_proto.pv_minor);
1112                goto bail;
1113        }
1114
1115        status = dlm_match_regions(dlm, qr);
1116
1117bail:
1118        if (locked)
1119                spin_unlock(&dlm->spinlock);
1120        spin_unlock(&dlm_domain_lock);
1121
1122        return status;
1123}
1124
1125static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1126{
1127        struct o2nm_node *local;
1128        struct dlm_node_info *remote;
1129        int i, j;
1130        int status = 0;
1131
1132        for (j = 0; j < qn->qn_numnodes; ++j)
1133                mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1134                     &(qn->qn_nodes[j].ni_ipv4_address),
1135                     ntohs(qn->qn_nodes[j].ni_ipv4_port));
1136
1137        for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1138                local = o2nm_get_node_by_num(i);
1139                remote = NULL;
1140                for (j = 0; j < qn->qn_numnodes; ++j) {
1141                        if (qn->qn_nodes[j].ni_nodenum == i) {
1142                                remote = &(qn->qn_nodes[j]);
1143                                break;
1144                        }
1145                }
1146
1147                if (!local && !remote)
1148                        continue;
1149
1150                if ((local && !remote) || (!local && remote))
1151                        status = -EINVAL;
1152
1153                if (!status &&
1154                    ((remote->ni_nodenum != local->nd_num) ||
1155                     (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1156                     (remote->ni_ipv4_address != local->nd_ipv4_address)))
1157                        status = -EINVAL;
1158
1159                if (status) {
1160                        if (remote && !local)
1161                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1162                                     "registered in joining node %d but not in "
1163                                     "local node %d\n", qn->qn_domain,
1164                                     remote->ni_nodenum,
1165                                     &(remote->ni_ipv4_address),
1166                                     ntohs(remote->ni_ipv4_port),
1167                                     qn->qn_nodenum, dlm->node_num);
1168                        if (local && !remote)
1169                                mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1170                                     "registered in local node %d but not in "
1171                                     "joining node %d\n", qn->qn_domain,
1172                                     local->nd_num, &(local->nd_ipv4_address),
1173                                     ntohs(local->nd_ipv4_port),
1174                                     dlm->node_num, qn->qn_nodenum);
1175                        BUG_ON((!local && !remote));
1176                }
1177
1178                if (local)
1179                        o2nm_node_put(local);
1180        }
1181
1182        return status;
1183}
1184
1185static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1186{
1187        struct dlm_query_nodeinfo *qn = NULL;
1188        struct o2nm_node *node;
1189        int ret = 0, status, count, i;
1190
1191        if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1192                goto bail;
1193
1194        qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1195        if (!qn) {
1196                ret = -ENOMEM;
1197                mlog_errno(ret);
1198                goto bail;
1199        }
1200
1201        for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1202                node = o2nm_get_node_by_num(i);
1203                if (!node)
1204                        continue;
1205                qn->qn_nodes[count].ni_nodenum = node->nd_num;
1206                qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1207                qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1208                mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1209                     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1210                ++count;
1211                o2nm_node_put(node);
1212        }
1213
1214        qn->qn_nodenum = dlm->node_num;
1215        qn->qn_numnodes = count;
1216        qn->qn_namelen = strlen(dlm->name);
1217        memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1218
1219        i = -1;
1220        while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1221                                  i + 1)) < O2NM_MAX_NODES) {
1222                if (i == dlm->node_num)
1223                        continue;
1224
1225                mlog(0, "Sending nodeinfo to node %d\n", i);
1226
1227                ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1228                                         qn, sizeof(struct dlm_query_nodeinfo),
1229                                         i, &status);
1230                if (ret >= 0)
1231                        ret = status;
1232                if (ret) {
1233                        mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1234                        break;
1235                }
1236        }
1237
1238bail:
1239        kfree(qn);
1240        return ret;
1241}
1242
1243static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1244                                      void *data, void **ret_data)
1245{
1246        struct dlm_query_nodeinfo *qn;
1247        struct dlm_ctxt *dlm = NULL;
1248        int locked = 0, status = -EINVAL;
1249
1250        qn = (struct dlm_query_nodeinfo *) msg->buf;
1251
1252        mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1253             qn->qn_domain);
1254
1255        spin_lock(&dlm_domain_lock);
1256        dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1257        if (!dlm) {
1258                mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1259                     "join domain\n", qn->qn_nodenum, qn->qn_domain);
1260                goto bail;
1261        }
1262
1263        spin_lock(&dlm->spinlock);
1264        locked = 1;
1265        if (dlm->joining_node != qn->qn_nodenum) {
1266                mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1267                     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1268                     dlm->joining_node);
1269                goto bail;
1270        }
1271
1272        /* Support for node query was added in 1.1 */
1273        if (dlm->dlm_locking_proto.pv_major == 1 &&
1274            dlm->dlm_locking_proto.pv_minor == 0) {
1275                mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1276                     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1277                     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1278                     dlm->dlm_locking_proto.pv_minor);
1279                goto bail;
1280        }
1281
1282        status = dlm_match_nodes(dlm, qn);
1283
1284bail:
1285        if (locked)
1286                spin_unlock(&dlm->spinlock);
1287        spin_unlock(&dlm_domain_lock);
1288
1289        return status;
1290}
1291
1292static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1293                                   void **ret_data)
1294{
1295        struct dlm_cancel_join *cancel;
1296        struct dlm_ctxt *dlm = NULL;
1297
1298        cancel = (struct dlm_cancel_join *) msg->buf;
1299
1300        mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1301                  cancel->domain);
1302
1303        spin_lock(&dlm_domain_lock);
1304        dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1305
1306        if (dlm) {
1307                spin_lock(&dlm->spinlock);
1308
1309                /* Yikes, this guy wants to cancel his join. No
1310                 * problem, we simply cleanup our join state. */
1311                BUG_ON(dlm->joining_node != cancel->node_idx);
1312                __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1313
1314                spin_unlock(&dlm->spinlock);
1315        }
1316        spin_unlock(&dlm_domain_lock);
1317
1318        return 0;
1319}
1320
1321static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1322                                    unsigned int node)
1323{
1324        int status;
1325        struct dlm_cancel_join cancel_msg;
1326
1327        memset(&cancel_msg, 0, sizeof(cancel_msg));
1328        cancel_msg.node_idx = dlm->node_num;
1329        cancel_msg.name_len = strlen(dlm->name);
1330        memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1331
1332        status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1333                                    &cancel_msg, sizeof(cancel_msg), node,
1334                                    NULL);
1335        if (status < 0) {
1336                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1337                     "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1338                     node);
1339                goto bail;
1340        }
1341
1342bail:
1343        return status;
1344}
1345
1346/* map_size should be in bytes. */
1347static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1348                                 unsigned long *node_map,
1349                                 unsigned int map_size)
1350{
1351        int status, tmpstat;
1352        unsigned int node;
1353
1354        if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1355                         sizeof(unsigned long))) {
1356                mlog(ML_ERROR,
1357                     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1358                     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1359                return -EINVAL;
1360        }
1361
1362        status = 0;
1363        node = -1;
1364        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1365                                     node + 1)) < O2NM_MAX_NODES) {
1366                if (node == dlm->node_num)
1367                        continue;
1368
1369                tmpstat = dlm_send_one_join_cancel(dlm, node);
1370                if (tmpstat) {
1371                        mlog(ML_ERROR, "Error return %d cancelling join on "
1372                             "node %d\n", tmpstat, node);
1373                        if (!status)
1374                                status = tmpstat;
1375                }
1376        }
1377
1378        if (status)
1379                mlog_errno(status);
1380        return status;
1381}
1382
1383static int dlm_request_join(struct dlm_ctxt *dlm,
1384                            int node,
1385                            enum dlm_query_join_response_code *response)
1386{
1387        int status;
1388        struct dlm_query_join_request join_msg;
1389        struct dlm_query_join_packet packet;
1390        u32 join_resp;
1391
1392        mlog(0, "querying node %d\n", node);
1393
1394        memset(&join_msg, 0, sizeof(join_msg));
1395        join_msg.node_idx = dlm->node_num;
1396        join_msg.name_len = strlen(dlm->name);
1397        memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1398        join_msg.dlm_proto = dlm->dlm_locking_proto;
1399        join_msg.fs_proto = dlm->fs_locking_proto;
1400
1401        /* copy live node map to join message */
1402        byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1403
1404        status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1405                                    sizeof(join_msg), node, &join_resp);
1406        if (status < 0 && status != -ENOPROTOOPT) {
1407                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1408                     "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1409                     node);
1410                goto bail;
1411        }
1412        dlm_query_join_wire_to_packet(join_resp, &packet);
1413
1414        /* -ENOPROTOOPT from the net code means the other side isn't
1415            listening for our message type -- that's fine, it means
1416            his dlm isn't up, so we can consider him a 'yes' but not
1417            joined into the domain.  */
1418        if (status == -ENOPROTOOPT) {
1419                status = 0;
1420                *response = JOIN_OK_NO_MAP;
1421        } else if (packet.code == JOIN_DISALLOW ||
1422                   packet.code == JOIN_OK_NO_MAP) {
1423                *response = packet.code;
1424        } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1425                mlog(ML_NOTICE,
1426                     "This node requested DLM locking protocol %u.%u and "
1427                     "filesystem locking protocol %u.%u.  At least one of "
1428                     "the protocol versions on node %d is not compatible, "
1429                     "disconnecting\n",
1430                     dlm->dlm_locking_proto.pv_major,
1431                     dlm->dlm_locking_proto.pv_minor,
1432                     dlm->fs_locking_proto.pv_major,
1433                     dlm->fs_locking_proto.pv_minor,
1434                     node);
1435                status = -EPROTO;
1436                *response = packet.code;
1437        } else if (packet.code == JOIN_OK) {
1438                *response = packet.code;
1439                /* Use the same locking protocol as the remote node */
1440                dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1441                dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1442                mlog(0,
1443                     "Node %d responds JOIN_OK with DLM locking protocol "
1444                     "%u.%u and fs locking protocol %u.%u\n",
1445                     node,
1446                     dlm->dlm_locking_proto.pv_major,
1447                     dlm->dlm_locking_proto.pv_minor,
1448                     dlm->fs_locking_proto.pv_major,
1449                     dlm->fs_locking_proto.pv_minor);
1450        } else {
1451                status = -EINVAL;
1452                mlog(ML_ERROR, "invalid response %d from node %u\n",
1453                     packet.code, node);
1454        }
1455
1456        mlog(0, "status %d, node %d response is %d\n", status, node,
1457             *response);
1458
1459bail:
1460        return status;
1461}
1462
1463static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1464                                    unsigned int node)
1465{
1466        int status;
1467        struct dlm_assert_joined assert_msg;
1468
1469        mlog(0, "Sending join assert to node %u\n", node);
1470
1471        memset(&assert_msg, 0, sizeof(assert_msg));
1472        assert_msg.node_idx = dlm->node_num;
1473        assert_msg.name_len = strlen(dlm->name);
1474        memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1475
1476        status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1477                                    &assert_msg, sizeof(assert_msg), node,
1478                                    NULL);
1479        if (status < 0)
1480                mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1481                     "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1482                     node);
1483
1484        return status;
1485}
1486
1487static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1488                                  unsigned long *node_map)
1489{
1490        int status, node, live;
1491
1492        status = 0;
1493        node = -1;
1494        while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1495                                     node + 1)) < O2NM_MAX_NODES) {
1496                if (node == dlm->node_num)
1497                        continue;
1498
1499                do {
1500                        /* It is very important that this message be
1501                         * received so we spin until either the node
1502                         * has died or it gets the message. */
1503                        status = dlm_send_one_join_assert(dlm, node);
1504
1505                        spin_lock(&dlm->spinlock);
1506                        live = test_bit(node, dlm->live_nodes_map);
1507                        spin_unlock(&dlm->spinlock);
1508
1509                        if (status) {
1510                                mlog(ML_ERROR, "Error return %d asserting "
1511                                     "join on node %d\n", status, node);
1512
1513                                /* give us some time between errors... */
1514                                if (live)
1515                                        msleep(DLM_DOMAIN_BACKOFF_MS);
1516                        }
1517                } while (status && live);
1518        }
1519}
1520
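    /*
     * Per-join-attempt context.  live_map snapshots the nodes that were
     * heartbeating when the query loop started; yes_resp_map collects the
     * nodes that answered JOIN_OK.  dlm_should_restart_join() compares
     * live_map against the current live_nodes_map and forces a retry if
     * cluster membership changed while we were asking around.
     */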
1521struct domain_join_ctxt {
1522        unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1523        unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1524};
1525
1526static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1527                                   struct domain_join_ctxt *ctxt,
1528                                   enum dlm_query_join_response_code response)
1529{
1530        int ret;
1531
1532        if (response == JOIN_DISALLOW) {
1533                mlog(0, "Latest response of disallow -- should restart\n");
1534                return 1;
1535        }
1536
1537        spin_lock(&dlm->spinlock);
1538        /* For now, we restart the process if the node maps have
1539         * changed at all */
1540        ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1541                     sizeof(dlm->live_nodes_map));
1542        spin_unlock(&dlm->spinlock);
1543
1544        if (ret)
1545                mlog(0, "Node maps changed -- should restart\n");
1546
1547        return ret;
1548}
1549
1550static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1551{
1552        int status = 0, tmpstat, node;
1553        struct domain_join_ctxt *ctxt;
1554        enum dlm_query_join_response_code response = JOIN_DISALLOW;
1555
1556        mlog_entry("%p", dlm);
1557
1558        ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1559        if (!ctxt) {
1560                status = -ENOMEM;
1561                mlog_errno(status);
1562                goto bail;
1563        }
1564
1565        /* group sem locking should work for us here -- we're already
1566         * registered for heartbeat events so filling this should be
1567         * atomic wrt getting those handlers called. */
1568        o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1569
1570        spin_lock(&dlm->spinlock);
1571        memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1572
1573        __dlm_set_joining_node(dlm, dlm->node_num);
1574
1575        spin_unlock(&dlm->spinlock);
1576
1577        node = -1;
1578        while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1579                                     node + 1)) < O2NM_MAX_NODES) {
1580                if (node == dlm->node_num)
1581                        continue;
1582
1583                status = dlm_request_join(dlm, node, &response);
1584                if (status < 0) {
1585                        mlog_errno(status);
1586                        goto bail;
1587                }
1588
1589                /* Ok, either we got a response or the node doesn't have a
1590                 * dlm up. */
1591                if (response == JOIN_OK)
1592                        set_bit(node, ctxt->yes_resp_map);
1593
1594                if (dlm_should_restart_join(dlm, ctxt, response)) {
1595                        status = -EAGAIN;
1596                        goto bail;
1597                }
1598        }
1599
1600        mlog(0, "Yay, done querying nodes!\n");
1601
1602        /* Yay, everyone agrees we can join the domain. My domain is
1603         * composed of all nodes who were put in the
1604         * yes_resp_map. Copy that into our domain map and send a join
1605         * assert message to clean up everyone else's state. */
1606        spin_lock(&dlm->spinlock);
1607        memcpy(dlm->domain_map, ctxt->yes_resp_map,
1608               sizeof(ctxt->yes_resp_map));
1609        set_bit(dlm->node_num, dlm->domain_map);
1610        spin_unlock(&dlm->spinlock);
1611
1612        /* Support for global heartbeat and node info was added in 1.1 */
1613        if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
1614                status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1615                if (status) {
1616                        mlog_errno(status);
1617                        goto bail;
1618                }
1619                status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1620                if (status) {
1621                        mlog_errno(status);
1622                        goto bail;
1623                }
1624        }
1625
1626        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1627
1628        /* Joined state *must* be set before the joining node
1629         * information, otherwise the query_join handler may read no
1630         * current joiner but a state of NEW and tell joining nodes
1631         * we're not in the domain. */
1632        spin_lock(&dlm_domain_lock);
1633        dlm->dlm_state = DLM_CTXT_JOINED;
1634        dlm->num_joins++;
1635        spin_unlock(&dlm_domain_lock);
1636
1637bail:
1638        spin_lock(&dlm->spinlock);
1639        __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1640        if (!status)
1641                __dlm_print_nodes(dlm);
1642        spin_unlock(&dlm->spinlock);
1643
1644        if (ctxt) {
1645                /* Do we need to send a cancel message to any nodes? */
1646                if (status < 0) {
1647                        tmpstat = dlm_send_join_cancels(dlm,
1648                                                        ctxt->yes_resp_map,
1649                                                        sizeof(ctxt->yes_resp_map));
1650                        if (tmpstat < 0)
1651                                mlog_errno(tmpstat);
1652                }
1653                kfree(ctxt);
1654        }
1655
1656        mlog(0, "returning %d\n", status);
1657        return status;
1658}
1659
1660static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1661{
1662        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up);
1663        o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down);
1664        o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1665}
1666
1667static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1668{
1669        int status;
1670
1671        mlog(0, "registering handlers.\n");
1672
1673        o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1674                            dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1675        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1676        if (status)
1677                goto bail;
1678
1679        o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1680                            dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1681        status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1682        if (status)
1683                goto bail;
1684
1685        status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1686                                        sizeof(struct dlm_master_request),
1687                                        dlm_master_request_handler,
1688                                        dlm, NULL, &dlm->dlm_domain_handlers);
1689        if (status)
1690                goto bail;
1691
1692        status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1693                                        sizeof(struct dlm_assert_master),
1694                                        dlm_assert_master_handler,
1695                                        dlm, dlm_assert_master_post_handler,
1696                                        &dlm->dlm_domain_handlers);
1697        if (status)
1698                goto bail;
1699
1700        status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1701                                        sizeof(struct dlm_create_lock),
1702                                        dlm_create_lock_handler,
1703                                        dlm, NULL, &dlm->dlm_domain_handlers);
1704        if (status)
1705                goto bail;
1706
1707        status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1708                                        DLM_CONVERT_LOCK_MAX_LEN,
1709                                        dlm_convert_lock_handler,
1710                                        dlm, NULL, &dlm->dlm_domain_handlers);
1711        if (status)
1712                goto bail;
1713
1714        status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1715                                        DLM_UNLOCK_LOCK_MAX_LEN,
1716                                        dlm_unlock_lock_handler,
1717                                        dlm, NULL, &dlm->dlm_domain_handlers);
1718        if (status)
1719                goto bail;
1720
1721        status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1722                                        DLM_PROXY_AST_MAX_LEN,
1723                                        dlm_proxy_ast_handler,
1724                                        dlm, NULL, &dlm->dlm_domain_handlers);
1725        if (status)
1726                goto bail;
1727
1728        status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1729                                        sizeof(struct dlm_exit_domain),
1730                                        dlm_exit_domain_handler,
1731                                        dlm, NULL, &dlm->dlm_domain_handlers);
1732        if (status)
1733                goto bail;
1734
1735        status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1736                                        sizeof(struct dlm_deref_lockres),
1737                                        dlm_deref_lockres_handler,
1738                                        dlm, NULL, &dlm->dlm_domain_handlers);
1739        if (status)
1740                goto bail;
1741
1742        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1743                                        sizeof(struct dlm_migrate_request),
1744                                        dlm_migrate_request_handler,
1745                                        dlm, NULL, &dlm->dlm_domain_handlers);
1746        if (status)
1747                goto bail;
1748
1749        status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1750                                        DLM_MIG_LOCKRES_MAX_LEN,
1751                                        dlm_mig_lockres_handler,
1752                                        dlm, NULL, &dlm->dlm_domain_handlers);
1753        if (status)
1754                goto bail;
1755
1756        status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1757                                        sizeof(struct dlm_master_requery),
1758                                        dlm_master_requery_handler,
1759                                        dlm, NULL, &dlm->dlm_domain_handlers);
1760        if (status)
1761                goto bail;
1762
1763        status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1764                                        sizeof(struct dlm_lock_request),
1765                                        dlm_request_all_locks_handler,
1766                                        dlm, NULL, &dlm->dlm_domain_handlers);
1767        if (status)
1768                goto bail;
1769
1770        status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1771                                        sizeof(struct dlm_reco_data_done),
1772                                        dlm_reco_data_done_handler,
1773                                        dlm, NULL, &dlm->dlm_domain_handlers);
1774        if (status)
1775                goto bail;
1776
1777        status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1778                                        sizeof(struct dlm_begin_reco),
1779                                        dlm_begin_reco_handler,
1780                                        dlm, NULL, &dlm->dlm_domain_handlers);
1781        if (status)
1782                goto bail;
1783
1784        status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1785                                        sizeof(struct dlm_finalize_reco),
1786                                        dlm_finalize_reco_handler,
1787                                        dlm, NULL, &dlm->dlm_domain_handlers);
1788        if (status)
1789                goto bail;
1790
1791bail:
1792        if (status)
1793                dlm_unregister_domain_handlers(dlm);
1794
1795        return status;
1796}
1797
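    /*
     * Bring a freshly allocated context into the cluster: register the
     * heartbeat callbacks and per-domain network handlers, create the
     * debugfs entries, start the dlm and recovery threads and the dlm_wq
     * workqueue, then retry dlm_try_to_join_domain() with a small random
     * backoff until it succeeds or DLM_JOIN_TIMEOUT_MSECS of backoff has
     * accumulated.  Any failure tears the partial setup back down.
     */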
1798static int dlm_join_domain(struct dlm_ctxt *dlm)
1799{
1800        int status;
1801        unsigned int backoff;
1802        unsigned int total_backoff = 0;
1803
1804        BUG_ON(!dlm);
1805
1806        mlog(0, "Join domain %s\n", dlm->name);
1807
1808        status = dlm_register_domain_handlers(dlm);
1809        if (status) {
1810                mlog_errno(status);
1811                goto bail;
1812        }
1813
1814        status = dlm_debug_init(dlm);
1815        if (status < 0) {
1816                mlog_errno(status);
1817                goto bail;
1818        }
1819
1820        status = dlm_launch_thread(dlm);
1821        if (status < 0) {
1822                mlog_errno(status);
1823                goto bail;
1824        }
1825
1826        status = dlm_launch_recovery_thread(dlm);
1827        if (status < 0) {
1828                mlog_errno(status);
1829                goto bail;
1830        }
1831
1832        dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1833        if (!dlm->dlm_worker) {
1834                status = -ENOMEM;
1835                mlog_errno(status);
1836                goto bail;
1837        }
1838
1839        do {
1840                status = dlm_try_to_join_domain(dlm);
1841
1842                /* If we're racing another node to the join, then we
1843                 * need to back off temporarily and let them
1844                 * complete. */
1845#define DLM_JOIN_TIMEOUT_MSECS  90000
1846                if (status == -EAGAIN) {
1847                        if (signal_pending(current)) {
1848                                status = -ERESTARTSYS;
1849                                goto bail;
1850                        }
1851
1852                        if (total_backoff >
1853                            DLM_JOIN_TIMEOUT_MSECS) {
1854                                status = -ERESTARTSYS;
1855                                mlog(ML_NOTICE, "Timed out joining dlm domain "
1856                                     "%s after %u msecs\n", dlm->name,
1857                                     total_backoff);
1858                                goto bail;
1859                        }
1860
1861                        /*
1862                         * <chip> After you!
1863                         * <dale> No, after you!
1864                         * <chip> I insist!
1865                         * <dale> But you first!
1866                         * ...
1867                         */
1868                        backoff = (unsigned int)(jiffies & 0x3);
1869                        backoff *= DLM_DOMAIN_BACKOFF_MS;
1870                        total_backoff += backoff;
1871                        mlog(0, "backoff %d\n", backoff);
1872                        msleep(backoff);
1873                }
1874        } while (status == -EAGAIN);
1875
1876        if (status < 0) {
1877                mlog_errno(status);
1878                goto bail;
1879        }
1880
1881        status = 0;
1882bail:
1883        wake_up(&dlm_domain_events);
1884
1885        if (status) {
1886                dlm_unregister_domain_handlers(dlm);
1887                dlm_debug_shutdown(dlm);
1888                dlm_complete_thread(dlm);
1889                dlm_complete_recovery_thread(dlm);
1890                dlm_destroy_dlm_worker(dlm);
1891        }
1892
1893        return status;
1894}
1895
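    /*
     * Allocate and initialize a struct dlm_ctxt for the given domain: copy
     * the name, allocate the lockres and master hash pagevecs, create the
     * per-domain debugfs subroot, and initialize the locks, lists and wait
     * queues.  Returns NULL if any allocation fails; the context is not
     * yet on any domain list and holds a single reference.
     */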
1896static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1897                                u32 key)
1898{
1899        int i;
1900        int ret;
1901        struct dlm_ctxt *dlm = NULL;
1902
1903        dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1904        if (!dlm) {
1905                mlog_errno(-ENOMEM);
1906                goto leave;
1907        }
1908
1909        dlm->name = kstrdup(domain, GFP_KERNEL);
1910        if (dlm->name == NULL) {
1911                mlog_errno(-ENOMEM);
1912                kfree(dlm);
1913                dlm = NULL;
1914                goto leave;
1915        }
1916
1917        dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1918        if (!dlm->lockres_hash) {
1919                mlog_errno(-ENOMEM);
1920                kfree(dlm->name);
1921                kfree(dlm);
1922                dlm = NULL;
1923                goto leave;
1924        }
1925
1926        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1927                INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1928
1929        dlm->master_hash = (struct hlist_head **)
1930                                dlm_alloc_pagevec(DLM_HASH_PAGES);
1931        if (!dlm->master_hash) {
1932                mlog_errno(-ENOMEM);
1933                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1934                kfree(dlm->name);
1935                kfree(dlm);
1936                dlm = NULL;
1937                goto leave;
1938        }
1939
1940        for (i = 0; i < DLM_HASH_BUCKETS; i++)
1941                INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1942
1943        dlm->key = key;
1944        dlm->node_num = o2nm_this_node();
1945
1946        ret = dlm_create_debugfs_subroot(dlm);
1947        if (ret < 0) {
1948                dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
1949                dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1950                kfree(dlm->name);
1951                kfree(dlm);
1952                dlm = NULL;
1953                goto leave;
1954        }
1955
1956        spin_lock_init(&dlm->spinlock);
1957        spin_lock_init(&dlm->master_lock);
1958        spin_lock_init(&dlm->ast_lock);
1959        spin_lock_init(&dlm->track_lock);
1960        INIT_LIST_HEAD(&dlm->list);
1961        INIT_LIST_HEAD(&dlm->dirty_list);
1962        INIT_LIST_HEAD(&dlm->reco.resources);
1963        INIT_LIST_HEAD(&dlm->reco.received);
1964        INIT_LIST_HEAD(&dlm->reco.node_data);
1965        INIT_LIST_HEAD(&dlm->purge_list);
1966        INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1967        INIT_LIST_HEAD(&dlm->tracking_list);
1968        dlm->reco.state = 0;
1969
1970        INIT_LIST_HEAD(&dlm->pending_asts);
1971        INIT_LIST_HEAD(&dlm->pending_basts);
1972
1973        mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1974                  dlm->recovery_map, &(dlm->recovery_map[0]));
1975
1976        memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1977        memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1978        memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1979
1980        dlm->dlm_thread_task = NULL;
1981        dlm->dlm_reco_thread_task = NULL;
1982        dlm->dlm_worker = NULL;
1983        init_waitqueue_head(&dlm->dlm_thread_wq);
1984        init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1985        init_waitqueue_head(&dlm->reco.event);
1986        init_waitqueue_head(&dlm->ast_wq);
1987        init_waitqueue_head(&dlm->migration_wq);
1988        INIT_LIST_HEAD(&dlm->mle_hb_events);
1989
1990        dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1991        init_waitqueue_head(&dlm->dlm_join_events);
1992
1993        dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1994        dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1995
1996        atomic_set(&dlm->res_tot_count, 0);
1997        atomic_set(&dlm->res_cur_count, 0);
1998        for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
1999                atomic_set(&dlm->mle_tot_count[i], 0);
2000                atomic_set(&dlm->mle_cur_count[i], 0);
2001        }
2002
2003        spin_lock_init(&dlm->work_lock);
2004        INIT_LIST_HEAD(&dlm->work_list);
2005        INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2006
2007        kref_init(&dlm->dlm_refs);
2008        dlm->dlm_state = DLM_CTXT_NEW;
2009
2010        INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2011
2012        mlog(0, "context init: refcount %u\n",
2013                  atomic_read(&dlm->dlm_refs.refcount));
2014
2015leave:
2016        return dlm;
2017}
2018
2019/*
2020 * Compare a requested locking protocol version against the current one.
2021 *
2022 * If the major numbers are different, they are incompatible.
2023 * If the current minor is greater than the request, they are incompatible.
2024 * If the current minor is less than or equal to the request, they are
2025 * compatible, and the requester should run at the current minor version.
2026 */
2027static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2028                                struct dlm_protocol_version *request)
2029{
2030        if (existing->pv_major != request->pv_major)
2031                return 1;
2032
2033        if (existing->pv_minor > request->pv_minor)
2034                return 1;
2035
2036        if (existing->pv_minor < request->pv_minor)
2037                request->pv_minor = existing->pv_minor;
2038
2039        return 0;
2040}
2041
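    /*
     * Worked example (version numbers illustrative): with an existing
     * protocol of 1.1, a request for 2.1 or 1.0 is rejected -- major
     * mismatch in the first case, a requested minor older than ours in
     * the second -- while a request for 1.2 is accepted and rewritten to
     * 1.1 so the requester runs at our minor version.
     */
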
2042/*
2043 * dlm_register_domain: one-time setup per "domain".
2044 *
2045 * The filesystem passes in the requested locking version via proto.
2046 * If registration was successful, proto will contain the negotiated
2047 * locking protocol.
2048 */
2049struct dlm_ctxt * dlm_register_domain(const char *domain,
2050                               u32 key,
2051                               struct dlm_protocol_version *fs_proto)
2052{
2053        int ret;
2054        struct dlm_ctxt *dlm = NULL;
2055        struct dlm_ctxt *new_ctxt = NULL;
2056
2057        if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2058                ret = -ENAMETOOLONG;
2059                mlog(ML_ERROR, "domain name length too long\n");
2060                goto leave;
2061        }
2062
2063        if (!o2hb_check_local_node_heartbeating()) {
2064                mlog(ML_ERROR, "the local node has not been configured, or is "
2065                     "not heartbeating\n");
2066                ret = -EPROTO;
2067                goto leave;
2068        }
2069
2070        mlog(0, "register called for domain \"%s\"\n", domain);
2071
2072retry:
2073        dlm = NULL;
2074        if (signal_pending(current)) {
2075                ret = -ERESTARTSYS;
2076                mlog_errno(ret);
2077                goto leave;
2078        }
2079
2080        spin_lock(&dlm_domain_lock);
2081
2082        dlm = __dlm_lookup_domain(domain);
2083        if (dlm) {
2084                if (dlm->dlm_state != DLM_CTXT_JOINED) {
2085                        spin_unlock(&dlm_domain_lock);
2086
2087                        mlog(0, "This ctxt is not joined yet!\n");
2088                        wait_event_interruptible(dlm_domain_events,
2089                                                 dlm_wait_on_domain_helper(
2090                                                         domain));
2091                        goto retry;
2092                }
2093
2094                if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2095                        spin_unlock(&dlm_domain_lock);
2096                        mlog(ML_ERROR,
2097                             "Requested locking protocol version is not "
2098                             "compatible with already registered domain "
2099                             "\"%s\"\n", domain);
2100                        ret = -EPROTO;
2101                        goto leave;
2102                }
2103
2104                __dlm_get(dlm);
2105                dlm->num_joins++;
2106
2107                spin_unlock(&dlm_domain_lock);
2108
2109                ret = 0;
2110                goto leave;
2111        }
2112
2113        /* doesn't exist */
2114        if (!new_ctxt) {
2115                spin_unlock(&dlm_domain_lock);
2116
2117                new_ctxt = dlm_alloc_ctxt(domain, key);
2118                if (new_ctxt)
2119                        goto retry;
2120
2121                ret = -ENOMEM;
2122                mlog_errno(ret);
2123                goto leave;
2124        }
2125
2126        /* a little variable switch-a-roo here... */
2127        dlm = new_ctxt;
2128        new_ctxt = NULL;
2129
2130        /* add the new domain */
2131        list_add_tail(&dlm->list, &dlm_domains);
2132        spin_unlock(&dlm_domain_lock);
2133
2134        /*
2135         * Pass the locking protocol version into the join.  If the join
2136         * succeeds, it will have the negotiated protocol set.
2137         */
2138        dlm->dlm_locking_proto = dlm_protocol;
2139        dlm->fs_locking_proto = *fs_proto;
2140
2141        ret = dlm_join_domain(dlm);
2142        if (ret) {
2143                mlog_errno(ret);
2144                dlm_put(dlm);
2145                goto leave;
2146        }
2147
2148        /* Tell the caller what locking protocol we negotiated */
2149        *fs_proto = dlm->fs_locking_proto;
2150
2151        ret = 0;
2152leave:
2153        if (new_ctxt)
2154                dlm_free_ctxt_mem(new_ctxt);
2155
2156        if (ret < 0)
2157                dlm = ERR_PTR(ret);
2158
2159        return dlm;
2160}
2161EXPORT_SYMBOL_GPL(dlm_register_domain);
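
    /*
     * Minimal usage sketch (the wrapper name and version numbers below are
     * illustrative, not taken from any in-tree caller):
     *
     *      static struct dlm_ctxt *example_join(const char *uuid, u32 key)
     *      {
     *              struct dlm_protocol_version fs_proto = {
     *                      .pv_major = 1,
     *                      .pv_minor = 0,
     *              };
     *
     *              return dlm_register_domain(uuid, key, &fs_proto);
     *      }
     *
     * On success the negotiated version is written back through fs_proto;
     * on failure an ERR_PTR() value is returned.  A successful call is
     * balanced by dlm_unregister_domain().
     */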
2162
2163static LIST_HEAD(dlm_join_handlers);
2164
2165static void dlm_unregister_net_handlers(void)
2166{
2167        o2net_unregister_handler_list(&dlm_join_handlers);
2168}
2169
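    /*
     * Unlike the per-domain handlers registered in
     * dlm_register_domain_handlers(), which are keyed on dlm->key, these
     * join-time handlers are registered once at module init against
     * DLM_MOD_KEY and shared by every domain.
     */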
2170static int dlm_register_net_handlers(void)
2171{
2172        int status = 0;
2173
2174        status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2175                                        sizeof(struct dlm_query_join_request),
2176                                        dlm_query_join_handler,
2177                                        NULL, NULL, &dlm_join_handlers);
2178        if (status)
2179                goto bail;
2180
2181        status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2182                                        sizeof(struct dlm_assert_joined),
2183                                        dlm_assert_joined_handler,
2184                                        NULL, NULL, &dlm_join_handlers);
2185        if (status)
2186                goto bail;
2187
2188        status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2189                                        sizeof(struct dlm_cancel_join),
2190                                        dlm_cancel_join_handler,
2191                                        NULL, NULL, &dlm_join_handlers);
2192        if (status)
2193                goto bail;
2194
2195        status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2196                                        sizeof(struct dlm_query_region),
2197                                        dlm_query_region_handler,
2198                                        NULL, NULL, &dlm_join_handlers);
2199
2200        if (status)
2201                goto bail;
2202
2203        status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2204                                        sizeof(struct dlm_query_nodeinfo),
2205                                        dlm_query_nodeinfo_handler,
2206                                        NULL, NULL, &dlm_join_handlers);
2207bail:
2208        if (status < 0)
2209                dlm_unregister_net_handlers();
2210
2211        return status;
2212}
2213
2214/* Domain eviction callback handling.
2215 *
2216 * The file system requires notification of node death *before* the
2217 * dlm completes its recovery work, otherwise it may be able to
2218 * acquire locks on resources requiring recovery. Since the dlm can
2219 * evict a node from its domain *before* heartbeat fires, a similar
2220 * mechanism is required. */
2221
2222/* Eviction is not expected to happen often, so a per-domain lock is
2223 * not necessary. Eviction callbacks are allowed to sleep for short
2224 * periods of time. */
2225static DECLARE_RWSEM(dlm_callback_sem);
2226
2227void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2228                                        int node_num)
2229{
2230        struct list_head *iter;
2231        struct dlm_eviction_cb *cb;
2232
2233        down_read(&dlm_callback_sem);
2234        list_for_each(iter, &dlm->dlm_eviction_callbacks) {
2235                cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
2236
2237                cb->ec_func(node_num, cb->ec_data);
2238        }
2239        up_read(&dlm_callback_sem);
2240}
2241
2242void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2243                           dlm_eviction_func *f,
2244                           void *data)
2245{
2246        INIT_LIST_HEAD(&cb->ec_item);
2247        cb->ec_func = f;
2248        cb->ec_data = data;
2249}
2250EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2251
2252void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2253                              struct dlm_eviction_cb *cb)
2254{
2255        down_write(&dlm_callback_sem);
2256        list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2257        up_write(&dlm_callback_sem);
2258}
2259EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2260
2261void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2262{
2263        down_write(&dlm_callback_sem);
2264        list_del_init(&cb->ec_item);
2265        up_write(&dlm_callback_sem);
2266}
2267EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
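
    /*
     * Usage sketch for the eviction callback API (the function and
     * variable names here are illustrative):
     *
     *      static void example_evict(int node_num, void *data)
     *      {
     *              mlog(ML_NOTICE, "node %d left the domain\n", node_num);
     *      }
     *
     *      static struct dlm_eviction_cb example_cb;
     *
     *      dlm_setup_eviction_cb(&example_cb, example_evict, dlm);
     *      dlm_register_eviction_cb(dlm, &example_cb);
     *      ...
     *      dlm_unregister_eviction_cb(&example_cb);
     *
     * Callbacks run with dlm_callback_sem held for read and, per the note
     * above, may sleep only for short periods.
     */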
2268
2269static int __init dlm_init(void)
2270{
2271        int status;
2272
2273        dlm_print_version();
2274
2275        status = dlm_init_mle_cache();
2276        if (status) {
2277                mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2278                goto error;
2279        }
2280
2281        status = dlm_init_master_caches();
2282        if (status) {
2283                mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2284                     "o2dlm_lockname slabcaches\n");
2285                goto error;
2286        }
2287
2288        status = dlm_init_lock_cache();
2289        if (status) {
2290                mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2291                goto error;
2292        }
2293
2294        status = dlm_register_net_handlers();
2295        if (status) {
2296                mlog(ML_ERROR, "Unable to register network handlers\n");
2297                goto error;
2298        }
2299
2300        status = dlm_create_debugfs_root();
2301        if (status)
2302                goto error;
2303
2304        return 0;
2305error:
2306        dlm_unregister_net_handlers();
2307        dlm_destroy_lock_cache();
2308        dlm_destroy_master_caches();
2309        dlm_destroy_mle_cache();
2310        return -1;
2311}
2312
2313static void __exit dlm_exit (void)
2314{
2315        dlm_destroy_debugfs_root();
2316        dlm_unregister_net_handlers();
2317        dlm_destroy_lock_cache();
2318        dlm_destroy_master_caches();
2319        dlm_destroy_mle_cache();
2320}
2321
2322MODULE_AUTHOR("Oracle");
2323MODULE_LICENSE("GPL");
2324
2325module_init(dlm_init);
2326module_exit(dlm_exit);
2327