linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
  28static int                      ls_count;
  29static struct mutex             ls_lock;
  30static struct list_head         lslist;
  31static spinlock_t               lslist_lock;
  32static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n;
  39        int rc = kstrtoint(buf, 0, &n);
  40
  41        if (rc)
  42                return rc;
  43        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  44        if (!ls)
  45                return -EINVAL;
  46
  47        switch (n) {
  48        case 0:
  49                dlm_ls_stop(ls);
  50                break;
  51        case 1:
  52                dlm_ls_start(ls);
  53                break;
  54        default:
  55                ret = -EINVAL;
  56        }
  57        dlm_put_lockspace(ls);
  58        return ret;
  59}
  60
  61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  62{
  63        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  64
  65        if (rc)
  66                return rc;
  67        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  68        wake_up(&ls->ls_uevent_wait);
  69        return len;
  70}
  71
  72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  73{
  74        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  75}
  76
  77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  78{
  79        int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  80
  81        if (rc)
  82                return rc;
  83        return len;
  84}
  85
  86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  87{
  88        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  89}
  90
  91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  92{
  93        int val;
  94        int rc = kstrtoint(buf, 0, &val);
  95
  96        if (rc)
  97                return rc;
  98        if (val == 1)
  99                set_bit(LSFL_NODIR, &ls->ls_flags);
 100        return len;
 101}
 102
 103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 104{
 105        uint32_t status = dlm_recover_status(ls);
 106        return snprintf(buf, PAGE_SIZE, "%x\n", status);
 107}
 108
 109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 110{
 111        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 112}
 113
 114struct dlm_attr {
 115        struct attribute attr;
 116        ssize_t (*show)(struct dlm_ls *, char *);
 117        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 118};
 119
 120static struct dlm_attr dlm_attr_control = {
 121        .attr  = {.name = "control", .mode = S_IWUSR},
 122        .store = dlm_control_store
 123};
 124
 125static struct dlm_attr dlm_attr_event = {
 126        .attr  = {.name = "event_done", .mode = S_IWUSR},
 127        .store = dlm_event_store
 128};
 129
 130static struct dlm_attr dlm_attr_id = {
 131        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 132        .show  = dlm_id_show,
 133        .store = dlm_id_store
 134};
 135
 136static struct dlm_attr dlm_attr_nodir = {
 137        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 138        .show  = dlm_nodir_show,
 139        .store = dlm_nodir_store
 140};
 141
 142static struct dlm_attr dlm_attr_recover_status = {
 143        .attr  = {.name = "recover_status", .mode = S_IRUGO},
 144        .show  = dlm_recover_status_show
 145};
 146
 147static struct dlm_attr dlm_attr_recover_nodeid = {
 148        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 149        .show  = dlm_recover_nodeid_show
 150};
 151
 152static struct attribute *dlm_attrs[] = {
 153        &dlm_attr_control.attr,
 154        &dlm_attr_event.attr,
 155        &dlm_attr_id.attr,
 156        &dlm_attr_nodir.attr,
 157        &dlm_attr_recover_status.attr,
 158        &dlm_attr_recover_nodeid.attr,
 159        NULL,
 160};
 161
 162static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 163                             char *buf)
 164{
 165        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 166        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 167        return a->show ? a->show(ls, buf) : 0;
 168}
 169
 170static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 171                              const char *buf, size_t len)
 172{
 173        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 174        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 175        return a->store ? a->store(ls, buf, len) : len;
 176}
 177
 178static void lockspace_kobj_release(struct kobject *k)
 179{
 180        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 181        kfree(ls);
 182}
 183
 184static const struct sysfs_ops dlm_attr_ops = {
 185        .show  = dlm_attr_show,
 186        .store = dlm_attr_store,
 187};
 188
 189static struct kobj_type dlm_ktype = {
 190        .default_attrs = dlm_attrs,
 191        .sysfs_ops     = &dlm_attr_ops,
 192        .release       = lockspace_kobj_release,
 193};
 194
 195static struct kset *dlm_kset;
 196
 197static int do_uevent(struct dlm_ls *ls, int in)
 198{
 199        int error;
 200
 201        if (in)
 202                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 203        else
 204                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 205
 206        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 207
 208        /* dlm_controld will see the uevent, do the necessary group management
 209           and then write to sysfs to wake us */
 210
 211        error = wait_event_interruptible(ls->ls_uevent_wait,
 212                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 213
 214        log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
 215
 216        if (error)
 217                goto out;
 218
 219        error = ls->ls_uevent_result;
 220 out:
 221        if (error)
 222                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 223                          error, ls->ls_uevent_result);
 224        return error;
 225}
 226
 227static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 228                      struct kobj_uevent_env *env)
 229{
 230        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 231
 232        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 233        return 0;
 234}
 235
 236static struct kset_uevent_ops dlm_uevent_ops = {
 237        .uevent = dlm_uevent,
 238};
 239
 240int __init dlm_lockspace_init(void)
 241{
 242        ls_count = 0;
 243        mutex_init(&ls_lock);
 244        INIT_LIST_HEAD(&lslist);
 245        spin_lock_init(&lslist_lock);
 246
 247        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 248        if (!dlm_kset) {
 249                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 250                return -ENOMEM;
 251        }
 252        return 0;
 253}
 254
 255void dlm_lockspace_exit(void)
 256{
 257        kset_unregister(dlm_kset);
 258}
 259
 260static struct dlm_ls *find_ls_to_scan(void)
 261{
 262        struct dlm_ls *ls;
 263
 264        spin_lock(&lslist_lock);
 265        list_for_each_entry(ls, &lslist, ls_list) {
 266                if (time_after_eq(jiffies, ls->ls_scan_time +
 267                                            dlm_config.ci_scan_secs * HZ)) {
 268                        spin_unlock(&lslist_lock);
 269                        return ls;
 270                }
 271        }
 272        spin_unlock(&lslist_lock);
 273        return NULL;
 274}
 275
 276static int dlm_scand(void *data)
 277{
 278        struct dlm_ls *ls;
 279
 280        while (!kthread_should_stop()) {
 281                ls = find_ls_to_scan();
 282                if (ls) {
 283                        if (dlm_lock_recovery_try(ls)) {
 284                                ls->ls_scan_time = jiffies;
 285                                dlm_scan_rsbs(ls);
 286                                dlm_scan_timeout(ls);
 287                                dlm_scan_waiters(ls);
 288                                dlm_unlock_recovery(ls);
 289                        } else {
 290                                ls->ls_scan_time += HZ;
 291                        }
 292                        continue;
 293                }
 294                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 295        }
 296        return 0;
 297}
 298
 299static int dlm_scand_start(void)
 300{
 301        struct task_struct *p;
 302        int error = 0;
 303
 304        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 305        if (IS_ERR(p))
 306                error = PTR_ERR(p);
 307        else
 308                scand_task = p;
 309        return error;
 310}
 311
 312static void dlm_scand_stop(void)
 313{
 314        kthread_stop(scand_task);
 315}
 316
 317struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 318{
 319        struct dlm_ls *ls;
 320
 321        spin_lock(&lslist_lock);
 322
 323        list_for_each_entry(ls, &lslist, ls_list) {
 324                if (ls->ls_global_id == id) {
 325                        ls->ls_count++;
 326                        goto out;
 327                }
 328        }
 329        ls = NULL;
 330 out:
 331        spin_unlock(&lslist_lock);
 332        return ls;
 333}
 334
 335struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 336{
 337        struct dlm_ls *ls;
 338
 339        spin_lock(&lslist_lock);
 340        list_for_each_entry(ls, &lslist, ls_list) {
 341                if (ls->ls_local_handle == lockspace) {
 342                        ls->ls_count++;
 343                        goto out;
 344                }
 345        }
 346        ls = NULL;
 347 out:
 348        spin_unlock(&lslist_lock);
 349        return ls;
 350}
 351
 352struct dlm_ls *dlm_find_lockspace_device(int minor)
 353{
 354        struct dlm_ls *ls;
 355
 356        spin_lock(&lslist_lock);
 357        list_for_each_entry(ls, &lslist, ls_list) {
 358                if (ls->ls_device.minor == minor) {
 359                        ls->ls_count++;
 360                        goto out;
 361                }
 362        }
 363        ls = NULL;
 364 out:
 365        spin_unlock(&lslist_lock);
 366        return ls;
 367}
 368
 369void dlm_put_lockspace(struct dlm_ls *ls)
 370{
 371        spin_lock(&lslist_lock);
 372        ls->ls_count--;
 373        spin_unlock(&lslist_lock);
 374}
 375
 376static void remove_lockspace(struct dlm_ls *ls)
 377{
 378        for (;;) {
 379                spin_lock(&lslist_lock);
 380                if (ls->ls_count == 0) {
 381                        WARN_ON(ls->ls_create_count != 0);
 382                        list_del(&ls->ls_list);
 383                        spin_unlock(&lslist_lock);
 384                        return;
 385                }
 386                spin_unlock(&lslist_lock);
 387                ssleep(1);
 388        }
 389}
 390
 391static int threads_start(void)
 392{
 393        int error;
 394
 395        error = dlm_scand_start();
 396        if (error) {
 397                log_print("cannot start dlm_scand thread %d", error);
 398                goto fail;
 399        }
 400
 401        /* Thread for sending/receiving messages for all lockspace's */
 402        error = dlm_lowcomms_start();
 403        if (error) {
 404                log_print("cannot start dlm lowcomms %d", error);
 405                goto scand_fail;
 406        }
 407
 408        return 0;
 409
 410 scand_fail:
 411        dlm_scand_stop();
 412 fail:
 413        return error;
 414}
 415
 416static void threads_stop(void)
 417{
 418        dlm_scand_stop();
 419        dlm_lowcomms_stop();
 420}
 421
 422static int new_lockspace(const char *name, const char *cluster,
 423                         uint32_t flags, int lvblen,
 424                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 425                         int *ops_result, dlm_lockspace_t **lockspace)
 426{
 427        struct dlm_ls *ls;
 428        int i, size, error;
 429        int do_unreg = 0;
 430        int namelen = strlen(name);
 431
 432        if (namelen > DLM_LOCKSPACE_LEN)
 433                return -EINVAL;
 434
 435        if (!lvblen || (lvblen % 8))
 436                return -EINVAL;
 437
 438        if (!try_module_get(THIS_MODULE))
 439                return -EINVAL;
 440
 441        if (!dlm_user_daemon_available()) {
 442                log_print("dlm user daemon not available");
 443                error = -EUNATCH;
 444                goto out;
 445        }
 446
 447        if (ops && ops_result) {
 448                if (!dlm_config.ci_recover_callbacks)
 449                        *ops_result = -EOPNOTSUPP;
 450                else
 451                        *ops_result = 0;
 452        }
 453
 454        if (dlm_config.ci_recover_callbacks && cluster &&
 455            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 456                log_print("dlm cluster name %s mismatch %s",
 457                          dlm_config.ci_cluster_name, cluster);
 458                error = -EBADR;
 459                goto out;
 460        }
 461
 462        error = 0;
 463
 464        spin_lock(&lslist_lock);
 465        list_for_each_entry(ls, &lslist, ls_list) {
 466                WARN_ON(ls->ls_create_count <= 0);
 467                if (ls->ls_namelen != namelen)
 468                        continue;
 469                if (memcmp(ls->ls_name, name, namelen))
 470                        continue;
 471                if (flags & DLM_LSFL_NEWEXCL) {
 472                        error = -EEXIST;
 473                        break;
 474                }
 475                ls->ls_create_count++;
 476                *lockspace = ls;
 477                error = 1;
 478                break;
 479        }
 480        spin_unlock(&lslist_lock);
 481
 482        if (error)
 483                goto out;
 484
 485        error = -ENOMEM;
 486
 487        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 488        if (!ls)
 489                goto out;
 490        memcpy(ls->ls_name, name, namelen);
 491        ls->ls_namelen = namelen;
 492        ls->ls_lvblen = lvblen;
 493        ls->ls_count = 0;
 494        ls->ls_flags = 0;
 495        ls->ls_scan_time = jiffies;
 496
 497        if (ops && dlm_config.ci_recover_callbacks) {
 498                ls->ls_ops = ops;
 499                ls->ls_ops_arg = ops_arg;
 500        }
 501
 502        if (flags & DLM_LSFL_TIMEWARN)
 503                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 504
 505        /* ls_exflags are forced to match among nodes, and we don't
 506           need to require all nodes to have some flags set */
 507        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 508                                    DLM_LSFL_NEWEXCL));
 509
 510        size = dlm_config.ci_rsbtbl_size;
 511        ls->ls_rsbtbl_size = size;
 512
 513        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
 514        if (!ls->ls_rsbtbl)
 515                goto out_lsfree;
 516        for (i = 0; i < size; i++) {
 517                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 518                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 519                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 520        }
 521
 522        spin_lock_init(&ls->ls_remove_spin);
 523
 524        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 525                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 526                                                 GFP_KERNEL);
 527                if (!ls->ls_remove_names[i])
 528                        goto out_rsbtbl;
 529        }
 530
 531        idr_init(&ls->ls_lkbidr);
 532        spin_lock_init(&ls->ls_lkbidr_spin);
 533
 534        INIT_LIST_HEAD(&ls->ls_waiters);
 535        mutex_init(&ls->ls_waiters_mutex);
 536        INIT_LIST_HEAD(&ls->ls_orphans);
 537        mutex_init(&ls->ls_orphans_mutex);
 538        INIT_LIST_HEAD(&ls->ls_timeout);
 539        mutex_init(&ls->ls_timeout_mutex);
 540
 541        INIT_LIST_HEAD(&ls->ls_new_rsb);
 542        spin_lock_init(&ls->ls_new_rsb_spin);
 543
 544        INIT_LIST_HEAD(&ls->ls_nodes);
 545        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 546        ls->ls_num_nodes = 0;
 547        ls->ls_low_nodeid = 0;
 548        ls->ls_total_weight = 0;
 549        ls->ls_node_array = NULL;
 550
 551        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 552        ls->ls_stub_rsb.res_ls = ls;
 553
 554        ls->ls_debug_rsb_dentry = NULL;
 555        ls->ls_debug_waiters_dentry = NULL;
 556
 557        init_waitqueue_head(&ls->ls_uevent_wait);
 558        ls->ls_uevent_result = 0;
 559        init_completion(&ls->ls_members_done);
 560        ls->ls_members_result = -1;
 561
 562        mutex_init(&ls->ls_cb_mutex);
 563        INIT_LIST_HEAD(&ls->ls_cb_delay);
 564
 565        ls->ls_recoverd_task = NULL;
 566        mutex_init(&ls->ls_recoverd_active);
 567        spin_lock_init(&ls->ls_recover_lock);
 568        spin_lock_init(&ls->ls_rcom_spin);
 569        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 570        ls->ls_recover_status = 0;
 571        ls->ls_recover_seq = 0;
 572        ls->ls_recover_args = NULL;
 573        init_rwsem(&ls->ls_in_recovery);
 574        init_rwsem(&ls->ls_recv_active);
 575        INIT_LIST_HEAD(&ls->ls_requestqueue);
 576        mutex_init(&ls->ls_requestqueue_mutex);
 577        mutex_init(&ls->ls_clear_proc_locks);
 578
 579        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 580        if (!ls->ls_recover_buf)
 581                goto out_lkbidr;
 582
 583        ls->ls_slot = 0;
 584        ls->ls_num_slots = 0;
 585        ls->ls_slots_size = 0;
 586        ls->ls_slots = NULL;
 587
 588        INIT_LIST_HEAD(&ls->ls_recover_list);
 589        spin_lock_init(&ls->ls_recover_list_lock);
 590        idr_init(&ls->ls_recover_idr);
 591        spin_lock_init(&ls->ls_recover_idr_lock);
 592        ls->ls_recover_list_count = 0;
 593        ls->ls_local_handle = ls;
 594        init_waitqueue_head(&ls->ls_wait_general);
 595        INIT_LIST_HEAD(&ls->ls_root_list);
 596        init_rwsem(&ls->ls_root_sem);
 597
 598        spin_lock(&lslist_lock);
 599        ls->ls_create_count = 1;
 600        list_add(&ls->ls_list, &lslist);
 601        spin_unlock(&lslist_lock);
 602
 603        if (flags & DLM_LSFL_FS) {
 604                error = dlm_callback_start(ls);
 605                if (error) {
 606                        log_error(ls, "can't start dlm_callback %d", error);
 607                        goto out_delist;
 608                }
 609        }
 610
 611        init_waitqueue_head(&ls->ls_recover_lock_wait);
 612
 613        /*
 614         * Once started, dlm_recoverd first looks for ls in lslist, then
 615         * initializes ls_in_recovery as locked in "down" mode.  We need
 616         * to wait for the wakeup from dlm_recoverd because in_recovery
 617         * has to start out in down mode.
 618         */
 619
 620        error = dlm_recoverd_start(ls);
 621        if (error) {
 622                log_error(ls, "can't start dlm_recoverd %d", error);
 623                goto out_callback;
 624        }
 625
 626        wait_event(ls->ls_recover_lock_wait,
 627                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 628
 629        ls->ls_kobj.kset = dlm_kset;
 630        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 631                                     "%s", ls->ls_name);
 632        if (error)
 633                goto out_recoverd;
 634        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 635
 636        /* let kobject handle freeing of ls if there's an error */
 637        do_unreg = 1;
 638
 639        /* This uevent triggers dlm_controld in userspace to add us to the
 640           group of nodes that are members of this lockspace (managed by the
 641           cluster infrastructure.)  Once it's done that, it tells us who the
 642           current lockspace members are (via configfs) and then tells the
 643           lockspace to start running (via sysfs) in dlm_ls_start(). */
 644
 645        error = do_uevent(ls, 1);
 646        if (error)
 647                goto out_recoverd;
 648
 649        wait_for_completion(&ls->ls_members_done);
 650        error = ls->ls_members_result;
 651        if (error)
 652                goto out_members;
 653
 654        dlm_create_debug_file(ls);
 655
 656        log_rinfo(ls, "join complete");
 657        *lockspace = ls;
 658        return 0;
 659
 660 out_members:
 661        do_uevent(ls, 0);
 662        dlm_clear_members(ls);
 663        kfree(ls->ls_node_array);
 664 out_recoverd:
 665        dlm_recoverd_stop(ls);
 666 out_callback:
 667        dlm_callback_stop(ls);
 668 out_delist:
 669        spin_lock(&lslist_lock);
 670        list_del(&ls->ls_list);
 671        spin_unlock(&lslist_lock);
 672        idr_destroy(&ls->ls_recover_idr);
 673        kfree(ls->ls_recover_buf);
 674 out_lkbidr:
 675        idr_destroy(&ls->ls_lkbidr);
 676        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 677                if (ls->ls_remove_names[i])
 678                        kfree(ls->ls_remove_names[i]);
 679        }
 680 out_rsbtbl:
 681        vfree(ls->ls_rsbtbl);
 682 out_lsfree:
 683        if (do_unreg)
 684                kobject_put(&ls->ls_kobj);
 685        else
 686                kfree(ls);
 687 out:
 688        module_put(THIS_MODULE);
 689        return error;
 690}
 691
 692int dlm_new_lockspace(const char *name, const char *cluster,
 693                      uint32_t flags, int lvblen,
 694                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 695                      int *ops_result, dlm_lockspace_t **lockspace)
 696{
 697        int error = 0;
 698
 699        mutex_lock(&ls_lock);
 700        if (!ls_count)
 701                error = threads_start();
 702        if (error)
 703                goto out;
 704
 705        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 706                              ops_result, lockspace);
 707        if (!error)
 708                ls_count++;
 709        if (error > 0)
 710                error = 0;
 711        if (!ls_count)
 712                threads_stop();
 713 out:
 714        mutex_unlock(&ls_lock);
 715        return error;
 716}
 717
 718static int lkb_idr_is_local(int id, void *p, void *data)
 719{
 720        struct dlm_lkb *lkb = p;
 721
 722        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 723}
 724
 725static int lkb_idr_is_any(int id, void *p, void *data)
 726{
 727        return 1;
 728}
 729
 730static int lkb_idr_free(int id, void *p, void *data)
 731{
 732        struct dlm_lkb *lkb = p;
 733
 734        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 735                dlm_free_lvb(lkb->lkb_lvbptr);
 736
 737        dlm_free_lkb(lkb);
 738        return 0;
 739}
 740
 741/* NOTE: We check the lkbidr here rather than the resource table.
 742   This is because there may be LKBs queued as ASTs that have been unlinked
 743   from their RSBs and are pending deletion once the AST has been delivered */
 744
 745static int lockspace_busy(struct dlm_ls *ls, int force)
 746{
 747        int rv;
 748
 749        spin_lock(&ls->ls_lkbidr_spin);
 750        if (force == 0) {
 751                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 752        } else if (force == 1) {
 753                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 754        } else {
 755                rv = 0;
 756        }
 757        spin_unlock(&ls->ls_lkbidr_spin);
 758        return rv;
 759}
 760
 761static int release_lockspace(struct dlm_ls *ls, int force)
 762{
 763        struct dlm_rsb *rsb;
 764        struct rb_node *n;
 765        int i, busy, rv;
 766
 767        busy = lockspace_busy(ls, force);
 768
 769        spin_lock(&lslist_lock);
 770        if (ls->ls_create_count == 1) {
 771                if (busy) {
 772                        rv = -EBUSY;
 773                } else {
 774                        /* remove_lockspace takes ls off lslist */
 775                        ls->ls_create_count = 0;
 776                        rv = 0;
 777                }
 778        } else if (ls->ls_create_count > 1) {
 779                rv = --ls->ls_create_count;
 780        } else {
 781                rv = -EINVAL;
 782        }
 783        spin_unlock(&lslist_lock);
 784
 785        if (rv) {
 786                log_debug(ls, "release_lockspace no remove %d", rv);
 787                return rv;
 788        }
 789
 790        dlm_device_deregister(ls);
 791
 792        if (force < 3 && dlm_user_daemon_available())
 793                do_uevent(ls, 0);
 794
 795        dlm_recoverd_stop(ls);
 796
 797        dlm_callback_stop(ls);
 798
 799        remove_lockspace(ls);
 800
 801        dlm_delete_debug_file(ls);
 802
 803        kfree(ls->ls_recover_buf);
 804
 805        /*
 806         * Free all lkb's in idr
 807         */
 808
 809        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 810        idr_destroy(&ls->ls_lkbidr);
 811
 812        /*
 813         * Free all rsb's on rsbtbl[] lists
 814         */
 815
 816        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 817                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 818                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 819                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
 820                        dlm_free_rsb(rsb);
 821                }
 822
 823                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 824                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 825                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
 826                        dlm_free_rsb(rsb);
 827                }
 828        }
 829
 830        vfree(ls->ls_rsbtbl);
 831
 832        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 833                kfree(ls->ls_remove_names[i]);
 834
 835        while (!list_empty(&ls->ls_new_rsb)) {
 836                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 837                                       res_hashchain);
 838                list_del(&rsb->res_hashchain);
 839                dlm_free_rsb(rsb);
 840        }
 841
 842        /*
 843         * Free structures on any other lists
 844         */
 845
 846        dlm_purge_requestqueue(ls);
 847        kfree(ls->ls_recover_args);
 848        dlm_clear_members(ls);
 849        dlm_clear_members_gone(ls);
 850        kfree(ls->ls_node_array);
 851        log_rinfo(ls, "release_lockspace final free");
 852        kobject_put(&ls->ls_kobj);
 853        /* The ls structure will be freed when the kobject is done with */
 854
 855        module_put(THIS_MODULE);
 856        return 0;
 857}
 858
 859/*
 860 * Called when a system has released all its locks and is not going to use the
 861 * lockspace any longer.  We free everything we're managing for this lockspace.
 862 * Remaining nodes will go through the recovery process as if we'd died.  The
 863 * lockspace must continue to function as usual, participating in recoveries,
 864 * until this returns.
 865 *
 866 * Force has 4 possible values:
 867 * 0 - don't destroy locksapce if it has any LKBs
 868 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 869 * 2 - destroy lockspace regardless of LKBs
 870 * 3 - destroy lockspace as part of a forced shutdown
 871 */
 872
 873int dlm_release_lockspace(void *lockspace, int force)
 874{
 875        struct dlm_ls *ls;
 876        int error;
 877
 878        ls = dlm_find_lockspace_local(lockspace);
 879        if (!ls)
 880                return -EINVAL;
 881        dlm_put_lockspace(ls);
 882
 883        mutex_lock(&ls_lock);
 884        error = release_lockspace(ls, force);
 885        if (!error)
 886                ls_count--;
 887        if (!ls_count)
 888                threads_stop();
 889        mutex_unlock(&ls_lock);
 890
 891        return error;
 892}
 893
 894void dlm_stop_lockspaces(void)
 895{
 896        struct dlm_ls *ls;
 897        int count;
 898
 899 restart:
 900        count = 0;
 901        spin_lock(&lslist_lock);
 902        list_for_each_entry(ls, &lslist, ls_list) {
 903                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 904                        count++;
 905                        continue;
 906                }
 907                spin_unlock(&lslist_lock);
 908                log_error(ls, "no userland control daemon, stopping lockspace");
 909                dlm_ls_stop(ls);
 910                goto restart;
 911        }
 912        spin_unlock(&lslist_lock);
 913
 914        if (count)
 915                log_print("dlm user daemon left %d lockspaces", count);
 916}
 917
 918