linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
  28static int                      ls_count;
  29static struct mutex             ls_lock;
  30static struct list_head         lslist;
  31static spinlock_t               lslist_lock;
  32static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n = simple_strtol(buf, NULL, 0);
  39
  40        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  41        if (!ls)
  42                return -EINVAL;
  43
  44        switch (n) {
  45        case 0:
  46                dlm_ls_stop(ls);
  47                break;
  48        case 1:
  49                dlm_ls_start(ls);
  50                break;
  51        default:
  52                ret = -EINVAL;
  53        }
  54        dlm_put_lockspace(ls);
  55        return ret;
  56}
  57
  58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  59{
  60        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
  61        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  62        wake_up(&ls->ls_uevent_wait);
  63        return len;
  64}
  65
  66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  67{
  68        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  69}
  70
  71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  72{
  73        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
  74        return len;
  75}
  76
  77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  78{
  79        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  80}
  81
  82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  83{
  84        int val = simple_strtoul(buf, NULL, 0);
  85        if (val == 1)
  86                set_bit(LSFL_NODIR, &ls->ls_flags);
  87        return len;
  88}
  89
  90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
  91{
  92        uint32_t status = dlm_recover_status(ls);
  93        return snprintf(buf, PAGE_SIZE, "%x\n", status);
  94}
  95
  96static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
  97{
  98        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
  99}
 100
 101struct dlm_attr {
 102        struct attribute attr;
 103        ssize_t (*show)(struct dlm_ls *, char *);
 104        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 105};
 106
 107static struct dlm_attr dlm_attr_control = {
 108        .attr  = {.name = "control", .mode = S_IWUSR},
 109        .store = dlm_control_store
 110};
 111
 112static struct dlm_attr dlm_attr_event = {
 113        .attr  = {.name = "event_done", .mode = S_IWUSR},
 114        .store = dlm_event_store
 115};
 116
 117static struct dlm_attr dlm_attr_id = {
 118        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 119        .show  = dlm_id_show,
 120        .store = dlm_id_store
 121};
 122
 123static struct dlm_attr dlm_attr_nodir = {
 124        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 125        .show  = dlm_nodir_show,
 126        .store = dlm_nodir_store
 127};
 128
 129static struct dlm_attr dlm_attr_recover_status = {
 130        .attr  = {.name = "recover_status", .mode = S_IRUGO},
 131        .show  = dlm_recover_status_show
 132};
 133
 134static struct dlm_attr dlm_attr_recover_nodeid = {
 135        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 136        .show  = dlm_recover_nodeid_show
 137};
 138
 139static struct attribute *dlm_attrs[] = {
 140        &dlm_attr_control.attr,
 141        &dlm_attr_event.attr,
 142        &dlm_attr_id.attr,
 143        &dlm_attr_nodir.attr,
 144        &dlm_attr_recover_status.attr,
 145        &dlm_attr_recover_nodeid.attr,
 146        NULL,
 147};
 148
 149static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 150                             char *buf)
 151{
 152        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 153        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 154        return a->show ? a->show(ls, buf) : 0;
 155}
 156
 157static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 158                              const char *buf, size_t len)
 159{
 160        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 161        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 162        return a->store ? a->store(ls, buf, len) : len;
 163}
 164
 165static void lockspace_kobj_release(struct kobject *k)
 166{
 167        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 168        kfree(ls);
 169}
 170
 171static const struct sysfs_ops dlm_attr_ops = {
 172        .show  = dlm_attr_show,
 173        .store = dlm_attr_store,
 174};
 175
 176static struct kobj_type dlm_ktype = {
 177        .default_attrs = dlm_attrs,
 178        .sysfs_ops     = &dlm_attr_ops,
 179        .release       = lockspace_kobj_release,
 180};
 181
 182static struct kset *dlm_kset;
 183
 184static int do_uevent(struct dlm_ls *ls, int in)
 185{
 186        int error;
 187
 188        if (in)
 189                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 190        else
 191                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 192
 193        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 194
 195        /* dlm_controld will see the uevent, do the necessary group management
 196           and then write to sysfs to wake us */
 197
 198        error = wait_event_interruptible(ls->ls_uevent_wait,
 199                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 200
 201        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
 202
 203        if (error)
 204                goto out;
 205
 206        error = ls->ls_uevent_result;
 207 out:
 208        if (error)
 209                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 210                          error, ls->ls_uevent_result);
 211        return error;
 212}
 213
 214static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 215                      struct kobj_uevent_env *env)
 216{
 217        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 218
 219        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 220        return 0;
 221}
 222
 223static struct kset_uevent_ops dlm_uevent_ops = {
 224        .uevent = dlm_uevent,
 225};
 226
 227int __init dlm_lockspace_init(void)
 228{
 229        ls_count = 0;
 230        mutex_init(&ls_lock);
 231        INIT_LIST_HEAD(&lslist);
 232        spin_lock_init(&lslist_lock);
 233
 234        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 235        if (!dlm_kset) {
 236                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 237                return -ENOMEM;
 238        }
 239        return 0;
 240}
 241
 242void dlm_lockspace_exit(void)
 243{
 244        kset_unregister(dlm_kset);
 245}
 246
 247static struct dlm_ls *find_ls_to_scan(void)
 248{
 249        struct dlm_ls *ls;
 250
 251        spin_lock(&lslist_lock);
 252        list_for_each_entry(ls, &lslist, ls_list) {
 253                if (time_after_eq(jiffies, ls->ls_scan_time +
 254                                            dlm_config.ci_scan_secs * HZ)) {
 255                        spin_unlock(&lslist_lock);
 256                        return ls;
 257                }
 258        }
 259        spin_unlock(&lslist_lock);
 260        return NULL;
 261}
 262
 263static int dlm_scand(void *data)
 264{
 265        struct dlm_ls *ls;
 266
 267        while (!kthread_should_stop()) {
 268                ls = find_ls_to_scan();
 269                if (ls) {
 270                        if (dlm_lock_recovery_try(ls)) {
 271                                ls->ls_scan_time = jiffies;
 272                                dlm_scan_rsbs(ls);
 273                                dlm_scan_timeout(ls);
 274                                dlm_scan_waiters(ls);
 275                                dlm_unlock_recovery(ls);
 276                        } else {
 277                                ls->ls_scan_time += HZ;
 278                        }
 279                        continue;
 280                }
 281                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 282        }
 283        return 0;
 284}
 285
 286static int dlm_scand_start(void)
 287{
 288        struct task_struct *p;
 289        int error = 0;
 290
 291        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 292        if (IS_ERR(p))
 293                error = PTR_ERR(p);
 294        else
 295                scand_task = p;
 296        return error;
 297}
 298
 299static void dlm_scand_stop(void)
 300{
 301        kthread_stop(scand_task);
 302}
 303
 304struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 305{
 306        struct dlm_ls *ls;
 307
 308        spin_lock(&lslist_lock);
 309
 310        list_for_each_entry(ls, &lslist, ls_list) {
 311                if (ls->ls_global_id == id) {
 312                        ls->ls_count++;
 313                        goto out;
 314                }
 315        }
 316        ls = NULL;
 317 out:
 318        spin_unlock(&lslist_lock);
 319        return ls;
 320}
 321
 322struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 323{
 324        struct dlm_ls *ls;
 325
 326        spin_lock(&lslist_lock);
 327        list_for_each_entry(ls, &lslist, ls_list) {
 328                if (ls->ls_local_handle == lockspace) {
 329                        ls->ls_count++;
 330                        goto out;
 331                }
 332        }
 333        ls = NULL;
 334 out:
 335        spin_unlock(&lslist_lock);
 336        return ls;
 337}
 338
 339struct dlm_ls *dlm_find_lockspace_device(int minor)
 340{
 341        struct dlm_ls *ls;
 342
 343        spin_lock(&lslist_lock);
 344        list_for_each_entry(ls, &lslist, ls_list) {
 345                if (ls->ls_device.minor == minor) {
 346                        ls->ls_count++;
 347                        goto out;
 348                }
 349        }
 350        ls = NULL;
 351 out:
 352        spin_unlock(&lslist_lock);
 353        return ls;
 354}
 355
 356void dlm_put_lockspace(struct dlm_ls *ls)
 357{
 358        spin_lock(&lslist_lock);
 359        ls->ls_count--;
 360        spin_unlock(&lslist_lock);
 361}
 362
 363static void remove_lockspace(struct dlm_ls *ls)
 364{
 365        for (;;) {
 366                spin_lock(&lslist_lock);
 367                if (ls->ls_count == 0) {
 368                        WARN_ON(ls->ls_create_count != 0);
 369                        list_del(&ls->ls_list);
 370                        spin_unlock(&lslist_lock);
 371                        return;
 372                }
 373                spin_unlock(&lslist_lock);
 374                ssleep(1);
 375        }
 376}
 377
 378static int threads_start(void)
 379{
 380        int error;
 381
 382        error = dlm_scand_start();
 383        if (error) {
 384                log_print("cannot start dlm_scand thread %d", error);
 385                goto fail;
 386        }
 387
 388        /* Thread for sending/receiving messages for all lockspace's */
 389        error = dlm_lowcomms_start();
 390        if (error) {
 391                log_print("cannot start dlm lowcomms %d", error);
 392                goto scand_fail;
 393        }
 394
 395        return 0;
 396
 397 scand_fail:
 398        dlm_scand_stop();
 399 fail:
 400        return error;
 401}
 402
 403static void threads_stop(void)
 404{
 405        dlm_scand_stop();
 406        dlm_lowcomms_stop();
 407}
 408
 409static int new_lockspace(const char *name, const char *cluster,
 410                         uint32_t flags, int lvblen,
 411                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 412                         int *ops_result, dlm_lockspace_t **lockspace)
 413{
 414        struct dlm_ls *ls;
 415        int i, size, error;
 416        int do_unreg = 0;
 417        int namelen = strlen(name);
 418
 419        if (namelen > DLM_LOCKSPACE_LEN)
 420                return -EINVAL;
 421
 422        if (!lvblen || (lvblen % 8))
 423                return -EINVAL;
 424
 425        if (!try_module_get(THIS_MODULE))
 426                return -EINVAL;
 427
 428        if (!dlm_user_daemon_available()) {
 429                log_print("dlm user daemon not available");
 430                error = -EUNATCH;
 431                goto out;
 432        }
 433
 434        if (ops && ops_result) {
 435                if (!dlm_config.ci_recover_callbacks)
 436                        *ops_result = -EOPNOTSUPP;
 437                else
 438                        *ops_result = 0;
 439        }
 440
 441        if (dlm_config.ci_recover_callbacks && cluster &&
 442            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 443                log_print("dlm cluster name %s mismatch %s",
 444                          dlm_config.ci_cluster_name, cluster);
 445                error = -EBADR;
 446                goto out;
 447        }
 448
 449        error = 0;
 450
 451        spin_lock(&lslist_lock);
 452        list_for_each_entry(ls, &lslist, ls_list) {
 453                WARN_ON(ls->ls_create_count <= 0);
 454                if (ls->ls_namelen != namelen)
 455                        continue;
 456                if (memcmp(ls->ls_name, name, namelen))
 457                        continue;
 458                if (flags & DLM_LSFL_NEWEXCL) {
 459                        error = -EEXIST;
 460                        break;
 461                }
 462                ls->ls_create_count++;
 463                *lockspace = ls;
 464                error = 1;
 465                break;
 466        }
 467        spin_unlock(&lslist_lock);
 468
 469        if (error)
 470                goto out;
 471
 472        error = -ENOMEM;
 473
 474        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 475        if (!ls)
 476                goto out;
 477        memcpy(ls->ls_name, name, namelen);
 478        ls->ls_namelen = namelen;
 479        ls->ls_lvblen = lvblen;
 480        ls->ls_count = 0;
 481        ls->ls_flags = 0;
 482        ls->ls_scan_time = jiffies;
 483
 484        if (ops && dlm_config.ci_recover_callbacks) {
 485                ls->ls_ops = ops;
 486                ls->ls_ops_arg = ops_arg;
 487        }
 488
 489        if (flags & DLM_LSFL_TIMEWARN)
 490                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 491
 492        /* ls_exflags are forced to match among nodes, and we don't
 493           need to require all nodes to have some flags set */
 494        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 495                                    DLM_LSFL_NEWEXCL));
 496
 497        size = dlm_config.ci_rsbtbl_size;
 498        ls->ls_rsbtbl_size = size;
 499
 500        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
 501        if (!ls->ls_rsbtbl)
 502                goto out_lsfree;
 503        for (i = 0; i < size; i++) {
 504                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 505                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 506                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 507        }
 508
 509        spin_lock_init(&ls->ls_remove_spin);
 510
 511        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 512                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 513                                                 GFP_KERNEL);
 514                if (!ls->ls_remove_names[i])
 515                        goto out_rsbtbl;
 516        }
 517
 518        idr_init(&ls->ls_lkbidr);
 519        spin_lock_init(&ls->ls_lkbidr_spin);
 520
 521        INIT_LIST_HEAD(&ls->ls_waiters);
 522        mutex_init(&ls->ls_waiters_mutex);
 523        INIT_LIST_HEAD(&ls->ls_orphans);
 524        mutex_init(&ls->ls_orphans_mutex);
 525        INIT_LIST_HEAD(&ls->ls_timeout);
 526        mutex_init(&ls->ls_timeout_mutex);
 527
 528        INIT_LIST_HEAD(&ls->ls_new_rsb);
 529        spin_lock_init(&ls->ls_new_rsb_spin);
 530
 531        INIT_LIST_HEAD(&ls->ls_nodes);
 532        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 533        ls->ls_num_nodes = 0;
 534        ls->ls_low_nodeid = 0;
 535        ls->ls_total_weight = 0;
 536        ls->ls_node_array = NULL;
 537
 538        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 539        ls->ls_stub_rsb.res_ls = ls;
 540
 541        ls->ls_debug_rsb_dentry = NULL;
 542        ls->ls_debug_waiters_dentry = NULL;
 543
 544        init_waitqueue_head(&ls->ls_uevent_wait);
 545        ls->ls_uevent_result = 0;
 546        init_completion(&ls->ls_members_done);
 547        ls->ls_members_result = -1;
 548
 549        mutex_init(&ls->ls_cb_mutex);
 550        INIT_LIST_HEAD(&ls->ls_cb_delay);
 551
 552        ls->ls_recoverd_task = NULL;
 553        mutex_init(&ls->ls_recoverd_active);
 554        spin_lock_init(&ls->ls_recover_lock);
 555        spin_lock_init(&ls->ls_rcom_spin);
 556        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 557        ls->ls_recover_status = 0;
 558        ls->ls_recover_seq = 0;
 559        ls->ls_recover_args = NULL;
 560        init_rwsem(&ls->ls_in_recovery);
 561        init_rwsem(&ls->ls_recv_active);
 562        INIT_LIST_HEAD(&ls->ls_requestqueue);
 563        mutex_init(&ls->ls_requestqueue_mutex);
 564        mutex_init(&ls->ls_clear_proc_locks);
 565
 566        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 567        if (!ls->ls_recover_buf)
 568                goto out_lkbidr;
 569
 570        ls->ls_slot = 0;
 571        ls->ls_num_slots = 0;
 572        ls->ls_slots_size = 0;
 573        ls->ls_slots = NULL;
 574
 575        INIT_LIST_HEAD(&ls->ls_recover_list);
 576        spin_lock_init(&ls->ls_recover_list_lock);
 577        idr_init(&ls->ls_recover_idr);
 578        spin_lock_init(&ls->ls_recover_idr_lock);
 579        ls->ls_recover_list_count = 0;
 580        ls->ls_local_handle = ls;
 581        init_waitqueue_head(&ls->ls_wait_general);
 582        INIT_LIST_HEAD(&ls->ls_root_list);
 583        init_rwsem(&ls->ls_root_sem);
 584
 585        spin_lock(&lslist_lock);
 586        ls->ls_create_count = 1;
 587        list_add(&ls->ls_list, &lslist);
 588        spin_unlock(&lslist_lock);
 589
 590        if (flags & DLM_LSFL_FS) {
 591                error = dlm_callback_start(ls);
 592                if (error) {
 593                        log_error(ls, "can't start dlm_callback %d", error);
 594                        goto out_delist;
 595                }
 596        }
 597
 598        init_waitqueue_head(&ls->ls_recover_lock_wait);
 599
 600        /*
 601         * Once started, dlm_recoverd first looks for ls in lslist, then
 602         * initializes ls_in_recovery as locked in "down" mode.  We need
 603         * to wait for the wakeup from dlm_recoverd because in_recovery
 604         * has to start out in down mode.
 605         */
 606
 607        error = dlm_recoverd_start(ls);
 608        if (error) {
 609                log_error(ls, "can't start dlm_recoverd %d", error);
 610                goto out_callback;
 611        }
 612
 613        wait_event(ls->ls_recover_lock_wait,
 614                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 615
 616        ls->ls_kobj.kset = dlm_kset;
 617        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 618                                     "%s", ls->ls_name);
 619        if (error)
 620                goto out_recoverd;
 621        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 622
 623        /* let kobject handle freeing of ls if there's an error */
 624        do_unreg = 1;
 625
 626        /* This uevent triggers dlm_controld in userspace to add us to the
 627           group of nodes that are members of this lockspace (managed by the
 628           cluster infrastructure.)  Once it's done that, it tells us who the
 629           current lockspace members are (via configfs) and then tells the
 630           lockspace to start running (via sysfs) in dlm_ls_start(). */
 631
 632        error = do_uevent(ls, 1);
 633        if (error)
 634                goto out_recoverd;
 635
 636        wait_for_completion(&ls->ls_members_done);
 637        error = ls->ls_members_result;
 638        if (error)
 639                goto out_members;
 640
 641        dlm_create_debug_file(ls);
 642
 643        log_debug(ls, "join complete");
 644        *lockspace = ls;
 645        return 0;
 646
 647 out_members:
 648        do_uevent(ls, 0);
 649        dlm_clear_members(ls);
 650        kfree(ls->ls_node_array);
 651 out_recoverd:
 652        dlm_recoverd_stop(ls);
 653 out_callback:
 654        dlm_callback_stop(ls);
 655 out_delist:
 656        spin_lock(&lslist_lock);
 657        list_del(&ls->ls_list);
 658        spin_unlock(&lslist_lock);
 659        idr_destroy(&ls->ls_recover_idr);
 660        kfree(ls->ls_recover_buf);
 661 out_lkbidr:
 662        idr_destroy(&ls->ls_lkbidr);
 663        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 664                if (ls->ls_remove_names[i])
 665                        kfree(ls->ls_remove_names[i]);
 666        }
 667 out_rsbtbl:
 668        vfree(ls->ls_rsbtbl);
 669 out_lsfree:
 670        if (do_unreg)
 671                kobject_put(&ls->ls_kobj);
 672        else
 673                kfree(ls);
 674 out:
 675        module_put(THIS_MODULE);
 676        return error;
 677}
 678
 679int dlm_new_lockspace(const char *name, const char *cluster,
 680                      uint32_t flags, int lvblen,
 681                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 682                      int *ops_result, dlm_lockspace_t **lockspace)
 683{
 684        int error = 0;
 685
 686        mutex_lock(&ls_lock);
 687        if (!ls_count)
 688                error = threads_start();
 689        if (error)
 690                goto out;
 691
 692        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 693                              ops_result, lockspace);
 694        if (!error)
 695                ls_count++;
 696        if (error > 0)
 697                error = 0;
 698        if (!ls_count)
 699                threads_stop();
 700 out:
 701        mutex_unlock(&ls_lock);
 702        return error;
 703}
 704
 705static int lkb_idr_is_local(int id, void *p, void *data)
 706{
 707        struct dlm_lkb *lkb = p;
 708
 709        if (!lkb->lkb_nodeid)
 710                return 1;
 711        return 0;
 712}
 713
 714static int lkb_idr_is_any(int id, void *p, void *data)
 715{
 716        return 1;
 717}
 718
 719static int lkb_idr_free(int id, void *p, void *data)
 720{
 721        struct dlm_lkb *lkb = p;
 722
 723        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 724                dlm_free_lvb(lkb->lkb_lvbptr);
 725
 726        dlm_free_lkb(lkb);
 727        return 0;
 728}
 729
 730/* NOTE: We check the lkbidr here rather than the resource table.
 731   This is because there may be LKBs queued as ASTs that have been unlinked
 732   from their RSBs and are pending deletion once the AST has been delivered */
 733
 734static int lockspace_busy(struct dlm_ls *ls, int force)
 735{
 736        int rv;
 737
 738        spin_lock(&ls->ls_lkbidr_spin);
 739        if (force == 0) {
 740                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 741        } else if (force == 1) {
 742                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 743        } else {
 744                rv = 0;
 745        }
 746        spin_unlock(&ls->ls_lkbidr_spin);
 747        return rv;
 748}
 749
 750static int release_lockspace(struct dlm_ls *ls, int force)
 751{
 752        struct dlm_rsb *rsb;
 753        struct rb_node *n;
 754        int i, busy, rv;
 755
 756        busy = lockspace_busy(ls, force);
 757
 758        spin_lock(&lslist_lock);
 759        if (ls->ls_create_count == 1) {
 760                if (busy) {
 761                        rv = -EBUSY;
 762                } else {
 763                        /* remove_lockspace takes ls off lslist */
 764                        ls->ls_create_count = 0;
 765                        rv = 0;
 766                }
 767        } else if (ls->ls_create_count > 1) {
 768                rv = --ls->ls_create_count;
 769        } else {
 770                rv = -EINVAL;
 771        }
 772        spin_unlock(&lslist_lock);
 773
 774        if (rv) {
 775                log_debug(ls, "release_lockspace no remove %d", rv);
 776                return rv;
 777        }
 778
 779        dlm_device_deregister(ls);
 780
 781        if (force < 3 && dlm_user_daemon_available())
 782                do_uevent(ls, 0);
 783
 784        dlm_recoverd_stop(ls);
 785
 786        dlm_callback_stop(ls);
 787
 788        remove_lockspace(ls);
 789
 790        dlm_delete_debug_file(ls);
 791
 792        kfree(ls->ls_recover_buf);
 793
 794        /*
 795         * Free all lkb's in idr
 796         */
 797
 798        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 799        idr_destroy(&ls->ls_lkbidr);
 800
 801        /*
 802         * Free all rsb's on rsbtbl[] lists
 803         */
 804
 805        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 806                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 807                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 808                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
 809                        dlm_free_rsb(rsb);
 810                }
 811
 812                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 813                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 814                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
 815                        dlm_free_rsb(rsb);
 816                }
 817        }
 818
 819        vfree(ls->ls_rsbtbl);
 820
 821        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 822                kfree(ls->ls_remove_names[i]);
 823
 824        while (!list_empty(&ls->ls_new_rsb)) {
 825                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 826                                       res_hashchain);
 827                list_del(&rsb->res_hashchain);
 828                dlm_free_rsb(rsb);
 829        }
 830
 831        /*
 832         * Free structures on any other lists
 833         */
 834
 835        dlm_purge_requestqueue(ls);
 836        kfree(ls->ls_recover_args);
 837        dlm_clear_members(ls);
 838        dlm_clear_members_gone(ls);
 839        kfree(ls->ls_node_array);
 840        log_debug(ls, "release_lockspace final free");
 841        kobject_put(&ls->ls_kobj);
 842        /* The ls structure will be freed when the kobject is done with */
 843
 844        module_put(THIS_MODULE);
 845        return 0;
 846}
 847
 848/*
 849 * Called when a system has released all its locks and is not going to use the
 850 * lockspace any longer.  We free everything we're managing for this lockspace.
 851 * Remaining nodes will go through the recovery process as if we'd died.  The
 852 * lockspace must continue to function as usual, participating in recoveries,
 853 * until this returns.
 854 *
 855 * Force has 4 possible values:
 856 * 0 - don't destroy locksapce if it has any LKBs
 857 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 858 * 2 - destroy lockspace regardless of LKBs
 859 * 3 - destroy lockspace as part of a forced shutdown
 860 */
 861
 862int dlm_release_lockspace(void *lockspace, int force)
 863{
 864        struct dlm_ls *ls;
 865        int error;
 866
 867        ls = dlm_find_lockspace_local(lockspace);
 868        if (!ls)
 869                return -EINVAL;
 870        dlm_put_lockspace(ls);
 871
 872        mutex_lock(&ls_lock);
 873        error = release_lockspace(ls, force);
 874        if (!error)
 875                ls_count--;
 876        if (!ls_count)
 877                threads_stop();
 878        mutex_unlock(&ls_lock);
 879
 880        return error;
 881}
 882
 883void dlm_stop_lockspaces(void)
 884{
 885        struct dlm_ls *ls;
 886
 887 restart:
 888        spin_lock(&lslist_lock);
 889        list_for_each_entry(ls, &lslist, ls_list) {
 890                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
 891                        continue;
 892                spin_unlock(&lslist_lock);
 893                log_error(ls, "no userland control daemon, stopping lockspace");
 894                dlm_ls_stop(ls);
 895                goto restart;
 896        }
 897        spin_unlock(&lslist_lock);
 898}
 899
 900