linux/fs/ocfs2/dlm/dlmthread.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * dlmthread.c
   4 *
   5 * standalone DLM module
   6 *
   7 * Copyright (C) 2004 Oracle.  All rights reserved.
   8 */
   9
  10
  11#include <linux/module.h>
  12#include <linux/fs.h>
  13#include <linux/types.h>
  14#include <linux/highmem.h>
  15#include <linux/init.h>
  16#include <linux/sysctl.h>
  17#include <linux/random.h>
  18#include <linux/blkdev.h>
  19#include <linux/socket.h>
  20#include <linux/inet.h>
  21#include <linux/timer.h>
  22#include <linux/kthread.h>
  23#include <linux/delay.h>
  24
  25
  26#include "../cluster/heartbeat.h"
  27#include "../cluster/nodemanager.h"
  28#include "../cluster/tcp.h"
  29
  30#include "dlmapi.h"
  31#include "dlmcommon.h"
  32#include "dlmdomain.h"
  33
  34#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
  35#include "../cluster/masklog.h"
  36
  37static int dlm_thread(void *data);
  38static void dlm_flush_asts(struct dlm_ctxt *dlm);
  39
  40/* will exit holding res->spinlock, but may drop in function */
  41/* waits until flags are cleared on res->state */
  42void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
  43{
  44        DECLARE_WAITQUEUE(wait, current);
  45
  46        assert_spin_locked(&res->spinlock);
  47
  48        add_wait_queue(&res->wq, &wait);
  49repeat:
  50        set_current_state(TASK_UNINTERRUPTIBLE);
  51        if (res->state & flags) {
  52                spin_unlock(&res->spinlock);
  53                schedule();
  54                spin_lock(&res->spinlock);
  55                goto repeat;
  56        }
  57        remove_wait_queue(&res->wq, &wait);
  58        __set_current_state(TASK_RUNNING);
  59}
  60
  61int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
  62{
  63        if (list_empty(&res->granted) &&
  64            list_empty(&res->converting) &&
  65            list_empty(&res->blocked))
  66                return 0;
  67        return 1;
  68}
  69
  70/* "unused": the lockres has no locks, is not on the dirty list,
  71 * has no inflight locks (in the gap between mastery and acquiring
  72 * the first lock), and has no bits in its refmap.
  73 * truly ready to be freed. */
  74int __dlm_lockres_unused(struct dlm_lock_resource *res)
  75{
  76        int bit;
  77
  78        assert_spin_locked(&res->spinlock);
  79
  80        if (__dlm_lockres_has_locks(res))
  81                return 0;
  82
  83        /* Locks are in the process of being created */
  84        if (res->inflight_locks)
  85                return 0;
  86
  87        if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
  88                return 0;
  89
  90        if (res->state & (DLM_LOCK_RES_RECOVERING|
  91                        DLM_LOCK_RES_RECOVERY_WAITING))
  92                return 0;
  93
  94        /* Another node has this resource with this node as the master */
  95        bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
  96        if (bit < O2NM_MAX_NODES)
  97                return 0;
  98
  99        return 1;
 100}
 101
 102
 103/* Call whenever you may have added or deleted something from one of
 104 * the lockres queue's. This will figure out whether it belongs on the
 105 * unused list or not and does the appropriate thing. */
 106void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 107                              struct dlm_lock_resource *res)
 108{
 109        assert_spin_locked(&dlm->spinlock);
 110        assert_spin_locked(&res->spinlock);
 111
 112        if (__dlm_lockres_unused(res)){
 113                if (list_empty(&res->purge)) {
 114                        mlog(0, "%s: Adding res %.*s to purge list\n",
 115                             dlm->name, res->lockname.len, res->lockname.name);
 116
 117                        res->last_used = jiffies;
 118                        dlm_lockres_get(res);
 119                        list_add_tail(&res->purge, &dlm->purge_list);
 120                        dlm->purge_count++;
 121                }
 122        } else if (!list_empty(&res->purge)) {
 123                mlog(0, "%s: Removing res %.*s from purge list\n",
 124                     dlm->name, res->lockname.len, res->lockname.name);
 125
 126                list_del_init(&res->purge);
 127                dlm_lockres_put(res);
 128                dlm->purge_count--;
 129        }
 130}
 131
 132void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 133                            struct dlm_lock_resource *res)
 134{
 135        spin_lock(&dlm->spinlock);
 136        spin_lock(&res->spinlock);
 137
 138        __dlm_lockres_calc_usage(dlm, res);
 139
 140        spin_unlock(&res->spinlock);
 141        spin_unlock(&dlm->spinlock);
 142}
 143
 144/*
 145 * Do the real purge work:
 146 *     unhash the lockres, and
 147 *     clear flag DLM_LOCK_RES_DROPPING_REF.
 148 * It requires dlm and lockres spinlock to be taken.
 149 */
 150void __dlm_do_purge_lockres(struct dlm_ctxt *dlm,
 151                struct dlm_lock_resource *res)
 152{
 153        assert_spin_locked(&dlm->spinlock);
 154        assert_spin_locked(&res->spinlock);
 155
 156        if (!list_empty(&res->purge)) {
 157                mlog(0, "%s: Removing res %.*s from purgelist\n",
 158                     dlm->name, res->lockname.len, res->lockname.name);
 159                list_del_init(&res->purge);
 160                dlm_lockres_put(res);
 161                dlm->purge_count--;
 162        }
 163
 164        if (!__dlm_lockres_unused(res)) {
 165                mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 166                     dlm->name, res->lockname.len, res->lockname.name);
 167                __dlm_print_one_lock_resource(res);
 168                BUG();
 169        }
 170
 171        __dlm_unhash_lockres(dlm, res);
 172
 173        spin_lock(&dlm->track_lock);
 174        if (!list_empty(&res->tracking))
 175                list_del_init(&res->tracking);
 176        else {
 177                mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
 178                     dlm->name, res->lockname.len, res->lockname.name);
 179                __dlm_print_one_lock_resource(res);
 180        }
 181        spin_unlock(&dlm->track_lock);
 182
 183        /*
 184         * lockres is not in the hash now. drop the flag and wake up
 185         * any processes waiting in dlm_get_lock_resource.
 186         */
 187        res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 188}
 189
 190static void dlm_purge_lockres(struct dlm_ctxt *dlm,
 191                             struct dlm_lock_resource *res)
 192{
 193        int master;
 194        int ret = 0;
 195
 196        assert_spin_locked(&dlm->spinlock);
 197        assert_spin_locked(&res->spinlock);
 198
 199        master = (res->owner == dlm->node_num);
 200
 201        mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name,
 202             res->lockname.len, res->lockname.name, master);
 203
 204        if (!master) {
 205                if (res->state & DLM_LOCK_RES_DROPPING_REF) {
 206                        mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n",
 207                                dlm->name, res->lockname.len, res->lockname.name);
 208                        spin_unlock(&res->spinlock);
 209                        return;
 210                }
 211
 212                res->state |= DLM_LOCK_RES_DROPPING_REF;
 213                /* drop spinlock...  retake below */
 214                spin_unlock(&res->spinlock);
 215                spin_unlock(&dlm->spinlock);
 216
 217                spin_lock(&res->spinlock);
 218                /* This ensures that clear refmap is sent after the set */
 219                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
 220                spin_unlock(&res->spinlock);
 221
 222                /* clear our bit from the master's refmap, ignore errors */
 223                ret = dlm_drop_lockres_ref(dlm, res);
 224                if (ret < 0) {
 225                        if (!dlm_is_host_down(ret))
 226                                BUG();
 227                }
 228                spin_lock(&dlm->spinlock);
 229                spin_lock(&res->spinlock);
 230        }
 231
 232        if (!list_empty(&res->purge)) {
 233                mlog(0, "%s: Removing res %.*s from purgelist, master %d\n",
 234                     dlm->name, res->lockname.len, res->lockname.name, master);
 235                list_del_init(&res->purge);
 236                dlm_lockres_put(res);
 237                dlm->purge_count--;
 238        }
 239
 240        if (!master && ret == DLM_DEREF_RESPONSE_INPROG) {
 241                mlog(0, "%s: deref %.*s in progress\n",
 242                        dlm->name, res->lockname.len, res->lockname.name);
 243                spin_unlock(&res->spinlock);
 244                return;
 245        }
 246
 247        if (!__dlm_lockres_unused(res)) {
 248                mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
 249                     dlm->name, res->lockname.len, res->lockname.name);
 250                __dlm_print_one_lock_resource(res);
 251                BUG();
 252        }
 253
 254        __dlm_unhash_lockres(dlm, res);
 255
 256        spin_lock(&dlm->track_lock);
 257        if (!list_empty(&res->tracking))
 258                list_del_init(&res->tracking);
 259        else {
 260                mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
 261                                res->lockname.len, res->lockname.name);
 262                __dlm_print_one_lock_resource(res);
 263        }
 264        spin_unlock(&dlm->track_lock);
 265
 266        /* lockres is not in the hash now.  drop the flag and wake up
 267         * any processes waiting in dlm_get_lock_resource. */
 268        if (!master) {
 269                res->state &= ~DLM_LOCK_RES_DROPPING_REF;
 270                spin_unlock(&res->spinlock);
 271                wake_up(&res->wq);
 272        } else
 273                spin_unlock(&res->spinlock);
 274}
 275
 276static void dlm_run_purge_list(struct dlm_ctxt *dlm,
 277                               int purge_now)
 278{
 279        unsigned int run_max, unused;
 280        unsigned long purge_jiffies;
 281        struct dlm_lock_resource *lockres;
 282
 283        spin_lock(&dlm->spinlock);
 284        run_max = dlm->purge_count;
 285
 286        while(run_max && !list_empty(&dlm->purge_list)) {
 287                run_max--;
 288
 289                lockres = list_entry(dlm->purge_list.next,
 290                                     struct dlm_lock_resource, purge);
 291
 292                spin_lock(&lockres->spinlock);
 293
 294                purge_jiffies = lockres->last_used +
 295                        msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
 296
 297                /* Make sure that we want to be processing this guy at
 298                 * this time. */
 299                if (!purge_now && time_after(purge_jiffies, jiffies)) {
 300                        /* Since resources are added to the purge list
 301                         * in tail order, we can stop at the first
 302                         * unpurgable resource -- anyone added after
 303                         * him will have a greater last_used value */
 304                        spin_unlock(&lockres->spinlock);
 305                        break;
 306                }
 307
 308                /* Status of the lockres *might* change so double
 309                 * check. If the lockres is unused, holding the dlm
 310                 * spinlock will prevent people from getting and more
 311                 * refs on it. */
 312                unused = __dlm_lockres_unused(lockres);
 313                if (!unused ||
 314                    (lockres->state & DLM_LOCK_RES_MIGRATING) ||
 315                    (lockres->inflight_assert_workers != 0)) {
 316                        mlog(0, "%s: res %.*s is in use or being remastered, "
 317                             "used %d, state %d, assert master workers %u\n",
 318                             dlm->name, lockres->lockname.len,
 319                             lockres->lockname.name,
 320                             !unused, lockres->state,
 321                             lockres->inflight_assert_workers);
 322                        list_move_tail(&lockres->purge, &dlm->purge_list);
 323                        spin_unlock(&lockres->spinlock);
 324                        continue;
 325                }
 326
 327                dlm_lockres_get(lockres);
 328
 329                dlm_purge_lockres(dlm, lockres);
 330
 331                dlm_lockres_put(lockres);
 332
 333                /* Avoid adding any scheduling latencies */
 334                cond_resched_lock(&dlm->spinlock);
 335        }
 336
 337        spin_unlock(&dlm->spinlock);
 338}
 339
 340static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
 341                              struct dlm_lock_resource *res)
 342{
 343        struct dlm_lock *lock, *target;
 344        int can_grant = 1;
 345
 346        /*
 347         * Because this function is called with the lockres
 348         * spinlock, and because we know that it is not migrating/
 349         * recovering/in-progress, it is fine to reserve asts and
 350         * basts right before queueing them all throughout
 351         */
 352        assert_spin_locked(&dlm->ast_lock);
 353        assert_spin_locked(&res->spinlock);
 354        BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
 355                              DLM_LOCK_RES_RECOVERING|
 356                              DLM_LOCK_RES_IN_PROGRESS)));
 357
 358converting:
 359        if (list_empty(&res->converting))
 360                goto blocked;
 361        mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name,
 362             res->lockname.len, res->lockname.name);
 363
 364        target = list_entry(res->converting.next, struct dlm_lock, list);
 365        if (target->ml.convert_type == LKM_IVMODE) {
 366                mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n",
 367                     dlm->name, res->lockname.len, res->lockname.name);
 368                BUG();
 369        }
 370        list_for_each_entry(lock, &res->granted, list) {
 371                if (lock==target)
 372                        continue;
 373                if (!dlm_lock_compatible(lock->ml.type,
 374                                         target->ml.convert_type)) {
 375                        can_grant = 0;
 376                        /* queue the BAST if not already */
 377                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 378                                __dlm_lockres_reserve_ast(res);
 379                                __dlm_queue_bast(dlm, lock);
 380                        }
 381                        /* update the highest_blocked if needed */
 382                        if (lock->ml.highest_blocked < target->ml.convert_type)
 383                                lock->ml.highest_blocked =
 384                                        target->ml.convert_type;
 385                }
 386        }
 387
 388        list_for_each_entry(lock, &res->converting, list) {
 389                if (lock==target)
 390                        continue;
 391                if (!dlm_lock_compatible(lock->ml.type,
 392                                         target->ml.convert_type)) {
 393                        can_grant = 0;
 394                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 395                                __dlm_lockres_reserve_ast(res);
 396                                __dlm_queue_bast(dlm, lock);
 397                        }
 398                        if (lock->ml.highest_blocked < target->ml.convert_type)
 399                                lock->ml.highest_blocked =
 400                                        target->ml.convert_type;
 401                }
 402        }
 403
 404        /* we can convert the lock */
 405        if (can_grant) {
 406                spin_lock(&target->spinlock);
 407                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 408
 409                mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type "
 410                     "%d => %d, node %u\n", dlm->name, res->lockname.len,
 411                     res->lockname.name,
 412                     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 413                     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 414                     target->ml.type,
 415                     target->ml.convert_type, target->ml.node);
 416
 417                target->ml.type = target->ml.convert_type;
 418                target->ml.convert_type = LKM_IVMODE;
 419                list_move_tail(&target->list, &res->granted);
 420
 421                BUG_ON(!target->lksb);
 422                target->lksb->status = DLM_NORMAL;
 423
 424                spin_unlock(&target->spinlock);
 425
 426                __dlm_lockres_reserve_ast(res);
 427                __dlm_queue_ast(dlm, target);
 428                /* go back and check for more */
 429                goto converting;
 430        }
 431
 432blocked:
 433        if (list_empty(&res->blocked))
 434                goto leave;
 435        target = list_entry(res->blocked.next, struct dlm_lock, list);
 436
 437        list_for_each_entry(lock, &res->granted, list) {
 438                if (lock==target)
 439                        continue;
 440                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 441                        can_grant = 0;
 442                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 443                                __dlm_lockres_reserve_ast(res);
 444                                __dlm_queue_bast(dlm, lock);
 445                        }
 446                        if (lock->ml.highest_blocked < target->ml.type)
 447                                lock->ml.highest_blocked = target->ml.type;
 448                }
 449        }
 450
 451        list_for_each_entry(lock, &res->converting, list) {
 452                if (lock==target)
 453                        continue;
 454                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
 455                        can_grant = 0;
 456                        if (lock->ml.highest_blocked == LKM_IVMODE) {
 457                                __dlm_lockres_reserve_ast(res);
 458                                __dlm_queue_bast(dlm, lock);
 459                        }
 460                        if (lock->ml.highest_blocked < target->ml.type)
 461                                lock->ml.highest_blocked = target->ml.type;
 462                }
 463        }
 464
 465        /* we can grant the blocked lock (only
 466         * possible if converting list empty) */
 467        if (can_grant) {
 468                spin_lock(&target->spinlock);
 469                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
 470
 471                mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, "
 472                     "node %u\n", dlm->name, res->lockname.len,
 473                     res->lockname.name,
 474                     dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)),
 475                     dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)),
 476                     target->ml.type, target->ml.node);
 477
 478                /* target->ml.type is already correct */
 479                list_move_tail(&target->list, &res->granted);
 480
 481                BUG_ON(!target->lksb);
 482                target->lksb->status = DLM_NORMAL;
 483
 484                spin_unlock(&target->spinlock);
 485
 486                __dlm_lockres_reserve_ast(res);
 487                __dlm_queue_ast(dlm, target);
 488                /* go back and check for more */
 489                goto converting;
 490        }
 491
 492leave:
 493        return;
 494}
 495
 496/* must have NO locks when calling this with res !=NULL * */
 497void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 498{
 499        if (res) {
 500                spin_lock(&dlm->spinlock);
 501                spin_lock(&res->spinlock);
 502                __dlm_dirty_lockres(dlm, res);
 503                spin_unlock(&res->spinlock);
 504                spin_unlock(&dlm->spinlock);
 505        }
 506        wake_up(&dlm->dlm_thread_wq);
 507}
 508
 509void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 510{
 511        assert_spin_locked(&dlm->spinlock);
 512        assert_spin_locked(&res->spinlock);
 513
 514        /* don't shuffle secondary queues */
 515        if (res->owner == dlm->node_num) {
 516                if (res->state & (DLM_LOCK_RES_MIGRATING |
 517                                  DLM_LOCK_RES_BLOCK_DIRTY))
 518                    return;
 519
 520                if (list_empty(&res->dirty)) {
 521                        /* ref for dirty_list */
 522                        dlm_lockres_get(res);
 523                        list_add_tail(&res->dirty, &dlm->dirty_list);
 524                        res->state |= DLM_LOCK_RES_DIRTY;
 525                }
 526        }
 527
 528        mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
 529             res->lockname.name);
 530}
 531
 532
 533/* Launch the NM thread for the mounted volume */
 534int dlm_launch_thread(struct dlm_ctxt *dlm)
 535{
 536        mlog(0, "Starting dlm_thread...\n");
 537
 538        dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s",
 539                        dlm->name);
 540        if (IS_ERR(dlm->dlm_thread_task)) {
 541                mlog_errno(PTR_ERR(dlm->dlm_thread_task));
 542                dlm->dlm_thread_task = NULL;
 543                return -EINVAL;
 544        }
 545
 546        return 0;
 547}
 548
 549void dlm_complete_thread(struct dlm_ctxt *dlm)
 550{
 551        if (dlm->dlm_thread_task) {
 552                mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n");
 553                kthread_stop(dlm->dlm_thread_task);
 554                dlm->dlm_thread_task = NULL;
 555        }
 556}
 557
 558static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
 559{
 560        int empty;
 561
 562        spin_lock(&dlm->spinlock);
 563        empty = list_empty(&dlm->dirty_list);
 564        spin_unlock(&dlm->spinlock);
 565
 566        return empty;
 567}
 568
 569static void dlm_flush_asts(struct dlm_ctxt *dlm)
 570{
 571        int ret;
 572        struct dlm_lock *lock;
 573        struct dlm_lock_resource *res;
 574        u8 hi;
 575
 576        spin_lock(&dlm->ast_lock);
 577        while (!list_empty(&dlm->pending_asts)) {
 578                lock = list_entry(dlm->pending_asts.next,
 579                                  struct dlm_lock, ast_list);
 580                /* get an extra ref on lock */
 581                dlm_lock_get(lock);
 582                res = lock->lockres;
 583                mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, "
 584                     "node %u\n", dlm->name, res->lockname.len,
 585                     res->lockname.name,
 586                     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 587                     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 588                     lock->ml.type, lock->ml.node);
 589
 590                BUG_ON(!lock->ast_pending);
 591
 592                /* remove from list (including ref) */
 593                list_del_init(&lock->ast_list);
 594                dlm_lock_put(lock);
 595                spin_unlock(&dlm->ast_lock);
 596
 597                if (lock->ml.node != dlm->node_num) {
 598                        ret = dlm_do_remote_ast(dlm, res, lock);
 599                        if (ret < 0)
 600                                mlog_errno(ret);
 601                } else
 602                        dlm_do_local_ast(dlm, res, lock);
 603
 604                spin_lock(&dlm->ast_lock);
 605
 606                /* possible that another ast was queued while
 607                 * we were delivering the last one */
 608                if (!list_empty(&lock->ast_list)) {
 609                        mlog(0, "%s: res %.*s, AST queued while flushing last "
 610                             "one\n", dlm->name, res->lockname.len,
 611                             res->lockname.name);
 612                } else
 613                        lock->ast_pending = 0;
 614
 615                /* drop the extra ref.
 616                 * this may drop it completely. */
 617                dlm_lock_put(lock);
 618                dlm_lockres_release_ast(dlm, res);
 619        }
 620
 621        while (!list_empty(&dlm->pending_basts)) {
 622                lock = list_entry(dlm->pending_basts.next,
 623                                  struct dlm_lock, bast_list);
 624                /* get an extra ref on lock */
 625                dlm_lock_get(lock);
 626                res = lock->lockres;
 627
 628                BUG_ON(!lock->bast_pending);
 629
 630                /* get the highest blocked lock, and reset */
 631                spin_lock(&lock->spinlock);
 632                BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
 633                hi = lock->ml.highest_blocked;
 634                lock->ml.highest_blocked = LKM_IVMODE;
 635                spin_unlock(&lock->spinlock);
 636
 637                /* remove from list (including ref) */
 638                list_del_init(&lock->bast_list);
 639                dlm_lock_put(lock);
 640                spin_unlock(&dlm->ast_lock);
 641
 642                mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, "
 643                     "blocked %d, node %u\n",
 644                     dlm->name, res->lockname.len, res->lockname.name,
 645                     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
 646                     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 647                     hi, lock->ml.node);
 648
 649                if (lock->ml.node != dlm->node_num) {
 650                        ret = dlm_send_proxy_bast(dlm, res, lock, hi);
 651                        if (ret < 0)
 652                                mlog_errno(ret);
 653                } else
 654                        dlm_do_local_bast(dlm, res, lock, hi);
 655
 656                spin_lock(&dlm->ast_lock);
 657
 658                /* possible that another bast was queued while
 659                 * we were delivering the last one */
 660                if (!list_empty(&lock->bast_list)) {
 661                        mlog(0, "%s: res %.*s, BAST queued while flushing last "
 662                             "one\n", dlm->name, res->lockname.len,
 663                             res->lockname.name);
 664                } else
 665                        lock->bast_pending = 0;
 666
 667                /* drop the extra ref.
 668                 * this may drop it completely. */
 669                dlm_lock_put(lock);
 670                dlm_lockres_release_ast(dlm, res);
 671        }
 672        wake_up(&dlm->ast_wq);
 673        spin_unlock(&dlm->ast_lock);
 674}
 675
 676
 677#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
 678#define DLM_THREAD_MAX_DIRTY  100
 679
 680static int dlm_thread(void *data)
 681{
 682        struct dlm_lock_resource *res;
 683        struct dlm_ctxt *dlm = data;
 684        unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
 685
 686        mlog(0, "dlm thread running for %s...\n", dlm->name);
 687
 688        while (!kthread_should_stop()) {
 689                int n = DLM_THREAD_MAX_DIRTY;
 690
 691                /* dlm_shutting_down is very point-in-time, but that
 692                 * doesn't matter as we'll just loop back around if we
 693                 * get false on the leading edge of a state
 694                 * transition. */
 695                dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
 696
 697                /* We really don't want to hold dlm->spinlock while
 698                 * calling dlm_shuffle_lists on each lockres that
 699                 * needs to have its queues adjusted and AST/BASTs
 700                 * run.  So let's pull each entry off the dirty_list
 701                 * and drop dlm->spinlock ASAP.  Once off the list,
 702                 * res->spinlock needs to be taken again to protect
 703                 * the queues while calling dlm_shuffle_lists.  */
 704                spin_lock(&dlm->spinlock);
 705                while (!list_empty(&dlm->dirty_list)) {
 706                        int delay = 0;
 707                        res = list_entry(dlm->dirty_list.next,
 708                                         struct dlm_lock_resource, dirty);
 709
 710                        /* peel a lockres off, remove it from the list,
 711                         * unset the dirty flag and drop the dlm lock */
 712                        BUG_ON(!res);
 713                        dlm_lockres_get(res);
 714
 715                        spin_lock(&res->spinlock);
 716                        /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
 717                        list_del_init(&res->dirty);
 718                        spin_unlock(&res->spinlock);
 719                        spin_unlock(&dlm->spinlock);
 720                        /* Drop dirty_list ref */
 721                        dlm_lockres_put(res);
 722
 723                        /* lockres can be re-dirtied/re-added to the
 724                         * dirty_list in this gap, but that is ok */
 725
 726                        spin_lock(&dlm->ast_lock);
 727                        spin_lock(&res->spinlock);
 728                        if (res->owner != dlm->node_num) {
 729                                __dlm_print_one_lock_resource(res);
 730                                mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d,"
 731                                     " dirty %d\n", dlm->name,
 732                                     !!(res->state & DLM_LOCK_RES_IN_PROGRESS),
 733                                     !!(res->state & DLM_LOCK_RES_MIGRATING),
 734                                     !!(res->state & DLM_LOCK_RES_RECOVERING),
 735                                     !!(res->state & DLM_LOCK_RES_DIRTY));
 736                        }
 737                        BUG_ON(res->owner != dlm->node_num);
 738
 739                        /* it is now ok to move lockreses in these states
 740                         * to the dirty list, assuming that they will only be
 741                         * dirty for a short while. */
 742                        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 743                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
 744                                          DLM_LOCK_RES_RECOVERING |
 745                                          DLM_LOCK_RES_RECOVERY_WAITING)) {
 746                                /* move it to the tail and keep going */
 747                                res->state &= ~DLM_LOCK_RES_DIRTY;
 748                                spin_unlock(&res->spinlock);
 749                                spin_unlock(&dlm->ast_lock);
 750                                mlog(0, "%s: res %.*s, inprogress, delay list "
 751                                     "shuffle, state %d\n", dlm->name,
 752                                     res->lockname.len, res->lockname.name,
 753                                     res->state);
 754                                delay = 1;
 755                                goto in_progress;
 756                        }
 757
 758                        /* at this point the lockres is not migrating/
 759                         * recovering/in-progress.  we have the lockres
 760                         * spinlock and do NOT have the dlm lock.
 761                         * safe to reserve/queue asts and run the lists. */
 762
 763                        /* called while holding lockres lock */
 764                        dlm_shuffle_lists(dlm, res);
 765                        res->state &= ~DLM_LOCK_RES_DIRTY;
 766                        spin_unlock(&res->spinlock);
 767                        spin_unlock(&dlm->ast_lock);
 768
 769                        dlm_lockres_calc_usage(dlm, res);
 770
 771in_progress:
 772
 773                        spin_lock(&dlm->spinlock);
 774                        /* if the lock was in-progress, stick
 775                         * it on the back of the list */
 776                        if (delay) {
 777                                spin_lock(&res->spinlock);
 778                                __dlm_dirty_lockres(dlm, res);
 779                                spin_unlock(&res->spinlock);
 780                        }
 781                        dlm_lockres_put(res);
 782
 783                        /* unlikely, but we may need to give time to
 784                         * other tasks */
 785                        if (!--n) {
 786                                mlog(0, "%s: Throttling dlm thread\n",
 787                                     dlm->name);
 788                                break;
 789                        }
 790                }
 791
 792                spin_unlock(&dlm->spinlock);
 793                dlm_flush_asts(dlm);
 794
 795                /* yield and continue right away if there is more work to do */
 796                if (!n) {
 797                        cond_resched();
 798                        continue;
 799                }
 800
 801                wait_event_interruptible_timeout(dlm->dlm_thread_wq,
 802                                                 !dlm_dirty_list_empty(dlm) ||
 803                                                 kthread_should_stop(),
 804                                                 timeout);
 805        }
 806
 807        mlog(0, "quitting DLM thread\n");
 808        return 0;
 809}
 810