linux/fs/ocfs2/dlm/dlmthread.c
   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 * vim: noexpandtab sw=8 ts=8 sts=0:
   3 *
   4 * dlmthread.c
   5 *
   6 * standalone DLM module
   7 *
   8 * Copyright (C) 2004 Oracle.  All rights reserved.
   9 *
  10 * This program is free software; you can redistribute it and/or
  11 * modify it under the terms of the GNU General Public
  12 * License as published by the Free Software Foundation; either
  13 * version 2 of the License, or (at your option) any later version.
  14 *
  15 * This program is distributed in the hope that it will be useful,
  16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18 * General Public License for more details.
  19 *
  20 * You should have received a copy of the GNU General Public
  21 * License along with this program; if not, write to the
  22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   23 * Boston, MA 02111-1307, USA.
  24 *
  25 */
  26
  27
  28#include <linux/module.h>
  29#include <linux/fs.h>
  30#include <linux/types.h>
  31#include <linux/slab.h>
  32#include <linux/highmem.h>
  33#include <linux/init.h>
  34#include <linux/sysctl.h>
  35#include <linux/random.h>
  36#include <linux/blkdev.h>
  37#include <linux/socket.h>
  38#include <linux/inet.h>
  39#include <linux/timer.h>
  40#include <linux/kthread.h>
  41#include <linux/delay.h>
  42
  43
  44#include "cluster/heartbeat.h"
  45#include "cluster/nodemanager.h"
  46#include "cluster/tcp.h"
  47
  48#include "dlmapi.h"
  49#include "dlmcommon.h"
  50#include "dlmdomain.h"
  51
  52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
  53#include "cluster/masklog.h"
  54
  55static int dlm_thread(void *data);
  56static void dlm_flush_asts(struct dlm_ctxt *dlm);
  57
  58#define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
  59
   60/* Waits until none of the given flags are set in res->state.  Exits
   61 * holding res->spinlock, but may drop and retake it while waiting. */
  62void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
  63{
  64        DECLARE_WAITQUEUE(wait, current);
  65
  66        assert_spin_locked(&res->spinlock);
  67
  68        add_wait_queue(&res->wq, &wait);
  69repeat:
  70        set_current_state(TASK_UNINTERRUPTIBLE);
  71        if (res->state & flags) {
  72                spin_unlock(&res->spinlock);
  73                schedule();
  74                spin_lock(&res->spinlock);
  75                goto repeat;
  76        }
  77        remove_wait_queue(&res->wq, &wait);
  78        __set_current_state(TASK_RUNNING);
  79}
  80
  81int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
  82{
  83        if (list_empty(&res->granted) &&
  84            list_empty(&res->converting) &&
  85            list_empty(&res->blocked))
  86                return 0;
  87        return 1;
  88}
  89
  90/* "unused": the lockres has no locks, is not on the dirty list,
  91 * has no inflight locks (in the gap between mastery and acquiring
  92 * the first lock), and has no bits in its refmap.
  93 * truly ready to be freed. */
  94int __dlm_lockres_unused(struct dlm_lock_resource *res)
  95{
  96        if (!__dlm_lockres_has_locks(res) &&
  97            (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
  98                /* try not to scan the bitmap unless the first two
  99                 * conditions are already true */
 100                int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
 101                if (bit >= O2NM_MAX_NODES) {
 102                        /* since the bit for dlm->node_num is not
 103                         * set, inflight_locks better be zero */
 104                        BUG_ON(res->inflight_locks != 0);
 105                        return 1;
 106                }
 107        }
 108        return 0;
 109}
 110
 111
 112/* Call whenever you may have added or deleted something from one of
  113 * the lockres queues. This will figure out whether it belongs on the
 114 * unused list or not and does the appropriate thing. */
 115void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 116                              struct dlm_lock_resource *res)
 117{
 118        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
 119
 120        assert_spin_locked(&dlm->spinlock);
 121        assert_spin_locked(&res->spinlock);
 122
  123        if (__dlm_lockres_unused(res)) {
 124                if (list_empty(&res->purge)) {
 125                        mlog(0, "putting lockres %.*s:%p onto purge list\n",
 126                             res->lockname.len, res->lockname.name, res);
 127
 128                        res->last_used = jiffies;
 129                        dlm_lockres_get(res);
 130                        list_add_tail(&res->purge, &dlm->purge_list);
 131                        dlm->purge_count++;
 132                }
 133        } else if (!list_empty(&res->purge)) {
 134                mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
 135                     res->lockname.len, res->lockname.name, res, res->owner);
 136
 137                list_del_init(&res->purge);
 138                dlm_lockres_put(res);
 139                dlm->purge_count--;
 140        }
 141}
 142
 143void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 144                            struct dlm_lock_resource *res)
 145{
 146        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
 147        spin_lock(&dlm->spinlock);
 148        spin_lock(&res->spinlock);
 149
 150        __dlm_lockres_calc_usage(dlm, res);
 151
 152        spin_unlock(&res->spinlock);
 153        spin_unlock(&dlm->spinlock);
 154}
 155
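     /* Drop one unused lockres from the purge list.  A resource that is being
      * migrated is simply requeued at the tail and skipped.  If this node is
      * not the master, dlm->spinlock is dropped while a deref message is sent
      * so the master can clear our bit from its refmap.  The lockres is then
      * removed from the purge list and unhashed.  Called with dlm->spinlock
      * held. */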
 156static int dlm_purge_lockres(struct dlm_ctxt *dlm,
 157                             struct dlm_lock_resource *res)
 158{
 159        int master;
 160        int ret = 0;
 161
 162        spin_lock(&res->spinlock);
 163        if (!__dlm_lockres_unused(res)) {
 164                mlog(0, "%s:%.*s: tried to purge but not unused\n",
 165                     dlm->name, res->lockname.len, res->lockname.name);
 166                __dlm_print_one_lock_resource(res);
 167                spin_unlock(&res->spinlock);
 168                BUG();
 169        }
 170
 171        if (res->state & DLM_LOCK_RES_MIGRATING) {
 172                mlog(0, "%s:%.*s: Delay dropref as this lockres is "
 173                     "being remastered\n", dlm->name, res->lockname.len,
 174                     res->lockname.name);
 175                /* Re-add the lockres to the end of the purge list */
 176                if (!list_empty(&res->purge)) {
 177                        list_del_init(&res->purge);
 178                        list_add_tail(&res->purge, &dlm->purge_list);
 179                }
 180                spin_unlock(&res->spinlock);
 181                return 0;
 182        }
 183
 184        master = (res->owner == dlm->node_num);
 185
 186        if (!master)
 187                res->state |= DLM_LOCK_RES_DROPPING_REF;
 188        spin_unlock(&res->spinlock);
 189
 190        mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
 191             res->lockname.name, master);
 192
 193        if (!master) {
  194                /* drop dlm spinlock...  retake below */
 195                spin_unlock(&dlm->spinlock);
 196
 197                spin_lock(&res->spinlock);
 198                /* This ensures that clear refmap is sent after the set */
 199                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
 200                spin_unlock(&res->spinlock);
 201
 202                /* clear our bit from the master's refmap, ignore errors */
 203                ret = dlm_drop_lockres_ref(dlm, res);
 204                if (ret < 0) {
 205                        mlog_errno(ret);
 206                        if (!dlm_is_host_down(ret))
 207                                BUG();
 208                }
 209                mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
 210                     dlm->name, res->lockname.len, res->lockname.name, ret);
 211                spin_lock(&dlm->spinlock);
 212        }
 213
 214        spin_lock(&res->spinlock);
 215        if (!list_empty(&res->purge)) {
 216                mlog(0, "removing lockres %.*s:%p from purgelist, "
 217                     "master = %d\n", res->lockname.len, res->lockname.name,
 218                     res, master);
 219                list_del_init(&res->purge);
 220                spin_unlock(&res->spinlock);
 221                dlm_lockres_put(res);
 222                dlm->purge_count--;
 223        } else
 224                spin_unlock(&res->spinlock);
 225
 226        __dlm_unhash_lockres(res);
 227
 228        /* lockres is not in the hash now.  drop the flag and wake up
 229         * any processes waiting in dlm_get_lock_resource. */
 230        if (!master) {
 231                spin_lock(&res->spinlock);
 232                res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 233                spin_unlock(&res->spinlock);
 234                wake_up(&res->wq);
 235        }
 236        return 0;
 237}
 238
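     /* Walk dlm->purge_list and purge lockres entries that have been unused
      * for longer than DLM_PURGE_INTERVAL_MS.  When purge_now is set (domain
      * shutdown) the age check is skipped and anything unused is purged.
      * Runs under dlm->spinlock, which may be dropped and retaken while
      * purging. */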
 239static void dlm_run_purge_list(struct dlm_ctxt *dlm,
 240                               int purge_now)
 241{
 242        unsigned int run_max, unused;
 243        unsigned long purge_jiffies;
 244        struct dlm_lock_resource *lockres;
 245
 246        spin_lock(&dlm->spinlock);
 247        run_max = dlm->purge_count;
 248
  249        while (run_max && !list_empty(&dlm->purge_list)) {
 250                run_max--;
 251
 252                lockres = list_entry(dlm->purge_list.next,
 253                                     struct dlm_lock_resource, purge);
 254
 255                /* Status of the lockres *might* change so double
 256                 * check. If the lockres is unused, holding the dlm
  257                 * spinlock will prevent people from getting any more
 258                 * refs on it -- there's no need to keep the lockres
 259                 * spinlock. */
 260                spin_lock(&lockres->spinlock);
 261                unused = __dlm_lockres_unused(lockres);
 262                spin_unlock(&lockres->spinlock);
 263
 264                if (!unused)
 265                        continue;
 266
 267                purge_jiffies = lockres->last_used +
 268                        msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
 269
 270                /* Make sure that we want to be processing this guy at
 271                 * this time. */
 272                if (!purge_now && time_after(purge_jiffies, jiffies)) {
 273                        /* Since resources are added to the purge list
 274                         * in tail order, we can stop at the first
 275                         * unpurgable resource -- anyone added after
 276                         * him will have a greater last_used value */
 277                        break;
 278                }
 279
 280                dlm_lockres_get(lockres);
 281
  282                /* This may drop and reacquire the dlm spinlock if it
  283                 * has to send a deref message to the master node. */
 284                if (dlm_purge_lockres(dlm, lockres))
 285                        BUG();
 286
 287                dlm_lockres_put(lockres);
 288
 289                /* Avoid adding any scheduling latencies */
 290                cond_resched_lock(&dlm->spinlock);
 291        }
 292
 293        spin_unlock(&dlm->spinlock);
 294}
 295
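     /* Try to grant the lock at the head of the converting queue, then the
      * head of the blocked queue, repeating after each grant.  A lock is
      * granted only if it is compatible with everything already granted (and
      * converting); each granted lock gets an AST queued, and every lock that
      * blocks the grant gets a BAST queued and its highest_blocked level
      * raised.  Caller holds res->spinlock. */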
 296static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
 297                              struct dlm_lock_resource *res)
 298{
 299        struct dlm_lock *lock, *target;
 300        struct list_head *iter;
 301        struct list_head *head;
 302        int can_grant = 1;
 303
 304        //mlog(0, "res->lockname.len=%d\n", res->lockname.len);
 305        //mlog(0, "res->lockname.name=%p\n", res->lockname.name);
 306        //mlog(0, "shuffle res %.*s\n", res->lockname.len,
 307        //        res->lockname.name);
 308
 309        /* because this function is called with the lockres
  310         * spinlock held, and because we know that it is not migrating/
 311         * recovering/in-progress, it is fine to reserve asts and
 312         * basts right before queueing them all throughout */
 313        assert_spin_locked(&res->spinlock);
 314        BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
 315                              DLM_LOCK_RES_RECOVERING|
 316                              DLM_LOCK_RES_IN_PROGRESS)));
 317
 318converting:
 319        if (list_empty(&res->converting))
 320                goto blocked;
 321        mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
 322             res->lockname.name);
 323
 324        target = list_entry(res->converting.next, struct dlm_lock, list);
 325        if (target->ml.convert_type == LKM_IVMODE) {
 326                mlog(ML_ERROR, "%.*s: converting a lock with no "
 327                     "convert_type!\n", res->lockname.len, res->lockname.name);
 328                BUG();
 329        }
 330        head = &res->granted;
 331        list_for_each(iter, head) {
 332                lock = list_entry(iter, struct dlm_lock, list);
  333                if (lock == target)
 334                        continue;
 335                if (!dlm_lock_compatible(lock->ml.type,
 336                                         target->ml.convert_type)) {
 337                        can_grant = 0;
 338                        /* queue the BAST if not already */
 339                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 340                                __dlm_lockres_reserve_ast(res);
 341                                dlm_queue_bast(dlm, lock);
 342                        }
 343                        /* update the highest_blocked if needed */
 344                        if (lock->ml.highest_blocked < target->ml.convert_type)
 345                                lock->ml.highest_blocked =
 346                                        target->ml.convert_type;
 347                }
 348        }
 349        head = &res->converting;
 350        list_for_each(iter, head) {
 351                lock = list_entry(iter, struct dlm_lock, list);
  352                if (lock == target)
 353                        continue;
 354                if (!dlm_lock_compatible(lock->ml.type,
 355                                         target->ml.convert_type)) {
 356                        can_grant = 0;
 357                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 358                                __dlm_lockres_reserve_ast(res);
 359                                dlm_queue_bast(dlm, lock);
 360                        }
 361                        if (lock->ml.highest_blocked < target->ml.convert_type)
 362                                lock->ml.highest_blocked =
 363                                        target->ml.convert_type;
 364                }
 365        }
 366
 367        /* we can convert the lock */
 368        if (can_grant) {
 369                spin_lock(&target->spinlock);
 370                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 371
 372                mlog(0, "calling ast for converting lock: %.*s, have: %d, "
 373                     "granting: %d, node: %u\n", res->lockname.len,
 374                     res->lockname.name, target->ml.type,
 375                     target->ml.convert_type, target->ml.node);
 376
 377                target->ml.type = target->ml.convert_type;
 378                target->ml.convert_type = LKM_IVMODE;
 379                list_move_tail(&target->list, &res->granted);
 380
 381                BUG_ON(!target->lksb);
 382                target->lksb->status = DLM_NORMAL;
 383
 384                spin_unlock(&target->spinlock);
 385
 386                __dlm_lockres_reserve_ast(res);
 387                dlm_queue_ast(dlm, target);
 388                /* go back and check for more */
 389                goto converting;
 390        }
 391
 392blocked:
 393        if (list_empty(&res->blocked))
 394                goto leave;
 395        target = list_entry(res->blocked.next, struct dlm_lock, list);
 396
 397        head = &res->granted;
 398        list_for_each(iter, head) {
 399                lock = list_entry(iter, struct dlm_lock, list);
  400                if (lock == target)
 401                        continue;
 402                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 403                        can_grant = 0;
 404                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 405                                __dlm_lockres_reserve_ast(res);
 406                                dlm_queue_bast(dlm, lock);
 407                        }
 408                        if (lock->ml.highest_blocked < target->ml.type)
 409                                lock->ml.highest_blocked = target->ml.type;
 410                }
 411        }
 412
 413        head = &res->converting;
 414        list_for_each(iter, head) {
 415                lock = list_entry(iter, struct dlm_lock, list);
  416                if (lock == target)
 417                        continue;
 418                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 419                        can_grant = 0;
 420                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 421                                __dlm_lockres_reserve_ast(res);
 422                                dlm_queue_bast(dlm, lock);
 423                        }
 424                        if (lock->ml.highest_blocked < target->ml.type)
 425                                lock->ml.highest_blocked = target->ml.type;
 426                }
 427        }
 428
 429        /* we can grant the blocked lock (only
 430         * possible if converting list empty) */
 431        if (can_grant) {
 432                spin_lock(&target->spinlock);
 433                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 434
 435                mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
 436                     "node: %u\n", res->lockname.len, res->lockname.name,
 437                     target->ml.type, target->ml.node);
 438
 439                // target->ml.type is already correct
 440                list_move_tail(&target->list, &res->granted);
 441
 442                BUG_ON(!target->lksb);
 443                target->lksb->status = DLM_NORMAL;
 444
 445                spin_unlock(&target->spinlock);
 446
 447                __dlm_lockres_reserve_ast(res);
 448                dlm_queue_ast(dlm, target);
 449                /* go back and check for more */
 450                goto converting;
 451        }
 452
 453leave:
 454        return;
 455}
 456
  457/* must have NO locks when calling this with res != NULL */
 458void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 459{
 460        mlog_entry("dlm=%p, res=%p\n", dlm, res);
 461        if (res) {
 462                spin_lock(&dlm->spinlock);
 463                spin_lock(&res->spinlock);
 464                __dlm_dirty_lockres(dlm, res);
 465                spin_unlock(&res->spinlock);
 466                spin_unlock(&dlm->spinlock);
 467        }
 468        wake_up(&dlm->dlm_thread_wq);
 469}
 470
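     /* Queue a locally mastered lockres on dlm->dirty_list (taking a ref for
      * the list) so dlm_thread will shuffle its queues.  Resources this node
      * does not master, or that are migrating or block-dirty, are left alone.
      * Caller holds dlm->spinlock and res->spinlock. */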
 471void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 472{
 473        mlog_entry("dlm=%p, res=%p\n", dlm, res);
 474
 475        assert_spin_locked(&dlm->spinlock);
 476        assert_spin_locked(&res->spinlock);
 477
 478        /* don't shuffle secondary queues */
  479        if (res->owner == dlm->node_num) {
  480                if (res->state & (DLM_LOCK_RES_MIGRATING |
  481                                  DLM_LOCK_RES_BLOCK_DIRTY))
  482                        return;
 483
 484                if (list_empty(&res->dirty)) {
 485                        /* ref for dirty_list */
 486                        dlm_lockres_get(res);
 487                        list_add_tail(&res->dirty, &dlm->dirty_list);
 488                        res->state |= DLM_LOCK_RES_DIRTY;
 489                }
 490        }
 491}
 492
 493
  494/* Launch the dlm worker thread for this domain */
 495int dlm_launch_thread(struct dlm_ctxt *dlm)
 496{
 497        mlog(0, "starting dlm thread...\n");
 498
 499        dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
 500        if (IS_ERR(dlm->dlm_thread_task)) {
 501                mlog_errno(PTR_ERR(dlm->dlm_thread_task));
 502                dlm->dlm_thread_task = NULL;
 503                return -EINVAL;
 504        }
 505
 506        return 0;
 507}
 508
 509void dlm_complete_thread(struct dlm_ctxt *dlm)
 510{
 511        if (dlm->dlm_thread_task) {
 512                mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
 513                kthread_stop(dlm->dlm_thread_task);
 514                dlm->dlm_thread_task = NULL;
 515        }
 516}
 517
 518static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
 519{
 520        int empty;
 521
 522        spin_lock(&dlm->spinlock);
 523        empty = list_empty(&dlm->dirty_list);
 524        spin_unlock(&dlm->spinlock);
 525
 526        return empty;
 527}
 528
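     /* Deliver everything on dlm->pending_asts and dlm->pending_basts under
      * dlm->ast_lock, dropping the lock around each delivery.  Callbacks for
      * locks owned by this node run locally; callbacks for remote locks are
      * sent to the owning node as proxy messages.  Each delivery releases the
      * ast reservation taken when the lock was queued. */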
 529static void dlm_flush_asts(struct dlm_ctxt *dlm)
 530{
 531        int ret;
 532        struct dlm_lock *lock;
 533        struct dlm_lock_resource *res;
 534        u8 hi;
 535
 536        spin_lock(&dlm->ast_lock);
 537        while (!list_empty(&dlm->pending_asts)) {
 538                lock = list_entry(dlm->pending_asts.next,
 539                                  struct dlm_lock, ast_list);
 540                /* get an extra ref on lock */
 541                dlm_lock_get(lock);
 542                res = lock->lockres;
 543                mlog(0, "delivering an ast for this lockres\n");
 544
 545                BUG_ON(!lock->ast_pending);
 546
 547                /* remove from list (including ref) */
 548                list_del_init(&lock->ast_list);
 549                dlm_lock_put(lock);
 550                spin_unlock(&dlm->ast_lock);
 551
 552                if (lock->ml.node != dlm->node_num) {
 553                        ret = dlm_do_remote_ast(dlm, res, lock);
 554                        if (ret < 0)
 555                                mlog_errno(ret);
 556                } else
 557                        dlm_do_local_ast(dlm, res, lock);
 558
 559                spin_lock(&dlm->ast_lock);
 560
 561                /* possible that another ast was queued while
 562                 * we were delivering the last one */
 563                if (!list_empty(&lock->ast_list)) {
 564                        mlog(0, "aha another ast got queued while "
 565                             "we were finishing the last one.  will "
 566                             "keep the ast_pending flag set.\n");
 567                } else
 568                        lock->ast_pending = 0;
 569
 570                /* drop the extra ref.
 571                 * this may drop it completely. */
 572                dlm_lock_put(lock);
 573                dlm_lockres_release_ast(dlm, res);
 574        }
 575
 576        while (!list_empty(&dlm->pending_basts)) {
 577                lock = list_entry(dlm->pending_basts.next,
 578                                  struct dlm_lock, bast_list);
 579                /* get an extra ref on lock */
 580                dlm_lock_get(lock);
 581                res = lock->lockres;
 582
 583                BUG_ON(!lock->bast_pending);
 584
 585                /* get the highest blocked lock, and reset */
 586                spin_lock(&lock->spinlock);
 587                BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
 588                hi = lock->ml.highest_blocked;
 589                lock->ml.highest_blocked = LKM_IVMODE;
 590                spin_unlock(&lock->spinlock);
 591
 592                /* remove from list (including ref) */
 593                list_del_init(&lock->bast_list);
 594                dlm_lock_put(lock);
 595                spin_unlock(&dlm->ast_lock);
 596
 597                mlog(0, "delivering a bast for this lockres "
 598                     "(blocked = %d\n", hi);
 599
 600                if (lock->ml.node != dlm->node_num) {
 601                        ret = dlm_send_proxy_bast(dlm, res, lock, hi);
 602                        if (ret < 0)
 603                                mlog_errno(ret);
 604                } else
 605                        dlm_do_local_bast(dlm, res, lock, hi);
 606
 607                spin_lock(&dlm->ast_lock);
 608
 609                /* possible that another bast was queued while
 610                 * we were delivering the last one */
 611                if (!list_empty(&lock->bast_list)) {
 612                        mlog(0, "aha another bast got queued while "
 613                             "we were finishing the last one.  will "
 614                             "keep the bast_pending flag set.\n");
 615                } else
 616                        lock->bast_pending = 0;
 617
 618                /* drop the extra ref.
 619                 * this may drop it completely. */
 620                dlm_lock_put(lock);
 621                dlm_lockres_release_ast(dlm, res);
 622        }
 623        wake_up(&dlm->ast_wq);
 624        spin_unlock(&dlm->ast_lock);
 625}
 626
 627
 628#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
 629#define DLM_THREAD_MAX_DIRTY  100
 630#define DLM_THREAD_MAX_ASTS   10
 631
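     /* Main loop of the per-domain dlm worker thread: purge aged-out unused
      * lockres entries, pull resources off dlm->dirty_list and shuffle their
      * queues, flush queued ASTs/BASTs, then sleep until new work arrives or
      * the thread is asked to stop. */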
 632static int dlm_thread(void *data)
 633{
 634        struct dlm_lock_resource *res;
 635        struct dlm_ctxt *dlm = data;
 636        unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
 637
 638        mlog(0, "dlm thread running for %s...\n", dlm->name);
 639
 640        while (!kthread_should_stop()) {
 641                int n = DLM_THREAD_MAX_DIRTY;
 642
 643                /* dlm_shutting_down is very point-in-time, but that
 644                 * doesn't matter as we'll just loop back around if we
 645                 * get false on the leading edge of a state
 646                 * transition. */
 647                dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
 648
 649                /* We really don't want to hold dlm->spinlock while
 650                 * calling dlm_shuffle_lists on each lockres that
 651                 * needs to have its queues adjusted and AST/BASTs
 652                 * run.  So let's pull each entry off the dirty_list
 653                 * and drop dlm->spinlock ASAP.  Once off the list,
 654                 * res->spinlock needs to be taken again to protect
 655                 * the queues while calling dlm_shuffle_lists.  */
 656                spin_lock(&dlm->spinlock);
 657                while (!list_empty(&dlm->dirty_list)) {
 658                        int delay = 0;
 659                        res = list_entry(dlm->dirty_list.next,
 660                                         struct dlm_lock_resource, dirty);
 661
 662                        /* peel a lockres off, remove it from the list,
 663                         * unset the dirty flag and drop the dlm lock */
 664                        BUG_ON(!res);
 665                        dlm_lockres_get(res);
 666
 667                        spin_lock(&res->spinlock);
 668                        /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
 669                        list_del_init(&res->dirty);
 670                        spin_unlock(&res->spinlock);
 671                        spin_unlock(&dlm->spinlock);
 672                        /* Drop dirty_list ref */
 673                        dlm_lockres_put(res);
 674
 675                        /* lockres can be re-dirtied/re-added to the
 676                         * dirty_list in this gap, but that is ok */
 677
 678                        spin_lock(&res->spinlock);
 679                        if (res->owner != dlm->node_num) {
 680                                __dlm_print_one_lock_resource(res);
 681                                mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
 682                                     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
 683                                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
 684                                     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
 685                                     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
 686                        }
 687                        BUG_ON(res->owner != dlm->node_num);
 688
 689                        /* it is now ok to move lockreses in these states
 690                         * to the dirty list, assuming that they will only be
 691                         * dirty for a short while. */
 692                        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 693                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
 694                                          DLM_LOCK_RES_RECOVERING)) {
 695                                /* move it to the tail and keep going */
 696                                res->state &= ~DLM_LOCK_RES_DIRTY;
 697                                spin_unlock(&res->spinlock);
 698                                mlog(0, "delaying list shuffling for in-"
 699                                     "progress lockres %.*s, state=%d\n",
 700                                     res->lockname.len, res->lockname.name,
 701                                     res->state);
 702                                delay = 1;
 703                                goto in_progress;
 704                        }
 705
 706                        /* at this point the lockres is not migrating/
 707                         * recovering/in-progress.  we have the lockres
 708                         * spinlock and do NOT have the dlm lock.
 709                         * safe to reserve/queue asts and run the lists. */
 710
 711                        mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
 712                             "res=%.*s\n", dlm->name,
 713                             res->lockname.len, res->lockname.name);
 714
 715                        /* called while holding lockres lock */
 716                        dlm_shuffle_lists(dlm, res);
 717                        res->state &= ~DLM_LOCK_RES_DIRTY;
 718                        spin_unlock(&res->spinlock);
 719
 720                        dlm_lockres_calc_usage(dlm, res);
 721
 722in_progress:
 723
 724                        spin_lock(&dlm->spinlock);
 725                        /* if the lock was in-progress, stick
 726                         * it on the back of the list */
 727                        if (delay) {
 728                                spin_lock(&res->spinlock);
 729                                __dlm_dirty_lockres(dlm, res);
 730                                spin_unlock(&res->spinlock);
 731                        }
 732                        dlm_lockres_put(res);
 733
 734                        /* unlikely, but we may need to give time to
 735                         * other tasks */
 736                        if (!--n) {
 737                                mlog(0, "throttling dlm_thread\n");
 738                                break;
 739                        }
 740                }
 741
 742                spin_unlock(&dlm->spinlock);
 743                dlm_flush_asts(dlm);
 744
 745                /* yield and continue right away if there is more work to do */
 746                if (!n) {
 747                        cond_resched();
 748                        continue;
 749                }
 750
 751                wait_event_interruptible_timeout(dlm->dlm_thread_wq,
 752                                                 !dlm_dirty_list_empty(dlm) ||
 753                                                 kthread_should_stop(),
 754                                                 timeout);
 755        }
 756
 757        mlog(0, "quitting DLM thread\n");
 758        return 0;
 759}
 760