linux/fs/ocfs2/dlm/dlmmaster.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
                              int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node,
                            int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res,
                                void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                                const char *name,
                                unsigned int namelen)
{
        if (dlm != mle->dlm)
                return 0;

        if (namelen != mle->mnamelen ||
            memcmp(name, mle->mname, namelen) != 0)
                return 0;

        return 1;
}

static struct kmem_cache *dlm_lockres_cache;
static struct kmem_cache *dlm_lockname_cache;
static struct kmem_cache *dlm_mle_cache;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle,
                                 struct dlm_master_list_entry **oldmle,
                                 const char *name, unsigned int namelen,
                                 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res);

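/* classify errno values returned by o2net_send_message() and friends.
 * returns 1 if the error indicates that the remote node (or our
 * socket to it) is gone, so callers can treat a failed send as a
 * node death instead of a hard error. */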
int dlm_is_host_down(int errno)
{
        switch (errno) {
                case -EBADF:
                case -ECONNREFUSED:
                case -ENOTCONN:
                case -ECONNRESET:
                case -EPIPE:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ETIMEDOUT:
                case -ECONNABORTED:
                case -ENETDOWN:
                case -ENETUNREACH:
                case -ENETRESET:
                case -ESHUTDOWN:
                case -ENOPROTOOPT:
                case -EINVAL:   /* if returned from our tcp code,
                                   this means there is no socket */
                        return 1;
        }
        return 0;
}


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);

        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        if (!list_empty(&mle->hb_events))
                list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                            struct dlm_master_list_entry *mle)
{
        spin_lock(&dlm->spinlock);
        __dlm_mle_detach_hb_events(dlm, mle);
        spin_unlock(&dlm->spinlock);
}

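/* an "inuse" reference pins the mle across a message round trip so
 * that a concurrent __dlm_put_mle() cannot free it underneath us.
 * both dlm->spinlock and dlm->master_lock must be held to take one;
 * dlm_put_mle_inuse() takes the locks itself when dropping it. */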
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        mle->inuse++;
        kref_get(&mle->mle_refs);
}

static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        mle->inuse--;
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        if (!atomic_read(&mle->mle_refs.refcount)) {
                /* this may or may not crash, but who cares.
                 * it's a BUG. */
                mlog(ML_ERROR, "bad mle: %p\n", mle);
                dlm_print_one_mle(mle);
                BUG();
        } else
                kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
        kref_get(&mle->mle_refs);
}

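/* the four bitmaps on an mle, roughly:
 *   node_map     - live nodes at the time the mle was created, kept
 *                  current by the heartbeat callbacks above
 *   vote_map     - nodes we still expect a master request response from
 *   response_map - nodes that have responded to our master request
 *   maybe_map    - nodes that might be (or become) the master; once
 *                  voting is done, the lowest set bit wins mastery */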
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen)
{
        assert_spin_locked(&dlm->spinlock);

        mle->dlm = dlm;
        mle->type = type;
        INIT_HLIST_NODE(&mle->master_hash_node);
        INIT_LIST_HEAD(&mle->hb_events);
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
        spin_lock_init(&mle->spinlock);
        init_waitqueue_head(&mle->wq);
        atomic_set(&mle->woken, 0);
        kref_init(&mle->mle_refs);
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;
        mle->inuse = 0;

        BUG_ON(mle->type != DLM_MLE_BLOCK &&
               mle->type != DLM_MLE_MASTER &&
               mle->type != DLM_MLE_MIGRATION);

        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
                mle->mleres = res;
                memcpy(mle->mname, res->lockname.name, res->lockname.len);
                mle->mnamelen = res->lockname.len;
                mle->mnamehash = res->lockname.hash;
        } else {
                BUG_ON(!name);
                mle->mleres = NULL;
                memcpy(mle->mname, name, namelen);
                mle->mnamelen = namelen;
                mle->mnamehash = dlm_lockid_hash(name, namelen);
        }

        atomic_inc(&dlm->mle_tot_count[mle->type]);
        atomic_inc(&dlm->mle_cur_count[mle->type]);

        /* copy off the node_map and register hb callbacks on our copy */
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
        clear_bit(dlm->node_num, mle->vote_map);
        clear_bit(dlm->node_num, mle->node_map);

        /* attach the mle to the domain node up/down events */
        __dlm_mle_attach_hb_events(dlm, mle);
}

void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        if (!hlist_unhashed(&mle->master_hash_node))
                hlist_del_init(&mle->master_hash_node);
}

void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        struct hlist_head *bucket;

        assert_spin_locked(&dlm->master_lock);

        bucket = dlm_master_hash(dlm, mle->mnamehash);
        hlist_add_head(&mle->master_hash_node, bucket);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen)
{
        struct dlm_master_list_entry *tmpmle;
        struct hlist_head *bucket;
        unsigned int hash;

        assert_spin_locked(&dlm->master_lock);

        hash = dlm_lockid_hash(name, namelen);
        bucket = dlm_master_hash(dlm, hash);
        hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
                *mle = tmpmle;
                return 1;
        }
        return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
        struct dlm_master_list_entry *mle;

        assert_spin_locked(&dlm->spinlock);

        list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
                        dlm_mle_node_down(dlm, mle, NULL, idx);
        }
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (!test_bit(idx, mle->node_map))
                mlog(0, "node %u already removed from nodemap!\n", idx);
        else
                clear_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (test_bit(idx, mle->node_map))
                mlog(0, "node %u already in node map!\n", idx);
        else
                set_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
        dlm_mle_cache = kmem_cache_create("o2dlm_mle",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
}

void dlm_destroy_mle_cache(void)
{
        if (dlm_mle_cache)
                kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
        struct dlm_master_list_entry *mle;
        struct dlm_ctxt *dlm;

        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
             mle->type);

        /* remove from list if not already */
        __dlm_unlink_mle(dlm, mle);

        /* detach the mle from the domain node up/down events */
        __dlm_mle_detach_hb_events(dlm, mle);

        atomic_dec(&dlm->mle_cur_count[mle->type]);

        /* NOTE: kfree under spinlock here.
         * if this is bad, we can move this to a freelist. */
        kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

int dlm_init_master_caches(void)
{
        dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
                                              sizeof(struct dlm_lock_resource),
                                              0, SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockres_cache)
                goto bail;

        dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
                                               DLM_LOCKID_NAME_MAX, 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockname_cache)
                goto bail;

        return 0;
bail:
        dlm_destroy_master_caches();
        return -ENOMEM;
}

void dlm_destroy_master_caches(void)
{
        if (dlm_lockname_cache) {
                kmem_cache_destroy(dlm_lockname_cache);
                dlm_lockname_cache = NULL;
        }

        if (dlm_lockres_cache) {
                kmem_cache_destroy(dlm_lockres_cache);
                dlm_lockres_cache = NULL;
        }
}

static void dlm_lockres_release(struct kref *kref)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm;

        res = container_of(kref, struct dlm_lock_resource, refs);
        dlm = res->dlm;

        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
        BUG_ON(!res->lockname.name);

        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);

        spin_lock(&dlm->track_lock);
        if (!list_empty(&res->tracking))
                list_del_init(&res->tracking);
        else {
                mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
                     res->lockname.len, res->lockname.name);
                dlm_print_one_lock_resource(res);
        }
        spin_unlock(&dlm->track_lock);

        atomic_dec(&dlm->res_cur_count);

        if (!hlist_unhashed(&res->hash_node) ||
            !list_empty(&res->granted) ||
            !list_empty(&res->converting) ||
            !list_empty(&res->blocked) ||
            !list_empty(&res->dirty) ||
            !list_empty(&res->recovering) ||
            !list_empty(&res->purge)) {
                mlog(ML_ERROR,
                     "Going to BUG for resource %.*s."
                     "  We're on a list! [%c%c%c%c%c%c%c]\n",
                     res->lockname.len, res->lockname.name,
                     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
                     !list_empty(&res->granted) ? 'G' : ' ',
                     !list_empty(&res->converting) ? 'C' : ' ',
                     !list_empty(&res->blocked) ? 'B' : ' ',
                     !list_empty(&res->dirty) ? 'D' : ' ',
                     !list_empty(&res->recovering) ? 'R' : ' ',
                     !list_empty(&res->purge) ? 'P' : ' ');

                dlm_print_one_lock_resource(res);
        }

        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
        BUG_ON(!list_empty(&res->granted));
        BUG_ON(!list_empty(&res->converting));
        BUG_ON(!list_empty(&res->blocked));
        BUG_ON(!list_empty(&res->dirty));
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));

        kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

        kmem_cache_free(dlm_lockres_cache, res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
        kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res,
                             const char *name, unsigned int namelen)
{
        char *qname;

        /* If we memset here, we lose our reference to the kmalloc'd
         * res->lockname.name, so be sure to init every field
         * correctly! */

        qname = (char *) res->lockname.name;
        memcpy(qname, name, namelen);

        res->lockname.len = namelen;
        res->lockname.hash = dlm_lockid_hash(name, namelen);

        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
        INIT_HLIST_NODE(&res->hash_node);
        INIT_LIST_HEAD(&res->granted);
        INIT_LIST_HEAD(&res->converting);
        INIT_LIST_HEAD(&res->blocked);
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
        INIT_LIST_HEAD(&res->tracking);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;
        res->inflight_locks = 0;
        res->inflight_assert_workers = 0;

        res->dlm = dlm;

        kref_init(&res->refs);

        atomic_inc(&dlm->res_tot_count);
        atomic_inc(&dlm->res_cur_count);

        /* just for consistency */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
        spin_unlock(&res->spinlock);

        res->state = DLM_LOCK_RES_IN_PROGRESS;

        res->last_used = 0;

        spin_lock(&dlm->spinlock);
        list_add_tail(&res->tracking, &dlm->tracking_list);
        spin_unlock(&dlm->spinlock);

        memset(res->lvb, 0, DLM_LVB_LEN);
        memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
{
        struct dlm_lock_resource *res = NULL;

        res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
        if (!res)
                goto error;

        res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
        if (!res->lockname.name)
                goto error;

        dlm_init_lockres(dlm, res, name, namelen);
        return res;

error:
        if (res)
                kmem_cache_free(dlm_lockres_cache, res);
        return NULL;
}

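/* res->refmap tracks, on the master, which nodes hold a reference on
 * this lock resource.  a node stays in the refmap until it has dropped
 * all of its locks on the resource and sent a deref; the master should
 * not purge the resource while any refmap bit is still set. */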
void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res, int bit)
{
        assert_spin_locked(&res->spinlock);

        mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
             res->lockname.name, bit, __builtin_return_address(0));

        set_bit(bit, res->refmap);
}

void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res, int bit)
{
        assert_spin_locked(&res->spinlock);

        mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
             res->lockname.name, bit, __builtin_return_address(0));

        clear_bit(bit, res->refmap);
}

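/* inflight_locks counts lock and mastery operations still in progress
 * against this resource.  a nonzero count pins the lockres: the purge
 * path waits for it to drain (dropping the count wakes res->wq) before
 * the resource can be freed. */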
static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res)
{
        res->inflight_locks++;

        mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
             res->lockname.len, res->lockname.name, res->inflight_locks,
             __builtin_return_address(0));
}

void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res)
{
        assert_spin_locked(&res->spinlock);
        __dlm_lockres_grab_inflight_ref(dlm, res);
}

void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res)
{
        assert_spin_locked(&res->spinlock);

        BUG_ON(res->inflight_locks == 0);

        res->inflight_locks--;

        mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
             res->lockname.len, res->lockname.name, res->inflight_locks,
             __builtin_return_address(0));

        wake_up(&res->wq);
}

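/* inflight_assert_workers counts dlm_assert_master_worker() items that
 * have been queued for this resource but have not yet finished, so
 * that other paths (e.g. migration) can wait for pending asserts to
 * drain rather than racing with them. */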
void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
                struct dlm_lock_resource *res)
{
        assert_spin_locked(&res->spinlock);
        res->inflight_assert_workers++;
        mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
                        dlm->name, res->lockname.len, res->lockname.name,
                        res->inflight_assert_workers);
}

static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
                struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        __dlm_lockres_grab_inflight_worker(dlm, res);
        spin_unlock(&res->spinlock);
}

static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
                struct dlm_lock_resource *res)
{
        assert_spin_locked(&res->spinlock);
        BUG_ON(res->inflight_assert_workers == 0);
        res->inflight_assert_workers--;
        mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
                        dlm->name, res->lockname.len, res->lockname.name,
                        res->inflight_assert_workers);
}

static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
                struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        __dlm_lockres_drop_inflight_worker(dlm, res);
        spin_unlock(&res->spinlock);
}

/*
 * lookup a lock resource by name.
 * the resource may already exist in the hashtable (lockid is
 * null terminated).
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  we need to wait around for that node
 * to assert_master (or die).
 */
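/* a typical caller pattern (dlmlock.c does roughly this):
 *
 *      res = dlm_get_lock_resource(dlm, name, namelen, flags);
 *      if (!res)
 *              ...fail the lock request...
 *      ...queue the lock on res...
 *      dlm_lockres_put(res);
 *
 * on success the resource comes back with an inflight reference held
 * and with mastery resolved, i.e. res->owner is a valid node. */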
struct dlm_lock_resource *dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
                                          int namelen,
                                          int flags)
{
        struct dlm_lock_resource *tmpres = NULL, *res = NULL;
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *alloc_mle = NULL;
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
        unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;

        BUG_ON(!lockid);

        hash = dlm_lockid_hash(lockid, namelen);

        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
                spin_unlock(&dlm->spinlock);
                spin_lock(&tmpres->spinlock);
                /* Wait on the thread that is mastering the resource */
                if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        __dlm_wait_on_lockres(tmpres);
                        BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
                        spin_unlock(&tmpres->spinlock);
                        dlm_lockres_put(tmpres);
                        tmpres = NULL;
                        goto lookup;
                }

                /* Wait on the resource purge to complete before continuing */
                if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
                        BUG_ON(tmpres->owner == dlm->node_num);
                        __dlm_wait_on_lockres_flags(tmpres,
                                                    DLM_LOCK_RES_DROPPING_REF);
                        spin_unlock(&tmpres->spinlock);
                        dlm_lockres_put(tmpres);
                        tmpres = NULL;
                        goto lookup;
                }

                /* Grab inflight ref to pin the resource */
                dlm_lockres_grab_inflight_ref(dlm, tmpres);

                spin_unlock(&tmpres->spinlock);
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
                goto leave;
        }

        if (!res) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "allocating a new resource\n");
                /* nothing found and we need to allocate one. */
                alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
                if (!alloc_mle)
                        goto leave;
                res = dlm_new_lockres(dlm, lockid, namelen);
                if (!res)
                        goto leave;
                goto lookup;
        }

        mlog(0, "no lockres found, allocated our own: %p\n", res);

        if (flags & LKM_LOCAL) {
                /* caller knows it's safe to assume it's not mastered elsewhere
                 * DONE!  return right away */
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
                dlm_lockres_grab_inflight_ref(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
                goto wake_waiters;
        }

        /* check master list to see if another node has started mastering it */
        spin_lock(&dlm->master_lock);

        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
                int mig;
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
                }
                mig = (mle->type == DLM_MLE_MIGRATION);
                /* if there is a migration in progress, let the migration
                 * finish before continuing.  we can wait for the absence
                 * of the MIGRATION mle: either the migrate finished or
                 * one of the nodes died and the mle was cleaned up.
                 * if there is a BLOCK here, but it already has a master
                 * set, we are too late.  the master does not have a ref
                 * for us in the refmap.  detach the mle and drop it.
                 * either way, go back to the top and start over. */
                if (mig || mle->master != O2NM_MAX_NODES) {
                        BUG_ON(mig && mle->master == dlm->node_num);
                        /* we arrived too late.  the master does not
                         * have a ref for us. retry. */
                        mlog(0, "%s:%.*s: late on %s\n",
                             dlm->name, namelen, lockid,
                             mig ? "MIGRATION" : "BLOCK");
                        spin_unlock(&dlm->master_lock);
                        spin_unlock(&dlm->spinlock);

                        /* master is known, detach */
                        if (!mig)
                                dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
                        /* this is lame, but we can't wait on either
                         * the mle or lockres waitqueue here */
                        if (mig)
                                msleep(100);
                        goto lookup;
                }
        } else {
                /* go ahead and try to master lock on this node */
                mle = alloc_mle;
                /* make sure this does not get freed below */
                alloc_mle = NULL;
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                __dlm_insert_mle(dlm, mle);

                /* still holding the dlm spinlock, check the recovery map
                 * to see if there are any nodes that still need to be
                 * considered.  these will not appear in the mle nodemap
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(0, "%s: res %.*s, At least one node (%d) "
                             "to recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                }
        }

        /* at this point there is either a DLM_MLE_BLOCK or a
         * DLM_MLE_MASTER on the master list, so it's safe to add the
         * lockres to the hashtable.  anyone who finds the lock will
         * still have to wait on the IN_PROGRESS. */

        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);

        /* since this lockres is new it does not require the spinlock */
        __dlm_lockres_grab_inflight_ref(dlm, res);

        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
        dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

redo_request:
        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable by a change on the mle,
                 * so we only need to clear out the recovery map once. */
                if (dlm_is_recovery_lock(lockid, namelen)) {
                        mlog(0, "%s: Recovery map is not empty, but must "
                             "master $RECOVERY lock now\n", dlm->name);
                        if (!dlm_pre_master_reco_lockres(dlm, res))
                                wait_on_recovery = 0;
                        else {
                                mlog(0, "%s: waiting 500ms for heartbeat state "
                                    "change\n", dlm->name);
                                msleep(500);
                        }
                        continue;
                }

                dlm_kick_recovery_thread(dlm);
                msleep(1000);
                dlm_wait_for_recovery(dlm);

                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(0, "%s: res %.*s, At least one node (%d) "
                             "to recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                } else
                        wait_on_recovery = 0;
                spin_unlock(&dlm->spinlock);

                if (wait_on_recovery)
                        dlm_wait_for_node_recovery(dlm, bit, 10000);
        }

        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;

        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                ret = dlm_do_master_request(res, mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
                        /* found a master! */
                        if (mle->master <= nodenum)
                                break;
                        /* if our master request has not reached the master
                         * yet, keep going until it does.  this is how the
                         * master will know that asserts are needed back to
                         * the lower nodes. */
                        mlog(0, "%s: res %.*s, Requests only up to %u but "
                             "master is %u, keep going\n", dlm->name, namelen,
                             lockid, nodenum, mle->master);
                }
        }

wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                wait_on_recovery = 1;
                mlog(0, "%s: res %.*s, Node map changed, redo the master "
                     "request now, blocked=%d\n", dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
                        mlog(ML_ERROR, "%s: res %.*s, Spinning on "
                             "dlm_wait_for_lock_mastery, blocked = %d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
                        dlm_print_one_mle(mle);
                        tries = 0;
                }
                goto redo_request;
        }

        mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
             res->lockname.name, res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
        dlm_put_mle_inuse(mle);

wake_waiters:
        spin_lock(&res->spinlock);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

leave:
        /* need to free the unused mle */
        if (alloc_mle)
                kmem_cache_free(dlm_mle_cache, alloc_mle);

        return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

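/* poll until mastery of the resource is settled.  each pass: if the
 * resource already has an owner, we are done (re-requesting from the
 * owner if needed so it re-asserts); if the node map changed, restart
 * mastery; once every node in the vote map has responded and this
 * node holds the lowest bit in maybe_map, claim mastery and broadcast
 * an assert.  otherwise sleep up to DLM_MASTERY_TIMEOUT_MS and
 * recheck. */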
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked)
{
        u8 m;
        int ret, bit;
        int map_changed, voting_done;
        int assert, sleep;

recheck:
        ret = 0;
        assert = 0;

        /* check if another node has already become the owner */
        spin_lock(&res->spinlock);
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->owner);
                spin_unlock(&res->spinlock);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
                if (res->owner != dlm->node_num) {
                        ret = dlm_do_master_request(res, mle, res->owner);
                        if (ret < 0) {
                                /* give recovery a chance to run */
                                mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
                                msleep(500);
                                goto recheck;
                        }
                }
                ret = 0;
                goto leave;
        }
        spin_unlock(&res->spinlock);

        spin_lock(&mle->spinlock);
        m = mle->master;
        map_changed = (memcmp(mle->vote_map, mle->node_map,
                              sizeof(mle->vote_map)) != 0);
        voting_done = (memcmp(mle->vote_map, mle->response_map,
                             sizeof(mle->vote_map)) == 0);

        /* restart if we hit any errors */
        if (map_changed) {
                int b;
                mlog(0, "%s: %.*s: node map changed, restarting\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                b = (mle->type == DLM_MLE_BLOCK);
                if ((*blocked && !b) || (!*blocked && b)) {
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
                             dlm->name, res->lockname.len, res->lockname.name,
                             *blocked, b);
                        *blocked = b;
                }
                spin_unlock(&mle->spinlock);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto leave;
                }
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
        } else {
                if (!voting_done) {
                        mlog(0, "map not changed and voting not done "
                             "for %s:%.*s\n", dlm->name, res->lockname.len,
                             res->lockname.name);
                }
        }

        if (m != O2NM_MAX_NODES) {
                /* another node has done an assert!
                 * all done! */
                sleep = 0;
        } else {
                sleep = 1;
                /* have all nodes responded? */
                if (voting_done && !*blocked) {
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
                        if (dlm->node_num <= bit) {
                                /* my node number is lowest.
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
                                /* ref was grabbed in get_lock_resource
                                 * will be dropped in dlmlock_master */
                                assert = 1;
                                sleep = 0;
                        }
                        /* if voting is done, but we have not received
                         * an assert master yet, we must sleep */
                }
        }

        spin_unlock(&mle->spinlock);

        /* sleep if we haven't finished voting yet */
        if (sleep) {
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

                /*
                if (atomic_read(&mle->mle_refs.refcount) < 2)
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
                        atomic_read(&mle->mle_refs.refcount),
                        res->lockname.len, res->lockname.name);
                */
                atomic_set(&mle->woken, 0);
                (void)wait_event_timeout(mle->wq,
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
                        mlog(0, "%s:%.*s: waiting again\n", dlm->name,
                             res->lockname.len, res->lockname.name);
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                ret = 0;
                goto leave;
        }

        ret = 0;   /* done */
        if (assert) {
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
                ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
                         * (any nonzero response is a BUG on this node).
                         * Most likely a socket just got disconnected
                         * due to node death. */
                        mlog_errno(ret);
                }
                /* no longer need to restart lock mastery.
                 * all living nodes have been contacted. */
                ret = 0;
        }

        /* set the lockres owner */
        spin_lock(&res->spinlock);
        /* mastery reference obtained either during
         * assert_master_handler or in get_lock_resource */
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);

leave:
        return ret;
}

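/* iterator over the bits that differ between two node bitmaps.  the
 * diff is the XOR of the original and current maps; for each set bit,
 * dlm_bitmap_diff_iter_next() reports NODE_DOWN if the bit was set in
 * the original map and NODE_UP if it is newly set in the current one. */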
struct dlm_bitmap_diff_iter
{
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}


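/* walk the nodes that joined or died since the vote began and patch up
 * the mle maps accordingly.  returns -EAGAIN if anything changed (the
 * caller must redo its master requests), 0 if nothing did. */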
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked)
{
        struct dlm_bitmap_diff_iter bdi;
        enum dlm_node_state_change sc;
        int node;
        int ret = 0;

        mlog(0, "something happened such that the "
             "master process may need to be restarted!\n");

        assert_spin_locked(&mle->spinlock);

        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        while (node >= 0) {
                if (sc == NODE_UP) {
                        /* a node came up.  clear any old vote from
                         * the response map and set it in the vote map
                         * then restart the mastery. */
                        mlog(ML_NOTICE, "node %d up while restarting\n", node);

                        /* redo the master request, but only for the new node */
                        mlog(0, "sending request to new node\n");
                        clear_bit(node, mle->response_map);
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);

                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);

                                if (node == lowest) {
                                        mlog(0, "expected master %u died"
                                            " while this node was blocked "
                                            "waiting on it!\n", node);
                                        lowest = find_next_bit(mle->maybe_map,
                                                        O2NM_MAX_NODES,
                                                        lowest+1);
                                        if (lowest < O2NM_MAX_NODES) {
                                                mlog(0, "%s:%.*s:still "
                                                     "blocked. waiting on %u "
                                                     "now\n", dlm->name,
                                                     res->lockname.len,
                                                     res->lockname.name,
                                                     lowest);
                                        } else {
                                                /* mle is an MLE_BLOCK, but
                                                 * there is now nothing left to
                                                 * block on.  we need to return
                                                 * all the way back out and try
                                                 * again with an MLE_MASTER.
                                                 * dlm_do_local_recovery_cleanup
                                                 * has already run, so the mle
                                                 * refcount is ok */
                                                mlog(0, "%s:%.*s: no "
                                                     "longer blocking. try to "
                                                     "master this here\n",
                                                     dlm->name,
                                                     res->lockname.len,
                                                     res->lockname.name);
                                                mle->type = DLM_MLE_MASTER;
                                                mle->mleres = res;
                                        }
                                }
                        }

                        /* now blank out everything, as if we had never
                         * contacted anyone */
                        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
                        memset(mle->response_map, 0, sizeof(mle->response_map));
                        /* reset the vote_map to the current node_map */
                        memcpy(mle->vote_map, mle->node_map,
                               sizeof(mle->node_map));
                        /* put myself into the maybe map */
                        if (mle->type != DLM_MLE_BLOCK)
                                set_bit(dlm->node_num, mle->maybe_map);
                }
                ret = -EAGAIN;
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */

static int dlm_do_master_request(struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle, int to)
{
        struct dlm_ctxt *dlm = mle->dlm;
        struct dlm_master_request request;
        int ret, response = 0, resend;

        memset(&request, 0, sizeof(request));
        request.node_idx = dlm->node_num;

        BUG_ON(mle->type == DLM_MLE_MIGRATION);

        request.namelen = (u8)mle->mnamelen;
        memcpy(request.name, mle->mname, request.namelen);

again:
        ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
                                 sizeof(request), to, &response);
        if (ret < 0)  {
                if (ret == -ESRCH) {
                        /* should never happen */
                        mlog(ML_ERROR, "TCP stack not ready!\n");
                        BUG();
                } else if (ret == -EINVAL) {
                        mlog(ML_ERROR, "bad args passed to o2net!\n");
                        BUG();
                } else if (ret == -ENOMEM) {
                        mlog(ML_ERROR, "out of memory while trying to send "
                             "network message!  retrying\n");
                        /* this is totally crude */
                        msleep(50);
                        goto again;
                } else if (!dlm_is_host_down(ret)) {
                        /* not a network error. bad. */
                        mlog_errno(ret);
                        mlog(ML_ERROR, "unhandled error!\n");
1366                        BUG();
1367                }
1368                /* all other errors should be network errors,
1369                 * and likely indicate node death */
1370                mlog(ML_ERROR, "link to %d went down!\n", to);
1371                goto out;
1372        }
1373
1374        ret = 0;
1375        resend = 0;
1376        spin_lock(&mle->spinlock);
1377        switch (response) {
1378                case DLM_MASTER_RESP_YES:
1379                        set_bit(to, mle->response_map);
1380                        mlog(0, "node %u is the master, response=YES\n", to);
1381                        mlog(0, "%s:%.*s: master node %u now knows I have a "
1382                             "reference\n", dlm->name, res->lockname.len,
1383                             res->lockname.name, to);
1384                        mle->master = to;
1385                        break;
1386                case DLM_MASTER_RESP_NO:
1387                        mlog(0, "node %u not master, response=NO\n", to);
1388                        set_bit(to, mle->response_map);
1389                        break;
1390                case DLM_MASTER_RESP_MAYBE:
1391                        mlog(0, "node %u not master, response=MAYBE\n", to);
1392                        set_bit(to, mle->response_map);
1393                        set_bit(to, mle->maybe_map);
1394                        break;
1395                case DLM_MASTER_RESP_ERROR:
1396                        mlog(0, "node %u hit an error, resending\n", to);
1397                        resend = 1;
1398                        response = 0;
1399                        break;
1400                default:
1401                        mlog(ML_ERROR, "bad response! %u\n", response);
1402                        BUG();
1403        }
1404        spin_unlock(&mle->spinlock);
1405        if (resend) {
1406                /* this is also totally crude */
1407                msleep(50);
1408                goto again;
1409        }
1410
1411out:
1412        return ret;
1413}
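
/*
 * A minimal caller sketch (an assumption, not lifted from this file): the
 * node racing to master a lockres walks the mle vote map and sends a
 * master request to every peer, treating a failed send as that peer
 * having died.  Responses accumulate in mle->response_map / mle->maybe_map
 * under mle->spinlock.
 */
#if 0
static void example_send_master_requests(struct dlm_lock_resource *res,
					 struct dlm_master_list_entry *mle)
{
	struct dlm_node_iter iter;
	int nodenum;

	spin_lock(&mle->spinlock);
	dlm_node_iter_init(mle->vote_map, &iter);
	spin_unlock(&mle->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		if (nodenum == mle->dlm->node_num)
			continue;	/* never ask ourselves */
		if (dlm_do_master_request(res, mle, nodenum) < 0)
			mlog(0, "assuming node %d is dead\n", nodenum);
	}
}
#endif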
1414
1415/*
1416 * locks that can be taken here:
1417 * dlm->spinlock
1418 * res->spinlock
1419 * mle->spinlock
1420 * dlm->master_list
1421 *
1422 * if possible, TRIM THIS DOWN!!!
1423 */
1424int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1425                               void **ret_data)
1426{
1427        u8 response = DLM_MASTER_RESP_MAYBE;
1428        struct dlm_ctxt *dlm = data;
1429        struct dlm_lock_resource *res = NULL;
1430        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1431        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1432        char *name;
1433        unsigned int namelen, hash;
1434        int found, ret;
1435        int set_maybe;
1436        int dispatch_assert = 0;
1437
1438        if (!dlm_grab(dlm))
1439                return DLM_MASTER_RESP_NO;
1440
1441        if (!dlm_domain_fully_joined(dlm)) {
1442                response = DLM_MASTER_RESP_NO;
1443                goto send_response;
1444        }
1445
1446        name = request->name;
1447        namelen = request->namelen;
1448        hash = dlm_lockid_hash(name, namelen);
1449
1450        if (namelen > DLM_LOCKID_NAME_MAX) {
1451                response = DLM_IVBUFLEN;
1452                goto send_response;
1453        }
1454
1455way_up_top:
1456        spin_lock(&dlm->spinlock);
1457        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1458        if (res) {
1459                spin_unlock(&dlm->spinlock);
1460
1461                /* take care of the easy cases up front */
1462                spin_lock(&res->spinlock);
1463                if (res->state & (DLM_LOCK_RES_RECOVERING|
1464                                  DLM_LOCK_RES_MIGRATING)) {
1465                        spin_unlock(&res->spinlock);
1466                        mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1467                             "being recovered/migrated\n");
1468                        response = DLM_MASTER_RESP_ERROR;
1469                        if (mle)
1470                                kmem_cache_free(dlm_mle_cache, mle);
1471                        goto send_response;
1472                }
1473
1474                if (res->owner == dlm->node_num) {
1475                        dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1476                        spin_unlock(&res->spinlock);
1477                        response = DLM_MASTER_RESP_YES;
1478                        if (mle)
1479                                kmem_cache_free(dlm_mle_cache, mle);
1480
1481                        /* this node is the owner.
1482                         * there is some extra work that needs to
1483                         * happen now.  the requesting node has
1484                         * caused all nodes up to this one to
1485                         * create mles.  this node now needs to
1486                         * go back and clean those up. */
1487                        dispatch_assert = 1;
1488                        goto send_response;
1489                } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1490                        spin_unlock(&res->spinlock);
1491                        // mlog(0, "node %u is the master\n", res->owner);
1492                        response = DLM_MASTER_RESP_NO;
1493                        if (mle)
1494                                kmem_cache_free(dlm_mle_cache, mle);
1495                        goto send_response;
1496                }
1497
1498                /* ok, there is no owner.  either this node is
1499                 * being blocked, or it is actively trying to
1500                 * master this lock. */
1501                if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1502                        mlog(ML_ERROR, "lock with no owner should be "
1503                             "in-progress!\n");
1504                        BUG();
1505                }
1506
1507                // mlog(0, "lockres is in progress...\n");
1508                spin_lock(&dlm->master_lock);
1509                found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1510                if (!found) {
1511                        mlog(ML_ERROR, "no mle found for this lock!\n");
1512                        BUG();
1513                }
1514                set_maybe = 1;
1515                spin_lock(&tmpmle->spinlock);
1516                if (tmpmle->type == DLM_MLE_BLOCK) {
1517                        // mlog(0, "this node is waiting for "
1518                        // "lockres to be mastered\n");
1519                        response = DLM_MASTER_RESP_NO;
1520                } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1521                        mlog(0, "node %u is master, but trying to migrate to "
1522                             "node %u.\n", tmpmle->master, tmpmle->new_master);
1523                        if (tmpmle->master == dlm->node_num) {
1524                                mlog(ML_ERROR, "no owner on lockres, but this "
1525                                     "node is trying to migrate it to %u?!\n",
1526                                     tmpmle->new_master);
1527                                BUG();
1528                        } else {
1529                                /* the real master can respond on its own */
1530                                response = DLM_MASTER_RESP_NO;
1531                        }
1532                } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1533                        set_maybe = 0;
1534                        if (tmpmle->master == dlm->node_num) {
1535                                response = DLM_MASTER_RESP_YES;
1536                                /* this node will be the owner.
1537                                 * go back and clean the mles on any
1538                                 * other nodes */
1539                                dispatch_assert = 1;
1540                                dlm_lockres_set_refmap_bit(dlm, res,
1541                                                           request->node_idx);
1542                        } else
1543                                response = DLM_MASTER_RESP_NO;
1544                } else {
1545                        // mlog(0, "this node is attempting to "
1546                        // "master lockres\n");
1547                        response = DLM_MASTER_RESP_MAYBE;
1548                }
1549                if (set_maybe)
1550                        set_bit(request->node_idx, tmpmle->maybe_map);
1551                spin_unlock(&tmpmle->spinlock);
1552
1553                spin_unlock(&dlm->master_lock);
1554                spin_unlock(&res->spinlock);
1555
1556                /* keep the mle attached to heartbeat events */
1557                dlm_put_mle(tmpmle);
1558                if (mle)
1559                        kmem_cache_free(dlm_mle_cache, mle);
1560                goto send_response;
1561        }
1562
1563        /*
1564         * lockres doesn't exist on this node
1565         * if there is an MLE_BLOCK, return NO
1566         * if there is an MLE_MASTER, return MAYBE
1567         * otherwise, add an MLE_BLOCK, return NO
1568         */
1569        spin_lock(&dlm->master_lock);
1570        found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1571        if (!found) {
1572                /* this lockid has never been seen on this node yet */
1573                // mlog(0, "no mle found\n");
1574                if (!mle) {
1575                        spin_unlock(&dlm->master_lock);
1576                        spin_unlock(&dlm->spinlock);
1577
1578                        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1579                        if (!mle) {
1580                                response = DLM_MASTER_RESP_ERROR;
1581                                mlog_errno(-ENOMEM);
1582                                goto send_response;
1583                        }
1584                        goto way_up_top;
1585                }
1586
1587                // mlog(0, "this is second time thru, already allocated, "
1588                // "add the block.\n");
1589                dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1590                set_bit(request->node_idx, mle->maybe_map);
1591                __dlm_insert_mle(dlm, mle);
1592                response = DLM_MASTER_RESP_NO;
1593        } else {
1594                // mlog(0, "mle was found\n");
1595                set_maybe = 1;
1596                spin_lock(&tmpmle->spinlock);
1597                if (tmpmle->master == dlm->node_num) {
1598                        mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1599                        BUG();
1600                }
1601                if (tmpmle->type == DLM_MLE_BLOCK)
1602                        response = DLM_MASTER_RESP_NO;
1603                else if (tmpmle->type == DLM_MLE_MIGRATION) {
1604                        mlog(0, "migration mle was found (%u->%u)\n",
1605                             tmpmle->master, tmpmle->new_master);
1606                        /* real master can respond on its own */
1607                        response = DLM_MASTER_RESP_NO;
1608                } else
1609                        response = DLM_MASTER_RESP_MAYBE;
1610                if (set_maybe)
1611                        set_bit(request->node_idx, tmpmle->maybe_map);
1612                spin_unlock(&tmpmle->spinlock);
1613        }
1614        spin_unlock(&dlm->master_lock);
1615        spin_unlock(&dlm->spinlock);
1616
1617        if (found) {
1618                /* keep the mle attached to heartbeat events */
1619                dlm_put_mle(tmpmle);
1620        }
1621send_response:
1622        /*
1623         * __dlm_lookup_lockres() grabbed a reference to this lockres.
1624         * The reference is released by dlm_assert_master_worker() under
1625         * the call to dlm_dispatch_assert_master().  If
1626         * dlm_assert_master_worker() isn't called, we drop it here.
1627         */
1628        if (dispatch_assert) {
1629                if (response != DLM_MASTER_RESP_YES)
1630                        mlog(ML_ERROR, "invalid response %d\n", response);
1631                if (!res) {
1632                        mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1633                        BUG();
1634                }
1635                mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1636                             dlm->node_num, res->lockname.len, res->lockname.name);
1637                ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1638                                                 DLM_ASSERT_MASTER_MLE_CLEANUP);
1639                if (ret < 0) {
1640                        mlog(ML_ERROR, "failed to dispatch assert master work\n");
1641                        response = DLM_MASTER_RESP_ERROR;
1642                        dlm_lockres_put(res);
1643                } else
1644                        dlm_lockres_grab_inflight_worker(dlm, res);
1645        } else {
1646                if (res)
1647                        dlm_lockres_put(res);
1648        }
1649
1650        dlm_put(dlm);
1651        return response;
1652}
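
/*
 * Condensed sketch (helper name assumed) of the decision table the
 * in-progress branch above implements once an mle has been found for a
 * lockres with no known owner.
 */
#if 0
static u8 example_response_for_found_mle(struct dlm_ctxt *dlm,
					 struct dlm_master_list_entry *tmpmle)
{
	if (tmpmle->type == DLM_MLE_BLOCK)
		return DLM_MASTER_RESP_NO;	/* we are blocked, not racing */
	if (tmpmle->type == DLM_MLE_MIGRATION)
		return DLM_MASTER_RESP_NO;	/* real master answers itself */
	if (tmpmle->master == dlm->node_num)
		return DLM_MASTER_RESP_YES;	/* we won; assert cleans up */
	if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN)
		return DLM_MASTER_RESP_NO;	/* someone else already won */
	return DLM_MASTER_RESP_MAYBE;		/* still racing for mastery */
}
#endif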
1653
1654/*
1655 * DLM_ASSERT_MASTER_MSG
1656 */
1657
1658
1659/*
1660 * NOTE: this can be used for debugging
1661 * can periodically run all locks owned by this node
1662 * and re-assert across the cluster...
1663 */
1664static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1665                                struct dlm_lock_resource *res,
1666                                void *nodemap, u32 flags)
1667{
1668        struct dlm_assert_master assert;
1669        int to, tmpret;
1670        struct dlm_node_iter iter;
1671        int ret = 0;
1672        int reassert;
1673        const char *lockname = res->lockname.name;
1674        unsigned int namelen = res->lockname.len;
1675
1676        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1677
1678        spin_lock(&res->spinlock);
1679        res->state |= DLM_LOCK_RES_SETREF_INPROG;
1680        spin_unlock(&res->spinlock);
1681
1682again:
1683        reassert = 0;
1684
1685        /* note that if this nodemap is empty, it returns 0 */
1686        dlm_node_iter_init(nodemap, &iter);
1687        while ((to = dlm_node_iter_next(&iter)) >= 0) {
1688                int r = 0;
1689                struct dlm_master_list_entry *mle = NULL;
1690
1691                mlog(0, "sending assert master to %d (%.*s)\n", to,
1692                     namelen, lockname);
1693                memset(&assert, 0, sizeof(assert));
1694                assert.node_idx = dlm->node_num;
1695                assert.namelen = namelen;
1696                memcpy(assert.name, lockname, namelen);
1697                assert.flags = cpu_to_be32(flags);
1698
1699                tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1700                                            &assert, sizeof(assert), to, &r);
1701                if (tmpret < 0) {
1702                        mlog(ML_ERROR, "Error %d when sending message %u (key "
1703                             "0x%x) to node %u\n", tmpret,
1704                             DLM_ASSERT_MASTER_MSG, dlm->key, to);
1705                        if (!dlm_is_host_down(tmpret)) {
1706                                mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1707                                BUG();
1708                        }
1709                        /* a node died.  finish out the rest of the nodes. */
1710                        mlog(0, "link to %d went down!\n", to);
1711                        /* any nonzero status return will do */
1712                        ret = tmpret;
1713                        r = 0;
1714                } else if (r < 0) {
1715                        /* ok, something is horribly messed up.  kill thyself. */
1716                        mlog(ML_ERROR, "during assert master of %.*s to %u, "
1717                             "got %d.\n", namelen, lockname, to, r);
1718                        spin_lock(&dlm->spinlock);
1719                        spin_lock(&dlm->master_lock);
1720                        if (dlm_find_mle(dlm, &mle, (char *)lockname,
1721                                         namelen)) {
1722                                dlm_print_one_mle(mle);
1723                                __dlm_put_mle(mle);
1724                        }
1725                        spin_unlock(&dlm->master_lock);
1726                        spin_unlock(&dlm->spinlock);
1727                        BUG();
1728                }
1729
1730                if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1731                    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1732                        mlog(ML_ERROR, "%.*s: very strange, "
1733                             "master MLE but no lockres on %u\n",
1734                             namelen, lockname, to);
1735                }
1736
1737                if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1738                        mlog(0, "%.*s: node %u created mles on other "
1739                             "nodes and requests a re-assert\n",
1740                             namelen, lockname, to);
1741                        reassert = 1;
1742                }
1743                if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1744                        mlog(0, "%.*s: node %u has a reference to this "
1745                             "lockres, set the bit in the refmap\n",
1746                             namelen, lockname, to);
1747                        spin_lock(&res->spinlock);
1748                        dlm_lockres_set_refmap_bit(dlm, res, to);
1749                        spin_unlock(&res->spinlock);
1750                }
1751        }
1752
1753        if (reassert)
1754                goto again;
1755
1756        spin_lock(&res->spinlock);
1757        res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1758        spin_unlock(&res->spinlock);
1759        wake_up(&res->wq);
1760
1761        return ret;
1762}
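
/*
 * Typical caller shape (a sketch; dlm_assert_master_worker() below does
 * essentially this): snapshot the live-node map, drop ourselves, and
 * assert to everyone that remains.
 */
#if 0
static void example_assert_everywhere(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res)
{
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	/* a negative return only means some node died mid-assert */
	dlm_do_assert_master(dlm, res, nodemap, 0);
}
#endif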
1763
1764/*
1765 * locks that can be taken here:
1766 * dlm->spinlock
1767 * res->spinlock
1768 * mle->spinlock
1769 * dlm->master_list
1770 *
1771 * if possible, TRIM THIS DOWN!!!
1772 */
1773int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1774                              void **ret_data)
1775{
1776        struct dlm_ctxt *dlm = data;
1777        struct dlm_master_list_entry *mle = NULL;
1778        struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1779        struct dlm_lock_resource *res = NULL;
1780        char *name;
1781        unsigned int namelen, hash;
1782        u32 flags;
1783        int master_request = 0, have_lockres_ref = 0;
1784        int ret = 0;
1785
1786        if (!dlm_grab(dlm))
1787                return 0;
1788
1789        name = assert->name;
1790        namelen = assert->namelen;
1791        hash = dlm_lockid_hash(name, namelen);
1792        flags = be32_to_cpu(assert->flags);
1793
1794        if (namelen > DLM_LOCKID_NAME_MAX) {
1795                mlog(ML_ERROR, "Invalid name length!\n");
1796                goto done;
1797        }
1798
1799        spin_lock(&dlm->spinlock);
1800
1801        if (flags)
1802                mlog(0, "assert_master with flags: %u\n", flags);
1803
1804        /* find the MLE */
1805        spin_lock(&dlm->master_lock);
1806        if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1807                /* not an error, could be master just re-asserting */
1808                mlog(0, "just got an assert_master from %u, but no "
1809                     "MLE for it! (%.*s)\n", assert->node_idx,
1810                     namelen, name);
1811        } else {
1812                int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1813                if (bit >= O2NM_MAX_NODES) {
1814                        /* not necessarily an error, though less likely.
1815                         * could be master just re-asserting. */
1816                        mlog(0, "no bits set in the maybe_map, but %u "
1817                             "is asserting! (%.*s)\n", assert->node_idx,
1818                             namelen, name);
1819                } else if (bit != assert->node_idx) {
1820                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1821                                mlog(0, "master %u was found, %u should "
1822                                     "back off\n", assert->node_idx, bit);
1823                        } else {
1824                                /* with the fix for bug 569, a higher node
1825                                 * number winning the mastery will respond
1826                                 * YES to mastery requests, but this node
1827                                 * had no way of knowing.  let it pass. */
1828                                mlog(0, "%u is the lowest node, "
1829                                     "%u is asserting. (%.*s)  %u must "
1830                                     "have begun after %u won.\n", bit,
1831                                     assert->node_idx, namelen, name, bit,
1832                                     assert->node_idx);
1833                        }
1834                }
1835                if (mle->type == DLM_MLE_MIGRATION) {
1836                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1837                                mlog(0, "%s:%.*s: got cleanup assert"
1838                                     " from %u for migration\n",
1839                                     dlm->name, namelen, name,
1840                                     assert->node_idx);
1841                        } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1842                                mlog(0, "%s:%.*s: got unrelated assert"
1843                                     " from %u for migration, ignoring\n",
1844                                     dlm->name, namelen, name,
1845                                     assert->node_idx);
1846                                __dlm_put_mle(mle);
1847                                spin_unlock(&dlm->master_lock);
1848                                spin_unlock(&dlm->spinlock);
1849                                goto done;
1850                        }
1851                }
1852        }
1853        spin_unlock(&dlm->master_lock);
1854
1855        /* ok everything checks out with the MLE
1856         * now check to see if there is a lockres */
1857        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1858        if (res) {
1859                spin_lock(&res->spinlock);
1860                if (res->state & DLM_LOCK_RES_RECOVERING)  {
1861                        mlog(ML_ERROR, "%u asserting but %.*s is "
1862                             "RECOVERING!\n", assert->node_idx, namelen, name);
1863                        goto kill;
1864                }
1865                if (!mle) {
1866                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1867                            res->owner != assert->node_idx) {
1868                                mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1869                                     "but current owner is %u! (%.*s)\n",
1870                                     assert->node_idx, res->owner, namelen,
1871                                     name);
1872                                __dlm_print_one_lock_resource(res);
1873                                BUG();
1874                        }
1875                } else if (mle->type != DLM_MLE_MIGRATION) {
1876                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1877                                /* owner is just re-asserting */
1878                                if (res->owner == assert->node_idx) {
1879                                        mlog(0, "owner %u re-asserting on "
1880                                             "lock %.*s\n", assert->node_idx,
1881                                             namelen, name);
1882                                        goto ok;
1883                                }
1884                                mlog(ML_ERROR, "got assert_master from "
1885                                     "node %u, but %u is the owner! "
1886                                     "(%.*s)\n", assert->node_idx,
1887                                     res->owner, namelen, name);
1888                                goto kill;
1889                        }
1890                        if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1891                                mlog(ML_ERROR, "got assert from %u, but lock "
1892                                     "with no owner should be "
1893                                     "in-progress! (%.*s)\n",
1894                                     assert->node_idx,
1895                                     namelen, name);
1896                                goto kill;
1897                        }
1898                } else /* mle->type == DLM_MLE_MIGRATION */ {
1899                        /* should only be getting an assert from new master */
1900                        if (assert->node_idx != mle->new_master) {
1901                                mlog(ML_ERROR, "got assert from %u, but "
1902                                     "new master is %u, and old master "
1903                                     "was %u (%.*s)\n",
1904                                     assert->node_idx, mle->new_master,
1905                                     mle->master, namelen, name);
1906                                goto kill;
1907                        }
1908
1909                }
1910ok:
1911                spin_unlock(&res->spinlock);
1912        }
1913
1914        // mlog(0, "woo!  got an assert_master from node %u!\n",
1915        //           assert->node_idx);
1916        if (mle) {
1917                int extra_ref = 0;
1918                int nn = -1;
1919                int rr, err = 0;
1920
1921                spin_lock(&mle->spinlock);
1922                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1923                        extra_ref = 1;
1924                else {
1925                        /* MASTER mle: if any bits set in the response map
1926                         * then the calling node needs to re-assert to clear
1927                         * up nodes that this node contacted */
1928                        while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1929                                                    nn+1)) < O2NM_MAX_NODES) {
1930                                if (nn != dlm->node_num && nn != assert->node_idx) {
1931                                        master_request = 1;
1932                                        break;
1933                                }
1934                        }
1935                }
1936                mle->master = assert->node_idx;
1937                atomic_set(&mle->woken, 1);
1938                wake_up(&mle->wq);
1939                spin_unlock(&mle->spinlock);
1940
1941                if (res) {
1942                        int wake = 0;
1943                        spin_lock(&res->spinlock);
1944                        if (mle->type == DLM_MLE_MIGRATION) {
1945                                mlog(0, "finishing off migration of lockres %.*s, "
1946                                        "from %u to %u\n",
1947                                        res->lockname.len, res->lockname.name,
1948                                        dlm->node_num, mle->new_master);
1949                                res->state &= ~DLM_LOCK_RES_MIGRATING;
1950                                wake = 1;
1951                                dlm_change_lockres_owner(dlm, res, mle->new_master);
1952                                BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1953                        } else {
1954                                dlm_change_lockres_owner(dlm, res, mle->master);
1955                        }
1956                        spin_unlock(&res->spinlock);
1957                        have_lockres_ref = 1;
1958                        if (wake)
1959                                wake_up(&res->wq);
1960                }
1961
1962                /* master is known, detach if not already detached.
1963                 * ensures that only one assert_master call will happen
1964                 * on this mle. */
1965                spin_lock(&dlm->master_lock);
1966
1967                rr = atomic_read(&mle->mle_refs.refcount);
1968                if (mle->inuse > 0) {
1969                        if (extra_ref && rr < 3)
1970                                err = 1;
1971                        else if (!extra_ref && rr < 2)
1972                                err = 1;
1973                } else {
1974                        if (extra_ref && rr < 2)
1975                                err = 1;
1976                        else if (!extra_ref && rr < 1)
1977                                err = 1;
1978                }
1979                if (err) {
1980                        mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1981                             "that will mess up this node, refs=%d, extra=%d, "
1982                             "inuse=%d\n", dlm->name, namelen, name,
1983                             assert->node_idx, rr, extra_ref, mle->inuse);
1984                        dlm_print_one_mle(mle);
1985                }
1986                __dlm_unlink_mle(dlm, mle);
1987                __dlm_mle_detach_hb_events(dlm, mle);
1988                __dlm_put_mle(mle);
1989                if (extra_ref) {
1990                        /* the assert master message now balances the extra
1991                         * ref given by the master / migration request message.
1992                         * if this is the last put, it will be removed
1993                         * from the list. */
1994                        __dlm_put_mle(mle);
1995                }
1996                spin_unlock(&dlm->master_lock);
1997        } else if (res) {
1998                if (res->owner != assert->node_idx) {
1999                        mlog(0, "assert_master from %u, but current "
2000                             "owner is %u (%.*s), no mle\n", assert->node_idx,
2001                             res->owner, namelen, name);
2002                }
2003        }
2004        spin_unlock(&dlm->spinlock);
2005
2006done:
2007        ret = 0;
2008        if (res) {
2009                spin_lock(&res->spinlock);
2010                res->state |= DLM_LOCK_RES_SETREF_INPROG;
2011                spin_unlock(&res->spinlock);
2012                *ret_data = (void *)res;
2013        }
2014        dlm_put(dlm);
2015        if (master_request) {
2016                mlog(0, "need to tell master to reassert\n");
2017                /* positive. negative would shoot down the node. */
2018                ret |= DLM_ASSERT_RESPONSE_REASSERT;
2019                if (!have_lockres_ref) {
2020                        mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2021                             "mle present here for %s:%.*s, but no lockres!\n",
2022                             assert->node_idx, dlm->name, namelen, name);
2023                }
2024        }
2025        if (have_lockres_ref) {
2026                /* let the master know we have a reference to the lockres */
2027                ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2028                mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2029                     dlm->name, namelen, name, assert->node_idx);
2030        }
2031        return ret;
2032
2033kill:
2034        /* kill the caller! */
2035        mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2036             "and killing the other node now!  This node is OK and can continue.\n");
2037        __dlm_print_one_lock_resource(res);
2038        spin_unlock(&res->spinlock);
2039        spin_lock(&dlm->master_lock);
2040        if (mle)
2041                __dlm_put_mle(mle);
2042        spin_unlock(&dlm->master_lock);
2043        spin_unlock(&dlm->spinlock);
2044        *ret_data = (void *)res;
2045        dlm_put(dlm);
2046        return -EINVAL;
2047}
2048
2049void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2050{
2051        struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2052
2053        if (ret_data) {
2054                spin_lock(&res->spinlock);
2055                res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2056                spin_unlock(&res->spinlock);
2057                wake_up(&res->wq);
2058                dlm_lockres_put(res);
2059        }
2061}
2062
2063int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2064                               struct dlm_lock_resource *res,
2065                               int ignore_higher, u8 request_from, u32 flags)
2066{
2067        struct dlm_work_item *item;
2068        item = kzalloc(sizeof(*item), GFP_ATOMIC);
2069        if (!item)
2070                return -ENOMEM;
2071
2072
2073        /* queue up work for dlm_assert_master_worker */
2074        dlm_grab(dlm);  /* get an extra ref for the work item */
2075        dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2076        item->u.am.lockres = res; /* already have a ref */
2077        /* can optionally ignore node numbers higher than this node */
2078        item->u.am.ignore_higher = ignore_higher;
2079        item->u.am.request_from = request_from;
2080        item->u.am.flags = flags;
2081
2082        if (ignore_higher)
2083                mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2084                     res->lockname.name);
2085
2086        spin_lock(&dlm->work_lock);
2087        list_add_tail(&item->list, &dlm->work_list);
2088        spin_unlock(&dlm->work_lock);
2089
2090        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2091        return 0;
2092}
2093
2094static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2095{
2096        struct dlm_ctxt *dlm = data;
2097        int ret = 0;
2098        struct dlm_lock_resource *res;
2099        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2100        int ignore_higher;
2101        int bit;
2102        u8 request_from;
2103        u32 flags;
2104
2105        dlm = item->dlm;
2106        res = item->u.am.lockres;
2107        ignore_higher = item->u.am.ignore_higher;
2108        request_from = item->u.am.request_from;
2109        flags = item->u.am.flags;
2110
2111        spin_lock(&dlm->spinlock);
2112        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2113        spin_unlock(&dlm->spinlock);
2114
2115        clear_bit(dlm->node_num, nodemap);
2116        if (ignore_higher) {
2117                /* if this is just to clear up mles for nodes below
2118                 * this node, do not send the message to the original
2119                 * caller or any node number higher than this */
2120                clear_bit(request_from, nodemap);
2121                bit = dlm->node_num;
2122                while (1) {
2123                        bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2124                                            bit+1);
2125                        if (bit >= O2NM_MAX_NODES)
2126                                break;
2127                        clear_bit(bit, nodemap);
2128                }
2129        }
2130
2131        /*
2132         * If we're migrating this lock to someone else, we are no
2133         * longer allowed to assert our own mastery.  OTOH, we need to
2134         * prevent migration from starting while we're still asserting
2135         * our dominance.  The reserved ast delays migration.
2136         */
2137        spin_lock(&res->spinlock);
2138        if (res->state & DLM_LOCK_RES_MIGRATING) {
2139                mlog(0, "Someone asked us to assert mastery, but we're "
2140                     "in the middle of migration.  Skipping assert, "
2141                     "the new master will handle that.\n");
2142                spin_unlock(&res->spinlock);
2143                goto put;
2144        } else
2145                __dlm_lockres_reserve_ast(res);
2146        spin_unlock(&res->spinlock);
2147
2148        /* this call now finishes out the nodemap
2149         * even if one or more nodes die */
2150        mlog(0, "worker about to master %.*s here, this=%u\n",
2151                     res->lockname.len, res->lockname.name, dlm->node_num);
2152        ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2153        if (ret < 0) {
2154                /* no need to restart, we are done */
2155                if (!dlm_is_host_down(ret))
2156                        mlog_errno(ret);
2157        }
2158
2159        /* Ok, we've asserted ourselves.  Let's let migration start. */
2160        dlm_lockres_release_ast(dlm, res);
2161
2162put:
2163        dlm_lockres_drop_inflight_worker(dlm, res);
2164
2165        dlm_lockres_put(res);
2166
2167        mlog(0, "finished with dlm_assert_master_worker\n");
2168}
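
/*
 * The ignore_higher loop above clears, one bit at a time, every node
 * number greater than our own.  With the generic bitmap helpers the
 * same trim can be written in one call (a sketch):
 */
#if 0
if (dlm->node_num + 1 < O2NM_MAX_NODES)
	bitmap_clear(nodemap, dlm->node_num + 1,
		     O2NM_MAX_NODES - dlm->node_num - 1);
#endif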
2169
2170/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2171 * We cannot wait for node recovery to complete to begin mastering this
2172 * lockres because this lockres is used to kick off recovery! ;-)
2173 * So, do a pre-check on all living nodes to see if any of those nodes
2174 * think that $RECOVERY is currently mastered by a dead node.  If so,
2175 * we wait a short time to allow that node to get notified by its own
2176 * heartbeat stack, then check again.  All $RECOVERY lock resources
2177 * mastered by dead nodes are purged when the heartbeat callback is
2178 * fired, so we can know for sure that it is safe to continue once
2179 * the node returns a live node or no node.  */
2180static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2181                                       struct dlm_lock_resource *res)
2182{
2183        struct dlm_node_iter iter;
2184        int nodenum;
2185        int ret = 0;
2186        u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2187
2188        spin_lock(&dlm->spinlock);
2189        dlm_node_iter_init(dlm->domain_map, &iter);
2190        spin_unlock(&dlm->spinlock);
2191
2192        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2193                /* do not send to self */
2194                if (nodenum == dlm->node_num)
2195                        continue;
2196                ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2197                if (ret < 0) {
2198                        mlog_errno(ret);
2199                        if (!dlm_is_host_down(ret))
2200                                BUG();
2201                        /* host is down, so answer for that node would be
2202                         * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2203                        ret = 0;
2204                }
2205
2206                if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2207                        /* check to see if this master is in the recovery map */
2208                        spin_lock(&dlm->spinlock);
2209                        if (test_bit(master, dlm->recovery_map)) {
2210                                mlog(ML_NOTICE, "%s: node %u has not seen "
2211                                     "node %u go down yet, and thinks the "
2212                                     "dead node is mastering the recovery "
2213                                     "lock.  must wait.\n", dlm->name,
2214                                     nodenum, master);
2215                                ret = -EAGAIN;
2216                        }
2217                        spin_unlock(&dlm->spinlock);
2218                        mlog(0, "%s: reco lock master is %u\n", dlm->name,
2219                             master);
2220                        break;
2221                }
2222        }
2223        return ret;
2224}
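
/*
 * Hypothetical caller shape (assumed): retry the pre-check until no
 * live node still believes a dead node masters the $RECOVERY lockres.
 */
#if 0
while (dlm_pre_master_reco_lockres(dlm, res) == -EAGAIN) {
	/* give the laggard's heartbeat stack time to see the death */
	msleep(100);
}
#endif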
2225
2226/*
2227 * DLM_DEREF_LOCKRES_MSG
2228 */
2229
2230int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2231{
2232        struct dlm_deref_lockres deref;
2233        int ret = 0, r;
2234        const char *lockname;
2235        unsigned int namelen;
2236
2237        lockname = res->lockname.name;
2238        namelen = res->lockname.len;
2239        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2240
2241        memset(&deref, 0, sizeof(deref));
2242        deref.node_idx = dlm->node_num;
2243        deref.namelen = namelen;
2244        memcpy(deref.name, lockname, namelen);
2245
2246        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2247                                 &deref, sizeof(deref), res->owner, &r);
2248        if (ret < 0)
2249                mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2250                     dlm->name, namelen, lockname, ret, res->owner);
2251        else if (r < 0) {
2252                /* BAD.  other node says I did not have a ref. */
2253                mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2254                     dlm->name, namelen, lockname, res->owner, r);
2255                dlm_print_one_lock_resource(res);
2256                BUG();
2257        }
2258        return ret;
2259}
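
/*
 * Sketch of a purge-path caller (shape assumed): flag the lockres as
 * dropping its ref before telling the master, so racing deref/assert
 * traffic can see the transition.
 */
#if 0
spin_lock(&res->spinlock);
res->state |= DLM_LOCK_RES_DROPPING_REF;
spin_unlock(&res->spinlock);

if (res->owner != dlm->node_num)
	dlm_drop_lockres_ref(dlm, res);
#endif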
2260
2261int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2262                              void **ret_data)
2263{
2264        struct dlm_ctxt *dlm = data;
2265        struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2266        struct dlm_lock_resource *res = NULL;
2267        char *name;
2268        unsigned int namelen;
2269        int ret = -EINVAL;
2270        u8 node;
2271        unsigned int hash;
2272        struct dlm_work_item *item;
2273        int cleared = 0;
2274        int dispatch = 0;
2275
2276        if (!dlm_grab(dlm))
2277                return 0;
2278
2279        name = deref->name;
2280        namelen = deref->namelen;
2281        node = deref->node_idx;
2282
2283        if (namelen > DLM_LOCKID_NAME_MAX) {
2284                mlog(ML_ERROR, "Invalid name length!\n");
2285                goto done;
2286        }
2287        if (deref->node_idx >= O2NM_MAX_NODES) {
2288                mlog(ML_ERROR, "Invalid node number: %u\n", node);
2289                goto done;
2290        }
2291
2292        hash = dlm_lockid_hash(name, namelen);
2293
2294        spin_lock(&dlm->spinlock);
2295        res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2296        if (!res) {
2297                spin_unlock(&dlm->spinlock);
2298                mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2299                     dlm->name, namelen, name);
2300                goto done;
2301        }
2302        spin_unlock(&dlm->spinlock);
2303
2304        spin_lock(&res->spinlock);
2305        if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2306                dispatch = 1;
2307        else {
2308                BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2309                if (test_bit(node, res->refmap)) {
2310                        dlm_lockres_clear_refmap_bit(dlm, res, node);
2311                        cleared = 1;
2312                }
2313        }
2314        spin_unlock(&res->spinlock);
2315
2316        if (!dispatch) {
2317                if (cleared)
2318                        dlm_lockres_calc_usage(dlm, res);
2319                else {
2320                        mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2321                             "but it is already dropped!\n", dlm->name,
2322                             res->lockname.len, res->lockname.name, node);
2323                        dlm_print_one_lock_resource(res);
2324                }
2325                ret = 0;
2326                goto done;
2327        }
2328
2329        item = kzalloc(sizeof(*item), GFP_NOFS);
2330        if (!item) {
2331                ret = -ENOMEM;
2332                mlog_errno(ret);
2333                goto done;
2334        }
2335
2336        dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2337        item->u.dl.deref_res = res;
2338        item->u.dl.deref_node = node;
2339
2340        spin_lock(&dlm->work_lock);
2341        list_add_tail(&item->list, &dlm->work_list);
2342        spin_unlock(&dlm->work_lock);
2343
2344        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2345        return 0;
2346
2347done:
2348        if (res)
2349                dlm_lockres_put(res);
2350        dlm_put(dlm);
2351
2352        return ret;
2353}
2354
2355static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2356{
2357        struct dlm_ctxt *dlm;
2358        struct dlm_lock_resource *res;
2359        u8 node;
2360        u8 cleared = 0;
2361
2362        dlm = item->dlm;
2363        res = item->u.dl.deref_res;
2364        node = item->u.dl.deref_node;
2365
2366        spin_lock(&res->spinlock);
2367        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2368        if (test_bit(node, res->refmap)) {
2369                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2370                dlm_lockres_clear_refmap_bit(dlm, res, node);
2371                cleared = 1;
2372        }
2373        spin_unlock(&res->spinlock);
2374
2375        if (cleared) {
2376                mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2377                     dlm->name, res->lockname.len, res->lockname.name, node);
2378                dlm_lockres_calc_usage(dlm, res);
2379        } else {
2380                mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2381                     "but it is already dropped!\n", dlm->name,
2382                     res->lockname.len, res->lockname.name, node);
2383                dlm_print_one_lock_resource(res);
2384        }
2385
2386        dlm_lockres_put(res);
2387}
2388
2389/*
2390 * A migrateable resource is one that is:
2391 * 1. locally mastered, and
2392 * 2. has zero local locks, and
2393 * 3. has one or more non-local locks, or one or more references.
2394 * Returns 1 if yes, 0 if not.
2395 */
2396static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2397                                      struct dlm_lock_resource *res)
2398{
2399        enum dlm_lockres_list idx;
2400        int nonlocal = 0, node_ref;
2401        struct list_head *queue;
2402        struct dlm_lock *lock;
2403        u64 cookie;
2404
2405        assert_spin_locked(&res->spinlock);
2406
2407        /* delay migration when the lockres is in MIGRATING state */
2408        if (res->state & DLM_LOCK_RES_MIGRATING)
2409                return 0;
2410
2411        /* delay migration when the lockres is in RECOVERING state */
2412        if (res->state & DLM_LOCK_RES_RECOVERING)
2413                return 0;
2414
2415        if (res->owner != dlm->node_num)
2416                return 0;
2417
2418        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2419                queue = dlm_list_idx_to_ptr(res, idx);
2420                list_for_each_entry(lock, queue, list) {
2421                        if (lock->ml.node != dlm->node_num) {
2422                                nonlocal++;
2423                                continue;
2424                        }
2425                        cookie = be64_to_cpu(lock->ml.cookie);
2426                        mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2427                             "%s list\n", dlm->name, res->lockname.len,
2428                             res->lockname.name,
2429                             dlm_get_lock_cookie_node(cookie),
2430                             dlm_get_lock_cookie_seq(cookie),
2431                             dlm_list_in_text(idx));
2432                        return 0;
2433                }
2434        }
2435
2436        if (!nonlocal) {
2437                node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2438                if (node_ref >= O2NM_MAX_NODES)
2439                        return 0;
2440        }
2441
2442        mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2443             res->lockname.name);
2444
2445        return 1;
2446}
2447
2448/*
2449 * DLM_MIGRATE_LOCKRES
2450 */
2451
2452
2453static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2454                               struct dlm_lock_resource *res, u8 target)
2455{
2456        struct dlm_master_list_entry *mle = NULL;
2457        struct dlm_master_list_entry *oldmle = NULL;
2458        struct dlm_migratable_lockres *mres = NULL;
2459        int ret = 0;
2460        const char *name;
2461        unsigned int namelen;
2462        int mle_added = 0;
2463        int wake = 0;
2464
2465        if (!dlm_grab(dlm))
2466                return -EINVAL;
2467
2468        BUG_ON(target == O2NM_MAX_NODES);
2469
2470        name = res->lockname.name;
2471        namelen = res->lockname.len;
2472
2473        mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2474             target);
2475
2476        /* preallocate up front. if this fails, abort */
2477        ret = -ENOMEM;
2478        mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2479        if (!mres) {
2480                mlog_errno(ret);
2481                goto leave;
2482        }
2483
2484        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2485        if (!mle) {
2486                mlog_errno(ret);
2487                goto leave;
2488        }
2489        ret = 0;
2490
2491        /*
2492         * clear any existing master requests and
2493         * add the migration mle to the list
2494         */
2495        spin_lock(&dlm->spinlock);
2496        spin_lock(&dlm->master_lock);
2497        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2498                                    namelen, target, dlm->node_num);
2499        spin_unlock(&dlm->master_lock);
2500        spin_unlock(&dlm->spinlock);
2501
2502        if (ret == -EEXIST) {
2503                mlog(0, "another process is already migrating it\n");
2504                goto fail;
2505        }
2506        mle_added = 1;
2507
2508        /*
2509         * set the MIGRATING flag and flush asts
2510         * if we fail after this we need to re-dirty the lockres
2511         */
2512        if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2513                mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2514                     "the target went down.\n", res->lockname.len,
2515                     res->lockname.name, target);
2516                spin_lock(&res->spinlock);
2517                res->state &= ~DLM_LOCK_RES_MIGRATING;
2518                wake = 1;
2519                spin_unlock(&res->spinlock);
2520                ret = -EINVAL;
2521        }
2522
2523fail:
2524        if (oldmle) {
2525                /* master is known, detach if not already detached */
2526                dlm_mle_detach_hb_events(dlm, oldmle);
2527                dlm_put_mle(oldmle);
2528        }
2529
2530        if (ret < 0) {
2531                if (mle_added) {
2532                        dlm_mle_detach_hb_events(dlm, mle);
2533                        dlm_put_mle(mle);
2534                } else if (mle) {
2535                        kmem_cache_free(dlm_mle_cache, mle);
2536                        mle = NULL;
2537                }
2538                goto leave;
2539        }
2540
2541        /*
2542         * at this point, we have a migration target, an mle
2543         * in the master list, and the MIGRATING flag set on
2544         * the lockres
2545         */
2546
2547        /* now that remote nodes are spinning on the MIGRATING flag,
2548         * ensure that all assert_master work is flushed. */
2549        flush_workqueue(dlm->dlm_worker);
2550
2551        /* get an extra reference on the mle.
2552         * otherwise the assert_master from the new
2553         * master will destroy this.
2554         * also, make sure that all callers of dlm_get_mle
2555         * take both dlm->spinlock and dlm->master_lock */
2556        spin_lock(&dlm->spinlock);
2557        spin_lock(&dlm->master_lock);
2558        dlm_get_mle_inuse(mle);
2559        spin_unlock(&dlm->master_lock);
2560        spin_unlock(&dlm->spinlock);
2561
2562        /* notify new node and send all lock state */
2563        /* call send_one_lockres with migration flag.
2564         * this serves as notice to the target node that a
2565         * migration is starting. */
2566        ret = dlm_send_one_lockres(dlm, res, mres, target,
2567                                   DLM_MRES_MIGRATION);
2568
2569        if (ret < 0) {
2570                mlog(0, "migration to node %u failed with %d\n",
2571                     target, ret);
2572                /* migration failed, detach and clean up mle */
2573                dlm_mle_detach_hb_events(dlm, mle);
2574                dlm_put_mle(mle);
2575                dlm_put_mle_inuse(mle);
2576                spin_lock(&res->spinlock);
2577                res->state &= ~DLM_LOCK_RES_MIGRATING;
2578                wake = 1;
2579                spin_unlock(&res->spinlock);
2580                if (dlm_is_host_down(ret))
2581                        dlm_wait_for_node_death(dlm, target,
2582                                                DLM_NODE_DEATH_WAIT_MAX);
2583                goto leave;
2584        }
2585
2586        /* at this point, the target sends a message to all nodes,
2587         * (using dlm_do_migrate_request).  this node is skipped since
2588         * we had to put an mle in the list to begin the process.  this
2589         * node now waits for target to do an assert master.  this node
2590         * will be the last one notified, ensuring that the migration
2591         * is complete everywhere.  if the target dies while this is
2592         * going on, some nodes could potentially see the target as the
2593         * master, so it is important that my recovery finds the migration
2594         * mle and sets the master to UNKNOWN. */
2595
2596
2597        /* wait for new node to assert master */
2598        while (1) {
2599                ret = wait_event_interruptible_timeout(mle->wq,
2600                                        (atomic_read(&mle->woken) == 1),
2601                                        msecs_to_jiffies(5000));
2602
2603                if (ret >= 0) {
2604                        if (atomic_read(&mle->woken) == 1 ||
2605                            res->owner == target)
2606                                break;
2607
2608                        mlog(0, "%s:%.*s: timed out during migration\n",
2609                             dlm->name, res->lockname.len, res->lockname.name);
2610                        /* avoid hang during shutdown when migrating lockres
2611                         * to a node which also goes down */
2612                        if (dlm_is_node_dead(dlm, target)) {
2613                                mlog(0, "%s:%.*s: expected migration "
2614                                     "target %u is no longer up, restarting\n",
2615                                     dlm->name, res->lockname.len,
2616                                     res->lockname.name, target);
2617                                ret = -EINVAL;
2618                                /* migration failed, detach and clean up mle */
2619                                dlm_mle_detach_hb_events(dlm, mle);
2620                                dlm_put_mle(mle);
2621                                dlm_put_mle_inuse(mle);
2622                                spin_lock(&res->spinlock);
2623                                res->state &= ~DLM_LOCK_RES_MIGRATING;
2624                                wake = 1;
2625                                spin_unlock(&res->spinlock);
2626                                goto leave;
2627                        }
2628                } else
2629                        mlog(0, "%s:%.*s: caught signal during migration\n",
2630                             dlm->name, res->lockname.len, res->lockname.name);
2631        }
2632
2633        /* all done, set the owner, clear the flag */
2634        spin_lock(&res->spinlock);
2635        dlm_set_lockres_owner(dlm, res, target);
2636        res->state &= ~DLM_LOCK_RES_MIGRATING;
2637        dlm_remove_nonlocal_locks(dlm, res);
2638        spin_unlock(&res->spinlock);
2639        wake_up(&res->wq);
2640
2641        /* master is known, detach if not already detached */
2642        dlm_mle_detach_hb_events(dlm, mle);
2643        dlm_put_mle_inuse(mle);
2644        ret = 0;
2645
2646        dlm_lockres_calc_usage(dlm, res);
2647
2648leave:
2649        /* re-dirty the lockres if we failed */
2650        if (ret < 0)
2651                dlm_kick_thread(dlm, res);
2652
2653        /* wake up waiters if the MIGRATING flag got set
2654         * but migration failed */
2655        if (wake)
2656                wake_up(&res->wq);
2657
2658        if (mres)
2659                free_page((unsigned long)mres);
2660
2661        dlm_put(dlm);
2662
2663        mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2664             name, target, ret);
2665        return ret;
2666}
2667
2668#define DLM_MIGRATION_RETRY_MS  100
2669
2670/*
2671 * Should be called only after beginning the domain leave process.
2672 * There should not be any remaining locks on nonlocal lock resources,
2673 * and there should be no local locks left on locally mastered resources.
2674 *
2675 * Called with the dlm spinlock held, may drop it to do migration, but
2676 * will re-acquire before exit.
2677 *
2678 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2679 */
2680int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2681{
2682        int ret;
2683        int lock_dropped = 0;
2684        u8 target = O2NM_MAX_NODES;
2685
2686        assert_spin_locked(&dlm->spinlock);
2687
2688        spin_lock(&res->spinlock);
2689        if (dlm_is_lockres_migrateable(dlm, res))
2690                target = dlm_pick_migration_target(dlm, res);
2691        spin_unlock(&res->spinlock);
2692
2693        if (target == O2NM_MAX_NODES)
2694                goto leave;
2695
2696        /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2697        spin_unlock(&dlm->spinlock);
2698        lock_dropped = 1;
2699        ret = dlm_migrate_lockres(dlm, res, target);
2700        if (ret)
2701                mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2702                     dlm->name, res->lockname.len, res->lockname.name,
2703                     target, ret);
2704        spin_lock(&dlm->spinlock);
2705leave:
2706        return lock_dropped;
2707}
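
/*
 * Sketch of how a caller is expected to consume the return value of
 * dlm_empty_lockres(): any sweep of the lockres hash done under
 * dlm->spinlock must restart its bucket scan whenever the lock was
 * dropped, because the list may have changed in the meantime.  The
 * iteration below is a simplified illustration, not the real caller:
 *
 *	spin_lock(&dlm->spinlock);
 *	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 *redo_bucket:
 *		bucket = dlm_lockres_hash(dlm, i);
 *		hlist_for_each_entry(res, bucket, hash_node) {
 *			if (dlm_empty_lockres(dlm, res))
 *				goto redo_bucket;
 *		}
 *	}
 *	spin_unlock(&dlm->spinlock);
 */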
2708
2709int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2710{
2711        int ret;
2712        spin_lock(&dlm->ast_lock);
2713        spin_lock(&lock->spinlock);
2714        ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2715        spin_unlock(&lock->spinlock);
2716        spin_unlock(&dlm->ast_lock);
2717        return ret;
2718}
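
/*
 * dlm_lock_basts_flushed() is deliberately a side-effect-free predicate
 * so that it can be rechecked by a sleeping waiter.  The intended usage
 * is the wait_event() pattern (a sketch, not a quote of the caller):
 *
 *	wait_event(dlm->ast_wq, dlm_lock_basts_flushed(dlm, lock));
 *
 * Taking dlm->ast_lock and lock->spinlock together means the bast list
 * and the bast_pending flag are sampled consistently with respect to
 * the ast delivery path.
 */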
2719
2720static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2721                                     struct dlm_lock_resource *res,
2722                                     u8 mig_target)
2723{
2724        int can_proceed;
2725        spin_lock(&res->spinlock);
2726        can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2727        spin_unlock(&res->spinlock);
2728
2729        /* target has died, so make the caller break out of the
2730         * wait_event, but caller must recheck the domain_map */
2731        spin_lock(&dlm->spinlock);
2732        if (!test_bit(mig_target, dlm->domain_map))
2733                can_proceed = 1;
2734        spin_unlock(&dlm->spinlock);
2735        return can_proceed;
2736}
2737
2738static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2739                                struct dlm_lock_resource *res)
2740{
2741        int ret;
2742        spin_lock(&res->spinlock);
2743        ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2744        spin_unlock(&res->spinlock);
2745        return ret;
2746}
2747
2748
2749static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2750                                       struct dlm_lock_resource *res,
2751                                       u8 target)
2752{
2753        int ret = 0;
2754
2755        mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2756               res->lockname.len, res->lockname.name, dlm->node_num,
2757               target);
2758        /* need to set MIGRATING flag on lockres.  this is done by
2759         * ensuring that all asts have been flushed for this lockres. */
2760        spin_lock(&res->spinlock);
2761        BUG_ON(res->migration_pending);
2762        res->migration_pending = 1;
2763        /* strategy is to reserve an extra ast then release
2764         * it below, letting the release do all of the work */
2765        __dlm_lockres_reserve_ast(res);
2766        spin_unlock(&res->spinlock);
2767
2768        /* now flush all the pending asts */
2769        dlm_kick_thread(dlm, res);
2770        /* before waiting on DIRTY, block processes which may
2771         * try to dirty the lockres before MIGRATING is set */
2772        spin_lock(&res->spinlock);
2773        BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2774        res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2775        spin_unlock(&res->spinlock);
2776        /* now wait on any pending asts and the DIRTY state */
2777        wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2778        dlm_lockres_release_ast(dlm, res);
2779
2780        mlog(0, "about to wait on migration_wq, dirty=%s\n",
2781               res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2782        /* if the extra ref we just put was the final one, this
2783         * will pass through immediately.  otherwise, we need to wait
2784         * for the last ast to finish. */
2785again:
2786        ret = wait_event_interruptible_timeout(dlm->migration_wq,
2787                   dlm_migration_can_proceed(dlm, res, target),
2788                   msecs_to_jiffies(1000));
2789        if (ret < 0) {
2790                mlog(0, "woken again: migrating? %s, dead? %s\n",
2791                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2792                       test_bit(target, dlm->domain_map) ? "no":"yes");
2793        } else {
2794                mlog(0, "all is well: migrating? %s, dead? %s\n",
2795                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2796                       test_bit(target, dlm->domain_map) ? "no":"yes");
2797        }
2798        if (!dlm_migration_can_proceed(dlm, res, target)) {
2799                mlog(0, "trying again...\n");
2800                goto again;
2801        }
2802
2803        ret = 0;
2804        /* did the target go down or die? */
2805        spin_lock(&dlm->spinlock);
2806        if (!test_bit(target, dlm->domain_map)) {
2807                mlog(ML_ERROR, "aha. migration target %u just went down\n",
2808                     target);
2809                ret = -EHOSTDOWN;
2810        }
2811        spin_unlock(&dlm->spinlock);
2812
2813        /*
2814         * if target is down, clear DLM_LOCK_RES_BLOCK_DIRTY to allow
2815         * another try; otherwise the MIGRATING state is certainly set, so
2816         * drop the now-unneeded state which blocked threads trying to DIRTY
2817         */
2818        spin_lock(&res->spinlock);
2819        BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2820        res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2821        if (!ret)
2822                BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2823        spin_unlock(&res->spinlock);
2824
2825        /*
2826         * at this point:
2827         *
2828         *   o the DLM_LOCK_RES_MIGRATING flag is set if the target is not down
2829         *   o there are no pending asts on this lockres
2830         *   o all processes trying to reserve an ast on this
2831         *     lockres must wait for the MIGRATING flag to clear
2832         */
2833        return ret;
2834}
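
/*
 * The handshake in dlm_mark_lockres_migrating() leans entirely on the
 * ast reservation count.  The extra reservation taken up front
 * guarantees that dlm_lockres_release_ast() runs at least once after
 * migration_pending is set; the release of the *final* reservation is
 * what turns migration_pending into DLM_LOCK_RES_MIGRATING and wakes
 * dlm->migration_wq (see dlm_lockres_release_ast() below).  Roughly:
 *
 *	__dlm_lockres_reserve_ast(res);		asts_reserved: n -> n + 1
 *	dlm_kick_thread(dlm, res);		flush pending asts
 *	dlm_lockres_release_ast(dlm, res);	last release: 1 -> 0,
 *						MIGRATING set, waiters woken
 */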
2835
2836/* last step in the migration process.
2837 * original master calls this to free all of the dlm_lock
2838 * structures that belonged to other nodes. */
2839static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2840                                      struct dlm_lock_resource *res)
2841{
2842        struct list_head *queue = &res->granted;
2843        int i, bit;
2844        struct dlm_lock *lock, *next;
2845
2846        assert_spin_locked(&res->spinlock);
2847
2848        BUG_ON(res->owner == dlm->node_num);
2849
2850        for (i = 0; i < 3; i++) {	/* granted, converting, blocked */
2851                list_for_each_entry_safe(lock, next, queue, list) {
2852                        if (lock->ml.node != dlm->node_num) {
2853                                mlog(0, "putting lock for node %u\n",
2854                                     lock->ml.node);
2855                                /* be extra careful */
2856                                BUG_ON(!list_empty(&lock->ast_list));
2857                                BUG_ON(!list_empty(&lock->bast_list));
2858                                BUG_ON(lock->ast_pending);
2859                                BUG_ON(lock->bast_pending);
2860                                dlm_lockres_clear_refmap_bit(dlm, res,
2861                                                             lock->ml.node);
2862                                list_del_init(&lock->list);
2863                                dlm_lock_put(lock);
2864                                /* In a normal unlock, we would have added a
2865                                 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2866                                dlm_lock_put(lock);
2867                        }
2868                }
2869                queue++;
2870        }
2871        bit = 0;
2872        while (1) {
2873                bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2874                if (bit >= O2NM_MAX_NODES)
2875                        break;
2876                /* do not clear the local node reference; if a process
2877                 * is holding it, let that process drop the ref itself */
2878                if (bit != dlm->node_num) {
2879                        mlog(0, "%s:%.*s: node %u had a ref to this "
2880                             "migrating lockres, clearing\n", dlm->name,
2881                             res->lockname.len, res->lockname.name, bit);
2882                        dlm_lockres_clear_refmap_bit(dlm, res, bit);
2883                }
2884                bit++;
2885        }
2886}
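
/*
 * Two details in dlm_remove_nonlocal_locks() are worth calling out.
 * The _safe iterator is mandatory because the loop unlinks entries as
 * it walks; a plain list_for_each_entry() would chase a freed node:
 *
 *	list_for_each_entry_safe(lock, next, queue, list) {
 *		list_del_init(&lock->list);	'next' was saved beforehand
 *		dlm_lock_put(lock);
 *	}
 *
 * And the double dlm_lock_put() is intentional: one put drops the list
 * reference, the second stands in for the DLM_UNLOCK_FREE_LOCK action
 * a normal unlock would have queued.
 */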
2887
2888/*
2889 * Pick a node to migrate the lock resource to. This function selects a
2890 * potential target based first on the locks and then on refmap. It skips
2891 * nodes that are in the process of exiting the domain.
2892 */
2893static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2894                                    struct dlm_lock_resource *res)
2895{
2896        enum dlm_lockres_list idx;
2897        struct list_head *queue = &res->granted;
2898        struct dlm_lock *lock;
2899        int noderef;
2900        u8 nodenum = O2NM_MAX_NODES;
2901
2902        assert_spin_locked(&dlm->spinlock);
2903        assert_spin_locked(&res->spinlock);
2904
2905        /* Go through all the locks */
2906        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2907                queue = dlm_list_idx_to_ptr(res, idx);
2908                list_for_each_entry(lock, queue, list) {
2909                        if (lock->ml.node == dlm->node_num)
2910                                continue;
2911                        if (test_bit(lock->ml.node, dlm->exit_domain_map))
2912                                continue;
2913                        nodenum = lock->ml.node;
2914                        goto bail;
2915                }
2916        }
2917
2918        /* Go through the refmap */
2919        noderef = -1;
2920        while (1) {
2921                noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2922                                        noderef + 1);
2923                if (noderef >= O2NM_MAX_NODES)
2924                        break;
2925                if (noderef == dlm->node_num)
2926                        continue;
2927                if (test_bit(noderef, dlm->exit_domain_map))
2928                        continue;
2929                nodenum = noderef;
2930                goto bail;
2931        }
2932
2933bail:
2934        return nodenum;
2935}
2936
2937/* this is called by the new master once all lockres
2938 * data has been received */
2939static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2940                                  struct dlm_lock_resource *res,
2941                                  u8 master, u8 new_master,
2942                                  struct dlm_node_iter *iter)
2943{
2944        struct dlm_migrate_request migrate;
2945        int ret, skip, status = 0;
2946        int nodenum;
2947
2948        memset(&migrate, 0, sizeof(migrate));
2949        migrate.namelen = res->lockname.len;
2950        memcpy(migrate.name, res->lockname.name, migrate.namelen);
2951        migrate.new_master = new_master;
2952        migrate.master = master;
2953
2954        ret = 0;
2955
2956        /* send message to all nodes, except the master and myself */
2957        while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2958                if (nodenum == master ||
2959                    nodenum == new_master)
2960                        continue;
2961
2962                /* We could race with a node exiting the domain. If so, skip it. */
2963                spin_lock(&dlm->spinlock);
2964                skip = (!test_bit(nodenum, dlm->domain_map));
2965                spin_unlock(&dlm->spinlock);
2966                if (skip) {
2967                        clear_bit(nodenum, iter->node_map);
2968                        continue;
2969                }
2970
2971                ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2972                                         &migrate, sizeof(migrate), nodenum,
2973                                         &status);
2974                if (ret < 0) {
2975                        mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2976                             "MIGRATE_REQUEST to node %u\n", dlm->name,
2977                             migrate.namelen, migrate.name, ret, nodenum);
2978                        if (!dlm_is_host_down(ret)) {
2979                                mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2980                                BUG();
2981                        }
2982                        clear_bit(nodenum, iter->node_map);
2983                        ret = 0;
2984                } else if (status < 0) {
2985                        mlog(0, "migrate request (node %u) returned %d!\n",
2986                             nodenum, status);
2987                        ret = status;
2988                } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2989                        /* during the migration request we short-circuited
2990                         * the mastery of the lockres.  make sure we have
2991                         * a mastery ref for nodenum */
2992                        mlog(0, "%s:%.*s: need ref for node %u\n",
2993                             dlm->name, res->lockname.len, res->lockname.name,
2994                             nodenum);
2995                        spin_lock(&res->spinlock);
2996                        dlm_lockres_set_refmap_bit(dlm, res, nodenum);
2997                        spin_unlock(&res->spinlock);
2998                }
2999        }
3000
3001        if (ret < 0)
3002                mlog_errno(ret);
3003
3004        mlog(0, "returning ret=%d\n", ret);
3005        return ret;
3006}
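
/*
 * Note the two error channels in the send loop above: 'ret' is the
 * o2net transport status (negative errno if the message never reached
 * the node), while 'status' is the remote handler's return code carried
 * back in the reply.  The pattern, reduced to its shape:
 *
 *	ret = o2net_send_message(type, key, &msg, sizeof(msg),
 *				 nodenum, &status);
 *	if (ret < 0)
 *		...transport error, tolerated iff dlm_is_host_down(ret)...
 *	else if (status < 0)
 *		...remote handler refused, abort the migration...
 */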
3007
3008
3009/* if there is an existing mle for this lockres, we now know who the master
3010 * is (the node that sent us *this* message), so we can clear it up right
3011 * away.  since the process that put the mle on the list still has a
3012 * reference to it, we can unhash it now, set the master and wake the
3013 * process.  as a result, we will have no mle in the list to start with.
3014 * now we can add an mle for the migration, and it should be the only one
3015 * found by anyone scanning the list.  */
3016int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3017                                void **ret_data)
3018{
3019        struct dlm_ctxt *dlm = data;
3020        struct dlm_lock_resource *res = NULL;
3021        struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3022        struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3023        const char *name;
3024        unsigned int namelen, hash;
3025        int ret = 0;
3026
3027        if (!dlm_grab(dlm))
3028                return -EINVAL;
3029
3030        name = migrate->name;
3031        namelen = migrate->namelen;
3032        hash = dlm_lockid_hash(name, namelen);
3033
3034        /* preallocate... if this fails, abort */
3035        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3036
3037        if (!mle) {
3038                ret = -ENOMEM;
3039                goto leave;
3040        }
3041
3042        /* check for pre-existing lock */
3043        spin_lock(&dlm->spinlock);
3044        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3045        if (res) {
3046                spin_lock(&res->spinlock);
3047                if (res->state & DLM_LOCK_RES_RECOVERING) {
3048                        /* if all is working ok, this can only mean that we got
3049                         * a migrate request from a node that we now see as
3050                         * dead.  what can we do here?  drop it to the floor? */
3051                        spin_unlock(&res->spinlock);
3052                        mlog(ML_ERROR, "Got a migrate request, but the "
3053                             "lockres is marked as recovering!\n");
3054                        kmem_cache_free(dlm_mle_cache, mle);
3055                        ret = -EINVAL; /* need a better solution */
3056                        goto unlock;
3057                }
3058                res->state |= DLM_LOCK_RES_MIGRATING;
3059                spin_unlock(&res->spinlock);
3060        }
3061
3062        spin_lock(&dlm->master_lock);
3063        /* ignore status.  only nonzero status would BUG. */
3064        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3065                                    name, namelen,
3066                                    migrate->new_master,
3067                                    migrate->master);
3068
3069        spin_unlock(&dlm->master_lock);
3070unlock:
3071        spin_unlock(&dlm->spinlock);
3072
3073        if (oldmle) {
3074                /* master is known, detach if not already detached */
3075                dlm_mle_detach_hb_events(dlm, oldmle);
3076                dlm_put_mle(oldmle);
3077        }
3078
3079        if (res)
3080                dlm_lockres_put(res);
3081leave:
3082        dlm_put(dlm);
3083        return ret;
3084}
3085
3086/* must be holding dlm->spinlock and dlm->master_lock.
3087 * when adding a migration mle, we can clear any other mles
3088 * in the master list because we know with certainty that
3089 * the master is "master".  so we remove any old mle from
3090 * the list after setting its master field, and then add
3091 * the new migration mle.  this way we keep to the rule
3092 * of having only one mle for a given lock name at all times. */
3093static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3094                                 struct dlm_lock_resource *res,
3095                                 struct dlm_master_list_entry *mle,
3096                                 struct dlm_master_list_entry **oldmle,
3097                                 const char *name, unsigned int namelen,
3098                                 u8 new_master, u8 master)
3099{
3100        int found;
3101        int ret = 0;
3102
3103        *oldmle = NULL;
3104
3105        assert_spin_locked(&dlm->spinlock);
3106        assert_spin_locked(&dlm->master_lock);
3107
3108        /* caller is responsible for any ref taken here on oldmle */
3109        found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3110        if (found) {
3111                struct dlm_master_list_entry *tmp = *oldmle;
3112                spin_lock(&tmp->spinlock);
3113                if (tmp->type == DLM_MLE_MIGRATION) {
3114                        if (master == dlm->node_num) {
3115                                /* ah another process raced me to it */
3116                                mlog(0, "tried to migrate %.*s, but some "
3117                                     "process beat me to it\n",
3118                                     namelen, name);
3119                                ret = -EEXIST;
3120                        } else {
3121                                /* bad.  2 NODES are trying to migrate! */
3122                                mlog(ML_ERROR, "migration error, mle: "
3123                                     "master=%u new_master=%u // request: "
3124                                     "master=%u new_master=%u // "
3125                                     "lockres=%.*s\n",
3126                                     tmp->master, tmp->new_master,
3127                                     master, new_master,
3128                                     namelen, name);
3129                                BUG();
3130                        }
3131                } else {
3132                        /* this is essentially what assert_master does */
3133                        tmp->master = master;
3134                        atomic_set(&tmp->woken, 1);
3135                        wake_up(&tmp->wq);
3136                        /* remove it so that only one mle will be found */
3137                        __dlm_unlink_mle(dlm, tmp);
3138                        __dlm_mle_detach_hb_events(dlm, tmp);
3139                        if (tmp->type == DLM_MLE_MASTER) {
3140                                ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3141                                mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3142                                                "telling master to get ref "
3143                                                "for cleared out mle during "
3144                                                "migration\n", dlm->name,
3145                                                namelen, name, master,
3146                                                new_master);
3147                        }
3148                }
3149                spin_unlock(&tmp->spinlock);
3150        }
3151
3152        /* now add a migration mle to the tail of the list */
3153        dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3154        mle->new_master = new_master;
3155        /* the new master will be sending an assert master for this.
3156         * at that point we will get the refmap reference */
3157        mle->master = master;
3158        /* do this for consistency with other mle types */
3159        set_bit(new_master, mle->maybe_map);
3160        __dlm_insert_mle(dlm, mle);
3161
3162        return ret;
3163}
3164
3165/*
3166 * Sets the owner of the lockres associated with the mle to UNKNOWN
3167 */
3168static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3169                                        struct dlm_master_list_entry *mle)
3170{
3171        struct dlm_lock_resource *res;
3172
3173        /* Find the lockres associated with the mle and set its owner to UNK */
3174        res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3175                                   mle->mnamehash);
3176        if (res) {
3177                spin_unlock(&dlm->master_lock);
3178
3179                /* move lockres onto recovery list */
3180                spin_lock(&res->spinlock);
3181                dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3182                dlm_move_lockres_to_recovery_list(dlm, res);
3183                spin_unlock(&res->spinlock);
3184                dlm_lockres_put(res);
3185
3186                /* about to get rid of mle, detach from heartbeat */
3187                __dlm_mle_detach_hb_events(dlm, mle);
3188
3189                /* dump the mle */
3190                spin_lock(&dlm->master_lock);
3191                __dlm_put_mle(mle);
3192                spin_unlock(&dlm->master_lock);
3193        }
3194
3195        return res;
3196}
3197
3198static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3199                                    struct dlm_master_list_entry *mle)
3200{
3201        __dlm_mle_detach_hb_events(dlm, mle);
3202
3203        spin_lock(&mle->spinlock);
3204        __dlm_unlink_mle(dlm, mle);
3205        atomic_set(&mle->woken, 1);
3206        spin_unlock(&mle->spinlock);
3207
3208        wake_up(&mle->wq);
3209}
3210
3211static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3212                                struct dlm_master_list_entry *mle, u8 dead_node)
3213{
3214        int bit;
3215
3216        BUG_ON(mle->type != DLM_MLE_BLOCK);
3217
3218        spin_lock(&mle->spinlock);
3219        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3220        if (bit != dead_node) {
3221                mlog(0, "mle found, but dead node %u would not have been "
3222                     "master\n", dead_node);
3223                spin_unlock(&mle->spinlock);
3224        } else {
3225                /* Must drop the refcount by one since the assert_master will
3226                 * never arrive. This may result in the mle being unlinked and
3227                 * freed, but there may still be a process waiting in the
3228                 * dlmlock path which is fine. */
3229                mlog(0, "node %u was expected master\n", dead_node);
3230                atomic_set(&mle->woken, 1);
3231                spin_unlock(&mle->spinlock);
3232                wake_up(&mle->wq);
3233
3234                /* Do not need events any longer, so detach from heartbeat */
3235                __dlm_mle_detach_hb_events(dlm, mle);
3236                __dlm_put_mle(mle);
3237        }
3238}
3239
3240void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3241{
3242        struct dlm_master_list_entry *mle;
3243        struct dlm_lock_resource *res;
3244        struct hlist_head *bucket;
3245        struct hlist_node *tmp;
3246        unsigned int i;
3247
3248        mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3249top:
3250        assert_spin_locked(&dlm->spinlock);
3251
3252        /* clean the master list */
3253        spin_lock(&dlm->master_lock);
3254        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3255                bucket = dlm_master_hash(dlm, i);
3256                hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3257                        BUG_ON(mle->type != DLM_MLE_BLOCK &&
3258                               mle->type != DLM_MLE_MASTER &&
3259                               mle->type != DLM_MLE_MIGRATION);
3260
3261                        /* MASTER mles are initiated locally. The waiting
3262                         * process will notice the node map change shortly.
3263                         * Let that happen as normal. */
3264                        if (mle->type == DLM_MLE_MASTER)
3265                                continue;
3266
3267                        /* BLOCK mles are initiated by other nodes. Need to
3268                         * clean up if the dead node would have been the
3269                         * master. */
3270                        if (mle->type == DLM_MLE_BLOCK) {
3271                                dlm_clean_block_mle(dlm, mle, dead_node);
3272                                continue;
3273                        }
3274
3275                        /* Everything else is a MIGRATION mle */
3276
3277                        /* The rule for MIGRATION mles is that the master
3278                         * becomes UNKNOWN if *either* the original or the new
3279                         * master dies. All UNKNOWN lockres' are sent to
3280                         * whichever node becomes the recovery master. The new
3281                         * master is responsible for determining if there is
3282                         * still a master for this lockres, or if he needs to
3283                         * take over mastery. Either way, this node should
3284                         * expect another message to resolve this. */
3285
3286                        if (mle->master != dead_node &&
3287                            mle->new_master != dead_node)
3288                                continue;
3289
3290                        /* If we have reached this point, this mle needs to be
3291                         * removed from the list and freed. */
3292                        dlm_clean_migration_mle(dlm, mle);
3293
3294                        mlog(0, "%s: node %u died during migration from "
3295                             "%u to %u!\n", dlm->name, dead_node, mle->master,
3296                             mle->new_master);
3297
3298                        /* If we find a lockres associated with the mle, we've
3299                         * hit this rare case that messes up our lock ordering.
3300                         * If so, we need to drop the master lock so that we can
3301                         * take the lockres lock, meaning that we will have to
3302                         * restart from the head of the list. */
3303                        res = dlm_reset_mleres_owner(dlm, mle);
3304                        if (res)
3305                                /* restart */
3306                                goto top;
3307
3308                        /* This may be the last reference */
3309                        __dlm_put_mle(mle);
3310                }
3311        }
3312        spin_unlock(&dlm->master_lock);
3313}
3314
3315int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3316                         u8 old_master)
3317{
3318        struct dlm_node_iter iter;
3319        int ret = 0;
3320
3321        spin_lock(&dlm->spinlock);
3322        dlm_node_iter_init(dlm->domain_map, &iter);
3323        clear_bit(old_master, iter.node_map);
3324        clear_bit(dlm->node_num, iter.node_map);
3325        spin_unlock(&dlm->spinlock);
3326
3327        /* ownership of the lockres is changing.  account for the
3328         * mastery reference here since old_master will briefly have
3329         * a reference after the migration completes */
3330        spin_lock(&res->spinlock);
3331        dlm_lockres_set_refmap_bit(dlm, res, old_master);
3332        spin_unlock(&res->spinlock);
3333
3334        mlog(0, "now time to do a migrate request to other nodes\n");
3335        ret = dlm_do_migrate_request(dlm, res, old_master,
3336                                     dlm->node_num, &iter);
3337        if (ret < 0) {
3338                mlog_errno(ret);
3339                goto leave;
3340        }
3341
3342        mlog(0, "doing assert master of %.*s to all except the original node\n",
3343             res->lockname.len, res->lockname.name);
3344        /* this call now finishes out the nodemap
3345         * even if one or more nodes die */
3346        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3347                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3348        if (ret < 0) {
3349                /* no longer need to retry.  all living nodes contacted. */
3350                mlog_errno(ret);
3351                ret = 0;
3352        }
3353
3354        memset(iter.node_map, 0, sizeof(iter.node_map));
3355        set_bit(old_master, iter.node_map);
3356        mlog(0, "doing assert master of %.*s back to %u\n",
3357             res->lockname.len, res->lockname.name, old_master);
3358        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3359                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3360        if (ret < 0) {
3361                mlog(0, "assert master to original master failed "
3362                     "with %d.\n", ret);
3363                /* the only nonzero status here would be because of
3364                 * a dead original node.  we're done. */
3365                ret = 0;
3366        }
3367
3368        /* all done, set the owner, clear the flag */
3369        spin_lock(&res->spinlock);
3370        dlm_set_lockres_owner(dlm, res, dlm->node_num);
3371        res->state &= ~DLM_LOCK_RES_MIGRATING;
3372        spin_unlock(&res->spinlock);
3373        /* re-dirty it on the new master */
3374        dlm_kick_thread(dlm, res);
3375        wake_up(&res->wq);
3376leave:
3377        return ret;
3378}
3379
3380/*
3381 * LOCKRES AST REFCOUNT
3382 * this is integral to migration
3383 */
3384
3385/* for future intent to call an ast, reserve one ahead of time.
3386 * this should be called only after waiting on the lockres
3387 * with dlm_wait_on_lockres, and while still holding the
3388 * spinlock after the call. */
3389void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3390{
3391        assert_spin_locked(&res->spinlock);
3392        if (res->state & DLM_LOCK_RES_MIGRATING) {
3393                __dlm_print_one_lock_resource(res);
3394        }
3395        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3396
3397        atomic_inc(&res->asts_reserved);
3398}
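
/*
 * Reservations and releases always pair up.  A typical sequence (a
 * hedged sketch, compressed from the flush path in dlmthread.c):
 *
 *	spin_lock(&res->spinlock);
 *	__dlm_lockres_reserve_ast(res);
 *	spin_unlock(&res->spinlock);
 *	...queue and deliver the ast or bast...
 *	dlm_lockres_release_ast(dlm, res);
 */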
3399
3400/*
3401 * used to drop the reserved ast, either because it went unused,
3402 * or because the ast/bast was actually called.
3403 *
3404 * also, if there is a pending migration on this lockres,
3405 * and this was the last pending ast on the lockres,
3406 * atomically set the MIGRATING flag before we drop the lock.
3407 * this is how we ensure that migration can proceed with no
3408 * asts in progress.  note that it is ok if the state of the
3409 * queues is such that a lock should be granted in the future
3410 * or that a bast should be fired, because the new master will
3411 * shuffle the lists on this lockres as soon as it is migrated.
3412 */
3413void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3414                             struct dlm_lock_resource *res)
3415{
3416        if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3417                return;
3418
3419        if (!res->migration_pending) {
3420                spin_unlock(&res->spinlock);
3421                return;
3422        }
3423
3424        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3425        res->migration_pending = 0;
3426        res->state |= DLM_LOCK_RES_MIGRATING;
3427        spin_unlock(&res->spinlock);
3428        wake_up(&res->wq);
3429        wake_up(&dlm->migration_wq);
3430}
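
/*
 * atomic_dec_and_lock() above is the classic drop-the-last-reference
 * idiom: it decrements asts_reserved, and only when the count reaches
 * zero does it return true with res->spinlock held, so the
 * migration_pending -> MIGRATING transition cannot race with a
 * concurrent reserve (which also runs under res->spinlock).  The
 * generic shape:
 *
 *	if (!atomic_dec_and_lock(&obj->count, &obj->lock))
 *		return;			count still nonzero, lock not taken
 *	...zero-count work, done under obj->lock...
 *	spin_unlock(&obj->lock);
 */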
3431
3432void dlm_force_free_mles(struct dlm_ctxt *dlm)
3433{
3434        int i;
3435        struct hlist_head *bucket;
3436        struct dlm_master_list_entry *mle;
3437        struct hlist_node *tmp;
3438
3439        /*
3440         * We notified all other nodes that we are exiting the domain and
3441         * set the dlm state to DLM_CTXT_LEAVING. If any mles are still
3442         * around, we force-free them and wake any processes that are
3443         * waiting on the mles.
3444         */
3445        spin_lock(&dlm->spinlock);
3446        spin_lock(&dlm->master_lock);
3447
3448        BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3449        BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3450
3451        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3452                bucket = dlm_master_hash(dlm, i);
3453                hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3454                        if (mle->type != DLM_MLE_BLOCK) {
3455                                mlog(ML_ERROR, "bad mle: %p\n", mle);
3456                                dlm_print_one_mle(mle);
3457                        }
3458                        atomic_set(&mle->woken, 1);
3459                        wake_up(&mle->wq);
3460
3461                        __dlm_unlink_mle(dlm, mle);
3462                        __dlm_mle_detach_hb_events(dlm, mle);
3463                        __dlm_put_mle(mle);
3464                }
3465        }
3466        spin_unlock(&dlm->master_lock);
3467        spin_unlock(&dlm->spinlock);
3468}
3469