linux/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2010, 2015, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 */
  33/**
  34 * This file contains Asynchronous System Trap (AST) handlers and related
  35 * LDLM request-processing routines.
  36 *
  37 * An AST is a callback issued on a lock when its state is changed. There are
  38 * several different types of ASTs (callbacks) registered for each lock:
  39 *
  40 * - completion AST: when a lock is enqueued by some process, but cannot be
  41 *   granted immediately due to other conflicting locks on the same resource,
  42 *   the completion AST is sent to notify the caller when the lock is
  43 *   eventually granted
  44 *
  45 * - blocking AST: when a lock is granted to some process, if another process
  46 *   enqueues a conflicting (blocking) lock on a resource, a blocking AST is
  47 *   sent to notify the holder(s) of the lock(s) of the conflicting lock
  48 *   request. The lock holder(s) must release their lock(s) on that resource in
  49 *   a timely manner or be evicted by the server.
  50 *
  51 * - glimpse AST: this is used when a process wants information about a lock
  52 *   (i.e. the lock value block (LVB)) but does not necessarily require holding
  53 *   the lock. If the resource is locked, the lock holder(s) are sent glimpse
  54 *   ASTs and the LVB is returned to the caller, and lock holder(s) may CANCEL
  55 *   their lock(s) if they are idle. If the resource is not locked, the server
  56 *   may grant the lock.
  57 */
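/*
 * Editorial sketch, not part of the original file: how a caller typically
 * wires the three AST handlers described above into an enqueue.  The
 * ei_cb_* field names come from struct ldlm_enqueue_info as used further
 * down in this file; my_blocking_ast and my_glimpse_ast are hypothetical
 * handlers, the former shown with the blocking-callback signature used by
 * ldlm.
 *
 *	static int my_blocking_ast(struct ldlm_lock *lock,
 *				   struct ldlm_lock_desc *desc,
 *				   void *data, int flag);
 *
 *	struct ldlm_enqueue_info einfo = {
 *		.ei_type  = LDLM_IBITS,
 *		.ei_mode  = LCK_PR,
 *		.ei_cb_bl = my_blocking_ast,		(blocking AST)
 *		.ei_cb_cp = ldlm_completion_ast,	(completion AST, below)
 *		.ei_cb_gl = my_glimpse_ast,		(glimpse AST, may be NULL)
 *	};
 */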
  58
  59#define DEBUG_SUBSYSTEM S_LDLM
  60
  61#include <lustre_errno.h>
  62#include <lustre_dlm.h>
  63#include <obd_class.h>
  64#include <obd.h>
  65
  66#include "ldlm_internal.h"
  67
  68unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
  69module_param(ldlm_enqueue_min, uint, 0644);
  70MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
  71
   72/* On the client side: whether cached locks will be canceled before replay */
  73unsigned int ldlm_cancel_unused_locks_before_replay = 1;
  74
  75static void interrupted_completion_wait(void *data)
  76{
  77}
  78
  79struct lock_wait_data {
  80        struct ldlm_lock *lwd_lock;
  81        __u32        lwd_conn_cnt;
  82};
  83
  84struct ldlm_async_args {
  85        struct lustre_handle lock_handle;
  86};
  87
  88/**
  89 * ldlm_request_bufsize
  90 *
  91 * @count:      number of ldlm handles
  92 * @type:       ldlm opcode
  93 *
   94 * If opcode=LDLM_ENQUEUE, 1 slot is already occupied, so
   95 * LDLM_LOCKREQ_HANDLES - 1 slots are available.
   96 * Otherwise, LDLM_LOCKREQ_HANDLES slots are available.
  97 *
  98 * Return:      size of the request buffer
  99 */
 100static int ldlm_request_bufsize(int count, int type)
 101{
 102        int avail = LDLM_LOCKREQ_HANDLES;
 103
 104        if (type == LDLM_ENQUEUE)
 105                avail -= LDLM_ENQUEUE_CANCEL_OFF;
 106
 107        if (count > avail)
 108                avail = (count - avail) * sizeof(struct lustre_handle);
 109        else
 110                avail = 0;
 111
 112        return sizeof(struct ldlm_request) + avail;
 113}
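/*
 * Editorial example, not in the original source: assuming the usual header
 * values LDLM_LOCKREQ_HANDLES == 2 and LDLM_ENQUEUE_CANCEL_OFF == 1, an
 * LDLM_ENQUEUE request keeps one inline handle slot, so:
 *
 *	ldlm_request_bufsize(1, LDLM_ENQUEUE)
 *		== sizeof(struct ldlm_request)			(fits inline)
 *	ldlm_request_bufsize(4, LDLM_ENQUEUE)
 *		== sizeof(struct ldlm_request)
 *		   + 3 * sizeof(struct lustre_handle)		(3 overflow handles)
 */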
 114
 115static int ldlm_expired_completion_wait(void *data)
 116{
 117        struct lock_wait_data *lwd = data;
 118        struct ldlm_lock *lock = lwd->lwd_lock;
 119        struct obd_import *imp;
 120        struct obd_device *obd;
 121
 122        if (!lock->l_conn_export) {
 123                static unsigned long next_dump, last_dump;
 124
 125                LDLM_ERROR(lock,
 126                           "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
 127                           (s64)lock->l_last_activity,
 128                           (s64)(ktime_get_real_seconds() -
 129                                 lock->l_last_activity));
 130                if (cfs_time_after(cfs_time_current(), next_dump)) {
 131                        last_dump = next_dump;
 132                        next_dump = cfs_time_shift(300);
 133                        ldlm_namespace_dump(D_DLMTRACE,
 134                                            ldlm_lock_to_ns(lock));
 135                        if (last_dump == 0)
 136                                libcfs_debug_dumplog();
 137                }
 138                return 0;
 139        }
 140
 141        obd = lock->l_conn_export->exp_obd;
 142        imp = obd->u.cli.cl_import;
 143        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
 144        LDLM_ERROR(lock,
 145                   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
 146                   (s64)lock->l_last_activity,
 147                   (s64)(ktime_get_real_seconds() - lock->l_last_activity),
 148                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 149
 150        return 0;
 151}
 152
 153/**
 154 * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
 155 * lock cancel, and their replies). Used for lock completion timeout on the
 156 * client side.
 157 *
  158 * \param[in] lock      lock that is waiting for the completion callback
 159 *
 160 * \retval              timeout in seconds to wait for the server reply
 161 */
 162/* We use the same basis for both server side and client side functions
 163 * from a single node.
 164 */
 165static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
 166{
 167        unsigned int timeout;
 168
 169        if (AT_OFF)
 170                return obd_timeout;
 171
 172        /*
 173         * Wait a long time for enqueue - server may have to callback a
 174         * lock from another client.  Server will evict the other client if it
 175         * doesn't respond reasonably, and then give us the lock.
 176         */
 177        timeout = at_get(ldlm_lock_to_ns_at(lock));
 178        return max(3 * timeout, ldlm_enqueue_min);
 179}
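/*
 * Editorial example, not in the original source: with adaptive timeouts
 * enabled and an AT estimate of, say, 10 seconds for this namespace, the
 * completion timeout works out as
 *
 *	timeout = max(3 * at_get(...), ldlm_enqueue_min)
 *		= max(30, ldlm_enqueue_min)
 *
 * which is 100 seconds if ldlm_enqueue_min still has its default
 * OBD_TIMEOUT_DEFAULT value.  With AT_OFF the static obd_timeout is used
 * instead.
 */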
 180
 181/**
 182 * Helper function for ldlm_completion_ast(), updating timings when lock is
 183 * actually granted.
 184 */
 185static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 186{
 187        long delay;
 188        int result = 0;
 189
 190        if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
 191                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
 192                result = -EIO;
 193        } else if (!data) {
 194                LDLM_DEBUG(lock, "client-side enqueue: granted");
 195        } else {
  196                /* Count in AT stats only locks granted via a CP RPC, not immediately granted ones */
 197                delay = ktime_get_real_seconds() - lock->l_last_activity;
 198                LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
 199                           delay);
 200
 201                /* Update our time estimate */
 202                at_measured(ldlm_lock_to_ns_at(lock), delay);
 203        }
 204        return result;
 205}
 206
 207/**
  208 * Implementation of ->l_completion_ast() for a client that doesn't wait
  209 * until the lock is granted. Suitable for locks enqueued through ptlrpcd or
  210 * other threads that cannot block for long.
 211 */
 212int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
 213{
 214        if (flags == LDLM_FL_WAIT_NOREPROC) {
 215                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 216                return 0;
 217        }
 218
 219        if (!(flags & LDLM_FL_BLOCKED_MASK)) {
 220                wake_up(&lock->l_waitq);
 221                return ldlm_completion_tail(lock, data);
 222        }
 223
 224        LDLM_DEBUG(lock,
 225                   "client-side enqueue returned a blocked lock, going forward");
 226        return 0;
 227}
 228EXPORT_SYMBOL(ldlm_completion_ast_async);
 229
 230/**
 231 * Generic LDLM "completion" AST. This is called in several cases:
 232 *
 233 *     - when a reply to an ENQUEUE RPC is received from the server
 234 *       (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at
 235 *       this point (determined by flags);
 236 *
 237 *     - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has
 238 *       been granted;
 239 *
 240 *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
 241 *       gets correct lvb;
 242 *
 243 *     - to force all locks when resource is destroyed (cleanup_resource());
 244 *
 245 *     - during lock conversion (not used currently).
 246 *
  247 * If the lock is not granted in the first case, this function waits until the
  248 * second or the penultimate case happens in some other thread.
 249 *
 250 */
 251int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 252{
 253        /* XXX ALLOCATE - 160 bytes */
 254        struct lock_wait_data lwd;
 255        struct obd_device *obd;
 256        struct obd_import *imp = NULL;
 257        struct l_wait_info lwi;
 258        __u32 timeout;
 259        int rc = 0;
 260
 261        if (flags == LDLM_FL_WAIT_NOREPROC) {
 262                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
 263                goto noreproc;
 264        }
 265
 266        if (!(flags & LDLM_FL_BLOCKED_MASK)) {
 267                wake_up(&lock->l_waitq);
 268                return 0;
 269        }
 270
 271        LDLM_DEBUG(lock,
 272                   "client-side enqueue returned a blocked lock, sleeping");
 273
 274noreproc:
 275
 276        obd = class_exp2obd(lock->l_conn_export);
 277
 278        /* if this is a local lock, then there is no import */
 279        if (obd)
 280                imp = obd->u.cli.cl_import;
 281
 282        timeout = ldlm_cp_timeout(lock);
 283
 284        lwd.lwd_lock = lock;
 285        lock->l_last_activity = ktime_get_real_seconds();
 286
 287        if (ldlm_is_no_timeout(lock)) {
 288                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
 289                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
 290        } else {
 291                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
 292                                       ldlm_expired_completion_wait,
 293                                       interrupted_completion_wait, &lwd);
 294        }
 295
 296        if (imp) {
 297                spin_lock(&imp->imp_lock);
 298                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
 299                spin_unlock(&imp->imp_lock);
 300        }
 301
 302        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
 303                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
 304                ldlm_set_fail_loc(lock);
 305                rc = -EINTR;
 306        } else {
 307                /* Go to sleep until the lock is granted or cancelled. */
 308                rc = l_wait_event(lock->l_waitq,
 309                                  is_granted_or_cancelled(lock), &lwi);
 310        }
 311
 312        if (rc) {
 313                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
 314                           rc);
 315                return rc;
 316        }
 317
 318        return ldlm_completion_tail(lock, data);
 319}
 320EXPORT_SYMBOL(ldlm_completion_ast);
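/*
 * Editorial sketch, not in the original source: how this completion AST is
 * reached from the enqueue path further down in this file.  After the
 * enqueue reply is processed, ldlm_cli_enqueue_fini() ends up calling
 *
 *	err = lock->l_completion_ast(lock, *flags, NULL);
 *
 * which, for locks registered with ldlm_completion_ast as ei_cb_cp, returns
 * immediately if the lock is not blocked and otherwise sleeps (bounded by
 * ldlm_cp_timeout() unless LDLM_FL_NO_TIMEOUT is set) until the lock is
 * granted or cancelled.
 */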
 321
 322static void failed_lock_cleanup(struct ldlm_namespace *ns,
 323                                struct ldlm_lock *lock, int mode)
 324{
 325        int need_cancel = 0;
 326
 327        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
 328        lock_res_and_lock(lock);
 329        /* Check that lock is not granted or failed, we might race. */
 330        if ((lock->l_req_mode != lock->l_granted_mode) &&
 331            !ldlm_is_failed(lock)) {
  332                /* Make sure that this lock will not be found by a racing
  333                 * bl_ast and that an -EINVAL reply is sent to the server
  334                 * anyway. bug 17645
  335                 */
 336                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
 337                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
 338                need_cancel = 1;
 339        }
 340        unlock_res_and_lock(lock);
 341
 342        if (need_cancel)
 343                LDLM_DEBUG(lock,
 344                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
 345        else
 346                LDLM_DEBUG(lock, "lock was granted or failed in race");
 347
 348        /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
 349         *       from llite/file.c/ll_file_flock().
 350         */
  351        /* This code makes up for the fact that we do not have a blocking
  352         * handler on the client for flock locks. As such this is the place
  353         * where we must completely kill failed locks (those interrupted and
  354         * those that were waiting to be granted when the server evicted us).
  355         */
 356        if (lock->l_resource->lr_type == LDLM_FLOCK) {
 357                lock_res_and_lock(lock);
 358                if (!ldlm_is_destroyed(lock)) {
 359                        ldlm_resource_unlink_lock(lock);
 360                        ldlm_lock_decref_internal_nolock(lock, mode);
 361                        ldlm_lock_destroy_nolock(lock);
 362                }
 363                unlock_res_and_lock(lock);
 364        } else {
 365                ldlm_lock_decref_internal(lock, mode);
 366        }
 367}
 368
 369/**
 370 * Finishing portion of client lock enqueue code.
 371 *
 372 * Called after receiving reply from server.
 373 */
 374int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 375                          enum ldlm_type type, __u8 with_policy,
 376                          enum ldlm_mode mode,
 377                          __u64 *flags, void *lvb, __u32 lvb_len,
 378                          const struct lustre_handle *lockh, int rc)
 379{
 380        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 381        int is_replay = *flags & LDLM_FL_REPLAY;
 382        struct ldlm_lock *lock;
 383        struct ldlm_reply *reply;
 384        int cleanup_phase = 1;
 385
 386        lock = ldlm_handle2lock(lockh);
 387        /* ldlm_cli_enqueue is holding a reference on this lock. */
 388        if (!lock) {
 389                LASSERT(type == LDLM_FLOCK);
 390                return -ENOLCK;
 391        }
 392
 393        LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
 394                 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);
 395
 396        if (rc != ELDLM_OK) {
 397                LASSERT(!is_replay);
 398                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
 399                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
 400
 401                if (rc != ELDLM_LOCK_ABORTED)
 402                        goto cleanup;
 403        }
 404
 405        /* Before we return, swab the reply */
 406        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
 407        if (!reply) {
 408                rc = -EPROTO;
 409                goto cleanup;
 410        }
 411
 412        if (lvb_len > 0) {
 413                int size = 0;
 414
 415                size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
 416                                            RCL_SERVER);
 417                if (size < 0) {
 418                        LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
 419                        rc = size;
 420                        goto cleanup;
 421                } else if (unlikely(size > lvb_len)) {
 422                        LDLM_ERROR(lock,
 423                                   "Replied LVB is larger than expectation, expected = %d, replied = %d",
 424                                   lvb_len, size);
 425                        rc = -EINVAL;
 426                        goto cleanup;
 427                }
 428                lvb_len = size;
 429        }
 430
 431        if (rc == ELDLM_LOCK_ABORTED) {
 432                if (lvb_len > 0 && lvb)
 433                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 434                                           lvb, lvb_len);
 435                if (rc == 0)
 436                        rc = ELDLM_LOCK_ABORTED;
 437                goto cleanup;
 438        }
 439
 440        /* lock enqueued on the server */
 441        cleanup_phase = 0;
 442
 443        lock_res_and_lock(lock);
  444        /* Key changed: rehash the lock in the per-export hash with the new key */
 445        if (exp->exp_lock_hash) {
 446                /* In the function below, .hs_keycmp resolves to
 447                 * ldlm_export_lock_keycmp()
 448                 */
 449                /* coverity[overrun-buffer-val] */
 450                cfs_hash_rehash_key(exp->exp_lock_hash,
 451                                    &lock->l_remote_handle,
 452                                    &reply->lock_handle,
 453                                    &lock->l_exp_hash);
 454        } else {
 455                lock->l_remote_handle = reply->lock_handle;
 456        }
 457
 458        *flags = ldlm_flags_from_wire(reply->lock_flags);
 459        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
 460                                              LDLM_FL_INHERIT_MASK);
 461        unlock_res_and_lock(lock);
 462
 463        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n",
 464               lock, reply->lock_handle.cookie, *flags);
 465
 466        /* If enqueue returned a blocked lock but the completion handler has
 467         * already run, then it fixed up the resource and we don't need to do it
 468         * again.
 469         */
 470        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
 471                int newmode = reply->lock_desc.l_req_mode;
 472
 473                LASSERT(!is_replay);
 474                if (newmode && newmode != lock->l_req_mode) {
 475                        LDLM_DEBUG(lock, "server returned different mode %s",
 476                                   ldlm_lockname[newmode]);
 477                        lock->l_req_mode = newmode;
 478                }
 479
 480                if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
 481                                 &lock->l_resource->lr_name)) {
 482                        CDEBUG(D_INFO,
 483                               "remote intent success, locking " DLDLMRES " instead of " DLDLMRES "\n",
 484                               PLDLMRES(&reply->lock_desc.l_resource),
 485                               PLDLMRES(lock->l_resource));
 486
 487                        rc = ldlm_lock_change_resource(ns, lock,
 488                                        &reply->lock_desc.l_resource.lr_name);
 489                        if (rc || !lock->l_resource) {
 490                                rc = -ENOMEM;
 491                                goto cleanup;
 492                        }
 493                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
 494                }
 495                if (with_policy)
 496                        if (!(type == LDLM_IBITS &&
 497                              !(exp_connect_flags(exp) & OBD_CONNECT_IBITS)))
  498                                /* We assume lock type cannot change on server */
 499                                ldlm_convert_policy_to_local(exp,
 500                                                lock->l_resource->lr_type,
 501                                                &reply->lock_desc.l_policy_data,
 502                                                &lock->l_policy_data);
 503                if (type != LDLM_PLAIN)
 504                        LDLM_DEBUG(lock,
 505                                   "client-side enqueue, new policy data");
 506        }
 507
 508        if ((*flags) & LDLM_FL_AST_SENT) {
 509                lock_res_and_lock(lock);
 510                lock->l_flags |= LDLM_FL_CBPENDING |  LDLM_FL_BL_AST;
 511                unlock_res_and_lock(lock);
 512                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
 513        }
 514
 515        /* If the lock has already been granted by a completion AST, don't
 516         * clobber the LVB with an older one.
 517         */
 518        if (lvb_len > 0) {
 519                /* We must lock or a racing completion might update lvb without
 520                 * letting us know and we'll clobber the correct value.
 521                 * Cannot unlock after the check either, as that still leaves
 522                 * a tiny window for completion to get in
 523                 */
 524                lock_res_and_lock(lock);
 525                if (lock->l_req_mode != lock->l_granted_mode)
 526                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 527                                           lock->l_lvb_data, lvb_len);
 528                unlock_res_and_lock(lock);
 529                if (rc < 0) {
 530                        cleanup_phase = 1;
 531                        goto cleanup;
 532                }
 533        }
 534
 535        if (!is_replay) {
 536                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
 537                if (lock->l_completion_ast) {
 538                        int err = lock->l_completion_ast(lock, *flags, NULL);
 539
 540                        if (!rc)
 541                                rc = err;
 542                        if (rc)
 543                                cleanup_phase = 1;
 544                }
 545        }
 546
 547        if (lvb_len > 0 && lvb) {
 548                /* Copy the LVB here, and not earlier, because the completion
 549                 * AST (if any) can override what we got in the reply
 550                 */
 551                memcpy(lvb, lock->l_lvb_data, lvb_len);
 552        }
 553
 554        LDLM_DEBUG(lock, "client-side enqueue END");
 555cleanup:
 556        if (cleanup_phase == 1 && rc)
 557                failed_lock_cleanup(ns, lock, mode);
 558        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
 559        LDLM_LOCK_PUT(lock);
 560        LDLM_LOCK_RELEASE(lock);
 561        return rc;
 562}
 563EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
 564
 565/**
 566 * Estimate number of lock handles that would fit into request of given
 567 * size.  PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
 568 * a single page on the send/receive side. XXX: 512 should be changed to
 569 * more adequate value.
 570 */
 571static inline int ldlm_req_handles_avail(int req_size, int off)
 572{
 573        int avail;
 574
 575        avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
 576        if (likely(avail >= 0))
 577                avail /= (int)sizeof(struct lustre_handle);
 578        else
 579                avail = 0;
 580        avail += LDLM_LOCKREQ_HANDLES - off;
 581
 582        return avail;
 583}
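/*
 * Editorial example, not in the original source: with a 4 KiB PAGE_SIZE, an
 * existing request body of 1024 bytes and an 8-byte struct lustre_handle
 * (all illustrative values), off == 1 gives
 *
 *	avail = (4096 - 512 - 1024) / 8 + LDLM_LOCKREQ_HANDLES - 1
 *	      = 320 + 1 = 321 handles
 *
 * assuming LDLM_LOCKREQ_HANDLES == 2 and that LDLM_MAXREQSIZE is not the
 * limiting term.
 */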
 584
 585static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
 586                                             enum req_location loc,
 587                                             int off)
 588{
 589        u32 size = req_capsule_msg_size(pill, loc);
 590
 591        return ldlm_req_handles_avail(size, off);
 592}
 593
 594static inline int ldlm_format_handles_avail(struct obd_import *imp,
 595                                            const struct req_format *fmt,
 596                                            enum req_location loc, int off)
 597{
 598        u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
 599
 600        return ldlm_req_handles_avail(size, off);
 601}
 602
 603/**
  604 * Cancel LRU locks and pack them into the enqueue request. Also pack the
  605 * given \a count locks from \a cancels.
  606 *
  607 * This is to be called by functions preparing their own requests that
  608 * might contain lists of locks to cancel in addition to the actual operation
  609 * that needs to be performed.
 610 */
 611int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 612                      int version, int opc, int canceloff,
 613                      struct list_head *cancels, int count)
 614{
 615        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
 616        struct req_capsule      *pill = &req->rq_pill;
 617        struct ldlm_request     *dlm = NULL;
 618        int flags, avail, to_free, pack = 0;
 619        LIST_HEAD(head);
 620        int rc;
 621
 622        if (!cancels)
 623                cancels = &head;
 624        if (ns_connect_cancelset(ns)) {
 625                /* Estimate the amount of available space in the request. */
 626                req_capsule_filled_sizes(pill, RCL_CLIENT);
 627                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 628
 629                flags = ns_connect_lru_resize(ns) ?
 630                        LDLM_LRU_FLAG_LRUR_NO_WAIT : LDLM_LRU_FLAG_AGED;
 631                to_free = !ns_connect_lru_resize(ns) &&
 632                          opc == LDLM_ENQUEUE ? 1 : 0;
 633
 634                /* Cancel LRU locks here _only_ if the server supports
  635                 * EARLY_CANCEL. Otherwise we would have to send an extra
  636                 * CANCEL RPC, which would make us slower.
 637                 */
 638                if (avail > count)
 639                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
 640                                                       avail - count, 0, flags);
 641                if (avail > count)
 642                        pack = count;
 643                else
 644                        pack = avail;
 645                req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
 646                                     ldlm_request_bufsize(pack, opc));
 647        }
 648
 649        rc = ptlrpc_request_pack(req, version, opc);
 650        if (rc) {
 651                ldlm_lock_list_put(cancels, l_bl_ast, count);
 652                return rc;
 653        }
 654
 655        if (ns_connect_cancelset(ns)) {
 656                if (canceloff) {
 657                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
 658                        LASSERT(dlm);
  659                        /* Skip the first lock handle in ldlm_request_pack();
  660                         * that function will increment @lock_count according
  661                         * to the number of lock handles actually written to
  662                         * the buffer.
 663                         */
 664                        dlm->lock_count = canceloff;
 665                }
 666                /* Pack into the request @pack lock handles. */
 667                ldlm_cli_cancel_list(cancels, pack, req, 0);
 668                /* Prepare and send separate cancel RPC for others. */
 669                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
 670        } else {
 671                ldlm_lock_list_put(cancels, l_bl_ast, count);
 672        }
 673        return 0;
 674}
 675EXPORT_SYMBOL(ldlm_prep_elc_req);
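/*
 * Editorial sketch, not in the original source: a caller that has already
 * collected locks to cancel can piggyback them on its own enqueue request.
 * All names below appear elsewhere in this file; the collection step itself
 * is left abstract.
 *
 *	LIST_HEAD(cancels);
 *	int count = 0;
 *
 *	(... add locks to &cancels via their l_bl_ast links, counting them ...)
 *
 *	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
 *	if (!req)
 *		return -ENOMEM;
 *	rc = ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
 *			       LDLM_ENQUEUE_CANCEL_OFF, &cancels, count);
 */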
 676
 677int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 678                          struct list_head *cancels, int count)
 679{
 680        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
 681                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 682}
 683EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 684
 685static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp,
 686                                                int lvb_len)
 687{
 688        struct ptlrpc_request *req;
 689        int rc;
 690
 691        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
 692        if (!req)
 693                return ERR_PTR(-ENOMEM);
 694
 695        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
 696        if (rc) {
 697                ptlrpc_request_free(req);
 698                return ERR_PTR(rc);
 699        }
 700
 701        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
 702        ptlrpc_request_set_replen(req);
 703        return req;
 704}
 705
 706/**
 707 * Client-side lock enqueue.
 708 *
 709 * If a request has some specific initialisation it is passed in \a reqp,
 710 * otherwise it is created in ldlm_cli_enqueue.
 711 *
  712 * Supports sync and async requests; pass the \a async flag accordingly. If a
  713 * request was created in ldlm_cli_enqueue and this is an async request, it is
  714 * passed back to the caller in \a reqp.
 715 */
 716int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 717                     struct ldlm_enqueue_info *einfo,
 718                     const struct ldlm_res_id *res_id,
 719                     union ldlm_policy_data const *policy, __u64 *flags,
 720                     void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
 721                     struct lustre_handle *lockh, int async)
 722{
 723        struct ldlm_namespace *ns;
 724        struct ldlm_lock      *lock;
 725        struct ldlm_request   *body;
 726        int                 is_replay = *flags & LDLM_FL_REPLAY;
 727        int                 req_passed_in = 1;
 728        int                 rc, err;
 729        struct ptlrpc_request *req;
 730
 731        ns = exp->exp_obd->obd_namespace;
 732
 733        /* If we're replaying this lock, just check some invariants.
 734         * If we're creating a new lock, get everything all setup nicely.
 735         */
 736        if (is_replay) {
 737                lock = ldlm_handle2lock_long(lockh, 0);
 738                LASSERT(lock);
 739                LDLM_DEBUG(lock, "client-side enqueue START");
 740                LASSERT(exp == lock->l_conn_export);
 741        } else {
 742                const struct ldlm_callback_suite cbs = {
 743                        .lcs_completion = einfo->ei_cb_cp,
 744                        .lcs_blocking   = einfo->ei_cb_bl,
 745                        .lcs_glimpse    = einfo->ei_cb_gl
 746                };
 747                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
 748                                        einfo->ei_mode, &cbs, einfo->ei_cbdata,
 749                                        lvb_len, lvb_type);
 750                if (IS_ERR(lock))
 751                        return PTR_ERR(lock);
 752                /* for the local lock, add the reference */
 753                ldlm_lock_addref_internal(lock, einfo->ei_mode);
 754                ldlm_lock2handle(lock, lockh);
 755                if (policy)
 756                        lock->l_policy_data = *policy;
 757
 758                if (einfo->ei_type == LDLM_EXTENT) {
 759                        /* extent lock without policy is a bug */
 760                        if (!policy)
 761                                LBUG();
 762
 763                        lock->l_req_extent = policy->l_extent;
 764                }
 765                LDLM_DEBUG(lock, "client-side enqueue START, flags %llx",
 766                           *flags);
 767        }
 768
 769        lock->l_conn_export = exp;
 770        lock->l_export = NULL;
 771        lock->l_blocking_ast = einfo->ei_cb_bl;
 772        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
 773        lock->l_last_activity = ktime_get_real_seconds();
 774
 775        /* lock not sent to server yet */
 776        if (!reqp || !*reqp) {
 777                req = ldlm_enqueue_pack(exp, lvb_len);
 778                if (IS_ERR(req)) {
 779                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
 780                        LDLM_LOCK_RELEASE(lock);
 781                        return PTR_ERR(req);
 782                }
 783
 784                req_passed_in = 0;
 785                if (reqp)
 786                        *reqp = req;
 787        } else {
 788                int len;
 789
 790                req = *reqp;
 791                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
 792                                           RCL_CLIENT);
 793                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
 794                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
 795        }
 796
 797        /* Dump lock data into the request buffer */
 798        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 799        ldlm_lock2desc(lock, &body->lock_desc);
 800        body->lock_flags = ldlm_flags_to_wire(*flags);
 801        body->lock_handle[0] = *lockh;
 802
 803        if (async) {
 804                LASSERT(reqp);
 805                return 0;
 806        }
 807
 808        LDLM_DEBUG(lock, "sending request");
 809
 810        rc = ptlrpc_queue_wait(req);
 811
 812        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
 813                                    einfo->ei_mode, flags, lvb, lvb_len,
 814                                    lockh, rc);
 815
 816        /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
 817         * one reference that we took
 818         */
 819        if (err == -ENOLCK)
 820                LDLM_LOCK_RELEASE(lock);
 821        else
 822                rc = err;
 823
 824        if (!req_passed_in && req) {
 825                ptlrpc_req_finished(req);
 826                if (reqp)
 827                        *reqp = NULL;
 828        }
 829
 830        return rc;
 831}
 832EXPORT_SYMBOL(ldlm_cli_enqueue);
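/*
 * Editorial sketch, not in the original source: a minimal synchronous
 * enqueue.  exp and res_id are assumed to be already set up by the caller;
 * my_blocking_ast is the hypothetical handler from the sketch near the top
 * of this file.
 *
 *	struct ldlm_enqueue_info einfo = {
 *		.ei_type  = LDLM_IBITS,
 *		.ei_mode  = LCK_PR,
 *		.ei_cb_bl = my_blocking_ast,
 *		.ei_cb_cp = ldlm_completion_ast,
 *	};
 *	struct lustre_handle lockh;
 *	__u64 flags = 0;
 *	int rc;
 *
 *	rc = ldlm_cli_enqueue(exp, NULL, &einfo, &res_id, NULL, &flags,
 *			      NULL, 0, LVB_T_NONE, &lockh, 0);
 */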
 833
 834/**
 835 * Cancel locks locally.
  836 *
  837 * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server;
  838 * \retval LDLM_FL_BL_AST if there is a need for a separate CANCEL RPC;
  839 * \retval LDLM_FL_CANCELING otherwise.
 840 */
 841static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
 842{
 843        __u64 rc = LDLM_FL_LOCAL_ONLY;
 844
 845        if (lock->l_conn_export) {
 846                bool local_only;
 847
 848                LDLM_DEBUG(lock, "client-side cancel");
  849                /* Set this flag to prevent others from getting new references */
 850                lock_res_and_lock(lock);
 851                ldlm_set_cbpending(lock);
 852                local_only = !!(lock->l_flags &
 853                                (LDLM_FL_LOCAL_ONLY | LDLM_FL_CANCEL_ON_BLOCK));
 854                ldlm_cancel_callback(lock);
 855                rc = ldlm_is_bl_ast(lock) ? LDLM_FL_BL_AST : LDLM_FL_CANCELING;
 856                unlock_res_and_lock(lock);
 857
 858                if (local_only) {
 859                        CDEBUG(D_DLMTRACE,
 860                               "not sending request (at caller's instruction)\n");
 861                        rc = LDLM_FL_LOCAL_ONLY;
 862                }
 863                ldlm_lock_cancel(lock);
 864        } else {
 865                LDLM_ERROR(lock, "Trying to cancel local lock");
 866                LBUG();
 867        }
 868
 869        return rc;
 870}
 871
 872/**
 873 * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
 874 */
 875static void ldlm_cancel_pack(struct ptlrpc_request *req,
 876                             struct list_head *head, int count)
 877{
 878        struct ldlm_request *dlm;
 879        struct ldlm_lock *lock;
 880        int max, packed = 0;
 881
 882        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
 883        LASSERT(dlm);
 884
 885        /* Check the room in the request buffer. */
 886        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
 887                sizeof(struct ldlm_request);
 888        max /= sizeof(struct lustre_handle);
 889        max += LDLM_LOCKREQ_HANDLES;
 890        LASSERT(max >= dlm->lock_count + count);
 891
  892        /* XXX: it would be better to pack lock handles grouped by resource,
 893         * so that the server cancel would call filter_lvbo_update() less
 894         * frequently.
 895         */
 896        list_for_each_entry(lock, head, l_bl_ast) {
 897                if (!count--)
 898                        break;
 899                LASSERT(lock->l_conn_export);
 900                /* Pack the lock handle to the given request buffer. */
 901                LDLM_DEBUG(lock, "packing");
 902                dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
 903                packed++;
 904        }
 905        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
 906}
 907
 908/**
 909 * Prepare and send a batched cancel RPC. It will include \a count lock
 910 * handles of locks given in \a cancels list.
 911 */
 912static int ldlm_cli_cancel_req(struct obd_export *exp,
 913                               struct list_head *cancels,
 914                               int count, enum ldlm_cancel_flags flags)
 915{
 916        struct ptlrpc_request *req = NULL;
 917        struct obd_import *imp;
 918        int free, sent = 0;
 919        int rc = 0;
 920
 921        LASSERT(exp);
 922        LASSERT(count > 0);
 923
 924        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
 925
 926        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
 927                return count;
 928
 929        free = ldlm_format_handles_avail(class_exp2cliimp(exp),
 930                                         &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
 931        if (count > free)
 932                count = free;
 933
 934        while (1) {
 935                imp = class_exp2cliimp(exp);
 936                if (!imp || imp->imp_invalid) {
 937                        CDEBUG(D_DLMTRACE,
 938                               "skipping cancel on invalid import %p\n", imp);
 939                        return count;
 940                }
 941
 942                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
 943                if (!req) {
 944                        rc = -ENOMEM;
 945                        goto out;
 946                }
 947
 948                req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
 949                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
 950                                     ldlm_request_bufsize(count, LDLM_CANCEL));
 951
 952                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
 953                if (rc) {
 954                        ptlrpc_request_free(req);
 955                        goto out;
 956                }
 957
 958                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
 959                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 960                ptlrpc_at_set_req_timeout(req);
 961
 962                ldlm_cancel_pack(req, cancels, count);
 963
 964                ptlrpc_request_set_replen(req);
 965                if (flags & LCF_ASYNC) {
 966                        ptlrpcd_add_req(req);
 967                        sent = count;
 968                        goto out;
 969                }
 970
 971                rc = ptlrpc_queue_wait(req);
 972                if (rc == LUSTRE_ESTALE) {
 973                        CDEBUG(D_DLMTRACE,
 974                               "client/server (nid %s) out of sync -- not fatal\n",
 975                               libcfs_nid2str(req->rq_import->
 976                                              imp_connection->c_peer.nid));
 977                        rc = 0;
 978                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
 979                           req->rq_import_generation == imp->imp_generation) {
 980                        ptlrpc_req_finished(req);
 981                        continue;
 982                } else if (rc != ELDLM_OK) {
 983                        /* -ESHUTDOWN is common on umount */
 984                        CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
 985                                     "Got rc %d from cancel RPC: canceling anyway\n",
 986                                     rc);
 987                        break;
 988                }
 989                sent = count;
 990                break;
 991        }
 992
 993        ptlrpc_req_finished(req);
 994out:
 995        return sent ? sent : rc;
 996}
 997
 998static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
 999{
1000        return &imp->imp_obd->obd_namespace->ns_pool;
1001}
1002
1003/**
1004 * Update client's OBD pool related fields with new SLV and Limit from \a req.
1005 */
1006int ldlm_cli_update_pool(struct ptlrpc_request *req)
1007{
1008        struct obd_device *obd;
1009        __u64 new_slv;
1010        __u32 new_limit;
1011
1012        if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
1013                     !imp_connect_lru_resize(req->rq_import))) {
1014                /*
1015                 * Do nothing for corner cases.
1016                 */
1017                return 0;
1018        }
1019
1020        /* In some cases RPC may contain SLV and limit zeroed out. This
1021         * is the case when server does not support LRU resize feature.
1022         * This is also possible in some recovery cases when server-side
1023         * reqs have no reference to the OBD export and thus access to
1024         * server-side namespace is not possible.
1025         */
1026        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
1027            lustre_msg_get_limit(req->rq_repmsg) == 0) {
1028                DEBUG_REQ(D_HA, req,
1029                          "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
1030                          lustre_msg_get_slv(req->rq_repmsg),
1031                          lustre_msg_get_limit(req->rq_repmsg));
1032                return 0;
1033        }
1034
1035        new_limit = lustre_msg_get_limit(req->rq_repmsg);
1036        new_slv = lustre_msg_get_slv(req->rq_repmsg);
1037        obd = req->rq_import->imp_obd;
1038
1039        /* Set new SLV and limit in OBD fields to make them accessible
1040         * to the pool thread. We do not access obd_namespace and pool
1041         * directly here as there is no reliable way to make sure that
1042         * they are still alive at cleanup time. Evil races are possible
1043         * which may cause Oops at that time.
1044         */
1045        write_lock(&obd->obd_pool_lock);
1046        obd->obd_pool_slv = new_slv;
1047        obd->obd_pool_limit = new_limit;
1048        write_unlock(&obd->obd_pool_lock);
1049
1050        return 0;
1051}
1052
1053/**
1054 * Client side lock cancel.
1055 *
1056 * Lock must not have any readers or writers by this time.
1057 */
1058int ldlm_cli_cancel(const struct lustre_handle *lockh,
1059                    enum ldlm_cancel_flags cancel_flags)
1060{
1061        struct obd_export *exp;
1062        int avail, flags, count = 1;
1063        __u64 rc = 0;
1064        struct ldlm_namespace *ns;
1065        struct ldlm_lock *lock;
1066        LIST_HEAD(cancels);
1067
1068        lock = ldlm_handle2lock_long(lockh, 0);
1069        if (!lock) {
1070                LDLM_DEBUG_NOLOCK("lock is already being destroyed");
1071                return 0;
1072        }
1073
1074        lock_res_and_lock(lock);
1075        /* Lock is being canceled and the caller doesn't want to wait */
1076        if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
1077                unlock_res_and_lock(lock);
1078                LDLM_LOCK_RELEASE(lock);
1079                return 0;
1080        }
1081
1082        ldlm_set_canceling(lock);
1083        unlock_res_and_lock(lock);
1084
1085        rc = ldlm_cli_cancel_local(lock);
1086        if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
1087                LDLM_LOCK_RELEASE(lock);
1088                return 0;
1089        }
 1090        /* Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
 1091         * RPC which goes to the canceld portal, so we can cancel other LRU locks
1092         * here and send them all as one LDLM_CANCEL RPC.
1093         */
1094        LASSERT(list_empty(&lock->l_bl_ast));
1095        list_add(&lock->l_bl_ast, &cancels);
1096
1097        exp = lock->l_conn_export;
1098        if (exp_connect_cancelset(exp)) {
1099                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1100                                                  &RQF_LDLM_CANCEL,
1101                                                  RCL_CLIENT, 0);
1102                LASSERT(avail > 0);
1103
1104                ns = ldlm_lock_to_ns(lock);
1105                flags = ns_connect_lru_resize(ns) ?
1106                        LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
1107                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1108                                               LCF_BL_AST, flags);
1109        }
1110        ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
1111        return 0;
1112}
1113EXPORT_SYMBOL(ldlm_cli_cancel);
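/*
 * Editorial sketch, not in the original source: tearing down a lock obtained
 * as in the enqueue sketch above.  ldlm_lock_decref() lives outside this
 * file; the call pattern is illustrative.
 *
 *	ldlm_lock_decref(&lockh, LCK_PR);	(drop the reader/writer ref)
 *	rc = ldlm_cli_cancel(&lockh, 0);	(then cancel, no special flags)
 */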
1114
1115/**
1116 * Locally cancel up to \a count locks in list \a cancels.
1117 * Return the number of cancelled locks.
1118 */
1119int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
1120                               enum ldlm_cancel_flags flags)
1121{
1122        LIST_HEAD(head);
1123        struct ldlm_lock *lock, *next;
1124        int left = 0, bl_ast = 0;
1125        __u64 rc;
1126
1127        left = count;
1128        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1129                if (left-- == 0)
1130                        break;
1131
1132                if (flags & LCF_LOCAL) {
1133                        rc = LDLM_FL_LOCAL_ONLY;
1134                        ldlm_lock_cancel(lock);
1135                } else {
1136                        rc = ldlm_cli_cancel_local(lock);
1137                }
1138                /* Until we have compound requests and can send LDLM_CANCEL
1139                 * requests batched with generic RPCs, we need to send cancels
1140                 * with the LDLM_FL_BL_AST flag in a separate RPC from
1141                 * the one being generated now.
1142                 */
1143                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1144                        LDLM_DEBUG(lock, "Cancel lock separately");
1145                        list_del_init(&lock->l_bl_ast);
1146                        list_add(&lock->l_bl_ast, &head);
1147                        bl_ast++;
1148                        continue;
1149                }
1150                if (rc == LDLM_FL_LOCAL_ONLY) {
1151                        /* CANCEL RPC should not be sent to server. */
1152                        list_del_init(&lock->l_bl_ast);
1153                        LDLM_LOCK_RELEASE(lock);
1154                        count--;
1155                }
1156        }
1157        if (bl_ast > 0) {
1158                count -= bl_ast;
1159                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1160        }
1161
1162        return count;
1163}
1164
1165/**
1166 * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
1167 * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g.
1168 * readahead requests, ...)
1169 */
1170static enum ldlm_policy_res
1171ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
1172                           int unused, int added, int count)
1173{
1174        enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
1175
1176        /* don't check added & count since we want to process all locks
1177         * from unused list.
1178         * It's fine to not take lock to access lock->l_resource since
1179         * the lock has already been granted so it won't change.
1180         */
1181        switch (lock->l_resource->lr_type) {
1182        case LDLM_EXTENT:
1183        case LDLM_IBITS:
1184                if (ns->ns_cancel && ns->ns_cancel(lock) != 0)
1185                        break;
1186                /* fall through */
1187        default:
1188                result = LDLM_POLICY_SKIP_LOCK;
1189                lock_res_and_lock(lock);
1190                ldlm_set_skipped(lock);
1191                unlock_res_and_lock(lock);
1192                break;
1193        }
1194
1195        return result;
1196}
1197
1198/**
1199 * Callback function for LRU-resize policy. Decides whether to keep
1200 * \a lock in LRU for current \a LRU size \a unused, added in current
1201 * scan \a added and number of locks to be preferably canceled \a count.
1202 *
 1203 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1204 *
1205 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1206 */
1207static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1208                                                    struct ldlm_lock *lock,
1209                                                    int unused, int added,
1210                                                    int count)
1211{
1212        unsigned long cur = cfs_time_current();
1213        struct ldlm_pool *pl = &ns->ns_pool;
1214        __u64 slv, lvf, lv;
1215        unsigned long la;
1216
1217        /* Stop LRU processing when we reach past @count or have checked all
1218         * locks in LRU.
1219         */
1220        if (count && added >= count)
1221                return LDLM_POLICY_KEEP_LOCK;
1222
1223        /*
 1224         * Regardless of the LV, it doesn't make sense to keep a lock which
 1225         * has been unused for ns_max_age time.
1226         */
1227        if (cfs_time_after(cfs_time_current(),
1228                           cfs_time_add(lock->l_last_used, ns->ns_max_age)))
1229                return LDLM_POLICY_CANCEL_LOCK;
1230
1231        slv = ldlm_pool_get_slv(pl);
1232        lvf = ldlm_pool_get_lvf(pl);
1233        la = cfs_duration_sec(cfs_time_sub(cur, lock->l_last_used));
1234        lv = lvf * la * unused;
1235
1236        /* Inform pool about current CLV to see it via debugfs. */
1237        ldlm_pool_set_clv(pl, lv);
1238
 1239        /* Stop when the SLV has not yet come from the server, or lv is
 1240         * smaller than it.
1241         */
1242        if (slv == 0 || lv < slv)
1243                return LDLM_POLICY_KEEP_LOCK;
1244
1245        return LDLM_POLICY_CANCEL_LOCK;
1246}
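/*
 * Editorial example, not in the original source: with an illustrative lock
 * volume factor lvf == 100, a lock idle for la == 30 seconds and
 * unused == 1000 locks in the LRU, the current lock volume is
 *
 *	lv = lvf * la * unused = 100 * 30 * 1000 = 3000000
 *
 * and the lock is cancelled only if the server-supplied SLV is non-zero and
 * lv >= slv, or if the lock has sat unused for longer than ns_max_age.
 */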
1247
1248/**
1249 * Callback function for debugfs used policy. Makes decision whether to keep
1250 * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
1251 * added and number of locks to be preferably canceled \a count.
1252 *
 1253 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1254 *
1255 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1256 */
1257static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1258                                                      struct ldlm_lock *lock,
1259                                                      int unused, int added,
1260                                                      int count)
1261{
1262        /* Stop LRU processing when we reach past @count or have checked all
1263         * locks in LRU.
1264         */
1265        return (added >= count) ?
1266                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1267}
1268
1269/**
1270 * Callback function for aged policy. Makes decision whether to keep \a lock in
1271 * LRU for current LRU size \a unused, added in current scan \a added and
1272 * number of locks to be preferably canceled \a count.
1273 *
 1274 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1275 *
1276 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1277 */
1278static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1279                                                    struct ldlm_lock *lock,
1280                                                    int unused, int added,
1281                                                    int count)
1282{
1283        if ((added >= count) &&
1284            time_before(cfs_time_current(),
1285                        cfs_time_add(lock->l_last_used, ns->ns_max_age)))
1286                return LDLM_POLICY_KEEP_LOCK;
1287
1288        return LDLM_POLICY_CANCEL_LOCK;
1289}
1290
1291static enum ldlm_policy_res
1292ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
1293                                struct ldlm_lock *lock,
1294                                int unused, int added,
1295                                int count)
1296{
1297        enum ldlm_policy_res result;
1298
1299        result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
1300        if (result == LDLM_POLICY_KEEP_LOCK)
1301                return result;
1302
1303        return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
1304}
1305
1306/**
1307 * Callback function for default policy. Makes decision whether to keep \a lock
1308 * in LRU for current LRU size \a unused, added in current scan \a added and
1309 * number of locks to be preferably canceled \a count.
1310 *
 1311 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1312 *
1313 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1314 */
1315static enum ldlm_policy_res
1316ldlm_cancel_default_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
1317                           int unused, int added, int count)
1318{
1319        /* Stop LRU processing when we reach past count or have checked all
1320         * locks in LRU.
1321         */
1322        return (added >= count) ?
1323                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1324}
1325
1326typedef enum ldlm_policy_res (*ldlm_cancel_lru_policy_t)(
1327                                                      struct ldlm_namespace *,
1328                                                      struct ldlm_lock *, int,
1329                                                      int, int);
1330
1331static ldlm_cancel_lru_policy_t
1332ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1333{
1334        if (flags & LDLM_LRU_FLAG_NO_WAIT)
1335                return ldlm_cancel_no_wait_policy;
1336
1337        if (ns_connect_lru_resize(ns)) {
1338                if (flags & LDLM_LRU_FLAG_SHRINK)
1339                        /* We kill passed number of old locks. */
1340                        return ldlm_cancel_passed_policy;
1341                else if (flags & LDLM_LRU_FLAG_LRUR)
1342                        return ldlm_cancel_lrur_policy;
1343                else if (flags & LDLM_LRU_FLAG_PASSED)
1344                        return ldlm_cancel_passed_policy;
1345                else if (flags & LDLM_LRU_FLAG_LRUR_NO_WAIT)
1346                        return ldlm_cancel_lrur_no_wait_policy;
1347        } else {
1348                if (flags & LDLM_LRU_FLAG_AGED)
1349                        return ldlm_cancel_aged_policy;
1350        }
1351
1352        return ldlm_cancel_default_policy;
1353}
1354
1355/**
1356 * - Free space in LRU for \a count new locks,
1357 *   redundant unused locks are canceled locally;
1358 * - also cancel locally unused aged locks;
1359 * - do not cancel more than \a max locks;
1360 * - GET the found locks and add them into the \a cancels list.
1361 *
1362 * A client lock can be added to the l_bl_ast list only when it is
1363 * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing
1364 * CANCEL.  There are the following use cases:
1365 * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and
1366 * ldlm_cli_cancel(), which check and set this flag properly. As any
 1367 * attempt to cancel a lock relies on this flag, the l_bl_ast list is
 1368 * accessed later without any special locking.
1369 *
1370 * Calling policies for enabled LRU resize:
1371 * ----------------------------------------
1372 * flags & LDLM_LRU_FLAG_LRUR   - use LRU resize policy (SLV from server) to
1373 *                                cancel not more than \a count locks;
1374 *
1375 * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
1376 *                                at the beginning of LRU list);
1377 *
1378 * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
1379 *                                to memory pressure policy function;
1380 *
1381 * flags & LDLM_LRU_FLAG_AGED   - cancel \a count locks according to
1382 *                                "aged policy".
1383 *
1384 * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
1385 *                                 (typically before replaying locks) w/o
1386 *                                 sending any RPCs or waiting for any
1387 *                                 outstanding RPC to complete.
1388 */
1389static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
1390                                 struct list_head *cancels, int count, int max,
1391                                 int flags)
1392{
1393        ldlm_cancel_lru_policy_t pf;
1394        struct ldlm_lock *lock, *next;
1395        int added = 0, unused, remained;
1396        int no_wait = flags &
1397                (LDLM_LRU_FLAG_NO_WAIT | LDLM_LRU_FLAG_LRUR_NO_WAIT);
1398
1399        spin_lock(&ns->ns_lock);
1400        unused = ns->ns_nr_unused;
1401        remained = unused;
1402
1403        if (!ns_connect_lru_resize(ns))
1404                count += unused - ns->ns_max_unused;
1405
1406        pf = ldlm_cancel_lru_policy(ns, flags);
1407        LASSERT(pf);
1408
1409        while (!list_empty(&ns->ns_unused_list)) {
1410                enum ldlm_policy_res result;
1411                time_t last_use = 0;
1412
1413                /* Stop once all initially unused locks have been scanned. */
1414                if (remained-- <= 0)
1415                        break;
1416
1417                /* For any flags, stop scanning if @max is reached. */
1418                if (max && added >= max)
1419                        break;
1420
1421                list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
1422                                         l_lru) {
1423                        /* No LRU lock may have a blocking request. */
1424                        LASSERT(!ldlm_is_bl_ast(lock));
1425
1426                        if (no_wait && ldlm_is_skipped(lock))
1427                                /* already processed */
1428                                continue;
1429
1430                        last_use = lock->l_last_used;
1431                        if (last_use == cfs_time_current())
1432                                continue;
1433
1434                        /* Somebody is already doing CANCEL. No need for this
1435                         * lock in LRU, do not traverse it again.
1436                         */
1437                        if (!ldlm_is_canceling(lock))
1438                                break;
1439
1440                        ldlm_lock_remove_from_lru_nolock(lock);
1441                }
1442                if (&lock->l_lru == &ns->ns_unused_list)
1443                        break;
1444
1445                LDLM_LOCK_GET(lock);
1446                spin_unlock(&ns->ns_lock);
1447                lu_ref_add(&lock->l_reference, __func__, current);
1448
1449                /* Pass the lock through the policy filter and see if it
1450                 * should stay in LRU.
1451                 *
1452                 * Even for shrinker policy we stop scanning if
1453                 * we find a lock that should stay in the cache.
1454                 * We should take into account lock age anyway
1455                 * as a new lock is a valuable resource even if
1456                 * it has a low weight.
1457                 *
1458                 * That is, for shrinker policy we drop only
1459                 * old locks, but additionally choose them by
1460                 * their weight. Big extent locks will stay in
1461                 * the cache.
1462                 */
1463                result = pf(ns, lock, unused, added, count);
1464                if (result == LDLM_POLICY_KEEP_LOCK) {
1465                        lu_ref_del(&lock->l_reference,
1466                                   __func__, current);
1467                        LDLM_LOCK_RELEASE(lock);
1468                        spin_lock(&ns->ns_lock);
1469                        break;
1470                }
1471                if (result == LDLM_POLICY_SKIP_LOCK) {
1472                        lu_ref_del(&lock->l_reference,
1473                                   __func__, current);
1474                        LDLM_LOCK_RELEASE(lock);
1475                        spin_lock(&ns->ns_lock);
1476                        continue;
1477                }
1478
1479                lock_res_and_lock(lock);
1480                /* Check flags again under the lock. */
1481                if (ldlm_is_canceling(lock) ||
1482                    (ldlm_lock_remove_from_lru_check(lock, last_use) == 0)) {
1483                        /* Another thread is removing lock from LRU, or
1484                         * somebody is already doing CANCEL, or there
1485                         * is a blocking request which will send cancel
1486                         * by itself, or the lock is no longer unused or
1487                         * the lock has been used since the pf() call and
1488                         * pages could be put under it.
1489                         */
1490                        unlock_res_and_lock(lock);
1491                        lu_ref_del(&lock->l_reference,
1492                                   __func__, current);
1493                        LDLM_LOCK_RELEASE(lock);
1494                        spin_lock(&ns->ns_lock);
1495                        continue;
1496                }
1497                LASSERT(!lock->l_readers && !lock->l_writers);
1498
1499        /* If we have chosen to cancel this lock voluntarily, we had
1500         * better send a cancel notification to the server, so that it
1501         * frees the appropriate state. This might lead to a race where,
1502         * while we are cancelling here, the server is also silently
1503         * cancelling this lock.
1504         */
1505                ldlm_clear_cancel_on_block(lock);
1506
1507                /* Setting the CBPENDING flag is a little misleading,
1508                 * but prevents an important race; namely, once
1509                 * CBPENDING is set, the lock can accumulate no more
1510                 * readers/writers. Since readers and writers are
1511                 * already zero here, ldlm_lock_decref() won't see
1512                 * this flag and call l_blocking_ast
1513                 */
1514                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1515
1516                /* We can't re-add to l_lru as it confuses the
1517                 * refcounting in ldlm_lock_remove_from_lru() if an AST
1518                 * arrives after we drop lr_lock below. We use l_bl_ast
1519                 * and can't use l_pending_chain, as it is used on both
1520                 * the server and the client, even though bug 5666 says
1521                 * it is used only on the server.
1522                 */
1523                LASSERT(list_empty(&lock->l_bl_ast));
1524                list_add(&lock->l_bl_ast, cancels);
1525                unlock_res_and_lock(lock);
1526                lu_ref_del(&lock->l_reference, __func__, current);
1527                spin_lock(&ns->ns_lock);
1528                added++;
1529                unused--;
1530        }
1531        spin_unlock(&ns->ns_lock);
1532        return added;
1533}
1534
1535int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
1536                          struct list_head *cancels, int count, int max,
1537                          enum ldlm_cancel_flags cancel_flags, int flags)
1538{
1539        int added;
1540
1541        added = ldlm_prepare_lru_list(ns, cancels, count, max, flags);
1542        if (added <= 0)
1543                return added;
1544        return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
1545}
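
/*
 * Illustrative sketch only (a hypothetical helper, not part of this file):
 * one way a caller could use ldlm_cancel_lru_local() to drop up to nr
 * unused locks locally, without sending cancel RPCs to the server.  It
 * mirrors what ldlm_cancel_unused_locks_for_replay() does further below,
 * but with the default LRU policy (flags == 0) rather than
 * LDLM_LRU_FLAG_NO_WAIT.
 */
static int __maybe_unused ldlm_drop_unused_sketch(struct ldlm_namespace *ns,
                                                  int nr)
{
        LIST_HEAD(cancels);

        /* Prepare up to nr victims and cancel them locally (LCF_LOCAL). */
        return ldlm_cancel_lru_local(ns, &cancels, nr, 0, LCF_LOCAL, 0);
}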
1546
1547/**
1548 * Cancel at least \a nr locks from given namespace LRU.
1549 *
1550 * When called with LCF_ASYNC the blocking callback will be handled
1551 * in a thread and this function will return after the thread has been
1552 * asked to call the callback.  When called without LCF_ASYNC the
1553 * blocking callback will be performed in this function.
1554 */
1555int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
1556                    enum ldlm_cancel_flags cancel_flags,
1557                    int flags)
1558{
1559        LIST_HEAD(cancels);
1560        int count, rc;
1561
1562        /* Just prepare the list of locks, do not actually cancel them yet.
1563         * Locks are cancelled later in a separate thread.
1564         */
1565        count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
1566        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
1567        if (rc == 0)
1568                return count;
1569
1570        return 0;
1571}
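
/*
 * Illustrative sketch only: a memory-pressure style caller might, for
 * example, ask for 128 LRU locks to be cancelled by the blocking threads
 * without waiting for the cancels to complete:
 *
 *      (void)ldlm_cancel_lru(ns, 128, LCF_ASYNC, LDLM_LRU_FLAG_SHRINK);
 *
 * The namespace pointer and the number 128 are made up for illustration.
 */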
1572
1573/**
1574 * Find and cancel locally unused locks found on resource, matched to the
1575 * given policy, mode. GET the found locks and add them into the \a cancels
1576 * list.
1577 */
1578int ldlm_cancel_resource_local(struct ldlm_resource *res,
1579                               struct list_head *cancels,
1580                               union ldlm_policy_data *policy,
1581                               enum ldlm_mode mode, __u64 lock_flags,
1582                               enum ldlm_cancel_flags cancel_flags,
1583                               void *opaque)
1584{
1585        struct ldlm_lock *lock;
1586        int count = 0;
1587
1588        lock_res(res);
1589        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1590                if (opaque && lock->l_ast_data != opaque) {
1591                        LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1592                                   lock->l_ast_data, opaque);
1593                        continue;
1594                }
1595
1596                if (lock->l_readers || lock->l_writers)
1597                        continue;
1598
1599                /* If somebody is already doing CANCEL, or blocking AST came,
1600                 * skip this lock.
1601                 */
1602                if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
1603                        continue;
1604
1605                if (lockmode_compat(lock->l_granted_mode, mode))
1606                        continue;
1607
1608                /* If policy is given and this is IBITS lock, add to list only
1609                 * those locks that match by policy.
1610                 */
1611                if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1612                    !(lock->l_policy_data.l_inodebits.bits &
1613                      policy->l_inodebits.bits))
1614                        continue;
1615
1616                /* See CBPENDING comment in ldlm_cancel_lru */
1617                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1618                                 lock_flags;
1619
1620                LASSERT(list_empty(&lock->l_bl_ast));
1621                list_add(&lock->l_bl_ast, cancels);
1622                LDLM_LOCK_GET(lock);
1623                count++;
1624        }
1625        unlock_res(res);
1626
1627        return ldlm_cli_cancel_list_local(cancels, count, cancel_flags);
1628}
1629EXPORT_SYMBOL(ldlm_cancel_resource_local);
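
/*
 * Illustrative sketch only (hypothetical caller, not part of this file):
 * locally cancel every unused IBITS lock on a resource whose bits overlap
 * the given mask, queueing the cancel RPCs asynchronously.
 */
static int __maybe_unused ldlm_cancel_ibits_sketch(struct ldlm_resource *res,
                                                   __u64 bits)
{
        union ldlm_policy_data policy = {
                .l_inodebits = { .bits = bits },
        };
        LIST_HEAD(cancels);

        /* LCK_EX conflicts with everything except LCK_NL, so the inodebits
         * mask does most of the filtering here.
         */
        return ldlm_cancel_resource_local(res, &cancels, &policy, LCK_EX,
                                          0, LCF_ASYNC, NULL);
}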
1630
1631/**
1632 * Cancel client-side locks from a list and send/prepare cancel RPCs to the
1633 * server.
1634 * If \a req is NULL, send a CANCEL request to the server with the handles of
1635 * the locks in the \a cancels list. If EARLY_CANCEL is not supported, send
1636 * CANCEL requests separately, one per lock.
1637 * If \a req is not NULL, put the handles of the locks in \a cancels into the
1638 * request buffer.
1639 * The \a cancels list is emptied at the end.
1640 */
1641int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1642                         struct ptlrpc_request *req,
1643                         enum ldlm_cancel_flags flags)
1644{
1645        struct ldlm_lock *lock;
1646        int res = 0;
1647
1648        if (list_empty(cancels) || count == 0)
1649                return 0;
1650
1651        /* XXX: requests (both batched and not) could be sent in parallel.
1652         * Usually it is enough to have just 1 RPC, but it is possible that
1653         * there are too many locks to be cancelled in LRU or on a resource.
1654         * It would also speed up the case when the server does not support
1655         * the feature.
1656         */
1657        while (count > 0) {
1658                LASSERT(!list_empty(cancels));
1659                lock = list_first_entry(cancels, struct ldlm_lock, l_bl_ast);
1660                LASSERT(lock->l_conn_export);
1661
1662                if (exp_connect_cancelset(lock->l_conn_export)) {
1663                        res = count;
1664                        if (req)
1665                                ldlm_cancel_pack(req, cancels, count);
1666                        else
1667                                res = ldlm_cli_cancel_req(lock->l_conn_export,
1668                                                          cancels, count,
1669                                                          flags);
1670                } else {
1671                        res = ldlm_cli_cancel_req(lock->l_conn_export,
1672                                                  cancels, 1, flags);
1673                }
1674
1675                if (res < 0) {
1676                        CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
1677                                     "%s: %d\n", __func__, res);
1678                        res = count;
1679                }
1680
1681                count -= res;
1682                ldlm_lock_list_put(cancels, l_bl_ast, res);
1683        }
1684        LASSERT(count == 0);
1685        return 0;
1686}
1687EXPORT_SYMBOL(ldlm_cli_cancel_list);
1688
1689/**
1690 * Cancel all locks on a resource that have 0 readers/writers.
1691 *
1692 * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1693 * to notify the server.
1694 */
1695int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1696                                    const struct ldlm_res_id *res_id,
1697                                    union ldlm_policy_data *policy,
1698                                    enum ldlm_mode mode,
1699                                    enum ldlm_cancel_flags flags,
1700                                    void *opaque)
1701{
1702        struct ldlm_resource *res;
1703        LIST_HEAD(cancels);
1704        int count;
1705        int rc;
1706
1707        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1708        if (IS_ERR(res)) {
1709                /* This is not a problem. */
1710                CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
1711                return 0;
1712        }
1713
1714        LDLM_RESOURCE_ADDREF(res);
1715        count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1716                                           0, flags | LCF_BL_AST, opaque);
1717        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1718        if (rc != ELDLM_OK)
1719                CERROR("canceling unused lock " DLDLMRES ": rc = %d\n",
1720                       PLDLMRES(res), rc);
1721
1722        LDLM_RESOURCE_DELREF(res);
1723        ldlm_resource_putref(res);
1724        return 0;
1725}
1726EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
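
/*
 * Illustrative sketch only (hypothetical caller): before destroying an
 * object, a client could flush all of its own unused locks on that single
 * resource, letting the cancel RPCs go out asynchronously.  The res_id is
 * assumed to have been filled in by the caller.
 */
static int __maybe_unused
ldlm_purge_resource_sketch(struct ldlm_namespace *ns,
                           const struct ldlm_res_id *res_id)
{
        /* NULL policy and LCK_MINMODE select every unused lock on the
         * resource, just as the whole-namespace iterator below does.
         */
        return ldlm_cli_cancel_unused_resource(ns, res_id, NULL, LCK_MINMODE,
                                               LCF_ASYNC, NULL);
}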
1727
1728struct ldlm_cli_cancel_arg {
1729        int     lc_flags;
1730        void   *lc_opaque;
1731};
1732
1733static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
1734                                       struct cfs_hash_bd *bd,
1735                                       struct hlist_node *hnode, void *arg)
1736{
1737        struct ldlm_resource       *res = cfs_hash_object(hs, hnode);
1738        struct ldlm_cli_cancel_arg     *lc = arg;
1739
1740        ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
1741                                        NULL, LCK_MINMODE,
1742                                        lc->lc_flags, lc->lc_opaque);
1743        /* must return 0 for hash iteration */
1744        return 0;
1745}
1746
1747/**
1748 * Cancel all locks on a namespace (or a specific resource, if given)
1749 * that have 0 readers/writers.
1750 *
1751 * If flags & LCF_LOCAL, throw the locks away without trying
1752 * to notify the server.
1753 */
1754int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1755                           const struct ldlm_res_id *res_id,
1756                           enum ldlm_cancel_flags flags, void *opaque)
1757{
1758        struct ldlm_cli_cancel_arg arg = {
1759                .lc_flags       = flags,
1760                .lc_opaque      = opaque,
1761        };
1762
1763        if (!ns)
1764                return ELDLM_OK;
1765
1766        if (res_id) {
1767                return ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1768                                                       LCK_MINMODE, flags,
1769                                                       opaque);
1770        } else {
1771                cfs_hash_for_each_nolock(ns->ns_rs_hash,
1772                                         ldlm_cli_hash_cancel_unused, &arg, 0);
1773                return ELDLM_OK;
1774        }
1775}
1776EXPORT_SYMBOL(ldlm_cli_cancel_unused);
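
/*
 * Illustrative sketch only: a hypothetical teardown path could throw away
 * every unused lock in a namespace without telling the server, e.g. when
 * the connection is already known to be dead:
 *
 *      ldlm_cli_cancel_unused(ns, NULL, LCF_LOCAL, NULL);
 *
 * A NULL res_id selects every resource in the namespace, and LCF_LOCAL
 * skips the cancel RPCs, as described above.
 */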
1777
1778/* Lock iterators. */
1779
1780static int ldlm_resource_foreach(struct ldlm_resource *res,
1781                                 ldlm_iterator_t iter, void *closure)
1782{
1783        struct ldlm_lock *tmp;
1784        struct ldlm_lock *lock;
1785        int rc = LDLM_ITER_CONTINUE;
1786
1787        if (!res)
1788                return LDLM_ITER_CONTINUE;
1789
1790        lock_res(res);
1791        list_for_each_entry_safe(lock, tmp, &res->lr_granted, l_res_link) {
1792                if (iter(lock, closure) == LDLM_ITER_STOP) {
1793                        rc = LDLM_ITER_STOP;
1794                        goto out;
1795                }
1796        }
1797
1798        list_for_each_entry_safe(lock, tmp, &res->lr_waiting, l_res_link) {
1799                if (iter(lock, closure) == LDLM_ITER_STOP) {
1800                        rc = LDLM_ITER_STOP;
1801                        goto out;
1802                }
1803        }
1804 out:
1805        unlock_res(res);
1806        return rc;
1807}
1808
1809struct iter_helper_data {
1810        ldlm_iterator_t iter;
1811        void *closure;
1812};
1813
1814static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1815{
1816        struct iter_helper_data *helper = closure;
1817
1818        return helper->iter(lock, helper->closure);
1819}
1820
1821static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1822                                struct hlist_node *hnode, void *arg)
1823
1824{
1825        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1826
1827        return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
1828               LDLM_ITER_STOP;
1829}
1830
1831static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
1832                                   ldlm_iterator_t iter, void *closure)
1833
1834{
1835        struct iter_helper_data helper = {
1836                .iter           = iter,
1837                .closure        = closure,
1838        };
1839
1840        cfs_hash_for_each_nolock(ns->ns_rs_hash,
1841                                 ldlm_res_iter_helper, &helper, 0);
1842}
1843
1844/* Non-blocking function to manipulate a lock whose cb_data is being put away.
1845 * return  0:  no resource found
1846 *       > 0:  must be LDLM_ITER_STOP/LDLM_ITER_CONTINUE
1847 *       < 0:  error
1848 */
1849int ldlm_resource_iterate(struct ldlm_namespace *ns,
1850                          const struct ldlm_res_id *res_id,
1851                          ldlm_iterator_t iter, void *data)
1852{
1853        struct ldlm_resource *res;
1854        int rc;
1855
1856        LASSERTF(ns, "must pass in namespace\n");
1857
1858        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1859        if (IS_ERR(res))
1860                return 0;
1861
1862        LDLM_RESOURCE_ADDREF(res);
1863        rc = ldlm_resource_foreach(res, iter, data);
1864        LDLM_RESOURCE_DELREF(res);
1865        ldlm_resource_putref(res);
1866        return rc;
1867}
1868EXPORT_SYMBOL(ldlm_resource_iterate);
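
/*
 * Illustrative sketch only (hypothetical iterator): an ldlm_iterator_t
 * callback that counts the locks it is shown.  It could be driven over a
 * single resource with, for example,
 * ldlm_resource_iterate(ns, &res_id, ldlm_count_locks_sketch, &count).
 */
static int __maybe_unused ldlm_count_locks_sketch(struct ldlm_lock *lock,
                                                  void *closure)
{
        int *count = closure;

        (*count)++;
        /* Keep walking both the granted and the waiting lists. */
        return LDLM_ITER_CONTINUE;
}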
1869
1870/* Lock replay */
1871
1872static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
1873{
1874        struct list_head *list = closure;
1875
1876        /* we use l_pending_chain here, because it's unused on clients. */
1877        LASSERTF(list_empty(&lock->l_pending_chain),
1878                 "lock %p next %p prev %p\n",
1879                 lock, &lock->l_pending_chain.next,
1880                 &lock->l_pending_chain.prev);
1881        /* bug 9573: don't replay locks left after eviction, or
1882         * bug 17614: locks being actively cancelled. Get a reference
1883         * on a lock so that it does not disappear under us (e.g. due to cancel)
1884         */
1885        if (!(lock->l_flags & (LDLM_FL_FAILED | LDLM_FL_BL_DONE))) {
1886                list_add(&lock->l_pending_chain, list);
1887                LDLM_LOCK_GET(lock);
1888        }
1889
1890        return LDLM_ITER_CONTINUE;
1891}
1892
1893static int replay_lock_interpret(const struct lu_env *env,
1894                                 struct ptlrpc_request *req,
1895                                 struct ldlm_async_args *aa, int rc)
1896{
1897        struct ldlm_lock     *lock;
1898        struct ldlm_reply    *reply;
1899        struct obd_export    *exp;
1900
1901        atomic_dec(&req->rq_import->imp_replay_inflight);
1902        if (rc != ELDLM_OK)
1903                goto out;
1904
1905        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1906        if (!reply) {
1907                rc = -EPROTO;
1908                goto out;
1909        }
1910
1911        lock = ldlm_handle2lock(&aa->lock_handle);
1912        if (!lock) {
1913                CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
1914                       aa->lock_handle.cookie, reply->lock_handle.cookie,
1915                       req->rq_export->exp_client_uuid.uuid,
1916                       libcfs_id2str(req->rq_peer));
1917                rc = -ESTALE;
1918                goto out;
1919        }
1920
1921        /* Key change: rehash lock in per-export hash with new key */
1922        exp = req->rq_export;
1923        if (exp && exp->exp_lock_hash) {
1924                /* In the function below, .hs_keycmp resolves to
1925                 * ldlm_export_lock_keycmp()
1926                 */
1927                /* coverity[overrun-buffer-val] */
1928                cfs_hash_rehash_key(exp->exp_lock_hash,
1929                                    &lock->l_remote_handle,
1930                                    &reply->lock_handle,
1931                                    &lock->l_exp_hash);
1932        } else {
1933                lock->l_remote_handle = reply->lock_handle;
1934        }
1935
1936        LDLM_DEBUG(lock, "replayed lock:");
1937        ptlrpc_import_recovery_state_machine(req->rq_import);
1938        LDLM_LOCK_PUT(lock);
1939out:
1940        if (rc != ELDLM_OK)
1941                ptlrpc_connect_import(req->rq_import);
1942
1943        return rc;
1944}
1945
1946static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
1947{
1948        struct ptlrpc_request *req;
1949        struct ldlm_async_args *aa;
1950        struct ldlm_request   *body;
1951        int flags;
1952
1953        /* Bug 11974: Do not replay a lock which is actively being canceled */
1954        if (ldlm_is_bl_done(lock)) {
1955                LDLM_DEBUG(lock, "Not replaying canceled lock:");
1956                return 0;
1957        }
1958
1959        /* If this is a reply-less callback lock, we cannot replay it: the
1960         * server might have dropped it long ago, with the notification of
1961         * that event lost by the network (and a conflicting lock granted).
1962         */
1963        if (ldlm_is_cancel_on_block(lock)) {
1964                LDLM_DEBUG(lock, "Not replaying reply-less lock:");
1965                ldlm_lock_cancel(lock);
1966                return 0;
1967        }
1968
1969        /*
1970         * If granted mode matches the requested mode, this lock is granted.
1971         *
1972         * If they differ, but we have a granted mode, then we were granted
1973         * one mode and now want another: ergo, converting.
1974         *
1975         * If we haven't been granted anything and are on a resource list,
1976         * then we're blocked/waiting.
1977         *
1978         * If we haven't been granted anything and we're NOT on a resource list,
1979         * then we haven't got a reply yet and don't have a known disposition.
1980         * This happens whenever a lock enqueue is the request that triggers
1981         * recovery.
1982         */
1983        if (lock->l_granted_mode == lock->l_req_mode)
1984                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
1985        else if (lock->l_granted_mode)
1986                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
1987        else if (!list_empty(&lock->l_res_link))
1988                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
1989        else
1990                flags = LDLM_FL_REPLAY;
1991
1992        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
1993                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
1994        if (!req)
1995                return -ENOMEM;
1996
1997        /* We're part of recovery, so don't wait for it. */
1998        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
1999
2000        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2001        ldlm_lock2desc(lock, &body->lock_desc);
2002        body->lock_flags = ldlm_flags_to_wire(flags);
2003
2004        ldlm_lock2handle(lock, &body->lock_handle[0]);
2005        if (lock->l_lvb_len > 0)
2006                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
2007        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2008                             lock->l_lvb_len);
2009        ptlrpc_request_set_replen(req);
2010        /* Notify the server we've replayed all requests.
2011         * Also, we mark the request to be put on a dedicated
2012         * queue to be processed after all request replays.
2013         * bug 6063
2014         */
2015        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
2016
2017        LDLM_DEBUG(lock, "replaying lock:");
2018
2019        atomic_inc(&req->rq_import->imp_replay_inflight);
2020        BUILD_BUG_ON(sizeof(*aa) > sizeof(req->rq_async_args));
2021        aa = ptlrpc_req_async_args(req);
2022        aa->lock_handle = body->lock_handle[0];
2023        req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
2024        ptlrpcd_add_req(req);
2025
2026        return 0;
2027}
2028
2029/**
2030 * Cancel as many unused locks as possible before replay. Since we are
2031 * in recovery, we cannot send any RPCs to the server or wait for any
2032 * outstanding RPCs to complete.
2033 *
2034 * Called only in recovery, before replaying locks. There is no need to
2035 * replay locks that are unused. Since clients may hold thousands of
2036 * cached unused locks, dropping them can greatly reduce the load on the
2037 * servers at recovery time.
2038 */
2039static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
2040{
2041        int canceled;
2042        LIST_HEAD(cancels);
2043
2044        CDEBUG(D_DLMTRACE,
2045               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
2046               ldlm_ns_name(ns), ns->ns_nr_unused);
2047
2048        /* We don't need to care whether or not LRU resize is enabled
2049         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
2050         * count parameter
2051         */
2052        canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
2053                                         LCF_LOCAL, LDLM_LRU_FLAG_NO_WAIT);
2054
2055        CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
2056               canceled, ldlm_ns_name(ns));
2057}
2058
2059int ldlm_replay_locks(struct obd_import *imp)
2060{
2061        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
2062        LIST_HEAD(list);
2063        struct ldlm_lock *lock, *next;
2064        int rc = 0;
2065
2066        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2067
2068        /* don't replay locks if import failed recovery */
2069        if (imp->imp_vbr_failed)
2070                return 0;
2071
2072        /* ensure this doesn't fall to 0 before all have been queued */
2073        atomic_inc(&imp->imp_replay_inflight);
2074
2075        if (ldlm_cancel_unused_locks_before_replay)
2076                ldlm_cancel_unused_locks_for_replay(ns);
2077
2078        ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2079
2080        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2081                list_del_init(&lock->l_pending_chain);
2082                if (rc) {
2083                        LDLM_LOCK_RELEASE(lock);
2084                        continue; /* or try to do the rest? */
2085                }
2086                rc = replay_one_lock(imp, lock);
2087                LDLM_LOCK_RELEASE(lock);
2088        }
2089
2090        atomic_dec(&imp->imp_replay_inflight);
2091
2092        return rc;
2093}
2094