linux/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2010, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36/**
  37 * This file contains Asynchronous System Trap (AST) handlers and related
  38 * LDLM request-processing routines.
  39 *
  40 * An AST is a callback issued on a lock when its state is changed. There are
  41 * several different types of ASTs (callbacks) registered for each lock:
  42 *
  43 * - completion AST: when a lock is enqueued by some process, but cannot be
  44 *   granted immediately due to other conflicting locks on the same resource,
  45 *   the completion AST is sent to notify the caller when the lock is
  46 *   eventually granted
  47 *
  48 * - blocking AST: when a lock is granted to some process, if another process
  49 *   enqueues a conflicting (blocking) lock on a resource, a blocking AST is
  50 *   sent to notify the holder(s) of the lock(s) of the conflicting lock
  51 *   request. The lock holder(s) must release their lock(s) on that resource in
  52 *   a timely manner or be evicted by the server.
  53 *
  54 * - glimpse AST: this is used when a process wants information about a lock
  55 *   (i.e. the lock value block (LVB)) but does not necessarily require holding
  56 *   the lock. If the resource is locked, the lock holder(s) are sent glimpse
  57 *   ASTs and the LVB is returned to the caller, and lock holder(s) may CANCEL
  58 *   their lock(s) if they are idle. If the resource is not locked, the server
  59 *   may grant the lock.
  60 */
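
/*
 * For reference, the generic handlers in this file that implement each of
 * these AST types on behalf of callers (callers may also supply their own
 * wrappers; the callbacks are wired up per enqueue via struct
 * ldlm_enqueue_info or struct ldlm_callback_suite):
 *
 *      completion AST  ->  ldlm_completion_ast() / ldlm_completion_ast_async()
 *      blocking AST    ->  ldlm_blocking_ast(), or caller-specific handlers
 *                          built on ldlm_blocking_ast_nocheck()
 *      glimpse AST     ->  ldlm_glimpse_ast() (server-side dummy; see its
 *                          comment below)
 *
 * A sketch of how a client passes these callbacks to an enqueue follows
 * ldlm_cli_enqueue() below.
 */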
  61
  62#define DEBUG_SUBSYSTEM S_LDLM
  63
  64#include <lustre_dlm.h>
  65#include <obd_class.h>
  66#include <obd.h>
  67
  68#include "ldlm_internal.h"
  69
  70int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
  71CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
  72                "lock enqueue timeout minimum");
  73
  74/* On the client side, whether cached locks will be canceled before replay */
  75unsigned int ldlm_cancel_unused_locks_before_replay = 1;
  76
  77static void interrupted_completion_wait(void *data)
  78{
  79}
  80
  81struct lock_wait_data {
  82        struct ldlm_lock *lwd_lock;
  83        __u32        lwd_conn_cnt;
  84};
  85
  86struct ldlm_async_args {
  87        struct lustre_handle lock_handle;
  88};
  89
  90int ldlm_expired_completion_wait(void *data)
  91{
  92        struct lock_wait_data *lwd = data;
  93        struct ldlm_lock *lock = lwd->lwd_lock;
  94        struct obd_import *imp;
  95        struct obd_device *obd;
  96
  97        ENTRY;
  98        if (lock->l_conn_export == NULL) {
  99                static cfs_time_t next_dump = 0, last_dump = 0;
 100
 101                if (ptlrpc_check_suspend())
 102                        RETURN(0);
 103
 104                LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", "
 105                              CFS_DURATION_T"s ago)\n",
 106                              lock->l_last_activity,
 107                              cfs_time_sub(cfs_time_current_sec(),
 108                                           lock->l_last_activity));
 109                LDLM_DEBUG(lock, "lock timed out (enqueued at "CFS_TIME_T", "
 110                           CFS_DURATION_T"s ago); not entering recovery in "
 111                           "server code, just going back to sleep",
 112                           lock->l_last_activity,
 113                           cfs_time_sub(cfs_time_current_sec(),
 114                                        lock->l_last_activity));
 115                if (cfs_time_after(cfs_time_current(), next_dump)) {
 116                        last_dump = next_dump;
 117                        next_dump = cfs_time_shift(300);
 118                        ldlm_namespace_dump(D_DLMTRACE,
 119                                            ldlm_lock_to_ns(lock));
 120                        if (last_dump == 0)
 121                                libcfs_debug_dumplog();
 122                }
 123                RETURN(0);
 124        }
 125
 126        obd = lock->l_conn_export->exp_obd;
 127        imp = obd->u.cli.cl_import;
 128        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
 129        LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
 130                  CFS_DURATION_T"s ago), entering recovery for %s@%s",
 131                  lock->l_last_activity,
 132                  cfs_time_sub(cfs_time_current_sec(), lock->l_last_activity),
 133                  obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 134
 135        RETURN(0);
 136}
 137EXPORT_SYMBOL(ldlm_expired_completion_wait);
 138
 139/* We use the same basis for both server side and client side functions
 140   from a single node. */
 141int ldlm_get_enq_timeout(struct ldlm_lock *lock)
 142{
 143        int timeout = at_get(ldlm_lock_to_ns_at(lock));
 144        if (AT_OFF)
 145                return obd_timeout / 2;
 146        /* Since these are non-updating timeouts, we should be conservative.
 147           It would be nice to have some kind of "early reply" mechanism for
 148           lock callbacks too... */
 149        timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
 150        return max(timeout, ldlm_enqueue_min);
 151}
 152EXPORT_SYMBOL(ldlm_get_enq_timeout);
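
/*
 * Worked example (assumed values): with adaptive timeouts enabled and the
 * namespace AT estimate at 20s, timeout = 20 + (20 >> 1) = 30s, clamped to
 * at_max and then raised to at least ldlm_enqueue_min.  With AT_OFF set,
 * the estimate is ignored and obd_timeout / 2 is used instead.
 */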
 153
 154/**
 155 * Helper function for ldlm_completion_ast(), updating timings when lock is
 156 * actually granted.
 157 */
 158static int ldlm_completion_tail(struct ldlm_lock *lock)
 159{
 160        long delay;
 161        int  result;
 162
 163        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
 164                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
 165                result = -EIO;
 166        } else {
 167                delay = cfs_time_sub(cfs_time_current_sec(),
 168                                     lock->l_last_activity);
 169                LDLM_DEBUG(lock, "client-side enqueue: granted after "
 170                           CFS_DURATION_T"s", delay);
 171
 172                /* Update our time estimate */
 173                at_measured(ldlm_lock_to_ns_at(lock),
 174                            delay);
 175                result = 0;
 176        }
 177        return result;
 178}
 179
 180/**
 181 * Implementation of ->l_completion_ast() for a client that does not wait
 182 * until the lock is granted. Suitable for locks enqueued through ptlrpcd or
 183 * other threads that cannot block for long.
 184 */
 185int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
 186{
 187        ENTRY;
 188
 189        if (flags == LDLM_FL_WAIT_NOREPROC) {
 190                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 191                RETURN(0);
 192        }
 193
 194        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
 195                       LDLM_FL_BLOCK_CONV))) {
 196                wake_up(&lock->l_waitq);
 197                RETURN(ldlm_completion_tail(lock));
 198        }
 199
 200        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
 201                   "going forward");
 202        ldlm_reprocess_all(lock->l_resource);
 203        RETURN(0);
 204}
 205EXPORT_SYMBOL(ldlm_completion_ast_async);
 206
 207/**
 208 * Generic LDLM "completion" AST. This is called in several cases:
 209 *
 210 *     - when a reply to an ENQUEUE RPC is received from the server
 211 *       (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at
 212 *       this point (determined by flags);
 213 *
 214 *     - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has
 215 *       been granted;
 216 *
 217 *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
 218 *       gets correct lvb;
 219 *
 220 *     - to force all locks when resource is destroyed (cleanup_resource());
 221 *
 222 *     - during lock conversion (not used currently).
 223 *
 224 * If the lock is not granted in the first case, this function waits until the
 225 * second or penultimate case happens in some other thread.
 226 *
 227 */
 228int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 229{
 230        /* XXX ALLOCATE - 160 bytes */
 231        struct lock_wait_data lwd;
 232        struct obd_device *obd;
 233        struct obd_import *imp = NULL;
 234        struct l_wait_info lwi;
 235        __u32 timeout;
 236        int rc = 0;
 237        ENTRY;
 238
 239        if (flags == LDLM_FL_WAIT_NOREPROC) {
 240                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 241                goto noreproc;
 242        }
 243
 244        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
 245                       LDLM_FL_BLOCK_CONV))) {
 246                wake_up(&lock->l_waitq);
 247                RETURN(0);
 248        }
 249
 250        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
 251                   "sleeping");
 252
 253noreproc:
 254
 255        obd = class_exp2obd(lock->l_conn_export);
 256
 257        /* if this is a local lock, then there is no import */
 258        if (obd != NULL) {
 259                imp = obd->u.cli.cl_import;
 260        }
 261
 262        /* Wait a long time for enqueue - server may have to call back a
 263           lock from another client.  Server will evict the other client if it
 264           doesn't respond reasonably, and then give us the lock. */
 265        timeout = ldlm_get_enq_timeout(lock) * 2;
 266
 267        lwd.lwd_lock = lock;
 268
 269        if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
 270                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
 271                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
 272        } else {
 273                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
 274                                       ldlm_expired_completion_wait,
 275                                       interrupted_completion_wait, &lwd);
 276        }
 277
 278        if (imp != NULL) {
 279                spin_lock(&imp->imp_lock);
 280                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
 281                spin_unlock(&imp->imp_lock);
 282        }
 283
 284        if (ns_is_client(ldlm_lock_to_ns(lock)) &&
 285            OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
 286                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
 287                lock->l_flags |= LDLM_FL_FAIL_LOC;
 288                rc = -EINTR;
 289        } else {
 290                /* Go to sleep until the lock is granted or cancelled. */
 291                rc = l_wait_event(lock->l_waitq,
 292                                  is_granted_or_cancelled(lock), &lwi);
 293        }
 294
 295        if (rc) {
 296                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
 297                           rc);
 298                RETURN(rc);
 299        }
 300
 301        RETURN(ldlm_completion_tail(lock));
 302}
 303EXPORT_SYMBOL(ldlm_completion_ast);
 304
 305/**
 306 * A helper to build a blocking AST function
 307 *
 308 * Perform a common operation for blocking ASTs:
 309 * deferred lock cancellation.
 310 *
 311 * \param lock the lock blocking or canceling AST was called on
 312 * \retval 0
 313 * \see mdt_blocking_ast
 314 * \see ldlm_blocking_ast
 315 */
 316int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
 317{
 318        int do_ast;
 319        ENTRY;
 320
 321        lock->l_flags |= LDLM_FL_CBPENDING;
 322        do_ast = (!lock->l_readers && !lock->l_writers);
 323        unlock_res_and_lock(lock);
 324
 325        if (do_ast) {
 326                struct lustre_handle lockh;
 327                int rc;
 328
 329                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
 330                ldlm_lock2handle(lock, &lockh);
 331                rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
 332                if (rc < 0)
 333                        CERROR("ldlm_cli_cancel: %d\n", rc);
 334        } else {
 335                LDLM_DEBUG(lock, "Lock still has references, will be "
 336                           "cancelled later");
 337        }
 338        RETURN(0);
 339}
 340EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
 341
 342/**
 343 * Server blocking AST
 344 *
 345 * ->l_blocking_ast() callback for LDLM locks acquired by server-side
 346 * OBDs.
 347 *
 348 * \param lock the lock which blocks a request or a cancelling lock
 349 * \param desc unused
 350 * \param data unused
 351 * \param flag indicates whether this is a cancelling or blocking callback
 352 * \retval 0
 353 * \see ldlm_blocking_ast_nocheck
 354 */
 355int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 356                      void *data, int flag)
 357{
 358        ENTRY;
 359
 360        if (flag == LDLM_CB_CANCELING) {
 361                /* Don't need to do anything here. */
 362                RETURN(0);
 363        }
 364
 365        lock_res_and_lock(lock);
 366        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
 367         * that ldlm_blocking_ast is called just before intent_policy method
 368         * takes the lr_lock, then by the time we get the lock, we might not
 369         * be the correct blocking function anymore.  So check, and return
 370         * early, if so. */
 371        if (lock->l_blocking_ast != ldlm_blocking_ast) {
 372                unlock_res_and_lock(lock);
 373                RETURN(0);
 374        }
 375        RETURN(ldlm_blocking_ast_nocheck(lock));
 376}
 377EXPORT_SYMBOL(ldlm_blocking_ast);
 378
 379/**
 380 * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
 381 * comment in filter_intent_policy() on why you may need this.
 382 */
 383int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
 384{
 385        /*
 386         * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
 387         * that is rather subtle: with OST-side locking, it may so happen that
 388         * _all_ extent locks are held by the OST. If client wants to obtain
 389         * current file size it calls ll{,u}_glimpse_size(), and (as locks are
 390         * on the server), dummy glimpse callback fires and does
 391         * nothing. Client still receives correct file size due to the
 392         * following fragment in filter_intent_policy():
 393         *
 394         * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
 395         * if (rc != 0 && res->lr_namespace->ns_lvbo &&
 396         *     res->lr_namespace->ns_lvbo->lvbo_update) {
 397         *       res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
 398         * }
 399         *
 400         * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
 401         * returns correct file size to the client.
 402         */
 403        return -ELDLM_NO_LOCK_DATA;
 404}
 405EXPORT_SYMBOL(ldlm_glimpse_ast);
 406
 407/**
 408 * Enqueue a local lock (typically on a server).
 409 */
 410int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
 411                           const struct ldlm_res_id *res_id,
 412                           ldlm_type_t type, ldlm_policy_data_t *policy,
 413                           ldlm_mode_t mode, __u64 *flags,
 414                           ldlm_blocking_callback blocking,
 415                           ldlm_completion_callback completion,
 416                           ldlm_glimpse_callback glimpse,
 417                           void *data, __u32 lvb_len, enum lvb_type lvb_type,
 418                           const __u64 *client_cookie,
 419                           struct lustre_handle *lockh)
 420{
 421        struct ldlm_lock *lock;
 422        int err;
 423        const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
 424                                                 .lcs_blocking   = blocking,
 425                                                 .lcs_glimpse    = glimpse,
 426        };
 427        ENTRY;
 428
 429        LASSERT(!(*flags & LDLM_FL_REPLAY));
 430        if (unlikely(ns_is_client(ns))) {
 431                CERROR("Trying to enqueue local lock in a shadow namespace\n");
 432                LBUG();
 433        }
 434
 435        lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
 436                                lvb_type);
 437        if (unlikely(!lock))
 438                GOTO(out_nolock, err = -ENOMEM);
 439
 440        ldlm_lock2handle(lock, lockh);
 441
 442        /* NB: we don't have any lock now (lock_res_and_lock)
 443         * because it's a new lock */
 444        ldlm_lock_addref_internal_nolock(lock, mode);
 445        lock->l_flags |= LDLM_FL_LOCAL;
 446        if (*flags & LDLM_FL_ATOMIC_CB)
 447                lock->l_flags |= LDLM_FL_ATOMIC_CB;
 448
 449        if (policy != NULL)
 450                lock->l_policy_data = *policy;
 451        if (client_cookie != NULL)
 452                lock->l_client_cookie = *client_cookie;
 453        if (type == LDLM_EXTENT)
 454                lock->l_req_extent = policy->l_extent;
 455
 456        err = ldlm_lock_enqueue(ns, &lock, policy, flags);
 457        if (unlikely(err != ELDLM_OK))
 458                GOTO(out, err);
 459
 460        if (policy != NULL)
 461                *policy = lock->l_policy_data;
 462
 463        if (lock->l_completion_ast)
 464                lock->l_completion_ast(lock, *flags, NULL);
 465
 466        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
 467        EXIT;
 468 out:
 469        LDLM_LOCK_RELEASE(lock);
 470 out_nolock:
 471        return err;
 472}
 473EXPORT_SYMBOL(ldlm_cli_enqueue_local);
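
/*
 * Illustrative sketch of a hypothetical server-side caller ("obd" and
 * "my_object_id" are placeholders, the EX/PLAIN choices are just examples):
 *
 *      struct ldlm_res_id res_id = { .name = { my_object_id } };
 *      struct lustre_handle lockh;
 *      __u64 flags = 0;
 *      int rc;
 *
 *      rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
 *                                  NULL, LCK_EX, &flags, ldlm_blocking_ast,
 *                                  ldlm_completion_ast, NULL, NULL, 0,
 *                                  LVB_T_NONE, NULL, &lockh);
 *      if (rc == ELDLM_OK) {
 *              // ... use the lock, then drop the reference taken above:
 *              ldlm_lock_decref(&lockh, LCK_EX);
 *      }
 */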
 474
 475static void failed_lock_cleanup(struct ldlm_namespace *ns,
 476                                struct ldlm_lock *lock, int mode)
 477{
 478        int need_cancel = 0;
 479
 480        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
 481        lock_res_and_lock(lock);
 482        /* Check that lock is not granted or failed, we might race. */
 483        if ((lock->l_req_mode != lock->l_granted_mode) &&
 484            !(lock->l_flags & LDLM_FL_FAILED)) {
 485                /* Make sure that this lock will not be found by a racing
 486                 * bl_ast and that an -EINVAL reply is sent to the server anyway
 487                 * (bug 17645). */
 488                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
 489                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
 490                need_cancel = 1;
 491        }
 492        unlock_res_and_lock(lock);
 493
 494        if (need_cancel)
 495                LDLM_DEBUG(lock,
 496                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | "
 497                           "LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
 498        else
 499                LDLM_DEBUG(lock, "lock was granted or failed in race");
 500
 501        ldlm_lock_decref_internal(lock, mode);
 502
 503        /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
 504         *       from llite/file.c/ll_file_flock(). */
 505        /* This code accounts for the fact that we do not have a blocking handler
 506         * on the client for flock locks. As such, this is the place where we must
 507         * completely kill failed locks (both interrupted locks and those that
 508         * were waiting to be granted when the server evicted us). */
 509        if (lock->l_resource->lr_type == LDLM_FLOCK) {
 510                lock_res_and_lock(lock);
 511                ldlm_resource_unlink_lock(lock);
 512                ldlm_lock_destroy_nolock(lock);
 513                unlock_res_and_lock(lock);
 514        }
 515}
 516
 517/**
 518 * Finishing portion of client lock enqueue code.
 519 *
 520 * Called after receiving reply from server.
 521 */
 522int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 523                          ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
 524                          __u64 *flags, void *lvb, __u32 lvb_len,
 525                          struct lustre_handle *lockh, int rc)
 526{
 527        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 528        int is_replay = *flags & LDLM_FL_REPLAY;
 529        struct ldlm_lock *lock;
 530        struct ldlm_reply *reply;
 531        int cleanup_phase = 1;
 532        int size = 0;
 533        ENTRY;
 534
 535        lock = ldlm_handle2lock(lockh);
 536        /* ldlm_cli_enqueue is holding a reference on this lock. */
 537        if (!lock) {
 538                LASSERT(type == LDLM_FLOCK);
 539                RETURN(-ENOLCK);
 540        }
 541
 542        LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
 543                 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);
 544
 545        if (rc != ELDLM_OK) {
 546                LASSERT(!is_replay);
 547                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
 548                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
 549
 550                if (rc != ELDLM_LOCK_ABORTED)
 551                        GOTO(cleanup, rc);
 552        }
 553
 554        /* Before we return, swab the reply */
 555        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 556        if (reply == NULL)
 557                GOTO(cleanup, rc = -EPROTO);
 558
 559        if (lvb_len != 0) {
 560                LASSERT(lvb != NULL);
 561
 562                size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
 563                                            RCL_SERVER);
 564                if (size < 0) {
 565                        LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
 566                        GOTO(cleanup, rc = size);
 567                } else if (unlikely(size > lvb_len)) {
 568                        LDLM_ERROR(lock, "Replied LVB is larger than "
 569                                   "expectation, expected = %d, replied = %d",
 570                                   lvb_len, size);
 571                        GOTO(cleanup, rc = -EINVAL);
 572                }
 573        }
 574
 575        if (rc == ELDLM_LOCK_ABORTED) {
 576                if (lvb_len != 0)
 577                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 578                                           lvb, size);
 579                GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED));
 580        }
 581
 582        /* lock enqueued on the server */
 583        cleanup_phase = 0;
 584
 585        lock_res_and_lock(lock);
 586        /* Key change: rehash lock in per-export hash with new key */
 587        if (exp->exp_lock_hash) {
 588                /* In the function below, .hs_keycmp resolves to
 589                 * ldlm_export_lock_keycmp() */
 590                /* coverity[overrun-buffer-val] */
 591                cfs_hash_rehash_key(exp->exp_lock_hash,
 592                                    &lock->l_remote_handle,
 593                                    &reply->lock_handle,
 594                                    &lock->l_exp_hash);
 595        } else {
 596                lock->l_remote_handle = reply->lock_handle;
 597        }
 598
 599        *flags = ldlm_flags_from_wire(reply->lock_flags);
 600        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
 601                                              LDLM_INHERIT_FLAGS);
 602        /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
 603         * to wait with no timeout as well */
 604        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
 605                                              LDLM_FL_NO_TIMEOUT);
 606        unlock_res_and_lock(lock);
 607
 608        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%llx\n",
 609               lock, reply->lock_handle.cookie, *flags);
 610
 611        /* If enqueue returned a blocked lock but the completion handler has
 612         * already run, then it fixed up the resource and we don't need to do it
 613         * again. */
 614        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
 615                int newmode = reply->lock_desc.l_req_mode;
 616                LASSERT(!is_replay);
 617                if (newmode && newmode != lock->l_req_mode) {
 618                        LDLM_DEBUG(lock, "server returned different mode %s",
 619                                   ldlm_lockname[newmode]);
 620                        lock->l_req_mode = newmode;
 621                }
 622
 623                if (memcmp(reply->lock_desc.l_resource.lr_name.name,
 624                          lock->l_resource->lr_name.name,
 625                          sizeof(struct ldlm_res_id))) {
 626                        CDEBUG(D_INFO, "remote intent success, locking "
 627                                        "(%ld,%ld,%ld) instead of "
 628                                        "(%ld,%ld,%ld)\n",
 629                              (long)reply->lock_desc.l_resource.lr_name.name[0],
 630                              (long)reply->lock_desc.l_resource.lr_name.name[1],
 631                              (long)reply->lock_desc.l_resource.lr_name.name[2],
 632                              (long)lock->l_resource->lr_name.name[0],
 633                              (long)lock->l_resource->lr_name.name[1],
 634                              (long)lock->l_resource->lr_name.name[2]);
 635
 636                        rc = ldlm_lock_change_resource(ns, lock,
 637                                        &reply->lock_desc.l_resource.lr_name);
 638                        if (rc || lock->l_resource == NULL)
 639                                GOTO(cleanup, rc = -ENOMEM);
 640                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
 641                }
 642                if (with_policy)
 643                        if (!(type == LDLM_IBITS &&
 644                              !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
 645                              /* We assume lock type cannot change on server */
 646                                ldlm_convert_policy_to_local(exp,
 647                                                lock->l_resource->lr_type,
 648                                                &reply->lock_desc.l_policy_data,
 649                                                &lock->l_policy_data);
 650                if (type != LDLM_PLAIN)
 651                        LDLM_DEBUG(lock, "client-side enqueue, new policy data");
 652        }
 653
 654        if ((*flags) & LDLM_FL_AST_SENT ||
 655            /* Cancel extent locks as soon as possible on a liblustre client,
 656             * because it cannot handle asynchronous ASTs robustly (see
 657             * bug 7311). */
 658            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
 659                lock_res_and_lock(lock);
 660                lock->l_flags |= LDLM_FL_CBPENDING |  LDLM_FL_BL_AST;
 661                unlock_res_and_lock(lock);
 662                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
 663        }
 664
 665        /* If the lock has already been granted by a completion AST, don't
 666         * clobber the LVB with an older one. */
 667        if (lvb_len != 0) {
 668                /* We must lock or a racing completion might update lvb without
 669                 * letting us know and we'll clobber the correct value.
 670                 * Cannot unlock after the check either, as that still leaves
 671                 * a tiny window for completion to get in */
 672                lock_res_and_lock(lock);
 673                if (lock->l_req_mode != lock->l_granted_mode)
 674                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 675                                           lock->l_lvb_data, size);
 676                unlock_res_and_lock(lock);
 677                if (rc < 0) {
 678                        cleanup_phase = 1;
 679                        GOTO(cleanup, rc);
 680                }
 681        }
 682
 683        if (!is_replay) {
 684                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
 685                if (lock->l_completion_ast != NULL) {
 686                        int err = lock->l_completion_ast(lock, *flags, NULL);
 687                        if (!rc)
 688                                rc = err;
 689                        if (rc)
 690                                cleanup_phase = 1;
 691                }
 692        }
 693
 694        if (lvb_len && lvb != NULL) {
 695                /* Copy the LVB here, and not earlier, because the completion
 696                 * AST (if any) can override what we got in the reply */
 697                memcpy(lvb, lock->l_lvb_data, lvb_len);
 698        }
 699
 700        LDLM_DEBUG(lock, "client-side enqueue END");
 701        EXIT;
 702cleanup:
 703        if (cleanup_phase == 1 && rc)
 704                failed_lock_cleanup(ns, lock, mode);
 705        /* Put the lock twice; the second reference is held by ldlm_cli_enqueue */
 706        LDLM_LOCK_PUT(lock);
 707        LDLM_LOCK_RELEASE(lock);
 708        return rc;
 709}
 710EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
 711
 712/**
 713 * Estimate the number of lock handles that would fit into a request of the
 714 * given size.  PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
 715 * a single page on the send/receive side. XXX: 512 should be changed to a
 716 * more adequate value.
 717 */
 718static inline int ldlm_req_handles_avail(int req_size, int off)
 719{
 720        int avail;
 721
 722        avail = min_t(int, LDLM_MAXREQSIZE, PAGE_CACHE_SIZE - 512) - req_size;
 723        if (likely(avail >= 0))
 724                avail /= (int)sizeof(struct lustre_handle);
 725        else
 726                avail = 0;
 727        avail += LDLM_LOCKREQ_HANDLES - off;
 728
 729        return avail;
 730}
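
/*
 * Worked example (assumed sizes): with a 4 KiB page, a 1024-byte packed
 * request, sizeof(struct lustre_handle) == 8, LDLM_LOCKREQ_HANDLES == 2 and
 * off == LDLM_ENQUEUE_CANCEL_OFF == 1, and assuming LDLM_MAXREQSIZE does not
 * clamp the page-based limit first:
 *
 *      avail = (4096 - 512 - 1024) / 8 + 2 - 1 = 321
 *
 * i.e. roughly 320 extra cancel handles can piggy-back on such a request
 * before a separate CANCEL RPC becomes necessary.
 */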
 731
 732static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
 733                                             enum req_location loc,
 734                                             int off)
 735{
 736        int size = req_capsule_msg_size(pill, loc);
 737        return ldlm_req_handles_avail(size, off);
 738}
 739
 740static inline int ldlm_format_handles_avail(struct obd_import *imp,
 741                                            const struct req_format *fmt,
 742                                            enum req_location loc, int off)
 743{
 744        int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
 745        return ldlm_req_handles_avail(size, off);
 746}
 747
 748/**
 749 * Cancel LRU locks and pack them into the enqueue request. Also pack the
 750 * given \a count locks from \a cancels.
 751 *
 752 * This is to be called by functions preparing their own requests that
 753 * might contain lists of locks to cancel in addition to actual operation
 754 * that needs to be performed.
 755 */
 756int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 757                      int version, int opc, int canceloff,
 758                      struct list_head *cancels, int count)
 759{
 760        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
 761        struct req_capsule      *pill = &req->rq_pill;
 762        struct ldlm_request     *dlm = NULL;
 763        int flags, avail, to_free, pack = 0;
 764        LIST_HEAD(head);
 765        int rc;
 766        ENTRY;
 767
 768        if (cancels == NULL)
 769                cancels = &head;
 770        if (ns_connect_cancelset(ns)) {
 771                /* Estimate the amount of available space in the request. */
 772                req_capsule_filled_sizes(pill, RCL_CLIENT);
 773                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 774
 775                flags = ns_connect_lru_resize(ns) ?
 776                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
 777                to_free = !ns_connect_lru_resize(ns) &&
 778                          opc == LDLM_ENQUEUE ? 1 : 0;
 779
 780                /* Cancel LRU locks here _only_ if the server supports
 781                 * EARLY_CANCEL. Otherwise we have to send an extra CANCEL
 782                 * RPC, which will make us slower. */
 783                if (avail > count)
 784                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
 785                                                       avail - count, 0, flags);
 786                if (avail > count)
 787                        pack = count;
 788                else
 789                        pack = avail;
 790                req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
 791                                     ldlm_request_bufsize(pack, opc));
 792        }
 793
 794        rc = ptlrpc_request_pack(req, version, opc);
 795        if (rc) {
 796                ldlm_lock_list_put(cancels, l_bl_ast, count);
 797                RETURN(rc);
 798        }
 799
 800        if (ns_connect_cancelset(ns)) {
 801                if (canceloff) {
 802                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
 803                        LASSERT(dlm);
 804                        /* Skip the first lock handle in ldlm_request_pack();
 805                         * this method will increment @lock_count according
 806                         * to the number of lock handles actually written to
 807                         * the buffer. */
 808                        dlm->lock_count = canceloff;
 809                }
 810                /* Pack into the request @pack lock handles. */
 811                ldlm_cli_cancel_list(cancels, pack, req, 0);
 812                /* Prepare and send separate cancel RPC for others. */
 813                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
 814        } else {
 815                ldlm_lock_list_put(cancels, l_bl_ast, count);
 816        }
 817        RETURN(0);
 818}
 819EXPORT_SYMBOL(ldlm_prep_elc_req);
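
/*
 * Illustrative sketch of a hypothetical caller that prepares its own request
 * format (RQF_MY_OPERATION, MY_VERSION and MY_OPC are placeholders for the
 * caller's request format, RPC version and opcode; the cancel offset depends
 * on where the ldlm_request sits in that format):
 *
 *      LIST_HEAD(cancels);
 *      int count = 0;  // filled by the caller with locks it wants cancelled
 *
 *      req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MY_OPERATION);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      rc = ldlm_prep_elc_req(exp, req, MY_VERSION, MY_OPC,
 *                             LDLM_ENQUEUE_CANCEL_OFF, &cancels, count);
 *      if (rc) {
 *              ptlrpc_request_free(req);
 *              return rc;
 *      }
 *      // the request is now packed; as many cancel handles as fit were
 *      // folded into it and the rest were sent in a separate CANCEL RPC
 */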
 820
 821int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 822                          struct list_head *cancels, int count)
 823{
 824        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
 825                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 826}
 827EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 828
 829struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
 830{
 831        struct ptlrpc_request *req;
 832        int rc;
 833        ENTRY;
 834
 835        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
 836        if (req == NULL)
 837                RETURN(ERR_PTR(-ENOMEM));
 838
 839        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 840        if (rc) {
 841                ptlrpc_request_free(req);
 842                RETURN(ERR_PTR(rc));
 843        }
 844
 845        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
 846        ptlrpc_request_set_replen(req);
 847        RETURN(req);
 848}
 849EXPORT_SYMBOL(ldlm_enqueue_pack);
 850
 851/**
 852 * Client-side lock enqueue.
 853 *
 854 * If a request has some specific initialisation it is passed in \a reqp,
 855 * otherwise it is created in ldlm_cli_enqueue.
 856 *
 857 * Supports sync and async requests; pass the \a async flag accordingly. If a
 858 * request was created in ldlm_cli_enqueue and it is an async request, it is
 859 * passed back to the caller in \a reqp.
 860 */
 861int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 862                     struct ldlm_enqueue_info *einfo,
 863                     const struct ldlm_res_id *res_id,
 864                     ldlm_policy_data_t const *policy, __u64 *flags,
 865                     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
 866                     struct lustre_handle *lockh, int async)
 867{
 868        struct ldlm_namespace *ns;
 869        struct ldlm_lock      *lock;
 870        struct ldlm_request   *body;
 871        int                 is_replay = *flags & LDLM_FL_REPLAY;
 872        int                 req_passed_in = 1;
 873        int                 rc, err;
 874        struct ptlrpc_request *req;
 875        ENTRY;
 876
 877        LASSERT(exp != NULL);
 878
 879        ns = exp->exp_obd->obd_namespace;
 880
 881        /* If we're replaying this lock, just check some invariants.
 882         * If we're creating a new lock, get everything all setup nice. */
 883        if (is_replay) {
 884                lock = ldlm_handle2lock_long(lockh, 0);
 885                LASSERT(lock != NULL);
 886                LDLM_DEBUG(lock, "client-side enqueue START");
 887                LASSERT(exp == lock->l_conn_export);
 888        } else {
 889                const struct ldlm_callback_suite cbs = {
 890                        .lcs_completion = einfo->ei_cb_cp,
 891                        .lcs_blocking   = einfo->ei_cb_bl,
 892                        .lcs_glimpse    = einfo->ei_cb_gl,
 893                        .lcs_weigh      = einfo->ei_cb_wg
 894                };
 895                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
 896                                        einfo->ei_mode, &cbs, einfo->ei_cbdata,
 897                                        lvb_len, lvb_type);
 898                if (lock == NULL)
 899                        RETURN(-ENOMEM);
 900                /* for the local lock, add the reference */
 901                ldlm_lock_addref_internal(lock, einfo->ei_mode);
 902                ldlm_lock2handle(lock, lockh);
 903                if (policy != NULL) {
 904                        /* INODEBITS_INTEROP: If the server does not support
 905                         * inodebits, we will request a plain lock in the
 906                         * descriptor (ldlm_lock2desc() below) but use an
 907                         * inodebits lock internally with both bits set.
 908                         */
 909                        if (einfo->ei_type == LDLM_IBITS &&
 910                            !(exp_connect_flags(exp) &
 911                              OBD_CONNECT_IBITS))
 912                                lock->l_policy_data.l_inodebits.bits =
 913                                        MDS_INODELOCK_LOOKUP |
 914                                        MDS_INODELOCK_UPDATE;
 915                        else
 916                                lock->l_policy_data = *policy;
 917                }
 918
 919                if (einfo->ei_type == LDLM_EXTENT)
 920                        lock->l_req_extent = policy->l_extent;
 921                LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n",
 922                           *flags);
 923        }
 924
 925        lock->l_conn_export = exp;
 926        lock->l_export = NULL;
 927        lock->l_blocking_ast = einfo->ei_cb_bl;
 928        lock->l_flags |= (*flags & LDLM_FL_NO_LRU);
 929
 930        /* lock not sent to server yet */
 931
 932        if (reqp == NULL || *reqp == NULL) {
 933                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 934                                                &RQF_LDLM_ENQUEUE,
 935                                                LUSTRE_DLM_VERSION,
 936                                                LDLM_ENQUEUE);
 937                if (req == NULL) {
 938                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
 939                        LDLM_LOCK_RELEASE(lock);
 940                        RETURN(-ENOMEM);
 941                }
 942                req_passed_in = 0;
 943                if (reqp)
 944                        *reqp = req;
 945        } else {
 946                int len;
 947
 948                req = *reqp;
 949                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
 950                                           RCL_CLIENT);
 951                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
 952                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
 953        }
 954
 955        /* Dump lock data into the request buffer */
 956        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 957        ldlm_lock2desc(lock, &body->lock_desc);
 958        body->lock_flags = ldlm_flags_to_wire(*flags);
 959        body->lock_handle[0] = *lockh;
 960
 961        /* Continue as normal. */
 962        if (!req_passed_in) {
 963                if (lvb_len > 0)
 964                        req_capsule_extend(&req->rq_pill,
 965                                           &RQF_LDLM_ENQUEUE_LVB);
 966                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
 967                                     lvb_len);
 968                ptlrpc_request_set_replen(req);
 969        }
 970
 971        /*
 972         * Liblustre client doesn't get extent locks, except for O_APPEND case
 973         * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
 974         * [i_size, OBD_OBJECT_EOF] lock is taken.
 975         */
 976        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
 977                     policy->l_extent.end == OBD_OBJECT_EOF));
 978
 979        if (async) {
 980                LASSERT(reqp != NULL);
 981                RETURN(0);
 982        }
 983
 984        LDLM_DEBUG(lock, "sending request");
 985
 986        rc = ptlrpc_queue_wait(req);
 987
 988        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
 989                                    einfo->ei_mode, flags, lvb, lvb_len,
 990                                    lockh, rc);
 991
 992        /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
 993         * one reference that we took */
 994        if (err == -ENOLCK)
 995                LDLM_LOCK_RELEASE(lock);
 996        else
 997                rc = err;
 998
 999        if (!req_passed_in && req != NULL) {
1000                ptlrpc_req_finished(req);
1001                if (reqp)
1002                        *reqp = NULL;
1003        }
1004
1005        RETURN(rc);
1006}
1007EXPORT_SYMBOL(ldlm_cli_enqueue);
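
/*
 * Illustrative sketch of a synchronous caller (the inodebits policy, the
 * resource id "res_id" and the modes are made-up example values; the real
 * callers live in the mdc/osc layers):
 *
 *      struct ldlm_enqueue_info einfo = {
 *              .ei_type  = LDLM_IBITS,
 *              .ei_mode  = LCK_PR,
 *              .ei_cb_bl = ldlm_blocking_ast,
 *              .ei_cb_cp = ldlm_completion_ast,
 *      };
 *      ldlm_policy_data_t policy = {
 *              .l_inodebits = { MDS_INODELOCK_LOOKUP },
 *      };
 *      struct lustre_handle lockh = { 0 };
 *      __u64 flags = 0;
 *      int rc;
 *
 *      rc = ldlm_cli_enqueue(exp, NULL, &einfo, &res_id, &policy, &flags,
 *                            NULL, 0, LVB_T_NONE, &lockh, 0);
 *      if (rc == ELDLM_OK) {
 *              // ... use the lock, then drop the enqueue reference:
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      }
 */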
1008
1009static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
1010                                  __u32 *flags)
1011{
1012        struct ldlm_resource *res;
1013        int rc;
1014        ENTRY;
1015        if (ns_is_client(ldlm_lock_to_ns(lock))) {
1016                CERROR("Trying to cancel local lock\n");
1017                LBUG();
1018        }
1019        LDLM_DEBUG(lock, "client-side local convert");
1020
1021        res = ldlm_lock_convert(lock, new_mode, flags);
1022        if (res) {
1023                ldlm_reprocess_all(res);
1024                rc = 0;
1025        } else {
1026                rc = EDEADLOCK;
1027        }
1028        LDLM_DEBUG(lock, "client-side local convert handler END");
1029        LDLM_LOCK_PUT(lock);
1030        RETURN(rc);
1031}
1032
1033/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
1034 * conversion of locks which are on the waiting or converting queue */
1035/* Caller of this code is supposed to take care of lock readers/writers
1036   accounting */
1037int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
1038{
1039        struct ldlm_request   *body;
1040        struct ldlm_reply     *reply;
1041        struct ldlm_lock      *lock;
1042        struct ldlm_resource  *res;
1043        struct ptlrpc_request *req;
1044        int                 rc;
1045        ENTRY;
1046
1047        lock = ldlm_handle2lock(lockh);
1048        if (!lock) {
1049                LBUG();
1050                RETURN(-EINVAL);
1051        }
1052        *flags = 0;
1053
1054        if (lock->l_conn_export == NULL)
1055                RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
1056
1057        LDLM_DEBUG(lock, "client-side convert");
1058
1059        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
1060                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
1061                                        LDLM_CONVERT);
1062        if (req == NULL) {
1063                LDLM_LOCK_PUT(lock);
1064                RETURN(-ENOMEM);
1065        }
1066
1067        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1068        body->lock_handle[0] = lock->l_remote_handle;
1069
1070        body->lock_desc.l_req_mode = new_mode;
1071        body->lock_flags = ldlm_flags_to_wire(*flags);
1072
1073
1074        ptlrpc_request_set_replen(req);
1075        rc = ptlrpc_queue_wait(req);
1076        if (rc != ELDLM_OK)
1077                GOTO(out, rc);
1078
1079        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1080        if (reply == NULL)
1081                GOTO(out, rc = -EPROTO);
1082
1083        if (req->rq_status)
1084                GOTO(out, rc = req->rq_status);
1085
1086        res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
1087        if (res != NULL) {
1088                ldlm_reprocess_all(res);
1089                /* Go to sleep until the lock is granted. */
1090                /* FIXME: or cancelled. */
1091                if (lock->l_completion_ast) {
1092                        rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
1093                                                    NULL);
1094                        if (rc)
1095                                GOTO(out, rc);
1096                }
1097        } else {
1098                rc = EDEADLOCK;
1099        }
1100        EXIT;
1101 out:
1102        LDLM_LOCK_PUT(lock);
1103        ptlrpc_req_finished(req);
1104        return rc;
1105}
1106EXPORT_SYMBOL(ldlm_cli_convert);
1107
1108/**
1109 * Cancel locks locally.
1110 * Returns:
1111 * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server
1112 * \retval LDLM_FL_CANCELING otherwise;
1113 * \retval LDLM_FL_BL_AST if there is a need for a separate CANCEL RPC.
1114 */
1115static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
1116{
1117        __u64 rc = LDLM_FL_LOCAL_ONLY;
1118        ENTRY;
1119
1120        if (lock->l_conn_export) {
1121                bool local_only;
1122
1123                LDLM_DEBUG(lock, "client-side cancel");
1124                /* Set this flag to prevent others from getting new references*/
1125                lock_res_and_lock(lock);
1126                lock->l_flags |= LDLM_FL_CBPENDING;
1127                local_only = !!(lock->l_flags &
1128                                (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
1129                ldlm_cancel_callback(lock);
1130                rc = (lock->l_flags & LDLM_FL_BL_AST) ?
1131                        LDLM_FL_BL_AST : LDLM_FL_CANCELING;
1132                unlock_res_and_lock(lock);
1133
1134                if (local_only) {
1135                        CDEBUG(D_DLMTRACE, "not sending request (at caller's "
1136                               "instruction)\n");
1137                        rc = LDLM_FL_LOCAL_ONLY;
1138                }
1139                ldlm_lock_cancel(lock);
1140        } else {
1141                if (ns_is_client(ldlm_lock_to_ns(lock))) {
1142                        LDLM_ERROR(lock, "Trying to cancel local lock");
1143                        LBUG();
1144                }
1145                LDLM_DEBUG(lock, "server-side local cancel");
1146                ldlm_lock_cancel(lock);
1147                ldlm_reprocess_all(lock->l_resource);
1148        }
1149
1150        RETURN(rc);
1151}
1152
1153/**
1154 * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
1155 */
1156static void ldlm_cancel_pack(struct ptlrpc_request *req,
1157                             struct list_head *head, int count)
1158{
1159        struct ldlm_request *dlm;
1160        struct ldlm_lock *lock;
1161        int max, packed = 0;
1162        ENTRY;
1163
1164        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1165        LASSERT(dlm != NULL);
1166
1167        /* Check the room in the request buffer. */
1168        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
1169                sizeof(struct ldlm_request);
1170        max /= sizeof(struct lustre_handle);
1171        max += LDLM_LOCKREQ_HANDLES;
1172        LASSERT(max >= dlm->lock_count + count);
1173
1174        /* XXX: it would be better to pack lock handles grouped by resource,
1175         * so that the server cancel would call filter_lvbo_update() less
1176         * frequently. */
1177        list_for_each_entry(lock, head, l_bl_ast) {
1178                if (!count--)
1179                        break;
1180                LASSERT(lock->l_conn_export);
1181                /* Pack the lock handle to the given request buffer. */
1182                LDLM_DEBUG(lock, "packing");
1183                dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
1184                packed++;
1185        }
1186        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
1187        EXIT;
1188}
1189
1190/**
1191 * Prepare and send a batched cancel RPC. It will include \a count lock
1192 * handles of locks given in \a cancels list. */
1193int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
1194                        int count, ldlm_cancel_flags_t flags)
1195{
1196        struct ptlrpc_request *req = NULL;
1197        struct obd_import *imp;
1198        int free, sent = 0;
1199        int rc = 0;
1200        ENTRY;
1201
1202        LASSERT(exp != NULL);
1203        LASSERT(count > 0);
1204
1205        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
1206
1207        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
1208                RETURN(count);
1209
1210        free = ldlm_format_handles_avail(class_exp2cliimp(exp),
1211                                         &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
1212        if (count > free)
1213                count = free;
1214
1215        while (1) {
1216                imp = class_exp2cliimp(exp);
1217                if (imp == NULL || imp->imp_invalid) {
1218                        CDEBUG(D_DLMTRACE,
1219                               "skipping cancel on invalid import %p\n", imp);
1220                        RETURN(count);
1221                }
1222
1223                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
1224                if (req == NULL)
1225                        GOTO(out, rc = -ENOMEM);
1226
1227                req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
1228                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
1229                                     ldlm_request_bufsize(count, LDLM_CANCEL));
1230
1231                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
1232                if (rc) {
1233                        ptlrpc_request_free(req);
1234                        GOTO(out, rc);
1235                }
1236
1237                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
1238                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
1239                ptlrpc_at_set_req_timeout(req);
1240
1241                ldlm_cancel_pack(req, cancels, count);
1242
1243                ptlrpc_request_set_replen(req);
1244                if (flags & LCF_ASYNC) {
1245                        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1246                        sent = count;
1247                        GOTO(out, 0);
1248                } else {
1249                        rc = ptlrpc_queue_wait(req);
1250                }
1251                if (rc == ESTALE) {
1252                        CDEBUG(D_DLMTRACE, "client/server (nid %s) "
1253                               "out of sync -- not fatal\n",
1254                               libcfs_nid2str(req->rq_import->
1255                                              imp_connection->c_peer.nid));
1256                        rc = 0;
1257                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
1258                           req->rq_import_generation == imp->imp_generation) {
1259                        ptlrpc_req_finished(req);
1260                        continue;
1261                } else if (rc != ELDLM_OK) {
1262                        /* -ESHUTDOWN is common on umount */
1263                        CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
1264                                     "Got rc %d from cancel RPC: "
1265                                     "canceling anyway\n", rc);
1266                        break;
1267                }
1268                sent = count;
1269                break;
1270        }
1271
1272        ptlrpc_req_finished(req);
1273        EXIT;
1274out:
1275        return sent ? sent : rc;
1276}
1277EXPORT_SYMBOL(ldlm_cli_cancel_req);
1278
1279static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
1280{
1281        LASSERT(imp != NULL);
1282        return &imp->imp_obd->obd_namespace->ns_pool;
1283}
1284
1285/**
1286 * Update client's OBD pool related fields with new SLV and Limit from \a req.
1287 */
1288int ldlm_cli_update_pool(struct ptlrpc_request *req)
1289{
1290        struct obd_device *obd;
1291        __u64 new_slv;
1292        __u32 new_limit;
1293        ENTRY;
1294        if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
1295                     !imp_connect_lru_resize(req->rq_import)))
1296        {
1297                /*
1298                 * Do nothing for corner cases.
1299                 */
1300                RETURN(0);
1301        }
1302
1303        /* In some cases RPC may contain SLV and limit zeroed out. This
1304         * is the case when the server does not support the LRU resize feature.
1305         * This is also possible in some recovery cases when server-side
1306         * reqs have no reference to the OBD export and thus access to
1307         * server-side namespace is not possible. */
1308        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
1309            lustre_msg_get_limit(req->rq_repmsg) == 0) {
1310                DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
1311                          "(SLV: "LPU64", Limit: %u)",
1312                          lustre_msg_get_slv(req->rq_repmsg),
1313                          lustre_msg_get_limit(req->rq_repmsg));
1314                RETURN(0);
1315        }
1316
1317        new_limit = lustre_msg_get_limit(req->rq_repmsg);
1318        new_slv = lustre_msg_get_slv(req->rq_repmsg);
1319        obd = req->rq_import->imp_obd;
1320
1321        /* Set new SLV and limit in OBD fields to make them accessible
1322         * to the pool thread. We do not access obd_namespace and pool
1323         * directly here as there is no reliable way to make sure that
1324         * they are still alive at cleanup time. Evil races are possible
1325         * which may cause Oops at that time. */
1326        write_lock(&obd->obd_pool_lock);
1327        obd->obd_pool_slv = new_slv;
1328        obd->obd_pool_limit = new_limit;
1329        write_unlock(&obd->obd_pool_lock);
1330
1331        RETURN(0);
1332}
1333EXPORT_SYMBOL(ldlm_cli_update_pool);
1334
1335/**
1336 * Client side lock cancel.
1337 *
1338 * Lock must not have any readers or writers by this time.
1339 */
1340int ldlm_cli_cancel(struct lustre_handle *lockh,
1341                    ldlm_cancel_flags_t cancel_flags)
1342{
1343        struct obd_export *exp;
1344        int avail, flags, count = 1;
1345        __u64 rc = 0;
1346        struct ldlm_namespace *ns;
1347        struct ldlm_lock *lock;
1348        LIST_HEAD(cancels);
1349        ENTRY;
1350
1351        /* concurrent cancels on the same handle can happen */
1352        lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
1353        if (lock == NULL) {
1354                LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
1355                RETURN(0);
1356        }
1357
1358        rc = ldlm_cli_cancel_local(lock);
1359        if (rc == LDLM_FL_LOCAL_ONLY) {
1360                LDLM_LOCK_RELEASE(lock);
1361                RETURN(0);
1362        }
1363        /* Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
1364         * RPC which goes to the canceld portal, so we can cancel other LRU locks
1365         * here and send them all as one LDLM_CANCEL RPC. */
1366        LASSERT(list_empty(&lock->l_bl_ast));
1367        list_add(&lock->l_bl_ast, &cancels);
1368
1369        exp = lock->l_conn_export;
1370        if (exp_connect_cancelset(exp)) {
1371                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1372                                                  &RQF_LDLM_CANCEL,
1373                                                  RCL_CLIENT, 0);
1374                LASSERT(avail > 0);
1375
1376                ns = ldlm_lock_to_ns(lock);
1377                flags = ns_connect_lru_resize(ns) ?
1378                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
1379                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1380                                               LCF_BL_AST, flags);
1381        }
1382        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
1383        RETURN(0);
1384}
1385EXPORT_SYMBOL(ldlm_cli_cancel);
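
/*
 * Illustrative caller sketch, not part of the original source: cancel a
 * client lock by handle once it has no readers or writers.  The wrapper
 * name is hypothetical; LCF_ASYNC is assumed to be an available
 * ldlm_cancel_flags_t value.
 */
static int ldlm_cli_cancel_sketch(struct lustre_handle *lockh)
{
        /* ldlm_cli_cancel() may piggyback other unused LRU locks onto the
         * same LDLM_CANCEL RPC, as described above. */
        return ldlm_cli_cancel(lockh, LCF_ASYNC);
}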
1386
1387/**
1388 * Locally cancel up to \a count locks in list \a cancels.
1389 * Return the number of cancelled locks.
1390 */
1391int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
1392                               ldlm_cancel_flags_t flags)
1393{
1394        LIST_HEAD(head);
1395        struct ldlm_lock *lock, *next;
1396        int left = 0, bl_ast = 0;
1397        __u64 rc;
1398
1399        left = count;
1400        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1401                if (left-- == 0)
1402                        break;
1403
1404                if (flags & LCF_LOCAL) {
1405                        rc = LDLM_FL_LOCAL_ONLY;
1406                        ldlm_lock_cancel(lock);
1407                } else {
1408                        rc = ldlm_cli_cancel_local(lock);
1409                }
1410                /* Until we have compound requests and can send LDLM_CANCEL
1411                 * requests batched with generic RPCs, we need to send cancels
1412                 * with the LDLM_FL_BL_AST flag in a separate RPC from
1413                 * the one being generated now. */
1414                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1415                        LDLM_DEBUG(lock, "Cancel lock separately");
1416                        list_del_init(&lock->l_bl_ast);
1417                        list_add(&lock->l_bl_ast, &head);
1418                        bl_ast++;
1419                        continue;
1420                }
1421                if (rc == LDLM_FL_LOCAL_ONLY) {
1422                        /* CANCEL RPC should not be sent to server. */
1423                        list_del_init(&lock->l_bl_ast);
1424                        LDLM_LOCK_RELEASE(lock);
1425                        count--;
1426                }
1427        }
1428        if (bl_ast > 0) {
1429                count -= bl_ast;
1430                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1431        }
1432
1433        RETURN(count);
1434}
1435EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
1436
1437/**
1438 * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
1439 * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g.
1440 * readahead requests, ...)
1441 */
1442static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
1443                                                    struct ldlm_lock *lock,
1444                                                    int unused, int added,
1445                                                    int count)
1446{
1447        ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
1448        ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
1449        lock_res_and_lock(lock);
1450
1451        /* don't check added & count since we want to process all locks
1452         * from unused list */
1453        switch (lock->l_resource->lr_type) {
1454                case LDLM_EXTENT:
1455                case LDLM_IBITS:
1456                        if (cb && cb(lock))
1457                                break;
1458                default:
1459                        result = LDLM_POLICY_SKIP_LOCK;
1460                        lock->l_flags |= LDLM_FL_SKIPPED;
1461                        break;
1462        }
1463
1464        unlock_res_and_lock(lock);
1465        RETURN(result);
1466}
1467
1468/**
1469 * Callback function for LRU-resize policy. Decides whether to keep
1470 * \a lock in LRU for the current LRU size \a unused, added in current
1471 * scan \a added and number of locks to be preferably canceled \a count.
1472 *
1473 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1474 *
1475 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1476 */
1477static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1478                                                 struct ldlm_lock *lock,
1479                                                 int unused, int added,
1480                                                 int count)
1481{
1482        cfs_time_t cur = cfs_time_current();
1483        struct ldlm_pool *pl = &ns->ns_pool;
1484        __u64 slv, lvf, lv;
1485        cfs_time_t la;
1486
1487        /* Stop LRU processing when we reach past @count or have checked all
1488         * locks in LRU. */
1489        if (count && added >= count)
1490                return LDLM_POLICY_KEEP_LOCK;
1491
1492        slv = ldlm_pool_get_slv(pl);
1493        lvf = ldlm_pool_get_lvf(pl);
1494        la = cfs_duration_sec(cfs_time_sub(cur,
1495                              lock->l_last_used));
1496        lv = lvf * la * unused;
1497
1498        /* Inform pool about current CLV to see it via proc. */
1499        ldlm_pool_set_clv(pl, lv);
1500
1501        /* Stop when the SLV has not yet been received from the server, or
1502         * lv is smaller than it. */
1503        return (slv == 0 || lv < slv) ?
1504                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1505}
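
/*
 * Worked example for the LRU-resize policy above (illustrative numbers,
 * not from the original source): with lock volume factor lvf = 100, a lock
 * idle for la = 30 seconds and unused = 1000 locks in the LRU, the current
 * lock volume is lv = 100 * 30 * 1000 = 3,000,000.  The lock is cancelled
 * only if the server lock volume slv is non-zero and lv >= slv; otherwise
 * it is kept and the LRU scan stops.
 */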
1506
1507/**
1508 * Callback function for proc used policy. Makes decision whether to keep
1509 * \a lock in LRU for the current LRU size \a unused, added in current scan \a
1510 * added and number of locks to be preferably canceled \a count.
1511 *
1512 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1513 *
1514 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1515 */
1516static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1517                                                   struct ldlm_lock *lock,
1518                                                   int unused, int added,
1519                                                   int count)
1520{
1521        /* Stop LRU processing when we reach past @count or have checked all
1522         * locks in LRU. */
1523        return (added >= count) ?
1524                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1525}
1526
1527/**
1528 * Callback function for aged policy. Makes decision whether to keep \a lock in
1529 * LRU for current LRU size \a unused, added in current scan \a added and
1530 * number of locks to be preferably canceled \a count.
1531 *
1532 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1533 *
1534 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1535 */
1536static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1537                                                 struct ldlm_lock *lock,
1538                                                 int unused, int added,
1539                                                 int count)
1540{
1541        /* Stop LRU processing if a young lock is found and we have reached past count */
1542        return ((added >= count) &&
1543                cfs_time_before(cfs_time_current(),
1544                                cfs_time_add(lock->l_last_used,
1545                                             ns->ns_max_age))) ?
1546                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1547}
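
/*
 * Worked example for the aged policy above (illustrative numbers, not from
 * the original source): with ns_max_age equivalent to 10 minutes, once
 * \a added has reached \a count a lock last used 3 minutes ago is kept
 * (it is still young), while a lock last used 15 minutes ago is still
 * cancelled.
 */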
1548
1549/**
1550 * Callback function for default policy. Makes decision whether to keep \a lock
1551 * in LRU for current LRU size \a unused, added in current scan \a added and
1552 * number of locks to be preferably canceled \a count.
1553 *
1554 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1555 *
1556 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1557 */
1558static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1559                                                    struct ldlm_lock *lock,
1560                                                    int unused, int added,
1561                                                    int count)
1562{
1563        /* Stop LRU processing when we reach past count or have checked all
1564         * locks in LRU. */
1565        return (added >= count) ?
1566                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1567}
1568
1569typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
1570                                                      struct ldlm_lock *, int,
1571                                                      int, int);
1572
1573static ldlm_cancel_lru_policy_t
1574ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1575{
1576        if (flags & LDLM_CANCEL_NO_WAIT)
1577                return ldlm_cancel_no_wait_policy;
1578
1579        if (ns_connect_lru_resize(ns)) {
1580                if (flags & LDLM_CANCEL_SHRINK)
1581                        /* We kill passed number of old locks. */
1582                        return ldlm_cancel_passed_policy;
1583                else if (flags & LDLM_CANCEL_LRUR)
1584                        return ldlm_cancel_lrur_policy;
1585                else if (flags & LDLM_CANCEL_PASSED)
1586                        return ldlm_cancel_passed_policy;
1587        } else {
1588                if (flags & LDLM_CANCEL_AGED)
1589                        return ldlm_cancel_aged_policy;
1590        }
1591
1592        return ldlm_cancel_default_policy;
1593}
1594
1595/**
1596 * - Free space in LRU for \a count new locks,
1597 *   redundant unused locks are canceled locally;
1598 * - also cancel locally unused aged locks;
1599 * - do not cancel more than \a max locks;
1600 * - GET the found locks and add them into the \a cancels list.
1601 *
1602 * A client lock can be added to the l_bl_ast list only when it is
1603 * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing
1604 * CANCEL.  There are the following use cases:
1605 * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and
1606 * ldlm_cli_cancel(), which check and set this flag properly. As any
1607 * attempt to cancel a lock relies on this flag, the l_bl_ast list is
1608 * accessed later without any special locking.
1609 *
1610 * Calling policies for enabled LRU resize:
1611 * ----------------------------------------
1612 * flags & LDLM_CANCEL_LRUR - use LRU resize policy (SLV from server) to
1613 *                          cancel not more than \a count locks;
1614 *
1615 * flags & LDLM_CANCEL_PASSED - cancel \a count number of old locks (located at
1616 *                            the beginning of LRU list);
1617 *
1618 * flags & LDLM_CANCEL_SHRINK - cancel not more than \a count locks according to
1619 *                            memory pressure policy function;
1620 *
1621 * flags & LDLM_CANCEL_AGED - cancel \a count locks according to "aged policy".
1622 *
1623 * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible
1624 *                             (typically before replaying locks) w/o
1625 *                             sending any RPCs or waiting for any
1626 *                             outstanding RPC to complete.
1627 */
1628static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, struct list_head *cancels,
1629                                 int count, int max, int flags)
1630{
1631        ldlm_cancel_lru_policy_t pf;
1632        struct ldlm_lock *lock, *next;
1633        int added = 0, unused, remained;
1634        ENTRY;
1635
1636        spin_lock(&ns->ns_lock);
1637        unused = ns->ns_nr_unused;
1638        remained = unused;
1639
1640        if (!ns_connect_lru_resize(ns))
1641                count += unused - ns->ns_max_unused;
1642
1643        pf = ldlm_cancel_lru_policy(ns, flags);
1644        LASSERT(pf != NULL);
1645
1646        while (!list_empty(&ns->ns_unused_list)) {
1647                ldlm_policy_res_t result;
1648
1649                /* all unused locks have been scanned */
1650                if (remained-- <= 0)
1651                        break;
1652
1653                /* For any flags, stop scanning if @max is reached. */
1654                if (max && added >= max)
1655                        break;
1656
1657                list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
1658                                             l_lru) {
1659                        /* No locks which got blocking requests. */
1660                        LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
1661
1662                        if (flags & LDLM_CANCEL_NO_WAIT &&
1663                            lock->l_flags & LDLM_FL_SKIPPED)
1664                                /* already processed */
1665                                continue;
1666
1667                        /* Somebody is already doing CANCEL. No need for this
1668                         * lock in LRU, do not traverse it again. */
1669                        if (!(lock->l_flags & LDLM_FL_CANCELING))
1670                                break;
1671
1672                        ldlm_lock_remove_from_lru_nolock(lock);
1673                }
1674                if (&lock->l_lru == &ns->ns_unused_list)
1675                        break;
1676
1677                LDLM_LOCK_GET(lock);
1678                spin_unlock(&ns->ns_lock);
1679                lu_ref_add(&lock->l_reference, __FUNCTION__, current);
1680
1681                /* Pass the lock through the policy filter and see if it
1682                 * should stay in LRU.
1683                 *
1684                 * Even for shrinker policy we stop scanning if
1685                 * we find a lock that should stay in the cache.
1686                 * We should take into account lock age anyway
1687                 * as a new lock is a valuable resource even if
1688                 * it has a low weight.
1689                 *
1690                 * That is, for shrinker policy we drop only
1691                 * old locks, but additionally choose them by
1692                 * their weight. Big extent locks will stay in
1693                 * the cache. */
1694                result = pf(ns, lock, unused, added, count);
1695                if (result == LDLM_POLICY_KEEP_LOCK) {
1696                        lu_ref_del(&lock->l_reference,
1697                                   __FUNCTION__, current);
1698                        LDLM_LOCK_RELEASE(lock);
1699                        spin_lock(&ns->ns_lock);
1700                        break;
1701                }
1702                if (result == LDLM_POLICY_SKIP_LOCK) {
1703                        lu_ref_del(&lock->l_reference,
1704                                   __func__, current);
1705                        LDLM_LOCK_RELEASE(lock);
1706                        spin_lock(&ns->ns_lock);
1707                        continue;
1708                }
1709
1710                lock_res_and_lock(lock);
1711                /* Check flags again under the lock. */
1712                if ((lock->l_flags & LDLM_FL_CANCELING) ||
1713                    (ldlm_lock_remove_from_lru(lock) == 0)) {
1714                        /* Another thread is removing lock from LRU, or
1715                         * somebody is already doing CANCEL, or there
1716                         * is a blocking request which will send cancel
1717                         * by itself, or the lock is no longer unused. */
1718                        unlock_res_and_lock(lock);
1719                        lu_ref_del(&lock->l_reference,
1720                                   __FUNCTION__, current);
1721                        LDLM_LOCK_RELEASE(lock);
1722                        spin_lock(&ns->ns_lock);
1723                        continue;
1724                }
1725                LASSERT(!lock->l_readers && !lock->l_writers);
1726
1727                /* If we have chosen to cancel this lock voluntarily, we had
1728                 * better send a cancel notification to the server, so that it
1729                 * frees appropriate state. This might lead to a race
1730                 * where while we are doing cancel here, the server is also
1731                 * silently cancelling this lock. */
1732                lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
1733
1734                /* Setting the CBPENDING flag is a little misleading,
1735                 * but prevents an important race; namely, once
1736                 * CBPENDING is set, the lock can accumulate no more
1737                 * readers/writers. Since readers and writers are
1738                 * already zero here, ldlm_lock_decref() won't see
1739                 * this flag and call l_blocking_ast */
1740                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1741
1742                /* We can't re-add to l_lru as it confuses the
1743                 * refcounting in ldlm_lock_remove_from_lru() if an AST
1744                 * arrives after we drop lr_lock below. We use l_bl_ast
1745                 * and can't use l_pending_chain as it is used on both the
1746                 * server and the client, even though bug 5666 says it is
1747                 * used only on the server */
1748                LASSERT(list_empty(&lock->l_bl_ast));
1749                list_add(&lock->l_bl_ast, cancels);
1750                unlock_res_and_lock(lock);
1751                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
1752                spin_lock(&ns->ns_lock);
1753                added++;
1754                unused--;
1755        }
1756        spin_unlock(&ns->ns_lock);
1757        RETURN(added);
1758}
1759
1760int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
1761                          int count, int max, ldlm_cancel_flags_t cancel_flags,
1762                          int flags)
1763{
1764        int added;
1765        added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
1766        if (added <= 0)
1767                return added;
1768        return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
1769}
1770
1771/**
1772 * Cancel at least \a nr locks from given namespace LRU.
1773 *
1774 * When called with LCF_ASYNC the blocking callback will be handled
1775 * in a thread and this function will return after the thread has been
1776 * asked to call the callback.  When called without LCF_ASYNC the blocking
1777 * callback will be performed in this function.
1778 */
1779int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
1780                    ldlm_cancel_flags_t cancel_flags,
1781                    int flags)
1782{
1783        LIST_HEAD(cancels);
1784        int count, rc;
1785        ENTRY;
1786
1787        /* Just prepare the list of locks, do not actually cancel them yet.
1788         * Locks are cancelled later in a separate thread. */
1789        count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
1790        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
1791        if (rc == 0)
1792                RETURN(count);
1793
1794        RETURN(0);
1795}
1796
1797/**
1798 * Find and cancel locally unused locks found on resource, matched to the
1799 * given policy, mode. GET the found locks and add them into the \a cancels
1800 * list.
1801 */
1802int ldlm_cancel_resource_local(struct ldlm_resource *res,
1803                               struct list_head *cancels,
1804                               ldlm_policy_data_t *policy,
1805                               ldlm_mode_t mode, int lock_flags,
1806                               ldlm_cancel_flags_t cancel_flags, void *opaque)
1807{
1808        struct ldlm_lock *lock;
1809        int count = 0;
1810        ENTRY;
1811
1812        lock_res(res);
1813        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1814                if (opaque != NULL && lock->l_ast_data != opaque) {
1815                        LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1816                                   lock->l_ast_data, opaque);
1817                        //LBUG();
1818                        continue;
1819                }
1820
1821                if (lock->l_readers || lock->l_writers)
1822                        continue;
1823
1824                /* If somebody is already doing CANCEL, or blocking AST came,
1825                 * skip this lock. */
1826                if (lock->l_flags & LDLM_FL_BL_AST ||
1827                    lock->l_flags & LDLM_FL_CANCELING)
1828                        continue;
1829
1830                if (lockmode_compat(lock->l_granted_mode, mode))
1831                        continue;
1832
1833                /* If policy is given and this is IBITS lock, add to list only
1834                 * those locks that match by policy. */
1835                if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1836                    !(lock->l_policy_data.l_inodebits.bits &
1837                      policy->l_inodebits.bits))
1838                        continue;
1839
1840                /* See CBPENDING comment in ldlm_cancel_lru */
1841                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1842                                 lock_flags;
1843
1844                LASSERT(list_empty(&lock->l_bl_ast));
1845                list_add(&lock->l_bl_ast, cancels);
1846                LDLM_LOCK_GET(lock);
1847                count++;
1848        }
1849        unlock_res(res);
1850
1851        RETURN(ldlm_cli_cancel_list_local(cancels, count, cancel_flags));
1852}
1853EXPORT_SYMBOL(ldlm_cancel_resource_local);
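
/*
 * Illustrative caller sketch, not part of the original source, modeled on
 * ldlm_cli_cancel_unused_resource() below: gather the unused locks on a
 * resource that conflict with a given mode and cancel them on the server.
 * The wrapper name is hypothetical and LCK_PW is assumed to be an available
 * ldlm_mode_t value.
 */
static int ldlm_cancel_res_conflicting_sketch(struct ldlm_resource *res)
{
        LIST_HEAD(cancels);
        int count;

        /* Unused locks incompatible with LCK_PW are collected with a
         * reference held, then handed to the batched CANCEL path. */
        count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_PW,
                                           0, LCF_BL_AST, NULL);
        return ldlm_cli_cancel_list(&cancels, count, NULL, 0);
}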
1854
1855/**
1856 * Cancel client-side locks from a list and send/prepare cancel RPCs to the
1857 * server.
1858 * If \a req is NULL, send CANCEL request to server with handles of locks
1859 * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
1860 * separately per lock.
1861 * If \a req is not NULL, put handles of locks in \a cancels into the request
1862 * buffer at the offset \a off.
1863 * Destroy \a cancels at the end.
1864 */
1865int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1866                         struct ptlrpc_request *req, ldlm_cancel_flags_t flags)
1867{
1868        struct ldlm_lock *lock;
1869        int res = 0;
1870        ENTRY;
1871
1872        if (list_empty(cancels) || count == 0)
1873                RETURN(0);
1874
1875        /* XXX: requests (both batched and not) could be sent in parallel.
1876         * Usually it is enough to have just 1 RPC, but it is possible that
1877         * there are too many locks to be cancelled in LRU or on a resource.
1878         * It would also speed up the case when the server does not support
1879         * the feature. */
1880        while (count > 0) {
1881                LASSERT(!list_empty(cancels));
1882                lock = list_entry(cancels->next, struct ldlm_lock,
1883                                      l_bl_ast);
1884                LASSERT(lock->l_conn_export);
1885
1886                if (exp_connect_cancelset(lock->l_conn_export)) {
1887                        res = count;
1888                        if (req)
1889                                ldlm_cancel_pack(req, cancels, count);
1890                        else
1891                                res = ldlm_cli_cancel_req(lock->l_conn_export,
1892                                                          cancels, count,
1893                                                          flags);
1894                } else {
1895                        res = ldlm_cli_cancel_req(lock->l_conn_export,
1896                                                  cancels, 1, flags);
1897                }
1898
1899                if (res < 0) {
1900                        CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
1901                                     "ldlm_cli_cancel_list: %d\n", res);
1902                        res = count;
1903                }
1904
1905                count -= res;
1906                ldlm_lock_list_put(cancels, l_bl_ast, res);
1907        }
1908        LASSERT(count == 0);
1909        RETURN(0);
1910}
1911EXPORT_SYMBOL(ldlm_cli_cancel_list);
1912
1913/**
1914 * Cancel all locks on a resource that have 0 readers/writers.
1915 *
1916 * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1917 * to notify the server. */
1918int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1919                                    const struct ldlm_res_id *res_id,
1920                                    ldlm_policy_data_t *policy,
1921                                    ldlm_mode_t mode,
1922                                    ldlm_cancel_flags_t flags,
1923                                    void *opaque)
1924{
1925        struct ldlm_resource *res;
1926        LIST_HEAD(cancels);
1927        int count;
1928        int rc;
1929        ENTRY;
1930
1931        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1932        if (res == NULL) {
1933                /* This is not a problem. */
1934                CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
1935                RETURN(0);
1936        }
1937
1938        LDLM_RESOURCE_ADDREF(res);
1939        count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1940                                           0, flags | LCF_BL_AST, opaque);
1941        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1942        if (rc != ELDLM_OK)
1943                CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
1944
1945        LDLM_RESOURCE_DELREF(res);
1946        ldlm_resource_putref(res);
1947        RETURN(0);
1948}
1949EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
1950
1951struct ldlm_cli_cancel_arg {
1952        int     lc_flags;
1953        void   *lc_opaque;
1954};
1955
1956static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1957                                       struct hlist_node *hnode, void *arg)
1958{
1959        struct ldlm_resource       *res = cfs_hash_object(hs, hnode);
1960        struct ldlm_cli_cancel_arg     *lc = arg;
1961        int                          rc;
1962
1963        rc = ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
1964                                             NULL, LCK_MINMODE,
1965                                             lc->lc_flags, lc->lc_opaque);
1966        if (rc != 0) {
1967                CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
1968                       res->lr_name.name[0], rc);
1969        }
1970        /* must return 0 for hash iteration */
1971        return 0;
1972}
1973
1974/**
1975 * Cancel all locks on a namespace (or a specific resource, if given)
1976 * that have 0 readers/writers.
1977 *
1978 * If flags & LCF_LOCAL, throw the locks away without trying
1979 * to notify the server. */
1980int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1981                           const struct ldlm_res_id *res_id,
1982                           ldlm_cancel_flags_t flags, void *opaque)
1983{
1984        struct ldlm_cli_cancel_arg arg = {
1985                .lc_flags       = flags,
1986                .lc_opaque      = opaque,
1987        };
1988
1989        ENTRY;
1990
1991        if (ns == NULL)
1992                RETURN(ELDLM_OK);
1993
1994        if (res_id != NULL) {
1995                RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1996                                                       LCK_MINMODE, flags,
1997                                                       opaque));
1998        } else {
1999                cfs_hash_for_each_nolock(ns->ns_rs_hash,
2000                                         ldlm_cli_hash_cancel_unused, &arg);
2001                RETURN(ELDLM_OK);
2002        }
2003}
2004EXPORT_SYMBOL(ldlm_cli_cancel_unused);
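
/*
 * Illustrative caller sketch, not part of the original source: throw away
 * every unused lock in a namespace locally without notifying the server,
 * e.g. while tearing the namespace down.  The wrapper name is hypothetical.
 */
static int ldlm_drop_all_unused_sketch(struct ldlm_namespace *ns)
{
        /* A NULL res_id makes ldlm_cli_cancel_unused() walk the whole
         * resource hash, as shown above. */
        return ldlm_cli_cancel_unused(ns, NULL, LCF_LOCAL, NULL);
}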
2005
2006/* Lock iterators. */
2007
2008int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
2009                          void *closure)
2010{
2011        struct list_head *tmp, *next;
2012        struct ldlm_lock *lock;
2013        int rc = LDLM_ITER_CONTINUE;
2014
2015        ENTRY;
2016
2017        if (!res)
2018                RETURN(LDLM_ITER_CONTINUE);
2019
2020        lock_res(res);
2021        list_for_each_safe(tmp, next, &res->lr_granted) {
2022                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2023
2024                if (iter(lock, closure) == LDLM_ITER_STOP)
2025                        GOTO(out, rc = LDLM_ITER_STOP);
2026        }
2027
2028        list_for_each_safe(tmp, next, &res->lr_converting) {
2029                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2030
2031                if (iter(lock, closure) == LDLM_ITER_STOP)
2032                        GOTO(out, rc = LDLM_ITER_STOP);
2033        }
2034
2035        list_for_each_safe(tmp, next, &res->lr_waiting) {
2036                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2037
2038                if (iter(lock, closure) == LDLM_ITER_STOP)
2039                        GOTO(out, rc = LDLM_ITER_STOP);
2040        }
2041 out:
2042        unlock_res(res);
2043        RETURN(rc);
2044}
2045EXPORT_SYMBOL(ldlm_resource_foreach);
2046
2047struct iter_helper_data {
2048        ldlm_iterator_t iter;
2049        void *closure;
2050};
2051
2052static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
2053{
2054        struct iter_helper_data *helper = closure;
2055        return helper->iter(lock, helper->closure);
2056}
2057
2058static int ldlm_res_iter_helper(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2059                                struct hlist_node *hnode, void *arg)
2060
2061{
2062        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2063
2064        return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
2065               LDLM_ITER_STOP;
2066}
2067
2068void ldlm_namespace_foreach(struct ldlm_namespace *ns,
2069                            ldlm_iterator_t iter, void *closure)
2070
2071{
2072        struct iter_helper_data helper = { .iter = iter, .closure = closure };
2073
2074        cfs_hash_for_each_nolock(ns->ns_rs_hash,
2075                                 ldlm_res_iter_helper, &helper);
2076
2077}
2078EXPORT_SYMBOL(ldlm_namespace_foreach);
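
/*
 * Illustrative sketch, not part of the original source: a minimal iterator
 * pair that counts the locks attached to every resource in a namespace
 * using the helpers above.  Both function names are hypothetical.
 */
static int ldlm_count_lock_sketch(struct ldlm_lock *lock, void *closure)
{
        (*(int *)closure)++;
        return LDLM_ITER_CONTINUE;
}

static int ldlm_count_ns_locks_sketch(struct ldlm_namespace *ns)
{
        int count = 0;

        ldlm_namespace_foreach(ns, ldlm_count_lock_sketch, &count);
        return count;
}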
2079
2080/* non-blocking function to manipulate a lock whose cb_data is being put away.
2081 * return  0:  no resource found
2082 *       > 0:  must be LDLM_ITER_STOP/LDLM_ITER_CONTINUE.
2083 *       < 0:  errors
2084 */
2085int ldlm_resource_iterate(struct ldlm_namespace *ns,
2086                          const struct ldlm_res_id *res_id,
2087                          ldlm_iterator_t iter, void *data)
2088{
2089        struct ldlm_resource *res;
2090        int rc;
2091        ENTRY;
2092
2093        if (ns == NULL) {
2094                CERROR("must pass in namespace\n");
2095                LBUG();
2096        }
2097
2098        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
2099        if (res == NULL)
2100                RETURN(0);
2101
2102        LDLM_RESOURCE_ADDREF(res);
2103        rc = ldlm_resource_foreach(res, iter, data);
2104        LDLM_RESOURCE_DELREF(res);
2105        ldlm_resource_putref(res);
2106        RETURN(rc);
2107}
2108EXPORT_SYMBOL(ldlm_resource_iterate);
2109
2110/* Lock replay */
2111
2112static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
2113{
2114        struct list_head *list = closure;
2115
2116        /* we use l_pending_chain here, because it's unused on clients. */
2117        LASSERTF(list_empty(&lock->l_pending_chain),
2118                 "lock %p next %p prev %p\n",
2119                 lock, &lock->l_pending_chain.next, &lock->l_pending_chain.prev);
2120        /* bug 9573: don't replay locks left after eviction, or
2121         * bug 17614: locks being actively cancelled. Get a reference
2122         * on a lock so that it does not disappear under us (e.g. due to cancel)
2123         */
2124        if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
2125                list_add(&lock->l_pending_chain, list);
2126                LDLM_LOCK_GET(lock);
2127        }
2128
2129        return LDLM_ITER_CONTINUE;
2130}
2131
2132static int replay_lock_interpret(const struct lu_env *env,
2133                                 struct ptlrpc_request *req,
2134                                 struct ldlm_async_args *aa, int rc)
2135{
2136        struct ldlm_lock     *lock;
2137        struct ldlm_reply    *reply;
2138        struct obd_export    *exp;
2139
2140        ENTRY;
2141        atomic_dec(&req->rq_import->imp_replay_inflight);
2142        if (rc != ELDLM_OK)
2143                GOTO(out, rc);
2144
2145
2146        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2147        if (reply == NULL)
2148                GOTO(out, rc = -EPROTO);
2149
2150        lock = ldlm_handle2lock(&aa->lock_handle);
2151        if (!lock) {
2152                CERROR("received replay ack for unknown local cookie "LPX64
2153                       " remote cookie "LPX64 " from server %s id %s\n",
2154                       aa->lock_handle.cookie, reply->lock_handle.cookie,
2155                       req->rq_export->exp_client_uuid.uuid,
2156                       libcfs_id2str(req->rq_peer));
2157                GOTO(out, rc = -ESTALE);
2158        }
2159
2160        /* Key change: rehash the lock in the per-export hash with the new key */
2161        exp = req->rq_export;
2162        if (exp && exp->exp_lock_hash) {
2163                /* In the function below, .hs_keycmp resolves to
2164                 * ldlm_export_lock_keycmp() */
2165                /* coverity[overrun-buffer-val] */
2166                cfs_hash_rehash_key(exp->exp_lock_hash,
2167                                    &lock->l_remote_handle,
2168                                    &reply->lock_handle,
2169                                    &lock->l_exp_hash);
2170        } else {
2171                lock->l_remote_handle = reply->lock_handle;
2172        }
2173
2174        LDLM_DEBUG(lock, "replayed lock:");
2175        ptlrpc_import_recovery_state_machine(req->rq_import);
2176        LDLM_LOCK_PUT(lock);
2177out:
2178        if (rc != ELDLM_OK)
2179                ptlrpc_connect_import(req->rq_import);
2180
2181        RETURN(rc);
2182}
2183
2184static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
2185{
2186        struct ptlrpc_request *req;
2187        struct ldlm_async_args *aa;
2188        struct ldlm_request   *body;
2189        int flags;
2190        ENTRY;
2191
2192
2193        /* Bug 11974: Do not replay a lock which is actively being canceled */
2194        if (lock->l_flags & LDLM_FL_CANCELING) {
2195                LDLM_DEBUG(lock, "Not replaying canceled lock:");
2196                RETURN(0);
2197        }
2198
2199        /* If this is a reply-less callback lock, we cannot replay it: the
2200         * server might have long since dropped it, with the notification of that
2201         * event lost by the network (and a conflicting lock already granted). */
2202        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
2203                LDLM_DEBUG(lock, "Not replaying reply-less lock:");
2204                ldlm_lock_cancel(lock);
2205                RETURN(0);
2206        }
2207
2208        /*
2209         * If granted mode matches the requested mode, this lock is granted.
2210         *
2211         * If they differ, but we have a granted mode, then we were granted
2212         * one mode and now want another: ergo, converting.
2213         *
2214         * If we haven't been granted anything and are on a resource list,
2215         * then we're blocked/waiting.
2216         *
2217         * If we haven't been granted anything and we're NOT on a resource list,
2218         * then we haven't got a reply yet and don't have a known disposition.
2219         * This happens whenever a lock enqueue is the request that triggers
2220         * recovery.
2221         */
2222        if (lock->l_granted_mode == lock->l_req_mode)
2223                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
2224        else if (lock->l_granted_mode)
2225                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
2226        else if (!list_empty(&lock->l_res_link))
2227                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
2228        else
2229                flags = LDLM_FL_REPLAY;
2230
2231        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
2232                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2233        if (req == NULL)
2234                RETURN(-ENOMEM);
2235
2236        /* We're part of recovery, so don't wait for it. */
2237        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
2238
2239        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2240        ldlm_lock2desc(lock, &body->lock_desc);
2241        body->lock_flags = ldlm_flags_to_wire(flags);
2242
2243        ldlm_lock2handle(lock, &body->lock_handle[0]);
2244        if (lock->l_lvb_len > 0)
2245                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
2246        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2247                             lock->l_lvb_len);
2248        ptlrpc_request_set_replen(req);
2249        /* Notify the server we've replayed all requests.
2250         * Also, we mark the request to be put on a dedicated
2251         * queue to be processed after all request replays.
2252         * bug 6063 */
2253        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
2254
2255        LDLM_DEBUG(lock, "replaying lock:");
2256
2257        atomic_inc(&req->rq_import->imp_replay_inflight);
2258        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2259        aa = ptlrpc_req_async_args(req);
2260        aa->lock_handle = body->lock_handle[0];
2261        req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
2262        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
2263
2264        RETURN(0);
2265}
2266
2267/**
2268 * Cancel as many unused locks as possible before replay. Since we are
2269 * in recovery, we cannot wait for any outstanding RPCs or send any RPCs
2270 * to the server.
2271 *
2272 * Called only in recovery before replaying locks. There is no need to
2273 * replay locks that are unused. Since clients may hold thousands of
2274 * cached unused locks, dropping the unused locks can greatly reduce the
2275 * load on the servers at recovery time.
2276 */
2277static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
2278{
2279        int canceled;
2280        LIST_HEAD(cancels);
2281
2282        CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before "
2283                           "replay for namespace %s (%d)\n",
2284                           ldlm_ns_name(ns), ns->ns_nr_unused);
2285
2286        /* We don't need to care whether or not LRU resize is enabled
2287         * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
2288         * count parameter */
2289        canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
2290                                         LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
2291
2292        CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
2293                           canceled, ldlm_ns_name(ns));
2294}
2295
2296int ldlm_replay_locks(struct obd_import *imp)
2297{
2298        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
2299        LIST_HEAD(list);
2300        struct ldlm_lock *lock, *next;
2301        int rc = 0;
2302
2303        ENTRY;
2304
2305        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2306
2307        /* don't replay locks if import failed recovery */
2308        if (imp->imp_vbr_failed)
2309                RETURN(0);
2310
2311        /* ensure this doesn't fall to 0 before all have been queued */
2312        atomic_inc(&imp->imp_replay_inflight);
2313
2314        if (ldlm_cancel_unused_locks_before_replay)
2315                ldlm_cancel_unused_locks_for_replay(ns);
2316
2317        ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2318
2319        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2320                list_del_init(&lock->l_pending_chain);
2321                if (rc) {
2322                        LDLM_LOCK_RELEASE(lock);
2323                        continue; /* or try to do the rest? */
2324                }
2325                rc = replay_one_lock(imp, lock);
2326                LDLM_LOCK_RELEASE(lock);
2327        }
2328
2329        atomic_dec(&imp->imp_replay_inflight);
2330
2331        RETURN(rc);
2332}
2333EXPORT_SYMBOL(ldlm_replay_locks);
2334