linux/fs/ocfs2/dlm/dlmmaster.c
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res,
				void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
				struct dlm_master_list_entry *mle,
				const char *name,
				unsigned int namelen)
{
	if (dlm != mle->dlm)
		return 0;

	if (namelen != mle->mnamelen ||
	    memcmp(name, mle->mname, namelen) != 0)
		return 0;

	return 1;
}

static struct kmem_cache *dlm_lockres_cache;
static struct kmem_cache *dlm_lockname_cache;
static struct kmem_cache *dlm_mle_cache;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res,
				       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res);


int dlm_is_host_down(int errno)
{
	switch (errno) {
		case -EBADF:
		case -ECONNREFUSED:
		case -ENOTCONN:
		case -ECONNRESET:
		case -EPIPE:
		case -EHOSTDOWN:
		case -EHOSTUNREACH:
		case -ETIMEDOUT:
		case -ECONNABORTED:
		case -ENETDOWN:
		case -ENETUNREACH:
		case -ENETRESET:
		case -ESHUTDOWN:
		case -ENOPROTOOPT:
		case -EINVAL:	/* if returned from our tcp code,
				   this means there is no socket */
			return 1;
	}
	return 0;
}
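
/*
 * Illustrative caller pattern (a sketch; dlm_do_master_request() below is
 * the canonical user): any -errno from o2net_send_message() that matches
 * the list above is treated as "the target node died", not as a local
 * failure:
 *
 *	ret = o2net_send_message(...);
 *	if (ret < 0 && dlm_is_host_down(ret))
 *		... assume the target is dead and let recovery handle it ...
 */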


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
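
/*
 * Illustrative mle lifecycle (a sketch of the calls below, not a new API):
 *
 *	spin_lock(&dlm->spinlock);
 *	spin_lock(&dlm->master_lock);
 *	dlm_init_mle(mle, ...);		      attaches via __dlm_mle_attach_hb_events()
 *	...master requests / wait for an assert_master...
 *	dlm_mle_detach_hb_events(dlm, mle);   once the master is known
 *	dlm_put_mle(mle);		      last ref frees it in dlm_mle_release()
 */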
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);

	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	if (!list_empty(&mle->hb_events))
		list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					    struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_detach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);
}

static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	mle->inuse++;
	kref_get(&mle->mle_refs);
}

static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	mle->inuse--;
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}
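
/*
 * Note on the two counts above (descriptive, inferred from this file):
 * mle_refs is the plain kref that ultimately frees the mle, while 'inuse'
 * additionally marks an mle a caller intends to hold across dropped locks,
 * e.g. the extra reference dlm_get_lock_resource() takes with
 * dlm_get_mle_inuse() and releases with dlm_put_mle_inuse().
 */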

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	if (!kref_read(&mle->mle_refs)) {
		/* this may or may not crash, but who cares.
		 * it's a BUG. */
		mlog(ML_ERROR, "bad mle: %p\n", mle);
		dlm_print_one_mle(mle);
		BUG();
	} else
		kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	__dlm_put_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
	kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen)
{
	assert_spin_locked(&dlm->spinlock);

	mle->dlm = dlm;
	mle->type = type;
	INIT_HLIST_NODE(&mle->master_hash_node);
	INIT_LIST_HEAD(&mle->hb_events);
	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
	spin_lock_init(&mle->spinlock);
	init_waitqueue_head(&mle->wq);
	atomic_set(&mle->woken, 0);
	kref_init(&mle->mle_refs);
	memset(mle->response_map, 0, sizeof(mle->response_map));
	mle->master = O2NM_MAX_NODES;
	mle->new_master = O2NM_MAX_NODES;
	mle->inuse = 0;

	BUG_ON(mle->type != DLM_MLE_BLOCK &&
	       mle->type != DLM_MLE_MASTER &&
	       mle->type != DLM_MLE_MIGRATION);

	if (mle->type == DLM_MLE_MASTER) {
		BUG_ON(!res);
		mle->mleres = res;
		memcpy(mle->mname, res->lockname.name, res->lockname.len);
		mle->mnamelen = res->lockname.len;
		mle->mnamehash = res->lockname.hash;
	} else {
		BUG_ON(!name);
		mle->mleres = NULL;
		memcpy(mle->mname, name, namelen);
		mle->mnamelen = namelen;
		mle->mnamehash = dlm_lockid_hash(name, namelen);
	}

	atomic_inc(&dlm->mle_tot_count[mle->type]);
	atomic_inc(&dlm->mle_cur_count[mle->type]);

	/* copy off the node_map and register hb callbacks on our copy */
	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
	clear_bit(dlm->node_num, mle->vote_map);
	clear_bit(dlm->node_num, mle->node_map);
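	/* (descriptive note) the local node is excluded from both copies:
	 * no master request is sent to ourselves, so we never wait on our
	 * own vote, nor on our own heartbeat */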

	/* attach the mle to the domain node up/down events */
	__dlm_mle_attach_hb_events(dlm, mle);
}

void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	if (!hlist_unhashed(&mle->master_hash_node))
		hlist_del_init(&mle->master_hash_node);
}

void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
	struct hlist_head *bucket;

	assert_spin_locked(&dlm->master_lock);

	bucket = dlm_master_hash(dlm, mle->mnamehash);
	hlist_add_head(&mle->master_hash_node, bucket);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen)
{
	struct dlm_master_list_entry *tmpmle;
	struct hlist_head *bucket;
	unsigned int hash;

	assert_spin_locked(&dlm->master_lock);

	hash = dlm_lockid_hash(name, namelen);
	bucket = dlm_master_hash(dlm, hash);
	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
			continue;
		dlm_get_mle(tmpmle);
		*mle = tmpmle;
		return 1;
	}
	return 0;
}

void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
	struct dlm_master_list_entry *mle;

	assert_spin_locked(&dlm->spinlock);

	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
		if (node_up)
			dlm_mle_node_up(dlm, mle, NULL, idx);
		else
			dlm_mle_node_down(dlm, mle, NULL, idx);
	}
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (!test_bit(idx, mle->node_map))
		mlog(0, "node %u already removed from nodemap!\n", idx);
	else
		clear_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (test_bit(idx, mle->node_map))
		mlog(0, "node %u already in node map!\n", idx);
	else
		set_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_mle_cache(void)
{
	kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
	struct dlm_master_list_entry *mle;
	struct dlm_ctxt *dlm;

	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
	     mle->type);

	/* remove from list if not already */
	__dlm_unlink_mle(dlm, mle);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	atomic_dec(&dlm->mle_cur_count[mle->type]);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

int dlm_init_master_caches(void)
{
	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
					      sizeof(struct dlm_lock_resource),
					      0, SLAB_HWCACHE_ALIGN, NULL);
	if (!dlm_lockres_cache)
		goto bail;

	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
					       DLM_LOCKID_NAME_MAX, 0,
					       SLAB_HWCACHE_ALIGN, NULL);
	if (!dlm_lockname_cache)
		goto bail;

	return 0;
bail:
	dlm_destroy_master_caches();
	return -ENOMEM;
}

void dlm_destroy_master_caches(void)
{
	kmem_cache_destroy(dlm_lockname_cache);
	dlm_lockname_cache = NULL;

	kmem_cache_destroy(dlm_lockres_cache);
	dlm_lockres_cache = NULL;
}

static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm;

	res = container_of(kref, struct dlm_lock_resource, refs);
	dlm = res->dlm;

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	atomic_dec(&dlm->res_cur_count);

	if (!hlist_unhashed(&res->hash_node) ||
	    !list_empty(&res->granted) ||
	    !list_empty(&res->converting) ||
	    !list_empty(&res->blocked) ||
	    !list_empty(&res->dirty) ||
	    !list_empty(&res->recovering) ||
	    !list_empty(&res->purge)) {
		mlog(ML_ERROR,
		     "Going to BUG for resource %.*s."
		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
		     res->lockname.len, res->lockname.name,
		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
		     !list_empty(&res->granted) ? 'G' : ' ',
		     !list_empty(&res->converting) ? 'C' : ' ',
		     !list_empty(&res->blocked) ? 'B' : ' ',
		     !list_empty(&res->dirty) ? 'D' : ' ',
		     !list_empty(&res->recovering) ? 'R' : ' ',
		     !list_empty(&res->purge) ? 'P' : ' ');

		dlm_print_one_lock_resource(res);
	}

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

	kmem_cache_free(dlm_lockres_cache, res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = dlm_lockid_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);
	INIT_LIST_HEAD(&res->tracking);
	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;
	res->inflight_locks = 0;
	res->inflight_assert_workers = 0;

	res->dlm = dlm;

	kref_init(&res->refs);

	atomic_inc(&dlm->res_tot_count);
	atomic_inc(&dlm->res_cur_count);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	spin_lock(&dlm->spinlock);
	list_add_tail(&res->tracking, &dlm->tracking_list);
	spin_unlock(&dlm->spinlock);

	memset(res->lvb, 0, DLM_LVB_LEN);
	memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
				   const char *name,
				   unsigned int namelen)
{
	struct dlm_lock_resource *res = NULL;

	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
	if (!res)
		goto error;

	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
	if (!res->lockname.name)
		goto error;

	dlm_init_lockres(dlm, res, name, namelen);
	return res;

error:
	if (res)
		kmem_cache_free(dlm_lockres_cache, res);
	return NULL;
}
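
/*
 * Note (descriptive): the name is allocated from its own cache
 * (dlm_lockname_cache, DLM_LOCKID_NAME_MAX bytes) rather than embedded in
 * struct dlm_lock_resource, which is why dlm_lockres_release() frees the
 * name and the structure separately.
 */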

void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, int bit)
{
	assert_spin_locked(&res->spinlock);

	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
	     res->lockname.name, bit, __builtin_return_address(0));

	set_bit(bit, res->refmap);
}

void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res, int bit)
{
	assert_spin_locked(&res->spinlock);

	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
	     res->lockname.name, bit, __builtin_return_address(0));

	clear_bit(bit, res->refmap);
}
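
/*
 * Note (descriptive): refmap holds one bit per node number; a set bit means
 * that node is known to hold a reference to this lock resource, so the
 * master must not purge the resource until the remote node derefs it.
 */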

static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	res->inflight_locks++;

	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
	     res->lockname.len, res->lockname.name, res->inflight_locks,
	     __builtin_return_address(0));
}

void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	__dlm_lockres_grab_inflight_ref(dlm, res);
}

void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
				   struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);

	BUG_ON(res->inflight_locks == 0);

	res->inflight_locks--;

	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
	     res->lockname.len, res->lockname.name, res->inflight_locks,
	     __builtin_return_address(0));

	wake_up(&res->wq);
}
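
/*
 * Note (descriptive): inflight_locks pins the lockres against purging while
 * a lock operation is still in flight; the drop path wakes res->wq so that
 * waiters can re-check the count.
 */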

void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	res->inflight_assert_workers++;
	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
			dlm->name, res->lockname.len, res->lockname.name,
			res->inflight_assert_workers);
}

static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	assert_spin_locked(&res->spinlock);
	BUG_ON(res->inflight_assert_workers == 0);
	res->inflight_assert_workers--;
	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
			dlm->name, res->lockname.len, res->lockname.name,
			res->inflight_assert_workers);
}

static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
		struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);
	__dlm_lockres_drop_inflight_worker(dlm, res);
	spin_unlock(&res->spinlock);
}

/*
 * look up a lock resource by name; it may already exist in the
 * hashtable.  lockid is NUL terminated.
 *
 * if not found, allocate enough for the lockres and for
 * the temporary mle structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  need to wait around for that node
 * to assert_master (or die).
 */
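
/*
 * Rough control flow of dlm_get_lock_resource() (a summary of the code
 * below, not an additional interface):
 *
 *	lookup:	hashed lockres found?	-> wait out mastery/purge, pin, done
 *		nothing allocated yet?	-> allocate lockres + mle, retry lookup
 *		BLOCK/MIGRATION mle?	-> wait for the other node, retry
 *		otherwise		-> insert a MASTER mle, send master
 *					   requests, dlm_wait_for_lock_mastery()
 */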
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
					  const char *lockid,
					  int namelen,
					  int flags)
{
	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int hash;
	int tries = 0;
	int bit, wait_on_recovery = 0;

	BUG_ON(!lockid);

	hash = dlm_lockid_hash(lockid, namelen);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		spin_lock(&tmpres->spinlock);

		/*
		 * Right after dlm spinlock was released, dlm_thread could have
		 * purged the lockres. Check if lockres got unhashed. If so
		 * start over.
		 */
		if (hlist_unhashed(&tmpres->hash_node)) {
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Wait on the thread that is mastering the resource */
		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
			__dlm_wait_on_lockres(tmpres);
			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Wait on the resource purge to complete before continuing */
		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
			BUG_ON(tmpres->owner == dlm->node_num);
			__dlm_wait_on_lockres_flags(tmpres,
						    DLM_LOCK_RES_DROPPING_REF);
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		/* Grab inflight ref to pin the resource */
		dlm_lockres_grab_inflight_ref(dlm, tmpres);

		spin_unlock(&tmpres->spinlock);
		if (res) {
			spin_lock(&dlm->track_lock);
			if (!list_empty(&res->tracking))
				list_del_init(&res->tracking);
			else
				mlog(ML_ERROR, "Resource %.*s not "
						"on the Tracking list\n",
						res->lockname.len,
						res->lockname.name);
			spin_unlock(&dlm->track_lock);
			dlm_lockres_put(res);
		}
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		dlm_lockres_grab_inflight_ref(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		int mig;
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		}
		mig = (mle->type == DLM_MLE_MIGRATION);
		/* if there is a migration in progress, let the migration
		 * finish before continuing.  we can wait for the absence
		 * of the MIGRATION mle: either the migrate finished or
		 * one of the nodes died and the mle was cleaned up.
		 * if there is a BLOCK here, but it already has a master
		 * set, we are too late.  the master does not have a ref
		 * for us in the refmap.  detach the mle and drop it.
		 * either way, go back to the top and start over. */
		if (mig || mle->master != O2NM_MAX_NODES) {
			BUG_ON(mig && mle->master == dlm->node_num);
			/* we arrived too late.  the master does not
			 * have a ref for us. retry. */
			mlog(0, "%s:%.*s: late on %s\n",
			     dlm->name, namelen, lockid,
			     mig ?  "MIGRATION" : "BLOCK");
			spin_unlock(&dlm->master_lock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			if (!mig)
				dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			/* this is lame, but we can't wait on either
			 * the mle or lockres waitqueue here */
			if (mig)
				msleep(100);
			goto lookup;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		__dlm_insert_mle(dlm, mle);

		/* still holding the dlm spinlock, check the recovery map
		 * to see if there are any nodes that still need to be
		 * considered.  these will not appear in the mle nodemap
		 * but they might own this lockres.  wait on them. */
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(0, "%s: res %.*s, At least one node (%d) "
			     "to recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		}
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);

	/* since this lockres is new it does not require the spinlock */
	__dlm_lockres_grab_inflight_ref(dlm, res);

	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle_inuse(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

redo_request:
	while (wait_on_recovery) {
		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
		 * so we only need to clear out the recovery map once. */
		if (dlm_is_recovery_lock(lockid, namelen)) {
			mlog(0, "%s: Recovery map is not empty, but must "
			     "master $RECOVERY lock now\n", dlm->name);
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;
			else {
				mlog(0, "%s: waiting 500ms for heartbeat state "
				    "change\n", dlm->name);
				msleep(500);
			}
			continue;
		}

		dlm_kick_recovery_thread(dlm);
		msleep(1000);
		dlm_wait_for_recovery(dlm);

		spin_lock(&dlm->spinlock);
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(0, "%s: res %.*s, At least one node (%d) "
			     "to recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		} else
			wait_on_recovery = 0;
		spin_unlock(&dlm->spinlock);

		if (wait_on_recovery)
			dlm_wait_for_node_recovery(dlm, bit, 10000);
	}

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(res, mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			if (mle->master <= nodenum)
				break;
			/* if our master request has not reached the master
			 * yet, keep going until it does.  this is how the
			 * master will know that asserts are needed back to
			 * the lower nodes. */
			mlog(0, "%s: res %.*s, Requests only up to %u but "
			     "master is %u, keep going\n", dlm->name, namelen,
			     lockid, nodenum, mle->master);
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		wait_on_recovery = 1;
		mlog(0, "%s: res %.*s, Node map changed, redo the master "
		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
			     "dlm_wait_for_lock_mastery, blocked = %d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			dlm_print_one_mle(mle);
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
	     res->lockname.name, res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle_inuse(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
		     res->lockname.len, res->lockname.name, res->owner);
		spin_unlock(&res->spinlock);
		/* this will cause the master to re-assert across
		 * the whole cluster, freeing up mles */
		if (res->owner != dlm->node_num) {
			ret = dlm_do_master_request(res, mle, res->owner);
			if (ret < 0) {
				/* give recovery a chance to run */
				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
				msleep(500);
				goto recheck;
			}
		}
		ret = 0;
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			     sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	} else {
		if (!voting_done) {
			mlog(0, "map not changed and voting not done "
			     "for %s:%.*s\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		}
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
				 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				/* ref was grabbed in get_lock_resource
				 * will be dropped in dlmlock_master */
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}

	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
			     res->lockname.len, res->lockname.name);
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;   /* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	/* mastery reference obtained either during
	 * assert_master_handler or in get_lock_resource */
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

struct dlm_bitmap_diff_iter
{
	int curnode;
	unsigned long *orig_bm;
	unsigned long *cur_bm;
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
	NODE_DOWN = -1,
	NODE_NO_CHANGE = 0,
	NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
				      unsigned long *orig_bm,
				      unsigned long *cur_bm)
{
	unsigned long p1, p2;
	int i;

	iter->curnode = -1;
	iter->orig_bm = orig_bm;
	iter->cur_bm = cur_bm;

	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
	}
}
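
/*
 * Worked example (illustrative): (p1 & ~p2) | (p2 & ~p1) is simply XOR, so
 * with orig_bm = 0b0110 and cur_bm = 0b0101, diff_bm = 0b0011.  The iterator
 * below then reports bit 0 as NODE_UP (absent from orig_bm) and bit 1 as
 * NODE_DOWN (present in orig_bm).
 */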

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
				     enum dlm_node_state_change *state)
{
	int bit;

	if (iter->curnode >= O2NM_MAX_NODES)
		return -ENOENT;

	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
			    iter->curnode+1);
	if (bit >= O2NM_MAX_NODES) {
		iter->curnode = O2NM_MAX_NODES;
		return -ENOENT;
	}

	/* if it was there in the original then this node died */
	if (test_bit(bit, iter->orig_bm))
		*state = NODE_DOWN;
	else
		*state = NODE_UP;

	iter->curnode = bit;
	return bit;
}


static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;
	int ret = 0;

	mlog(0, "something happened such that the "
	     "master process may need to be restarted!\n");

	assert_spin_locked(&mle->spinlock);

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP) {
			/* a node came up.  clear any old vote from
			 * the response map and set it in the vote map
			 * then restart the mastery. */
			mlog(ML_NOTICE, "node %d up while restarting\n", node);

			/* redo the master request, but only for the new node */
			mlog(0, "sending request to new node\n");
			clear_bit(node, mle->response_map);
			set_bit(node, mle->vote_map);
		} else {
			mlog(ML_ERROR, "node down! %d\n", node);
			if (blocked) {
				int lowest = find_next_bit(mle->maybe_map,
						       O2NM_MAX_NODES, 0);

				/* act like it was never there */
				clear_bit(node, mle->maybe_map);

				if (node == lowest) {
					mlog(0, "expected master %u died"
					    " while this node was blocked "
					    "waiting on it!\n", node);
					lowest = find_next_bit(mle->maybe_map,
							O2NM_MAX_NODES,
							lowest+1);
					if (lowest < O2NM_MAX_NODES) {
						mlog(0, "%s:%.*s:still "
						     "blocked. waiting on %u "
						     "now\n", dlm->name,
						     res->lockname.len,
						     res->lockname.name,
						     lowest);
					} else {
						/* mle is an MLE_BLOCK, but
						 * there is now nothing left to
						 * block on.  we need to return
						 * all the way back out and try
						 * again with an MLE_MASTER.
						 * dlm_do_local_recovery_cleanup
						 * has already run, so the mle
						 * refcount is ok */
						mlog(0, "%s:%.*s: no "
						     "longer blocking. try to "
						     "master this here\n",
						     dlm->name,
						     res->lockname.len,
						     res->lockname.name);
						mle->type = DLM_MLE_MASTER;
						mle->mleres = res;
					}
				}
			}

			/* now blank out everything, as if we had never
			 * contacted anyone */
			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
			memset(mle->response_map, 0, sizeof(mle->response_map));
			/* reset the vote_map to the current node_map */
			memcpy(mle->vote_map, mle->node_map,
			       sizeof(mle->node_map));
			/* put myself into the maybe map */
			if (mle->type != DLM_MLE_BLOCK)
				set_bit(dlm->node_num, mle->maybe_map);
		}
		ret = -EAGAIN;
		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	}
	return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */
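
/*
 * Message flow (an illustrative summary of the requester/handler pair
 * below):
 *
 *	requester			target
 *	------------------------	---------------------------------
 *	dlm_do_master_request() --->	dlm_master_request_handler()
 *	record response in mle  <---	responds YES / NO / MAYBE / ERROR
 *
 * a YES response also means the target has recorded a refmap reference
 * for the requester.
 */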

static int dlm_do_master_request(struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle, int to)
{
	struct dlm_ctxt *dlm = mle->dlm;
	struct dlm_master_request request;
	int ret, response=0, resend;

	memset(&request, 0, sizeof(request));
	request.node_idx = dlm->node_num;

	BUG_ON(mle->type == DLM_MLE_MIGRATION);

	request.namelen = (u8)mle->mnamelen;
	memcpy(request.name, mle->mname, request.namelen);

again:
	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
				 sizeof(request), to, &response);
	if (ret < 0)  {
		if (ret == -ESRCH) {
			/* should never happen */
			mlog(ML_ERROR, "TCP stack not ready!\n");
			BUG();
		} else if (ret == -EINVAL) {
			mlog(ML_ERROR, "bad args passed to o2net!\n");
			BUG();
		} else if (ret == -ENOMEM) {
			mlog(ML_ERROR, "out of memory while trying to send "
			     "network message!  retrying\n");
			/* this is totally crude */
			msleep(50);
			goto again;
		} else if (!dlm_is_host_down(ret)) {
			/* not a network error. bad. */
			mlog_errno(ret);
			mlog(ML_ERROR, "unhandled error!\n");
1359                        BUG();
1360                }
1361                /* all other errors should be network errors,
1362                 * and likely indicate node death */
1363                mlog(ML_ERROR, "link to %d went down!\n", to);
1364                goto out;
1365        }
1366
1367        ret = 0;
1368        resend = 0;
1369        spin_lock(&mle->spinlock);
1370        switch (response) {
1371                case DLM_MASTER_RESP_YES:
1372                        set_bit(to, mle->response_map);
1373                        mlog(0, "node %u is the master, response=YES\n", to);
1374                        mlog(0, "%s:%.*s: master node %u now knows I have a "
1375                             "reference\n", dlm->name, res->lockname.len,
1376                             res->lockname.name, to);
1377                        mle->master = to;
1378                        break;
1379                case DLM_MASTER_RESP_NO:
1380                        mlog(0, "node %u not master, response=NO\n", to);
1381                        set_bit(to, mle->response_map);
1382                        break;
1383                case DLM_MASTER_RESP_MAYBE:
1384                        mlog(0, "node %u not master, response=MAYBE\n", to);
1385                        set_bit(to, mle->response_map);
1386                        set_bit(to, mle->maybe_map);
1387                        break;
1388                case DLM_MASTER_RESP_ERROR:
1389                        mlog(0, "node %u hit an error, resending\n", to);
1390                        resend = 1;
1391                        response = 0;
1392                        break;
1393                default:
1394                        mlog(ML_ERROR, "bad response! %u\n", response);
1395                        BUG();
1396        }
1397        spin_unlock(&mle->spinlock);
1398        if (resend) {
1399                /* this is also totally crude */
1400                msleep(50);
1401                goto again;
1402        }
1403
1404out:
1405        return ret;
1406}
1407
1408/*
1409 * locks that can be taken here:
1410 * dlm->spinlock
1411 * res->spinlock
1412 * mle->spinlock
1413 * dlm->master_list
1414 *
1415 * if possible, TRIM THIS DOWN!!!
1416 */
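    /*
     * Outcome summary, derived from the code below:
     *   - lockres being recovered or migrated      -> ERROR
     *   - lockres exists and this node owns it     -> YES, dispatch assert
     *   - lockres exists with another known owner  -> NO
     *   - lockres exists but the owner is unknown:
     *       block or migration mle                 -> NO
     *       master mle naming this node            -> YES, dispatch assert
     *       master mle naming another node         -> NO
     *       master mle, master still unknown       -> MAYBE
     *   - no lockres here: an existing block/migration mle answers NO, a
     *     master mle answers MAYBE, and with no mle at all a block mle is
     *     created and the answer is NO.
     */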
1417int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1418                               void **ret_data)
1419{
1420        u8 response = DLM_MASTER_RESP_MAYBE;
1421        struct dlm_ctxt *dlm = data;
1422        struct dlm_lock_resource *res = NULL;
1423        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1424        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1425        char *name;
1426        unsigned int namelen, hash;
1427        int found, ret;
1428        int set_maybe;
1429        int dispatch_assert = 0;
1430        int dispatched = 0;
1431
1432        if (!dlm_grab(dlm))
1433                return DLM_MASTER_RESP_NO;
1434
1435        if (!dlm_domain_fully_joined(dlm)) {
1436                response = DLM_MASTER_RESP_NO;
1437                goto send_response;
1438        }
1439
1440        name = request->name;
1441        namelen = request->namelen;
1442        hash = dlm_lockid_hash(name, namelen);
1443
1444        if (namelen > DLM_LOCKID_NAME_MAX) {
1445                response = DLM_IVBUFLEN;
1446                goto send_response;
1447        }
1448
1449way_up_top:
1450        spin_lock(&dlm->spinlock);
1451        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1452        if (res) {
1453                spin_unlock(&dlm->spinlock);
1454
1455                /* take care of the easy cases up front */
1456                spin_lock(&res->spinlock);
1457
1458                /*
1459                 * Right after dlm spinlock was released, dlm_thread could have
1460                 * purged the lockres. Check if lockres got unhashed. If so
1461                 * start over.
1462                 */
1463                if (hlist_unhashed(&res->hash_node)) {
1464                        spin_unlock(&res->spinlock);
1465                        dlm_lockres_put(res);
1466                        goto way_up_top;
1467                }
1468
1469                if (res->state & (DLM_LOCK_RES_RECOVERING|
1470                                  DLM_LOCK_RES_MIGRATING)) {
1471                        spin_unlock(&res->spinlock);
1472                        mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1473                             "being recovered/migrated\n");
1474                        response = DLM_MASTER_RESP_ERROR;
1475                        if (mle)
1476                                kmem_cache_free(dlm_mle_cache, mle);
1477                        goto send_response;
1478                }
1479
1480                if (res->owner == dlm->node_num) {
1481                        dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1482                        spin_unlock(&res->spinlock);
1483                        response = DLM_MASTER_RESP_YES;
1484                        if (mle)
1485                                kmem_cache_free(dlm_mle_cache, mle);
1486
1487                        /* this node is the owner.
1488                         * there is some extra work that needs to
1489                         * happen now.  the requesting node has
1490                         * caused all nodes up to this one to
1491                         * create mles.  this node now needs to
1492                         * go back and clean those up. */
1493                        dispatch_assert = 1;
1494                        goto send_response;
1495                } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1496                        spin_unlock(&res->spinlock);
1497                        // mlog(0, "node %u is the master\n", res->owner);
1498                        response = DLM_MASTER_RESP_NO;
1499                        if (mle)
1500                                kmem_cache_free(dlm_mle_cache, mle);
1501                        goto send_response;
1502                }
1503
1504                /* ok, there is no owner.  either this node is
1505                 * being blocked, or it is actively trying to
1506                 * master this lock. */
1507                if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1508                        mlog(ML_ERROR, "lock with no owner should be "
1509                             "in-progress!\n");
1510                        BUG();
1511                }
1512
1513                // mlog(0, "lockres is in progress...\n");
1514                spin_lock(&dlm->master_lock);
1515                found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1516                if (!found) {
1517                        mlog(ML_ERROR, "no mle found for this lock!\n");
1518                        BUG();
1519                }
1520                set_maybe = 1;
1521                spin_lock(&tmpmle->spinlock);
1522                if (tmpmle->type == DLM_MLE_BLOCK) {
1523                        // mlog(0, "this node is waiting for "
1524                        // "lockres to be mastered\n");
1525                        response = DLM_MASTER_RESP_NO;
1526                } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1527                        mlog(0, "node %u is master, but trying to migrate to "
1528                             "node %u.\n", tmpmle->master, tmpmle->new_master);
1529                        if (tmpmle->master == dlm->node_num) {
1530                                mlog(ML_ERROR, "no owner on lockres, but this "
1531                                     "node is trying to migrate it to %u?!\n",
1532                                     tmpmle->new_master);
1533                                BUG();
1534                        } else {
1535                                /* the real master can respond on its own */
1536                                response = DLM_MASTER_RESP_NO;
1537                        }
1538                } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1539                        set_maybe = 0;
1540                        if (tmpmle->master == dlm->node_num) {
1541                                response = DLM_MASTER_RESP_YES;
1542                                /* this node will be the owner.
1543                                 * go back and clean the mles on any
1544                                 * other nodes */
1545                                dispatch_assert = 1;
1546                                dlm_lockres_set_refmap_bit(dlm, res,
1547                                                           request->node_idx);
1548                        } else
1549                                response = DLM_MASTER_RESP_NO;
1550                } else {
1551                        // mlog(0, "this node is attempting to "
1552                        // "master lockres\n");
1553                        response = DLM_MASTER_RESP_MAYBE;
1554                }
1555                if (set_maybe)
1556                        set_bit(request->node_idx, tmpmle->maybe_map);
1557                spin_unlock(&tmpmle->spinlock);
1558
1559                spin_unlock(&dlm->master_lock);
1560                spin_unlock(&res->spinlock);
1561
1562                /* keep the mle attached to heartbeat events */
1563                dlm_put_mle(tmpmle);
1564                if (mle)
1565                        kmem_cache_free(dlm_mle_cache, mle);
1566                goto send_response;
1567        }
1568
1569        /*
1570         * lockres doesn't exist on this node
1571         * if there is an MLE_BLOCK, return NO
1572         * if there is an MLE_MASTER, return MAYBE
1573         * otherwise, add an MLE_BLOCK, return NO
1574         */
1575        spin_lock(&dlm->master_lock);
1576        found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1577        if (!found) {
1578                /* this lockid has never been seen on this node yet */
1579                // mlog(0, "no mle found\n");
1580                if (!mle) {
1581                        spin_unlock(&dlm->master_lock);
1582                        spin_unlock(&dlm->spinlock);
1583
1584                        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1585                        if (!mle) {
1586                                response = DLM_MASTER_RESP_ERROR;
1587                                mlog_errno(-ENOMEM);
1588                                goto send_response;
1589                        }
1590                        goto way_up_top;
1591                }
1592
1593                // mlog(0, "this is second time thru, already allocated, "
1594                // "add the block.\n");
1595                dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1596                set_bit(request->node_idx, mle->maybe_map);
1597                __dlm_insert_mle(dlm, mle);
1598                response = DLM_MASTER_RESP_NO;
1599        } else {
1600                spin_lock(&tmpmle->spinlock);
1601                if (tmpmle->master == dlm->node_num) {
1602                        mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1603                        BUG();
1604                }
1605                if (tmpmle->type == DLM_MLE_BLOCK)
1606                        response = DLM_MASTER_RESP_NO;
1607                else if (tmpmle->type == DLM_MLE_MIGRATION) {
1608                        mlog(0, "migration mle was found (%u->%u)\n",
1609                             tmpmle->master, tmpmle->new_master);
1610                        /* real master can respond on its own */
1611                        response = DLM_MASTER_RESP_NO;
1612                } else
1613                        response = DLM_MASTER_RESP_MAYBE;
1614                set_bit(request->node_idx, tmpmle->maybe_map);
1615                spin_unlock(&tmpmle->spinlock);
1616        }
1617        spin_unlock(&dlm->master_lock);
1618        spin_unlock(&dlm->spinlock);
1619
1620        if (found) {
1621                /* keep the mle attached to heartbeat events */
1622                dlm_put_mle(tmpmle);
1623        }
1624send_response:
1625        /*
1626         * __dlm_lookup_lockres() grabbed a reference to this lockres.
1627         * The reference is released by dlm_assert_master_worker() when
1628         * the work queued via dlm_dispatch_assert_master() runs.  If
1629         * dlm_assert_master_worker() isn't called, we drop it here.
1630         */
1631        if (dispatch_assert) {
1632                mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1633                             dlm->node_num, res->lockname.len, res->lockname.name);
1634                spin_lock(&res->spinlock);
1635                ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1636                                                 DLM_ASSERT_MASTER_MLE_CLEANUP);
1637                if (ret < 0) {
1638                        mlog(ML_ERROR, "failed to dispatch assert master work\n");
1639                        response = DLM_MASTER_RESP_ERROR;
1640                        spin_unlock(&res->spinlock);
1641                        dlm_lockres_put(res);
1642                } else {
1643                        dispatched = 1;
1644                        __dlm_lockres_grab_inflight_worker(dlm, res);
1645                        spin_unlock(&res->spinlock);
1646                }
1647        } else {
1648                if (res)
1649                        dlm_lockres_put(res);
1650        }
1651
1652        if (!dispatched)
1653                dlm_put(dlm);
1654        return response;
1655}
1656
1657/*
1658 * DLM_ASSERT_MASTER_MSG
1659 */
1660
1661
1662/*
1663 * NOTE: this can be used for debugging
1664 * can periodically run all locks owned by this node
1665 * and re-assert across the cluster...
1666 */
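    /*
     * The per-node status returned by the assert handler is a bitmask:
     * DLM_ASSERT_RESPONSE_REASSERT makes this function loop and assert
     * again (the recipient saw responses from nodes that still need
     * cleaning up), and DLM_ASSERT_RESPONSE_MASTERY_REF means the
     * recipient holds a reference, so its refmap bit is set below.  A
     * negative status is fatal here.
     */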
1667static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1668                                struct dlm_lock_resource *res,
1669                                void *nodemap, u32 flags)
1670{
1671        struct dlm_assert_master assert;
1672        int to, tmpret;
1673        struct dlm_node_iter iter;
1674        int ret = 0;
1675        int reassert;
1676        const char *lockname = res->lockname.name;
1677        unsigned int namelen = res->lockname.len;
1678
1679        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1680
1681        spin_lock(&res->spinlock);
1682        res->state |= DLM_LOCK_RES_SETREF_INPROG;
1683        spin_unlock(&res->spinlock);
1684
1685again:
1686        reassert = 0;
1687
1688        /* note that if this nodemap is empty, it returns 0 */
1689        dlm_node_iter_init(nodemap, &iter);
1690        while ((to = dlm_node_iter_next(&iter)) >= 0) {
1691                int r = 0;
1692                struct dlm_master_list_entry *mle = NULL;
1693
1694                mlog(0, "sending assert master to %d (%.*s)\n", to,
1695                     namelen, lockname);
1696                memset(&assert, 0, sizeof(assert));
1697                assert.node_idx = dlm->node_num;
1698                assert.namelen = namelen;
1699                memcpy(assert.name, lockname, namelen);
1700                assert.flags = cpu_to_be32(flags);
1701
1702                tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1703                                            &assert, sizeof(assert), to, &r);
1704                if (tmpret < 0) {
1705                        mlog(ML_ERROR, "Error %d when sending message %u (key "
1706                             "0x%x) to node %u\n", tmpret,
1707                             DLM_ASSERT_MASTER_MSG, dlm->key, to);
1708                        if (!dlm_is_host_down(tmpret)) {
1709                                mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1710                                BUG();
1711                        }
1712                        /* a node died.  finish out the rest of the nodes. */
1713                        mlog(0, "link to %d went down!\n", to);
1714                        /* any nonzero status return will do */
1715                        ret = tmpret;
1716                        r = 0;
1717                } else if (r < 0) {
1718                        /* ok, something is horribly messed up.  kill thyself. */
1719                        mlog(ML_ERROR, "during assert master of %.*s to %u, "
1720                             "got %d.\n", namelen, lockname, to, r);
1721                        spin_lock(&dlm->spinlock);
1722                        spin_lock(&dlm->master_lock);
1723                        if (dlm_find_mle(dlm, &mle, (char *)lockname,
1724                                         namelen)) {
1725                                dlm_print_one_mle(mle);
1726                                __dlm_put_mle(mle);
1727                        }
1728                        spin_unlock(&dlm->master_lock);
1729                        spin_unlock(&dlm->spinlock);
1730                        BUG();
1731                }
1732
1733                if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1734                    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1735                        mlog(ML_ERROR, "%.*s: very strange, "
1736                             "master MLE but no lockres on %u\n",
1737                             namelen, lockname, to);
1738                }
1739
1740                if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1741                        mlog(0, "%.*s: node %u created mles on other "
1742                             "nodes and requests a re-assert\n",
1743                             namelen, lockname, to);
1744                        reassert = 1;
1745                }
1746                if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1747                        mlog(0, "%.*s: node %u has a reference to this "
1748                             "lockres, set the bit in the refmap\n",
1749                             namelen, lockname, to);
1750                        spin_lock(&res->spinlock);
1751                        dlm_lockres_set_refmap_bit(dlm, res, to);
1752                        spin_unlock(&res->spinlock);
1753                }
1754        }
1755
1756        if (reassert)
1757                goto again;
1758
1759        spin_lock(&res->spinlock);
1760        res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1761        spin_unlock(&res->spinlock);
1762        wake_up(&res->wq);
1763
1764        return ret;
1765}
1766
1767/*
1768 * locks that can be taken here:
1769 * dlm->spinlock
1770 * res->spinlock
1771 * mle->spinlock
1772 * dlm->master_list
1773 *
1774 * if possible, TRIM THIS DOWN!!!
1775 */
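    /*
     * Returns 0 or a positive bitmask of DLM_ASSERT_RESPONSE_REASSERT
     * (the asserting node must re-assert to clean up mles on nodes it
     * contacted) and DLM_ASSERT_RESPONSE_MASTERY_REF (this node holds a
     * reference to the lockres, so the master must set our refmap bit),
     * or -EINVAL to shoot down a node that sent a bogus assert.
     */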
1776int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1777                              void **ret_data)
1778{
1779        struct dlm_ctxt *dlm = data;
1780        struct dlm_master_list_entry *mle = NULL;
1781        struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1782        struct dlm_lock_resource *res = NULL;
1783        char *name;
1784        unsigned int namelen, hash;
1785        u32 flags;
1786        int master_request = 0, have_lockres_ref = 0;
1787        int ret = 0;
1788
1789        if (!dlm_grab(dlm))
1790                return 0;
1791
1792        name = assert->name;
1793        namelen = assert->namelen;
1794        hash = dlm_lockid_hash(name, namelen);
1795        flags = be32_to_cpu(assert->flags);
1796
1797        if (namelen > DLM_LOCKID_NAME_MAX) {
1798                mlog(ML_ERROR, "Invalid name length!\n");
1799                goto done;
1800        }
1801
1802        spin_lock(&dlm->spinlock);
1803
1804        if (flags)
1805                mlog(0, "assert_master with flags: %u\n", flags);
1806
1807        /* find the MLE */
1808        spin_lock(&dlm->master_lock);
1809        if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1810                /* not an error, could be master just re-asserting */
1811                mlog(0, "just got an assert_master from %u, but no "
1812                     "MLE for it! (%.*s)\n", assert->node_idx,
1813                     namelen, name);
1814        } else {
1815                int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1816                if (bit >= O2NM_MAX_NODES) {
1817                        /* not necessarily an error, though less likely.
1818                         * could be master just re-asserting. */
1819                        mlog(0, "no bits set in the maybe_map, but %u "
1820                             "is asserting! (%.*s)\n", assert->node_idx,
1821                             namelen, name);
1822                } else if (bit != assert->node_idx) {
1823                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1824                                mlog(0, "master %u was found, %u should "
1825                                     "back off\n", assert->node_idx, bit);
1826                        } else {
1827                                /* with the fix for bug 569, a higher node
1828                                 * number winning the mastery will respond
1829                                 * YES to mastery requests, but this node
1830                                 * had no way of knowing.  let it pass. */
1831                                mlog(0, "%u is the lowest node, "
1832                                     "%u is asserting. (%.*s)  %u must "
1833                                     "have begun after %u won.\n", bit,
1834                                     assert->node_idx, namelen, name, bit,
1835                                     assert->node_idx);
1836                        }
1837                }
1838                if (mle->type == DLM_MLE_MIGRATION) {
1839                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1840                                mlog(0, "%s:%.*s: got cleanup assert"
1841                                     " from %u for migration\n",
1842                                     dlm->name, namelen, name,
1843                                     assert->node_idx);
1844                        } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1845                                mlog(0, "%s:%.*s: got unrelated assert"
1846                                     " from %u for migration, ignoring\n",
1847                                     dlm->name, namelen, name,
1848                                     assert->node_idx);
1849                                __dlm_put_mle(mle);
1850                                spin_unlock(&dlm->master_lock);
1851                                spin_unlock(&dlm->spinlock);
1852                                goto done;
1853                        }
1854                }
1855        }
1856        spin_unlock(&dlm->master_lock);
1857
1858        /* ok, everything checks out with the MLE.
1859         * now check to see if there is a lockres */
1860        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1861        if (res) {
1862                spin_lock(&res->spinlock);
1863                if (res->state & DLM_LOCK_RES_RECOVERING)  {
1864                        mlog(ML_ERROR, "%u asserting but %.*s is "
1865                             "RECOVERING!\n", assert->node_idx, namelen, name);
1866                        goto kill;
1867                }
1868                if (!mle) {
1869                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1870                            res->owner != assert->node_idx) {
1871                                mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1872                                     "but current owner is %u! (%.*s)\n",
1873                                     assert->node_idx, res->owner, namelen,
1874                                     name);
1875                                __dlm_print_one_lock_resource(res);
1876                                BUG();
1877                        }
1878                } else if (mle->type != DLM_MLE_MIGRATION) {
1879                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1880                                /* owner is just re-asserting */
1881                                if (res->owner == assert->node_idx) {
1882                                        mlog(0, "owner %u re-asserting on "
1883                                             "lock %.*s\n", assert->node_idx,
1884                                             namelen, name);
1885                                        goto ok;
1886                                }
1887                                mlog(ML_ERROR, "got assert_master from "
1888                                     "node %u, but %u is the owner! "
1889                                     "(%.*s)\n", assert->node_idx,
1890                                     res->owner, namelen, name);
1891                                goto kill;
1892                        }
1893                        if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1894                                mlog(ML_ERROR, "got assert from %u, but lock "
1895                                     "with no owner should be "
1896                                     "in-progress! (%.*s)\n",
1897                                     assert->node_idx,
1898                                     namelen, name);
1899                                goto kill;
1900                        }
1901                } else /* mle->type == DLM_MLE_MIGRATION */ {
1902                        /* should only be getting an assert from new master */
1903                        if (assert->node_idx != mle->new_master) {
1904                                mlog(ML_ERROR, "got assert from %u, but "
1905                                     "new master is %u, and old master "
1906                                     "was %u (%.*s)\n",
1907                                     assert->node_idx, mle->new_master,
1908                                     mle->master, namelen, name);
1909                                goto kill;
1910                        }
1911
1912                }
1913ok:
1914                spin_unlock(&res->spinlock);
1915        }
1916
1917        // mlog(0, "woo!  got an assert_master from node %u!\n",
1918        //           assert->node_idx);
1919        if (mle) {
1920                int extra_ref = 0;
1921                int nn = -1;
1922                int rr, err = 0;
1923
1924                spin_lock(&mle->spinlock);
1925                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1926                        extra_ref = 1;
1927                else {
1928                        /* MASTER mle: if any bits set in the response map
1929                         * then the calling node needs to re-assert to clear
1930                         * up nodes that this node contacted */
1931                        while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1932                                                   nn+1)) < O2NM_MAX_NODES) {
1933                                if (nn != dlm->node_num && nn != assert->node_idx) {
1934                                        master_request = 1;
1935                                        break;
1936                                }
1937                        }
1938                }
1939                mle->master = assert->node_idx;
1940                atomic_set(&mle->woken, 1);
1941                wake_up(&mle->wq);
1942                spin_unlock(&mle->spinlock);
1943
1944                if (res) {
1945                        int wake = 0;
1946                        spin_lock(&res->spinlock);
1947                        if (mle->type == DLM_MLE_MIGRATION) {
1948                                mlog(0, "finishing off migration of lockres %.*s, "
1949                                        "from %u to %u\n",
1950                                        res->lockname.len, res->lockname.name,
1951                                        dlm->node_num, mle->new_master);
1952                                res->state &= ~DLM_LOCK_RES_MIGRATING;
1953                                wake = 1;
1954                                dlm_change_lockres_owner(dlm, res, mle->new_master);
1955                                BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1956                        } else {
1957                                dlm_change_lockres_owner(dlm, res, mle->master);
1958                        }
1959                        spin_unlock(&res->spinlock);
1960                        have_lockres_ref = 1;
1961                        if (wake)
1962                                wake_up(&res->wq);
1963                }
1964
1965                /* master is known, detach if not already detached.
1966                 * ensures that only one assert_master call will happen
1967                 * on this mle. */
1968                spin_lock(&dlm->master_lock);
1969
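                    /*
                     * Sanity-check the ref count before the puts below:
                     * expect at least one ref for being in the master
                     * list, plus one if the mle is marked inuse, plus
                     * the extra ref a block/migration mle carries.
                     * Anything lower is logged as an error, since the
                     * puts below would then be unbalanced.
                     */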
1970                rr = kref_read(&mle->mle_refs);
1971                if (mle->inuse > 0) {
1972                        if (extra_ref && rr < 3)
1973                                err = 1;
1974                        else if (!extra_ref && rr < 2)
1975                                err = 1;
1976                } else {
1977                        if (extra_ref && rr < 2)
1978                                err = 1;
1979                        else if (!extra_ref && rr < 1)
1980                                err = 1;
1981                }
1982                if (err) {
1983                        mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1984                             "that will mess up this node, refs=%d, extra=%d, "
1985                             "inuse=%d\n", dlm->name, namelen, name,
1986                             assert->node_idx, rr, extra_ref, mle->inuse);
1987                        dlm_print_one_mle(mle);
1988                }
1989                __dlm_unlink_mle(dlm, mle);
1990                __dlm_mle_detach_hb_events(dlm, mle);
1991                __dlm_put_mle(mle);
1992                if (extra_ref) {
1993                        /* the assert master message now balances the extra
1994                         * ref given by the master / migration request message.
1995                         * if this is the last put, it will be removed
1996                         * from the list. */
1997                        __dlm_put_mle(mle);
1998                }
1999                spin_unlock(&dlm->master_lock);
2000        } else if (res) {
2001                if (res->owner != assert->node_idx) {
2002                        mlog(0, "assert_master from %u, but current "
2003                             "owner is %u (%.*s), no mle\n", assert->node_idx,
2004                             res->owner, namelen, name);
2005                }
2006        }
2007        spin_unlock(&dlm->spinlock);
2008
2009done:
2010        ret = 0;
2011        if (res) {
2012                spin_lock(&res->spinlock);
2013                res->state |= DLM_LOCK_RES_SETREF_INPROG;
2014                spin_unlock(&res->spinlock);
2015                *ret_data = (void *)res;
2016        }
2017        dlm_put(dlm);
2018        if (master_request) {
2019                mlog(0, "need to tell master to reassert\n");
2020                /* positive. negative would shoot down the node. */
2021                ret |= DLM_ASSERT_RESPONSE_REASSERT;
2022                if (!have_lockres_ref) {
2023                        mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2024                             "mle present here for %s:%.*s, but no lockres!\n",
2025                             assert->node_idx, dlm->name, namelen, name);
2026                }
2027        }
2028        if (have_lockres_ref) {
2029                /* let the master know we have a reference to the lockres */
2030                ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2031                mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2032                     dlm->name, namelen, name, assert->node_idx);
2033        }
2034        return ret;
2035
2036kill:
2037        /* kill the caller! */
2038        mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2039             "and killing the other node now!  This node is OK and can continue.\n");
2040        __dlm_print_one_lock_resource(res);
2041        spin_unlock(&res->spinlock);
2042        spin_lock(&dlm->master_lock);
2043        if (mle)
2044                __dlm_put_mle(mle);
2045        spin_unlock(&dlm->master_lock);
2046        spin_unlock(&dlm->spinlock);
2047        *ret_data = (void *)res;
2048        dlm_put(dlm);
2049        return -EINVAL;
2050}
2051
2052void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2053{
2054        struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2055
2056        if (ret_data) {
2057                spin_lock(&res->spinlock);
2058                res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2059                spin_unlock(&res->spinlock);
2060                wake_up(&res->wq);
2061                dlm_lockres_put(res);
2062        }
2063        return;
2064}
2065
2066int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2067                               struct dlm_lock_resource *res,
2068                               int ignore_higher, u8 request_from, u32 flags)
2069{
2070        struct dlm_work_item *item;
2071        item = kzalloc(sizeof(*item), GFP_ATOMIC);
2072        if (!item)
2073                return -ENOMEM;
2074
2075
2076        /* queue up work for dlm_assert_master_worker */
2077        dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2078        item->u.am.lockres = res; /* already have a ref */
2079        /* can optionally ignore node numbers higher than this node */
2080        item->u.am.ignore_higher = ignore_higher;
2081        item->u.am.request_from = request_from;
2082        item->u.am.flags = flags;
2083
2084        if (ignore_higher)
2085                mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2086                     res->lockname.name);
2087
2088        spin_lock(&dlm->work_lock);
2089        list_add_tail(&item->list, &dlm->work_list);
2090        spin_unlock(&dlm->work_lock);
2091
2092        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2093        return 0;
2094}
2095
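    /*
     * Work function queued by dlm_dispatch_assert_master() above.  It
     * owns the lockres reference handed over in item->u.am.lockres and
     * drops it, along with the inflight worker count, when it is done.
     */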
2096static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2097{
2098        struct dlm_ctxt *dlm = data;
2099        int ret = 0;
2100        struct dlm_lock_resource *res;
2101        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2102        int ignore_higher;
2103        int bit;
2104        u8 request_from;
2105        u32 flags;
2106
2107        dlm = item->dlm;
2108        res = item->u.am.lockres;
2109        ignore_higher = item->u.am.ignore_higher;
2110        request_from = item->u.am.request_from;
2111        flags = item->u.am.flags;
2112
2113        spin_lock(&dlm->spinlock);
2114        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2115        spin_unlock(&dlm->spinlock);
2116
2117        clear_bit(dlm->node_num, nodemap);
2118        if (ignore_higher) {
2119                /* if this is just to clear up mles for nodes below
2120                 * this node, do not send the message to the original
2121                 * caller or any node number higher than this */
2122                clear_bit(request_from, nodemap);
2123                bit = dlm->node_num;
2124                while (1) {
2125                        bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2126                                            bit+1);
2127                        if (bit >= O2NM_MAX_NODES)
2128                                break;
2129                        clear_bit(bit, nodemap);
2130                }
2131        }
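            /*
             * Example: if this node is 3 and the domain map holds nodes
             * 1, 2, 5 and 7, the masking above leaves at most bits 1 and
             * 2 set (the original caller's bit is cleared as well), so
             * the assert below only goes to lower-numbered nodes.
             */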
2132
2133        /*
2134         * If we're migrating this lock to someone else, we are no
2135         * longer allowed to assert our own mastery.  OTOH, we need to
2136         * prevent migration from starting while we're still asserting
2137         * our dominance.  The reserved ast delays migration.
2138         */
2139        spin_lock(&res->spinlock);
2140        if (res->state & DLM_LOCK_RES_MIGRATING) {
2141                mlog(0, "Someone asked us to assert mastery, but we're "
2142                     "in the middle of migration.  Skipping assert, "
2143                     "the new master will handle that.\n");
2144                spin_unlock(&res->spinlock);
2145                goto put;
2146        } else
2147                __dlm_lockres_reserve_ast(res);
2148        spin_unlock(&res->spinlock);
2149
2150        /* this call now finishes out the nodemap
2151         * even if one or more nodes die */
2152        mlog(0, "worker about to master %.*s here, this=%u\n",
2153                     res->lockname.len, res->lockname.name, dlm->node_num);
2154        ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2155        if (ret < 0) {
2156                /* no need to restart, we are done */
2157                if (!dlm_is_host_down(ret))
2158                        mlog_errno(ret);
2159        }
2160
2161        /* Ok, we've asserted ourselves.  Let's let migration start. */
2162        dlm_lockres_release_ast(dlm, res);
2163
2164put:
2165        dlm_lockres_drop_inflight_worker(dlm, res);
2166
2167        dlm_lockres_put(res);
2168
2169        mlog(0, "finished with dlm_assert_master_worker\n");
2170}
2171
2172/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2173 * We cannot wait for node recovery to complete to begin mastering this
2174 * lockres because this lockres is used to kick off recovery! ;-)
2175 * So, do a pre-check on all living nodes to see if any of those nodes
2176 * think that $RECOVERY is currently mastered by a dead node.  If so,
2177 * we wait a short time to allow that node to get notified by its own
2178 * heartbeat stack, then check again.  All $RECOVERY lock resources
2179 * mastered by dead nodes are purged when the heartbeat callback is
2180 * fired, so we can know for sure that it is safe to continue once
2181 * the node returns a live node or no node.  */
2182static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2183                                       struct dlm_lock_resource *res)
2184{
2185        struct dlm_node_iter iter;
2186        int nodenum;
2187        int ret = 0;
2188        u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2189
2190        spin_lock(&dlm->spinlock);
2191        dlm_node_iter_init(dlm->domain_map, &iter);
2192        spin_unlock(&dlm->spinlock);
2193
2194        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2195                /* do not send to self */
2196                if (nodenum == dlm->node_num)
2197                        continue;
2198                ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2199                if (ret < 0) {
2200                        mlog_errno(ret);
2201                        if (!dlm_is_host_down(ret))
2202                                BUG();
2203                        /* host is down, so answer for that node would be
2204                         * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2205                        ret = 0;
2206                }
2207
2208                if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2209                        /* check to see if this master is in the recovery map */
2210                        spin_lock(&dlm->spinlock);
2211                        if (test_bit(master, dlm->recovery_map)) {
2212                                mlog(ML_NOTICE, "%s: node %u has not seen "
2213                                     "node %u go down yet, and thinks the "
2214                                     "dead node is mastering the recovery "
2215                                     "lock.  must wait.\n", dlm->name,
2216                                     nodenum, master);
2217                                ret = -EAGAIN;
2218                        }
2219                        spin_unlock(&dlm->spinlock);
2220                        mlog(0, "%s: reco lock master is %u\n", dlm->name,
2221                             master);
2222                        break;
2223                }
2224        }
2225        return ret;
2226}
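    /*
     * A sketch of the expected calling pattern (not a verbatim call
     * site): keep retrying with a short sleep while some node still
     * believes a dead node masters $RECOVERY.
     *
     *         while (dlm_pre_master_reco_lockres(dlm, res) == -EAGAIN)
     *                 msleep(100);
     */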
2227
2228/*
2229 * DLM_DEREF_LOCKRES_MSG
2230 */
2231
2232int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2233{
2234        struct dlm_deref_lockres deref;
2235        int ret = 0, r;
2236        const char *lockname;
2237        unsigned int namelen;
2238
2239        lockname = res->lockname.name;
2240        namelen = res->lockname.len;
2241        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2242
2243        memset(&deref, 0, sizeof(deref));
2244        deref.node_idx = dlm->node_num;
2245        deref.namelen = namelen;
2246        memcpy(deref.name, lockname, namelen);
2247
2248        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2249                                 &deref, sizeof(deref), res->owner, &r);
2250        if (ret < 0)
2251                mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2252                     dlm->name, namelen, lockname, ret, res->owner);
2253        else if (r < 0) {
2254                /* BAD.  other node says I did not have a ref. */
2255                mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2256                     dlm->name, namelen, lockname, res->owner, r);
2257                dlm_print_one_lock_resource(res);
2258                if (r == -ENOMEM)
2259                        BUG();
2260        } else
2261                ret = r;
2262
2263        return ret;
2264}
2265
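    /*
     * Handler run on the lockres owner when DLM_DEREF_LOCKRES_MSG
     * arrives.  If a set-ref is still in flight (SETREF_INPROG) the
     * refmap bit cannot be cleared yet, so the work is queued for
     * dlm_deref_lockres_worker() and DLM_DEREF_RESPONSE_INPROG is
     * returned; the worker confirms later with DLM_DEREF_LOCKRES_DONE.
     * Otherwise the bit is cleared inline and DLM_DEREF_RESPONSE_DONE
     * is returned.
     */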
2266int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2267                              void **ret_data)
2268{
2269        struct dlm_ctxt *dlm = data;
2270        struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2271        struct dlm_lock_resource *res = NULL;
2272        char *name;
2273        unsigned int namelen;
2274        int ret = -EINVAL;
2275        u8 node;
2276        unsigned int hash;
2277        struct dlm_work_item *item;
2278        int cleared = 0;
2279        int dispatch = 0;
2280
2281        if (!dlm_grab(dlm))
2282                return 0;
2283
2284        name = deref->name;
2285        namelen = deref->namelen;
2286        node = deref->node_idx;
2287
2288        if (namelen > DLM_LOCKID_NAME_MAX) {
2289                mlog(ML_ERROR, "Invalid name length!\n");
2290                goto done;
2291        }
2292        if (deref->node_idx >= O2NM_MAX_NODES) {
2293                mlog(ML_ERROR, "Invalid node number: %u\n", node);
2294                goto done;
2295        }
2296
2297        hash = dlm_lockid_hash(name, namelen);
2298
2299        spin_lock(&dlm->spinlock);
2300        res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2301        if (!res) {
2302                spin_unlock(&dlm->spinlock);
2303                mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2304                     dlm->name, namelen, name);
2305                goto done;
2306        }
2307        spin_unlock(&dlm->spinlock);
2308
2309        spin_lock(&res->spinlock);
2310        if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2311                dispatch = 1;
2312        else {
2313                BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2314                if (test_bit(node, res->refmap)) {
2315                        dlm_lockres_clear_refmap_bit(dlm, res, node);
2316                        cleared = 1;
2317                }
2318        }
2319        spin_unlock(&res->spinlock);
2320
2321        if (!dispatch) {
2322                if (cleared)
2323                        dlm_lockres_calc_usage(dlm, res);
2324                else {
2325                        mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2326                             "but it is already dropped!\n", dlm->name,
2327                             res->lockname.len, res->lockname.name, node);
2328                        dlm_print_one_lock_resource(res);
2329                }
2330                ret = DLM_DEREF_RESPONSE_DONE;
2331                goto done;
2332        }
2333
2334        item = kzalloc(sizeof(*item), GFP_NOFS);
2335        if (!item) {
2336                ret = -ENOMEM;
2337                mlog_errno(ret);
2338                goto done;
2339        }
2340
2341        dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2342        item->u.dl.deref_res = res;
2343        item->u.dl.deref_node = node;
2344
2345        spin_lock(&dlm->work_lock);
2346        list_add_tail(&item->list, &dlm->work_list);
2347        spin_unlock(&dlm->work_lock);
2348
2349        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2350        return DLM_DEREF_RESPONSE_INPROG;
2351
2352done:
2353        if (res)
2354                dlm_lockres_put(res);
2355        dlm_put(dlm);
2356
2357        return ret;
2358}
2359
2360int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
2361                              void **ret_data)
2362{
2363        struct dlm_ctxt *dlm = data;
2364        struct dlm_deref_lockres_done *deref
2365                        = (struct dlm_deref_lockres_done *)msg->buf;
2366        struct dlm_lock_resource *res = NULL;
2367        char *name;
2368        unsigned int namelen;
2369        int ret = -EINVAL;
2370        u8 node;
2371        unsigned int hash;
2372
2373        if (!dlm_grab(dlm))
2374                return 0;
2375
2376        name = deref->name;
2377        namelen = deref->namelen;
2378        node = deref->node_idx;
2379
2380        if (namelen > DLM_LOCKID_NAME_MAX) {
2381                mlog(ML_ERROR, "Invalid name length!\n");
2382                goto done;
2383        }
2384        if (deref->node_idx >= O2NM_MAX_NODES) {
2385                mlog(ML_ERROR, "Invalid node number: %u\n", node);
2386                goto done;
2387        }
2388
2389        hash = dlm_lockid_hash(name, namelen);
2390
2391        spin_lock(&dlm->spinlock);
2392        res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2393        if (!res) {
2394                spin_unlock(&dlm->spinlock);
2395                mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2396                     dlm->name, namelen, name);
2397                goto done;
2398        }
2399
2400        spin_lock(&res->spinlock);
2401        if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
2402                spin_unlock(&res->spinlock);
2403                spin_unlock(&dlm->spinlock);
2404                mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
2405                        "but it is already derefed!\n", dlm->name,
2406                        res->lockname.len, res->lockname.name, node);
2407                ret = 0;
2408                goto done;
2409        }
2410
2411        __dlm_do_purge_lockres(dlm, res);
2412        spin_unlock(&res->spinlock);
2413        wake_up(&res->wq);
2414
2415        spin_unlock(&dlm->spinlock);
2416
2417        ret = 0;
2418done:
2419        if (res)
2420                dlm_lockres_put(res);
2421        dlm_put(dlm);
2422        return ret;
2423}
2424
2425static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2426                struct dlm_lock_resource *res, u8 node)
2427{
2428        struct dlm_deref_lockres_done deref;
2429        int ret = 0, r;
2430        const char *lockname;
2431        unsigned int namelen;
2432
2433        lockname = res->lockname.name;
2434        namelen = res->lockname.len;
2435        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2436
2437        memset(&deref, 0, sizeof(deref));
2438        deref.node_idx = dlm->node_num;
2439        deref.namelen = namelen;
2440        memcpy(deref.name, lockname, namelen);
2441
2442        ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2443                                 &deref, sizeof(deref), node, &r);
2444        if (ret < 0) {
2445                mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
2446                     "to node %u\n", dlm->name, namelen,
2447                     lockname, ret, node);
2448        } else if (r < 0) {
2449                /* ignore the error */
2450                mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2451                     dlm->name, namelen, lockname, node, r);
2452                dlm_print_one_lock_resource(res);
2453        }
2454}
2455
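    /*
     * Deferred half of dlm_deref_lockres_handler(): wait for any
     * in-flight assert to finish setting refs (SETREF_INPROG is cleared
     * by dlm_assert_master_post_handler() and dlm_do_assert_master()),
     * then clear the requester's refmap bit and confirm with
     * DLM_DEREF_LOCKRES_DONE.
     */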
2456static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2457{
2458        struct dlm_ctxt *dlm;
2459        struct dlm_lock_resource *res;
2460        u8 node;
2461        u8 cleared = 0;
2462
2463        dlm = item->dlm;
2464        res = item->u.dl.deref_res;
2465        node = item->u.dl.deref_node;
2466
2467        spin_lock(&res->spinlock);
2468        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2469        __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2470        if (test_bit(node, res->refmap)) {
2471                dlm_lockres_clear_refmap_bit(dlm, res, node);
2472                cleared = 1;
2473        }
2474        spin_unlock(&res->spinlock);
2475
2476        dlm_drop_lockres_ref_done(dlm, res, node);
2477
2478        if (cleared) {
2479                mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2480                     dlm->name, res->lockname.len, res->lockname.name, node);
2481                dlm_lockres_calc_usage(dlm, res);
2482        } else {
2483                mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2484                     "but it is already dropped!\n", dlm->name,
2485                     res->lockname.len, res->lockname.name, node);
2486                dlm_print_one_lock_resource(res);
2487        }
2488
2489        dlm_lockres_put(res);
2490}
2491
2492/*
2493 * A migratable resource is one that is:
2494 * 1. locally mastered, and,
2495 * 2. zero local locks, and,
2496 * 3. one or more non-local locks, or, one or more references
2497 * Returns 1 if yes, 0 if not.
2498 */
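    /*
     * For example, a lockres mastered here whose only lock on any queue
     * belongs to another node, or one with no locks at all but a remote
     * bit set in its refmap, is migratable; any lock held by this node,
     * or the MIGRATING/RECOVERING states, rules migration out.
     */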
2499static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2500                                      struct dlm_lock_resource *res)
2501{
2502        enum dlm_lockres_list idx;
2503        int nonlocal = 0, node_ref;
2504        struct list_head *queue;
2505        struct dlm_lock *lock;
2506        u64 cookie;
2507
2508        assert_spin_locked(&res->spinlock);
2509
2510        /* delay migration when the lockres is in MIGRATING state */
2511        if (res->state & DLM_LOCK_RES_MIGRATING)
2512                return 0;
2513
2514        /* delay migration when the lockres is in RECOVERING or RECOVERY_WAITING state */
2515        if (res->state & (DLM_LOCK_RES_RECOVERING|
2516                        DLM_LOCK_RES_RECOVERY_WAITING))
2517                return 0;
2518
2519        if (res->owner != dlm->node_num)
2520                return 0;
2521
2522        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2523                queue = dlm_list_idx_to_ptr(res, idx);
2524                list_for_each_entry(lock, queue, list) {
2525                        if (lock->ml.node != dlm->node_num) {
2526                                nonlocal++;
2527                                continue;
2528                        }
2529                        cookie = be64_to_cpu(lock->ml.cookie);
2530                        mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
2531                             "%s list\n", dlm->name, res->lockname.len,
2532                             res->lockname.name,
2533                             dlm_get_lock_cookie_node(cookie),
2534                             dlm_get_lock_cookie_seq(cookie),
2535                             dlm_list_in_text(idx));
2536                        return 0;
2537                }
2538        }
2539
2540        if (!nonlocal) {
2541                node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2542                if (node_ref >= O2NM_MAX_NODES)
2543                        return 0;
2544        }
2545
2546        mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2547             res->lockname.name);
2548
2549        return 1;
2550}
2551
2552/*
2553 * DLM_MIGRATE_LOCKRES
2554 */
2555
2556
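    /*
     * Outline: preallocate the migration buffer and an mle, insert a
     * migration mle (-EEXIST means another migration is already under
     * way), set DLM_LOCK_RES_MIGRATING and flush pending asserts, push
     * all lock state to the target with DLM_MRES_MIGRATION, then wait
     * for the target's assert_master before recording it as owner and
     * dropping the non-local locks.
     */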
2557static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2558                               struct dlm_lock_resource *res, u8 target)
2559{
2560        struct dlm_master_list_entry *mle = NULL;
2561        struct dlm_master_list_entry *oldmle = NULL;
2562        struct dlm_migratable_lockres *mres = NULL;
2563        int ret = 0;
2564        const char *name;
2565        unsigned int namelen;
2566        int mle_added = 0;
2567        int wake = 0;
2568
2569        if (!dlm_grab(dlm))
2570                return -EINVAL;
2571
2572        BUG_ON(target == O2NM_MAX_NODES);
2573
2574        name = res->lockname.name;
2575        namelen = res->lockname.len;
2576
2577        mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2578             target);
2579
2580        /* preallocate up front. if this fails, abort */
2581        ret = -ENOMEM;
2582        mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2583        if (!mres) {
2584                mlog_errno(ret);
2585                goto leave;
2586        }
2587
2588        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2589        if (!mle) {
2590                mlog_errno(ret);
2591                goto leave;
2592        }
2593        ret = 0;
2594
2595        /*
2596         * clear any existing master requests and
2597         * add the migration mle to the list
2598         */
2599        spin_lock(&dlm->spinlock);
2600        spin_lock(&dlm->master_lock);
2601        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2602                                    namelen, target, dlm->node_num);
2603        /* get an extra reference on the mle.
2604         * otherwise the assert_master from the new
2605         * master will destroy this.
2606         */
2607        if (ret != -EEXIST)
2608                dlm_get_mle_inuse(mle);
2609
2610        spin_unlock(&dlm->master_lock);
2611        spin_unlock(&dlm->spinlock);
2612
2613        if (ret == -EEXIST) {
2614                mlog(0, "another process is already migrating it\n");
2615                goto fail;
2616        }
2617        mle_added = 1;
2618
2619        /*
2620         * set the MIGRATING flag and flush asts
2621         * if we fail after this we need to re-dirty the lockres
2622         */
2623        if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2624                mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2625                     "the target went down.\n", res->lockname.len,
2626                     res->lockname.name, target);
2627                spin_lock(&res->spinlock);
2628                res->state &= ~DLM_LOCK_RES_MIGRATING;
2629                wake = 1;
2630                spin_unlock(&res->spinlock);
2631                ret = -EINVAL;
2632        }
2633
2634fail:
2635        if (ret != -EEXIST && oldmle) {
2636                /* master is known, detach if not already detached */
2637                dlm_mle_detach_hb_events(dlm, oldmle);
2638                dlm_put_mle(oldmle);
2639        }
2640
2641        if (ret < 0) {
2642                if (mle_added) {
2643                        dlm_mle_detach_hb_events(dlm, mle);
2644                        dlm_put_mle(mle);
2645                        dlm_put_mle_inuse(mle);
2646                } else if (mle) {
2647                        kmem_cache_free(dlm_mle_cache, mle);
2648                        mle = NULL;
2649                }
2650                goto leave;
2651        }
2652
2653        /*
2654         * at this point, we have a migration target, an mle
2655         * in the master list, and the MIGRATING flag set on
2656         * the lockres
2657         */
2658
2659        /* now that remote nodes are spinning on the MIGRATING flag,
2660         * ensure that all assert_master work is flushed. */
2661        flush_workqueue(dlm->dlm_worker);
2662
2663        /* notify new node and send all lock state */
2664        /* call send_one_lockres with migration flag.
2665         * this serves as notice to the target node that a
2666         * migration is starting. */
2667        ret = dlm_send_one_lockres(dlm, res, mres, target,
2668                                   DLM_MRES_MIGRATION);
2669
2670        if (ret < 0) {
2671                mlog(0, "migration to node %u failed with %d\n",
2672                     target, ret);
2673                /* migration failed, detach and clean up mle */
2674                dlm_mle_detach_hb_events(dlm, mle);
2675                dlm_put_mle(mle);
2676                dlm_put_mle_inuse(mle);
2677                spin_lock(&res->spinlock);
2678                res->state &= ~DLM_LOCK_RES_MIGRATING;
2679                wake = 1;
2680                spin_unlock(&res->spinlock);
2681                if (dlm_is_host_down(ret))
2682                        dlm_wait_for_node_death(dlm, target,
2683                                                DLM_NODE_DEATH_WAIT_MAX);
2684                goto leave;
2685        }
2686
2687        /* at this point, the target sends a message to all nodes
2688         * (using dlm_do_migrate_request).  this node is skipped since
2689         * we had to put an mle in the list to begin the process.  this
2690         * node now waits for the target to do an assert master.  this
2691         * node will be the last one notified, ensuring that the
2692         * migration is complete everywhere.  if the target dies while
2693         * this is going on, some nodes could potentially see the target
2694         * as the master, so it is important that recovery on this node
2695         * finds the migration mle and sets the master to UNKNOWN. */
2696
2697
2698        /* wait for new node to assert master */
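        /*
         * wait_event_interruptible_timeout() returns a positive value
         * if the condition became true, 0 if the timeout elapsed with
         * the condition still false, and -ERESTARTSYS if a signal
         * arrived.  the ret >= 0 branch below therefore covers both
         * "woken" and "timed out", and rechecks the condition to tell
         * the two cases apart.
         */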
2699        while (1) {
2700                ret = wait_event_interruptible_timeout(mle->wq,
2701                                        (atomic_read(&mle->woken) == 1),
2702                                        msecs_to_jiffies(5000));
2703
2704                if (ret >= 0) {
2705                        if (atomic_read(&mle->woken) == 1 ||
2706                            res->owner == target)
2707                                break;
2708
2709                        mlog(0, "%s:%.*s: timed out during migration\n",
2710                             dlm->name, res->lockname.len, res->lockname.name);
2711                        /* avoid hang during shutdown when migrating lockres
2712                         * to a node which also goes down */
2713                        if (dlm_is_node_dead(dlm, target)) {
2714                                mlog(0, "%s:%.*s: expected migration "
2715                                     "target %u is no longer up, restarting\n",
2716                                     dlm->name, res->lockname.len,
2717                                     res->lockname.name, target);
2718                                ret = -EINVAL;
2719                                /* migration failed, detach and clean up mle */
2720                                dlm_mle_detach_hb_events(dlm, mle);
2721                                dlm_put_mle(mle);
2722                                dlm_put_mle_inuse(mle);
2723                                spin_lock(&res->spinlock);
2724                                res->state &= ~DLM_LOCK_RES_MIGRATING;
2725                                wake = 1;
2726                                spin_unlock(&res->spinlock);
2727                                goto leave;
2728                        }
2729                } else
2730                        mlog(0, "%s:%.*s: caught signal during migration\n",
2731                             dlm->name, res->lockname.len, res->lockname.name);
2732        }
2733
2734        /* all done, set the owner, clear the flag */
2735        spin_lock(&res->spinlock);
2736        dlm_set_lockres_owner(dlm, res, target);
2737        res->state &= ~DLM_LOCK_RES_MIGRATING;
2738        dlm_remove_nonlocal_locks(dlm, res);
2739        spin_unlock(&res->spinlock);
2740        wake_up(&res->wq);
2741
2742        /* master is known, detach if not already detached */
2743        dlm_mle_detach_hb_events(dlm, mle);
2744        dlm_put_mle_inuse(mle);
2745        ret = 0;
2746
2747        dlm_lockres_calc_usage(dlm, res);
2748
2749leave:
2750        /* re-dirty the lockres if we failed */
2751        if (ret < 0)
2752                dlm_kick_thread(dlm, res);
2753
2754        /* wake up waiters if the MIGRATING flag got set
2755         * but migration failed */
2756        if (wake)
2757                wake_up(&res->wq);
2758
2759        if (mres)
2760                free_page((unsigned long)mres);
2761
2762        dlm_put(dlm);
2763
2764        mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2765             name, target, ret);
2766        return ret;
2767}
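
/*
 * recap of the happy path above:
 *
 *   1. add a DLM_MLE_MIGRATION mle and set MIGRATING on the lockres
 *   2. flush dlm->dlm_worker so no assert_master work is in flight
 *   3. dlm_send_one_lockres(..., DLM_MRES_MIGRATION) pushes all lock
 *      state to the target, doubling as notice that migration has begun
 *   4. the target broadcasts the migration (dlm_do_migrate_request) and
 *      asserts mastery; this node waits on mle->wq for that assert
 *   5. on success, set the owner to the target and free the dlm_lock
 *      structures held on behalf of other nodes
 */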
2768
2769#define DLM_MIGRATION_RETRY_MS  100
2770
2771/*
2772 * Should be called only after beginning the domain leave process.
2773 * There should not be any remaining locks on nonlocal lock resources,
2774 * and there should be no local locks left on locally mastered resources.
2775 *
2776 * Called with the dlm spinlock held, may drop it to do migration, but
2777 * will re-acquire before exit.
2778 *
2779 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2780 */
2781int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2782{
2783        int ret;
2784        int lock_dropped = 0;
2785        u8 target = O2NM_MAX_NODES;
2786
2787        assert_spin_locked(&dlm->spinlock);
2788
2789        spin_lock(&res->spinlock);
2790        if (dlm_is_lockres_migratable(dlm, res))
2791                target = dlm_pick_migration_target(dlm, res);
2792        spin_unlock(&res->spinlock);
2793
2794        if (target == O2NM_MAX_NODES)
2795                goto leave;
2796
2797        /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2798        spin_unlock(&dlm->spinlock);
2799        lock_dropped = 1;
2800        ret = dlm_migrate_lockres(dlm, res, target);
2801        if (ret)
2802                mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2803                     dlm->name, res->lockname.len, res->lockname.name,
2804                     target, ret);
2805        spin_lock(&dlm->spinlock);
2806leave:
2807        return lock_dropped;
2808}
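
/*
 * note for callers: a return of 1 means dlm->spinlock was dropped and
 * re-acquired above, so any state derived under that lock before the
 * call (e.g. a position in the lockres hash) must be revalidated.
 */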
2809
2810int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2811{
2812        int ret;
2813        spin_lock(&dlm->ast_lock);
2814        spin_lock(&lock->spinlock);
2815        ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2816        spin_unlock(&lock->spinlock);
2817        spin_unlock(&dlm->ast_lock);
2818        return ret;
2819}
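
/*
 * dlm_lock_basts_flushed() is shaped to serve as a wait_event()
 * predicate: it samples state under the proper locks but returns the
 * answer with no locks held.  a caller that must not proceed until all
 * basts for a lock have drained might use it like this (illustrative
 * sketch only):
 *
 *      wait_event(dlm->ast_wq, dlm_lock_basts_flushed(dlm, lock));
 */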
2820
2821static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2822                                     struct dlm_lock_resource *res,
2823                                     u8 mig_target)
2824{
2825        int can_proceed;
2826        spin_lock(&res->spinlock);
2827        can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2828        spin_unlock(&res->spinlock);
2829
2830        /* target has died, so make the caller break out of the
2831         * wait_event, but caller must recheck the domain_map */
2832        spin_lock(&dlm->spinlock);
2833        if (!test_bit(mig_target, dlm->domain_map))
2834                can_proceed = 1;
2835        spin_unlock(&dlm->spinlock);
2836        return can_proceed;
2837}
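
/*
 * dlm_migration_can_proceed() deliberately reports "can proceed" once
 * the target drops out of the domain map: that wakes the waiter in
 * dlm_mark_lockres_migrating() so it can notice the death and fail the
 * migration with -EHOSTDOWN instead of waiting forever for a MIGRATING
 * flag that will never be set.
 */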
2838
2839static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2840                                struct dlm_lock_resource *res)
2841{
2842        int ret;
2843        spin_lock(&res->spinlock);
2844        ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2845        spin_unlock(&res->spinlock);
2846        return ret;
2847}
2848
2849
2850static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2851                                       struct dlm_lock_resource *res,
2852                                       u8 target)
2853{
2854        int ret = 0;
2855
2856        mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2857               res->lockname.len, res->lockname.name, dlm->node_num,
2858               target);
2859        /* need to set MIGRATING flag on lockres.  this is done by
2860         * ensuring that all asts have been flushed for this lockres. */
2861        spin_lock(&res->spinlock);
2862        BUG_ON(res->migration_pending);
2863        res->migration_pending = 1;
2864        /* strategy is to reserve an extra ast then release
2865         * it below, letting the release do all of the work */
2866        __dlm_lockres_reserve_ast(res);
2867        spin_unlock(&res->spinlock);
2868
2869        /* now flush all the pending asts */
2870        dlm_kick_thread(dlm, res);
2871        /* before waiting on DIRTY, block processes which may
2872         * try to dirty the lockres before MIGRATING is set */
2873        spin_lock(&res->spinlock);
2874        BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2875        res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2876        spin_unlock(&res->spinlock);
2877        /* now wait on any pending asts and the DIRTY state */
2878        wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2879        dlm_lockres_release_ast(dlm, res);
2880
2881        mlog(0, "about to wait on migration_wq, dirty=%s\n",
2882               res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2883        /* if the extra ref we just put was the final one, this
2884         * will pass through immediately.  otherwise, we need to wait
2885         * for the last ast to finish. */
2886again:
2887        ret = wait_event_interruptible_timeout(dlm->migration_wq,
2888                   dlm_migration_can_proceed(dlm, res, target),
2889                   msecs_to_jiffies(1000));
2890        if (ret < 0) {
2891                mlog(0, "woken again: migrating? %s, dead? %s\n",
2892                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2893                       test_bit(target, dlm->domain_map) ? "no":"yes");
2894        } else {
2895                mlog(0, "all is well: migrating? %s, dead? %s\n",
2896                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2897                       test_bit(target, dlm->domain_map) ? "no":"yes");
2898        }
2899        if (!dlm_migration_can_proceed(dlm, res, target)) {
2900                mlog(0, "trying again...\n");
2901                goto again;
2902        }
2903
2904        ret = 0;
2905        /* did the target go down or die? */
2906        spin_lock(&dlm->spinlock);
2907        if (!test_bit(target, dlm->domain_map)) {
2908                mlog(ML_ERROR, "aha. migration target %u just went down\n",
2909                     target);
2910                ret = -EHOSTDOWN;
2911        }
2912        spin_unlock(&dlm->spinlock);
2913
2914        /*
2915         * if the target is down, clear DLM_LOCK_RES_BLOCK_DIRTY so another
2916         * try can be made later; otherwise MIGRATING is certain to be set,
2917         * so drop BLOCK_DIRTY, which no longer needs to hold off dirtying
2918         */
2919        spin_lock(&res->spinlock);
2920        BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2921        res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2922        if (!ret)
2923                BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2924        else
2925                res->migration_pending = 0;
2926        spin_unlock(&res->spinlock);
2927
2928        /*
2929         * at this point:
2930         *
2931         *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2932         *   o there are no pending asts on this lockres
2933         *   o all processes trying to reserve an ast on this
2934         *     lockres must wait for the MIGRATING flag to clear
2935         */
2936        return ret;
2937}
2938
2939/* last step in the migration process.
2940 * original master calls this to free all of the dlm_lock
2941 * structures that used to be for other nodes. */
2942static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2943                                      struct dlm_lock_resource *res)
2944{
2945        struct list_head *queue = &res->granted;
2946        int i, bit;
2947        struct dlm_lock *lock, *next;
2948
2949        assert_spin_locked(&res->spinlock);
2950
2951        BUG_ON(res->owner == dlm->node_num);
2952
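        /* walk all three lock queues (granted, converting, blocked);
         * they sit back-to-back in struct dlm_lock_resource, so
         * advancing the queue pointer below steps through each in turn */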
2953        for (i=0; i<3; i++) {
2954                list_for_each_entry_safe(lock, next, queue, list) {
2955                        if (lock->ml.node != dlm->node_num) {
2956                                mlog(0, "putting lock for node %u\n",
2957                                     lock->ml.node);
2958                                /* be extra careful */
2959                                BUG_ON(!list_empty(&lock->ast_list));
2960                                BUG_ON(!list_empty(&lock->bast_list));
2961                                BUG_ON(lock->ast_pending);
2962                                BUG_ON(lock->bast_pending);
2963                                dlm_lockres_clear_refmap_bit(dlm, res,
2964                                                             lock->ml.node);
2965                                list_del_init(&lock->list);
2966                                dlm_lock_put(lock);
2967                                /* In a normal unlock, we would have added a
2968                                 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2969                                dlm_lock_put(lock);
2970                        }
2971                }
2972                queue++;
2973        }
2974        bit = 0;
2975        while (1) {
2976                bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2977                if (bit >= O2NM_MAX_NODES)
2978                        break;
2979                /* do not clear the local node reference, if there is a
2980                 * process holding this, let it drop the ref itself */
2981                if (bit != dlm->node_num) {
2982                        mlog(0, "%s:%.*s: node %u had a ref to this "
2983                             "migrating lockres, clearing\n", dlm->name,
2984                             res->lockname.len, res->lockname.name, bit);
2985                        dlm_lockres_clear_refmap_bit(dlm, res, bit);
2986                }
2987                bit++;
2988        }
2989}
2990
2991/*
2992 * Pick a node to migrate the lock resource to. This function selects a
2993 * potential target based first on the locks and then on the refmap. It skips
2994 * nodes that are in the process of exiting the domain.
2995 */
2996static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2997                                    struct dlm_lock_resource *res)
2998{
2999        enum dlm_lockres_list idx;
3000        struct list_head *queue = &res->granted;
3001        struct dlm_lock *lock;
3002        int noderef;
3003        u8 nodenum = O2NM_MAX_NODES;
3004
3005        assert_spin_locked(&dlm->spinlock);
3006        assert_spin_locked(&res->spinlock);
3007
3008        /* Go through all the locks */
3009        for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
3010                queue = dlm_list_idx_to_ptr(res, idx);
3011                list_for_each_entry(lock, queue, list) {
3012                        if (lock->ml.node == dlm->node_num)
3013                                continue;
3014                        if (test_bit(lock->ml.node, dlm->exit_domain_map))
3015                                continue;
3016                        nodenum = lock->ml.node;
3017                        goto bail;
3018                }
3019        }
3020
3021        /* Go through the refmap */
3022        noderef = -1;
3023        while (1) {
3024                noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
3025                                        noderef + 1);
3026                if (noderef >= O2NM_MAX_NODES)
3027                        break;
3028                if (noderef == dlm->node_num)
3029                        continue;
3030                if (test_bit(noderef, dlm->exit_domain_map))
3031                        continue;
3032                nodenum = noderef;
3033                goto bail;
3034        }
3035
3036bail:
3037        return nodenum;
3038}
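
/*
 * O2NM_MAX_NODES serves as the "no candidate" sentinel above: if no
 * live, non-exiting node holds a lock or a refmap reference, there is
 * nowhere sensible to migrate to, and dlm_empty_lockres() treats that
 * as nothing to do.
 */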
3039
3040/* this is called by the new master once all lockres
3041 * data has been received */
3042static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3043                                  struct dlm_lock_resource *res,
3044                                  u8 master, u8 new_master,
3045                                  struct dlm_node_iter *iter)
3046{
3047        struct dlm_migrate_request migrate;
3048        int ret, skip, status = 0;
3049        int nodenum;
3050
3051        memset(&migrate, 0, sizeof(migrate));
3052        migrate.namelen = res->lockname.len;
3053        memcpy(migrate.name, res->lockname.name, migrate.namelen);
3054        migrate.new_master = new_master;
3055        migrate.master = master;
3056
3057        ret = 0;
3058
3059        /* send message to all nodes, except the master and myself */
3060        while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3061                if (nodenum == master ||
3062                    nodenum == new_master)
3063                        continue;
3064
3065                /* We could race with a node exiting the domain.  If it has
3066                 * exited, skip it. */
3066                spin_lock(&dlm->spinlock);
3067                skip = (!test_bit(nodenum, dlm->domain_map));
3068                spin_unlock(&dlm->spinlock);
3069                if (skip) {
3070                        clear_bit(nodenum, iter->node_map);
3071                        continue;
3072                }
3073
3074                ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3075                                         &migrate, sizeof(migrate), nodenum,
3076                                         &status);
3077                if (ret < 0) {
3078                        mlog(ML_ERROR, "%s: res %.*s, Error %d send "
3079                             "MIGRATE_REQUEST to node %u\n", dlm->name,
3080                             migrate.namelen, migrate.name, ret, nodenum);
3081                        if (!dlm_is_host_down(ret)) {
3082                                mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3083                                BUG();
3084                        }
3085                        clear_bit(nodenum, iter->node_map);
3086                        ret = 0;
3087                } else if (status < 0) {
3088                        mlog(0, "migrate request (node %u) returned %d!\n",
3089                             nodenum, status);
3090                        ret = status;
3091                } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3092                        /* during the migration request we short-circuited
3093                         * the mastery of the lockres.  make sure we have
3094                         * a mastery ref for nodenum */
3095                        mlog(0, "%s:%.*s: need ref for node %u\n",
3096                             dlm->name, res->lockname.len, res->lockname.name,
3097                             nodenum);
3098                        spin_lock(&res->spinlock);
3099                        dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3100                        spin_unlock(&res->spinlock);
3101                }
3102        }
3103
3104        if (ret < 0)
3105                mlog_errno(ret);
3106
3107        mlog(0, "returning ret=%d\n", ret);
3108        return ret;
3109}
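
/*
 * a minimal sketch of how a caller drives this, following the pattern
 * in dlm_finish_migration() below: build a node iterator over the
 * current domain map, mask out the nodes that must not receive the
 * request, then hand the iterator in:
 *
 *      struct dlm_node_iter iter;
 *
 *      spin_lock(&dlm->spinlock);
 *      dlm_node_iter_init(dlm->domain_map, &iter);
 *      clear_bit(old_master, iter.node_map);
 *      clear_bit(dlm->node_num, iter.node_map);
 *      spin_unlock(&dlm->spinlock);
 *
 *      ret = dlm_do_migrate_request(dlm, res, old_master,
 *                                   dlm->node_num, &iter);
 */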
3110
3111
3112/* if there is an existing mle for this lockres, we now know who the master
3113 * is (the one who sent us *this* message), so we can clear it up right away.
3114 * since the process that put the mle on the list still has a reference to it,
3115 * we can unhash it now, set the master and wake the process.  as a result,
3116 * we will have no mle in the list to start with.  now we can add an mle for
3117 * the migration and this should be the only one found by anyone scanning the
3118 * list. */
3119int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3120                                void **ret_data)
3121{
3122        struct dlm_ctxt *dlm = data;
3123        struct dlm_lock_resource *res = NULL;
3124        struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3125        struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3126        const char *name;
3127        unsigned int namelen, hash;
3128        int ret = 0;
3129
3130        if (!dlm_grab(dlm))
3131                return 0;
3132
3133        name = migrate->name;
3134        namelen = migrate->namelen;
3135        hash = dlm_lockid_hash(name, namelen);
3136
3137        /* preallocate.. if this fails, abort */
3138        mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3139
3140        if (!mle) {
3141                ret = -ENOMEM;
3142                goto leave;
3143        }
3144
3145        /* check for pre-existing lock */
3146        spin_lock(&dlm->spinlock);
3147        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3148        if (res) {
3149                spin_lock(&res->spinlock);
3150                if (res->state & DLM_LOCK_RES_RECOVERING) {
3151                        /* if all is working ok, this can only mean that we got
3152                         * a migrate request from a node that we now see as
3153                         * dead.  what can we do here?  drop it to the floor? */
3154                        spin_unlock(&res->spinlock);
3155                        mlog(ML_ERROR, "Got a migrate request, but the "
3156                             "lockres is marked as recovering!\n");
3157                        kmem_cache_free(dlm_mle_cache, mle);
3158                        ret = -EINVAL; /* need a better solution */
3159                        goto unlock;
3160                }
3161                res->state |= DLM_LOCK_RES_MIGRATING;
3162                spin_unlock(&res->spinlock);
3163        }
3164
3165        spin_lock(&dlm->master_lock);
3166        /* ignore status.  only nonzero status would BUG. */
3167        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3168                                    name, namelen,
3169                                    migrate->new_master,
3170                                    migrate->master);
3171
3172        if (ret < 0)
3173                kmem_cache_free(dlm_mle_cache, mle);
3174
3175        spin_unlock(&dlm->master_lock);
3176unlock:
3177        spin_unlock(&dlm->spinlock);
3178
3179        if (oldmle) {
3180                /* master is known, detach if not already detached */
3181                dlm_mle_detach_hb_events(dlm, oldmle);
3182                dlm_put_mle(oldmle);
3183        }
3184
3185        if (res)
3186                dlm_lockres_put(res);
3187leave:
3188        dlm_put(dlm);
3189        return ret;
3190}
3191
3192/* must be holding dlm->spinlock and dlm->master_lock.
3193 * when adding a migration mle, we can clear any other mles
3194 * in the master list because we know with certainty that
3195 * the master is "master".  so we remove any old mle from
3196 * the list after setting its master field, and then add
3197 * the new migration mle.  this way we keep to the rule
3198 * of having only one mle for a given lock name at all times. */
3199static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3200                                 struct dlm_lock_resource *res,
3201                                 struct dlm_master_list_entry *mle,
3202                                 struct dlm_master_list_entry **oldmle,
3203                                 const char *name, unsigned int namelen,
3204                                 u8 new_master, u8 master)
3205{
3206        int found;
3207        int ret = 0;
3208
3209        *oldmle = NULL;
3210
3211        assert_spin_locked(&dlm->spinlock);
3212        assert_spin_locked(&dlm->master_lock);
3213
3214        /* caller is responsible for any ref taken here on oldmle */
3215        found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3216        if (found) {
3217                struct dlm_master_list_entry *tmp = *oldmle;
3218                spin_lock(&tmp->spinlock);
3219                if (tmp->type == DLM_MLE_MIGRATION) {
3220                        if (master == dlm->node_num) {
3221                                /* ah another process raced me to it */
3222                                mlog(0, "tried to migrate %.*s, but some "
3223                                     "process beat me to it\n",
3224                                     namelen, name);
3225                                spin_unlock(&tmp->spinlock);
3226                                return -EEXIST;
3227                        } else {
3228                                /* bad.  2 NODES are trying to migrate! */
3229                                mlog(ML_ERROR, "migration error  mle: "
3230                                     "master=%u new_master=%u // request: "
3231                                     "master=%u new_master=%u // "
3232                                     "lockres=%.*s\n",
3233                                     tmp->master, tmp->new_master,
3234                                     master, new_master,
3235                                     namelen, name);
3236                                BUG();
3237                        }
3238                } else {
3239                        /* this is essentially what assert_master does */
3240                        tmp->master = master;
3241                        atomic_set(&tmp->woken, 1);
3242                        wake_up(&tmp->wq);
3243                        /* remove it so that only one mle will be found */
3244                        __dlm_unlink_mle(dlm, tmp);
3245                        __dlm_mle_detach_hb_events(dlm, tmp);
3246                        if (tmp->type == DLM_MLE_MASTER) {
3247                                ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3248                                mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3249                                                "telling master to get ref "
3250                                                "for cleared out mle during "
3251                                                "migration\n", dlm->name,
3252                                                namelen, name, master,
3253                                                new_master);
3254                        }
3255                }
3256                spin_unlock(&tmp->spinlock);
3257        }
3258
3259        /* now add a migration mle to the tail of the list */
3260        dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3261        mle->new_master = new_master;
3262        /* the new master will be sending an assert master for this.
3263         * at that point we will get the refmap reference */
3264        mle->master = master;
3265        /* do this for consistency with other mle types */
3266        set_bit(new_master, mle->maybe_map);
3267        __dlm_insert_mle(dlm, mle);
3268
3269        return ret;
3270}
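
/*
 * return values above: -EEXIST if another local process already has a
 * migration in flight for this name, DLM_MIGRATE_RESPONSE_MASTERY_REF
 * if a MASTER mle was cleared out (telling the node driving the
 * migration to set a refmap bit for this node), and 0 otherwise.
 */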
3271
3272/*
3273 * Sets the owner of the lockres associated with the mle to UNKNOWN
3274 */
3275static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3276                                        struct dlm_master_list_entry *mle)
3277{
3278        struct dlm_lock_resource *res;
3279
3280        /* Find the lockres associated with the mle and set its owner to UNK */
3281        res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3282                                   mle->mnamehash);
3283        if (res) {
3284                spin_unlock(&dlm->master_lock);
3285
3286                /* move lockres onto recovery list */
3287                spin_lock(&res->spinlock);
3288                dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3289                dlm_move_lockres_to_recovery_list(dlm, res);
3290                spin_unlock(&res->spinlock);
3291                dlm_lockres_put(res);
3292
3293                /* about to get rid of mle, detach from heartbeat */
3294                __dlm_mle_detach_hb_events(dlm, mle);
3295
3296                /* dump the mle */
3297                spin_lock(&dlm->master_lock);
3298                __dlm_put_mle(mle);
3299                spin_unlock(&dlm->master_lock);
3300        }
3301
3302        return res;
3303}
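
/*
 * note: a non-NULL return above means dlm->master_lock was dropped and
 * re-acquired in order to take res->spinlock; the caller uses that as
 * its cue to restart any walk it was doing under master_lock.
 */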
3304
3305static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3306                                    struct dlm_master_list_entry *mle)
3307{
3308        __dlm_mle_detach_hb_events(dlm, mle);
3309
3310        spin_lock(&mle->spinlock);
3311        __dlm_unlink_mle(dlm, mle);
3312        atomic_set(&mle->woken, 1);
3313        spin_unlock(&mle->spinlock);
3314
3315        wake_up(&mle->wq);
3316}
3317
3318static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3319                                struct dlm_master_list_entry *mle, u8 dead_node)
3320{
3321        int bit;
3322
3323        BUG_ON(mle->type != DLM_MLE_BLOCK);
3324
3325        spin_lock(&mle->spinlock);
3326        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3327        if (bit != dead_node) {
3328                mlog(0, "mle found, but dead node %u would not have been "
3329                     "master\n", dead_node);
3330                spin_unlock(&mle->spinlock);
3331        } else {
3332                /* Must drop the refcount by one since the assert_master will
3333                 * never arrive. This may result in the mle being unlinked and
3334                 * freed, but there may still be a process waiting in the
3335                 * dlmlock path which is fine. */
3336                mlog(0, "node %u was expected master\n", dead_node);
3337                atomic_set(&mle->woken, 1);
3338                spin_unlock(&mle->spinlock);
3339                wake_up(&mle->wq);
3340
3341                /* Do not need events any longer, so detach from heartbeat */
3342                __dlm_mle_detach_hb_events(dlm, mle);
3343                __dlm_put_mle(mle);
3344        }
3345}
3346
3347void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3348{
3349        struct dlm_master_list_entry *mle;
3350        struct dlm_lock_resource *res;
3351        struct hlist_head *bucket;
3352        struct hlist_node *tmp;
3353        unsigned int i;
3354
3355        mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3356top:
3357        assert_spin_locked(&dlm->spinlock);
3358
3359        /* clean the master list */
3360        spin_lock(&dlm->master_lock);
3361        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3362                bucket = dlm_master_hash(dlm, i);
3363                hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3364                        BUG_ON(mle->type != DLM_MLE_BLOCK &&
3365                               mle->type != DLM_MLE_MASTER &&
3366                               mle->type != DLM_MLE_MIGRATION);
3367
3368                        /* MASTER mles are initiated locally. The waiting
3369                         * process will notice the node map change shortly.
3370                         * Let that happen as normal. */
3371                        if (mle->type == DLM_MLE_MASTER)
3372                                continue;
3373
3374                        /* BLOCK mles are initiated by other nodes. Need to
3375                         * clean up if the dead node would have been the
3376                         * master. */
3377                        if (mle->type == DLM_MLE_BLOCK) {
3378                                dlm_clean_block_mle(dlm, mle, dead_node);
3379                                continue;
3380                        }
3381
3382                        /* Everything else is a MIGRATION mle */
3383
3384                        /* The rule for MIGRATION mles is that the master
3385                         * becomes UNKNOWN if *either* the original or the new
3386                         * master dies. All UNKNOWN lockres are sent to
3387                         * whichever node becomes the recovery master. The new
3388                         * master is responsible for determining if there is
3389                         * still a master for this lockres, or if it needs to
3390                         * take over mastery. Either way, this node should
3391                         * expect another message to resolve this. */
3392
3393                        if (mle->master != dead_node &&
3394                            mle->new_master != dead_node)
3395                                continue;
3396
3397                        if (mle->new_master == dead_node && mle->inuse) {
3398                                mlog(ML_NOTICE, "%s: target %u died during "
3399                                                "migration from %u, the mle is "
3400                                                "still in use, ignoring it!\n",
3401                                                dlm->name, dead_node,
3402                                                mle->master);
3403                                continue;
3404                        }
3405
3406                        /* If we have reached this point, this mle needs to be
3407                         * removed from the list and freed. */
3408                        dlm_clean_migration_mle(dlm, mle);
3409
3410                        mlog(0, "%s: node %u died during migration from "
3411                             "%u to %u!\n", dlm->name, dead_node, mle->master,
3412                             mle->new_master);
3413
3414                        /* If we find a lockres associated with the mle, we've
3415                         * hit this rare case that messes up our lock ordering.
3416                         * If so, we need to drop the master lock so that we can
3417                         * take the lockres lock, meaning that we will have to
3418                         * restart from the head of list. */
3419                        res = dlm_reset_mleres_owner(dlm, mle);
3420                        if (res)
3421                                /* restart */
3422                                goto top;
3423
3424                        /* This may be the last reference */
3425                        __dlm_put_mle(mle);
3426                }
3427        }
3428        spin_unlock(&dlm->master_lock);
3429}
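
/*
 * lock ordering note: dlm->spinlock -> dlm->master_lock is the legal
 * order here, but res->spinlock may not be taken while master_lock is
 * held.  that is why dlm_reset_mleres_owner() drops master_lock before
 * touching the lockres, and why the walk above restarts from 'top'
 * whenever that happens.
 */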
3430
3431int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3432                         u8 old_master)
3433{
3434        struct dlm_node_iter iter;
3435        int ret = 0;
3436
3437        spin_lock(&dlm->spinlock);
3438        dlm_node_iter_init(dlm->domain_map, &iter);
3439        clear_bit(old_master, iter.node_map);
3440        clear_bit(dlm->node_num, iter.node_map);
3441        spin_unlock(&dlm->spinlock);
3442
3443        /* ownership of the lockres is changing.  account for the
3444         * mastery reference here since old_master will briefly have
3445         * a reference after the migration completes */
3446        spin_lock(&res->spinlock);
3447        dlm_lockres_set_refmap_bit(dlm, res, old_master);
3448        spin_unlock(&res->spinlock);
3449
3450        mlog(0, "now time to do a migrate request to other nodes\n");
3451        ret = dlm_do_migrate_request(dlm, res, old_master,
3452                                     dlm->node_num, &iter);
3453        if (ret < 0) {
3454                mlog_errno(ret);
3455                goto leave;
3456        }
3457
3458        mlog(0, "doing assert master of %.*s to all except the original node\n",
3459             res->lockname.len, res->lockname.name);
3460        /* this call now finishes out the nodemap
3461         * even if one or more nodes die */
3462        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3463                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3464        if (ret < 0) {
3465                /* no longer need to retry.  all living nodes contacted. */
3466                mlog_errno(ret);
3467                ret = 0;
3468        }
3469
3470        memset(iter.node_map, 0, sizeof(iter.node_map));
3471        set_bit(old_master, iter.node_map);
3472        mlog(0, "doing assert master of %.*s back to %u\n",
3473             res->lockname.len, res->lockname.name, old_master);
3474        ret = dlm_do_assert_master(dlm, res, iter.node_map,
3475                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3476        if (ret < 0) {
3477                mlog(0, "assert master to original master failed "
3478                     "with %d.\n", ret);
3479                /* the only nonzero status here would be because of
3480                 * a dead original node.  we're done. */
3481                ret = 0;
3482        }
3483
3484        /* all done, set the owner, clear the flag */
3485        spin_lock(&res->spinlock);
3486        dlm_set_lockres_owner(dlm, res, dlm->node_num);
3487        res->state &= ~DLM_LOCK_RES_MIGRATING;
3488        spin_unlock(&res->spinlock);
3489        /* re-dirty it on the new master */
3490        dlm_kick_thread(dlm, res);
3491        wake_up(&res->wq);
3492leave:
3493        return ret;
3494}
3495
3496/*
3497 * LOCKRES AST REFCOUNT
3498 * this is integral to migration
3499 */
3500
3501/* to record future intent to call an ast, reserve one ahead of time.
3502 * this should be called only after waiting on the lockres
3503 * with dlm_wait_on_lockres, and while still holding the
3504 * spinlock after the call. */
3505void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3506{
3507        assert_spin_locked(&res->spinlock);
3508        if (res->state & DLM_LOCK_RES_MIGRATING) {
3509                __dlm_print_one_lock_resource(res);
3510        }
3511        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3512
3513        atomic_inc(&res->asts_reserved);
3514}
3515
3516/*
3517 * used to drop the reserved ast, either because it went unused,
3518 * or because the ast/bast was actually called.
3519 *
3520 * also, if there is a pending migration on this lockres,
3521 * and this was the last pending ast on the lockres,
3522 * atomically set the MIGRATING flag before we drop the lock.
3523 * this is how we ensure that migration can proceed with no
3524 * asts in progress.  note that it is ok if the state of the
3525 * queues is such that a lock should be granted in the future
3526 * or that a bast should be fired, because the new master will
3527 * shuffle the lists on this lockres as soon as it is migrated.
3528 */
3529void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3530                             struct dlm_lock_resource *res)
3531{
3532        if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3533                return;
3534
3535        if (!res->migration_pending) {
3536                spin_unlock(&res->spinlock);
3537                return;
3538        }
3539
3540        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3541        res->migration_pending = 0;
3542        res->state |= DLM_LOCK_RES_MIGRATING;
3543        spin_unlock(&res->spinlock);
3544        wake_up(&res->wq);
3545        wake_up(&dlm->migration_wq);
3546}
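
/*
 * the reserve/release pair is used like this (illustrative sketch,
 * following the pattern in dlm_mark_lockres_migrating() above):
 *
 *      spin_lock(&res->spinlock);
 *      __dlm_lockres_reserve_ast(res);
 *      spin_unlock(&res->spinlock);
 *
 *      ...queue and deliver the ast...
 *
 *      dlm_lockres_release_ast(dlm, res);
 *
 * because the final release of the last reservation is what atomically
 * sets MIGRATING, any ast reserved before migration began is guaranteed
 * to have completed by the time a waiter observes the flag.
 */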
3547
3548void dlm_force_free_mles(struct dlm_ctxt *dlm)
3549{
3550        int i;
3551        struct hlist_head *bucket;
3552        struct dlm_master_list_entry *mle;
3553        struct hlist_node *tmp;
3554
3555        /*
3556         * We notified all other nodes that we are exiting the domain and
3557         * marked the dlm state as DLM_CTXT_LEAVING. If any mles are still
3558         * around, we force-free them and wake any processes that are waiting
3559         * on the mles.
3560         */
3561        spin_lock(&dlm->spinlock);
3562        spin_lock(&dlm->master_lock);
3563
3564        BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3565        BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3566
3567        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3568                bucket = dlm_master_hash(dlm, i);
3569                hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3570                        if (mle->type != DLM_MLE_BLOCK) {
3571                                mlog(ML_ERROR, "bad mle: %p\n", mle);
3572                                dlm_print_one_mle(mle);
3573                        }
3574                        atomic_set(&mle->woken, 1);
3575                        wake_up(&mle->wq);
3576
3577                        __dlm_unlink_mle(dlm, mle);
3578                        __dlm_mle_detach_hb_events(dlm, mle);
3579                        __dlm_put_mle(mle);
3580                }
3581        }
3582        spin_unlock(&dlm->master_lock);
3583        spin_unlock(&dlm->spinlock);
3584}
3585