linux/fs/dlm/lockspace.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include <linux/module.h>
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "recoverd.h"
  18#include "dir.h"
  19#include "midcomms.h"
  20#include "lowcomms.h"
  21#include "config.h"
  22#include "memory.h"
  23#include "lock.h"
  24#include "recover.h"
  25#include "requestqueue.h"
  26#include "user.h"
  27#include "ast.h"
  28
  29static int                      ls_count;
  30static struct mutex             ls_lock;
  31static struct list_head         lslist;
  32static spinlock_t               lslist_lock;
  33static struct task_struct *     scand_task;
  34
  35
  36static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
  37{
  38        ssize_t ret = len;
  39        int n;
  40        int rc = kstrtoint(buf, 0, &n);
  41
  42        if (rc)
  43                return rc;
  44        ls = dlm_find_lockspace_local(ls->ls_local_handle);
  45        if (!ls)
  46                return -EINVAL;
  47
  48        switch (n) {
  49        case 0:
  50                dlm_ls_stop(ls);
  51                break;
  52        case 1:
  53                dlm_ls_start(ls);
  54                break;
  55        default:
  56                ret = -EINVAL;
  57        }
  58        dlm_put_lockspace(ls);
  59        return ret;
  60}
  61
  62static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
  63{
  64        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
  65
  66        if (rc)
  67                return rc;
  68        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
  69        wake_up(&ls->ls_uevent_wait);
  70        return len;
  71}
  72
  73static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
  74{
  75        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
  76}
  77
  78static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
  79{
  80        int rc = kstrtouint(buf, 0, &ls->ls_global_id);
  81
  82        if (rc)
  83                return rc;
  84        return len;
  85}
  86
  87static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
  88{
  89        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
  90}
  91
  92static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
  93{
  94        int val;
  95        int rc = kstrtoint(buf, 0, &val);
  96
  97        if (rc)
  98                return rc;
  99        if (val == 1)
 100                set_bit(LSFL_NODIR, &ls->ls_flags);
 101        return len;
 102}
 103
 104static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 105{
 106        uint32_t status = dlm_recover_status(ls);
 107        return snprintf(buf, PAGE_SIZE, "%x\n", status);
 108}
 109
 110static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 111{
 112        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 113}
 114
 115struct dlm_attr {
 116        struct attribute attr;
 117        ssize_t (*show)(struct dlm_ls *, char *);
 118        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 119};
 120
 121static struct dlm_attr dlm_attr_control = {
 122        .attr  = {.name = "control", .mode = S_IWUSR},
 123        .store = dlm_control_store
 124};
 125
 126static struct dlm_attr dlm_attr_event = {
 127        .attr  = {.name = "event_done", .mode = S_IWUSR},
 128        .store = dlm_event_store
 129};
 130
 131static struct dlm_attr dlm_attr_id = {
 132        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 133        .show  = dlm_id_show,
 134        .store = dlm_id_store
 135};
 136
 137static struct dlm_attr dlm_attr_nodir = {
 138        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
 139        .show  = dlm_nodir_show,
 140        .store = dlm_nodir_store
 141};
 142
 143static struct dlm_attr dlm_attr_recover_status = {
 144        .attr  = {.name = "recover_status", .mode = S_IRUGO},
 145        .show  = dlm_recover_status_show
 146};
 147
 148static struct dlm_attr dlm_attr_recover_nodeid = {
 149        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 150        .show  = dlm_recover_nodeid_show
 151};
 152
 153static struct attribute *dlm_attrs[] = {
 154        &dlm_attr_control.attr,
 155        &dlm_attr_event.attr,
 156        &dlm_attr_id.attr,
 157        &dlm_attr_nodir.attr,
 158        &dlm_attr_recover_status.attr,
 159        &dlm_attr_recover_nodeid.attr,
 160        NULL,
 161};
 162ATTRIBUTE_GROUPS(dlm);
 163
 164static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 165                             char *buf)
 166{
 167        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 168        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 169        return a->show ? a->show(ls, buf) : 0;
 170}
 171
 172static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 173                              const char *buf, size_t len)
 174{
 175        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 176        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 177        return a->store ? a->store(ls, buf, len) : len;
 178}
 179
 180static void lockspace_kobj_release(struct kobject *k)
 181{
 182        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 183        kfree(ls);
 184}
 185
 186static const struct sysfs_ops dlm_attr_ops = {
 187        .show  = dlm_attr_show,
 188        .store = dlm_attr_store,
 189};
 190
 191static struct kobj_type dlm_ktype = {
 192        .default_groups = dlm_groups,
 193        .sysfs_ops     = &dlm_attr_ops,
 194        .release       = lockspace_kobj_release,
 195};
 196
 197static struct kset *dlm_kset;
 198
 199static int do_uevent(struct dlm_ls *ls, int in)
 200{
 201        if (in)
 202                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 203        else
 204                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 205
 206        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 207
 208        /* dlm_controld will see the uevent, do the necessary group management
 209           and then write to sysfs to wake us */
 210
 211        wait_event(ls->ls_uevent_wait,
 212                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 213
 214        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
 215
 216        return ls->ls_uevent_result;
 217}
 218
 219static int dlm_uevent(struct kset *kset, struct kobject *kobj,
 220                      struct kobj_uevent_env *env)
 221{
 222        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
 223
 224        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
 225        return 0;
 226}
 227
 228static const struct kset_uevent_ops dlm_uevent_ops = {
 229        .uevent = dlm_uevent,
 230};
 231
 232int __init dlm_lockspace_init(void)
 233{
 234        ls_count = 0;
 235        mutex_init(&ls_lock);
 236        INIT_LIST_HEAD(&lslist);
 237        spin_lock_init(&lslist_lock);
 238
 239        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
 240        if (!dlm_kset) {
 241                printk(KERN_WARNING "%s: can not create kset\n", __func__);
 242                return -ENOMEM;
 243        }
 244        return 0;
 245}
 246
 247void dlm_lockspace_exit(void)
 248{
 249        kset_unregister(dlm_kset);
 250}
 251
 252static struct dlm_ls *find_ls_to_scan(void)
 253{
 254        struct dlm_ls *ls;
 255
 256        spin_lock(&lslist_lock);
 257        list_for_each_entry(ls, &lslist, ls_list) {
 258                if (time_after_eq(jiffies, ls->ls_scan_time +
 259                                            dlm_config.ci_scan_secs * HZ)) {
 260                        spin_unlock(&lslist_lock);
 261                        return ls;
 262                }
 263        }
 264        spin_unlock(&lslist_lock);
 265        return NULL;
 266}
 267
 268static int dlm_scand(void *data)
 269{
 270        struct dlm_ls *ls;
 271
 272        while (!kthread_should_stop()) {
 273                ls = find_ls_to_scan();
 274                if (ls) {
 275                        if (dlm_lock_recovery_try(ls)) {
 276                                ls->ls_scan_time = jiffies;
 277                                dlm_scan_rsbs(ls);
 278                                dlm_scan_timeout(ls);
 279                                dlm_scan_waiters(ls);
 280                                dlm_unlock_recovery(ls);
 281                        } else {
 282                                ls->ls_scan_time += HZ;
 283                        }
 284                        continue;
 285                }
 286                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
 287        }
 288        return 0;
 289}
 290
 291static int dlm_scand_start(void)
 292{
 293        struct task_struct *p;
 294        int error = 0;
 295
 296        p = kthread_run(dlm_scand, NULL, "dlm_scand");
 297        if (IS_ERR(p))
 298                error = PTR_ERR(p);
 299        else
 300                scand_task = p;
 301        return error;
 302}
 303
 304static void dlm_scand_stop(void)
 305{
 306        kthread_stop(scand_task);
 307}
 308
 309struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 310{
 311        struct dlm_ls *ls;
 312
 313        spin_lock(&lslist_lock);
 314
 315        list_for_each_entry(ls, &lslist, ls_list) {
 316                if (ls->ls_global_id == id) {
 317                        ls->ls_count++;
 318                        goto out;
 319                }
 320        }
 321        ls = NULL;
 322 out:
 323        spin_unlock(&lslist_lock);
 324        return ls;
 325}
 326
 327struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 328{
 329        struct dlm_ls *ls;
 330
 331        spin_lock(&lslist_lock);
 332        list_for_each_entry(ls, &lslist, ls_list) {
 333                if (ls->ls_local_handle == lockspace) {
 334                        ls->ls_count++;
 335                        goto out;
 336                }
 337        }
 338        ls = NULL;
 339 out:
 340        spin_unlock(&lslist_lock);
 341        return ls;
 342}
 343
 344struct dlm_ls *dlm_find_lockspace_device(int minor)
 345{
 346        struct dlm_ls *ls;
 347
 348        spin_lock(&lslist_lock);
 349        list_for_each_entry(ls, &lslist, ls_list) {
 350                if (ls->ls_device.minor == minor) {
 351                        ls->ls_count++;
 352                        goto out;
 353                }
 354        }
 355        ls = NULL;
 356 out:
 357        spin_unlock(&lslist_lock);
 358        return ls;
 359}
 360
 361void dlm_put_lockspace(struct dlm_ls *ls)
 362{
 363        spin_lock(&lslist_lock);
 364        ls->ls_count--;
 365        spin_unlock(&lslist_lock);
 366}
 367
 368static void remove_lockspace(struct dlm_ls *ls)
 369{
 370        for (;;) {
 371                spin_lock(&lslist_lock);
 372                if (ls->ls_count == 0) {
 373                        WARN_ON(ls->ls_create_count != 0);
 374                        list_del(&ls->ls_list);
 375                        spin_unlock(&lslist_lock);
 376                        return;
 377                }
 378                spin_unlock(&lslist_lock);
 379                ssleep(1);
 380        }
 381}
 382
 383static int threads_start(void)
 384{
 385        int error;
 386
 387        error = dlm_scand_start();
 388        if (error) {
 389                log_print("cannot start dlm_scand thread %d", error);
 390                goto fail;
 391        }
 392
 393        /* Thread for sending/receiving messages for all lockspace's */
 394        error = dlm_midcomms_start();
 395        if (error) {
 396                log_print("cannot start dlm lowcomms %d", error);
 397                goto scand_fail;
 398        }
 399
 400        return 0;
 401
 402 scand_fail:
 403        dlm_scand_stop();
 404 fail:
 405        return error;
 406}
 407
 408static int new_lockspace(const char *name, const char *cluster,
 409                         uint32_t flags, int lvblen,
 410                         const struct dlm_lockspace_ops *ops, void *ops_arg,
 411                         int *ops_result, dlm_lockspace_t **lockspace)
 412{
 413        struct dlm_ls *ls;
 414        int i, size, error;
 415        int do_unreg = 0;
 416        int namelen = strlen(name);
 417
 418        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
 419                return -EINVAL;
 420
 421        if (!lvblen || (lvblen % 8))
 422                return -EINVAL;
 423
 424        if (!try_module_get(THIS_MODULE))
 425                return -EINVAL;
 426
 427        if (!dlm_user_daemon_available()) {
 428                log_print("dlm user daemon not available");
 429                error = -EUNATCH;
 430                goto out;
 431        }
 432
 433        if (ops && ops_result) {
 434                if (!dlm_config.ci_recover_callbacks)
 435                        *ops_result = -EOPNOTSUPP;
 436                else
 437                        *ops_result = 0;
 438        }
 439
 440        if (!cluster)
 441                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
 442                          dlm_config.ci_cluster_name);
 443
 444        if (dlm_config.ci_recover_callbacks && cluster &&
 445            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
 446                log_print("dlm cluster name '%s' does not match "
 447                          "the application cluster name '%s'",
 448                          dlm_config.ci_cluster_name, cluster);
 449                error = -EBADR;
 450                goto out;
 451        }
 452
 453        error = 0;
 454
 455        spin_lock(&lslist_lock);
 456        list_for_each_entry(ls, &lslist, ls_list) {
 457                WARN_ON(ls->ls_create_count <= 0);
 458                if (ls->ls_namelen != namelen)
 459                        continue;
 460                if (memcmp(ls->ls_name, name, namelen))
 461                        continue;
 462                if (flags & DLM_LSFL_NEWEXCL) {
 463                        error = -EEXIST;
 464                        break;
 465                }
 466                ls->ls_create_count++;
 467                *lockspace = ls;
 468                error = 1;
 469                break;
 470        }
 471        spin_unlock(&lslist_lock);
 472
 473        if (error)
 474                goto out;
 475
 476        error = -ENOMEM;
 477
 478        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
 479        if (!ls)
 480                goto out;
 481        memcpy(ls->ls_name, name, namelen);
 482        ls->ls_namelen = namelen;
 483        ls->ls_lvblen = lvblen;
 484        ls->ls_count = 0;
 485        ls->ls_flags = 0;
 486        ls->ls_scan_time = jiffies;
 487
 488        if (ops && dlm_config.ci_recover_callbacks) {
 489                ls->ls_ops = ops;
 490                ls->ls_ops_arg = ops_arg;
 491        }
 492
 493        if (flags & DLM_LSFL_TIMEWARN)
 494                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 495
 496        /* ls_exflags are forced to match among nodes, and we don't
 497           need to require all nodes to have some flags set */
 498        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 499                                    DLM_LSFL_NEWEXCL));
 500
 501        size = READ_ONCE(dlm_config.ci_rsbtbl_size);
 502        ls->ls_rsbtbl_size = size;
 503
 504        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
 505        if (!ls->ls_rsbtbl)
 506                goto out_lsfree;
 507        for (i = 0; i < size; i++) {
 508                ls->ls_rsbtbl[i].keep.rb_node = NULL;
 509                ls->ls_rsbtbl[i].toss.rb_node = NULL;
 510                spin_lock_init(&ls->ls_rsbtbl[i].lock);
 511        }
 512
 513        spin_lock_init(&ls->ls_remove_spin);
 514
 515        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
 516                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
 517                                                 GFP_KERNEL);
 518                if (!ls->ls_remove_names[i])
 519                        goto out_rsbtbl;
 520        }
 521
 522        idr_init(&ls->ls_lkbidr);
 523        spin_lock_init(&ls->ls_lkbidr_spin);
 524
 525        INIT_LIST_HEAD(&ls->ls_waiters);
 526        mutex_init(&ls->ls_waiters_mutex);
 527        INIT_LIST_HEAD(&ls->ls_orphans);
 528        mutex_init(&ls->ls_orphans_mutex);
 529        INIT_LIST_HEAD(&ls->ls_timeout);
 530        mutex_init(&ls->ls_timeout_mutex);
 531
 532        INIT_LIST_HEAD(&ls->ls_new_rsb);
 533        spin_lock_init(&ls->ls_new_rsb_spin);
 534
 535        INIT_LIST_HEAD(&ls->ls_nodes);
 536        INIT_LIST_HEAD(&ls->ls_nodes_gone);
 537        ls->ls_num_nodes = 0;
 538        ls->ls_low_nodeid = 0;
 539        ls->ls_total_weight = 0;
 540        ls->ls_node_array = NULL;
 541
 542        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 543        ls->ls_stub_rsb.res_ls = ls;
 544
 545        ls->ls_debug_rsb_dentry = NULL;
 546        ls->ls_debug_waiters_dentry = NULL;
 547
 548        init_waitqueue_head(&ls->ls_uevent_wait);
 549        ls->ls_uevent_result = 0;
 550        init_completion(&ls->ls_members_done);
 551        ls->ls_members_result = -1;
 552
 553        mutex_init(&ls->ls_cb_mutex);
 554        INIT_LIST_HEAD(&ls->ls_cb_delay);
 555
 556        ls->ls_recoverd_task = NULL;
 557        mutex_init(&ls->ls_recoverd_active);
 558        spin_lock_init(&ls->ls_recover_lock);
 559        spin_lock_init(&ls->ls_rcom_spin);
 560        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 561        ls->ls_recover_status = 0;
 562        ls->ls_recover_seq = 0;
 563        ls->ls_recover_args = NULL;
 564        init_rwsem(&ls->ls_in_recovery);
 565        init_rwsem(&ls->ls_recv_active);
 566        INIT_LIST_HEAD(&ls->ls_requestqueue);
 567        mutex_init(&ls->ls_requestqueue_mutex);
 568        mutex_init(&ls->ls_clear_proc_locks);
 569
 570        /* Due backwards compatibility with 3.1 we need to use maximum
 571         * possible dlm message size to be sure the message will fit and
 572         * not having out of bounds issues. However on sending side 3.2
 573         * might send less.
 574         */
 575        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
 576        if (!ls->ls_recover_buf)
 577                goto out_lkbidr;
 578
 579        ls->ls_slot = 0;
 580        ls->ls_num_slots = 0;
 581        ls->ls_slots_size = 0;
 582        ls->ls_slots = NULL;
 583
 584        INIT_LIST_HEAD(&ls->ls_recover_list);
 585        spin_lock_init(&ls->ls_recover_list_lock);
 586        idr_init(&ls->ls_recover_idr);
 587        spin_lock_init(&ls->ls_recover_idr_lock);
 588        ls->ls_recover_list_count = 0;
 589        ls->ls_local_handle = ls;
 590        init_waitqueue_head(&ls->ls_wait_general);
 591        INIT_LIST_HEAD(&ls->ls_root_list);
 592        init_rwsem(&ls->ls_root_sem);
 593
 594        spin_lock(&lslist_lock);
 595        ls->ls_create_count = 1;
 596        list_add(&ls->ls_list, &lslist);
 597        spin_unlock(&lslist_lock);
 598
 599        if (flags & DLM_LSFL_FS) {
 600                error = dlm_callback_start(ls);
 601                if (error) {
 602                        log_error(ls, "can't start dlm_callback %d", error);
 603                        goto out_delist;
 604                }
 605        }
 606
 607        init_waitqueue_head(&ls->ls_recover_lock_wait);
 608
 609        /*
 610         * Once started, dlm_recoverd first looks for ls in lslist, then
 611         * initializes ls_in_recovery as locked in "down" mode.  We need
 612         * to wait for the wakeup from dlm_recoverd because in_recovery
 613         * has to start out in down mode.
 614         */
 615
 616        error = dlm_recoverd_start(ls);
 617        if (error) {
 618                log_error(ls, "can't start dlm_recoverd %d", error);
 619                goto out_callback;
 620        }
 621
 622        wait_event(ls->ls_recover_lock_wait,
 623                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
 624
 625        /* let kobject handle freeing of ls if there's an error */
 626        do_unreg = 1;
 627
 628        ls->ls_kobj.kset = dlm_kset;
 629        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 630                                     "%s", ls->ls_name);
 631        if (error)
 632                goto out_recoverd;
 633        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 634
 635        /* This uevent triggers dlm_controld in userspace to add us to the
 636           group of nodes that are members of this lockspace (managed by the
 637           cluster infrastructure.)  Once it's done that, it tells us who the
 638           current lockspace members are (via configfs) and then tells the
 639           lockspace to start running (via sysfs) in dlm_ls_start(). */
 640
 641        error = do_uevent(ls, 1);
 642        if (error)
 643                goto out_recoverd;
 644
 645        wait_for_completion(&ls->ls_members_done);
 646        error = ls->ls_members_result;
 647        if (error)
 648                goto out_members;
 649
 650        dlm_create_debug_file(ls);
 651
 652        log_rinfo(ls, "join complete");
 653        *lockspace = ls;
 654        return 0;
 655
 656 out_members:
 657        do_uevent(ls, 0);
 658        dlm_clear_members(ls);
 659        kfree(ls->ls_node_array);
 660 out_recoverd:
 661        dlm_recoverd_stop(ls);
 662 out_callback:
 663        dlm_callback_stop(ls);
 664 out_delist:
 665        spin_lock(&lslist_lock);
 666        list_del(&ls->ls_list);
 667        spin_unlock(&lslist_lock);
 668        idr_destroy(&ls->ls_recover_idr);
 669        kfree(ls->ls_recover_buf);
 670 out_lkbidr:
 671        idr_destroy(&ls->ls_lkbidr);
 672 out_rsbtbl:
 673        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 674                kfree(ls->ls_remove_names[i]);
 675        vfree(ls->ls_rsbtbl);
 676 out_lsfree:
 677        if (do_unreg)
 678                kobject_put(&ls->ls_kobj);
 679        else
 680                kfree(ls);
 681 out:
 682        module_put(THIS_MODULE);
 683        return error;
 684}
 685
 686int dlm_new_lockspace(const char *name, const char *cluster,
 687                      uint32_t flags, int lvblen,
 688                      const struct dlm_lockspace_ops *ops, void *ops_arg,
 689                      int *ops_result, dlm_lockspace_t **lockspace)
 690{
 691        int error = 0;
 692
 693        mutex_lock(&ls_lock);
 694        if (!ls_count)
 695                error = threads_start();
 696        if (error)
 697                goto out;
 698
 699        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
 700                              ops_result, lockspace);
 701        if (!error)
 702                ls_count++;
 703        if (error > 0)
 704                error = 0;
 705        if (!ls_count) {
 706                dlm_scand_stop();
 707                dlm_midcomms_shutdown();
 708                dlm_lowcomms_stop();
 709        }
 710 out:
 711        mutex_unlock(&ls_lock);
 712        return error;
 713}
 714
 715static int lkb_idr_is_local(int id, void *p, void *data)
 716{
 717        struct dlm_lkb *lkb = p;
 718
 719        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 720}
 721
 722static int lkb_idr_is_any(int id, void *p, void *data)
 723{
 724        return 1;
 725}
 726
 727static int lkb_idr_free(int id, void *p, void *data)
 728{
 729        struct dlm_lkb *lkb = p;
 730
 731        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 732                dlm_free_lvb(lkb->lkb_lvbptr);
 733
 734        dlm_free_lkb(lkb);
 735        return 0;
 736}
 737
 738/* NOTE: We check the lkbidr here rather than the resource table.
 739   This is because there may be LKBs queued as ASTs that have been unlinked
 740   from their RSBs and are pending deletion once the AST has been delivered */
 741
 742static int lockspace_busy(struct dlm_ls *ls, int force)
 743{
 744        int rv;
 745
 746        spin_lock(&ls->ls_lkbidr_spin);
 747        if (force == 0) {
 748                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
 749        } else if (force == 1) {
 750                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
 751        } else {
 752                rv = 0;
 753        }
 754        spin_unlock(&ls->ls_lkbidr_spin);
 755        return rv;
 756}
 757
 758static int release_lockspace(struct dlm_ls *ls, int force)
 759{
 760        struct dlm_rsb *rsb;
 761        struct rb_node *n;
 762        int i, busy, rv;
 763
 764        busy = lockspace_busy(ls, force);
 765
 766        spin_lock(&lslist_lock);
 767        if (ls->ls_create_count == 1) {
 768                if (busy) {
 769                        rv = -EBUSY;
 770                } else {
 771                        /* remove_lockspace takes ls off lslist */
 772                        ls->ls_create_count = 0;
 773                        rv = 0;
 774                }
 775        } else if (ls->ls_create_count > 1) {
 776                rv = --ls->ls_create_count;
 777        } else {
 778                rv = -EINVAL;
 779        }
 780        spin_unlock(&lslist_lock);
 781
 782        if (rv) {
 783                log_debug(ls, "release_lockspace no remove %d", rv);
 784                return rv;
 785        }
 786
 787        dlm_device_deregister(ls);
 788
 789        if (force < 3 && dlm_user_daemon_available())
 790                do_uevent(ls, 0);
 791
 792        dlm_recoverd_stop(ls);
 793
 794        if (ls_count == 1) {
 795                dlm_scand_stop();
 796                dlm_clear_members(ls);
 797                dlm_midcomms_shutdown();
 798        }
 799
 800        dlm_callback_stop(ls);
 801
 802        remove_lockspace(ls);
 803
 804        dlm_delete_debug_file(ls);
 805
 806        idr_destroy(&ls->ls_recover_idr);
 807        kfree(ls->ls_recover_buf);
 808
 809        /*
 810         * Free all lkb's in idr
 811         */
 812
 813        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
 814        idr_destroy(&ls->ls_lkbidr);
 815
 816        /*
 817         * Free all rsb's on rsbtbl[] lists
 818         */
 819
 820        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 821                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
 822                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 823                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
 824                        dlm_free_rsb(rsb);
 825                }
 826
 827                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
 828                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
 829                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
 830                        dlm_free_rsb(rsb);
 831                }
 832        }
 833
 834        vfree(ls->ls_rsbtbl);
 835
 836        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
 837                kfree(ls->ls_remove_names[i]);
 838
 839        while (!list_empty(&ls->ls_new_rsb)) {
 840                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
 841                                       res_hashchain);
 842                list_del(&rsb->res_hashchain);
 843                dlm_free_rsb(rsb);
 844        }
 845
 846        /*
 847         * Free structures on any other lists
 848         */
 849
 850        dlm_purge_requestqueue(ls);
 851        kfree(ls->ls_recover_args);
 852        dlm_clear_members(ls);
 853        dlm_clear_members_gone(ls);
 854        kfree(ls->ls_node_array);
 855        log_rinfo(ls, "release_lockspace final free");
 856        kobject_put(&ls->ls_kobj);
 857        /* The ls structure will be freed when the kobject is done with */
 858
 859        module_put(THIS_MODULE);
 860        return 0;
 861}
 862
 863/*
 864 * Called when a system has released all its locks and is not going to use the
 865 * lockspace any longer.  We free everything we're managing for this lockspace.
 866 * Remaining nodes will go through the recovery process as if we'd died.  The
 867 * lockspace must continue to function as usual, participating in recoveries,
 868 * until this returns.
 869 *
 870 * Force has 4 possible values:
 871 * 0 - don't destroy locksapce if it has any LKBs
 872 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 873 * 2 - destroy lockspace regardless of LKBs
 874 * 3 - destroy lockspace as part of a forced shutdown
 875 */
 876
 877int dlm_release_lockspace(void *lockspace, int force)
 878{
 879        struct dlm_ls *ls;
 880        int error;
 881
 882        ls = dlm_find_lockspace_local(lockspace);
 883        if (!ls)
 884                return -EINVAL;
 885        dlm_put_lockspace(ls);
 886
 887        mutex_lock(&ls_lock);
 888        error = release_lockspace(ls, force);
 889        if (!error)
 890                ls_count--;
 891        if (!ls_count)
 892                dlm_lowcomms_stop();
 893        mutex_unlock(&ls_lock);
 894
 895        return error;
 896}
 897
 898void dlm_stop_lockspaces(void)
 899{
 900        struct dlm_ls *ls;
 901        int count;
 902
 903 restart:
 904        count = 0;
 905        spin_lock(&lslist_lock);
 906        list_for_each_entry(ls, &lslist, ls_list) {
 907                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
 908                        count++;
 909                        continue;
 910                }
 911                spin_unlock(&lslist_lock);
 912                log_error(ls, "no userland control daemon, stopping lockspace");
 913                dlm_ls_stop(ls);
 914                goto restart;
 915        }
 916        spin_unlock(&lslist_lock);
 917
 918        if (count)
 919                log_print("dlm user daemon left %d lockspaces", count);
 920}
 921
 922