linux/fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

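/*
 * ls_count is the number of lockspaces created through dlm_new_lockspace();
 * the threads shared by all lockspaces (dlm_scand and lowcomms) are started
 * with the first lockspace and stopped with the last.  lslist is the global
 * list of lockspaces, protected by lslist_lock.
 */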

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val = simple_strtoul(buf, NULL, 0);
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

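/*
 * The kset is created under kernel_kobj, so each lockspace appears as
 * /sys/kernel/dlm/<name>/ with the attribute files defined above.  A rough
 * sketch of how dlm_controld drives a lockspace through these files
 * (hypothetical shell, assuming a lockspace named "test"; the exact values
 * and ordering are up to the daemon):
 *
 *      echo 1 > /sys/kernel/dlm/test/id          # set the global id
 *      echo 1 > /sys/kernel/dlm/test/control     # dlm_ls_start()
 *      echo 0 > /sys/kernel/dlm/test/event_done  # wake do_uevent() waiter
 */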
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
                      struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

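/*
 * dlm_scand is a single kernel thread shared by all lockspaces.  Roughly
 * every ci_scan_secs seconds it walks lslist and, for each lockspace whose
 * scan interval has expired, scans rsbs, lock timeouts and waiters.  If a
 * recovery is in progress (dlm_lock_recovery_try fails), the scan for that
 * lockspace is pushed back by one second and retried.
 */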
static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

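/*
 * The dlm_find_lockspace_*() lookups below take a reference on the
 * lockspace (ls_count++) under lslist_lock; every successful lookup must
 * be paired with dlm_put_lockspace().  remove_lockspace() waits for
 * ls_count to drop to zero before unlinking the lockspace from lslist.
 */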
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}

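/*
 * Create and register a new lockspace.  In outline: allocate and initialize
 * the dlm_ls structure and its tables, add it to lslist, start the callback
 * and recovery daemons, register the kobject in sysfs, then issue an
 * "online" uevent and wait for dlm_controld to configure the member nodes
 * and start the lockspace.
 */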
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name %s mismatch %s",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        /* free any ls_remove_names[] entries allocated before a failure;
           unallocated entries are NULL (ls is kzalloc'ed) and kfree(NULL)
           is a no-op, so the full loop is safe from both labels */
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

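/*
 * A minimal usage sketch for the exported interface (hypothetical caller,
 * e.g. a cluster filesystem; my_ops and my_arg are made-up names):
 *
 *      dlm_lockspace_t *ls;
 *      int ops_result, error;
 *
 *      error = dlm_new_lockspace("myfs", "mycluster", DLM_LSFL_FS, 32,
 *                                &my_ops, my_arg, &ops_result, &ls);
 *      if (!error)
 *              error = dlm_release_lockspace(ls, 0);
 *
 * lvblen (32 here) must be a nonzero multiple of 8, and the cluster name
 * is only checked when recover_callbacks is configured.
 */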
int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

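/*
 * Tear the lockspace down once the last local user is gone: drop the last
 * create count, stop the per-lockspace daemons, take it off lslist (after
 * remaining references are dropped), then free every lkb and rsb it still
 * holds.  The force argument is described in the comment above
 * dlm_release_lockspace().
 */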
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}

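/*
 * Called when the userland control daemon (dlm_controld) is no longer
 * available: any lockspace still running is stopped, since it cannot be
 * recovered without the daemon.
 */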
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}