linux/drivers/staging/lustre/lustre/ldlm/ldlm_lockd.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../../include/linux/libcfs/libcfs.h"
#include "../include/lustre_dlm.h"
#include "../include/obd_class.h"
#include <linux/list.h>
#include "ldlm_internal.h"

static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

extern struct kmem_cache *ldlm_resource_slab;
extern struct kmem_cache *ldlm_lock_slab;
static struct mutex     ldlm_ref_mutex;
static int ldlm_refcount;

struct ldlm_cb_async_args {
        struct ldlm_cb_set_arg *ca_set_arg;
        struct ldlm_lock       *ca_lock;
};

/* LDLM state */

static struct ldlm_state *ldlm_state;

inline unsigned long round_timeout(unsigned long timeout)
{
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout,
                                                                   0)) + 1);
}
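
/*
 * Worked example (illustrative only, assuming HZ == 1000): a residual
 * timeout of 1500 jiffies gives cfs_duration_sec(1500) == 1, so
 * round_timeout() returns cfs_time_seconds(2) == 2000 jiffies, i.e. the
 * timeout is rounded up to the next whole second.
 */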

/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
{
        /* Non-AT value */
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}
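
/*
 * Worked example (assuming the stock defaults of obd_timeout = 100 and
 * ldlm_timeout = 20 seconds): min(20, 100 / 3) = min(20, 33) = 20, so the
 * initial AST reply is given 20 seconds before it times out.
 */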

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
        spinlock_t              blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         * see bug 13843
         */
        struct list_head        blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        struct list_head        blp_list;

        wait_queue_head_t       blp_waitq;
        struct completion       blp_comp;
        atomic_t                blp_num_threads;
        atomic_t                blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};

struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        ldlm_cancel_flags_t     blwi_flags;
        int                     blwi_mem_pressure;
};

/* The waiting-lock machinery is a server-side concept; on the client these
 * are no-ops. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        return 0;
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        return 0;
}

/**
 * Callback handler for receiving incoming blocking ASTs.
 *
 * This can only happen on the client side.
 */
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;

        LDLM_DEBUG(lock, "client blocking AST callback handler");

        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                lock->l_flags |= LDLM_FL_CANCEL;

        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
                       lock, lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL)
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else {
                CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
                       lock);
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        LDLM_LOCK_RELEASE(lock);
}
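
/*
 * Illustrative sketch (not part of this file) of the contract a client's
 * ->l_blocking_ast must honour when the handler above invokes it with
 * LDLM_CB_BLOCKING: flush whatever is cached under the lock, then cancel
 * it.  Everything except ldlm_lock2handle(), ldlm_cli_cancel() and the
 * flag value is hypothetical:
 *
 *      static int my_blocking_ast(struct ldlm_lock *lock,
 *                                 struct ldlm_lock_desc *desc,
 *                                 void *data, int flag)
 *      {
 *              struct lustre_handle lockh;
 *
 *              if (flag == LDLM_CB_BLOCKING) {
 *                      drop cached data under the lock here, then:
 *                      ldlm_lock2handle(lock, &lockh);
 *                      return ldlm_cli_cancel(&lockh, LCF_ASYNC);
 *              }
 *              return 0;
 *      }
 */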

/**
 * Callback handler for receiving incoming completion ASTs.
 *
 * This can only happen on the client side.
 */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int lvb_len;
        LIST_HEAD(ast_list);
        int rc = 0;

        LDLM_DEBUG(lock, "client completion callback handler START");

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                int to = cfs_time_seconds(1);

                while (to > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(to);
                        if (lock->l_granted_mode == lock->l_req_mode ||
                            lock->l_flags & LDLM_FL_DESTROYED)
                                break;
                }
        }

        lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
        if (lvb_len < 0) {
                LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
                rc = lvb_len;
                goto out;
        } else if (lvb_len > 0) {
                if (lock->l_lvb_len > 0) {
                        /* for extent lock, lvb contains ost_lvb{}. */
                        LASSERT(lock->l_lvb_data != NULL);

                        if (unlikely(lock->l_lvb_len < lvb_len)) {
                                LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
                                           lock->l_lvb_len, lvb_len);
                                rc = -EINVAL;
                                goto out;
                        }
                } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
                                                     * variable length */
                        void *lvb_data;

                        OBD_ALLOC(lvb_data, lvb_len);
                        if (lvb_data == NULL) {
                                LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
                                rc = -ENOMEM;
                                goto out;
                        }

                        lock_res_and_lock(lock);
                        LASSERT(lock->l_lvb_data == NULL);
                        lock->l_lvb_type = LVB_T_LAYOUT;
                        lock->l_lvb_data = lvb_data;
                        lock->l_lvb_len = lvb_len;
                        unlock_res_and_lock(lock);
                }
        }

        lock_res_and_lock(lock);
        if ((lock->l_flags & LDLM_FL_DESTROYED) ||
            lock->l_granted_mode == lock->l_req_mode) {
                /* bug 11300: the lock has already been granted */
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "Double grant race happened");
                rc = 0;
                goto out;
        }

        /* If we receive the completion AST before the actual enqueue has
         * returned, then we might need to switch lock modes, resources, or
         * extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                ldlm_convert_policy_to_local(req->rq_export,
                                          dlm_req->lock_desc.l_resource.lr_type,
                                          &dlm_req->lock_desc.l_policy_data,
                                          &lock->l_policy_data);
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                unlock_res_and_lock(lock);
                rc = ldlm_lock_change_resource(ns, lock,
                                &dlm_req->lock_desc.l_resource.lr_name);
                if (rc < 0) {
                        LDLM_ERROR(lock, "Failed to allocate resource");
                        goto out;
                }
                LDLM_DEBUG(lock, "completion AST, new resource");
                CERROR("change resource!\n");
                lock_res_and_lock(lock);
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                /* BL_AST locks are not needed in LRU.
                 * Let ldlm_cancel_lru() be fast. */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len > 0) {
                rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
                                   lock->l_lvb_data, lvb_len);
                if (rc < 0) {
                        unlock_res_and_lock(lock);
                        goto out;
                }
        }

        ldlm_grant_lock(lock, &ast_list);
        unlock_res_and_lock(lock);

        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

        /* Let enqueue call osc_lock_upcall() and initialize
         * l_ast_data */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);

        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);

out:
        if (rc < 0) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_FAILED;
                unlock_res_and_lock(lock);
                wake_up(&lock->l_waitq);
        }
        LDLM_LOCK_RELEASE(lock);
}

/**
 * Callback handler for receiving incoming glimpse ASTs.
 *
 * This can only happen on the client side.  After handling the glimpse AST
 * we also consider dropping the lock here if it is unused locally for a
 * long time.
 */
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;

        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast != NULL)
                rc = lock->l_glimpse_ast(lock, req);

        if (req->rq_repmsg != NULL) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        lock_res_and_lock(lock);
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            cfs_time_after(cfs_time_current(),
                           cfs_time_add(lock->l_last_used,
                                        cfs_time_seconds(10)))) {
                unlock_res_and_lock(lock);
                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                        ldlm_handle_bl_callback(ns, NULL, lock);

                return;
        }
        unlock_res_and_lock(lock);
        LDLM_LOCK_RELEASE(lock);
}

static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        if (req->rq_no_reply)
                return 0;

        req->rq_status = rc;
        if (!req->rq_packed_final) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }
        return ptlrpc_reply(req);
}

static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
                               ldlm_cancel_flags_t cancel_flags)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

        spin_lock(&blp->blp_lock);
        if (blwi->blwi_lock &&
            blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
                /* add LDLM_FL_DISCARD_DATA requests to the priority list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
        } else {
                /* other blocking callbacks are added to the regular list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        }
        spin_unlock(&blp->blp_lock);

        wake_up(&blp->blp_waitq);

        /* cannot check blwi->blwi_flags as blwi could already have been
         * freed in LCF_ASYNC mode */
        if (!(cancel_flags & LCF_ASYNC))
                wait_for_completion(&blwi->blwi_comp);

        return 0;
}

static inline void init_blwi(struct ldlm_bl_work_item *blwi,
                             struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld,
                             struct list_head *cancels, int count,
                             struct ldlm_lock *lock,
                             ldlm_cancel_flags_t cancel_flags)
{
        init_completion(&blwi->blwi_comp);
        INIT_LIST_HEAD(&blwi->blwi_head);

        if (memory_pressure_get())
                blwi->blwi_mem_pressure = 1;

        blwi->blwi_ns = ns;
        blwi->blwi_flags = cancel_flags;
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        if (count) {
                list_add(&blwi->blwi_head, cancels);
                list_del_init(cancels);
                blwi->blwi_count = count;
        } else {
                blwi->blwi_lock = lock;
        }
}

/**
 * Queues a list of locks \a cancels containing \a count locks
 * for later processing by a blocking thread.  If \a count is zero,
 * then the lock referenced as \a lock is queued instead.
 *
 * The blocking thread would then call ->l_blocking_ast callback in the lock.
 * If list addition fails an error is returned and the caller is supposed to
 * call ->l_blocking_ast itself.
 */
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld,
                             struct ldlm_lock *lock,
                             struct list_head *cancels, int count,
                             ldlm_cancel_flags_t cancel_flags)
{
        if (cancels && count == 0)
                return 0;

        if (cancel_flags & LCF_ASYNC) {
                struct ldlm_bl_work_item *blwi;

                OBD_ALLOC(blwi, sizeof(*blwi));
                if (blwi == NULL)
                        return -ENOMEM;
                init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);

                return __ldlm_bl_to_thread(blwi, cancel_flags);
        } else {
                /* if it is a synchronous call, do the minimum of memory
                 * allocation, as it could have been triggered from a kernel
                 * shrinker
                 */
                struct ldlm_bl_work_item blwi;

                memset(&blwi, 0, sizeof(blwi));
                init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
                return __ldlm_bl_to_thread(&blwi, cancel_flags);
        }
}

int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct ldlm_lock *lock)
{
        return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
}

int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct list_head *cancels, int count,
                           ldlm_cancel_flags_t cancel_flags)
{
        return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
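
/*
 * Usage sketch: callers queue a single lock asynchronously and fall back to
 * handling the blocking AST inline if queueing fails, mirroring the pattern
 * used by the callback handlers elsewhere in this file:
 *
 *      if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
 *              ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
 */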

/* Setinfo coming from Server (e.g. MDT) to Client (e.g. MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
        struct obd_device *obd = req->rq_export->exp_obd;
        char *key;
        void *val;
        int keylen, vallen;
        int rc = -ENOSYS;

        DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);

        req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);

        key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        if (key == NULL) {
                DEBUG_REQ(D_IOCTL, req, "no set_info key");
                return -EFAULT;
        }
        keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                      RCL_CLIENT);
        val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        if (val == NULL) {
                DEBUG_REQ(D_IOCTL, req, "no set_info val");
                return -EFAULT;
        }
        vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                      RCL_CLIENT);

        /* We are responsible for swabbing contents of val */

        if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
                /* Pass it on to mdc (the "export" in this case) */
                rc = obd_set_info_async(req->rq_svc_thread->t_env,
                                        req->rq_export,
                                        sizeof(KEY_HSM_COPYTOOL_SEND),
                                        KEY_HSM_COPYTOOL_SEND,
                                        vallen, val, NULL);
        else
                DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);

        return rc;
}

static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
                                        const char *msg, int rc,
                                        struct lustre_handle *handle)
{
        DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
                  "%s: [nid %s] [rc %d] [lock %#llx]",
                  msg, libcfs_id2str(req->rq_peer), rc,
                  handle ? handle->cookie : 0);
        if (req->rq_no_reply)
                CWARN("No reply was sent, maybe causing bug 21636.\n");
        else if (rc)
                CWARN("Sending reply failed, maybe causing bug 21636.\n");
}

static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
{
        struct obd_quotactl *oqctl;
        struct client_obd *cli = &req->rq_export->exp_obd->u.cli;

        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
        if (oqctl == NULL) {
                CERROR("Can't unpack obd_quotactl\n");
                return -EPROTO;
        }

        oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);

        cli->cl_qchk_stat = oqctl->qc_stat;
        return 0;
}

/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        /* do nothing for sec context finalize */
        if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
                return 0;

        req_capsule_init(&req->rq_pill, req, RCL_SERVER);

        if (req->rq_export == NULL) {
                rc = ldlm_callback_reply(req, -ENOTCONN);
                ldlm_callback_errmsg(req, "Operate on unconnected server",
                                     rc, NULL);
                return 0;
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
                        return 0;
                break;
        case LDLM_CP_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
                        return 0;
                break;
        case LDLM_GL_CALLBACK:
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
                        return 0;
                break;
        case LDLM_SET_INFO:
                rc = ldlm_handle_setinfo(req);
                ldlm_callback_reply(req, rc);
                return 0;
        case OBD_QC_CALLBACK:
                req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
                        return 0;
                rc = ldlm_handle_qc_callback(req);
                ldlm_callback_reply(req, rc);
                return 0;
        default:
                CERROR("unknown opcode %u\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EPROTO);
                return 0;
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);

        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        if (dlm_req == NULL) {
                rc = ldlm_callback_reply(req, -EPROTO);
                ldlm_callback_errmsg(req, "Operate without parameter", rc,
                                     NULL);
                return 0;
        }

        /* Force a known safe race, send a cancel to the server for a lock
         * which the server has already started a blocking callback on. */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        }

        lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
        if (!lock) {
                CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
                       dlm_req->lock_handle[0].cookie);
                rc = ldlm_callback_reply(req, -EINVAL);
                ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
                                     &dlm_req->lock_handle[0]);
                return 0;
        }

        if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
                OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock_res_and_lock(lock);
        lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                              LDLM_AST_FLAGS);
        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                /* If somebody cancels the lock and the cache is already
                 * dropped, or the lock failed before the cp_ast was received
                 * on the client, we can tell the server we have no lock.
                 * Otherwise, we should send the cancel after dropping the
                 * cache. */
                if (((lock->l_flags & LDLM_FL_CANCELING) &&
                    (lock->l_flags & LDLM_FL_BL_DONE)) ||
                    (lock->l_flags & LDLM_FL_FAILED)) {
                        LDLM_DEBUG(lock, "callback on lock %#llx - lock disappeared\n",
                                   dlm_req->lock_handle[0].cookie);
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_RELEASE(lock);
                        rc = ldlm_callback_reply(req, -EINVAL);
                        ldlm_callback_errmsg(req, "Operate on stale lock", rc,
                                             &dlm_req->lock_handle[0]);
                        return 0;
                }
                /* BL_AST locks are not needed in LRU.
                 * Let ldlm_cancel_lru() be fast. */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_BL_AST;
        }
        unlock_res_and_lock(lock);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas. */

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
                        rc = ldlm_callback_reply(req, 0);
                        if (req->rq_no_reply || rc)
                                ldlm_callback_errmsg(req, "Normal process", rc,
                                                     &dlm_req->lock_handle[0]);
                }
                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                 /* checked above */
        }

        return 0;
}

static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_work_item *blwi = NULL;
        static unsigned int num_bl;

        spin_lock(&blp->blp_lock);
        /* process a request from the blp_list at least every
         * blp_num_threads */
        if (!list_empty(&blp->blp_list) &&
            (list_empty(&blp->blp_prio_list) || num_bl == 0))
                blwi = list_entry(blp->blp_list.next,
                                  struct ldlm_bl_work_item, blwi_entry);
        else if (!list_empty(&blp->blp_prio_list))
                blwi = list_entry(blp->blp_prio_list.next,
                                  struct ldlm_bl_work_item, blwi_entry);

        if (blwi) {
                if (++num_bl >= atomic_read(&blp->blp_num_threads))
                        num_bl = 0;
                list_del(&blwi->blwi_entry);
        }
        spin_unlock(&blp->blp_lock);

        return blwi;
}
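
/*
 * Worked example of the fairness rule above: with blp_num_threads == 4,
 * num_bl cycles 0, 1, 2, 3, 0, ...  While the priority list stays busy,
 * the regular blp_list is still drained on every fourth dequeue (whenever
 * num_bl wraps to 0), so LDLM_FL_DISCARD_DATA work cannot starve ordinary
 * callbacks indefinitely.
 */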

/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
        char                    bltd_name[CFS_CURPROC_COMM_MAX];
        struct ldlm_bl_pool     *bltd_blp;
        struct completion       bltd_comp;
        int                     bltd_num;
};

static int ldlm_bl_thread_main(void *arg);

static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
        struct task_struct *task;

        init_completion(&bltd.bltd_comp);
        bltd.bltd_num = atomic_read(&blp->blp_num_threads);
        snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
                 "ldlm_bl_%02d", bltd.bltd_num);
        task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
        if (IS_ERR(task)) {
                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
                       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
                return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);

        return 0;
}

/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread.
 * This thread eventually ends up doing the actual call to ->l_blocking_ast
 * for queued locks.
 */
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_pool *blp;

        {
                struct ldlm_bl_thread_data *bltd = arg;

                blp = bltd->bltd_blp;

                atomic_inc(&blp->blp_num_threads);
                atomic_inc(&blp->blp_busy_threads);

                complete(&bltd->bltd_comp);
                /* cannot use bltd after this, it is only on caller's stack */
        }

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;
                int busy;

                blwi = ldlm_bl_get_work(blp);

                if (blwi == NULL) {
                        atomic_dec(&blp->blp_busy_threads);
                        l_wait_event_exclusive(blp->blp_waitq,
                                         (blwi = ldlm_bl_get_work(blp)) != NULL,
                                         &lwi);
                        busy = atomic_inc_return(&blp->blp_busy_threads);
                } else {
                        busy = atomic_read(&blp->blp_busy_threads);
                }

                if (blwi->blwi_ns == NULL)
                        /* added by ldlm_cleanup() */
                        break;

                /* Not fatal if racy and we end up with a few too many
                 * threads */
                if (unlikely(busy < blp->blp_max_threads &&
                             busy >= atomic_read(&blp->blp_num_threads) &&
                             !blwi->blwi_mem_pressure))
                        /* discard the return value, we tried */
                        ldlm_bl_thread_start(blp);

                if (blwi->blwi_mem_pressure)
                        memory_pressure_set();

                if (blwi->blwi_count) {
                        int count;

                        /* In the special case when we cancel LRU locks
                         * asynchronously, the locks are passed in as a list.
                         * They are thus marked LDLM_FL_CANCELING, but NOT
                         * cancelled locally yet. */
                        count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                           blwi->blwi_count,
                                                           LCF_BL_AST);
                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
                                             blwi->blwi_flags);
                } else {
                        ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                blwi->blwi_lock);
                }
                if (blwi->blwi_mem_pressure)
                        memory_pressure_clr();

                if (blwi->blwi_flags & LCF_ASYNC)
                        OBD_FREE(blwi, sizeof(*blwi));
                else
                        complete(&blwi->blwi_comp);
        }

        atomic_dec(&blp->blp_busy_threads);
        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        return 0;
}

static int ldlm_setup(void);
static int ldlm_cleanup(void);

int ldlm_get_ref(void)
{
        int rc = 0;

        mutex_lock(&ldlm_ref_mutex);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);

        return rc;
}
EXPORT_SYMBOL(ldlm_get_ref);

void ldlm_put_ref(void)
{
        mutex_lock(&ldlm_ref_mutex);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup();

                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);
}
EXPORT_SYMBOL(ldlm_put_ref);
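
/*
 * Usage sketch: obd subsystems bracket their use of the lock manager with
 * this reference pair; the first ldlm_get_ref() runs ldlm_setup() and the
 * last ldlm_put_ref() tears everything down again via ldlm_cleanup():
 *
 *      rc = ldlm_get_ref();
 *      if (rc)
 *              return rc;
 *      ... use the DLM services ...
 *      ldlm_put_ref();
 */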

/*
 * Export handle<->lock hash operations.
 */
static unsigned
ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}

static void *
ldlm_export_lock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        return &lock->l_remote_handle;
}

static void
ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        lock->l_remote_handle = *(struct lustre_handle *)key;
}

static int
ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
        return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}

static void *
ldlm_export_lock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
}

static void
ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_GET(lock);
}

static void
ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_RELEASE(lock);
}

static cfs_hash_ops_t ldlm_export_lock_ops = {
        .hs_hash        = ldlm_export_lock_hash,
        .hs_key         = ldlm_export_lock_key,
        .hs_keycmp      = ldlm_export_lock_keycmp,
        .hs_keycpy      = ldlm_export_lock_keycpy,
        .hs_object      = ldlm_export_lock_object,
        .hs_get         = ldlm_export_lock_get,
        .hs_put         = ldlm_export_lock_put,
        .hs_put_locked  = ldlm_export_lock_put,
};
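
/*
 * Illustrative sketch (not part of this file): with these ops installed,
 * a lock can be recovered from an export by its remote handle with a plain
 * hash lookup; the .hs_get/.hs_put ops make the lookup take a reference
 * that the caller must drop.  "remote_handle" here is hypothetical:
 *
 *      struct ldlm_lock *lock;
 *
 *      lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_handle);
 *      if (lock != NULL)
 *              LDLM_LOCK_RELEASE(lock);        (drop the lookup reference)
 */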

int ldlm_init_export(struct obd_export *exp)
{
        int rc;

        exp->exp_lock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_lock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
                                CFS_HASH_NBLK_CHANGE);

        if (!exp->exp_lock_hash)
                return -ENOMEM;

        rc = ldlm_init_flock_export(exp);
        if (rc)
                goto err;

        return 0;
err:
        ldlm_destroy_export(exp);
        return rc;
}
EXPORT_SYMBOL(ldlm_init_export);

void ldlm_destroy_export(struct obd_export *exp)
{
        cfs_hash_putref(exp->exp_lock_hash);
        exp->exp_lock_hash = NULL;

        ldlm_destroy_flock_export(exp);
}
EXPORT_SYMBOL(ldlm_destroy_export);

static int ldlm_setup(void)
{
        static struct ptlrpc_service_conf       conf;
        struct ldlm_bl_pool                     *blp = NULL;
        int rc = 0;
        int i;

        if (ldlm_state != NULL)
                return -EALREADY;

        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
        if (ldlm_state == NULL)
                return -ENOMEM;

        rc = ldlm_proc_setup();
        if (rc != 0)
                goto out;

        memset(&conf, 0, sizeof(conf));
        conf = (typeof(conf)) {
                .psc_name               = "ldlm_cbd",
                .psc_watchdog_factor    = 2,
                .psc_buf                = {
                        .bc_nbufs               = LDLM_CLIENT_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
                        .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
                        .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
                },
                .psc_thr                = {
                        .tc_thr_name            = "ldlm_cb",
                        .tc_thr_factor          = LDLM_THR_FACTOR,
                        .tc_nthrs_init          = LDLM_NTHRS_INIT,
                        .tc_nthrs_base          = LDLM_NTHRS_BASE,
                        .tc_nthrs_max           = LDLM_NTHRS_MAX,
                        .tc_nthrs_user          = ldlm_num_threads,
                        .tc_cpu_affinity        = 1,
                        .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
                },
                .psc_cpt                = {
                        .cc_pattern             = ldlm_cpts,
                },
                .psc_ops                = {
                        .so_req_handler         = ldlm_callback_handler,
                },
        };
        ldlm_state->ldlm_cb_service =
                        ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
        if (IS_ERR(ldlm_state->ldlm_cb_service)) {
                CERROR("failed to start service\n");
                rc = PTR_ERR(ldlm_state->ldlm_cb_service);
                ldlm_state->ldlm_cb_service = NULL;
                goto out;
        }

        OBD_ALLOC(blp, sizeof(*blp));
        if (blp == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        ldlm_state->ldlm_bl_pool = blp;

        spin_lock_init(&blp->blp_lock);
        INIT_LIST_HEAD(&blp->blp_list);
        INIT_LIST_HEAD(&blp->blp_prio_list);
        init_waitqueue_head(&blp->blp_waitq);
        atomic_set(&blp->blp_num_threads, 0);
        atomic_set(&blp->blp_busy_threads, 0);

        if (ldlm_num_threads == 0) {
                blp->blp_min_threads = LDLM_NTHRS_INIT;
                blp->blp_max_threads = LDLM_NTHRS_MAX;
        } else {
                blp->blp_min_threads = blp->blp_max_threads =
                        min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
                                                         ldlm_num_threads));
        }

        for (i = 0; i < blp->blp_min_threads; i++) {
                rc = ldlm_bl_thread_start(blp);
                if (rc < 0)
                        goto out;
        }

        rc = ldlm_pools_init();
        if (rc) {
                CERROR("Failed to initialize LDLM pools: %d\n", rc);
                goto out;
        }
        return 0;

 out:
        ldlm_cleanup();
        return rc;
}

static int ldlm_cleanup(void)
{
        if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
            !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
                return -EBUSY;
        }

        ldlm_pools_fini();

        if (ldlm_state->ldlm_bl_pool != NULL) {
                struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

                while (atomic_read(&blp->blp_num_threads) > 0) {
                        struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                        init_completion(&blp->blp_comp);

                        spin_lock(&blp->blp_lock);
                        list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                        wake_up(&blp->blp_waitq);
                        spin_unlock(&blp->blp_lock);

                        wait_for_completion(&blp->blp_comp);
                }

                OBD_FREE(blp, sizeof(*blp));
        }

        if (ldlm_state->ldlm_cb_service != NULL)
                ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

        ldlm_proc_cleanup();

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        return 0;
}

int ldlm_init(void)
{
        mutex_init(&ldlm_ref_mutex);
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                              sizeof(struct ldlm_lock), 0,
                              SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
        if (ldlm_lock_slab == NULL) {
                kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        ldlm_interval_slab = kmem_cache_create("interval_node",
                                        sizeof(struct ldlm_interval),
                                        0, SLAB_HWCACHE_ALIGN, NULL);
        if (ldlm_interval_slab == NULL) {
                kmem_cache_destroy(ldlm_resource_slab);
                kmem_cache_destroy(ldlm_lock_slab);
                return -ENOMEM;
        }
#if LUSTRE_TRACKS_LOCK_EXP_REFS
        class_export_dump_hook = ldlm_dump_export_locks;
#endif
        return 0;
}

void ldlm_exit(void)
{
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        kmem_cache_destroy(ldlm_resource_slab);
        /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
         * synchronize_rcu() to wait for a grace period to elapse and give
         * ldlm_lock_free() a chance to run. */
        synchronize_rcu();
        kmem_cache_destroy(ldlm_lock_slab);
        kmem_cache_destroy(ldlm_interval_slab);
}