linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include <linux/module.h>
  15
  16#include "dlm_internal.h"
  17#include "lockspace.h"
  18#include "member.h"
  19#include "recoverd.h"
  20#include "dir.h"
  21#include "lowcomms.h"
  22#include "config.h"
  23#include "memory.h"
  24#include "lock.h"
  25#include "recover.h"
  26#include "requestqueue.h"
  27#include "user.h"
  28#include "ast.h"
  29
  30static int                      ls_count;
  31static struct mutex             ls_lock;
  32static struct list_head         lslist;
  33static spinlock_t               lslist_lock;
  34static struct task_struct *     scand_task;
  35
  36
  37static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  38{
  39        ssize_t ret = len;
  40        int n;
  41        int rc = kstrtoint(buf, 0, &n);
  42
  43        if (rc)
  44                return rc;
  45        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  46        if (!ls)
  47                return -EINVAL;
  48
  49        switch (n) {
  50        case 0:
  51                dlm_ls_stop(ls);
  52                break;
  53        case 1:
  54                dlm_ls_start(ls);
  55                break;
  56        default:
  57                ret = -EINVAL;
  58        }
  59        dlm_put_lockspace(ls);
  60        return ret;
  61}
  62
  63static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  64{
  65        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  66
  67        if (rc)
  68                return rc;
  69        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  70        wake_up(&ls->ls_uevent_wait);
  71        return len;
  72}
  73
  74static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  75{
  76        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  77}
  78
  79static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  80{
  81        int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  82
  83        if (rc)
  84                return rc;
  85        return len;
  86}
  87
  88static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  89{
  90        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  91}
  92
  93static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  94{
  95        int val;
  96        int rc = kstrtoint(buf, 0, &val);
  97
  98        if (rc)
  99                return rc;
 100        if (val == 1)
 101                set_bit(LSFL_NODIR, &ls->ls_flags);
 102        return len;
 103}
 104
 105static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 106{
 107        uint32_t status = dlm_recover_status(ls);
 108        return snprintf(buf, PAGE_SIZE, "%x\n", status);
 109}
 110
 111static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 112{
 113        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 114}
 115
 116struct dlm_attr {
 117        struct attribute attr;
 118        ssize_t (*show)(struct dlm_ls *, char *);
 119        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 120};
 121
 122static struct dlm_attr dlm_attr_control = {
 123        .attr  = {.name = "control", .mode = S_IWUSR},
 124        .store = dlm_control_store
 125};
 126
 127static struct dlm_attr dlm_attr_event = {
 128        .attr  = {.name = "event_done", .mode = S_IWUSR},
 129        .store = dlm_event_store
 130};
 131
 132static struct dlm_attr dlm_attr_id = {
 133        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 134        .show  = dlm_id_show,
 135        .store = dlm_id_store
 136};
 137
 138static struct dlm_attr dlm_attr_nodir = {
 139        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 140        .show  = dlm_nodir_show,
 141        .store = dlm_nodir_store
 142};
 143
 144static struct dlm_attr dlm_attr_recover_status = {
 145        .attr  = {.name = "recover_status", .mode = S_IRUGO},
 146        .show  = dlm_recover_status_show
 147};
 148
 149static struct dlm_attr dlm_attr_recover_nodeid = {
 150        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 151        .show  = dlm_recover_nodeid_show
 152};
 153
 154static struct attribute *dlm_attrs[] = {
 155        &dlm_attr_control.attr,
 156        &dlm_attr_event.attr,
 157        &dlm_attr_id.attr,
 158        &dlm_attr_nodir.attr,
 159        &dlm_attr_recover_status.attr,
 160        &dlm_attr_recover_nodeid.attr,
 161        NULL,
 162};
 163
 164static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 165                             char *buf)
 166{
 167        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 168        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 169        return a->show ? a->show(ls, buf) : 0;
 170}
 171
 172static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 173                              const char *buf, size_t len)
 174{
 175        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 176        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 177        return a->store ? a->store(ls, buf, len) : len;
 178}
 179
 180static void lockspace_kobj_release(struct kobject *k)
 181{
 182        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 183        kfree(ls);
 184}
 185
 186static const struct sysfs_ops dlm_attr_ops = {
 187        .show  = dlm_attr_show,
 188        .store = dlm_attr_store,
 189};
 190
 191static struct kobj_type dlm_ktype = {
 192        .default_attrs = dlm_attrs,
 193        .sysfs_ops     = &dlm_attr_ops,
 194        .release       = lockspace_kobj_release,
 195};
 196
 197static struct kset *dlm_kset;
 198
 199static int do_uevent(struct dlm_ls *ls, int in)
 200{
 201        int error;
 202
 203        if (in)
 204                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 205        else
 206                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 207
 208        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 209
 210        /* dlm_controld will see the uevent, do the necessary group management
 211           and then write to sysfs to wake us */
 212
 213        error = wait_event_interruptible(ls->ls_uevent_wait,
 214                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 215
 216        log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
 217
 218        if (error)
 219                goto out;
 220
 221        error = ls->ls_uevent_result;
 222 out:
 223        if (error)
 224                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 225                          error, ls->ls_uevent_result);
 226        return error;
 227}
 228
 229static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 230                      struct kobj_uevent_env *env)
 231{
 232        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 233
 234        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 235        return 0;
 236}
 237
 238static const struct kset_uevent_ops dlm_uevent_ops = {
 239        .uevent = dlm_uevent,
 240};
 241
 242int __init dlm_lockspace_init(void)
 243{
 244        ls_count = 0;
 245        mutex_init(&ls_lock);
 246        INIT_LIST_HEAD(&lslist);
 247        spin_lock_init(&lslist_lock);
 248
 249        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 250        if (!dlm_kset) {
 251                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 252                return -ENOMEM;
 253        }
 254        return 0;
 255}
 256
 257void dlm_lockspace_exit(void)
 258{
 259        kset_unregister(dlm_kset);
 260}
 261
 262static struct dlm_ls *find_ls_to_scan(void)
 263{
 264        struct dlm_ls *ls;
 265
 266        spin_lock(&lslist_lock);
 267        list_for_each_entry(ls, &lslist, ls_list) {
 268                if (time_after_eq(jiffies, ls->ls_scan_time +
 269                                            dlm_config.ci_scan_secs * HZ)) {
 270                        spin_unlock(&lslist_lock);
 271                        return ls;
 272                }
 273        }
 274        spin_unlock(&lslist_lock);
 275        return NULL;
 276}
 277
 278static int dlm_scand(void *data)
 279{
 280        struct dlm_ls *ls;
 281
 282        while (!kthread_should_stop()) {
 283                ls = find_ls_to_scan();
 284                if (ls) {
 285                        if (dlm_lock_recovery_try(ls)) {
 286                                ls->ls_scan_time = jiffies;
 287                                dlm_scan_rsbs(ls);
 288                                dlm_scan_timeout(ls);
 289                                dlm_scan_waiters(ls);
 290                                dlm_unlock_recovery(ls);
 291                        } else {
 292                                ls->ls_scan_time += HZ;
 293                        }
 294                        continue;
 295                }
 296                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 297        }
 298        return 0;
 299}
 300
 301static int dlm_scand_start(void)
 302{
 303        struct task_struct *p;
 304        int error = 0;
 305
 306        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 307        if (IS_ERR(p))
 308                error = PTR_ERR(p);
 309        else
 310                scand_task = p;
 311        return error;
 312}
 313
 314static void dlm_scand_stop(void)
 315{
 316        kthread_stop(scand_task);
 317}
 318
 319struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 320{
 321        struct dlm_ls *ls;
 322
 323        spin_lock(&lslist_lock);
 324
 325        list_for_each_entry(ls, &lslist, ls_list) {
 326                if (ls->ls_global_id == id) {
 327                        ls->ls_count++;
 328                        goto out;
 329                }
 330        }
 331        ls = NULL;
 332 out:
 333        spin_unlock(&lslist_lock);
 334        return ls;
 335}
 336
 337struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 338{
 339        struct dlm_ls *ls;
 340
 341        spin_lock(&lslist_lock);
 342        list_for_each_entry(ls, &lslist, ls_list) {
 343                if (ls->ls_local_handle == lockspace) {
 344                        ls->ls_count++;
 345                        goto out;
 346                }
 347        }
 348        ls = NULL;
 349 out:
 350        spin_unlock(&lslist_lock);
 351        return ls;
 352}
 353
 354struct dlm_ls *dlm_find_lockspace_device(int minor)
 355{
 356        struct dlm_ls *ls;
 357
 358        spin_lock(&lslist_lock);
 359        list_for_each_entry(ls, &lslist, ls_list) {
 360                if (ls->ls_device.minor == minor) {
 361                        ls->ls_count++;
 362                        goto out;
 363                }
 364        }
 365        ls = NULL;
 366 out:
 367        spin_unlock(&lslist_lock);
 368        return ls;
 369}
 370
 371void dlm_put_lockspace(struct dlm_ls *ls)
 372{
 373        spin_lock(&lslist_lock);
 374        ls->ls_count--;
 375        spin_unlock(&lslist_lock);
 376}
 377
 378static void remove_lockspace(struct dlm_ls *ls)
 379{
 380        for (;;) {
 381                spin_lock(&lslist_lock);
 382                if (ls->ls_count == 0) {
 383                        WARN_ON(ls->ls_create_count != 0);
 384                        list_del(&ls->ls_list);
 385                        spin_unlock(&lslist_lock);
 386                        return;
 387                }
 388                spin_unlock(&lslist_lock);
 389                ssleep(1);
 390        }
 391}
 392
 393static int threads_start(void)
 394{
 395        int error;
 396
 397        error = dlm_scand_start();
 398        if (error) {
 399                log_print("cannot start dlm_scand thread %d", error);
 400                goto fail;
 401        }
 402
 403        /* Thread for sending/receiving messages for all lockspace's */
 404        error = dlm_lowcomms_start();
 405        if (error) {
 406                log_print("cannot start dlm lowcomms %d", error);
 407                goto scand_fail;
 408        }
 409
 410        return 0;
 411
 412 scand_fail:
 413        dlm_scand_stop();
 414 fail:
 415        return error;
 416}
 417
 418static void threads_stop(void)
 419{
 420        dlm_scand_stop();
 421        dlm_lowcomms_stop();
 422}
 423
 424static int new_lockspace(const char *name, const char *cluster,
 425                         uint32_t flags, int lvblen,
 426                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 427                         int *ops_result, dlm_lockspace_t **lockspace)
 428{
 429        struct dlm_ls *ls;
 430        int i, size, error;
 431        int do_unreg = 0;
 432        int namelen = strlen(name);
 433
 434        if (namelen > DLM_LOCKSPACE_LEN)
 435                return -EINVAL;
 436
 437        if (!lvblen || (lvblen % 8))
 438                return -EINVAL;
 439
 440        if (!try_module_get(THIS_MODULE))
 441                return -EINVAL;
 442
 443        if (!dlm_user_daemon_available()) {
 444                log_print("dlm user daemon not available");
 445                error = -EUNATCH;
 446                goto out;
 447        }
 448
 449        if (ops && ops_result) {
 450                if (!dlm_config.ci_recover_callbacks)
 451                        *ops_result = -EOPNOTSUPP;
 452                else
 453                        *ops_result = 0;
 454        }
 455
 456        if (!cluster)
 457                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
 458                          dlm_config.ci_cluster_name);
 459
 460        if (dlm_config.ci_recover_callbacks && cluster &&
 461            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 462                log_print("dlm cluster name '%s' does not match "
 463                          "the application cluster name '%s'",
 464                          dlm_config.ci_cluster_name, cluster);
 465                error = -EBADR;
 466                goto out;
 467        }
 468
 469        error = 0;
 470
 471        spin_lock(&lslist_lock);
 472        list_for_each_entry(ls, &lslist, ls_list) {
 473                WARN_ON(ls->ls_create_count <= 0);
 474                if (ls->ls_namelen != namelen)
 475                        continue;
 476                if (memcmp(ls->ls_name, name, namelen))
 477                        continue;
 478                if (flags & DLM_LSFL_NEWEXCL) {
 479                        error = -EEXIST;
 480                        break;
 481                }
 482                ls->ls_create_count++;
 483                *lockspace = ls;
 484                error = 1;
 485                break;
 486        }
 487        spin_unlock(&lslist_lock);
 488
 489        if (error)
 490                goto out;
 491
 492        error = -ENOMEM;
 493
 494        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 495        if (!ls)
 496                goto out;
 497        memcpy(ls->ls_name, name, namelen);
 498        ls->ls_namelen = namelen;
 499        ls->ls_lvblen = lvblen;
 500        ls->ls_count = 0;
 501        ls->ls_flags = 0;
 502        ls->ls_scan_time = jiffies;
 503
 504        if (ops && dlm_config.ci_recover_callbacks) {
 505                ls->ls_ops = ops;
 506                ls->ls_ops_arg = ops_arg;
 507        }
 508
 509        if (flags & DLM_LSFL_TIMEWARN)
 510                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 511
 512        /* ls_exflags are forced to match among nodes, and we don't
 513           need to require all nodes to have some flags set */
 514        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 515                                    DLM_LSFL_NEWEXCL));
 516
 517        size = dlm_config.ci_rsbtbl_size;
 518        ls->ls_rsbtbl_size = size;
 519
 520        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
 521        if (!ls->ls_rsbtbl)
 522                goto out_lsfree;
 523        for (i = 0; i < size; i++) {
 524                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 525                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 526                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 527        }
 528
 529        spin_lock_init(&ls->ls_remove_spin);
 530
 531        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 532                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 533                                                 GFP_KERNEL);
 534                if (!ls->ls_remove_names[i])
 535                        goto out_rsbtbl;
 536        }
 537
 538        idr_init(&ls->ls_lkbidr);
 539        spin_lock_init(&ls->ls_lkbidr_spin);
 540
 541        INIT_LIST_HEAD(&ls->ls_waiters);
 542        mutex_init(&ls->ls_waiters_mutex);
 543        INIT_LIST_HEAD(&ls->ls_orphans);
 544        mutex_init(&ls->ls_orphans_mutex);
 545        INIT_LIST_HEAD(&ls->ls_timeout);
 546        mutex_init(&ls->ls_timeout_mutex);
 547
 548        INIT_LIST_HEAD(&ls->ls_new_rsb);
 549        spin_lock_init(&ls->ls_new_rsb_spin);
 550
 551        INIT_LIST_HEAD(&ls->ls_nodes);
 552        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 553        ls->ls_num_nodes = 0;
 554        ls->ls_low_nodeid = 0;
 555        ls->ls_total_weight = 0;
 556        ls->ls_node_array = NULL;
 557
 558        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 559        ls->ls_stub_rsb.res_ls = ls;
 560
 561        ls->ls_debug_rsb_dentry = NULL;
 562        ls->ls_debug_waiters_dentry = NULL;
 563
 564        init_waitqueue_head(&ls->ls_uevent_wait);
 565        ls->ls_uevent_result = 0;
 566        init_completion(&ls->ls_members_done);
 567        ls->ls_members_result = -1;
 568
 569        mutex_init(&ls->ls_cb_mutex);
 570        INIT_LIST_HEAD(&ls->ls_cb_delay);
 571
 572        ls->ls_recoverd_task = NULL;
 573        mutex_init(&ls->ls_recoverd_active);
 574        spin_lock_init(&ls->ls_recover_lock);
 575        spin_lock_init(&ls->ls_rcom_spin);
 576        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 577        ls->ls_recover_status = 0;
 578        ls->ls_recover_seq = 0;
 579        ls->ls_recover_args = NULL;
 580        init_rwsem(&ls->ls_in_recovery);
 581        init_rwsem(&ls->ls_recv_active);
 582        INIT_LIST_HEAD(&ls->ls_requestqueue);
 583        mutex_init(&ls->ls_requestqueue_mutex);
 584        mutex_init(&ls->ls_clear_proc_locks);
 585
 586        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 587        if (!ls->ls_recover_buf)
 588                goto out_lkbidr;
 589
 590        ls->ls_slot = 0;
 591        ls->ls_num_slots = 0;
 592        ls->ls_slots_size = 0;
 593        ls->ls_slots = NULL;
 594
 595        INIT_LIST_HEAD(&ls->ls_recover_list);
 596        spin_lock_init(&ls->ls_recover_list_lock);
 597        idr_init(&ls->ls_recover_idr);
 598        spin_lock_init(&ls->ls_recover_idr_lock);
 599        ls->ls_recover_list_count = 0;
 600        ls->ls_local_handle = ls;
 601        init_waitqueue_head(&ls->ls_wait_general);
 602        INIT_LIST_HEAD(&ls->ls_root_list);
 603        init_rwsem(&ls->ls_root_sem);
 604
 605        spin_lock(&lslist_lock);
 606        ls->ls_create_count = 1;
 607        list_add(&ls->ls_list, &lslist);
 608        spin_unlock(&lslist_lock);
 609
 610        if (flags & DLM_LSFL_FS) {
 611                error = dlm_callback_start(ls);
 612                if (error) {
 613                        log_error(ls, "can't start dlm_callback %d", error);
 614                        goto out_delist;
 615                }
 616        }
 617
 618        init_waitqueue_head(&ls->ls_recover_lock_wait);
 619
 620        /*
 621         * Once started, dlm_recoverd first looks for ls in lslist, then
 622         * initializes ls_in_recovery as locked in "down" mode.  We need
 623         * to wait for the wakeup from dlm_recoverd because in_recovery
 624         * has to start out in down mode.
 625         */
 626
 627        error = dlm_recoverd_start(ls);
 628        if (error) {
 629                log_error(ls, "can't start dlm_recoverd %d", error);
 630                goto out_callback;
 631        }
 632
 633        wait_event(ls->ls_recover_lock_wait,
 634                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 635
 636        ls->ls_kobj.kset = dlm_kset;
 637        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 638                                     "%s", ls->ls_name);
 639        if (error)
 640                goto out_recoverd;
 641        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 642
 643        /* let kobject handle freeing of ls if there's an error */
 644        do_unreg = 1;
 645
 646        /* This uevent triggers dlm_controld in userspace to add us to the
 647           group of nodes that are members of this lockspace (managed by the
 648           cluster infrastructure.)  Once it's done that, it tells us who the
 649           current lockspace members are (via configfs) and then tells the
 650           lockspace to start running (via sysfs) in dlm_ls_start(). */
 651
 652        error = do_uevent(ls, 1);
 653        if (error)
 654                goto out_recoverd;
 655
 656        wait_for_completion(&ls->ls_members_done);
 657        error = ls->ls_members_result;
 658        if (error)
 659                goto out_members;
 660
 661        dlm_create_debug_file(ls);
 662
 663        log_rinfo(ls, "join complete");
 664        *lockspace = ls;
 665        return 0;
 666
 667 out_members:
 668        do_uevent(ls, 0);
 669        dlm_clear_members(ls);
 670        kfree(ls->ls_node_array);
 671 out_recoverd:
 672        dlm_recoverd_stop(ls);
 673 out_callback:
 674        dlm_callback_stop(ls);
 675 out_delist:
 676        spin_lock(&lslist_lock);
 677        list_del(&ls->ls_list);
 678        spin_unlock(&lslist_lock);
 679        idr_destroy(&ls->ls_recover_idr);
 680        kfree(ls->ls_recover_buf);
 681 out_lkbidr:
 682        idr_destroy(&ls->ls_lkbidr);
 683        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 684                if (ls->ls_remove_names[i])
 685                        kfree(ls->ls_remove_names[i]);
 686        }
 687 out_rsbtbl:
 688        vfree(ls->ls_rsbtbl);
 689 out_lsfree:
 690        if (do_unreg)
 691                kobject_put(&ls->ls_kobj);
 692        else
 693                kfree(ls);
 694 out:
 695        module_put(THIS_MODULE);
 696        return error;
 697}
 698
 699int dlm_new_lockspace(const char *name, const char *cluster,
 700                      uint32_t flags, int lvblen,
 701                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 702                      int *ops_result, dlm_lockspace_t **lockspace)
 703{
 704        int error = 0;
 705
 706        mutex_lock(&ls_lock);
 707        if (!ls_count)
 708                error = threads_start();
 709        if (error)
 710                goto out;
 711
 712        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 713                              ops_result, lockspace);
 714        if (!error)
 715                ls_count++;
 716        if (error > 0)
 717                error = 0;
 718        if (!ls_count)
 719                threads_stop();
 720 out:
 721        mutex_unlock(&ls_lock);
 722        return error;
 723}
 724
 725static int lkb_idr_is_local(int id, void *p, void *data)
 726{
 727        struct dlm_lkb *lkb = p;
 728
 729        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 730}
 731
 732static int lkb_idr_is_any(int id, void *p, void *data)
 733{
 734        return 1;
 735}
 736
 737static int lkb_idr_free(int id, void *p, void *data)
 738{
 739        struct dlm_lkb *lkb = p;
 740
 741        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 742                dlm_free_lvb(lkb->lkb_lvbptr);
 743
 744        dlm_free_lkb(lkb);
 745        return 0;
 746}
 747
 748/* NOTE: We check the lkbidr here rather than the resource table.
 749   This is because there may be LKBs queued as ASTs that have been unlinked
 750   from their RSBs and are pending deletion once the AST has been delivered */
 751
 752static int lockspace_busy(struct dlm_ls *ls, int force)
 753{
 754        int rv;
 755
 756        spin_lock(&ls->ls_lkbidr_spin);
 757        if (force == 0) {
 758                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 759        } else if (force == 1) {
 760                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 761        } else {
 762                rv = 0;
 763        }
 764        spin_unlock(&ls->ls_lkbidr_spin);
 765        return rv;
 766}
 767
 768static int release_lockspace(struct dlm_ls *ls, int force)
 769{
 770        struct dlm_rsb *rsb;
 771        struct rb_node *n;
 772        int i, busy, rv;
 773
 774        busy = lockspace_busy(ls, force);
 775
 776        spin_lock(&lslist_lock);
 777        if (ls->ls_create_count == 1) {
 778                if (busy) {
 779                        rv = -EBUSY;
 780                } else {
 781                        /* remove_lockspace takes ls off lslist */
 782                        ls->ls_create_count = 0;
 783                        rv = 0;
 784                }
 785        } else if (ls->ls_create_count > 1) {
 786                rv = --ls->ls_create_count;
 787        } else {
 788                rv = -EINVAL;
 789        }
 790        spin_unlock(&lslist_lock);
 791
 792        if (rv) {
 793                log_debug(ls, "release_lockspace no remove %d", rv);
 794                return rv;
 795        }
 796
 797        dlm_device_deregister(ls);
 798
 799        if (force < 3 && dlm_user_daemon_available())
 800                do_uevent(ls, 0);
 801
 802        dlm_recoverd_stop(ls);
 803
 804        dlm_callback_stop(ls);
 805
 806        remove_lockspace(ls);
 807
 808        dlm_delete_debug_file(ls);
 809
 810        kfree(ls->ls_recover_buf);
 811
 812        /*
 813         * Free all lkb's in idr
 814         */
 815
 816        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 817        idr_destroy(&ls->ls_lkbidr);
 818
 819        /*
 820         * Free all rsb's on rsbtbl[] lists
 821         */
 822
 823        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 824                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 825                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 826                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
 827                        dlm_free_rsb(rsb);
 828                }
 829
 830                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 831                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 832                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
 833                        dlm_free_rsb(rsb);
 834                }
 835        }
 836
 837        vfree(ls->ls_rsbtbl);
 838
 839        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 840                kfree(ls->ls_remove_names[i]);
 841
 842        while (!list_empty(&ls->ls_new_rsb)) {
 843                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 844                                       res_hashchain);
 845                list_del(&rsb->res_hashchain);
 846                dlm_free_rsb(rsb);
 847        }
 848
 849        /*
 850         * Free structures on any other lists
 851         */
 852
 853        dlm_purge_requestqueue(ls);
 854        kfree(ls->ls_recover_args);
 855        dlm_clear_members(ls);
 856        dlm_clear_members_gone(ls);
 857        kfree(ls->ls_node_array);
 858        log_rinfo(ls, "release_lockspace final free");
 859        kobject_put(&ls->ls_kobj);
 860        /* The ls structure will be freed when the kobject is done with */
 861
 862        module_put(THIS_MODULE);
 863        return 0;
 864}
 865
 866/*
 867 * Called when a system has released all its locks and is not going to use the
 868 * lockspace any longer.  We free everything we're managing for this lockspace.
 869 * Remaining nodes will go through the recovery process as if we'd died.  The
 870 * lockspace must continue to function as usual, participating in recoveries,
 871 * until this returns.
 872 *
 873 * Force has 4 possible values:
 874 * 0 - don't destroy locksapce if it has any LKBs
 875 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 876 * 2 - destroy lockspace regardless of LKBs
 877 * 3 - destroy lockspace as part of a forced shutdown
 878 */
 879
 880int dlm_release_lockspace(void *lockspace, int force)
 881{
 882        struct dlm_ls *ls;
 883        int error;
 884
 885        ls = dlm_find_lockspace_local(lockspace);
 886        if (!ls)
 887                return -EINVAL;
 888        dlm_put_lockspace(ls);
 889
 890        mutex_lock(&ls_lock);
 891        error = release_lockspace(ls, force);
 892        if (!error)
 893                ls_count--;
 894        if (!ls_count)
 895                threads_stop();
 896        mutex_unlock(&ls_lock);
 897
 898        return error;
 899}
 900
 901void dlm_stop_lockspaces(void)
 902{
 903        struct dlm_ls *ls;
 904        int count;
 905
 906 restart:
 907        count = 0;
 908        spin_lock(&lslist_lock);
 909        list_for_each_entry(ls, &lslist, ls_list) {
 910                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 911                        count++;
 912                        continue;
 913                }
 914                spin_unlock(&lslist_lock);
 915                log_error(ls, "no userland control daemon, stopping lockspace");
 916                dlm_ls_stop(ls);
 917                goto restart;
 918        }
 919        spin_unlock(&lslist_lock);
 920
 921        if (count)
 922                log_print("dlm user daemon left %d lockspaces", count);
 923}
 924
 925