linux/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2010, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 */
  36/**
  37 * This file contains Asynchronous System Trap (AST) handlers and related
  38 * LDLM request-processing routines.
  39 *
  40 * An AST is a callback issued on a lock when its state is changed. There are
  41 * several different types of ASTs (callbacks) registered for each lock:
  42 *
  43 * - completion AST: when a lock is enqueued by some process, but cannot be
  44 *   granted immediately due to other conflicting locks on the same resource,
  45 *   the completion AST is sent to notify the caller when the lock is
  46 *   eventually granted
  47 *
  48 * - blocking AST: when a lock is granted to some process, if another process
  49 *   enqueues a conflicting (blocking) lock on a resource, a blocking AST is
  50 *   sent to notify the holder(s) of the lock(s) of the conflicting lock
  51 *   request. The lock holder(s) must release their lock(s) on that resource in
  52 *   a timely manner or be evicted by the server.
  53 *
  54 * - glimpse AST: this is used when a process wants information about a lock
  55 *   (i.e. the lock value block (LVB)) but does not necessarily require holding
  56 *   the lock. If the resource is locked, the lock holder(s) are sent glimpse
  57 *   ASTs and the LVB is returned to the caller, and lock holder(s) may CANCEL
  58 *   their lock(s) if they are idle. If the resource is not locked, the server
  59 *   may grant the lock.
  60 */
  61
  62#define DEBUG_SUBSYSTEM S_LDLM
  63
  64#include "../include/lustre_dlm.h"
  65#include "../include/obd_class.h"
  66#include "../include/obd.h"
  67
  68#include "ldlm_internal.h"
  69
     /*
      * Lower bound for the lock enqueue timeout: ldlm_get_enq_timeout() returns
      * max(computed timeout, ldlm_enqueue_min) on its non-AT_OFF path.
      * Registered as a module parameter with permissions 0644.
      */
   70int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
   71module_param(ldlm_enqueue_min, int, 0644);
   72MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
   73
   74/* On the client side: whether cached locks are cancelled before replay. */
   75unsigned int ldlm_cancel_unused_locks_before_replay = 1;
  76
  77static void interrupted_completion_wait(void *data)
  78{
  79}
  80
     /*
      * Context passed (as the opaque "data" pointer) to the timeout and
      * interrupt callbacks of the completion wait in ldlm_completion_ast().
      */
   81struct lock_wait_data {
     /* lock whose grant/cancel is being waited for */
   82        struct ldlm_lock *lwd_lock;
     /* snapshot of imp->imp_conn_cnt taken before sleeping; handed to
      * ptlrpc_fail_import() by ldlm_expired_completion_wait()
      */
   83        __u32        lwd_conn_cnt;
   84};
  85
     /*
      * Wrapper around a single lock handle.
      * NOTE(review): presumably the argument block for asynchronous requests;
      * its users are outside this part of the file -- confirm.
      */
   86struct ldlm_async_args {
   87        struct lustre_handle lock_handle;
   88};
  89
  90static int ldlm_expired_completion_wait(void *data)
  91{
  92        struct lock_wait_data *lwd = data;
  93        struct ldlm_lock *lock = lwd->lwd_lock;
  94        struct obd_import *imp;
  95        struct obd_device *obd;
  96
  97        if (!lock->l_conn_export) {
  98                static unsigned long next_dump, last_dump;
  99
 100                LCONSOLE_WARN("lock timed out (enqueued at %lld, %llds ago)\n",
 101                              (s64)lock->l_last_activity,
 102                              (s64)(ktime_get_real_seconds() -
 103                                    lock->l_last_activity));
 104                LDLM_DEBUG(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
 105                           (s64)lock->l_last_activity,
 106                           (s64)(ktime_get_real_seconds() -
 107                                 lock->l_last_activity));
 108                if (cfs_time_after(cfs_time_current(), next_dump)) {
 109                        last_dump = next_dump;
 110                        next_dump = cfs_time_shift(300);
 111                        ldlm_namespace_dump(D_DLMTRACE,
 112                                            ldlm_lock_to_ns(lock));
 113                        if (last_dump == 0)
 114                                libcfs_debug_dumplog();
 115                }
 116                return 0;
 117        }
 118
 119        obd = lock->l_conn_export->exp_obd;
 120        imp = obd->u.cli.cl_import;
 121        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
 122        LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
 123                   (s64)lock->l_last_activity,
 124                   (s64)(ktime_get_real_seconds() - lock->l_last_activity),
 125                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 126
 127        return 0;
 128}
 129
 130/* We use the same basis for both server side and client side functions
 131 * from a single node.
 132 */
 133static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
 134{
 135        int timeout = at_get(ldlm_lock_to_ns_at(lock));
 136
 137        if (AT_OFF)
 138                return obd_timeout / 2;
 139        /* Since these are non-updating timeouts, we should be conservative.
 140         * It would be nice to have some kind of "early reply" mechanism for
 141         * lock callbacks too...
 142         */
 143        timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
 144        return max(timeout, ldlm_enqueue_min);
 145}
 146
 147/**
 148 * Helper function for ldlm_completion_ast(), updating timings when lock is
 149 * actually granted.
 150 */
 151static int ldlm_completion_tail(struct ldlm_lock *lock)
 152{
 153        long delay;
 154        int  result;
 155
 156        if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) {
 157                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
 158                result = -EIO;
 159        } else {
 160                delay = ktime_get_real_seconds() - lock->l_last_activity;
 161                LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
 162                           delay);
 163
 164                /* Update our time estimate */
 165                at_measured(ldlm_lock_to_ns_at(lock),
 166                            delay);
 167                result = 0;
 168        }
 169        return result;
 170}
 171
 172/**
 173 * Implementation of ->l_completion_ast() for a client, that doesn't wait
 174 * until lock is granted. Suitable for locks enqueued through ptlrpcd, of
 175 * other threads that cannot block for long.
 176 */
 177int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
 178{
 179        if (flags == LDLM_FL_WAIT_NOREPROC) {
 180                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 181                return 0;
 182        }
 183
 184        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
 185                       LDLM_FL_BLOCK_CONV))) {
 186                wake_up(&lock->l_waitq);
 187                return ldlm_completion_tail(lock);
 188        }
 189
 190        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
 191        return 0;
 192}
 193EXPORT_SYMBOL(ldlm_completion_ast_async);
 194
 195/**
 196 * Generic LDLM "completion" AST. This is called in several cases:
 197 *
 198 *     - when a reply to an ENQUEUE RPC is received from the server
 199 *       (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at
 200 *       this point (determined by flags);
 201 *
 202 *     - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has
 203 *       been granted;
 204 *
 205 *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
 206 *       gets correct lvb;
 207 *
 208 *     - to force all locks when resource is destroyed (cleanup_resource());
 209 *
 210 *     - during lock conversion (not used currently).
 211 *
 212 * If lock is not granted in the first case, this function waits until second
 213 * or penultimate cases happen in some other thread.
 214 *
 215 */
 216int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 217{
 218        /* XXX ALLOCATE - 160 bytes */
 219        struct lock_wait_data lwd;
 220        struct obd_device *obd;
 221        struct obd_import *imp = NULL;
 222        struct l_wait_info lwi;
 223        __u32 timeout;
 224        int rc = 0;
 225
 226        if (flags == LDLM_FL_WAIT_NOREPROC) {
 227                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 228                goto noreproc;
 229        }
 230
 231        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
 232                       LDLM_FL_BLOCK_CONV))) {
 233                wake_up(&lock->l_waitq);
 234                return 0;
 235        }
 236
 237        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
 238
 239noreproc:
 240
 241        obd = class_exp2obd(lock->l_conn_export);
 242
 243        /* if this is a local lock, then there is no import */
 244        if (obd)
 245                imp = obd->u.cli.cl_import;
 246
 247        /* Wait a long time for enqueue - server may have to callback a
 248         * lock from another client.  Server will evict the other client if it
 249         * doesn't respond reasonably, and then give us the lock.
 250         */
 251        timeout = ldlm_get_enq_timeout(lock) * 2;
 252
 253        lwd.lwd_lock = lock;
 254
 255        if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
 256                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
 257                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
 258        } else {
 259                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
 260                                       ldlm_expired_completion_wait,
 261                                       interrupted_completion_wait, &lwd);
 262        }
 263
 264        if (imp) {
 265                spin_lock(&imp->imp_lock);
 266                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
 267                spin_unlock(&imp->imp_lock);
 268        }
 269
 270        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
 271                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
 272                lock->l_flags |= LDLM_FL_FAIL_LOC;
 273                rc = -EINTR;
 274        } else {
 275                /* Go to sleep until the lock is granted or cancelled. */
 276                rc = l_wait_event(lock->l_waitq,
 277                                  is_granted_or_cancelled(lock), &lwi);
 278        }
 279
 280        if (rc) {
 281                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
 282                           rc);
 283                return rc;
 284        }
 285
 286        return ldlm_completion_tail(lock);
 287}
 288EXPORT_SYMBOL(ldlm_completion_ast);
 289
 290static void failed_lock_cleanup(struct ldlm_namespace *ns,
 291                                struct ldlm_lock *lock, int mode)
 292{
 293        int need_cancel = 0;
 294
 295        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
 296        lock_res_and_lock(lock);
 297        /* Check that lock is not granted or failed, we might race. */
 298        if ((lock->l_req_mode != lock->l_granted_mode) &&
 299            !(lock->l_flags & LDLM_FL_FAILED)) {
 300                /* Make sure that this lock will not be found by raced
 301                 * bl_ast and -EINVAL reply is sent to server anyways.
 302                 * bug 17645
 303                 */
 304                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
 305                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
 306                need_cancel = 1;
 307        }
 308        unlock_res_and_lock(lock);
 309
 310        if (need_cancel)
 311                LDLM_DEBUG(lock,
 312                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
 313        else
 314                LDLM_DEBUG(lock, "lock was granted or failed in race");
 315
 316        ldlm_lock_decref_internal(lock, mode);
 317
 318        /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
 319         *       from llite/file.c/ll_file_flock().
 320         */
 321        /* This code makes for the fact that we do not have blocking handler on
 322         * a client for flock locks. As such this is the place where we must
 323         * completely kill failed locks. (interrupted and those that
 324         * were waiting to be granted when server evicted us.
 325         */
 326        if (lock->l_resource->lr_type == LDLM_FLOCK) {
 327                lock_res_and_lock(lock);
 328                ldlm_resource_unlink_lock(lock);
 329                ldlm_lock_destroy_nolock(lock);
 330                unlock_res_and_lock(lock);
 331        }
 332}
 333
 334/**
 335 * Finishing portion of client lock enqueue code.
 336 *
 337 * Called after receiving reply from server.
 338 */
 339int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 340                          enum ldlm_type type, __u8 with_policy,
 341                          enum ldlm_mode mode,
 342                          __u64 *flags, void *lvb, __u32 lvb_len,
 343                          struct lustre_handle *lockh, int rc)
 344{
 345        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 346        int is_replay = *flags & LDLM_FL_REPLAY;
 347        struct ldlm_lock *lock;
 348        struct ldlm_reply *reply;
 349        int cleanup_phase = 1;
 350        int size = 0;
 351
 352        lock = ldlm_handle2lock(lockh);
 353        /* ldlm_cli_enqueue is holding a reference on this lock. */
 354        if (!lock) {
 355                LASSERT(type == LDLM_FLOCK);
 356                return -ENOLCK;
 357        }
 358
 359        LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
 360                 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);
 361
 362        if (rc != ELDLM_OK) {
 363                LASSERT(!is_replay);
 364                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
 365                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
 366
 367                if (rc != ELDLM_LOCK_ABORTED)
 368                        goto cleanup;
 369        }
 370
 371        /* Before we return, swab the reply */
 372        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 373        if (!reply) {
 374                rc = -EPROTO;
 375                goto cleanup;
 376        }
 377
 378        if (lvb_len != 0) {
 379                LASSERT(lvb);
 380
 381                size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
 382                                            RCL_SERVER);
 383                if (size < 0) {
 384                        LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
 385                        rc = size;
 386                        goto cleanup;
 387                } else if (unlikely(size > lvb_len)) {
 388                        LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
 389                                   lvb_len, size);
 390                        rc = -EINVAL;
 391                        goto cleanup;
 392                }
 393        }
 394
 395        if (rc == ELDLM_LOCK_ABORTED) {
 396                if (lvb_len != 0)
 397                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 398                                           lvb, size);
 399                if (rc == 0)
 400                        rc = ELDLM_LOCK_ABORTED;
 401                goto cleanup;
 402        }
 403
 404        /* lock enqueued on the server */
 405        cleanup_phase = 0;
 406
 407        lock_res_and_lock(lock);
 408        /* Key change rehash lock in per-export hash with new key */
 409        if (exp->exp_lock_hash) {
 410                /* In the function below, .hs_keycmp resolves to
 411                 * ldlm_export_lock_keycmp()
 412                 */
 413                /* coverity[overrun-buffer-val] */
 414                cfs_hash_rehash_key(exp->exp_lock_hash,
 415                                    &lock->l_remote_handle,
 416                                    &reply->lock_handle,
 417                                    &lock->l_exp_hash);
 418        } else {
 419                lock->l_remote_handle = reply->lock_handle;
 420        }
 421
 422        *flags = ldlm_flags_from_wire(reply->lock_flags);
 423        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
 424                                              LDLM_INHERIT_FLAGS);
 425        /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
 426         * to wait with no timeout as well
 427         */
 428        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
 429                                              LDLM_FL_NO_TIMEOUT);
 430        unlock_res_and_lock(lock);
 431
 432        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n",
 433               lock, reply->lock_handle.cookie, *flags);
 434
 435        /* If enqueue returned a blocked lock but the completion handler has
 436         * already run, then it fixed up the resource and we don't need to do it
 437         * again.
 438         */
 439        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
 440                int newmode = reply->lock_desc.l_req_mode;
 441
 442                LASSERT(!is_replay);
 443                if (newmode && newmode != lock->l_req_mode) {
 444                        LDLM_DEBUG(lock, "server returned different mode %s",
 445                                   ldlm_lockname[newmode]);
 446                        lock->l_req_mode = newmode;
 447                }
 448
 449                if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
 450                                 &lock->l_resource->lr_name)) {
 451                        CDEBUG(D_INFO, "remote intent success, locking "DLDLMRES
 452                                       " instead of "DLDLMRES"\n",
 453                               PLDLMRES(&reply->lock_desc.l_resource),
 454                               PLDLMRES(lock->l_resource));
 455
 456                        rc = ldlm_lock_change_resource(ns, lock,
 457                                        &reply->lock_desc.l_resource.lr_name);
 458                        if (rc || !lock->l_resource) {
 459                                rc = -ENOMEM;
 460                                goto cleanup;
 461                        }
 462                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
 463                }
 464                if (with_policy)
 465                        if (!(type == LDLM_IBITS &&
 466                              !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
 467                                /* We assume lock type cannot change on server*/
 468                                ldlm_convert_policy_to_local(exp,
 469                                                lock->l_resource->lr_type,
 470                                                &reply->lock_desc.l_policy_data,
 471                                                &lock->l_policy_data);
 472                if (type != LDLM_PLAIN)
 473                        LDLM_DEBUG(lock,
 474                                   "client-side enqueue, new policy data");
 475        }
 476
 477        if ((*flags) & LDLM_FL_AST_SENT ||
 478            /* Cancel extent locks as soon as possible on a liblustre client,
 479             * because it cannot handle asynchronous ASTs robustly (see
 480             * bug 7311).
 481             */
 482            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
 483                lock_res_and_lock(lock);
 484                lock->l_flags |= LDLM_FL_CBPENDING |  LDLM_FL_BL_AST;
 485                unlock_res_and_lock(lock);
 486                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
 487        }
 488
 489        /* If the lock has already been granted by a completion AST, don't
 490         * clobber the LVB with an older one.
 491         */
 492        if (lvb_len != 0) {
 493                /* We must lock or a racing completion might update lvb without
 494                 * letting us know and we'll clobber the correct value.
 495                 * Cannot unlock after the check either, as that still leaves
 496                 * a tiny window for completion to get in
 497                 */
 498                lock_res_and_lock(lock);
 499                if (lock->l_req_mode != lock->l_granted_mode)
 500                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 501                                           lock->l_lvb_data, size);
 502                unlock_res_and_lock(lock);
 503                if (rc < 0) {
 504                        cleanup_phase = 1;
 505                        goto cleanup;
 506                }
 507        }
 508
 509        if (!is_replay) {
 510                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
 511                if (lock->l_completion_ast) {
 512                        int err = lock->l_completion_ast(lock, *flags, NULL);
 513
 514                        if (!rc)
 515                                rc = err;
 516                        if (rc)
 517                                cleanup_phase = 1;
 518                }
 519        }
 520
 521        if (lvb_len && lvb) {
 522                /* Copy the LVB here, and not earlier, because the completion
 523                 * AST (if any) can override what we got in the reply
 524                 */
 525                memcpy(lvb, lock->l_lvb_data, lvb_len);
 526        }
 527
 528        LDLM_DEBUG(lock, "client-side enqueue END");
 529cleanup:
 530        if (cleanup_phase == 1 && rc)
 531                failed_lock_cleanup(ns, lock, mode);
 532        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
 533        LDLM_LOCK_PUT(lock);
 534        LDLM_LOCK_RELEASE(lock);
 535        return rc;
 536}
 537EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
 538
 539/**
 540 * Estimate number of lock handles that would fit into request of given
 541 * size.  PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
 542 * a single page on the send/receive side. XXX: 512 should be changed to
 543 * more adequate value.
 544 */
 545static inline int ldlm_req_handles_avail(int req_size, int off)
 546{
 547        int avail;
 548
 549        avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
 550        if (likely(avail >= 0))
 551                avail /= (int)sizeof(struct lustre_handle);
 552        else
 553                avail = 0;
 554        avail += LDLM_LOCKREQ_HANDLES - off;
 555
 556        return avail;
 557}
 558
 559static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
 560                                             enum req_location loc,
 561                                             int off)
 562{
 563        int size = req_capsule_msg_size(pill, loc);
 564
 565        return ldlm_req_handles_avail(size, off);
 566}
 567
 568static inline int ldlm_format_handles_avail(struct obd_import *imp,
 569                                            const struct req_format *fmt,
 570                                            enum req_location loc, int off)
 571{
 572        int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
 573
 574        return ldlm_req_handles_avail(size, off);
 575}
 576
 577/**
 578 * Cancel LRU locks and pack them into the enqueue request. Pack there the given
 579 * \a count locks in \a cancels.
 580 *
 581 * This is to be called by functions preparing their own requests that
 582 * might contain lists of locks to cancel in addition to actual operation
 583 * that needs to be performed.
 584 */
 585int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 586                      int version, int opc, int canceloff,
 587                      struct list_head *cancels, int count)
 588{
 589        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
 590        struct req_capsule      *pill = &req->rq_pill;
 591        struct ldlm_request     *dlm = NULL;
 592        int flags, avail, to_free, pack = 0;
 593        LIST_HEAD(head);
 594        int rc;
 595
 596        if (!cancels)
 597                cancels = &head;
 598        if (ns_connect_cancelset(ns)) {
 599                /* Estimate the amount of available space in the request. */
 600                req_capsule_filled_sizes(pill, RCL_CLIENT);
 601                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 602
 603                flags = ns_connect_lru_resize(ns) ?
 604                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
 605                to_free = !ns_connect_lru_resize(ns) &&
 606                          opc == LDLM_ENQUEUE ? 1 : 0;
 607
 608                /* Cancel LRU locks here _only_ if the server supports
 609                 * EARLY_CANCEL. Otherwise we have to send extra CANCEL
 610                 * RPC, which will make us slower.
 611                 */
 612                if (avail > count)
 613                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
 614                                                       avail - count, 0, flags);
 615                if (avail > count)
 616                        pack = count;
 617                else
 618                        pack = avail;
 619                req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
 620                                     ldlm_request_bufsize(pack, opc));
 621        }
 622
 623        rc = ptlrpc_request_pack(req, version, opc);
 624        if (rc) {
 625                ldlm_lock_list_put(cancels, l_bl_ast, count);
 626                return rc;
 627        }
 628
 629        if (ns_connect_cancelset(ns)) {
 630                if (canceloff) {
 631                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
 632                        LASSERT(dlm);
 633                        /* Skip first lock handler in ldlm_request_pack(),
 634                         * this method will increment @lock_count according
 635                         * to the lock handle amount actually written to
 636                         * the buffer.
 637                         */
 638                        dlm->lock_count = canceloff;
 639                }
 640                /* Pack into the request @pack lock handles. */
 641                ldlm_cli_cancel_list(cancels, pack, req, 0);
 642                /* Prepare and send separate cancel RPC for others. */
 643                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
 644        } else {
 645                ldlm_lock_list_put(cancels, l_bl_ast, count);
 646        }
 647        return 0;
 648}
 649EXPORT_SYMBOL(ldlm_prep_elc_req);
 650
 651int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 652                          struct list_head *cancels, int count)
 653{
 654        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
 655                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 656}
 657EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 658
 659/**
 660 * Client-side lock enqueue.
 661 *
 662 * If a request has some specific initialisation it is passed in \a reqp,
 663 * otherwise it is created in ldlm_cli_enqueue.
 664 *
 665 * Supports sync and async requests, pass \a async flag accordingly. If a
 666 * request was created in ldlm_cli_enqueue and it is the async request,
 667 * pass it to the caller in \a reqp.
 668 */
 669int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 670                     struct ldlm_enqueue_info *einfo,
 671                     const struct ldlm_res_id *res_id,
 672                     ldlm_policy_data_t const *policy, __u64 *flags,
 673                     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
 674                     struct lustre_handle *lockh, int async)
 675{
 676        struct ldlm_namespace *ns;
 677        struct ldlm_lock      *lock;
 678        struct ldlm_request   *body;
 679        int                 is_replay = *flags & LDLM_FL_REPLAY;
 680        int                 req_passed_in = 1;
 681        int                 rc, err;
 682        struct ptlrpc_request *req;
 683
 684        ns = exp->exp_obd->obd_namespace;
 685
 686        /* If we're replaying this lock, just check some invariants.
 687         * If we're creating a new lock, get everything all setup nicely.
 688         */
 689        if (is_replay) {
 690                lock = ldlm_handle2lock_long(lockh, 0);
 691                LASSERT(lock);
 692                LDLM_DEBUG(lock, "client-side enqueue START");
 693                LASSERT(exp == lock->l_conn_export);
 694        } else {
 695                const struct ldlm_callback_suite cbs = {
 696                        .lcs_completion = einfo->ei_cb_cp,
 697                        .lcs_blocking   = einfo->ei_cb_bl,
 698                        .lcs_glimpse    = einfo->ei_cb_gl
 699                };
 700                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
 701                                        einfo->ei_mode, &cbs, einfo->ei_cbdata,
 702                                        lvb_len, lvb_type);
 703                if (!lock)
 704                        return -ENOMEM;
 705                /* for the local lock, add the reference */
 706                ldlm_lock_addref_internal(lock, einfo->ei_mode);
 707                ldlm_lock2handle(lock, lockh);
 708                if (policy)
 709                        lock->l_policy_data = *policy;
 710
 711                if (einfo->ei_type == LDLM_EXTENT) {
 712                        /* extent lock without policy is a bug */
 713                        if (!policy)
 714                                LBUG();
 715
 716                        lock->l_req_extent = policy->l_extent;
 717                }
 718                LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n",
 719                           *flags);
 720        }
 721
 722        lock->l_conn_export = exp;
 723        lock->l_export = NULL;
 724        lock->l_blocking_ast = einfo->ei_cb_bl;
 725        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
 726
 727        /* lock not sent to server yet */
 728
 729        if (!reqp || !*reqp) {
 730                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
 731                                                &RQF_LDLM_ENQUEUE,
 732                                                LUSTRE_DLM_VERSION,
 733                                                LDLM_ENQUEUE);
 734                if (!req) {
 735                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
 736                        LDLM_LOCK_RELEASE(lock);
 737                        return -ENOMEM;
 738                }
 739                req_passed_in = 0;
 740                if (reqp)
 741                        *reqp = req;
 742        } else {
 743                int len;
 744
 745                req = *reqp;
 746                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
 747                                           RCL_CLIENT);
 748                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
 749                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
 750        }
 751
 752        /* Dump lock data into the request buffer */
 753        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 754        ldlm_lock2desc(lock, &body->lock_desc);
 755        body->lock_flags = ldlm_flags_to_wire(*flags);
 756        body->lock_handle[0] = *lockh;
 757
 758        /* Continue as normal. */
 759        if (!req_passed_in) {
 760                if (lvb_len > 0)
 761                        req_capsule_extend(&req->rq_pill,
 762                                           &RQF_LDLM_ENQUEUE_LVB);
 763                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
 764                                     lvb_len);
 765                ptlrpc_request_set_replen(req);
 766        }
 767
 768        /*
 769         * Liblustre client doesn't get extent locks, except for O_APPEND case
 770         * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
 771         * [i_size, OBD_OBJECT_EOF] lock is taken.
 772         */
 773        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
 774                     policy->l_extent.end == OBD_OBJECT_EOF));
 775
 776        if (async) {
 777                LASSERT(reqp);
 778                return 0;
 779        }
 780
 781        LDLM_DEBUG(lock, "sending request");
 782
 783        rc = ptlrpc_queue_wait(req);
 784
 785        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
 786                                    einfo->ei_mode, flags, lvb, lvb_len,
 787                                    lockh, rc);
 788
 789        /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
 790         * one reference that we took
 791         */
 792        if (err == -ENOLCK)
 793                LDLM_LOCK_RELEASE(lock);
 794        else
 795                rc = err;
 796
 797        if (!req_passed_in && req) {
 798                ptlrpc_req_finished(req);
 799                if (reqp)
 800                        *reqp = NULL;
 801        }
 802
 803        return rc;
 804}
 805EXPORT_SYMBOL(ldlm_cli_enqueue);
 806
 807/**
 808 * Cancel \a lock locally on the client.
     *
     * Between lock_res_and_lock() and unlock_res_and_lock() the lock is flagged
     * LDLM_FL_CBPENDING and ldlm_cancel_callback() is run; ldlm_lock_cancel()
     * is called afterwards. A lock without l_conn_export is reported with
     * LDLM_ERROR() and hits LBUG().
     *
 809 * Returns:
 810 * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server
 811 * \retval LDLM_FL_CANCELING otherwise;
 812 * \retval LDLM_FL_BL_AST if there is a need for a separate CANCEL RPC.
 813 */
 814static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
 815{
 816        __u64 rc = LDLM_FL_LOCAL_ONLY;
 817
 818        if (lock->l_conn_export) {
 819                bool local_only;
 820
 821                LDLM_DEBUG(lock, "client-side cancel");
 822                /* Set this flag to prevent others from getting new references */
 823                lock_res_and_lock(lock);
 824                lock->l_flags |= LDLM_FL_CBPENDING;
                    /* Remember whether the lock flags ask for a cancel
                     * without an RPC; this is sampled before the cancel
                     * callback runs.
                     */
 825                local_only = !!(lock->l_flags &
 826                                (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
 827                ldlm_cancel_callback(lock);
 828                rc = (lock->l_flags & LDLM_FL_BL_AST) ?
 829                        LDLM_FL_BL_AST : LDLM_FL_CANCELING;
 830                unlock_res_and_lock(lock);
 831
 832                if (local_only) {
 833                        CDEBUG(D_DLMTRACE, "not sending request (at caller's instruction)\n");
 834                        rc = LDLM_FL_LOCAL_ONLY; /* overrides the value chosen above */
 835                }
 836                ldlm_lock_cancel(lock);
 837        } else {
 838                LDLM_ERROR(lock, "Trying to cancel local lock");
 839                LBUG();
 840        }
 841
 842        return rc;
 843}
 844
 845/**
 846 * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
     *
     * The l_remote_handle of each packed lock is appended to dlm->lock_handle[]
     * and dlm->lock_count is advanced, so handles already counted in the
     * request are kept. The RMF_DLM_REQ buffer must have room for the existing
     * handles plus \a count more; this is asserted, not handled.
     *
     * \param req    request whose RMF_DLM_REQ client buffer is filled
     * \param head   list of locks linked through l_bl_ast
     * \param count  maximum number of locks to take from \a head
 847 */
 848static void ldlm_cancel_pack(struct ptlrpc_request *req,
 849                             struct list_head *head, int count)
 850{
 851        struct ldlm_request *dlm;
 852        struct ldlm_lock *lock;
 853        int max, packed = 0;
 854
 855        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 856        LASSERT(dlm);
 857
 858        /* Check the room in the request buffer: LDLM_LOCKREQ_HANDLES plus
             * as many struct lustre_handle entries as fit in the space beyond
             * sizeof(struct ldlm_request).
             */
 859        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
 860                sizeof(struct ldlm_request);
 861        max /= sizeof(struct lustre_handle);
 862        max += LDLM_LOCKREQ_HANDLES;
 863        LASSERT(max >= dlm->lock_count + count);
 864
 865        /* XXX: it would be better to pack lock handles grouped by resource.
 866         * so that the server cancel would call filter_lvbo_update() less
 867         * frequently.
 868         */
 869        list_for_each_entry(lock, head, l_bl_ast) {
 870                if (!count--) /* stop once \a count locks have been packed */
 871                        break;
 872                LASSERT(lock->l_conn_export);
 873                /* Pack the lock handle to the given request buffer. */
 874                LDLM_DEBUG(lock, "packing");
 875                dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
 876                packed++;
 877        }
 878        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
 879}
 880
 881/**
 882 * Prepare and send a batched cancel RPC. It will include \a count lock
 883 * handles of locks given in \a cancels list.
     *
     * \a count is first clamped to what ldlm_format_handles_avail() reports for
     * an RQF_LDLM_CANCEL request. With LCF_ASYNC in \a flags the request is
     * handed to ptlrpcd_add_req(); otherwise it is sent with
     * ptlrpc_queue_wait(), and rebuilt and resent when that returns -ETIMEDOUT
     * while the import generation is unchanged.
     *
     * \retval \a count (clamped) if the RPC was queued, or completed with
     *         ELDLM_OK or LUSTRE_ESTALE, or if the import is missing or invalid
     * \retval \a count (as passed in) if OBD_FAIL_LDLM_CANCEL_RACE fires
     * \retval rc of the failing step otherwise, e.g. -ENOMEM
 884 */
 885static int ldlm_cli_cancel_req(struct obd_export *exp,
 886                               struct list_head *cancels,
 887                               int count, enum ldlm_cancel_flags flags)
 888{
 889        struct ptlrpc_request *req = NULL;
 890        struct obd_import *imp;
 891        int free, sent = 0;
 892        int rc = 0;
 893
 894        LASSERT(exp);
 895        LASSERT(count > 0);
 896
 897        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
 898
 899        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
 900                return count;
 901
            /* Never pack more handles than one cancel request can carry. */
 902        free = ldlm_format_handles_avail(class_exp2cliimp(exp),
 903                                         &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
 904        if (count > free)
 905                count = free;
 906
            /* Each iteration builds a fresh request; only the timeout branch
             * below loops back for another attempt.
             */
 907        while (1) {
 908                imp = class_exp2cliimp(exp);
 909                if (!imp || imp->imp_invalid) {
 910                        CDEBUG(D_DLMTRACE,
 911                               "skipping cancel on invalid import %p\n", imp);
 912                        return count;
 913                }
 914
 915                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
 916                if (!req) {
 917                        rc = -ENOMEM;
 918                        goto out;
 919                }
 920
 921                req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
 922                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
 923                                     ldlm_request_bufsize(count, LDLM_CANCEL));
 924
 925                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
 926                if (rc) {
 927                        ptlrpc_request_free(req);
 928                        goto out;
 929                }
 930
 931                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
 932                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 933                ptlrpc_at_set_req_timeout(req);
 934
 935                ldlm_cancel_pack(req, cancels, count);
 936
 937                ptlrpc_request_set_replen(req);
 938                if (flags & LCF_ASYNC) {
                            /* The request is handed to ptlrpcd; jumping to
                             * "out" skips the ptlrpc_req_finished() below.
                             */
 939                        ptlrpcd_add_req(req);
 940                        sent = count;
 941                        goto out;
 942                }
 943
 944                rc = ptlrpc_queue_wait(req);
 945                if (rc == LUSTRE_ESTALE) {
 946                        CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n",
 947                               libcfs_nid2str(req->rq_import->
 948                                              imp_connection->c_peer.nid));
 949                        rc = 0;
 950                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
 951                           req->rq_import_generation == imp->imp_generation) {
                            /* Drop this request and retry with a new one. */
 952                        ptlrpc_req_finished(req);
 953                        continue;
 954                } else if (rc != ELDLM_OK) {
 955                        /* -ESHUTDOWN is common on umount */
 956                        CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
 957                                     "Got rc %d from cancel RPC: canceling anyway\n",
 958                                     rc);
                            /* sent stays 0, so rc is what gets returned */
 959                        break;
 960                }
 961                sent = count;
 962                break;
 963        }
 964
 965        ptlrpc_req_finished(req);
 966out:
            /* A non-zero sent count takes precedence over rc. */
 967        return sent ? sent : rc;
 968}
 969
    /**
     * Return the address of the ns_pool member of the namespace
     * (obd_namespace) of the OBD device (imp_obd) that \a imp refers to.
     * None of the pointers on the way is checked for NULL.
     */
 970static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
 971{
 972        return &imp->imp_obd->obd_namespace->ns_pool;
 973}
 974
 975/**
 976 * Update client's OBD pool related fields with new SLV and Limit from \a req.
     *
     * SLV and limit are read from the reply message (req->rq_repmsg) and
     * stored in obd_pool_slv and obd_pool_limit of the import's OBD device
     * under obd_pool_lock. Nothing is stored when the request has no import
     * or OBD device, when imp_connect_lru_resize() is false for the import,
     * or when either value in the reply is zero.
     *
     * \retval 0 always
 977 */
 978int ldlm_cli_update_pool(struct ptlrpc_request *req)
 979{
 980        struct obd_device *obd;
 981        __u64 new_slv;
 982        __u32 new_limit;
 983
 984        if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
 985                     !imp_connect_lru_resize(req->rq_import))) {
 986                /*
 987                 * Do nothing for corner cases.
 988                 */
 989                return 0;
 990        }
 991
 992        /* In some cases RPC may contain SLV and limit zeroed out. This
 993         * is the case when server does not support LRU resize feature.
 994         * This is also possible in some recovery cases when server-side
 995         * reqs have no reference to the OBD export and thus access to
 996         * server-side namespace is not possible.
 997         */
 998        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
 999            lustre_msg_get_limit(req->rq_repmsg) == 0) {
1000                DEBUG_REQ(D_HA, req,
1001                          "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
1002                          lustre_msg_get_slv(req->rq_repmsg),
1003                          lustre_msg_get_limit(req->rq_repmsg));
1004                return 0;
1005        }
1006
1007        new_limit = lustre_msg_get_limit(req->rq_repmsg);
1008        new_slv = lustre_msg_get_slv(req->rq_repmsg);
1009        obd = req->rq_import->imp_obd;
1010
1011        /* Set new SLV and limit in OBD fields to make them accessible
1012         * to the pool thread. We do not access obd_namespace and pool
1013         * directly here as there is no reliable way to make sure that
1014         * they are still alive at cleanup time. Evil races are possible
1015         * which may cause Oops at that time.
1016         */
1017        write_lock(&obd->obd_pool_lock);
1018        obd->obd_pool_slv = new_slv;
1019        obd->obd_pool_limit = new_limit;
1020        write_unlock(&obd->obd_pool_lock);
1021
1022        return 0;
1023}
1024EXPORT_SYMBOL(ldlm_cli_update_pool);
1025
1026/**
1027 * Client side lock cancel.
1028 *
1029 * Lock must not have any readers or writers by this time.
    *
    * The lock is looked up from \a lockh and cancelled locally with
    * ldlm_cli_cancel_local(). Unless that reports LDLM_FL_LOCAL_ONLY or
    * LCF_LOCAL is set in \a cancel_flags, the lock is put on a local list,
    * optionally joined by locks taken from the namespace LRU, and the list is
    * passed to ldlm_cli_cancel_list().
    *
    * \param lockh         handle of the lock to cancel
    * \param cancel_flags  LCF_LOCAL stops after the local cancel; the value is
    *                      also passed on to ldlm_cli_cancel_list()
    *
    * \retval 0 always, including when the handle no longer resolves to a lock
1030 */
1031int ldlm_cli_cancel(struct lustre_handle *lockh,
1032                    enum ldlm_cancel_flags cancel_flags)
1033{
1034        struct obd_export *exp;
1035        int avail, flags, count = 1;
1036        __u64 rc = 0;
1037        struct ldlm_namespace *ns;
1038        struct ldlm_lock *lock;
1039        LIST_HEAD(cancels);
1040
1041        /* concurrent cancels on the same handle can happen */
1042        lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
1043        if (!lock) {
1044                LDLM_DEBUG_NOLOCK("lock is already being destroyed");
1045                return 0;
1046        }
1047
1048        rc = ldlm_cli_cancel_local(lock);
1049        if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
                    /* No RPC wanted: drop the reference held on the lock. */
1050                LDLM_LOCK_RELEASE(lock);
1051                return 0;
1052        }
1053        /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
1054         * RPC which goes to canceld portal, so we can cancel other LRU locks
1055         * here and send them all as one LDLM_CANCEL RPC.
1056         */
1057        LASSERT(list_empty(&lock->l_bl_ast));
1058        list_add(&lock->l_bl_ast, &cancels);
1059
1060        exp = lock->l_conn_export;
1061        if (exp_connect_cancelset(exp)) {
1062                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1063                                                  &RQF_LDLM_CANCEL,
1064                                                  RCL_CLIENT, 0);
1065                LASSERT(avail > 0);
1066
1067                ns = ldlm_lock_to_ns(lock);
1068                flags = ns_connect_lru_resize(ns) ?
1069                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                    /* At most avail - 1 LRU locks are added: count already
                     * accounts for the lock put on the list above.
                     */
1070                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1071                                               LCF_BL_AST, flags);
1072        }
1073        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
1074        return 0;
1075}
1076EXPORT_SYMBOL(ldlm_cli_cancel);
1077
1078/**
1079 * Locally cancel up to \a count locks in list \a cancels.
1080 * Return the number of cancelled locks.
    *
    * With LCF_LOCAL in \a flags each lock is cancelled with ldlm_lock_cancel()
    * only; otherwise ldlm_cli_cancel_local() is used. Locks for which no
    * CANCEL RPC is needed are unlinked from \a cancels and released. Unless
    * LCF_BL_AST is set, locks reported as LDLM_FL_BL_AST are moved to a
    * private list and passed to ldlm_cli_cancel_list() separately.
    *
    * The value returned is \a count minus the number of locks taken off
    * \a cancels in either of those two ways.
1081 */
1082int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
1083                               enum ldlm_cancel_flags flags)
1084{
1085        LIST_HEAD(head);
1086        struct ldlm_lock *lock, *next;
1087        int left = 0, bl_ast = 0;
1088        __u64 rc;
1089
1090        left = count;
1091        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1092                if (left-- == 0) /* processed \a count entries */
1093                        break;
1094
1095                if (flags & LCF_LOCAL) {
1096                        rc = LDLM_FL_LOCAL_ONLY;
1097                        ldlm_lock_cancel(lock);
1098                } else {
1099                        rc = ldlm_cli_cancel_local(lock);
1100                }
1101                /* Until we have compound requests and can send LDLM_CANCEL
1102                 * requests batched with generic RPCs, we need to send cancels
1103                 * with the LDLM_FL_BL_AST flag in a separate RPC from
1104                 * the one being generated now.
1105                 */
1106                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1107                        LDLM_DEBUG(lock, "Cancel lock separately");
1108                        list_del_init(&lock->l_bl_ast);
1109                        list_add(&lock->l_bl_ast, &head);
1110                        bl_ast++;
1111                        continue;
1112                }
1113                if (rc == LDLM_FL_LOCAL_ONLY) {
1114                        /* CANCEL RPC should not be sent to server. */
1115                        list_del_init(&lock->l_bl_ast);
1116                        LDLM_LOCK_RELEASE(lock);
1117                        count--;
1118                }
1119        }
1120        if (bl_ast > 0) {
                    /* The separately handled locks no longer count towards
                     * what is left on \a cancels.
                     */
1121                count -= bl_ast;
1122                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1123        }
1124
1125        return count;
1126}
1127EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
1128
1129/**
1130 * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
1131 * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g.
1132 * readahead requests, ...)
    *
    * The decision is taken under lock_res_and_lock(). \a unused, \a added and
    * \a count are not looked at.
    *
    * \retval LDLM_POLICY_CANCEL_LOCK if the resource type is LDLM_EXTENT or
    *         LDLM_IBITS and ns->ns_cancel_for_recovery is set and returns
    *         non-zero for \a lock
    * \retval LDLM_POLICY_SKIP_LOCK otherwise; LDLM_FL_SKIPPED is then set on
    *         \a lock
1133 */
1134static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
1135                                                    struct ldlm_lock *lock,
1136                                                    int unused, int added,
1137                                                    int count)
1138{
1139        ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
1140        ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
1141
1142        lock_res_and_lock(lock);
1143
1144        /* don't check added & count since we want to process all locks
1145         * from unused list
1146         */
1147        switch (lock->l_resource->lr_type) {
1148        case LDLM_EXTENT:
1149        case LDLM_IBITS:
1150                if (cb && cb(lock))
1151                        break;
                    /* fall through: no callback, or it declined this lock */
1152        default:
1153                result = LDLM_POLICY_SKIP_LOCK;
1154                lock->l_flags |= LDLM_FL_SKIPPED;
1155                break;
1156        }
1157
1158        unlock_res_and_lock(lock);
1159        return result;
1160}
1161
1162/**
1163 * Callback function for LRU-resize policy. Decides whether to keep
1164 * \a lock in LRU for current \a LRU size \a unused, added in current
1165 * scan \a added and number of locks to be preferably canceled \a count.
1166 *
    * A \a count of zero disables the "added >= count" stop condition.
    *
1167 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1168 *
1169 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1170 */
1171static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1172                                                 struct ldlm_lock *lock,
1173                                                 int unused, int added,
1174                                                 int count)
1175{
1176        unsigned long cur = cfs_time_current();
1177        struct ldlm_pool *pl = &ns->ns_pool;
1178        __u64 slv, lvf, lv;
1179        unsigned long la;
1180
1181        /* Stop LRU processing when we reach past @count or have checked all
1182         * locks in LRU.
1183         */
1184        if (count && added >= count)
1185                return LDLM_POLICY_KEEP_LOCK;
1186
1187        slv = ldlm_pool_get_slv(pl);
1188        lvf = ldlm_pool_get_lvf(pl);
            /* la: seconds elapsed since the lock was last used */
1189        la = cfs_duration_sec(cfs_time_sub(cur,
1190                              lock->l_last_used));
            /* lv: pool lvf * lock age in seconds * current LRU size */
1191        lv = lvf * la * unused;
1192
1193        /* Inform pool about current CLV to see it via debugfs. */
1194        ldlm_pool_set_clv(pl, lv);
1195
1196        /* Stop when SLV is not yet come from server or lv is smaller than
1197         * it is.
1198         */
1199        return (slv == 0 || lv < slv) ?
1200                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1201}
1202
1203/**
1204 * Callback function for debugfs used policy. Makes decision whether to keep
1205 * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
1206 * added and number of locks to be preferably canceled \a count.
1207 *
    * Only \a added and \a count are compared; \a ns, \a lock and \a unused are
    * not used.
    *
1208 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1209 *
1210 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1211 */
1212static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1213                                                   struct ldlm_lock *lock,
1214                                                   int unused, int added,
1215                                                   int count)
1216{
1217        /* Stop LRU processing when we reach past @count or have checked all
1218         * locks in LRU.
1219         */
1220        return (added >= count) ?
1221                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1222}
1223
1224/**
1225 * Callback function for aged policy. Makes decision whether to keep \a lock in
1226 * LRU for current LRU size \a unused, added in current scan \a added and
1227 * number of locks to be preferably canceled \a count.
1228 *
    * The lock is kept only if both conditions hold: \a added has reached
    * \a count, and the current time is still before
    * lock->l_last_used + ns->ns_max_age. \a unused is not used.
    *
1229 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1230 *
1231 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1232 */
1233static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1234                                                 struct ldlm_lock *lock,
1235                                                 int unused, int added,
1236                                                 int count)
1237{
1238        /* Stop LRU processing if young lock is found and we reach past count */
1239        return ((added >= count) &&
1240                time_before(cfs_time_current(),
1241                            cfs_time_add(lock->l_last_used, ns->ns_max_age))) ?
1242                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1243}
1244
1245/**
1246 * Callback function for default policy. Makes decision whether to keep \a lock
1247 * in LRU for current LRU size \a unused, added in current scan \a added and
1248 * number of locks to be preferably canceled \a count.
1249 *
    * Only \a added and \a count are compared; \a ns, \a lock and \a unused are
    * not used.
    *
1250 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1251 *
1252 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1253 */
1254static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1255                                                    struct ldlm_lock *lock,
1256                                                    int unused, int added,
1257                                                    int count)
1258{
1259        /* Stop LRU processing when we reach past count or have checked all
1260         * locks in LRU.
1261         */
1262        return (added >= count) ?
1263                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1264}
1265
    /* Type of the LRU cancel policy callbacks defined in this file. The
     * callbacks name the arguments, in order: namespace, lock, unused,
     * added, count.
     */
1266typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
1267                                                      struct ldlm_lock *, int,
1268                                                      int, int);
1269
    /**
     * Select the LRU cancel policy callback for \a ns according to \a flags.
     *
     * LDLM_CANCEL_NO_WAIT is checked first and wins regardless of the
     * namespace. If ns_connect_lru_resize() is true for \a ns,
     * LDLM_CANCEL_SHRINK, LDLM_CANCEL_LRUR and LDLM_CANCEL_PASSED are tested
     * in that order; otherwise only LDLM_CANCEL_AGED is tested. When nothing
     * matches, ldlm_cancel_default_policy is returned, so the result is never
     * NULL.
     */
1270static ldlm_cancel_lru_policy_t
1271ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1272{
1273        if (flags & LDLM_CANCEL_NO_WAIT)
1274                return ldlm_cancel_no_wait_policy;
1275
1276        if (ns_connect_lru_resize(ns)) {
1277                if (flags & LDLM_CANCEL_SHRINK)
1278                        /* We kill passed number of old locks. */
1279                        return ldlm_cancel_passed_policy;
1280                else if (flags & LDLM_CANCEL_LRUR)
1281                        return ldlm_cancel_lrur_policy;
1282                else if (flags & LDLM_CANCEL_PASSED)
1283                        return ldlm_cancel_passed_policy;
1284        } else {
1285                if (flags & LDLM_CANCEL_AGED)
1286                        return ldlm_cancel_aged_policy;
1287        }
1288
1289        return ldlm_cancel_default_policy;
1290}
1291
1292/**
1293 * - Free space in LRU for \a count new locks,
1294 *   redundant unused locks are canceled locally;
1295 * - also cancel locally unused aged locks;
1296 * - do not cancel more than \a max locks;
1297 * - GET the found locks and add them into the \a cancels list.
1298 *
1299 * A client lock can be added to the l_bl_ast list only when it is
1300 * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing
1301 * CANCEL.  There are the following use cases:
1302 * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and
1303 * ldlm_cli_cancel(), which check and set this flag properly. As any
1304 * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
1305 * later without any special locking.
1306 *
1307 * Calling policies for enabled LRU resize:
1308 * ----------------------------------------
1309 * flags & LDLM_CANCEL_LRUR - use LRU resize policy (SLV from server) to
1310 *                          cancel not more than \a count locks;
1311 *
1312 * flags & LDLM_CANCEL_PASSED - cancel \a count number of old locks (located at
1313 *                            the beginning of LRU list);
1314 *
1315 * flags & LDLM_CANCEL_SHRINK - cancel not more than \a count locks according to
1316 *                            memory pressure policy function;
1317 *
1318 * flags & LDLM_CANCEL_AGED - cancel \a count locks according to "aged policy".
1319 *
1320 * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible
1321 *                             (typically before replaying locks) w/o
1322 *                             sending any RPCs or waiting for any
1323 *                             outstanding RPC to complete.
    *
    * \param ns       namespace whose ns_unused_list is scanned
    * \param cancels  list the selected locks are added to (via l_bl_ast)
    * \param count    passed to the policy callback; without LRU resize it is
    *                 first increased by ns_nr_unused - ns_max_unused
    * \param max      upper bound on the locks added; 0 means no bound
    * \param flags    LDLM_CANCEL_* flags selecting the policy
    *
    * \retval number of locks added to \a cancels
1324 */
1325static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
1326                                 struct list_head *cancels, int count, int max,
1327                                 int flags)
1328{
1329        ldlm_cancel_lru_policy_t pf;
1330        struct ldlm_lock *lock, *next;
1331        int added = 0, unused, remained;
1332
1333        spin_lock(&ns->ns_lock);
1334        unused = ns->ns_nr_unused;
1335        remained = unused;
1336
1337        if (!ns_connect_lru_resize(ns))
1338                count += unused - ns->ns_max_unused;
1339
1340        pf = ldlm_cancel_lru_policy(ns, flags);
1341        LASSERT(pf);
1342
            /* ns_lock is held at the top of every iteration; it is dropped
             * around the policy call and retaken on every path back here.
             */
1343        while (!list_empty(&ns->ns_unused_list)) {
1344                ldlm_policy_res_t result;
1345
1346                /* all unused locks */
1347                if (remained-- <= 0)
1348                        break;
1349
1350                /* For any flags, stop scanning if @max is reached. */
1351                if (max && added >= max)
1352                        break;
1353
1354                list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
1355                                             l_lru) {
1356                        /* No locks which got blocking requests. */
1357                        LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
1358
1359                        if (flags & LDLM_CANCEL_NO_WAIT &&
1360                            lock->l_flags & LDLM_FL_SKIPPED)
1361                                /* already processed */
1362                                continue;
1363
1364                        /* Somebody is already doing CANCEL. No need for this
1365                         * lock in LRU, do not traverse it again.
1366                         */
1367                        if (!(lock->l_flags & LDLM_FL_CANCELING))
1368                                break; /* not being cancelled: candidate found */
1369
1370                        ldlm_lock_remove_from_lru_nolock(lock);
1371                }
                    /* The iteration ran to the list head: no candidate left. */
1372                if (&lock->l_lru == &ns->ns_unused_list)
1373                        break;
1374
1375                LDLM_LOCK_GET(lock);
1376                spin_unlock(&ns->ns_lock);
1377                lu_ref_add(&lock->l_reference, __func__, current);
1378
1379                /* Pass the lock through the policy filter and see if it
1380                 * should stay in LRU.
1381                 *
1382                 * Even for shrinker policy we stop scanning if
1383                 * we find a lock that should stay in the cache.
1384                 * We should take into account lock age anyway
1385                 * as a new lock is a valuable resource even if
1386                 * it has a low weight.
1387                 *
1388                 * That is, for shrinker policy we drop only
1389                 * old locks, but additionally choose them by
1390                 * their weight. Big extent locks will stay in
1391                 * the cache. */
1392                result = pf(ns, lock, unused, added, count);
1393                if (result == LDLM_POLICY_KEEP_LOCK) {
1394                        lu_ref_del(&lock->l_reference,
1395                                   __func__, current);
1396                        LDLM_LOCK_RELEASE(lock);
1397                        spin_lock(&ns->ns_lock);
1398                        break;
1399                }
1400                if (result == LDLM_POLICY_SKIP_LOCK) {
1401                        lu_ref_del(&lock->l_reference,
1402                                   __func__, current);
1403                        LDLM_LOCK_RELEASE(lock);
1404                        spin_lock(&ns->ns_lock);
1405                        continue;
1406                }
1407
1408                lock_res_and_lock(lock);
1409                /* Check flags again under the lock. */
1410                if ((lock->l_flags & LDLM_FL_CANCELING) ||
1411                    (ldlm_lock_remove_from_lru(lock) == 0)) {
1412                        /* Another thread is removing lock from LRU, or
1413                         * somebody is already doing CANCEL, or there
1414                         * is a blocking request which will send cancel
1415                         * by itself, or the lock is no longer unused.
1416                         */
1417                        unlock_res_and_lock(lock);
1418                        lu_ref_del(&lock->l_reference,
1419                                   __func__, current);
1420                        LDLM_LOCK_RELEASE(lock);
1421                        spin_lock(&ns->ns_lock);
1422                        continue;
1423                }
1424                LASSERT(!lock->l_readers && !lock->l_writers);
1425
1426                /* If we have chosen to cancel this lock voluntarily, we
1427                 * better send cancel notification to server, so that it
1428                 * frees appropriate state. This might lead to a race
1429                 * where while we are doing cancel here, server is also
1430                 * silently cancelling this lock.
1431                 */
1432                lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
1433
1434                /* Setting the CBPENDING flag is a little misleading,
1435                 * but prevents an important race; namely, once
1436                 * CBPENDING is set, the lock can accumulate no more
1437                 * readers/writers. Since readers and writers are
1438                 * already zero here, ldlm_lock_decref() won't see
1439                 * this flag and call l_blocking_ast
1440                 */
1441                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1442
1443                /* We can't re-add to l_lru as it confuses the
1444                 * refcounting in ldlm_lock_remove_from_lru() if an AST
1445                 * arrives after we drop lr_lock below. We use l_bl_ast
1446                 * and can't use l_pending_chain as it is used both on
1447                 * server and client nevertheless bug 5666 says it is
1448                 * used only on server
1449                 */
1450                LASSERT(list_empty(&lock->l_bl_ast));
1451                list_add(&lock->l_bl_ast, cancels);
1452                unlock_res_and_lock(lock);
                    /* Only the lu_ref entry is dropped here; there is no
                     * LDLM_LOCK_RELEASE() on this path for the reference taken
                     * with LDLM_LOCK_GET() above.
                     */
1453                lu_ref_del(&lock->l_reference, __func__, current);
1454                spin_lock(&ns->ns_lock);
1455                added++;
1456                unused--;
1457        }
1458        spin_unlock(&ns->ns_lock);
1459        return added;
1460}
1461
    /**
     * Collect locks from the LRU of \a ns into \a cancels with
     * ldlm_prepare_lru_list() and cancel them locally with
     * ldlm_cli_cancel_list_local().
     *
     * \a count, \a max and \a flags go to ldlm_prepare_lru_list();
     * \a cancel_flags goes to ldlm_cli_cancel_list_local().
     *
     * \retval result of ldlm_prepare_lru_list() if it is zero or negative
     * \retval result of ldlm_cli_cancel_list_local() otherwise
     */
1462int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
1463                          struct list_head *cancels, int count, int max,
1464                          enum ldlm_cancel_flags cancel_flags, int flags)
1465{
1466        int added;
1467
1468        added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
1469        if (added <= 0)
1470                return added;
1471        return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
1472}
1473
1474/**
1475 * Cancel at least \a nr locks from given namespace LRU.
1476 *
1477 * When called with LCF_ASYNC the blocking callback will be handled
1478 * in a thread and this function will return after the thread has been
1479 * asked to call the callback.  When called without LCF_ASYNC the blocking
1480 * callback will be performed in this function.
    *
    * \retval number of locks put on the cancel list by ldlm_prepare_lru_list()
    *         if ldlm_bl_to_thread_list() returned 0
    * \retval 0 otherwise
1481 */
1482int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
1483                    enum ldlm_cancel_flags cancel_flags,
1484                    int flags)
1485{
1486        LIST_HEAD(cancels);
1487        int count, rc;
1488
1489        /* Just prepare the list of locks, do not actually cancel them yet.
1490         * Locks are cancelled later in a separate thread.
1491         */
1492        count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
1493        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
1494        if (rc == 0)
1495                return count;
1496
1497        return 0;
1498}
1499
1500/**
1501 * Find and cancel locally unused locks found on resource, matched to the
1502 * given policy, mode. GET the found locks and add them into the \a cancels
1503 * list.
1504 */
1505int ldlm_cancel_resource_local(struct ldlm_resource *res,
1506                               struct list_head *cancels,
1507                               ldlm_policy_data_t *policy,
1508                               enum ldlm_mode mode, __u64 lock_flags,
1509                               enum ldlm_cancel_flags cancel_flags,
1510                               void *opaque)
1511{
1512        struct ldlm_lock *lock;
1513        int count = 0;
1514
1515        lock_res(res);
1516        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1517                if (opaque && lock->l_ast_data != opaque) {
1518                        LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1519                                   lock->l_ast_data, opaque);
1520                        continue;
1521                }
1522
1523                if (lock->l_readers || lock->l_writers)
1524                        continue;
1525
1526                /* If somebody is already doing CANCEL, or blocking AST came,
1527                 * skip this lock.
1528                 */
1529                if (lock->l_flags & LDLM_FL_BL_AST ||
1530                    lock->l_flags & LDLM_FL_CANCELING)
1531                        continue;
1532
1533                if (lockmode_compat(lock->l_granted_mode, mode))
1534                        continue;
1535
1536                /* If policy is given and this is IBITS lock, add to list only
1537                 * those locks that match by policy.
1538                 */
1539                if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1540                    !(lock->l_policy_data.l_inodebits.bits &
1541                      policy->l_inodebits.bits))
1542                        continue;
1543
1544                /* See CBPENDING comment in ldlm_cancel_lru */
1545                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1546                                 lock_flags;
1547
1548                LASSERT(list_empty(&lock->l_bl_ast));
1549                list_add(&lock->l_bl_ast, cancels);
1550                LDLM_LOCK_GET(lock);
1551                count++;
1552        }
1553        unlock_res(res);
1554
1555        return ldlm_cli_cancel_list_local(cancels, count, cancel_flags);
1556}
1557EXPORT_SYMBOL(ldlm_cancel_resource_local);
1558
1559/**
1560 * Cancel client-side locks from a list and send/prepare cancel RPCs to the
1561 * server.
1562 * If \a req is NULL, send CANCEL request to server with handles of locks
1563 * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
1564 * separately per lock.
1565 * If \a req is not NULL, put handles of locks in \a cancels into the request
1566 * buffer at the offset \a off.
1567 * Destroy \a cancels at the end.
1568 */
1569int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1570                         struct ptlrpc_request *req,
1571                         enum ldlm_cancel_flags flags)
1572{
1573        struct ldlm_lock *lock;
1574        int res = 0;
1575
1576        if (list_empty(cancels) || count == 0)
1577                return 0;
1578
1579        /* XXX: requests (both batched and not) could be sent in parallel.
1580         * Usually it is enough to have just 1 RPC, but it is possible that
1581         * there are too many locks to be cancelled in LRU or on a resource.
1582         * It would also speed up the case when the server does not support
1583         * the feature.
1584         */
1585        while (count > 0) {
1586                LASSERT(!list_empty(cancels));
1587                lock = list_entry(cancels->next, struct ldlm_lock,
1588                                      l_bl_ast);
1589                LASSERT(lock->l_conn_export);
1590
1591                if (exp_connect_cancelset(lock->l_conn_export)) {
1592                        res = count;
1593                        if (req)
1594                                ldlm_cancel_pack(req, cancels, count);
1595                        else
1596                                res = ldlm_cli_cancel_req(lock->l_conn_export,
1597                                                          cancels, count,
1598                                                          flags);
1599                } else {
1600                        res = ldlm_cli_cancel_req(lock->l_conn_export,
1601                                                  cancels, 1, flags);
1602                }
1603
1604                if (res < 0) {
1605                        CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
1606                                     "ldlm_cli_cancel_list: %d\n", res);
1607                        res = count;
1608                }
1609
1610                count -= res;
1611                ldlm_lock_list_put(cancels, l_bl_ast, res);
1612        }
1613        LASSERT(count == 0);
1614        return 0;
1615}
1616EXPORT_SYMBOL(ldlm_cli_cancel_list);
1617
1618/**
1619 * Cancel all locks on a resource that have 0 readers/writers.
1620 *
1621 * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1622 * to notify the server.
1623 */
1624int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1625                                    const struct ldlm_res_id *res_id,
1626                                    ldlm_policy_data_t *policy,
1627                                    enum ldlm_mode mode,
1628                                    enum ldlm_cancel_flags flags,
1629                                    void *opaque)
1630{
1631        struct ldlm_resource *res;
1632        LIST_HEAD(cancels);
1633        int count;
1634        int rc;
1635
1636        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1637        if (!res) {
1638                /* This is not a problem. */
1639                CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
1640                return 0;
1641        }
1642
1643        LDLM_RESOURCE_ADDREF(res);
1644        count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1645                                           0, flags | LCF_BL_AST, opaque);
1646        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1647        if (rc != ELDLM_OK)
1648                CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
1649                       PLDLMRES(res), rc);
1650
1651        LDLM_RESOURCE_DELREF(res);
1652        ldlm_resource_putref(res);
1653        return 0;
1654}
1655EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
1656
/*
 * Argument bundle handed to ldlm_cli_hash_cancel_unused() while walking the
 * resource hash of a namespace.
 */
struct ldlm_cli_cancel_arg {
	/* cancel flags, passed on to ldlm_cli_cancel_unused_resource() */
	int lc_flags;
	/* opaque pointer, passed on to ldlm_cli_cancel_unused_resource() */
	void *lc_opaque;
};
1661
1662static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
1663                                       struct cfs_hash_bd *bd,
1664                                       struct hlist_node *hnode, void *arg)
1665{
1666        struct ldlm_resource       *res = cfs_hash_object(hs, hnode);
1667        struct ldlm_cli_cancel_arg     *lc = arg;
1668
1669        ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
1670                                        NULL, LCK_MINMODE,
1671                                        lc->lc_flags, lc->lc_opaque);
1672        /* must return 0 for hash iteration */
1673        return 0;
1674}
1675
1676/**
1677 * Cancel all locks on a namespace (or a specific resource, if given)
1678 * that have 0 readers/writers.
1679 *
1680 * If flags & LCF_LOCAL, throw the locks away without trying
1681 * to notify the server. */
1682int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1683                           const struct ldlm_res_id *res_id,
1684                           enum ldlm_cancel_flags flags, void *opaque)
1685{
1686        struct ldlm_cli_cancel_arg arg = {
1687                .lc_flags       = flags,
1688                .lc_opaque      = opaque,
1689        };
1690
1691        if (!ns)
1692                return ELDLM_OK;
1693
1694        if (res_id) {
1695                return ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1696                                                       LCK_MINMODE, flags,
1697                                                       opaque);
1698        } else {
1699                cfs_hash_for_each_nolock(ns->ns_rs_hash,
1700                                         ldlm_cli_hash_cancel_unused, &arg);
1701                return ELDLM_OK;
1702        }
1703}
1704EXPORT_SYMBOL(ldlm_cli_cancel_unused);
1705
1706/* Lock iterators. */
1707
1708static int ldlm_resource_foreach(struct ldlm_resource *res,
1709                                 ldlm_iterator_t iter, void *closure)
1710{
1711        struct list_head *tmp, *next;
1712        struct ldlm_lock *lock;
1713        int rc = LDLM_ITER_CONTINUE;
1714
1715        if (!res)
1716                return LDLM_ITER_CONTINUE;
1717
1718        lock_res(res);
1719        list_for_each_safe(tmp, next, &res->lr_granted) {
1720                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1721
1722                if (iter(lock, closure) == LDLM_ITER_STOP) {
1723                        rc = LDLM_ITER_STOP;
1724                        goto out;
1725                }
1726        }
1727
1728        list_for_each_safe(tmp, next, &res->lr_waiting) {
1729                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1730
1731                if (iter(lock, closure) == LDLM_ITER_STOP) {
1732                        rc = LDLM_ITER_STOP;
1733                        goto out;
1734                }
1735        }
1736 out:
1737        unlock_res(res);
1738        return rc;
1739}
1740
1741struct iter_helper_data {
1742        ldlm_iterator_t iter;
1743        void *closure;
1744};
1745
1746static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1747{
1748        struct iter_helper_data *helper = closure;
1749
1750        return helper->iter(lock, helper->closure);
1751}
1752
1753static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1754                                struct hlist_node *hnode, void *arg)
1755
1756{
1757        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1758
1759        return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
1760               LDLM_ITER_STOP;
1761}
1762
1763static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
1764                                   ldlm_iterator_t iter, void *closure)
1765
1766{
1767        struct iter_helper_data helper = {
1768                .iter           = iter,
1769                .closure        = closure,
1770        };
1771
1772        cfs_hash_for_each_nolock(ns->ns_rs_hash,
1773                                 ldlm_res_iter_helper, &helper);
1774
1775}
1776
1777/* non-blocking function to manipulate a lock whose cb_data is being put away.
1778 * return  0:  find no resource
1779 *       > 0:  must be LDLM_ITER_STOP/LDLM_ITER_CONTINUE.
1780 *       < 0:  errors
1781 */
1782int ldlm_resource_iterate(struct ldlm_namespace *ns,
1783                          const struct ldlm_res_id *res_id,
1784                          ldlm_iterator_t iter, void *data)
1785{
1786        struct ldlm_resource *res;
1787        int rc;
1788
1789        if (!ns) {
1790                CERROR("must pass in namespace\n");
1791                LBUG();
1792        }
1793
1794        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1795        if (!res)
1796                return 0;
1797
1798        LDLM_RESOURCE_ADDREF(res);
1799        rc = ldlm_resource_foreach(res, iter, data);
1800        LDLM_RESOURCE_DELREF(res);
1801        ldlm_resource_putref(res);
1802        return rc;
1803}
1804EXPORT_SYMBOL(ldlm_resource_iterate);
1805
1806/* Lock replay */
1807
1808static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
1809{
1810        struct list_head *list = closure;
1811
1812        /* we use l_pending_chain here, because it's unused on clients. */
1813        LASSERTF(list_empty(&lock->l_pending_chain),
1814                 "lock %p next %p prev %p\n",
1815                 lock, &lock->l_pending_chain.next,
1816                 &lock->l_pending_chain.prev);
1817        /* bug 9573: don't replay locks left after eviction, or
1818         * bug 17614: locks being actively cancelled. Get a reference
1819         * on a lock so that it does not disappear under us (e.g. due to cancel)
1820         */
1821        if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
1822                list_add(&lock->l_pending_chain, list);
1823                LDLM_LOCK_GET(lock);
1824        }
1825
1826        return LDLM_ITER_CONTINUE;
1827}
1828
1829static int replay_lock_interpret(const struct lu_env *env,
1830                                 struct ptlrpc_request *req,
1831                                 struct ldlm_async_args *aa, int rc)
1832{
1833        struct ldlm_lock     *lock;
1834        struct ldlm_reply    *reply;
1835        struct obd_export    *exp;
1836
1837        atomic_dec(&req->rq_import->imp_replay_inflight);
1838        if (rc != ELDLM_OK)
1839                goto out;
1840
1841        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1842        if (!reply) {
1843                rc = -EPROTO;
1844                goto out;
1845        }
1846
1847        lock = ldlm_handle2lock(&aa->lock_handle);
1848        if (!lock) {
1849                CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
1850                       aa->lock_handle.cookie, reply->lock_handle.cookie,
1851                       req->rq_export->exp_client_uuid.uuid,
1852                       libcfs_id2str(req->rq_peer));
1853                rc = -ESTALE;
1854                goto out;
1855        }
1856
1857        /* Key change rehash lock in per-export hash with new key */
1858        exp = req->rq_export;
1859        if (exp && exp->exp_lock_hash) {
1860                /* In the function below, .hs_keycmp resolves to
1861                 * ldlm_export_lock_keycmp()
1862                 */
1863                /* coverity[overrun-buffer-val] */
1864                cfs_hash_rehash_key(exp->exp_lock_hash,
1865                                    &lock->l_remote_handle,
1866                                    &reply->lock_handle,
1867                                    &lock->l_exp_hash);
1868        } else {
1869                lock->l_remote_handle = reply->lock_handle;
1870        }
1871
1872        LDLM_DEBUG(lock, "replayed lock:");
1873        ptlrpc_import_recovery_state_machine(req->rq_import);
1874        LDLM_LOCK_PUT(lock);
1875out:
1876        if (rc != ELDLM_OK)
1877                ptlrpc_connect_import(req->rq_import);
1878
1879        return rc;
1880}
1881
1882static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
1883{
1884        struct ptlrpc_request *req;
1885        struct ldlm_async_args *aa;
1886        struct ldlm_request   *body;
1887        int flags;
1888
1889        /* Bug 11974: Do not replay a lock which is actively being canceled */
1890        if (lock->l_flags & LDLM_FL_CANCELING) {
1891                LDLM_DEBUG(lock, "Not replaying canceled lock:");
1892                return 0;
1893        }
1894
1895        /* If this is reply-less callback lock, we cannot replay it, since
1896         * server might have long dropped it, but notification of that event was
1897         * lost by network. (and server granted conflicting lock already)
1898         */
1899        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
1900                LDLM_DEBUG(lock, "Not replaying reply-less lock:");
1901                ldlm_lock_cancel(lock);
1902                return 0;
1903        }
1904
1905        /*
1906         * If granted mode matches the requested mode, this lock is granted.
1907         *
1908         * If they differ, but we have a granted mode, then we were granted
1909         * one mode and now want another: ergo, converting.
1910         *
1911         * If we haven't been granted anything and are on a resource list,
1912         * then we're blocked/waiting.
1913         *
1914         * If we haven't been granted anything and we're NOT on a resource list,
1915         * then we haven't got a reply yet and don't have a known disposition.
1916         * This happens whenever a lock enqueue is the request that triggers
1917         * recovery.
1918         */
1919        if (lock->l_granted_mode == lock->l_req_mode)
1920                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
1921        else if (lock->l_granted_mode)
1922                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
1923        else if (!list_empty(&lock->l_res_link))
1924                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
1925        else
1926                flags = LDLM_FL_REPLAY;
1927
1928        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
1929                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
1930        if (!req)
1931                return -ENOMEM;
1932
1933        /* We're part of recovery, so don't wait for it. */
1934        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
1935
1936        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1937        ldlm_lock2desc(lock, &body->lock_desc);
1938        body->lock_flags = ldlm_flags_to_wire(flags);
1939
1940        ldlm_lock2handle(lock, &body->lock_handle[0]);
1941        if (lock->l_lvb_len > 0)
1942                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
1943        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
1944                             lock->l_lvb_len);
1945        ptlrpc_request_set_replen(req);
1946        /* notify the server we've replayed all requests.
1947         * also, we mark the request to be put on a dedicated
1948         * queue to be processed after all request replayes.
1949         * bug 6063
1950         */
1951        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
1952
1953        LDLM_DEBUG(lock, "replaying lock:");
1954
1955        atomic_inc(&req->rq_import->imp_replay_inflight);
1956        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1957        aa = ptlrpc_req_async_args(req);
1958        aa->lock_handle = body->lock_handle[0];
1959        req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
1960        ptlrpcd_add_req(req);
1961
1962        return 0;
1963}
1964
1965/**
1966 * Cancel as many unused locks as possible before replay. since we are
1967 * in recovery, we can't wait for any outstanding RPCs to send any RPC
1968 * to the server.
1969 *
1970 * Called only in recovery before replaying locks. there is no need to
1971 * replay locks that are unused. since the clients may hold thousands of
1972 * cached unused locks, dropping the unused locks can greatly reduce the
1973 * load on the servers at recovery time.
1974 */
1975static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
1976{
1977        int canceled;
1978        LIST_HEAD(cancels);
1979
1980        CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
1981               ldlm_ns_name(ns), ns->ns_nr_unused);
1982
1983        /* We don't need to care whether or not LRU resize is enabled
1984         * because the LDLM_CANCEL_NO_WAIT policy doesn't use the
1985         * count parameter
1986         */
1987        canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
1988                                         LCF_LOCAL, LDLM_CANCEL_NO_WAIT);
1989
1990        CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
1991                           canceled, ldlm_ns_name(ns));
1992}
1993
1994int ldlm_replay_locks(struct obd_import *imp)
1995{
1996        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1997        LIST_HEAD(list);
1998        struct ldlm_lock *lock, *next;
1999        int rc = 0;
2000
2001        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2002
2003        /* don't replay locks if import failed recovery */
2004        if (imp->imp_vbr_failed)
2005                return 0;
2006
2007        /* ensure this doesn't fall to 0 before all have been queued */
2008        atomic_inc(&imp->imp_replay_inflight);
2009
2010        if (ldlm_cancel_unused_locks_before_replay)
2011                ldlm_cancel_unused_locks_for_replay(ns);
2012
2013        ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2014
2015        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2016                list_del_init(&lock->l_pending_chain);
2017                if (rc) {
2018                        LDLM_LOCK_RELEASE(lock);
2019                        continue; /* or try to do the rest? */
2020                }
2021                rc = replay_one_lock(imp, lock);
2022                LDLM_LOCK_RELEASE(lock);
2023        }
2024
2025        atomic_dec(&imp->imp_replay_inflight);
2026
2027        return rc;
2028}
2029EXPORT_SYMBOL(ldlm_replay_locks);
2030