linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "ast.h"
  19#include "dir.h"
  20#include "lowcomms.h"
  21#include "config.h"
  22#include "memory.h"
  23#include "lock.h"
  24#include "recover.h"
  25#include "requestqueue.h"
  26#include "user.h"
  27
/* Number of lockspaces created through dlm_new_lockspace(); the shared
   threads are started when it is zero on entry and stopped when it is zero
   after a create or release.  Changed only with ls_lock held. */
static int                      ls_count;
/* Held across the whole of dlm_new_lockspace() and the release step of
   dlm_release_lockspace() */
static struct mutex             ls_lock;
/* All lockspaces, linked through dlm_ls.ls_list */
static struct list_head         lslist;
/* Taken around lslist traversal/modification and around updates of the
   per-lockspace ls_count and ls_create_count fields */
static spinlock_t               lslist_lock;
/* The dlm_scand kernel thread, set by dlm_scand_start() */
static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n = simple_strtol(buf, NULL, 0);
  39
  40        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  41        if (!ls)
  42                return -EINVAL;
  43
  44        switch (n) {
  45        case 0:
  46                dlm_ls_stop(ls);
  47                break;
  48        case 1:
  49                dlm_ls_start(ls);
  50                break;
  51        default:
  52                ret = -EINVAL;
  53        }
  54        dlm_put_lockspace(ls);
  55        return ret;
  56}
  57
  58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  59{
  60        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
  61        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  62        wake_up(&ls->ls_uevent_wait);
  63        return len;
  64}
  65
  66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  67{
  68        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  69}
  70
  71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  72{
  73        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
  74        return len;
  75}
  76
  77static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
  78{
  79        uint32_t status = dlm_recover_status(ls);
  80        return snprintf(buf, PAGE_SIZE, "%x\n", status);
  81}
  82
  83static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
  84{
  85        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
  86}
  87
/*
 * A lockspace sysfs attribute: the generic attribute plus typed handlers that
 * take the lockspace directly.  dlm_attr_show()/dlm_attr_store() recover this
 * structure from the embedded attr and dispatch to the handlers; either
 * handler may be left NULL.
 */
struct dlm_attr {
        struct attribute attr;
        /* formats the value into the buffer; returns the byte count */
        ssize_t (*show)(struct dlm_ls *, char *);
        /* consumes the written buffer of the given length */
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
  93
/* "control": owner-writable only; write 0 to stop or 1 to start the
   lockspace (see dlm_control_store) */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

/* "event_done": owner-writable only; the written value becomes the result
   that do_uevent() is waiting for */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

/* "id": world-readable, owner-writable; reads and sets ls_global_id */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

/* "recover_status": read-only; value of dlm_recover_status() in hex */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only; value of ls_recover_nodeid */
static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

/* NULL-terminated table used as the default attributes of dlm_ktype */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
 128
 129static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 130                             char *buf)
 131{
 132        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 133        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 134        return a->show ? a->show(ls, buf) : 0;
 135}
 136
 137static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 138                              const char *buf, size_t len)
 139{
 140        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 141        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 142        return a->store ? a->store(ls, buf, len) : len;
 143}
 144
 145static void lockspace_kobj_release(struct kobject *k)
 146{
 147        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 148        kfree(ls);
 149}
 150
/* sysfs entry points shared by all lockspace attributes; they dispatch to
   the per-attribute handlers in struct dlm_attr */
static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

/* kobject type of a lockspace: its attribute files, how they are accessed,
   and the release callback that frees the dlm_ls */
static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

/* The "dlm" kset, created in dlm_lockspace_init(); new_lockspace() assigns
   it to each lockspace kobject before adding it */
static struct kset *dlm_kset;
 163
 164static int do_uevent(struct dlm_ls *ls, int in)
 165{
 166        int error;
 167
 168        if (in)
 169                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 170        else
 171                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 172
 173        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 174
 175        /* dlm_controld will see the uevent, do the necessary group management
 176           and then write to sysfs to wake us */
 177
 178        error = wait_event_interruptible(ls->ls_uevent_wait,
 179                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 180
 181        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
 182
 183        if (error)
 184                goto out;
 185
 186        error = ls->ls_uevent_result;
 187 out:
 188        if (error)
 189                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 190                          error, ls->ls_uevent_result);
 191        return error;
 192}
 193
 194
 195int __init dlm_lockspace_init(void)
 196{
 197        ls_count = 0;
 198        mutex_init(&ls_lock);
 199        INIT_LIST_HEAD(&lslist);
 200        spin_lock_init(&lslist_lock);
 201
 202        dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
 203        if (!dlm_kset) {
 204                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 205                return -ENOMEM;
 206        }
 207        return 0;
 208}
 209
/*
 * Counterpart of dlm_lockspace_init(): unregisters the "dlm" kset created
 * there.
 */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
 214
 215static struct dlm_ls *find_ls_to_scan(void)
 216{
 217        struct dlm_ls *ls;
 218
 219        spin_lock(&lslist_lock);
 220        list_for_each_entry(ls, &lslist, ls_list) {
 221                if (time_after_eq(jiffies, ls->ls_scan_time +
 222                                            dlm_config.ci_scan_secs * HZ)) {
 223                        spin_unlock(&lslist_lock);
 224                        return ls;
 225                }
 226        }
 227        spin_unlock(&lslist_lock);
 228        return NULL;
 229}
 230
 231static int dlm_scand(void *data)
 232{
 233        struct dlm_ls *ls;
 234        int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
 235
 236        while (!kthread_should_stop()) {
 237                ls = find_ls_to_scan();
 238                if (ls) {
 239                        if (dlm_lock_recovery_try(ls)) {
 240                                ls->ls_scan_time = jiffies;
 241                                dlm_scan_rsbs(ls);
 242                                dlm_scan_timeout(ls);
 243                                dlm_unlock_recovery(ls);
 244                        } else {
 245                                ls->ls_scan_time += HZ;
 246                        }
 247                } else {
 248                        schedule_timeout_interruptible(timeout_jiffies);
 249                }
 250        }
 251        return 0;
 252}
 253
 254static int dlm_scand_start(void)
 255{
 256        struct task_struct *p;
 257        int error = 0;
 258
 259        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 260        if (IS_ERR(p))
 261                error = PTR_ERR(p);
 262        else
 263                scand_task = p;
 264        return error;
 265}
 266
/*
 * Stop the dlm_scand thread recorded in scand_task by dlm_scand_start().
 */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
 271
 272struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 273{
 274        struct dlm_ls *ls;
 275
 276        spin_lock(&lslist_lock);
 277
 278        list_for_each_entry(ls, &lslist, ls_list) {
 279                if (ls->ls_global_id == id) {
 280                        ls->ls_count++;
 281                        goto out;
 282                }
 283        }
 284        ls = NULL;
 285 out:
 286        spin_unlock(&lslist_lock);
 287        return ls;
 288}
 289
 290struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 291{
 292        struct dlm_ls *ls;
 293
 294        spin_lock(&lslist_lock);
 295        list_for_each_entry(ls, &lslist, ls_list) {
 296                if (ls->ls_local_handle == lockspace) {
 297                        ls->ls_count++;
 298                        goto out;
 299                }
 300        }
 301        ls = NULL;
 302 out:
 303        spin_unlock(&lslist_lock);
 304        return ls;
 305}
 306
 307struct dlm_ls *dlm_find_lockspace_device(int minor)
 308{
 309        struct dlm_ls *ls;
 310
 311        spin_lock(&lslist_lock);
 312        list_for_each_entry(ls, &lslist, ls_list) {
 313                if (ls->ls_device.minor == minor) {
 314                        ls->ls_count++;
 315                        goto out;
 316                }
 317        }
 318        ls = NULL;
 319 out:
 320        spin_unlock(&lslist_lock);
 321        return ls;
 322}
 323
/*
 * Drop a count taken by one of the dlm_find_lockspace_*() lookups.
 * remove_lockspace() waits for this count to reach zero before taking the
 * lockspace off lslist.
 */
void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
 330
 331static void remove_lockspace(struct dlm_ls *ls)
 332{
 333        for (;;) {
 334                spin_lock(&lslist_lock);
 335                if (ls->ls_count == 0) {
 336                        WARN_ON(ls->ls_create_count != 0);
 337                        list_del(&ls->ls_list);
 338                        spin_unlock(&lslist_lock);
 339                        return;
 340                }
 341                spin_unlock(&lslist_lock);
 342                ssleep(1);
 343        }
 344}
 345
 346static int threads_start(void)
 347{
 348        int error;
 349
 350        /* Thread which process lock requests for all lockspace's */
 351        error = dlm_astd_start();
 352        if (error) {
 353                log_print("cannot start dlm_astd thread %d", error);
 354                goto fail;
 355        }
 356
 357        error = dlm_scand_start();
 358        if (error) {
 359                log_print("cannot start dlm_scand thread %d", error);
 360                goto astd_fail;
 361        }
 362
 363        /* Thread for sending/receiving messages for all lockspace's */
 364        error = dlm_lowcomms_start();
 365        if (error) {
 366                log_print("cannot start dlm lowcomms %d", error);
 367                goto scand_fail;
 368        }
 369
 370        return 0;
 371
 372 scand_fail:
 373        dlm_scand_stop();
 374 astd_fail:
 375        dlm_astd_stop();
 376 fail:
 377        return error;
 378}
 379
/*
 * Stop everything started by threads_start(): dlm_scand first, then the
 * lowcomms layer, then dlm_astd.
 */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
 386
 387static int new_lockspace(const char *name, int namelen, void **lockspace,
 388                         uint32_t flags, int lvblen)
 389{
 390        struct dlm_ls *ls;
 391        int i, size, error;
 392        int do_unreg = 0;
 393
 394        if (namelen > DLM_LOCKSPACE_LEN)
 395                return -EINVAL;
 396
 397        if (!lvblen || (lvblen % 8))
 398                return -EINVAL;
 399
 400        if (!try_module_get(THIS_MODULE))
 401                return -EINVAL;
 402
 403        if (!dlm_user_daemon_available()) {
 404                module_put(THIS_MODULE);
 405                return -EUNATCH;
 406        }
 407
 408        error = 0;
 409
 410        spin_lock(&lslist_lock);
 411        list_for_each_entry(ls, &lslist, ls_list) {
 412                WARN_ON(ls->ls_create_count <= 0);
 413                if (ls->ls_namelen != namelen)
 414                        continue;
 415                if (memcmp(ls->ls_name, name, namelen))
 416                        continue;
 417                if (flags & DLM_LSFL_NEWEXCL) {
 418                        error = -EEXIST;
 419                        break;
 420                }
 421                ls->ls_create_count++;
 422                *lockspace = ls;
 423                error = 1;
 424                break;
 425        }
 426        spin_unlock(&lslist_lock);
 427
 428        if (error)
 429                goto out;
 430
 431        error = -ENOMEM;
 432
 433        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
 434        if (!ls)
 435                goto out;
 436        memcpy(ls->ls_name, name, namelen);
 437        ls->ls_namelen = namelen;
 438        ls->ls_lvblen = lvblen;
 439        ls->ls_count = 0;
 440        ls->ls_flags = 0;
 441        ls->ls_scan_time = jiffies;
 442
 443        if (flags & DLM_LSFL_TIMEWARN)
 444                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 445
 446        if (flags & DLM_LSFL_FS)
 447                ls->ls_allocation = GFP_NOFS;
 448        else
 449                ls->ls_allocation = GFP_KERNEL;
 450
 451        /* ls_exflags are forced to match among nodes, and we don't
 452           need to require all nodes to have some flags set */
 453        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 454                                    DLM_LSFL_NEWEXCL));
 455
 456        size = dlm_config.ci_rsbtbl_size;
 457        ls->ls_rsbtbl_size = size;
 458
 459        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
 460        if (!ls->ls_rsbtbl)
 461                goto out_lsfree;
 462        for (i = 0; i < size; i++) {
 463                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
 464                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
 465                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 466        }
 467
 468        size = dlm_config.ci_lkbtbl_size;
 469        ls->ls_lkbtbl_size = size;
 470
 471        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
 472        if (!ls->ls_lkbtbl)
 473                goto out_rsbfree;
 474        for (i = 0; i < size; i++) {
 475                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
 476                rwlock_init(&ls->ls_lkbtbl[i].lock);
 477                ls->ls_lkbtbl[i].counter = 1;
 478        }
 479
 480        size = dlm_config.ci_dirtbl_size;
 481        ls->ls_dirtbl_size = size;
 482
 483        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
 484        if (!ls->ls_dirtbl)
 485                goto out_lkbfree;
 486        for (i = 0; i < size; i++) {
 487                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
 488                spin_lock_init(&ls->ls_dirtbl[i].lock);
 489        }
 490
 491        INIT_LIST_HEAD(&ls->ls_waiters);
 492        mutex_init(&ls->ls_waiters_mutex);
 493        INIT_LIST_HEAD(&ls->ls_orphans);
 494        mutex_init(&ls->ls_orphans_mutex);
 495        INIT_LIST_HEAD(&ls->ls_timeout);
 496        mutex_init(&ls->ls_timeout_mutex);
 497
 498        INIT_LIST_HEAD(&ls->ls_nodes);
 499        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 500        ls->ls_num_nodes = 0;
 501        ls->ls_low_nodeid = 0;
 502        ls->ls_total_weight = 0;
 503        ls->ls_node_array = NULL;
 504
 505        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 506        ls->ls_stub_rsb.res_ls = ls;
 507
 508        ls->ls_debug_rsb_dentry = NULL;
 509        ls->ls_debug_waiters_dentry = NULL;
 510
 511        init_waitqueue_head(&ls->ls_uevent_wait);
 512        ls->ls_uevent_result = 0;
 513        init_completion(&ls->ls_members_done);
 514        ls->ls_members_result = -1;
 515
 516        ls->ls_recoverd_task = NULL;
 517        mutex_init(&ls->ls_recoverd_active);
 518        spin_lock_init(&ls->ls_recover_lock);
 519        spin_lock_init(&ls->ls_rcom_spin);
 520        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 521        ls->ls_recover_status = 0;
 522        ls->ls_recover_seq = 0;
 523        ls->ls_recover_args = NULL;
 524        init_rwsem(&ls->ls_in_recovery);
 525        init_rwsem(&ls->ls_recv_active);
 526        INIT_LIST_HEAD(&ls->ls_requestqueue);
 527        mutex_init(&ls->ls_requestqueue_mutex);
 528        mutex_init(&ls->ls_clear_proc_locks);
 529
 530        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
 531        if (!ls->ls_recover_buf)
 532                goto out_dirfree;
 533
 534        INIT_LIST_HEAD(&ls->ls_recover_list);
 535        spin_lock_init(&ls->ls_recover_list_lock);
 536        ls->ls_recover_list_count = 0;
 537        ls->ls_local_handle = ls;
 538        init_waitqueue_head(&ls->ls_wait_general);
 539        INIT_LIST_HEAD(&ls->ls_root_list);
 540        init_rwsem(&ls->ls_root_sem);
 541
 542        down_write(&ls->ls_in_recovery);
 543
 544        spin_lock(&lslist_lock);
 545        ls->ls_create_count = 1;
 546        list_add(&ls->ls_list, &lslist);
 547        spin_unlock(&lslist_lock);
 548
 549        /* needs to find ls in lslist */
 550        error = dlm_recoverd_start(ls);
 551        if (error) {
 552                log_error(ls, "can't start dlm_recoverd %d", error);
 553                goto out_delist;
 554        }
 555
 556        ls->ls_kobj.kset = dlm_kset;
 557        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 558                                     "%s", ls->ls_name);
 559        if (error)
 560                goto out_stop;
 561        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 562
 563        /* let kobject handle freeing of ls if there's an error */
 564        do_unreg = 1;
 565
 566        /* This uevent triggers dlm_controld in userspace to add us to the
 567           group of nodes that are members of this lockspace (managed by the
 568           cluster infrastructure.)  Once it's done that, it tells us who the
 569           current lockspace members are (via configfs) and then tells the
 570           lockspace to start running (via sysfs) in dlm_ls_start(). */
 571
 572        error = do_uevent(ls, 1);
 573        if (error)
 574                goto out_stop;
 575
 576        wait_for_completion(&ls->ls_members_done);
 577        error = ls->ls_members_result;
 578        if (error)
 579                goto out_members;
 580
 581        dlm_create_debug_file(ls);
 582
 583        log_debug(ls, "join complete");
 584        *lockspace = ls;
 585        return 0;
 586
 587 out_members:
 588        do_uevent(ls, 0);
 589        dlm_clear_members(ls);
 590        kfree(ls->ls_node_array);
 591 out_stop:
 592        dlm_recoverd_stop(ls);
 593 out_delist:
 594        spin_lock(&lslist_lock);
 595        list_del(&ls->ls_list);
 596        spin_unlock(&lslist_lock);
 597        kfree(ls->ls_recover_buf);
 598 out_dirfree:
 599        kfree(ls->ls_dirtbl);
 600 out_lkbfree:
 601        kfree(ls->ls_lkbtbl);
 602 out_rsbfree:
 603        kfree(ls->ls_rsbtbl);
 604 out_lsfree:
 605        if (do_unreg)
 606                kobject_put(&ls->ls_kobj);
 607        else
 608                kfree(ls);
 609 out:
 610        module_put(THIS_MODULE);
 611        return error;
 612}
 613
 614int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
 615                      uint32_t flags, int lvblen)
 616{
 617        int error = 0;
 618
 619        mutex_lock(&ls_lock);
 620        if (!ls_count)
 621                error = threads_start();
 622        if (error)
 623                goto out;
 624
 625        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
 626        if (!error)
 627                ls_count++;
 628        if (error > 0)
 629                error = 0;
 630        if (!ls_count)
 631                threads_stop();
 632 out:
 633        mutex_unlock(&ls_lock);
 634        return error;
 635}
 636
 637/* Return 1 if the lockspace still has active remote locks,
 638 *        2 if the lockspace still has active local locks.
 639 */
 640static int lockspace_busy(struct dlm_ls *ls)
 641{
 642        int i, lkb_found = 0;
 643        struct dlm_lkb *lkb;
 644
 645        /* NOTE: We check the lockidtbl here rather than the resource table.
 646           This is because there may be LKBs queued as ASTs that have been
 647           unlinked from their RSBs and are pending deletion once the AST has
 648           been delivered */
 649
 650        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
 651                read_lock(&ls->ls_lkbtbl[i].lock);
 652                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
 653                        lkb_found = 1;
 654                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
 655                                            lkb_idtbl_list) {
 656                                if (!lkb->lkb_nodeid) {
 657                                        read_unlock(&ls->ls_lkbtbl[i].lock);
 658                                        return 2;
 659                                }
 660                        }
 661                }
 662                read_unlock(&ls->ls_lkbtbl[i].lock);
 663        }
 664        return lkb_found;
 665}
 666
/*
 * Drop one creation reference on the lockspace and, if it was the last one,
 * tear the lockspace down completely.
 *
 * force is compared against the result of lockspace_busy() (0 = no LKBs,
 * 1 = remote LKBs only, 2 = local LKBs): the last reference is only dropped
 * when busy <= force.  With force >= 3 the leave uevent is skipped as well.
 *
 * Returns
 *    0        the lockspace was torn down; the module reference is dropped
 *   >0        other creators remain; the value is the remaining
 *             ls_create_count and nothing else was done
 *   -EBUSY    last reference, but the lockspace is busier than force allows
 *   -EINVAL   ls_create_count was not positive
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;

        /* sampled before lslist_lock is taken */
        busy = lockspace_busy(ls);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy > force)
                        rv = -EBUSY;
                else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        /* anything but 0 means the lockspace stays */
        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        /* send the leave (offline) uevent and wait for the reply, unless
           this is a forced shutdown or no daemon is there to answer */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* waits until no lookup count is held, then unlinks from lslist */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* dlm_astd stays suspended until the lkbs below have been freed */
        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        /* the lvb is freed here only for lkbs flagged
                           DLM_IFL_MSTCPY */
                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                dlm_free_lvb(lkb->lkb_lvbptr);

                        dlm_free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }

                /* same again for the bucket's toss list */
                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* pairs with the try_module_get() in new_lockspace() */
        module_put(THIS_MODULE);
        return 0;
}
 785
 786/*
 787 * Called when a system has released all its locks and is not going to use the
 788 * lockspace any longer.  We free everything we're managing for this lockspace.
 789 * Remaining nodes will go through the recovery process as if we'd died.  The
 790 * lockspace must continue to function as usual, participating in recoveries,
 791 * until this returns.
 792 *
 793 * Force has 4 possible values:
 794 * 0 - don't destroy locksapce if it has any LKBs
 795 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 796 * 2 - destroy lockspace regardless of LKBs
 797 * 3 - destroy lockspace as part of a forced shutdown
 798 */
 799
 800int dlm_release_lockspace(void *lockspace, int force)
 801{
 802        struct dlm_ls *ls;
 803        int error;
 804
 805        ls = dlm_find_lockspace_local(lockspace);
 806        if (!ls)
 807                return -EINVAL;
 808        dlm_put_lockspace(ls);
 809
 810        mutex_lock(&ls_lock);
 811        error = release_lockspace(ls, force);
 812        if (!error)
 813                ls_count--;
 814        if (!ls_count)
 815                threads_stop();
 816        mutex_unlock(&ls_lock);
 817
 818        return error;
 819}
 820
 821void dlm_stop_lockspaces(void)
 822{
 823        struct dlm_ls *ls;
 824
 825 restart:
 826        spin_lock(&lslist_lock);
 827        list_for_each_entry(ls, &lslist, ls_list) {
 828                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
 829                        continue;
 830                spin_unlock(&lslist_lock);
 831                log_error(ls, "no userland control daemon, stopping lockspace");
 832                dlm_ls_stop(ls);
 833                goto restart;
 834        }
 835        spin_unlock(&lslist_lock);
 836}
 837
 838