linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include <linux/module.h>
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "lowcomms.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "recover.h"
  24#include "requestqueue.h"
  25#include "user.h"
  26#include "ast.h"
  27
/* Number of lockspaces created through dlm_new_lockspace(); changed with
   ls_lock held. */
static int                      ls_count;
/* Serializes dlm_new_lockspace() and dlm_release_lockspace(). */
static struct mutex             ls_lock;
/* List of all lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Protects lslist and the per-lockspace ls_count / ls_create_count fields. */
static spinlock_t               lslist_lock;
/* The dlm_scand kernel thread, set by dlm_scand_start(). */
static struct task_struct *     scand_task;
  33
  34
  35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  36{
  37        ssize_t ret = len;
  38        int n;
  39        int rc = kstrtoint(buf, 0, &n);
  40
  41        if (rc)
  42                return rc;
  43        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  44        if (!ls)
  45                return -EINVAL;
  46
  47        switch (n) {
  48        case 0:
  49                dlm_ls_stop(ls);
  50                break;
  51        case 1:
  52                dlm_ls_start(ls);
  53                break;
  54        default:
  55                ret = -EINVAL;
  56        }
  57        dlm_put_lockspace(ls);
  58        return ret;
  59}
  60
  61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  62{
  63        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  64
  65        if (rc)
  66                return rc;
  67        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  68        wake_up(&ls->ls_uevent_wait);
  69        return len;
  70}
  71
  72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  73{
  74        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  75}
  76
  77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  78{
  79        int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  80
  81        if (rc)
  82                return rc;
  83        return len;
  84}
  85
/* sysfs "nodir" show handler: prints the value of dlm_no_directory(ls). */
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}
  90
  91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  92{
  93        int val;
  94        int rc = kstrtoint(buf, 0, &val);
  95
  96        if (rc)
  97                return rc;
  98        if (val == 1)
  99                set_bit(LSFL_NODIR, &ls->ls_flags);
 100        return len;
 101}
 102
 103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 104{
 105        uint32_t status = dlm_recover_status(ls);
 106        return snprintf(buf, PAGE_SIZE, "%x\n", status);
 107}
 108
 109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 110{
 111        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 112}
 113
/*
 * A lockspace sysfs attribute: the generic attribute plus optional show and
 * store callbacks that take the owning lockspace.  dlm_attr_show() and
 * dlm_attr_store() recover this structure from the embedded attribute and
 * dispatch to these callbacks.
 */
struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
 119
/* "control": write-only; 0 stops and 1 starts the lockspace. */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

/* "event_done": write-only; delivers the uevent result to do_uevent(). */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

/* "id": read/write access to the lockspace global id. */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

/* "nodir": shows dlm_no_directory(); writing 1 sets LSFL_NODIR. */
static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

/* "recover_status": read-only recovery status in hex. */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only ls_recover_nodeid. */
static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

/* NULL-terminated list of every attribute; ATTRIBUTE_GROUPS() turns it into
   the dlm_groups referenced by dlm_ktype. */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);
 162
 163static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 164                             char *buf)
 165{
 166        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 167        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 168        return a->show ? a->show(ls, buf) : 0;
 169}
 170
 171static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 172                              const char *buf, size_t len)
 173{
 174        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 175        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 176        return a->store ? a->store(ls, buf, len) : len;
 177}
 178
 179static void lockspace_kobj_release(struct kobject *k)
 180{
 181        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 182        kfree(ls);
 183}
 184
/* sysfs show/store dispatchers shared by every lockspace attribute. */
static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

/* kobject type of a lockspace: its attributes, sysfs ops, and the release
   callback that frees the dlm_ls. */
static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

/* The "dlm" kset created in dlm_lockspace_init(); every lockspace kobject is
   added to it. */
static struct kset *dlm_kset;
 197
 198static int do_uevent(struct dlm_ls *ls, int in)
 199{
 200        int error;
 201
 202        if (in)
 203                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 204        else
 205                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 206
 207        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 208
 209        /* dlm_controld will see the uevent, do the necessary group management
 210           and then write to sysfs to wake us */
 211
 212        error = wait_event_interruptible(ls->ls_uevent_wait,
 213                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 214
 215        log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
 216
 217        if (error)
 218                goto out;
 219
 220        error = ls->ls_uevent_result;
 221 out:
 222        if (error)
 223                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 224                          error, ls->ls_uevent_result);
 225        return error;
 226}
 227
 228static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 229                      struct kobj_uevent_env *env)
 230{
 231        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 232
 233        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 234        return 0;
 235}
 236
/* uevent operations installed on the "dlm" kset in dlm_lockspace_init(). */
static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};
 240
 241int __init dlm_lockspace_init(void)
 242{
 243        ls_count = 0;
 244        mutex_init(&ls_lock);
 245        INIT_LIST_HEAD(&lslist);
 246        spin_lock_init(&lslist_lock);
 247
 248        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 249        if (!dlm_kset) {
 250                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 251                return -ENOMEM;
 252        }
 253        return 0;
 254}
 255
/* Unregister the "dlm" kset created by dlm_lockspace_init(). */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
 260
 261static struct dlm_ls *find_ls_to_scan(void)
 262{
 263        struct dlm_ls *ls;
 264
 265        spin_lock(&lslist_lock);
 266        list_for_each_entry(ls, &lslist, ls_list) {
 267                if (time_after_eq(jiffies, ls->ls_scan_time +
 268                                            dlm_config.ci_scan_secs * HZ)) {
 269                        spin_unlock(&lslist_lock);
 270                        return ls;
 271                }
 272        }
 273        spin_unlock(&lslist_lock);
 274        return NULL;
 275}
 276
 277static int dlm_scand(void *data)
 278{
 279        struct dlm_ls *ls;
 280
 281        while (!kthread_should_stop()) {
 282                ls = find_ls_to_scan();
 283                if (ls) {
 284                        if (dlm_lock_recovery_try(ls)) {
 285                                ls->ls_scan_time = jiffies;
 286                                dlm_scan_rsbs(ls);
 287                                dlm_scan_timeout(ls);
 288                                dlm_scan_waiters(ls);
 289                                dlm_unlock_recovery(ls);
 290                        } else {
 291                                ls->ls_scan_time += HZ;
 292                        }
 293                        continue;
 294                }
 295                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 296        }
 297        return 0;
 298}
 299
 300static int dlm_scand_start(void)
 301{
 302        struct task_struct *p;
 303        int error = 0;
 304
 305        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 306        if (IS_ERR(p))
 307                error = PTR_ERR(p);
 308        else
 309                scand_task = p;
 310        return error;
 311}
 312
/* Stop the dlm_scand kernel thread recorded in scand_task. */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
 317
 318struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 319{
 320        struct dlm_ls *ls;
 321
 322        spin_lock(&lslist_lock);
 323
 324        list_for_each_entry(ls, &lslist, ls_list) {
 325                if (ls->ls_global_id == id) {
 326                        ls->ls_count++;
 327                        goto out;
 328                }
 329        }
 330        ls = NULL;
 331 out:
 332        spin_unlock(&lslist_lock);
 333        return ls;
 334}
 335
 336struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 337{
 338        struct dlm_ls *ls;
 339
 340        spin_lock(&lslist_lock);
 341        list_for_each_entry(ls, &lslist, ls_list) {
 342                if (ls->ls_local_handle == lockspace) {
 343                        ls->ls_count++;
 344                        goto out;
 345                }
 346        }
 347        ls = NULL;
 348 out:
 349        spin_unlock(&lslist_lock);
 350        return ls;
 351}
 352
 353struct dlm_ls *dlm_find_lockspace_device(int minor)
 354{
 355        struct dlm_ls *ls;
 356
 357        spin_lock(&lslist_lock);
 358        list_for_each_entry(ls, &lslist, ls_list) {
 359                if (ls->ls_device.minor == minor) {
 360                        ls->ls_count++;
 361                        goto out;
 362                }
 363        }
 364        ls = NULL;
 365 out:
 366        spin_unlock(&lslist_lock);
 367        return ls;
 368}
 369
/*
 * Drop a reference taken by one of the dlm_find_lockspace_*() functions by
 * decrementing ls_count under lslist_lock.  remove_lockspace() waits for this
 * count to reach zero.
 */
void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
 376
 377static void remove_lockspace(struct dlm_ls *ls)
 378{
 379        for (;;) {
 380                spin_lock(&lslist_lock);
 381                if (ls->ls_count == 0) {
 382                        WARN_ON(ls->ls_create_count != 0);
 383                        list_del(&ls->ls_list);
 384                        spin_unlock(&lslist_lock);
 385                        return;
 386                }
 387                spin_unlock(&lslist_lock);
 388                ssleep(1);
 389        }
 390}
 391
 392static int threads_start(void)
 393{
 394        int error;
 395
 396        error = dlm_scand_start();
 397        if (error) {
 398                log_print("cannot start dlm_scand thread %d", error);
 399                goto fail;
 400        }
 401
 402        /* Thread for sending/receiving messages for all lockspace's */
 403        error = dlm_lowcomms_start();
 404        if (error) {
 405                log_print("cannot start dlm lowcomms %d", error);
 406                goto scand_fail;
 407        }
 408
 409        return 0;
 410
 411 scand_fail:
 412        dlm_scand_stop();
 413 fail:
 414        return error;
 415}
 416
/* Stop what threads_start() started: the dlm_scand thread, then lowcomms. */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}
 422
 423static int new_lockspace(const char *name, const char *cluster,
 424                         uint32_t flags, int lvblen,
 425                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 426                         int *ops_result, dlm_lockspace_t **lockspace)
 427{
 428        struct dlm_ls *ls;
 429        int i, size, error;
 430        int do_unreg = 0;
 431        int namelen = strlen(name);
 432
 433        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 434                return -EINVAL;
 435
 436        if (!lvblen || (lvblen % 8))
 437                return -EINVAL;
 438
 439        if (!try_module_get(THIS_MODULE))
 440                return -EINVAL;
 441
 442        if (!dlm_user_daemon_available()) {
 443                log_print("dlm user daemon not available");
 444                error = -EUNATCH;
 445                goto out;
 446        }
 447
 448        if (ops && ops_result) {
 449                if (!dlm_config.ci_recover_callbacks)
 450                        *ops_result = -EOPNOTSUPP;
 451                else
 452                        *ops_result = 0;
 453        }
 454
 455        if (!cluster)
 456                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
 457                          dlm_config.ci_cluster_name);
 458
 459        if (dlm_config.ci_recover_callbacks && cluster &&
 460            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 461                log_print("dlm cluster name '%s' does not match "
 462                          "the application cluster name '%s'",
 463                          dlm_config.ci_cluster_name, cluster);
 464                error = -EBADR;
 465                goto out;
 466        }
 467
 468        error = 0;
 469
 470        spin_lock(&lslist_lock);
 471        list_for_each_entry(ls, &lslist, ls_list) {
 472                WARN_ON(ls->ls_create_count <= 0);
 473                if (ls->ls_namelen != namelen)
 474                        continue;
 475                if (memcmp(ls->ls_name, name, namelen))
 476                        continue;
 477                if (flags & DLM_LSFL_NEWEXCL) {
 478                        error = -EEXIST;
 479                        break;
 480                }
 481                ls->ls_create_count++;
 482                *lockspace = ls;
 483                error = 1;
 484                break;
 485        }
 486        spin_unlock(&lslist_lock);
 487
 488        if (error)
 489                goto out;
 490
 491        error = -ENOMEM;
 492
 493        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 494        if (!ls)
 495                goto out;
 496        memcpy(ls->ls_name, name, namelen);
 497        ls->ls_namelen = namelen;
 498        ls->ls_lvblen = lvblen;
 499        ls->ls_count = 0;
 500        ls->ls_flags = 0;
 501        ls->ls_scan_time = jiffies;
 502
 503        if (ops && dlm_config.ci_recover_callbacks) {
 504                ls->ls_ops = ops;
 505                ls->ls_ops_arg = ops_arg;
 506        }
 507
 508        if (flags & DLM_LSFL_TIMEWARN)
 509                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 510
 511        /* ls_exflags are forced to match among nodes, and we don't
 512           need to require all nodes to have some flags set */
 513        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 514                                    DLM_LSFL_NEWEXCL));
 515
 516        size = dlm_config.ci_rsbtbl_size;
 517        ls->ls_rsbtbl_size = size;
 518
 519        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
 520        if (!ls->ls_rsbtbl)
 521                goto out_lsfree;
 522        for (i = 0; i < size; i++) {
 523                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 524                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 525                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 526        }
 527
 528        spin_lock_init(&ls->ls_remove_spin);
 529
 530        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 531                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 532                                                 GFP_KERNEL);
 533                if (!ls->ls_remove_names[i])
 534                        goto out_rsbtbl;
 535        }
 536
 537        idr_init(&ls->ls_lkbidr);
 538        spin_lock_init(&ls->ls_lkbidr_spin);
 539
 540        INIT_LIST_HEAD(&ls->ls_waiters);
 541        mutex_init(&ls->ls_waiters_mutex);
 542        INIT_LIST_HEAD(&ls->ls_orphans);
 543        mutex_init(&ls->ls_orphans_mutex);
 544        INIT_LIST_HEAD(&ls->ls_timeout);
 545        mutex_init(&ls->ls_timeout_mutex);
 546
 547        INIT_LIST_HEAD(&ls->ls_new_rsb);
 548        spin_lock_init(&ls->ls_new_rsb_spin);
 549
 550        INIT_LIST_HEAD(&ls->ls_nodes);
 551        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 552        ls->ls_num_nodes = 0;
 553        ls->ls_low_nodeid = 0;
 554        ls->ls_total_weight = 0;
 555        ls->ls_node_array = NULL;
 556
 557        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 558        ls->ls_stub_rsb.res_ls = ls;
 559
 560        ls->ls_debug_rsb_dentry = NULL;
 561        ls->ls_debug_waiters_dentry = NULL;
 562
 563        init_waitqueue_head(&ls->ls_uevent_wait);
 564        ls->ls_uevent_result = 0;
 565        init_completion(&ls->ls_members_done);
 566        ls->ls_members_result = -1;
 567
 568        mutex_init(&ls->ls_cb_mutex);
 569        INIT_LIST_HEAD(&ls->ls_cb_delay);
 570
 571        ls->ls_recoverd_task = NULL;
 572        mutex_init(&ls->ls_recoverd_active);
 573        spin_lock_init(&ls->ls_recover_lock);
 574        spin_lock_init(&ls->ls_rcom_spin);
 575        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 576        ls->ls_recover_status = 0;
 577        ls->ls_recover_seq = 0;
 578        ls->ls_recover_args = NULL;
 579        init_rwsem(&ls->ls_in_recovery);
 580        init_rwsem(&ls->ls_recv_active);
 581        INIT_LIST_HEAD(&ls->ls_requestqueue);
 582        mutex_init(&ls->ls_requestqueue_mutex);
 583        mutex_init(&ls->ls_clear_proc_locks);
 584
 585        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
 586        if (!ls->ls_recover_buf)
 587                goto out_lkbidr;
 588
 589        ls->ls_slot = 0;
 590        ls->ls_num_slots = 0;
 591        ls->ls_slots_size = 0;
 592        ls->ls_slots = NULL;
 593
 594        INIT_LIST_HEAD(&ls->ls_recover_list);
 595        spin_lock_init(&ls->ls_recover_list_lock);
 596        idr_init(&ls->ls_recover_idr);
 597        spin_lock_init(&ls->ls_recover_idr_lock);
 598        ls->ls_recover_list_count = 0;
 599        ls->ls_local_handle = ls;
 600        init_waitqueue_head(&ls->ls_wait_general);
 601        INIT_LIST_HEAD(&ls->ls_root_list);
 602        init_rwsem(&ls->ls_root_sem);
 603
 604        spin_lock(&lslist_lock);
 605        ls->ls_create_count = 1;
 606        list_add(&ls->ls_list, &lslist);
 607        spin_unlock(&lslist_lock);
 608
 609        if (flags & DLM_LSFL_FS) {
 610                error = dlm_callback_start(ls);
 611                if (error) {
 612                        log_error(ls, "can't start dlm_callback %d", error);
 613                        goto out_delist;
 614                }
 615        }
 616
 617        init_waitqueue_head(&ls->ls_recover_lock_wait);
 618
 619        /*
 620         * Once started, dlm_recoverd first looks for ls in lslist, then
 621         * initializes ls_in_recovery as locked in "down" mode.  We need
 622         * to wait for the wakeup from dlm_recoverd because in_recovery
 623         * has to start out in down mode.
 624         */
 625
 626        error = dlm_recoverd_start(ls);
 627        if (error) {
 628                log_error(ls, "can't start dlm_recoverd %d", error);
 629                goto out_callback;
 630        }
 631
 632        wait_event(ls->ls_recover_lock_wait,
 633                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 634
 635        ls->ls_kobj.kset = dlm_kset;
 636        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 637                                     "%s", ls->ls_name);
 638        if (error)
 639                goto out_recoverd;
 640        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 641
 642        /* let kobject handle freeing of ls if there's an error */
 643        do_unreg = 1;
 644
 645        /* This uevent triggers dlm_controld in userspace to add us to the
 646           group of nodes that are members of this lockspace (managed by the
 647           cluster infrastructure.)  Once it's done that, it tells us who the
 648           current lockspace members are (via configfs) and then tells the
 649           lockspace to start running (via sysfs) in dlm_ls_start(). */
 650
 651        error = do_uevent(ls, 1);
 652        if (error)
 653                goto out_recoverd;
 654
 655        wait_for_completion(&ls->ls_members_done);
 656        error = ls->ls_members_result;
 657        if (error)
 658                goto out_members;
 659
 660        dlm_create_debug_file(ls);
 661
 662        log_rinfo(ls, "join complete");
 663        *lockspace = ls;
 664        return 0;
 665
 666 out_members:
 667        do_uevent(ls, 0);
 668        dlm_clear_members(ls);
 669        kfree(ls->ls_node_array);
 670 out_recoverd:
 671        dlm_recoverd_stop(ls);
 672 out_callback:
 673        dlm_callback_stop(ls);
 674 out_delist:
 675        spin_lock(&lslist_lock);
 676        list_del(&ls->ls_list);
 677        spin_unlock(&lslist_lock);
 678        idr_destroy(&ls->ls_recover_idr);
 679        kfree(ls->ls_recover_buf);
 680 out_lkbidr:
 681        idr_destroy(&ls->ls_lkbidr);
 682 out_rsbtbl:
 683        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 684                kfree(ls->ls_remove_names[i]);
 685        vfree(ls->ls_rsbtbl);
 686 out_lsfree:
 687        if (do_unreg)
 688                kobject_put(&ls->ls_kobj);
 689        else
 690                kfree(ls);
 691 out:
 692        module_put(THIS_MODULE);
 693        return error;
 694}
 695
 696int dlm_new_lockspace(const char *name, const char *cluster,
 697                      uint32_t flags, int lvblen,
 698                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 699                      int *ops_result, dlm_lockspace_t **lockspace)
 700{
 701        int error = 0;
 702
 703        mutex_lock(&ls_lock);
 704        if (!ls_count)
 705                error = threads_start();
 706        if (error)
 707                goto out;
 708
 709        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 710                              ops_result, lockspace);
 711        if (!error)
 712                ls_count++;
 713        if (error > 0)
 714                error = 0;
 715        if (!ls_count)
 716                threads_stop();
 717 out:
 718        mutex_unlock(&ls_lock);
 719        return error;
 720}
 721
 722static int lkb_idr_is_local(int id, void *p, void *data)
 723{
 724        struct dlm_lkb *lkb = p;
 725
 726        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 727}
 728
/* idr_for_each() callback that returns 1 for every entry, so the walk reports
   non-zero as soon as the idr holds any lkb. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}
 733
 734static int lkb_idr_free(int id, void *p, void *data)
 735{
 736        struct dlm_lkb *lkb = p;
 737
 738        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 739                dlm_free_lvb(lkb->lkb_lvbptr);
 740
 741        dlm_free_lkb(lkb);
 742        return 0;
 743}
 744
 745/* NOTE: We check the lkbidr here rather than the resource table.
 746   This is because there may be LKBs queued as ASTs that have been unlinked
 747   from their RSBs and are pending deletion once the AST has been delivered */
 748
 749static int lockspace_busy(struct dlm_ls *ls, int force)
 750{
 751        int rv;
 752
 753        spin_lock(&ls->ls_lkbidr_spin);
 754        if (force == 0) {
 755                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 756        } else if (force == 1) {
 757                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 758        } else {
 759                rv = 0;
 760        }
 761        spin_unlock(&ls->ls_lkbidr_spin);
 762        return rv;
 763}
 764
/*
 * Drop one creation reference on the lockspace and tear it down when that
 * was the last one.
 *
 * Returns 0 when the lockspace was torn down, a positive value (the
 * remaining ls_create_count) when other creators still hold it, -EBUSY when
 * it is the last reference but lockspace_busy() reports lkbs for this
 * @force level, or -EINVAL when ls_create_count is not positive.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        /* Any non-zero rv (error or remaining count) means no teardown. */
        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        /* Send the offline uevent unless this is a forced shutdown
           (force 3) or the user daemon is not available. */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        dlm_callback_stop(ls);

        /* Waits until ls_count drops to zero, then unlinks ls from lslist. */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        /* Free the rsbs still sitting on the ls_new_rsb list. */
        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* Balances the try_module_get() taken in new_lockspace(). */
        module_put(THIS_MODULE);
        return 0;
}
 863
 864/*
 865 * Called when a system has released all its locks and is not going to use the
 866 * lockspace any longer.  We free everything we're managing for this lockspace.
 867 * Remaining nodes will go through the recovery process as if we'd died.  The
 868 * lockspace must continue to function as usual, participating in recoveries,
 869 * until this returns.
 870 *
 871 * Force has 4 possible values:
 872 * 0 - don't destroy locksapce if it has any LKBs
 873 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 874 * 2 - destroy lockspace regardless of LKBs
 875 * 3 - destroy lockspace as part of a forced shutdown
 876 */
 877
 878int dlm_release_lockspace(void *lockspace, int force)
 879{
 880        struct dlm_ls *ls;
 881        int error;
 882
 883        ls = dlm_find_lockspace_local(lockspace);
 884        if (!ls)
 885                return -EINVAL;
 886        dlm_put_lockspace(ls);
 887
 888        mutex_lock(&ls_lock);
 889        error = release_lockspace(ls, force);
 890        if (!error)
 891                ls_count--;
 892        if (!ls_count)
 893                threads_stop();
 894        mutex_unlock(&ls_lock);
 895
 896        return error;
 897}
 898
 899void dlm_stop_lockspaces(void)
 900{
 901        struct dlm_ls *ls;
 902        int count;
 903
 904 restart:
 905        count = 0;
 906        spin_lock(&lslist_lock);
 907        list_for_each_entry(ls, &lslist, ls_list) {
 908                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 909                        count++;
 910                        continue;
 911                }
 912                spin_unlock(&lslist_lock);
 913                log_error(ls, "no userland control daemon, stopping lockspace");
 914                dlm_ls_stop(ls);
 915                goto restart;
 916        }
 917        spin_unlock(&lslist_lock);
 918
 919        if (count)
 920                log_print("dlm user daemon left %d lockspaces", count);
 921}
 922
 923