linux/fs/dlm/lockspace.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


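/*
 * Per-lockspace sysfs attributes, exposed under /sys/kernel/dlm/<name>/
 * (the "dlm" kset created in dlm_lockspace_init below).  They are the
 * kernel half of the handshake with the userspace daemon: writing "0"/"1"
 * to "control" stops/starts a lockspace, and writing a result to
 * "event_done" wakes a kernel thread waiting in do_uevent().
 */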
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        wait_event(ls->ls_uevent_wait,
                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

        return ls->ls_uevent_result;
}

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

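/*
 * dlm_scand is a single kernel thread shared by all lockspaces.  Roughly
 * every ci_scan_secs it picks a lockspace that is due for a scan and runs
 * the rsb, timeout and waiter scans on it.  If dlm_lock_recovery_try()
 * fails (the lockspace is busy with recovery), ls_scan_time is pushed
 * forward by HZ so the scan is retried about a second later.
 */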
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

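/*
 * The dlm_find_lockspace_*() lookups below take a reference on the
 * lockspace (ls_count); every successful lookup must be balanced by a
 * dlm_put_lockspace().  remove_lockspace() waits on ls_count_wait for
 * the count to drop to zero before taking the lockspace off lslist.
 */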
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        if (atomic_dec_and_test(&ls->ls_count))
                wake_up(&ls->ls_count_wait);
}

static void remove_lockspace(struct dlm_ls *ls)
{
retry:
        wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

        spin_lock(&lslist_lock);
        if (atomic_read(&ls->ls_count) != 0) {
                spin_unlock(&lslist_lock);
                goto retry;
        }

        WARN_ON(ls->ls_create_count != 0);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_midcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

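/*
 * Create a new lockspace, or attach to an existing one of the same name
 * (unless DLM_LSFL_NEWEXCL is set).  The name must be 1..DLM_LOCKSPACE_LEN
 * bytes and lvblen a non-zero multiple of 8.  Returns 0 with *lockspace set
 * when a new lockspace was created, 1 when an existing one was reused, or
 * a negative errno.
 */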
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        atomic_set(&ls->ls_count, 0);
        init_waitqueue_head(&ls->ls_count_wait);
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = READ_ONCE(dlm_config.ci_rsbtbl_size);
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);
        init_waitqueue_head(&ls->ls_remove_wait);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        atomic_set(&ls->ls_requestqueue_cnt, 0);
        init_waitqueue_head(&ls->ls_requestqueue_wait);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        /* For backwards compatibility with 3.1 we need to use the maximum
         * possible dlm message size here, to be sure the message will fit
         * and to avoid out-of-bounds issues.  However, the 3.2 sending
         * side might send less.
         */
        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

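/*
 * Public entry point, serialized by ls_lock.  The first lockspace brings
 * up the shared dlm_scand and midcomms threads; if creating that first
 * lockspace then fails, the threads are torn down again.  A "reused
 * existing lockspace" result (> 0) from new_lockspace() is folded into 0
 * for callers.
 */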
int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
                dlm_lowcomms_stop();
        }
 out:
        mutex_unlock(&ls_lock);
        return error;
}

static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_clear_members(ls);
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                dlm_lowcomms_stop();
        mutex_unlock(&ls_lock);

        return error;
}

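/*
 * Called when no userland control daemon is available (see the log message
 * below): without the daemon no recovery is possible, so every lockspace
 * that is still running is stopped, and already-stopped lockspaces are
 * only counted.  Since lslist_lock is dropped around dlm_ls_stop(), the
 * walk restarts from the head of lslist after each stop.
 */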
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}

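/*
 * Sanity check for paths that expect all lockspaces to be stopped:
 * warn on the first lockspace where locking is still active, i.e.
 * ls_in_recovery is not held or dlm_locking_stopped() is false.
 */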
void dlm_stop_lockspaces_check(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
                            !dlm_locking_stopped(ls)))
                        break;
        }
        spin_unlock(&lslist_lock);
}