linux/fs/ocfs2/dlm/dlmmaster.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
                              int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node,
                            int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res,
                                void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                                const char *name,
                                unsigned int namelen)
{
        if (dlm != mle->dlm)
                return 0;

        if (namelen != mle->mnamelen ||
            memcmp(name, mle->mname, namelen) != 0)
                return 0;

        return 1;
}

static struct kmem_cache *dlm_lockres_cache = NULL;
static struct kmem_cache *dlm_lockname_cache = NULL;
static struct kmem_cache *dlm_mle_cache = NULL;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle,
                                 struct dlm_master_list_entry **oldmle,
                                 const char *name, unsigned int namelen,
                                 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res);


int dlm_is_host_down(int errno)
{
        switch (errno) {
                case -EBADF:
                case -ECONNREFUSED:
                case -ENOTCONN:
                case -ECONNRESET:
                case -EPIPE:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ETIMEDOUT:
                case -ECONNABORTED:
                case -ENETDOWN:
                case -ENETUNREACH:
                case -ENETRESET:
                case -ESHUTDOWN:
                case -ENOPROTOOPT:
                case -EINVAL:   /* if returned from our tcp code,
                                   this means there is no socket */
                        return 1;
        }
        return 0;
}
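
/*
 * callers treat any errno matched above as death of the target node
 * rather than a transient send failure.  an illustrative sketch of the
 * usual pattern (see dlm_do_master_request() below for a real call
 * site):
 *
 *	ret = o2net_send_message(...);
 *	if (ret < 0) {
 *		if (!dlm_is_host_down(ret))
 *			BUG();    (not a network error -- unexpected)
 *		... assume the node is dead and let recovery clean up ...
 *	}
 */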


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);

        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        if (!list_empty(&mle->hb_events))
                list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                            struct dlm_master_list_entry *mle)
{
        spin_lock(&dlm->spinlock);
        __dlm_mle_detach_hb_events(dlm, mle);
        spin_unlock(&dlm->spinlock);
}

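/*
 * reference counting for mles: normal references are taken with
 * dlm_get_mle()/kref and dropped via __dlm_put_mle(), which frees the
 * entry through dlm_mle_release() on the last put.  the "inuse" pair
 * below additionally bumps mle->inuse alongside an extra kref; note
 * that dlm_get_mle_inuse() expects dlm->spinlock and dlm->master_lock
 * to already be held, while dlm_put_mle_inuse() acquires them itself.
 */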
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        mle->inuse++;
        kref_get(&mle->mle_refs);
}

static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        mle->inuse--;
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        if (!atomic_read(&mle->mle_refs.refcount)) {
                /* this may or may not crash, but who cares.
                 * it's a BUG. */
                mlog(ML_ERROR, "bad mle: %p\n", mle);
                dlm_print_one_mle(mle);
                BUG();
        } else
                kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
        kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen)
{
        assert_spin_locked(&dlm->spinlock);

        mle->dlm = dlm;
        mle->type = type;
        INIT_HLIST_NODE(&mle->master_hash_node);
        INIT_LIST_HEAD(&mle->hb_events);
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
        spin_lock_init(&mle->spinlock);
        init_waitqueue_head(&mle->wq);
        atomic_set(&mle->woken, 0);
        kref_init(&mle->mle_refs);
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;
        mle->inuse = 0;

        BUG_ON(mle->type != DLM_MLE_BLOCK &&
               mle->type != DLM_MLE_MASTER &&
               mle->type != DLM_MLE_MIGRATION);

        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
                mle->mleres = res;
                memcpy(mle->mname, res->lockname.name, res->lockname.len);
                mle->mnamelen = res->lockname.len;
                mle->mnamehash = res->lockname.hash;
        } else {
                BUG_ON(!name);
                mle->mleres = NULL;
                memcpy(mle->mname, name, namelen);
                mle->mnamelen = namelen;
                mle->mnamehash = dlm_lockid_hash(name, namelen);
        }

        atomic_inc(&dlm->mle_tot_count[mle->type]);
        atomic_inc(&dlm->mle_cur_count[mle->type]);

        /* copy off the node_map and register hb callbacks on our copy */
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
        clear_bit(dlm->node_num, mle->vote_map);
        clear_bit(dlm->node_num, mle->node_map);

        /* attach the mle to the domain node up/down events */
        __dlm_mle_attach_hb_events(dlm, mle);
}

void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        if (!hlist_unhashed(&mle->master_hash_node))
                hlist_del_init(&mle->master_hash_node);
}

void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        struct hlist_head *bucket;

        assert_spin_locked(&dlm->master_lock);

        bucket = dlm_master_hash(dlm, mle->mnamehash);
        hlist_add_head(&mle->master_hash_node, bucket);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen)
{
        struct dlm_master_list_entry *tmpmle;
        struct hlist_head *bucket;
        struct hlist_node *list;
        unsigned int hash;

        assert_spin_locked(&dlm->master_lock);

        hash = dlm_lockid_hash(name, namelen);
        bucket = dlm_master_hash(dlm, hash);
        hlist_for_each(list, bucket) {
                tmpmle = hlist_entry(list, struct dlm_master_list_entry,
                                     master_hash_node);
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
                *mle = tmpmle;
                return 1;
        }
        return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
        struct dlm_master_list_entry *mle;

        assert_spin_locked(&dlm->spinlock);

        list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
                        dlm_mle_node_down(dlm, mle, NULL, idx);
        }
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (!test_bit(idx, mle->node_map))
                mlog(0, "node %u already removed from nodemap!\n", idx);
        else
                clear_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (test_bit(idx, mle->node_map))
                mlog(0, "node %u already in node map!\n", idx);
        else
                set_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
        dlm_mle_cache = kmem_cache_create("o2dlm_mle",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
}

void dlm_destroy_mle_cache(void)
{
        if (dlm_mle_cache)
                kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
        struct dlm_master_list_entry *mle;
        struct dlm_ctxt *dlm;

        mlog_entry_void();

        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
             mle->type);

        /* remove from list if not already */
        __dlm_unlink_mle(dlm, mle);

        /* detach the mle from the domain node up/down events */
        __dlm_mle_detach_hb_events(dlm, mle);

        atomic_dec(&dlm->mle_cur_count[mle->type]);

        /* NOTE: kfree under spinlock here.
         * if this is bad, we can move this to a freelist. */
        kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

int dlm_init_master_caches(void)
{
        dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
                                              sizeof(struct dlm_lock_resource),
                                              0, SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockres_cache)
                goto bail;

        dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
                                               DLM_LOCKID_NAME_MAX, 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockname_cache)
                goto bail;

        return 0;
bail:
        dlm_destroy_master_caches();
        return -ENOMEM;
}

void dlm_destroy_master_caches(void)
{
        if (dlm_lockname_cache)
                kmem_cache_destroy(dlm_lockname_cache);

        if (dlm_lockres_cache)
                kmem_cache_destroy(dlm_lockres_cache);
}

static void dlm_lockres_release(struct kref *kref)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm;

        res = container_of(kref, struct dlm_lock_resource, refs);
        dlm = res->dlm;

        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
        BUG_ON(!res->lockname.name);

        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);

        spin_lock(&dlm->track_lock);
        if (!list_empty(&res->tracking))
                list_del_init(&res->tracking);
        else {
                mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
                     res->lockname.len, res->lockname.name);
                dlm_print_one_lock_resource(res);
        }
        spin_unlock(&dlm->track_lock);

        atomic_dec(&dlm->res_cur_count);

        dlm_put(dlm);

        if (!hlist_unhashed(&res->hash_node) ||
            !list_empty(&res->granted) ||
            !list_empty(&res->converting) ||
            !list_empty(&res->blocked) ||
            !list_empty(&res->dirty) ||
            !list_empty(&res->recovering) ||
            !list_empty(&res->purge)) {
                mlog(ML_ERROR,
                     "Going to BUG for resource %.*s."
                     "  We're on a list! [%c%c%c%c%c%c%c]\n",
                     res->lockname.len, res->lockname.name,
                     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
                     !list_empty(&res->granted) ? 'G' : ' ',
                     !list_empty(&res->converting) ? 'C' : ' ',
                     !list_empty(&res->blocked) ? 'B' : ' ',
                     !list_empty(&res->dirty) ? 'D' : ' ',
                     !list_empty(&res->recovering) ? 'R' : ' ',
                     !list_empty(&res->purge) ? 'P' : ' ');

                dlm_print_one_lock_resource(res);
        }

        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
        BUG_ON(!list_empty(&res->granted));
        BUG_ON(!list_empty(&res->converting));
        BUG_ON(!list_empty(&res->blocked));
        BUG_ON(!list_empty(&res->dirty));
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));

        kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

        kmem_cache_free(dlm_lockres_cache, res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
        kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res,
                             const char *name, unsigned int namelen)
{
        char *qname;

        /* If we memset here, we lose our reference to the kmalloc'd
         * res->lockname.name, so be sure to init every field
         * correctly! */

        qname = (char *) res->lockname.name;
        memcpy(qname, name, namelen);

        res->lockname.len = namelen;
        res->lockname.hash = dlm_lockid_hash(name, namelen);

        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
        INIT_HLIST_NODE(&res->hash_node);
        INIT_LIST_HEAD(&res->granted);
        INIT_LIST_HEAD(&res->converting);
        INIT_LIST_HEAD(&res->blocked);
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
        INIT_LIST_HEAD(&res->tracking);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;
        res->inflight_locks = 0;

        /* put in dlm_lockres_release */
        dlm_grab(dlm);
        res->dlm = dlm;

        kref_init(&res->refs);

        atomic_inc(&dlm->res_tot_count);
        atomic_inc(&dlm->res_cur_count);

        /* just for consistency */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
        spin_unlock(&res->spinlock);

        res->state = DLM_LOCK_RES_IN_PROGRESS;

        res->last_used = 0;

        spin_lock(&dlm->spinlock);
        list_add_tail(&res->tracking, &dlm->tracking_list);
        spin_unlock(&dlm->spinlock);

        memset(res->lvb, 0, DLM_LVB_LEN);
        memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
{
        struct dlm_lock_resource *res = NULL;

        res = (struct dlm_lock_resource *)
                                kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
        if (!res)
                goto error;

        res->lockname.name = (char *)
                                kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
        if (!res->lockname.name)
                goto error;

        dlm_init_lockres(dlm, res, name, namelen);
        return res;

error:
        if (res && res->lockname.name)
                kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

        if (res)
                kmem_cache_free(dlm_lockres_cache, res);
        return NULL;
}
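
/*
 * illustrative sketch (hypothetical caller, not a real call site):
 * dlm_new_lockres() is normally paired with dlm_lockres_put(), which
 * releases both the resource and its name buffer through
 * dlm_lockres_release() on the final kref put:
 *
 *	res = dlm_new_lockres(dlm, lockid, namelen);
 *	if (!res)
 *		return -ENOMEM;
 *	... use res ...
 *	dlm_lockres_put(res);
 */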

void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res,
                                   int new_lockres,
                                   const char *file,
                                   int line)
{
        if (!new_lockres)
                assert_spin_locked(&res->spinlock);

        if (!test_bit(dlm->node_num, res->refmap)) {
                BUG_ON(res->inflight_locks != 0);
                dlm_lockres_set_refmap_bit(dlm->node_num, res);
        }
        res->inflight_locks++;
        mlog(0, "%s:%.*s: inflight++: now %u\n",
             dlm->name, res->lockname.len, res->lockname.name,
             res->inflight_locks);
}

void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res,
                                   const char *file,
                                   int line)
{
        assert_spin_locked(&res->spinlock);

        BUG_ON(res->inflight_locks == 0);
        res->inflight_locks--;
        mlog(0, "%s:%.*s: inflight--: now %u\n",
             dlm->name, res->lockname.len, res->lockname.name,
             res->inflight_locks);
        if (res->inflight_locks == 0)
                dlm_lockres_clear_refmap_bit(dlm->node_num, res);
        wake_up(&res->wq);
}
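
/*
 * note the refmap interplay above: taking the first inflight reference
 * on a lockres sets this node's bit in res->refmap, and dropping the
 * last one clears it again, which is what allows the resource to be
 * purged once it is idle.
 */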

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.   need to wait around for that node
 * to assert_master (or die).
 *
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
                                          int namelen,
                                          int flags)
{
        struct dlm_lock_resource *tmpres=NULL, *res=NULL;
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *alloc_mle = NULL;
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
        unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
        int drop_inflight_if_nonlocal = 0;

        BUG_ON(!lockid);

        hash = dlm_lockid_hash(lockid, namelen);

        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
                int dropping_ref = 0;

                spin_unlock(&dlm->spinlock);

                spin_lock(&tmpres->spinlock);
                /* We wait for the other thread that is mastering the resource */
                if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        __dlm_wait_on_lockres(tmpres);
                        BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
                }

                if (tmpres->owner == dlm->node_num) {
                        BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
                        dlm_lockres_grab_inflight_ref(dlm, tmpres);
                } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
                        dropping_ref = 1;
                spin_unlock(&tmpres->spinlock);

                /* wait until done messaging the master, drop our ref to allow
                 * the lockres to be purged, start over. */
                if (dropping_ref) {
                        spin_lock(&tmpres->spinlock);
                        __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
                        spin_unlock(&tmpres->spinlock);
                        dlm_lockres_put(tmpres);
                        tmpres = NULL;
                        goto lookup;
                }

                mlog(0, "found in hash!\n");
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
                goto leave;
        }

        if (!res) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "allocating a new resource\n");
                /* nothing found and we need to allocate one. */
                alloc_mle = (struct dlm_master_list_entry *)
                        kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
                if (!alloc_mle)
                        goto leave;
                res = dlm_new_lockres(dlm, lockid, namelen);
                if (!res)
                        goto leave;
                goto lookup;
        }

        mlog(0, "no lockres found, allocated our own: %p\n", res);

        if (flags & LKM_LOCAL) {
                /* caller knows it's safe to assume it's not mastered elsewhere
                 * DONE!  return right away */
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
                dlm_lockres_grab_inflight_ref(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
                goto wake_waiters;
        }

        /* check master list to see if another node has started mastering it */
        spin_lock(&dlm->master_lock);

        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
                int mig;
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
                }
                mig = (mle->type == DLM_MLE_MIGRATION);
                /* if there is a migration in progress, let the migration
                 * finish before continuing.  we can wait for the absence
                 * of the MIGRATION mle: either the migrate finished or
                 * one of the nodes died and the mle was cleaned up.
                 * if there is a BLOCK here, but it already has a master
                 * set, we are too late.  the master does not have a ref
                 * for us in the refmap.  detach the mle and drop it.
                 * either way, go back to the top and start over. */
                if (mig || mle->master != O2NM_MAX_NODES) {
                        BUG_ON(mig && mle->master == dlm->node_num);
                        /* we arrived too late.  the master does not
                         * have a ref for us. retry. */
                        mlog(0, "%s:%.*s: late on %s\n",
                             dlm->name, namelen, lockid,
                             mig ?  "MIGRATION" : "BLOCK");
                        spin_unlock(&dlm->master_lock);
                        spin_unlock(&dlm->spinlock);

                        /* master is known, detach */
                        if (!mig)
                                dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
                        /* this is lame, but we can't wait on either
                         * the mle or lockres waitqueue here */
                        if (mig)
                                msleep(100);
                        goto lookup;
                }
        } else {
                /* go ahead and try to master lock on this node */
                mle = alloc_mle;
                /* make sure this does not get freed below */
                alloc_mle = NULL;
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                __dlm_insert_mle(dlm, mle);

                /* still holding the dlm spinlock, check the recovery map
                 * to see if there are any nodes that still need to be
                 * considered.  these will not appear in the mle nodemap
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                }
        }

        /* at this point there is either a DLM_MLE_BLOCK or a
         * DLM_MLE_MASTER on the master list, so it's safe to add the
         * lockres to the hashtable.  anyone who finds the lock will
         * still have to wait on the IN_PROGRESS. */

        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
        /* since this lockres is new it does not require the spinlock */
        dlm_lockres_grab_inflight_ref_new(dlm, res);

        /* if this node does not become the master make sure to drop
         * this inflight reference below */
        drop_inflight_if_nonlocal = 1;

        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
        dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

redo_request:
        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable by a change on the mle,
                 * so we only need to clear out the recovery map once. */
                if (dlm_is_recovery_lock(lockid, namelen)) {
                        mlog(ML_NOTICE, "%s: recovery map is not empty, but "
                             "must master $RECOVERY lock now\n", dlm->name);
                        if (!dlm_pre_master_reco_lockres(dlm, res))
                                wait_on_recovery = 0;
                        else {
                                mlog(0, "%s: waiting 500ms for heartbeat state "
                                    "change\n", dlm->name);
                                msleep(500);
                        }
                        continue;
                }

                dlm_kick_recovery_thread(dlm);
                msleep(1000);
                dlm_wait_for_recovery(dlm);

                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                } else
                        wait_on_recovery = 0;
                spin_unlock(&dlm->spinlock);

                if (wait_on_recovery)
                        dlm_wait_for_node_recovery(dlm, bit, 10000);
        }

        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;

        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                ret = dlm_do_master_request(res, mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
                        /* found a master! */
                        if (mle->master <= nodenum)
                                break;
                        /* if our master request has not reached the master
                         * yet, keep going until it does.  this is how the
                         * master will know that asserts are needed back to
                         * the lower nodes. */
                        mlog(0, "%s:%.*s: requests only up to %u but master "
                             "is %u, keep going\n", dlm->name, namelen,
                             lockid, nodenum, mle->master);
                }
        }

wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                wait_on_recovery = 1;
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
                        mlog(ML_ERROR, "%s:%.*s: spinning on "
                             "dlm_wait_for_lock_mastery, blocked=%d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
                        dlm_print_one_mle(mle);
                        tries = 0;
                }
                goto redo_request;
        }

        mlog(0, "lockres mastered by %u\n", res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
        dlm_put_mle_inuse(mle);

wake_waiters:
        spin_lock(&res->spinlock);
        if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
                dlm_lockres_drop_inflight_ref(dlm, res);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

leave:
        /* need to free the unused mle */
        if (alloc_mle)
                kmem_cache_free(dlm_mle_cache, alloc_mle);

        return res;
}


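/*
 * overview of the wait loop below (a summary of the code, not new
 * behavior): dlm_wait_for_lock_mastery() spins on three outcomes --
 * the owner becomes known (done), the node map changes (restart the
 * vote via dlm_restart_lock_mastery()), or all votes are in and this
 * node holds the lowest bit in maybe_map, in which case it asserts
 * mastery itself.  otherwise it sleeps up to DLM_MASTERY_TIMEOUT_MS
 * and rechecks.
 */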
#define DLM_MASTERY_TIMEOUT_MS   5000

static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked)
{
        u8 m;
        int ret, bit;
        int map_changed, voting_done;
        int assert, sleep;

recheck:
        ret = 0;
        assert = 0;

        /* check if another node has already become the owner */
        spin_lock(&res->spinlock);
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->owner);
                spin_unlock(&res->spinlock);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
                if (res->owner != dlm->node_num) {
                        ret = dlm_do_master_request(res, mle, res->owner);
                        if (ret < 0) {
                                /* give recovery a chance to run */
                                mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
                                msleep(500);
                                goto recheck;
                        }
                }
                ret = 0;
                goto leave;
        }
        spin_unlock(&res->spinlock);

        spin_lock(&mle->spinlock);
        m = mle->master;
        map_changed = (memcmp(mle->vote_map, mle->node_map,
                              sizeof(mle->vote_map)) != 0);
        voting_done = (memcmp(mle->vote_map, mle->response_map,
                             sizeof(mle->vote_map)) == 0);

        /* restart if we hit any errors */
        if (map_changed) {
                int b;
                mlog(0, "%s: %.*s: node map changed, restarting\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                b = (mle->type == DLM_MLE_BLOCK);
                if ((*blocked && !b) || (!*blocked && b)) {
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
                             dlm->name, res->lockname.len, res->lockname.name,
                             *blocked, b);
                        *blocked = b;
                }
                spin_unlock(&mle->spinlock);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto leave;
                }
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
        } else {
                if (!voting_done) {
                        mlog(0, "map not changed and voting not done "
                             "for %s:%.*s\n", dlm->name, res->lockname.len,
                             res->lockname.name);
                }
        }

        if (m != O2NM_MAX_NODES) {
                /* another node has done an assert!
                 * all done! */
                sleep = 0;
        } else {
                sleep = 1;
                /* have all nodes responded? */
                if (voting_done && !*blocked) {
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
                        if (dlm->node_num <= bit) {
                                /* my node number is lowest.
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
                                /* ref was grabbed in get_lock_resource
                                 * will be dropped in dlmlock_master */
                                assert = 1;
                                sleep = 0;
                        }
                        /* if voting is done, but we have not received
                         * an assert master yet, we must sleep */
                }
        }

        spin_unlock(&mle->spinlock);

        /* sleep if we haven't finished voting yet */
        if (sleep) {
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

                /*
                if (atomic_read(&mle->mle_refs.refcount) < 2)
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
                        atomic_read(&mle->mle_refs.refcount),
                        res->lockname.len, res->lockname.name);
                */
                atomic_set(&mle->woken, 0);
                (void)wait_event_timeout(mle->wq,
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
                        mlog(0, "%s:%.*s: waiting again\n", dlm->name,
                             res->lockname.len, res->lockname.name);
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                ret = 0;
                goto leave;
        }

        ret = 0;   /* done */
        if (assert) {
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
                ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
                         * (any nonzero response is a BUG on this node).
                         * Most likely a socket just got disconnected
                         * due to node death. */
                        mlog_errno(ret);
                }
                /* no longer need to restart lock mastery.
                 * all living nodes have been contacted. */
                ret = 0;
        }

        /* set the lockres owner */
        spin_lock(&res->spinlock);
        /* mastery reference obtained either during
         * assert_master_handler or in get_lock_resource */
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);

leave:
        return ret;
}

struct dlm_bitmap_diff_iter
{
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
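
/*
 * diff_bm holds the symmetric difference (XOR) of orig_bm and cur_bm:
 * a set bit means that node's state changed between the two snapshots.
 */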

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}
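
/*
 * illustrative usage sketch of the diff iterator (compare the loop in
 * dlm_restart_lock_mastery() below; old_map/new_map are placeholders):
 *
 *	struct dlm_bitmap_diff_iter bdi;
 *	enum dlm_node_state_change sc;
 *	int node;
 *
 *	dlm_bitmap_diff_iter_init(&bdi, old_map, new_map);
 *	while ((node = dlm_bitmap_diff_iter_next(&bdi, &sc)) >= 0)
 *		... handle NODE_UP or NODE_DOWN for 'node' ...
 */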


static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked)
{
        struct dlm_bitmap_diff_iter bdi;
        enum dlm_node_state_change sc;
        int node;
        int ret = 0;

        mlog(0, "something happened such that the "
             "master process may need to be restarted!\n");

        assert_spin_locked(&mle->spinlock);

        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        while (node >= 0) {
                if (sc == NODE_UP) {
                        /* a node came up.  clear any old vote from
                         * the response map and set it in the vote map
                         * then restart the mastery. */
                        mlog(ML_NOTICE, "node %d up while restarting\n", node);

                        /* redo the master request, but only for the new node */
                        mlog(0, "sending request to new node\n");
                        clear_bit(node, mle->response_map);
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);

                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);

                                if (node == lowest) {
                                        mlog(0, "expected master %u died"
                                            " while this node was blocked "
                                            "waiting on it!\n", node);
                                        lowest = find_next_bit(mle->maybe_map,
                                                        O2NM_MAX_NODES,
                                                        lowest+1);
                                        if (lowest < O2NM_MAX_NODES) {
                                                mlog(0, "%s:%.*s:still "
                                                     "blocked. waiting on %u "
                                                     "now\n", dlm->name,
                                                     res->lockname.len,
                                                     res->lockname.name,
                                                     lowest);
                                        } else {
                                                /* mle is an MLE_BLOCK, but
                                                 * there is now nothing left to
                                                 * block on.  we need to return
                                                 * all the way back out and try
                                                 * again with an MLE_MASTER.
                                                 * dlm_do_local_recovery_cleanup
                                                 * has already run, so the mle
                                                 * refcount is ok */
                                                mlog(0, "%s:%.*s: no "
                                                     "longer blocking. try to "
                                                     "master this here\n",
                                                     dlm->name,
                                                     res->lockname.len,
                                                     res->lockname.name);
                                                mle->type = DLM_MLE_MASTER;
                                                mle->mleres = res;
                                        }
                                }
                        }

                        /* now blank out everything, as if we had never
                         * contacted anyone */
                        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
                        memset(mle->response_map, 0, sizeof(mle->response_map));
                        /* reset the vote_map to the current node_map */
                        memcpy(mle->vote_map, mle->node_map,
                               sizeof(mle->node_map));
                        /* put myself into the maybe map */
                        if (mle->type != DLM_MLE_BLOCK)
                                set_bit(dlm->node_num, mle->maybe_map);
                }
                ret = -EAGAIN;
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */

static int dlm_do_master_request(struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle, int to)
{
        struct dlm_ctxt *dlm = mle->dlm;
        struct dlm_master_request request;
        int ret, response=0, resend;

        memset(&request, 0, sizeof(request));
        request.node_idx = dlm->node_num;

        BUG_ON(mle->type == DLM_MLE_MIGRATION);

        request.namelen = (u8)mle->mnamelen;
        memcpy(request.name, mle->mname, request.namelen);

again:
        ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
                                 sizeof(request), to, &response);
        if (ret < 0)  {
                if (ret == -ESRCH) {
                        /* should never happen */
                        mlog(ML_ERROR, "TCP stack not ready!\n");
                        BUG();
                } else if (ret == -EINVAL) {
                        mlog(ML_ERROR, "bad args passed to o2net!\n");
                        BUG();
                } else if (ret == -ENOMEM) {
                        mlog(ML_ERROR, "out of memory while trying to send "
                             "network message!  retrying\n");
                        /* this is totally crude */
                        msleep(50);
                        goto again;
                } else if (!dlm_is_host_down(ret)) {
                        /* not a network error. bad. */
                        mlog_errno(ret);
                        mlog(ML_ERROR, "unhandled error!");
                        BUG();
                }
                /* all other errors should be network errors,
                 * and likely indicate node death */
                mlog(ML_ERROR, "link to %d went down!\n", to);
                goto out;
        }

        ret = 0;
        resend = 0;
        spin_lock(&mle->spinlock);
        switch (response) {
                case DLM_MASTER_RESP_YES:
                        set_bit(to, mle->response_map);
                        mlog(0, "node %u is the master, response=YES\n", to);
                        mlog(0, "%s:%.*s: master node %u now knows I have a "
                             "reference\n", dlm->name, res->lockname.len,
                             res->lockname.name, to);
                        mle->master = to;
                        break;
                case DLM_MASTER_RESP_NO:
                        mlog(0, "node %u not master, response=NO\n", to);
                        set_bit(to, mle->response_map);
                        break;
                case DLM_MASTER_RESP_MAYBE:
                        mlog(0, "node %u not master, response=MAYBE\n", to);
                        set_bit(to, mle->response_map);
                        set_bit(to, mle->maybe_map);
                        break;
                case DLM_MASTER_RESP_ERROR:
                        mlog(0, "node %u hit an error, resending\n", to);
                        resend = 1;
                        response = 0;
                        break;
                default:
                        mlog(ML_ERROR, "bad response! %u\n", response);
                        BUG();
        }
        spin_unlock(&mle->spinlock);
        if (resend) {
                /* this is also totally crude */
                msleep(50);
                goto again;
        }

out:
        return ret;
}
1377
1378/*
1379 * locks that can be taken here:
1380 * dlm->spinlock
1381 * res->spinlock
1382 * mle->spinlock
1383 * dlm->master_list
1384 *
1385 * if possible, TRIM THIS DOWN!!!
1386 */
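    /*
     * For reference, this handler is wired up during domain join; a
     * sketch of the registration (cf. dlmdomain.c):
     *
     *      o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
     *                             sizeof(struct dlm_master_request),
     *                             dlm_master_request_handler,
     *                             dlm, NULL, &dlm->dlm_domain_handlers);
     */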
1387int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1388                               void **ret_data)
1389{
1390        u8 response = DLM_MASTER_RESP_MAYBE;
1391        struct dlm_ctxt *dlm = data;
1392        struct dlm_lock_resource *res = NULL;
1393        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1394        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1395        char *name;
1396        unsigned int namelen, hash;
1397        int found, ret;
1398        int set_maybe;
1399        int dispatch_assert = 0;
1400
1401        if (!dlm_grab(dlm))
1402                return DLM_MASTER_RESP_NO;
1403
1404        if (!dlm_domain_fully_joined(dlm)) {
1405                response = DLM_MASTER_RESP_NO;
1406                goto send_response;
1407        }
1408
1409        name = request->name;
1410        namelen = request->namelen;
1411        hash = dlm_lockid_hash(name, namelen);
1412
1413        if (namelen > DLM_LOCKID_NAME_MAX) {
1414                response = DLM_IVBUFLEN;
1415                goto send_response;
1416        }
1417
1418way_up_top:
1419        spin_lock(&dlm->spinlock);
1420        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1421        if (res) {
1422                spin_unlock(&dlm->spinlock);
1423
1424                /* take care of the easy cases up front */
1425                spin_lock(&res->spinlock);
1426                if (res->state & (DLM_LOCK_RES_RECOVERING|
1427                                  DLM_LOCK_RES_MIGRATING)) {
1428                        spin_unlock(&res->spinlock);
1429                        mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1430                             "being recovered/migrated\n");
1431                        response = DLM_MASTER_RESP_ERROR;
1432                        if (mle)
1433                                kmem_cache_free(dlm_mle_cache, mle);
1434                        goto send_response;
1435                }
1436
1437                if (res->owner == dlm->node_num) {
1438                        mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1439                             dlm->name, namelen, name, request->node_idx);
1440                        dlm_lockres_set_refmap_bit(request->node_idx, res);
1441                        spin_unlock(&res->spinlock);
1442                        response = DLM_MASTER_RESP_YES;
1443                        if (mle)
1444                                kmem_cache_free(dlm_mle_cache, mle);
1445
1446                        /* this node is the owner.
1447                         * there is some extra work that needs to
1448                         * happen now.  the requesting node has
1449                         * caused all nodes up to this one to
1450                         * create mles.  this node now needs to
1451                         * go back and clean those up. */
1452                        dispatch_assert = 1;
1453                        goto send_response;
1454                } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1455                        spin_unlock(&res->spinlock);
1456                        // mlog(0, "node %u is the master\n", res->owner);
1457                        response = DLM_MASTER_RESP_NO;
1458                        if (mle)
1459                                kmem_cache_free(dlm_mle_cache, mle);
1460                        goto send_response;
1461                }
1462
1463                /* ok, there is no owner.  either this node is
1464                 * being blocked, or it is actively trying to
1465                 * master this lock. */
1466                if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1467                        mlog(ML_ERROR, "lock with no owner should be "
1468                             "in-progress!\n");
1469                        BUG();
1470                }
1471
1472                // mlog(0, "lockres is in progress...\n");
1473                spin_lock(&dlm->master_lock);
1474                found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1475                if (!found) {
1476                        mlog(ML_ERROR, "no mle found for this lock!\n");
1477                        BUG();
1478                }
1479                set_maybe = 1;
1480                spin_lock(&tmpmle->spinlock);
1481                if (tmpmle->type == DLM_MLE_BLOCK) {
1482                        // mlog(0, "this node is waiting for "
1483                        // "lockres to be mastered\n");
1484                        response = DLM_MASTER_RESP_NO;
1485                } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1486                        mlog(0, "node %u is master, but trying to migrate to "
1487                             "node %u.\n", tmpmle->master, tmpmle->new_master);
1488                        if (tmpmle->master == dlm->node_num) {
1489                                mlog(ML_ERROR, "no owner on lockres, but this "
1490                                     "node is trying to migrate it to %u?!\n",
1491                                     tmpmle->new_master);
1492                                BUG();
1493                        } else {
1494                                /* the real master can respond on its own */
1495                                response = DLM_MASTER_RESP_NO;
1496                        }
1497                } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1498                        set_maybe = 0;
1499                        if (tmpmle->master == dlm->node_num) {
1500                                response = DLM_MASTER_RESP_YES;
1501                                /* this node will be the owner.
1502                                 * go back and clean the mles on any
1503                                 * other nodes */
1504                                dispatch_assert = 1;
1505                                dlm_lockres_set_refmap_bit(request->node_idx, res);
1506                                mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1507                                     dlm->name, namelen, name,
1508                                     request->node_idx);
1509                        } else
1510                                response = DLM_MASTER_RESP_NO;
1511                } else {
1512                        // mlog(0, "this node is attempting to "
1513                        // "master lockres\n");
1514                        response = DLM_MASTER_RESP_MAYBE;
1515                }
1516                if (set_maybe)
1517                        set_bit(request->node_idx, tmpmle->maybe_map);
1518                spin_unlock(&tmpmle->spinlock);
1519
1520                spin_unlock(&dlm->master_lock);
1521                spin_unlock(&res->spinlock);
1522
1523                /* keep the mle attached to heartbeat events */
1524                dlm_put_mle(tmpmle);
1525                if (mle)
1526                        kmem_cache_free(dlm_mle_cache, mle);
1527                goto send_response;
1528        }
1529
1530        /*
1531         * lockres doesn't exist on this node
1532         * if there is an MLE_BLOCK, return NO
1533         * if there is an MLE_MASTER, return MAYBE
1534         * otherwise, add an MLE_BLOCK, return NO
1535         */
1536        spin_lock(&dlm->master_lock);
1537        found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1538        if (!found) {
1539                /* this lockid has never been seen on this node yet */
1540                // mlog(0, "no mle found\n");
1541                if (!mle) {
1542                        spin_unlock(&dlm->master_lock);
1543                        spin_unlock(&dlm->spinlock);
1544
1545                        mle = (struct dlm_master_list_entry *)
1546                                kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1547                        if (!mle) {
1548                                response = DLM_MASTER_RESP_ERROR;
1549                                mlog_errno(-ENOMEM);
1550                                goto send_response;
1551                        }
1552                        goto way_up_top;
1553                }
1554
1555                // mlog(0, "this is second time thru, already allocated, "
1556                // "add the block.\n");
1557                dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1558                set_bit(request->node_idx, mle->maybe_map);
1559                __dlm_insert_mle(dlm, mle);
1560                response = DLM_MASTER_RESP_NO;
1561        } else {
1562                // mlog(0, "mle was found\n");
1563                set_maybe = 1;
1564                spin_lock(&tmpmle->spinlock);
1565                if (tmpmle->master == dlm->node_num) {
1566                        mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1567                        BUG();
1568                }
1569                if (tmpmle->type == DLM_MLE_BLOCK)
1570                        response = DLM_MASTER_RESP_NO;
1571                else if (tmpmle->type == DLM_MLE_MIGRATION) {
1572                        mlog(0, "migration mle was found (%u->%u)\n",
1573                             tmpmle->master, tmpmle->new_master);
1574                        /* real master can respond on its own */
1575                        response = DLM_MASTER_RESP_NO;
1576                } else
1577                        response = DLM_MASTER_RESP_MAYBE;
1578                if (set_maybe)
1579                        set_bit(request->node_idx, tmpmle->maybe_map);
1580                spin_unlock(&tmpmle->spinlock);
1581        }
1582        spin_unlock(&dlm->master_lock);
1583        spin_unlock(&dlm->spinlock);
1584
1585        if (found) {
1586                /* keep the mle attached to heartbeat events */
1587                dlm_put_mle(tmpmle);
1588        }
1589send_response:
1590        /*
1591         * __dlm_lookup_lockres() grabbed a reference to this lockres.
1592         * The reference is released by dlm_assert_master_worker() under
1593         * the call to dlm_dispatch_assert_master().  If
1594         * dlm_assert_master_worker() isn't called, we drop it here.
1595         */
1596        if (dispatch_assert) {
1597                if (response != DLM_MASTER_RESP_YES)
1598                        mlog(ML_ERROR, "invalid response %d\n", response);
1599                if (!res) {
1600                        mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1601                        BUG();
1602                }
1603                mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1604                     dlm->node_num, res->lockname.len, res->lockname.name);
1605                ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
1606                                                 DLM_ASSERT_MASTER_MLE_CLEANUP);
1607                if (ret < 0) {
1608                        mlog(ML_ERROR, "failed to dispatch assert master work\n");
1609                        response = DLM_MASTER_RESP_ERROR;
1610                        dlm_lockres_put(res);
1611                }
1612        } else {
1613                if (res)
1614                        dlm_lockres_put(res);
1615        }
1616
1617        dlm_put(dlm);
1618        return response;
1619}
1620
1621/*
1622 * DLM_ASSERT_MASTER_MSG
1623 */
1624
1625
1626/*
1627 * NOTE: this can be used for debugging
1628 * can periodically run all locks owned by this node
1629 * and re-assert across the cluster...
1630 */
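    /*
     * A debugging pass along those lines might look like this (sketch
     * only; locking, refcounting and iterator declarations elided):
     *
     *      for (i = 0; i < DLM_HASH_BUCKETS; i++)
     *              hlist_for_each_entry(res, iter,
     *                                   dlm_lockres_hash(dlm, i), hash_node)
     *                      if (res->owner == dlm->node_num)
     *                              dlm_do_assert_master(dlm, res,
     *                                                   dlm->domain_map, 0);
     */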
1631static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1632                                struct dlm_lock_resource *res,
1633                                void *nodemap, u32 flags)
1634{
1635        struct dlm_assert_master assert;
1636        int to, tmpret;
1637        struct dlm_node_iter iter;
1638        int ret = 0;
1639        int reassert;
1640        const char *lockname = res->lockname.name;
1641        unsigned int namelen = res->lockname.len;
1642
1643        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1644
1645        spin_lock(&res->spinlock);
1646        res->state |= DLM_LOCK_RES_SETREF_INPROG;
1647        spin_unlock(&res->spinlock);
1648
1649again:
1650        reassert = 0;
1651
1652        /* note that if this nodemap is empty, it returns 0 */
1653        dlm_node_iter_init(nodemap, &iter);
1654        while ((to = dlm_node_iter_next(&iter)) >= 0) {
1655                int r = 0;
1656                struct dlm_master_list_entry *mle = NULL;
1657
1658                mlog(0, "sending assert master to %d (%.*s)\n", to,
1659                     namelen, lockname);
1660                memset(&assert, 0, sizeof(assert));
1661                assert.node_idx = dlm->node_num;
1662                assert.namelen = namelen;
1663                memcpy(assert.name, lockname, namelen);
1664                assert.flags = cpu_to_be32(flags);
1665
1666                tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1667                                            &assert, sizeof(assert), to, &r);
1668                if (tmpret < 0) {
1669                        mlog(0, "assert_master returned %d!\n", tmpret);
1670                        if (!dlm_is_host_down(tmpret)) {
1671                                mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1672                                BUG();
1673                        }
1674                        /* a node died.  finish out the rest of the nodes. */
1675                        mlog(0, "link to %d went down!\n", to);
1676                        /* any nonzero status return will do */
1677                        ret = tmpret;
1678                        r = 0;
1679                } else if (r < 0) {
1680                        /* ok, something is horribly messed up.  kill thyself. */
1681                        mlog(ML_ERROR,"during assert master of %.*s to %u, "
1682                             "got %d.\n", namelen, lockname, to, r);
1683                        spin_lock(&dlm->spinlock);
1684                        spin_lock(&dlm->master_lock);
1685                        if (dlm_find_mle(dlm, &mle, (char *)lockname,
1686                                         namelen)) {
1687                                dlm_print_one_mle(mle);
1688                                __dlm_put_mle(mle);
1689                        }
1690                        spin_unlock(&dlm->master_lock);
1691                        spin_unlock(&dlm->spinlock);
1692                        BUG();
1693                }
1694
1695                if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1696                    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1697                        mlog(ML_ERROR, "%.*s: very strange, "
1698                             "master MLE but no lockres on %u\n",
1699                             namelen, lockname, to);
1700                }
1701
1702                if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1703                        mlog(0, "%.*s: node %u created mles on other "
1704                             "nodes and requests a re-assert\n",
1705                             namelen, lockname, to);
1706                        reassert = 1;
1707                }
1708                if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1709                        mlog(0, "%.*s: node %u has a reference to this "
1710                             "lockres, set the bit in the refmap\n",
1711                             namelen, lockname, to);
1712                        spin_lock(&res->spinlock);
1713                        dlm_lockres_set_refmap_bit(to, res);
1714                        spin_unlock(&res->spinlock);
1715                }
1716        }
1717
1718        if (reassert)
1719                goto again;
1720
1721        spin_lock(&res->spinlock);
1722        res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1723        spin_unlock(&res->spinlock);
1724        wake_up(&res->wq);
1725
1726        return ret;
1727}
1728
1729/*
1730 * locks that can be taken here:
1731 * dlm->spinlock
1732 * res->spinlock
1733 * mle->spinlock
1734 * dlm->master_list
1735 *
1736 * if possible, TRIM THIS DOWN!!!
1737 */
1738int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1739                              void **ret_data)
1740{
1741        struct dlm_ctxt *dlm = data;
1742        struct dlm_master_list_entry *mle = NULL;
1743        struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1744        struct dlm_lock_resource *res = NULL;
1745        char *name;
1746        unsigned int namelen, hash;
1747        u32 flags;
1748        int master_request = 0, have_lockres_ref = 0;
1749        int ret = 0;
1750
1751        if (!dlm_grab(dlm))
1752                return 0;
1753
1754        name = assert->name;
1755        namelen = assert->namelen;
1756        hash = dlm_lockid_hash(name, namelen);
1757        flags = be32_to_cpu(assert->flags);
1758
1759        if (namelen > DLM_LOCKID_NAME_MAX) {
1760                mlog(ML_ERROR, "Invalid name length!\n");
1761                goto done;
1762        }
1763
1764        spin_lock(&dlm->spinlock);
1765
1766        if (flags)
1767                mlog(0, "assert_master with flags: %u\n", flags);
1768
1769        /* find the MLE */
1770        spin_lock(&dlm->master_lock);
1771        if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1772                /* not an error, could be master just re-asserting */
1773                mlog(0, "just got an assert_master from %u, but no "
1774                     "MLE for it! (%.*s)\n", assert->node_idx,
1775                     namelen, name);
1776        } else {
1777                int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1778                if (bit >= O2NM_MAX_NODES) {
1779                        /* not necessarily an error, though less likely.
1780                         * could be master just re-asserting. */
1781                        mlog(0, "no bits set in the maybe_map, but %u "
1782                             "is asserting! (%.*s)\n", assert->node_idx,
1783                             namelen, name);
1784                } else if (bit != assert->node_idx) {
1785                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1786                                mlog(0, "master %u was found, %u should "
1787                                     "back off\n", assert->node_idx, bit);
1788                        } else {
1789                                /* with the fix for bug 569, a higher node
1790                                 * number winning the mastery will respond
1791                                 * YES to mastery requests, but this node
1792                                 * had no way of knowing.  let it pass. */
1793                                mlog(0, "%u is the lowest node, "
1794                                     "%u is asserting. (%.*s)  %u must "
1795                                     "have begun after %u won.\n", bit,
1796                                     assert->node_idx, namelen, name, bit,
1797                                     assert->node_idx);
1798                        }
1799                }
1800                if (mle->type == DLM_MLE_MIGRATION) {
1801                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1802                                mlog(0, "%s:%.*s: got cleanup assert"
1803                                     " from %u for migration\n",
1804                                     dlm->name, namelen, name,
1805                                     assert->node_idx);
1806                        } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1807                                mlog(0, "%s:%.*s: got unrelated assert"
1808                                     " from %u for migration, ignoring\n",
1809                                     dlm->name, namelen, name,
1810                                     assert->node_idx);
1811                                __dlm_put_mle(mle);
1812                                spin_unlock(&dlm->master_lock);
1813                                spin_unlock(&dlm->spinlock);
1814                                goto done;
1815                        }       
1816                }
1817        }
1818        spin_unlock(&dlm->master_lock);
1819
1820        /* ok everything checks out with the MLE
1821         * now check to see if there is a lockres */
1822        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1823        if (res) {
1824                spin_lock(&res->spinlock);
1825                if (res->state & DLM_LOCK_RES_RECOVERING)  {
1826                        mlog(ML_ERROR, "%u asserting but %.*s is "
1827                             "RECOVERING!\n", assert->node_idx, namelen, name);
1828                        goto kill;
1829                }
1830                if (!mle) {
1831                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1832                            res->owner != assert->node_idx) {
1833                                mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1834                                     "but current owner is %u! (%.*s)\n",
1835                                     assert->node_idx, res->owner, namelen,
1836                                     name);
1837                                __dlm_print_one_lock_resource(res);
1838                                BUG();
1839                        }
1840                } else if (mle->type != DLM_MLE_MIGRATION) {
1841                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1842                                /* owner is just re-asserting */
1843                                if (res->owner == assert->node_idx) {
1844                                        mlog(0, "owner %u re-asserting on "
1845                                             "lock %.*s\n", assert->node_idx,
1846                                             namelen, name);
1847                                        goto ok;
1848                                }
1849                                mlog(ML_ERROR, "got assert_master from "
1850                                     "node %u, but %u is the owner! "
1851                                     "(%.*s)\n", assert->node_idx,
1852                                     res->owner, namelen, name);
1853                                goto kill;
1854                        }
1855                        if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1856                                mlog(ML_ERROR, "got assert from %u, but lock "
1857                                     "with no owner should be "
1858                                     "in-progress! (%.*s)\n",
1859                                     assert->node_idx,
1860                                     namelen, name);
1861                                goto kill;
1862                        }
1863                } else /* mle->type == DLM_MLE_MIGRATION */ {
1864                        /* should only be getting an assert from new master */
1865                        if (assert->node_idx != mle->new_master) {
1866                                mlog(ML_ERROR, "got assert from %u, but "
1867                                     "new master is %u, and old master "
1868                                     "was %u (%.*s)\n",
1869                                     assert->node_idx, mle->new_master,
1870                                     mle->master, namelen, name);
1871                                goto kill;
1872                        }
1873
1874                }
1875ok:
1876                spin_unlock(&res->spinlock);
1877        }
1878        spin_unlock(&dlm->spinlock);
1879
1880        // mlog(0, "woo!  got an assert_master from node %u!\n",
1881        //           assert->node_idx);
1882        if (mle) {
1883                int extra_ref = 0;
1884                int nn = -1;
1885                int rr, err = 0;
1886                
1887                spin_lock(&mle->spinlock);
1888                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1889                        extra_ref = 1;
1890                else {
1891                        /* MASTER mle: if any bits set in the response map
1892                         * then the calling node needs to re-assert to clear
1893                         * up nodes that this node contacted */
1894                        while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1895                                                    nn+1)) < O2NM_MAX_NODES) {
1896                                if (nn != dlm->node_num && nn != assert->node_idx)
1897                                        master_request = 1;
1898                        }
1899                }
1900                mle->master = assert->node_idx;
1901                atomic_set(&mle->woken, 1);
1902                wake_up(&mle->wq);
1903                spin_unlock(&mle->spinlock);
1904
1905                if (res) {
1906                        int wake = 0;
1907                        spin_lock(&res->spinlock);
1908                        if (mle->type == DLM_MLE_MIGRATION) {
1909                                mlog(0, "finishing off migration of lockres %.*s, "
1910                                        "from %u to %u\n",
1911                                        res->lockname.len, res->lockname.name,
1912                                        dlm->node_num, mle->new_master);
1913                                res->state &= ~DLM_LOCK_RES_MIGRATING;
1914                                wake = 1;
1915                                dlm_change_lockres_owner(dlm, res, mle->new_master);
1916                                BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1917                        } else {
1918                                dlm_change_lockres_owner(dlm, res, mle->master);
1919                        }
1920                        spin_unlock(&res->spinlock);
1921                        have_lockres_ref = 1;
1922                        if (wake)
1923                                wake_up(&res->wq);
1924                }
1925
1926                /* master is known, detach if not already detached.
1927                 * ensures that only one assert_master call will happen
1928                 * on this mle. */
1929                spin_lock(&dlm->spinlock);
1930                spin_lock(&dlm->master_lock);
1931
1932                rr = atomic_read(&mle->mle_refs.refcount);
1933                if (mle->inuse > 0) {
1934                        if (extra_ref && rr < 3)
1935                                err = 1;
1936                        else if (!extra_ref && rr < 2)
1937                                err = 1;
1938                } else {
1939                        if (extra_ref && rr < 2)
1940                                err = 1;
1941                        else if (!extra_ref && rr < 1)
1942                                err = 1;
1943                }
1944                if (err) {
1945                        mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1946                             "that will mess up this node, refs=%d, extra=%d, "
1947                             "inuse=%d\n", dlm->name, namelen, name,
1948                             assert->node_idx, rr, extra_ref, mle->inuse);
1949                        dlm_print_one_mle(mle);
1950                }
1951                __dlm_unlink_mle(dlm, mle);
1952                __dlm_mle_detach_hb_events(dlm, mle);
1953                __dlm_put_mle(mle);
1954                if (extra_ref) {
1955                        /* the assert master message now balances the extra
1956                         * ref given by the master / migration request message.
1957                         * if this is the last put, it will be removed
1958                         * from the list. */
1959                        __dlm_put_mle(mle);
1960                }
1961                spin_unlock(&dlm->master_lock);
1962                spin_unlock(&dlm->spinlock);
1963        } else if (res) {
1964                if (res->owner != assert->node_idx) {
1965                        mlog(0, "assert_master from %u, but current "
1966                             "owner is %u (%.*s), no mle\n", assert->node_idx,
1967                             res->owner, namelen, name);
1968                }
1969        }
1970
1971done:
1972        ret = 0;
1973        if (res) {
1974                spin_lock(&res->spinlock);
1975                res->state |= DLM_LOCK_RES_SETREF_INPROG;
1976                spin_unlock(&res->spinlock);
1977                *ret_data = (void *)res;
1978        }
1979        dlm_put(dlm);
1980        if (master_request) {
1981                mlog(0, "need to tell master to reassert\n");
1982                /* positive. negative would shoot down the node. */
1983                ret |= DLM_ASSERT_RESPONSE_REASSERT;
1984                if (!have_lockres_ref) {
1985                        mlog(ML_ERROR, "strange, got assert from %u, MASTER "
1986                             "mle present here for %s:%.*s, but no lockres!\n",
1987                             assert->node_idx, dlm->name, namelen, name);
1988                }
1989        }
1990        if (have_lockres_ref) {
1991                /* let the master know we have a reference to the lockres */
1992                ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
1993                mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
1994                     dlm->name, namelen, name, assert->node_idx);
1995        }
1996        return ret;
1997
1998kill:
1999        /* kill the caller! */
2000        mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2001             "and killing the other node now!  This node is OK and can continue.\n");
2002        __dlm_print_one_lock_resource(res);
2003        spin_unlock(&res->spinlock);
2004        spin_unlock(&dlm->spinlock);
2005        *ret_data = (void *)res; 
2006        dlm_put(dlm);
2007        return -EINVAL;
2008}
2009
2010void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2011{
2012        struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2013
2014        if (ret_data) {
2015                spin_lock(&res->spinlock);
2016                res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2017                spin_unlock(&res->spinlock);
2018                wake_up(&res->wq);
2019                dlm_lockres_put(res);
2020        }
2021        return;
2022}
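    /*
     * The post handler runs only after the network layer has sent our
     * response, which is why dlm_assert_master_handler() passes the
     * lockres out through *ret_data with SETREF_INPROG still set.  A
     * sketch of the registration that pairs the two (cf. dlmdomain.c):
     *
     *      o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
     *                             sizeof(struct dlm_assert_master),
     *                             dlm_assert_master_handler, dlm,
     *                             dlm_assert_master_post_handler,
     *                             &dlm->dlm_domain_handlers);
     */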
2023
2024int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2025                               struct dlm_lock_resource *res,
2026                               int ignore_higher, u8 request_from, u32 flags)
2027{
2028        struct dlm_work_item *item;
2029        item = kzalloc(sizeof(*item), GFP_NOFS);
2030        if (!item)
2031                return -ENOMEM;
2032
2033
2034        /* queue up work for dlm_assert_master_worker */
2035        dlm_grab(dlm);  /* get an extra ref for the work item */
2036        dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2037        item->u.am.lockres = res; /* already have a ref */
2038        /* can optionally ignore node numbers higher than this node */
2039        item->u.am.ignore_higher = ignore_higher;
2040        item->u.am.request_from = request_from;
2041        item->u.am.flags = flags;
2042
2043        if (ignore_higher) 
2044                mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, 
2045                     res->lockname.name);
2046                
2047        spin_lock(&dlm->work_lock);
2048        list_add_tail(&item->list, &dlm->work_list);
2049        spin_unlock(&dlm->work_lock);
2050
2051        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2052        return 0;
2053}
2054
2055static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2056{
2057        struct dlm_ctxt *dlm = data;
2058        int ret = 0;
2059        struct dlm_lock_resource *res;
2060        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2061        int ignore_higher;
2062        int bit;
2063        u8 request_from;
2064        u32 flags;
2065
2066        dlm = item->dlm;
2067        res = item->u.am.lockres;
2068        ignore_higher = item->u.am.ignore_higher;
2069        request_from = item->u.am.request_from;
2070        flags = item->u.am.flags;
2071
2072        spin_lock(&dlm->spinlock);
2073        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2074        spin_unlock(&dlm->spinlock);
2075
2076        clear_bit(dlm->node_num, nodemap);
2077        if (ignore_higher) {
2078                /* if this is just to clear up mles for nodes below
2079                 * this node, do not send the message to the original
2080                 * caller or any node number higher than this */
2081                clear_bit(request_from, nodemap);
2082                bit = dlm->node_num;
2083                while (1) {
2084                        bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2085                                            bit+1);
2086                        if (bit >= O2NM_MAX_NODES)
2087                                break;
2088                        clear_bit(bit, nodemap);
2089                }
2090        }
2091
2092        /*
2093         * If we're migrating this lock to someone else, we are no
2094         * longer allowed to assert our own mastery.  OTOH, we need to
2095         * prevent migration from starting while we're still asserting
2096         * our dominance.  The reserved ast delays migration.
2097         */
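            /*
             * The invariant, in brief (both calls appear below):
             *
             *      __dlm_lockres_reserve_ast(res);       migration must wait
             *      dlm_do_assert_master(dlm, res, ...);
             *      dlm_lockres_release_ast(dlm, res);    migration may start
             */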
2098        spin_lock(&res->spinlock);
2099        if (res->state & DLM_LOCK_RES_MIGRATING) {
2100                mlog(0, "Someone asked us to assert mastery, but we're "
2101                     "in the middle of migration.  Skipping assert, "
2102                     "the new master will handle that.\n");
2103                spin_unlock(&res->spinlock);
2104                goto put;
2105        } else
2106                __dlm_lockres_reserve_ast(res);
2107        spin_unlock(&res->spinlock);
2108
2109        /* this call now finishes out the nodemap
2110         * even if one or more nodes die */
2111        mlog(0, "worker about to master %.*s here, this=%u\n",
2112             res->lockname.len, res->lockname.name, dlm->node_num);
2113        ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2114        if (ret < 0) {
2115                /* no need to restart, we are done */
2116                if (!dlm_is_host_down(ret))
2117                        mlog_errno(ret);
2118        }
2119
2120        /* Ok, we've asserted ourselves.  Let's let migration start. */
2121        dlm_lockres_release_ast(dlm, res);
2122
2123put:
2124        dlm_lockres_put(res);
2125
2126        mlog(0, "finished with dlm_assert_master_worker\n");
2127}
2128
2129/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2130 * We cannot wait for node recovery to complete to begin mastering this
2131 * lockres because this lockres is used to kick off recovery! ;-)
2132 * So, do a pre-check on all living nodes to see if any of those nodes
2133 * think that $RECOVERY is currently mastered by a dead node.  If so,
2134 * we wait a short time to allow that node to get notified by its own
2135 * heartbeat stack, then check again.  All $RECOVERY lock resources
2136 * mastered by dead nodes are purged when the heartbeat callback is
2137 * fired, so we know for sure that it is safe to continue once
2138 * the query returns a live node or no node at all.  */
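    /*
     * Sketch of the expected caller pattern (the retry delay here is
     * hypothetical):
     *
     *      while (dlm_pre_master_reco_lockres(dlm, res) == -EAGAIN)
     *              msleep(100);    (give the remote heartbeat stack
     *                               time to notice the death)
     */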
2139static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2140                                       struct dlm_lock_resource *res)
2141{
2142        struct dlm_node_iter iter;
2143        int nodenum;
2144        int ret = 0;
2145        u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2146
2147        spin_lock(&dlm->spinlock);
2148        dlm_node_iter_init(dlm->domain_map, &iter);
2149        spin_unlock(&dlm->spinlock);
2150
2151        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2152                /* do not send to self */
2153                if (nodenum == dlm->node_num)
2154                        continue;
2155                ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2156                if (ret < 0) {
2157                        mlog_errno(ret);
2158                        if (!dlm_is_host_down(ret))
2159                                BUG();
2160                        /* host is down, so answer for that node would be
2161                         * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2162                        ret = 0;
2163                }
2164
2165                if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2166                        /* check to see if this master is in the recovery map */
2167                        spin_lock(&dlm->spinlock);
2168                        if (test_bit(master, dlm->recovery_map)) {
2169                                mlog(ML_NOTICE, "%s: node %u has not seen "
2170                                     "node %u go down yet, and thinks the "
2171                                     "dead node is mastering the recovery "
2172                                     "lock.  must wait.\n", dlm->name,
2173                                     nodenum, master);
2174                                ret = -EAGAIN;
2175                        }
2176                        spin_unlock(&dlm->spinlock);
2177                        mlog(0, "%s: reco lock master is %u\n", dlm->name, 
2178                             master);
2179                        break;
2180                }
2181        }
2182        return ret;
2183}
2184
2185/*
2186 * DLM_DEREF_LOCKRES_MSG
2187 */
2188
2189int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2190{
2191        struct dlm_deref_lockres deref;
2192        int ret = 0, r;
2193        const char *lockname;
2194        unsigned int namelen;
2195
2196        lockname = res->lockname.name;
2197        namelen = res->lockname.len;
2198        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2199
2200        mlog(0, "%s:%.*s: sending deref to %d\n",
2201             dlm->name, namelen, lockname, res->owner);
2202        memset(&deref, 0, sizeof(deref));
2203        deref.node_idx = dlm->node_num;
2204        deref.namelen = namelen;
2205        memcpy(deref.name, lockname, namelen);
2206
2207        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2208                                 &deref, sizeof(deref), res->owner, &r);
2209        if (ret < 0)
2210                mlog_errno(ret);
2211        else if (r < 0) {
2212                /* BAD.  other node says I did not have a ref. */
2213                mlog(ML_ERROR, "while dropping ref on %s:%.*s "
2214                     "(master=%u) got %d.\n", dlm->name, namelen,
2215                     lockname, res->owner, r);
2216                dlm_print_one_lock_resource(res);
2217                BUG();
2218        }
2219        return ret;
2220}
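    /*
     * This is the non-owner half of the refmap protocol: when the purge
     * path finds an unused, remotely mastered lockres, it sends this
     * deref so the master can clear our bit.  Sketch (cf. the purge
     * logic in dlmthread.c):
     *
     *      master = (res->owner == dlm->node_num);
     *      if (!master)
     *              ret = dlm_drop_lockres_ref(dlm, res);
     */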
2221
2222int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2223                              void **ret_data)
2224{
2225        struct dlm_ctxt *dlm = data;
2226        struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2227        struct dlm_lock_resource *res = NULL;
2228        char *name;
2229        unsigned int namelen;
2230        int ret = -EINVAL;
2231        u8 node;
2232        unsigned int hash;
2233        struct dlm_work_item *item;
2234        int cleared = 0;
2235        int dispatch = 0;
2236
2237        if (!dlm_grab(dlm))
2238                return 0;
2239
2240        name = deref->name;
2241        namelen = deref->namelen;
2242        node = deref->node_idx;
2243
2244        if (namelen > DLM_LOCKID_NAME_MAX) {
2245                mlog(ML_ERROR, "Invalid name length!\n");
2246                goto done;
2247        }
2248        if (deref->node_idx >= O2NM_MAX_NODES) {
2249                mlog(ML_ERROR, "Invalid node number: %u\n", node);
2250                goto done;
2251        }
2252
2253        hash = dlm_lockid_hash(name, namelen);
2254
2255        spin_lock(&dlm->spinlock);
2256        res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2257        if (!res) {
2258                spin_unlock(&dlm->spinlock);
2259                mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2260                     dlm->name, namelen, name);
2261                goto done;
2262        }
2263        spin_unlock(&dlm->spinlock);
2264
2265        spin_lock(&res->spinlock);
2266        if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2267                dispatch = 1;
2268        else {
2269                BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2270                if (test_bit(node, res->refmap)) {
2271                        dlm_lockres_clear_refmap_bit(node, res);
2272                        cleared = 1;
2273                }
2274        }
2275        spin_unlock(&res->spinlock);
2276
2277        if (!dispatch) {
2278                if (cleared)
2279                        dlm_lockres_calc_usage(dlm, res);
2280                else {
2281                        mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2282                             "but it is already dropped!\n", dlm->name,
2283                             res->lockname.len, res->lockname.name, node);
2284                        dlm_print_one_lock_resource(res);
2285                }
2286                ret = 0;
2287                goto done;
2288        }
2289
2290        item = kzalloc(sizeof(*item), GFP_NOFS);
2291        if (!item) {
2292                ret = -ENOMEM;
2293                mlog_errno(ret);
2294                goto done;
2295        }
2296
2297        dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2298        item->u.dl.deref_res = res;
2299        item->u.dl.deref_node = node;
2300
2301        spin_lock(&dlm->work_lock);
2302        list_add_tail(&item->list, &dlm->work_list);
2303        spin_unlock(&dlm->work_lock);
2304
2305        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2306        return 0;
2307
2308done:
2309        if (res)
2310                dlm_lockres_put(res);
2311        dlm_put(dlm);
2312
2313        return ret;
2314}
2315
2316static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2317{
2318        struct dlm_ctxt *dlm;
2319        struct dlm_lock_resource *res;
2320        u8 node;
2321        u8 cleared = 0;
2322
2323        dlm = item->dlm;
2324        res = item->u.dl.deref_res;
2325        node = item->u.dl.deref_node;
2326
2327        spin_lock(&res->spinlock);
2328        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2329        if (test_bit(node, res->refmap)) {
2330                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2331                dlm_lockres_clear_refmap_bit(node, res);
2332                cleared = 1;
2333        }
2334        spin_unlock(&res->spinlock);
2335
2336        if (cleared) {
2337                mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2338                     dlm->name, res->lockname.len, res->lockname.name, node);
2339                dlm_lockres_calc_usage(dlm, res);
2340        } else {
2341                mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2342                     "but it is already dropped!\n", dlm->name,
2343                     res->lockname.len, res->lockname.name, node);
2344                dlm_print_one_lock_resource(res);
2345        }
2346
2347        dlm_lockres_put(res);
2348}
2349
2350/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
2351 * if not. If 0, numlocks is set to the number of locks in the lockres.
2352 */
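    /*
     * Must be called with res->spinlock held (asserted below).  Typical
     * use, mirroring dlm_migrate_lockres() further down:
     *
     *      spin_lock(&res->spinlock);
     *      ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
     *      spin_unlock(&res->spinlock);
     */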
2353static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2354                                      struct dlm_lock_resource *res,
2355                                      int *numlocks)
2356{
2357        int ret;
2358        int i;
2359        int count = 0;
2360        struct list_head *queue;
2361        struct dlm_lock *lock;
2362
2363        assert_spin_locked(&res->spinlock);
2364
2365        ret = -EINVAL;
2366        if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2367                mlog(0, "cannot migrate lockres with unknown owner!\n");
2368                goto leave;
2369        }
2370
2371        if (res->owner != dlm->node_num) {
2372                mlog(0, "cannot migrate lockres this node doesn't own!\n");
2373                goto leave;
2374        }
2375
2376        ret = 0;
2377        queue = &res->granted;
2378        for (i = 0; i < 3; i++) {
2379                list_for_each_entry(lock, queue, list) {
2380                        ++count;
2381                        if (lock->ml.node == dlm->node_num) {
2382                                mlog(0, "found a lock owned by this node still "
2383                                     "on the %s queue!  will not migrate this "
2384                                     "lockres\n", (i == 0 ? "granted" :
2385                                                   (i == 1 ? "converting" :
2386                                                    "blocked")));
2387                                ret = -ENOTEMPTY;
2388                                goto leave;
2389                        }
2390                }
2391                queue++;
2392        }
2393
2394        *numlocks = count;
2395        mlog(0, "migrateable lockres having %d locks\n", *numlocks);
2396
2397leave:
2398        return ret;
2399}
2400
2401/*
2402 * DLM_MIGRATE_LOCKRES
2403 */
2404
2405
2406static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2407                               struct dlm_lock_resource *res,
2408                               u8 target)
2409{
2410        struct dlm_master_list_entry *mle = NULL;
2411        struct dlm_master_list_entry *oldmle = NULL;
2412        struct dlm_migratable_lockres *mres = NULL;
2413        int ret = 0;
2414        const char *name;
2415        unsigned int namelen;
2416        int mle_added = 0;
2417        int numlocks;
2418        int wake = 0;
2419
2420        if (!dlm_grab(dlm))
2421                return -EINVAL;
2422
2423        name = res->lockname.name;
2424        namelen = res->lockname.len;
2425
2426        mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2427
2428        /*
2429         * ensure this lockres is a proper candidate for migration
2430         */
2431        spin_lock(&res->spinlock);
2432        ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2433        if (ret < 0) {
2434                spin_unlock(&res->spinlock);
2435                goto leave;
2436        }
2437        spin_unlock(&res->spinlock);
2438
2439        /* no work to do */
2440        if (numlocks == 0) {
2441                mlog(0, "no locks were found on this lockres! done!\n");
2442                goto leave;
2443        }
2444
2445        /*
2446         * preallocate up front
2447         * if this fails, abort
2448         */
2449
2450        ret = -ENOMEM;
2451        mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2452        if (!mres) {
2453                mlog_errno(ret);
2454                goto leave;
2455        }
2456
2457        mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2458                                                                GFP_NOFS);
2459        if (!mle) {
2460                mlog_errno(ret);
2461                goto leave;
2462        }
2463        ret = 0;
2464
2465        /*
2466         * find a node to migrate the lockres to
2467         */
2468
2469        mlog(0, "picking a migration node\n");
2470        spin_lock(&dlm->spinlock);
2471        /* pick a new node */
2472        if (target >= O2NM_MAX_NODES ||
2473            !test_bit(target, dlm->domain_map)) {
2474                target = dlm_pick_migration_target(dlm, res);
2475        }
2476        mlog(0, "node %u chosen for migration\n", target);
2477
2478        if (target >= O2NM_MAX_NODES ||
2479            !test_bit(target, dlm->domain_map)) {
2480                /* target chosen is not alive */
2481                ret = -EINVAL;
2482        }
2483
2484        if (ret) {
2485                spin_unlock(&dlm->spinlock);
2486                goto fail;
2487        }
2488
2489        mlog(0, "continuing with target = %u\n", target);
2490
2491        /*
2492         * clear any existing master requests and
2493         * add the migration mle to the list
2494         */
2495        spin_lock(&dlm->master_lock);
2496        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2497                                    namelen, target, dlm->node_num);
2498        spin_unlock(&dlm->master_lock);
2499        spin_unlock(&dlm->spinlock);
2500
2501        if (ret == -EEXIST) {
2502                mlog(0, "another process is already migrating it\n");
2503                goto fail;
2504        }
2505        mle_added = 1;
2506
2507        /*
2508         * set the MIGRATING flag and flush asts
2509         * if we fail after this we need to re-dirty the lockres
2510         */
2511        if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2512                mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2513                     "the target went down.\n", res->lockname.len,
2514                     res->lockname.name, target);
2515                spin_lock(&res->spinlock);
2516                res->state &= ~DLM_LOCK_RES_MIGRATING;
2517                wake = 1;
2518                spin_unlock(&res->spinlock);
2519                ret = -EINVAL;
2520        }
2521
2522fail:
2523        if (oldmle) {
2524                /* master is known, detach if not already detached */
2525                dlm_mle_detach_hb_events(dlm, oldmle);
2526                dlm_put_mle(oldmle);
2527        }
2528
2529        if (ret < 0) {
2530                if (mle_added) {
2531                        dlm_mle_detach_hb_events(dlm, mle);
2532                        dlm_put_mle(mle);
2533                } else if (mle) {
2534                        kmem_cache_free(dlm_mle_cache, mle);
2535                }
2536                goto leave;
2537        }
2538
2539        /*
2540         * at this point, we have a migration target, an mle
2541         * in the master list, and the MIGRATING flag set on
2542         * the lockres
2543         */
2544
2545        /* now that remote nodes are spinning on the MIGRATING flag,
2546         * ensure that all assert_master work is flushed. */
2547        flush_workqueue(dlm->dlm_worker);
2548
2549        /* get an extra reference on the mle.
2550         * otherwise the assert_master from the new
2551         * master will destroy this.
2552         * also, make sure that all callers of dlm_get_mle
2553         * take both dlm->spinlock and dlm->master_lock */
2554        spin_lock(&dlm->spinlock);
2555        spin_lock(&dlm->master_lock);
2556        dlm_get_mle_inuse(mle);
2557        spin_unlock(&dlm->master_lock);
2558        spin_unlock(&dlm->spinlock);
2559
2560        /* notify new node and send all lock state */
2561        /* call send_one_lockres with migration flag.
2562         * this serves as notice to the target node that a
2563         * migration is starting. */
2564        ret = dlm_send_one_lockres(dlm, res, mres, target,
2565                                   DLM_MRES_MIGRATION);
2566
2567        if (ret < 0) {
2568                mlog(0, "migration to node %u failed with %d\n",
2569                     target, ret);
2570                /* migration failed, detach and clean up mle */
2571                dlm_mle_detach_hb_events(dlm, mle);
2572                dlm_put_mle(mle);
2573                dlm_put_mle_inuse(mle);
2574                spin_lock(&res->spinlock);
2575                res->state &= ~DLM_LOCK_RES_MIGRATING;
2576                wake = 1;
2577                spin_unlock(&res->spinlock);
2578                goto leave;
2579        }
2580
2581        /* at this point, the target sends a message to all nodes,
2582         * (using dlm_do_migrate_request).  this node is skipped since
2583         * we had to put an mle in the list to begin the process.  this
2584         * node now waits for target to do an assert master.  this node
2585         * will be the last one notified, ensuring that the migration
2586         * is complete everywhere.  if the target dies while this is
2587         * going on, some nodes could potentially see the target as the
2588         * master, so it is important that my recovery finds the migration
2589 * mle and sets the master to UNKNOWN. */
2590
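            /*
             * The message flow described above, in summary (constants
             * per dlmcommon.h):
             *
             *      this node -> target : DLM_MIG_LOCKRES_MSG      (sent above)
             *      target -> all others: DLM_MIGRATE_REQUEST_MSG  (creates mles)
             *      target -> everyone  : DLM_ASSERT_MASTER_MSG    (this node last)
             */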
2591
2592        /* wait for new node to assert master */
2593        while (1) {
2594                ret = wait_event_interruptible_timeout(mle->wq,
2595                                        (atomic_read(&mle->woken) == 1),
2596                                        msecs_to_jiffies(5000));
2597
2598                if (ret >= 0) {
2599                        if (atomic_read(&mle->woken) == 1 ||
2600                            res->owner == target)
2601                                break;
2602
2603                        mlog(0, "%s:%.*s: timed out during migration\n",
2604                             dlm->name, res->lockname.len, res->lockname.name);
2605                        /* avoid hang during shutdown when migrating lockres 
2606                         * to a node which also goes down */
2607                        if (dlm_is_node_dead(dlm, target)) {
2608                                mlog(0, "%s:%.*s: expected migration "
2609                                     "target %u is no longer up, restarting\n",
2610                                     dlm->name, res->lockname.len,
2611                                     res->lockname.name, target);
2612                                ret = -EINVAL;
2613                                /* migration failed, detach and clean up mle */
2614                                dlm_mle_detach_hb_events(dlm, mle);
2615                                dlm_put_mle(mle);
2616                                dlm_put_mle_inuse(mle);
2617                                spin_lock(&res->spinlock);
2618                                res->state &= ~DLM_LOCK_RES_MIGRATING;
2619                                wake = 1;
2620                                spin_unlock(&res->spinlock);
2621                                goto leave;
2622                        }
2623                } else
2624                        mlog(0, "%s:%.*s: caught signal during migration\n",
2625                             dlm->name, res->lockname.len, res->lockname.name);
2626        }
2627
2628        /* all done, set the owner, clear the flag */
2629        spin_lock(&res->spinlock);
2630        dlm_set_lockres_owner(dlm, res, target);
2631        res->state &= ~DLM_LOCK_RES_MIGRATING;
2632        dlm_remove_nonlocal_locks(dlm, res);
2633        spin_unlock(&res->spinlock);
2634        wake_up(&res->wq);
2635
2636        /* master is known, detach if not already detached */
2637        dlm_mle_detach_hb_events(dlm, mle);
2638        dlm_put_mle_inuse(mle);
2639        ret = 0;
2640
2641        dlm_lockres_calc_usage(dlm, res);
2642
2643leave:
2644        /* re-dirty the lockres if we failed */
2645        if (ret < 0)
2646                dlm_kick_thread(dlm, res);
2647
2648        /* wake up waiters if the MIGRATING flag got set
2649         * but migration failed */
2650        if (wake)
2651                wake_up(&res->wq);
2652
2653        /* TODO: cleanup */
2654        if (mres)
2655                free_page((unsigned long)mres);
2656
2657        dlm_put(dlm);
2658
2659        mlog(0, "returning %d\n", ret);
2660        return ret;
2661}
2662
2663#define DLM_MIGRATION_RETRY_MS  100
2664
2665/* Should be called only after beginning the domain leave process.
2666 * There should not be any remaining locks on nonlocal lock resources,
2667 * and there should be no local locks left on locally mastered resources.
2668 *
2669 * Called with the dlm spinlock held, may drop it to do migration, but
2670 * will re-acquire before exit.
2671 *
2672 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
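    /*
     * Sketch of how a caller consumes the return value (cf.
     * dlm_migrate_all_locks() in dlmdomain.c): a dropped spinlock
     * invalidates the hash bucket walk, so the scan must restart:
     *
     *      dropped = dlm_empty_lockres(dlm, res);
     *      if (dropped)
     *              goto redo_bucket;
     */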
2673int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2674{
2675        int ret;
2676        int lock_dropped = 0;
2677        int numlocks;
2678
2679        spin_lock(&res->spinlock);
2680        if (res->owner != dlm->node_num) {
2681                if (!__dlm_lockres_unused(res)) {
2682                        mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2683                             "trying to free this but locks remain\n",
2684                             dlm->name, res->lockname.len, res->lockname.name);
2685                }
2686                spin_unlock(&res->spinlock);
2687                goto leave;
2688        }
2689
2690        /* No need to migrate a lockres having no locks */
2691        ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2692        if (ret >= 0 && numlocks == 0) {
2693                spin_unlock(&res->spinlock);
2694                goto leave;
2695        }
2696        spin_unlock(&res->spinlock);
2697
2698        /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2699        spin_unlock(&dlm->spinlock);
2700        lock_dropped = 1;
2701        while (1) {
2702                ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2703                if (ret >= 0)
2704                        break;
2705                if (ret == -ENOTEMPTY) {
2706                        mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2707                                res->lockname.len, res->lockname.name);
2708                        BUG();
2709                }
2710
2711                mlog(0, "lockres %.*s: migrate failed, "
2712                     "retrying\n", res->lockname.len,
2713                     res->lockname.name);
2714                msleep(DLM_MIGRATION_RETRY_MS);
2715        }
2716        spin_lock(&dlm->spinlock);
2717leave:
2718        return lock_dropped;
2719}
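
/* Hedged usage sketch (an assumed caller, not from this file): because
 * dlm_empty_lockres() may drop and retake dlm->spinlock, a caller that
 * is iterating the lockres hash must treat a nonzero return as "my
 * iteration state is stale" and restart its scan. */
static void example_empty_one(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res)
{
        int dropped;

        spin_lock(&dlm->spinlock);
        dropped = dlm_empty_lockres(dlm, res);  /* may sleep internally */
        /* dlm->spinlock is held again here either way */
        if (dropped)
                mlog(0, "%s: spinlock was dropped, caller must rescan "
                     "the hash\n", dlm->name);
        spin_unlock(&dlm->spinlock);
}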
2720
2721int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2722{
2723        int ret;
2724        spin_lock(&dlm->ast_lock);
2725        spin_lock(&lock->spinlock);
2726        ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2727        spin_unlock(&lock->spinlock);
2728        spin_unlock(&dlm->ast_lock);
2729        return ret;
2730}
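
/* Hedged sketch: because the check above takes and drops its own locks,
 * it can serve directly as a wait_event() predicate.  This wait-site is
 * illustrative; it assumes dlm->ast_wq is woken whenever the ast/bast
 * queues drain, as dlm_mark_lockres_migrating() below relies on. */
static void example_wait_basts_flushed(struct dlm_ctxt *dlm,
                                       struct dlm_lock *lock)
{
        wait_event(dlm->ast_wq, dlm_lock_basts_flushed(dlm, lock));
}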
2731
2732static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2733                                     struct dlm_lock_resource *res,
2734                                     u8 mig_target)
2735{
2736        int can_proceed;
2737        spin_lock(&res->spinlock);
2738        can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2739        spin_unlock(&res->spinlock);
2740
2741        /* target has died, so make the caller break out of the 
2742         * wait_event, but caller must recheck the domain_map */
2743        spin_lock(&dlm->spinlock);
2744        if (!test_bit(mig_target, dlm->domain_map))
2745                can_proceed = 1;
2746        spin_unlock(&dlm->spinlock);
2747        return can_proceed;
2748}
2749
2750static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2751                                struct dlm_lock_resource *res)
2752{
2753        int ret;
2754        spin_lock(&res->spinlock);
2755        ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2756        spin_unlock(&res->spinlock);
2757        return ret;
2758}
2759
2760
2761static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2762                                       struct dlm_lock_resource *res,
2763                                       u8 target)
2764{
2765        int ret = 0;
2766
2767        mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2768               res->lockname.len, res->lockname.name, dlm->node_num,
2769               target);
2770        /* need to set MIGRATING flag on lockres.  this is done by
2771         * ensuring that all asts have been flushed for this lockres. */
2772        spin_lock(&res->spinlock);
2773        BUG_ON(res->migration_pending);
2774        res->migration_pending = 1;
2775        /* strategy is to reserve an extra ast then release
2776         * it below, letting the release do all of the work */
2777        __dlm_lockres_reserve_ast(res);
2778        spin_unlock(&res->spinlock);
2779
2780        /* now flush all the pending asts */
2781        dlm_kick_thread(dlm, res);
2782        /* before waiting on DIRTY, block processes which may
2783         * try to dirty the lockres before MIGRATING is set */
2784        spin_lock(&res->spinlock);
2785        BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2786        res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2787        spin_unlock(&res->spinlock);
2788        /* now wait on any pending asts and the DIRTY state */
2789        wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2790        dlm_lockres_release_ast(dlm, res);
2791
2792        mlog(0, "about to wait on migration_wq, dirty=%s\n",
2793               res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2794        /* if the extra ref we just put was the final one, this
2795         * will pass through immediately.  otherwise, we need to wait
2796         * for the last ast to finish. */
2797again:
2798        ret = wait_event_interruptible_timeout(dlm->migration_wq,
2799                   dlm_migration_can_proceed(dlm, res, target),
2800                   msecs_to_jiffies(1000));
2801        if (ret < 0) {
2802                mlog(0, "woken again: migrating? %s, dead? %s\n",
2803                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2804                       test_bit(target, dlm->domain_map) ? "no":"yes");
2805        } else {
2806                mlog(0, "all is well: migrating? %s, dead? %s\n",
2807                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2808                       test_bit(target, dlm->domain_map) ? "no":"yes");
2809        }
2810        if (!dlm_migration_can_proceed(dlm, res, target)) {
2811                mlog(0, "trying again...\n");
2812                goto again;
2813        }
2814        /* now that we are sure the MIGRATING state is there, drop
2815         * the unneeded state which blocked threads trying to DIRTY */
2816        spin_lock(&res->spinlock);
2817        BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2818        BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2819        res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2820        spin_unlock(&res->spinlock);
2821
2822        /* did the target go down or die? */
2823        spin_lock(&dlm->spinlock);
2824        if (!test_bit(target, dlm->domain_map)) {
2825                mlog(ML_ERROR, "aha. migration target %u just went down\n",
2826                     target);
2827                ret = -EHOSTDOWN;
2828        }
2829        spin_unlock(&dlm->spinlock);
2830
2831        /*
2832         * at this point:
2833         *
2834         *   o the DLM_LOCK_RES_MIGRATING flag is set
2835         *   o there are no pending asts on this lockres
2836         *   o all processes trying to reserve an ast on this
2837         *     lockres must wait for the MIGRATING flag to clear
2838         */
2839        return ret;
2840}
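
/* Hedged sketch: a debug-only restatement of the postconditions listed
 * above.  The helper name is an assumption; by the time the function
 * returns, the final dlm_lockres_release_ast() has set MIGRATING, and
 * new reservations are blocked until the flag clears, so no asts should
 * remain reserved on the resource. */
static void example_check_marked_migrating(struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
        if (atomic_read(&res->asts_reserved))
                mlog(ML_ERROR, "%.*s: asts still reserved while migrating\n",
                     res->lockname.len, res->lockname.name);
        spin_unlock(&res->spinlock);
}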
2841
2842/* last step in the migration process.
2843 * original master calls this to free all of the dlm_lock
2844 * structures that used to be for other nodes. */
2845static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2846                                      struct dlm_lock_resource *res)
2847{
2848        struct list_head *queue = &res->granted;
2849        int i, bit;
2850        struct dlm_lock *lock, *next;
2851
2852        assert_spin_locked(&res->spinlock);
2853
2854        BUG_ON(res->owner == dlm->node_num);
2855
2856        for (i = 0; i < 3; i++) {
2857                list_for_each_entry_safe(lock, next, queue, list) {
2858                        if (lock->ml.node != dlm->node_num) {
2859                                mlog(0, "putting lock for node %u\n",
2860                                     lock->ml.node);
2861                                /* be extra careful */
2862                                BUG_ON(!list_empty(&lock->ast_list));
2863                                BUG_ON(!list_empty(&lock->bast_list));
2864                                BUG_ON(lock->ast_pending);
2865                                BUG_ON(lock->bast_pending);
2866                                dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2867                                list_del_init(&lock->list);
2868                                dlm_lock_put(lock);
2869                                /* In a normal unlock, we would have added a
2870                                 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2871                                dlm_lock_put(lock);
2872                        }
2873                }
2874                queue++;
2875        }
2876        bit = 0;
2877        while (1) {
2878                bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2879                if (bit >= O2NM_MAX_NODES)
2880                        break;
2881                /* do not clear the local node reference, if there is a
2882                 * process holding this, let it drop the ref itself */
2883                if (bit != dlm->node_num) {
2884                        mlog(0, "%s:%.*s: node %u had a ref to this "
2885                             "migrating lockres, clearing\n", dlm->name,
2886                             res->lockname.len, res->lockname.name, bit);
2887                        dlm_lockres_clear_refmap_bit(bit, res);
2888                }
2889                bit++;
2890        }
2891}
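
/* Hedged sketch: the "queue++" walk above (also used in
 * dlm_pick_migration_target below) relies on granted, converting and
 * blocked being adjacent struct dlm_lock_resource members, in that
 * order.  An equivalent, more explicit spelling for illustration: */
static void example_walk_queues(struct dlm_lock_resource *res)
{
        struct list_head *queues[] = { &res->granted, &res->converting,
                                       &res->blocked };
        struct dlm_lock *lock;
        int i;

        assert_spin_locked(&res->spinlock);
        for (i = 0; i < ARRAY_SIZE(queues); i++)
                list_for_each_entry(lock, queues[i], list)
                        mlog(0, "node %u holds a lock on queue %d\n",
                             lock->ml.node, i);
}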
2892
2893/* for now this is not too intelligent.  we will
2894 * need stats to make this do the right thing.
2895 * this just finds the first lock on one of the
2896 * queues and uses that node as the target. */
2897static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2898                                    struct dlm_lock_resource *res)
2899{
2900        int i;
2901        struct list_head *queue = &res->granted;
2902        struct dlm_lock *lock;
2903        int nodenum;
2904
2905        assert_spin_locked(&dlm->spinlock);
2906
2907        spin_lock(&res->spinlock);
2908        for (i = 0; i < 3; i++) {
2909                list_for_each_entry(lock, queue, list) {
2910                        /* up to the caller to make sure this node
2911                         * is alive */
2912                        if (lock->ml.node != dlm->node_num) {
2913                                spin_unlock(&res->spinlock);
2914                                return lock->ml.node;
2915                        }
2916                }
2917                queue++;
2918        }
2919        spin_unlock(&res->spinlock);
2920        mlog(0, "have not found a suitable target yet! checking domain map\n");
2921
2922        /* ok now we're getting desperate.  pick anyone alive. */
2923        nodenum = -1;
2924        while (1) {
2925                nodenum = find_next_bit(dlm->domain_map,
2926                                        O2NM_MAX_NODES, nodenum+1);
2927                mlog(0, "found %d in domain map\n", nodenum);
2928                if (nodenum >= O2NM_MAX_NODES)
2929                        break;
2930                if (nodenum != dlm->node_num) {
2931                        mlog(0, "picking %d\n", nodenum);
2932                        return nodenum;
2933                }
2934        }
2935
2936        mlog(0, "giving up.  no master to migrate to\n");
2937        return DLM_LOCK_RES_OWNER_UNKNOWN;
2938}
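
/* Hedged caller sketch (an assumed shape, not the real call-site): pick
 * a target under dlm->spinlock, then treat the answer only as a hint,
 * since the chosen node can still die before the migration completes. */
static u8 example_choose_target(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res)
{
        u8 target;

        spin_lock(&dlm->spinlock);
        target = dlm_pick_migration_target(dlm, res);
        spin_unlock(&dlm->spinlock);

        if (target == DLM_LOCK_RES_OWNER_UNKNOWN)
                mlog(0, "%s: no other node is up, nothing to migrate to\n",
                     dlm->name);
        return target;
}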
2939
2940
2941
2942/* this is called by the new master once all lockres
2943 * data has been received */
2944static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2945                                  struct dlm_lock_resource *res,
2946                                  u8 master, u8 new_master,
2947                                  struct dlm_node_iter *iter)
2948{
2949        struct dlm_migrate_request migrate;
2950        int ret, skip, status = 0;
2951        int nodenum;
2952
2953        memset(&migrate, 0, sizeof(migrate));
2954        migrate.namelen = res->lockname.len;
2955        memcpy(migrate.name, res->lockname.name, migrate.namelen);
2956        migrate.new_master = new_master;
2957        migrate.master = master;
2958
2959        ret = 0;
2960
2961        /* send message to all nodes, except the master and myself */
2962        while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2963                if (nodenum == master ||
2964                    nodenum == new_master)
2965                        continue;
2966
2967                /* We could race exit domain. If exited, skip. */
2968                spin_lock(&dlm->spinlock);
2969                skip = (!test_bit(nodenum, dlm->domain_map));
2970                spin_unlock(&dlm->spinlock);
2971                if (skip) {
2972                        clear_bit(nodenum, iter->node_map);
2973                        continue;
2974                }
2975
2976                ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2977                                         &migrate, sizeof(migrate), nodenum,
2978                                         &status);
2979                if (ret < 0) {
2980                        mlog(0, "migrate_request returned %d!\n", ret);
2981                        if (!dlm_is_host_down(ret)) {
2982                                mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2983                                BUG();
2984                        }
2985                        clear_bit(nodenum, iter->node_map);
2986                        ret = 0;
2987                } else if (status < 0) {
2988                        mlog(0, "migrate request (node %u) returned %d!\n",
2989                             nodenum, status);
2990                        ret = status;
2991                } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2992                        /* during the migration request we short-circuited
2993                         * the mastery of the lockres.  make sure we have
2994                         * a mastery ref for nodenum */
2995                        mlog(0, "%s:%.*s: need ref for node %u\n",
2996                             dlm->name, res->lockname.len, res->lockname.name,
2997                             nodenum);
2998                        spin_lock(&res->spinlock);
2999                        dlm_lockres_set_refmap_bit(nodenum, res);
3000                        spin_unlock(&res->spinlock);
3001                }
3002        }
3003
3004        if (ret < 0)
3005                mlog_errno(ret);
3006
3007        mlog(0, "returning ret=%d\n", ret);
3008        return ret;
3009}
3010
3011
3012/* if there is an existing mle for this lockres, we now know who the master is
3013 * (the one who sent us *this* message), so we can clear it up right away.
3014 * since the process that put the mle on the list still has a reference to it,
3015 * we can unhash it now, set the master and wake the process.  as a result,
3016 * we will have no mle in the list to start with.  now we can add an mle for
3017 * the migration and this should be the only one found for those scanning the
3018 * list.  */
3019int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3020                                void **ret_data)
3021{
3022        struct dlm_ctxt *dlm = data;
3023        struct dlm_lock_resource *res = NULL;
3024        struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3025        struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3026        const char *name;
3027        unsigned int namelen, hash;
3028        int ret = 0;
3029
3030        if (!dlm_grab(dlm))
3031                return -EINVAL;
3032
3033        name = migrate->name;
3034        namelen = migrate->namelen;
3035        hash = dlm_lockid_hash(name, namelen);
3036
3037        /* preallocate.. if this fails, abort */
3038        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3040
3041        if (!mle) {
3042                ret = -ENOMEM;
3043                goto leave;
3044        }
3045
3046        /* check for pre-existing lock */
3047        spin_lock(&dlm->spinlock);
3048        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3049        spin_lock(&dlm->master_lock);
3050
3051        if (res) {
3052                spin_lock(&res->spinlock);
3053                if (res->state & DLM_LOCK_RES_RECOVERING) {
3054                        /* if all is working ok, this can only mean that we got
3055                         * a migrate request from a node that we now see as
3056                         * dead.  what can we do here?  drop it to the floor? */
3057                        spin_unlock(&res->spinlock);
3058                        mlog(ML_ERROR, "Got a migrate request, but the "
3059                             "lockres is marked as recovering!\n");
3060                        kmem_cache_free(dlm_mle_cache, mle);
3061                        ret = -EINVAL; /* need a better solution */
3062                        goto unlock;
3063                }
3064                res->state |= DLM_LOCK_RES_MIGRATING;
3065                spin_unlock(&res->spinlock);
3066        }
3067
3068        /* ignore status.  only nonzero status would BUG. */
3069        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3070                                    name, namelen,
3071                                    migrate->new_master,
3072                                    migrate->master);
3073
3074unlock:
3075        spin_unlock(&dlm->master_lock);
3076        spin_unlock(&dlm->spinlock);
3077
3078        if (oldmle) {
3079                /* master is known, detach if not already detached */
3080                dlm_mle_detach_hb_events(dlm, oldmle);
3081                dlm_put_mle(oldmle);
3082        }
3083
3084        if (res)
3085                dlm_lockres_put(res);
3086leave:
3087        dlm_put(dlm);
3088        return ret;
3089}
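
/* Hedged sketch of how a handler like the one above is wired up.  The
 * real registration for DLM_MIGRATE_REQUEST_MSG lives in dlmdomain.c;
 * the exact arguments here are an assumption for illustration only. */
static int example_register_migrate_handler(struct dlm_ctxt *dlm)
{
        return o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
                                      sizeof(struct dlm_migrate_request),
                                      dlm_migrate_request_handler,
                                      dlm, NULL, &dlm->dlm_domain_handlers);
}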
3090
3091/* must be holding dlm->spinlock and dlm->master_lock
3092 * when adding a migration mle, we can clear any other mles
3093 * in the master list because we know with certainty that
3094 * the master is "master".  so we remove any old mle from
3095 * the list after setting its master field, and then add
3096 * the new migration mle.  this way we can keep to the rule
3097 * of having only one mle for a given lock name at all times. */
3098static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3099                                 struct dlm_lock_resource *res,
3100                                 struct dlm_master_list_entry *mle,
3101                                 struct dlm_master_list_entry **oldmle,
3102                                 const char *name, unsigned int namelen,
3103                                 u8 new_master, u8 master)
3104{
3105        int found;
3106        int ret = 0;
3107
3108        *oldmle = NULL;
3109
3110        mlog_entry_void();
3111
3112        assert_spin_locked(&dlm->spinlock);
3113        assert_spin_locked(&dlm->master_lock);
3114
3115        /* caller is responsible for any ref taken here on oldmle */
3116        found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3117        if (found) {
3118                struct dlm_master_list_entry *tmp = *oldmle;
3119                spin_lock(&tmp->spinlock);
3120                if (tmp->type == DLM_MLE_MIGRATION) {
3121                        if (master == dlm->node_num) {
3122                                /* ah another process raced me to it */
3123                                mlog(0, "tried to migrate %.*s, but some "
3124                                     "process beat me to it\n",
3125                                     namelen, name);
3126                                ret = -EEXIST;
3127                        } else {
3128                                /* bad.  2 NODES are trying to migrate! */
3129                                mlog(ML_ERROR, "migration error mle: "
3130                                     "master=%u new_master=%u // request: "
3131                                     "master=%u new_master=%u // "
3132                                     "lockres=%.*s\n",
3133                                     tmp->master, tmp->new_master,
3134                                     master, new_master,
3135                                     namelen, name);
3136                                BUG();
3137                        }
3138                } else {
3139                        /* this is essentially what assert_master does */
3140                        tmp->master = master;
3141                        atomic_set(&tmp->woken, 1);
3142                        wake_up(&tmp->wq);
3143                        /* remove it so that only one mle will be found */
3144                        __dlm_unlink_mle(dlm, tmp);
3145                        __dlm_mle_detach_hb_events(dlm, tmp);
3146                        ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3147                        mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3148                            "telling master to get ref for cleared out mle "
3149                            "during migration\n", dlm->name, namelen, name,
3150                            master, new_master);
3151                }
3152                spin_unlock(&tmp->spinlock);
3153        }
3154
3155        /* now add a migration mle to the tail of the list */
3156        dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3157        mle->new_master = new_master;
3158        /* the new master will be sending an assert master for this.
3159         * at that point we will get the refmap reference */
3160        mle->master = master;
3161        /* do this for consistency with other mle types */
3162        set_bit(new_master, mle->maybe_map);
3163        __dlm_insert_mle(dlm, mle);
3164
3165        return ret;
3166}
3167
3168/*
3169 * Sets the owner of the lockres associated with the mle to UNKNOWN
3170 */
3171static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3172                                        struct dlm_master_list_entry *mle)
3173{
3174        struct dlm_lock_resource *res;
3175
3176        /* Find the lockres associated to the mle and set its owner to UNK */
3177        res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3178                                   mle->mnamehash);
3179        if (res) {
3180                spin_unlock(&dlm->master_lock);
3181
3182                /* move lockres onto recovery list */
3183                spin_lock(&res->spinlock);
3184                dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3185                dlm_move_lockres_to_recovery_list(dlm, res);
3186                spin_unlock(&res->spinlock);
3187                dlm_lockres_put(res);
3188
3189                /* about to get rid of mle, detach from heartbeat */
3190                __dlm_mle_detach_hb_events(dlm, mle);
3191
3192                /* dump the mle */
3193                spin_lock(&dlm->master_lock);
3194                __dlm_put_mle(mle);
3195                spin_unlock(&dlm->master_lock);
3196        }
3197
3198        return res;
3199}
3200
3201static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3202                                    struct dlm_master_list_entry *mle)
3203{
3204        __dlm_mle_detach_hb_events(dlm, mle);
3205
3206        spin_lock(&mle->spinlock);
3207        __dlm_unlink_mle(dlm, mle);
3208        atomic_set(&mle->woken, 1);
3209        spin_unlock(&mle->spinlock);
3210
3211        wake_up(&mle->wq);
3212}
3213
3214static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3215                                struct dlm_master_list_entry *mle, u8 dead_node)
3216{
3217        int bit;
3218
3219        BUG_ON(mle->type != DLM_MLE_BLOCK);
3220
3221        spin_lock(&mle->spinlock);
3222        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3223        if (bit != dead_node) {
3224                mlog(0, "mle found, but dead node %u would not have been "
3225                     "master\n", dead_node);
3226                spin_unlock(&mle->spinlock);
3227        } else {
3228                /* Must drop the refcount by one since the assert_master will
3229                 * never arrive. This may result in the mle being unlinked and
3230                 * freed, but there may still be a process waiting in the
3231                 * dlmlock path which is fine. */
3232                mlog(0, "node %u was expected master\n", dead_node);
3233                atomic_set(&mle->woken, 1);
3234                spin_unlock(&mle->spinlock);
3235                wake_up(&mle->wq);
3236
3237                /* Do not need events any longer, so detach from heartbeat */
3238                __dlm_mle_detach_hb_events(dlm, mle);
3239                __dlm_put_mle(mle);
3240        }
3241}
3242
3243void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3244{
3245        struct dlm_master_list_entry *mle;
3246        struct dlm_lock_resource *res;
3247        struct hlist_head *bucket;
3248        struct hlist_node *list;
3249        unsigned int i;
3250
3251        mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
3252top:
3253        assert_spin_locked(&dlm->spinlock);
3254
3255        /* clean the master list */
3256        spin_lock(&dlm->master_lock);
3257        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3258                bucket = dlm_master_hash(dlm, i);
3259                hlist_for_each(list, bucket) {
3260                        mle = hlist_entry(list, struct dlm_master_list_entry,
3261                                          master_hash_node);
3262
3263                        BUG_ON(mle->type != DLM_MLE_BLOCK &&
3264                               mle->type != DLM_MLE_MASTER &&
3265                               mle->type != DLM_MLE_MIGRATION);
3266
3267                        /* MASTER mles are initiated locally. The waiting
3268                         * process will notice the node map change shortly.
3269                         * Let that happen as normal. */
3270                        if (mle->type == DLM_MLE_MASTER)
3271                                continue;
3272
3273                        /* BLOCK mles are initiated by other nodes. Need to
3274                         * clean up if the dead node would have been the
3275                         * master. */
3276                        if (mle->type == DLM_MLE_BLOCK) {
3277                                dlm_clean_block_mle(dlm, mle, dead_node);
3278                                continue;
3279                        }
3280
3281                        /* Everything else is a MIGRATION mle */
3282
3283                        /* The rule for MIGRATION mles is that the master
3284                         * becomes UNKNOWN if *either* the original or the new
3285                         * master dies. All UNKNOWN lockres' are sent to
3286                         * whichever node becomes the recovery master. The new
3287                         * master is responsible for determining if there is
3288                         * still a master for this lockres, or if he needs to
3289                         * take over mastery. Either way, this node should
3290                         * expect another message to resolve this. */
3291
3292                        if (mle->master != dead_node &&
3293                            mle->new_master != dead_node)
3294                                continue;
3295
3296                        /* If we have reached this point, this mle needs to be
3297                         * removed from the list and freed. */
3298                        dlm_clean_migration_mle(dlm, mle);
3299
3300                        mlog(0, "%s: node %u died during migration from "
3301                             "%u to %u!\n", dlm->name, dead_node, mle->master,
3302                             mle->new_master);
3303
3304                        /* If we find a lockres associated with the mle, we've
3305                         * hit this rare case that messes up our lock ordering.
3306                         * If so, we need to drop the master lock so that we can
3307                         * take the lockres lock, meaning that we will have to
3308         * restart from the head of the list. */
3309                        res = dlm_reset_mleres_owner(dlm, mle);
3310                        if (res)
3311                                /* restart */
3312                                goto top;
3313
3314                        /* This may be the last reference */
3315                        __dlm_put_mle(mle);
3316                }
3317        }
3318        spin_unlock(&dlm->master_lock);
3319}
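
/* Hedged call-site sketch: the cleanup above runs from the node-down
 * path (in dlmrecovery.c) with dlm->spinlock already held, matching the
 * assert at "top:".  This wrapper is illustrative only. */
static void example_node_down_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
        spin_lock(&dlm->spinlock);
        dlm_clean_master_list(dlm, dead_node);
        /* the function takes and drops dlm->master_lock internally */
        spin_unlock(&dlm->spinlock);
}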
3320
3321int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3322                         u8 old_master)
3323{
3324        struct dlm_node_iter iter;
3325        int ret = 0;
3326
3327        spin_lock(&dlm->spinlock);
3328        dlm_node_iter_init(dlm->domain_map, &iter);
3329        clear_bit(old_master, iter.node_map);
3330        clear_bit(dlm->node_num, iter.node_map);
3331        spin_unlock(&dlm->spinlock);
3332
3333        /* ownership of the lockres is changing.  account for the
3334         * mastery reference here since old_master will briefly have
3335         * a reference after the migration completes */
3336        spin_lock(&res->spinlock);
3337        dlm_lockres_set_refmap_bit(old_master, res);
3338        spin_unlock(&res->spinlock);
3339
3340        mlog(0, "now time to do a migrate request to other nodes\n");
3341        ret = dlm_do_migrate_request(dlm, res, old_master,
3342                                     dlm->node_num, &iter);
3343        if (ret < 0) {
3344                mlog_errno(ret);
3345                goto leave;
3346        }
3347
3348        mlog(0, "doing assert master of %.*s to all except the original node\n",
3349             res->lockname.len, res->lockname.name);
3350        /* this call now finishes out the nodemap
3351         * even if one or more nodes die */
3352        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3353                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3354        if (ret < 0) {
3355                /* no longer need to retry.  all living nodes contacted. */
3356                mlog_errno(ret);
3357                ret = 0;
3358        }
3359
3360        memset(iter.node_map, 0, sizeof(iter.node_map));
3361        set_bit(old_master, iter.node_map);
3362        mlog(0, "doing assert master of %.*s back to %u\n",
3363             res->lockname.len, res->lockname.name, old_master);
3364        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3365                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3366        if (ret < 0) {
3367                mlog(0, "assert master to original master failed "
3368                     "with %d.\n", ret);
3369                /* the only nonzero status here would be because of
3370                 * a dead original node.  we're done. */
3371                ret = 0;
3372        }
3373
3374        /* all done, set the owner, clear the flag */
3375        spin_lock(&res->spinlock);
3376        dlm_set_lockres_owner(dlm, res, dlm->node_num);
3377        res->state &= ~DLM_LOCK_RES_MIGRATING;
3378        spin_unlock(&res->spinlock);
3379        /* re-dirty it on the new master */
3380        dlm_kick_thread(dlm, res);
3381        wake_up(&res->wq);
3382leave:
3383        return ret;
3384}
3385
3386/*
3387 * LOCKRES AST REFCOUNT
3388 * this is integral to migration
3389 */
3390
3391/* for future intent to call an ast, reserve one ahead of time.
3392 * this should be called only after waiting on the lockres
3393 * with dlm_wait_on_lockres, and while still holding the
3394 * spinlock after the call. */
3395void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3396{
3397        assert_spin_locked(&res->spinlock);
3398        if (res->state & DLM_LOCK_RES_MIGRATING) {
3399                __dlm_print_one_lock_resource(res);
3400        }
3401        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3402
3403        atomic_inc(&res->asts_reserved);
3404}
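
/* Hedged pairing sketch: every reservation taken as above must be
 * balanced by exactly one dlm_lockres_release_ast(); the final release
 * is what flips the lockres into MIGRATING (see below).  This mirrors
 * the reserve/kick/release sequence in dlm_mark_lockres_migrating(). */
static void example_reserve_then_release(struct dlm_ctxt *dlm,
                                         struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        __dlm_lockres_reserve_ast(res);
        spin_unlock(&res->spinlock);

        /* ... deliver or cancel the ast/bast here ... */

        dlm_lockres_release_ast(dlm, res);
}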
3405
3406/*
3407 * used to drop the reserved ast, either because it went unused,
3408 * or because the ast/bast was actually called.
3409 *
3410 * also, if there is a pending migration on this lockres,
3411 * and this was the last pending ast on the lockres,
3412 * atomically set the MIGRATING flag before we drop the lock.
3413 * this is how we ensure that migration can proceed with no
3414 * asts in progress.  note that it is ok if the state of the
3415 * queues is such that a lock should be granted in the future
3416 * or that a bast should be fired, because the new master will
3417 * shuffle the lists on this lockres as soon as it is migrated.
3418 */
3419void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3420                             struct dlm_lock_resource *res)
3421{
3422        if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3423                return;
3424
3425        if (!res->migration_pending) {
3426                spin_unlock(&res->spinlock);
3427                return;
3428        }
3429
3430        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3431        res->migration_pending = 0;
3432        res->state |= DLM_LOCK_RES_MIGRATING;
3433        spin_unlock(&res->spinlock);
3434        wake_up(&res->wq);
3435        wake_up(&dlm->migration_wq);
3436}
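
/* Hedged sketch of the atomic_dec_and_lock() idiom used above, with
 * assumed generic names: the spinlock is taken only when the counter
 * actually reaches zero, so non-final releases stay lock-free while
 * the final one runs its cleanup under the lock, closing the race
 * with concurrent increments. */
static void example_put_last(atomic_t *count, spinlock_t *lock)
{
        if (!atomic_dec_and_lock(count, lock))
                return;                 /* not the final reference */
        /* count just hit zero and *lock is held */
        spin_unlock(lock);
}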
3437