linux/drivers/staging/lustre/lustre/llite/statahead.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2011, 2015, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 */
  33
  34#include <linux/fs.h>
  35#include <linux/sched.h>
  36#include <linux/mm.h>
  37#include <linux/highmem.h>
  38#include <linux/pagemap.h>
  39
  40#define DEBUG_SUBSYSTEM S_LLITE
  41
  42#include <obd_support.h>
  43#include <lustre_dlm.h>
  44#include "llite_internal.h"
  45
  46#define SA_OMITTED_ENTRY_MAX 8ULL
  47
/* lifecycle state of a sa_entry; negative values are reserved for errors */
enum se_stat {
	SA_ENTRY_INIT = 0,	/* allocated, async stat not yet completed */
	SA_ENTRY_SUCC = 1,	/* async stat succeeded */
	SA_ENTRY_INVA = 2,	/* entry is invalid (stat failed or stale) */
};
  54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and does the
 * async stat, the async stat callback ll_statahead_interpret() adds it to
 * sai_interim_entries, later the statahead thread calls sa_handle_callback()
 * to instantiate the entry and move it into sai_entries, and from then on
 * only the scanner process may access and free it.
 */
struct sa_entry {
	/* link into sai_interim_entries or sai_entries */
	struct list_head se_list;
	/* link into sai hash table locally */
	struct list_head se_hash;
	/* entry index in the sai (taken from sai_index at alloc time) */
	__u64 se_index;
	/* low layer ldlm lock handle */
	__u64 se_handle;
	/* entry status, see enum se_stat */
	enum se_stat se_state;
	/* total allocation size, including the trailing name */
	int se_size;
	/* pointer to async getattr enqueue info */
	struct md_enqueue_info *se_minfo;
	/* pointer to the async getattr request */
	struct ptlrpc_request *se_req;
	/* pointer to the target inode */
	struct inode *se_inode;
	/* entry name; se_qstr.name points at storage behind this struct */
	struct qstr se_qstr;
	/* entry fid */
	struct lu_fid se_fid;
};
  86
/* generation of the most recent sai; bumped under sai_generation_lock in
 * ll_sai_alloc(), value 0 is skipped there (reserved)
 */
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
  89
/*
 * sa_entry is ready to use: its state moved past SA_ENTRY_INIT.
 *
 * NOTE(review): smp_rmb() orders the se_state read against later reads of
 * the entry payload; the matching write-side ordering presumably comes from
 * the lli_sa_lock release in __sa_make_ready() — confirm.
 */
static inline int sa_ready(struct sa_entry *entry)
{
	smp_rmb();
	return (entry->se_state != SA_ENTRY_INIT);
}
  96
  97/* hash value to put in sai_cache */
  98static inline int sa_hash(int val)
  99{
 100        return val & LL_SA_CACHE_MASK;
 101}
 102
 103/* hash entry into sai_cache */
 104static inline void
 105sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
 106{
 107        int i = sa_hash(entry->se_qstr.hash);
 108
 109        spin_lock(&sai->sai_cache_lock[i]);
 110        list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
 111        spin_unlock(&sai->sai_cache_lock[i]);
 112}
 113
 114/*
 115 * Remove entry from SA table.
 116 */
 117static inline void
 118sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
 119{
 120        int i = sa_hash(entry->se_qstr.hash);
 121
 122        spin_lock(&sai->sai_cache_lock[i]);
 123        list_del_init(&entry->se_hash);
 124        spin_unlock(&sai->sai_cache_lock[i]);
 125}
 126
 127static inline int agl_should_run(struct ll_statahead_info *sai,
 128                                 struct inode *inode)
 129{
 130        return (inode && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
 131}
 132
 133/* statahead window is full */
 134static inline int sa_sent_full(struct ll_statahead_info *sai)
 135{
 136        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 137}
 138
 139/* got async stat replies */
 140static inline int sa_has_callback(struct ll_statahead_info *sai)
 141{
 142        return !list_empty(&sai->sai_interim_entries);
 143}
 144
 145static inline int agl_list_empty(struct ll_statahead_info *sai)
 146{
 147        return list_empty(&sai->sai_agls);
 148}
 149
 150/**
 151 * (1) hit ratio less than 80%
 152 * or
 153 * (2) consecutive miss more than 8
 154 * then means low hit.
 155 */
 156static inline int sa_low_hit(struct ll_statahead_info *sai)
 157{
 158        return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
 159                (sai->sai_consecutive_miss > 8));
 160}
 161
 162/*
 163 * if the given index is behind of statahead window more than
 164 * SA_OMITTED_ENTRY_MAX, then it is old.
 165 */
 166static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 167{
 168        return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
 169                 sai->sai_index);
 170}
 171
 172/* allocate sa_entry and hash it to allow scanner process to find it */
 173static struct sa_entry *
 174sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 175         const char *name, int len, const struct lu_fid *fid)
 176{
 177        struct ll_inode_info *lli;
 178        struct sa_entry   *entry;
 179        int                entry_size;
 180        char             *dname;
 181
 182        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
 183        entry = kzalloc(entry_size, GFP_NOFS);
 184        if (unlikely(!entry))
 185                return ERR_PTR(-ENOMEM);
 186
 187        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
 188               len, name, entry, index);
 189
 190        entry->se_index = index;
 191        entry->se_state = SA_ENTRY_INIT;
 192        entry->se_size = entry_size;
 193        dname = (char *)entry + sizeof(struct sa_entry);
 194        memcpy(dname, name, len);
 195        dname[len] = 0;
 196
 197        entry->se_qstr.hash = full_name_hash(parent, name, len);
 198        entry->se_qstr.len = len;
 199        entry->se_qstr.name = dname;
 200        entry->se_fid = *fid;
 201
 202        lli = ll_i2info(sai->sai_dentry->d_inode);
 203        spin_lock(&lli->lli_sa_lock);
 204        INIT_LIST_HEAD(&entry->se_list);
 205        sa_rehash(sai, entry);
 206        spin_unlock(&lli->lli_sa_lock);
 207
 208        atomic_inc(&sai->sai_cache_count);
 209
 210        return entry;
 211}
 212
 213/* free sa_entry, which should have been unhashed and not in any list */
 214static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 215{
 216        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
 217               entry->se_qstr.len, entry->se_qstr.name, entry,
 218               entry->se_index);
 219
 220        LASSERT(list_empty(&entry->se_list));
 221        LASSERT(list_empty(&entry->se_hash));
 222
 223        kfree(entry);
 224        atomic_dec(&sai->sai_cache_count);
 225}
 226
 227/*
 228 * find sa_entry by name, used by directory scanner, lock is not needed because
 229 * only scanner can remove the entry from cache.
 230 */
 231static struct sa_entry *
 232sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
 233{
 234        struct sa_entry *entry;
 235        int i = sa_hash(qstr->hash);
 236
 237        list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
 238                if (entry->se_qstr.hash == qstr->hash &&
 239                    entry->se_qstr.len == qstr->len &&
 240                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
 241                        return entry;
 242        }
 243        return NULL;
 244}
 245
 246/* unhash and unlink sa_entry, and then free it */
 247static inline void
 248sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
 249{
 250        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 251
 252        LASSERT(!list_empty(&entry->se_hash));
 253        LASSERT(!list_empty(&entry->se_list));
 254        LASSERT(sa_ready(entry));
 255
 256        sa_unhash(sai, entry);
 257
 258        spin_lock(&lli->lli_sa_lock);
 259        list_del_init(&entry->se_list);
 260        spin_unlock(&lli->lli_sa_lock);
 261
 262        if (entry->se_inode)
 263                iput(entry->se_inode);
 264
 265        sa_free(sai, entry);
 266}
 267
 268/* called by scanner after use, sa_entry will be killed */
 269static void
 270sa_put(struct ll_statahead_info *sai, struct sa_entry *entry, struct ll_inode_info *lli)
 271{
 272        struct sa_entry *tmp, *next;
 273
 274        if (entry && entry->se_state == SA_ENTRY_SUCC) {
 275                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 276
 277                sai->sai_hit++;
 278                sai->sai_consecutive_miss = 0;
 279                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
 280        } else {
 281                sai->sai_miss++;
 282                sai->sai_consecutive_miss++;
 283        }
 284
 285        if (entry)
 286                sa_kill(sai, entry);
 287
 288        /*
 289         * kill old completed entries, only scanner process does this, no need
 290         * to lock
 291         */
 292        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
 293                if (!is_omitted_entry(sai, tmp->se_index))
 294                        break;
 295                sa_kill(sai, tmp);
 296        }
 297
 298        spin_lock(&lli->lli_sa_lock);
 299        if (sai->sai_task)
 300                wake_up_process(sai->sai_task);
 301        spin_unlock(&lli->lli_sa_lock);
 302
 303}
 304
/*
 * Update entry state and insert it into sai_entries keeping the list sorted
 * by se_index (ascending); return true if the scanner is currently waiting
 * on this entry.  Caller must hold lli_sa_lock.
 */
static bool
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
	struct list_head *pos = &sai->sai_entries;
	__u64 index = entry->se_index;
	struct sa_entry *se;

	LASSERT(!sa_ready(entry));
	LASSERT(list_empty(&entry->se_list));

	/* walk backwards: entries usually complete roughly in index order */
	list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
		if (se->se_index < entry->se_index) {
			pos = &se->se_list;
			break;
		}
	}
	list_add(&entry->se_list, pos);
	/* publishing a non-INIT state makes the entry visible to sa_ready() */
	entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;

	return (index == sai->sai_index_wait);
}
 330
 331/*
 332 * release resources used in async stat RPC, update entry state and wakeup if
 333 * scanner process it waiting on this entry.
 334 */
 335static void
 336sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 337{
 338        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 339        struct md_enqueue_info *minfo = entry->se_minfo;
 340        struct ptlrpc_request *req = entry->se_req;
 341        bool wakeup;
 342
 343        /* release resources used in RPC */
 344        if (minfo) {
 345                entry->se_minfo = NULL;
 346                ll_intent_release(&minfo->mi_it);
 347                iput(minfo->mi_dir);
 348                kfree(minfo);
 349        }
 350
 351        if (req) {
 352                entry->se_req = NULL;
 353                ptlrpc_req_finished(req);
 354        }
 355
 356        spin_lock(&lli->lli_sa_lock);
 357        wakeup = __sa_make_ready(sai, entry, ret);
 358        spin_unlock(&lli->lli_sa_lock);
 359
 360        if (wakeup)
 361                wake_up(&sai->sai_waitq);
 362}
 363
 364/* Insert inode into the list of sai_agls. */
 365static void ll_agl_add(struct ll_statahead_info *sai,
 366                       struct inode *inode, int index)
 367{
 368        struct ll_inode_info *child  = ll_i2info(inode);
 369        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
 370        int                added  = 0;
 371
 372        spin_lock(&child->lli_agl_lock);
 373        if (child->lli_agl_index == 0) {
 374                child->lli_agl_index = index;
 375                spin_unlock(&child->lli_agl_lock);
 376
 377                LASSERT(list_empty(&child->lli_agl_list));
 378
 379                igrab(inode);
 380                spin_lock(&parent->lli_agl_lock);
 381                if (list_empty(&sai->sai_agls))
 382                        added = 1;
 383                list_add_tail(&child->lli_agl_list, &sai->sai_agls);
 384                spin_unlock(&parent->lli_agl_lock);
 385        } else {
 386                spin_unlock(&child->lli_agl_lock);
 387        }
 388
 389        if (added > 0)
 390                wake_up_process(sai->sai_agl_task);
 391}
 392
 393/* allocate sai */
 394static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 395{
 396        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
 397        struct ll_statahead_info *sai;
 398        int                    i;
 399
 400        sai = kzalloc(sizeof(*sai), GFP_NOFS);
 401        if (!sai)
 402                return NULL;
 403
 404        sai->sai_dentry = dget(dentry);
 405        atomic_set(&sai->sai_refcount, 1);
 406
 407        sai->sai_max = LL_SA_RPC_MIN;
 408        sai->sai_index = 1;
 409        init_waitqueue_head(&sai->sai_waitq);
 410
 411        INIT_LIST_HEAD(&sai->sai_interim_entries);
 412        INIT_LIST_HEAD(&sai->sai_entries);
 413        INIT_LIST_HEAD(&sai->sai_agls);
 414
 415        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
 416                INIT_LIST_HEAD(&sai->sai_cache[i]);
 417                spin_lock_init(&sai->sai_cache_lock[i]);
 418        }
 419        atomic_set(&sai->sai_cache_count, 0);
 420
 421        spin_lock(&sai_generation_lock);
 422        lli->lli_sa_generation = ++sai_generation;
 423        if (unlikely(!sai_generation))
 424                lli->lli_sa_generation = ++sai_generation;
 425        spin_unlock(&sai_generation_lock);
 426
 427        return sai;
 428}
 429
 430/* free sai */
 431static inline void ll_sai_free(struct ll_statahead_info *sai)
 432{
 433        LASSERT(sai->sai_dentry);
 434        dput(sai->sai_dentry);
 435        kfree(sai);
 436}
 437
 438/*
 439 * take refcount of sai if sai for @dir exists, which means statahead is on for
 440 * this directory.
 441 */
 442static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
 443{
 444        struct ll_inode_info *lli = ll_i2info(dir);
 445        struct ll_statahead_info *sai = NULL;
 446
 447        spin_lock(&lli->lli_sa_lock);
 448        sai = lli->lli_sai;
 449        if (sai)
 450                atomic_inc(&sai->sai_refcount);
 451        spin_unlock(&lli->lli_sa_lock);
 452
 453        return sai;
 454}
 455
/*
 * Drop a reference on @sai; when the last reference goes away, detach it
 * from the directory inode and free it together with any remaining
 * sa_entries.
 */
static void ll_sai_put(struct ll_statahead_info *sai)
{
	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

	/* takes lli_sa_lock only when the refcount actually hits zero */
	if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
		struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
		struct sa_entry *entry, *next;

		lli->lli_sai = NULL;
		spin_unlock(&lli->lli_sa_lock);

		/* both threads must have exited and all RPCs completed */
		LASSERT(sai->sai_task == NULL);
		LASSERT(sai->sai_agl_task == NULL);
		LASSERT(sai->sai_sent == sai->sai_replied);
		LASSERT(!sa_has_callback(sai));

		list_for_each_entry_safe(entry, next, &sai->sai_entries,
					 se_list)
			sa_kill(sai, entry);

		LASSERT(atomic_read(&sai->sai_cache_count) == 0);
		LASSERT(list_empty(&sai->sai_agls));

		ll_sai_free(sai);
		atomic_dec(&sbi->ll_sa_running);
	}
}
 487
/*
 * Issue an async glimpse (AGL) for @inode.  Consumes the inode reference
 * taken by ll_agl_add() on every return path — do NOT forget to drop it.
 */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	__u64 index = lli->lli_agl_index;
	int rc;

	LASSERT(list_empty(&lli->lli_agl_list));

	/* AGL may fall behind statahead by one entry; skip stale indices */
	if (is_omitted_entry(sai, index + 1)) {
		lli->lli_agl_index = 0;
		iput(inode);
		return;
	}

	/* Someone is in glimpse (sync or async), do nothing. */
	rc = down_write_trylock(&lli->lli_glimpse_sem);
	if (rc == 0) {
		lli->lli_agl_index = 0;
		iput(inode);
		return;
	}

	/*
	 * Someone triggered glimpse within 1 sec before.
	 * 1) The former glimpse succeeded with glimpse lock granted by OST, and
	 *    if the lock is still cached on client, AGL needs to do nothing. If
	 *    it is cancelled by other client, AGL maybe cannot obtain new lock
	 *    for no glimpse callback triggered by AGL.
	 * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
	 *    Under such case, it is quite possible that the OST will not grant
	 *    glimpse lock for AGL also.
	 * 3) The former glimpse failed, compared with other two cases, it is
	 *    relative rare. AGL can ignore such case, and it will not muchly
	 *    affect the performance.
	 */
	if (lli->lli_glimpse_time != 0 &&
	    time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
		up_write(&lli->lli_glimpse_sem);
		lli->lli_agl_index = 0;
		iput(inode);
		return;
	}

	CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
	       DFID ", idx = %llu\n", PFID(&lli->lli_fid), index);

	cl_agl(inode);
	lli->lli_agl_index = 0;
	lli->lli_glimpse_time = cfs_time_current();
	up_write(&lli->lli_glimpse_sem);

	CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
	       DFID ", idx = %llu, rc = %d\n",
	       PFID(&lli->lli_fid), index, rc);

	iput(inode);
}
 547
/*
 * Instantiate the inode for a completed sa_entry from its getattr reply and
 * add it to the AGL list if appropriate; the sa_entry then becomes ready
 * for the scanner process via sa_make_ready().
 */
static void sa_instantiate(struct ll_statahead_info *sai,
			   struct sa_entry *entry)
{
	struct inode *dir = sai->sai_dentry->d_inode;
	struct inode *child;
	struct md_enqueue_info *minfo;
	struct lookup_intent *it;
	struct ptlrpc_request *req;
	struct mdt_body *body;
	int rc = 0;

	LASSERT(entry->se_handle != 0);

	minfo = entry->se_minfo;
	it = &minfo->mi_it;
	req = entry->se_req;
	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	if (!body) {
		rc = -EFAULT;
		goto out;
	}

	child = entry->se_inode;
	if (child) {
		/* revalidate; unlinked and re-created with the same name */
		if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
			entry->se_inode = NULL;
			iput(child);
			child = NULL;
		}
	}

	/* re-take the ibits lock saved by the interpret callback */
	it->it_lock_handle = entry->se_handle;
	rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
	if (rc != 1) {
		rc = -EAGAIN;
		goto out;
	}

	rc = ll_prep_inode(&child, req, dir->i_sb, it);
	if (rc)
		goto out;

	CDEBUG(D_READA, "%s: setting %.*s" DFID " l_data to inode %p\n",
	       ll_get_fsname(child->i_sb, NULL, 0),
	       entry->se_qstr.len, entry->se_qstr.name,
	       PFID(ll_inode2fid(child)), child);
	ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

	entry->se_inode = child;

	if (agl_should_run(sai, child))
		ll_agl_add(sai, child, entry->se_index);

out:
	/*
	 * sa_make_ready() will drop ldlm ibits lock refcount by calling
	 * ll_intent_drop_lock() in spite of failures. Do not worry about
	 * calling ll_intent_drop_lock() more than once.
	 */
	sa_make_ready(sai, entry, rc);
}
 614
 615/* once there are async stat replies, instantiate sa_entry from replies */
 616static void sa_handle_callback(struct ll_statahead_info *sai)
 617{
 618        struct ll_inode_info *lli;
 619
 620        lli = ll_i2info(sai->sai_dentry->d_inode);
 621
 622        while (sa_has_callback(sai)) {
 623                struct sa_entry *entry;
 624
 625                spin_lock(&lli->lli_sa_lock);
 626                if (unlikely(!sa_has_callback(sai))) {
 627                        spin_unlock(&lli->lli_sa_lock);
 628                        break;
 629                }
 630                entry = list_entry(sai->sai_interim_entries.next,
 631                                   struct sa_entry, se_list);
 632                list_del_init(&entry->se_list);
 633                spin_unlock(&lli->lli_sa_lock);
 634
 635                sa_instantiate(sai, entry);
 636        }
 637}
 638
/*
 * Callback for async stat RPC completion.  Runs in ptlrpcd context, so it
 * does no heavy work here: on success the entry is queued on
 * sai_interim_entries and the statahead thread is woken to instantiate the
 * inode later via sa_handle_callback(); on failure the entry is made ready
 * (as invalid) immediately.
 */
static int ll_statahead_interpret(struct ptlrpc_request *req,
				  struct md_enqueue_info *minfo, int rc)
{
	struct lookup_intent *it = &minfo->mi_it;
	struct inode *dir = minfo->mi_dir;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai = lli->lli_sai;
	struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
	__u64 handle = 0;

	/* a negative lookup disposition means the name does not exist */
	if (it_disposition(it, DISP_LOOKUP_NEG))
		rc = -ENOENT;

	/*
	 * because statahead thread will wait for all inflight RPC to finish,
	 * sai should be always valid, no need to refcount
	 */
	LASSERT(sai);
	LASSERT(entry);

	CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
	       entry->se_qstr.len, entry->se_qstr.name, rc);

	if (rc) {
		/* on failure, release the RPC resources right here */
		ll_intent_release(it);
		iput(dir);
		kfree(minfo);
	} else {
		/*
		 * release ibits lock ASAP to avoid deadlock when statahead
		 * thread enqueues lock on parent in readdir and another
		 * process enqueues lock on child with parent lock held, eg.
		 * unlink.
		 */
		handle = it->it_lock_handle;
		ll_intent_drop_lock(it);
	}

	spin_lock(&lli->lli_sa_lock);
	if (rc) {
		if (__sa_make_ready(sai, entry, rc))
			wake_up(&sai->sai_waitq);
	} else {
		int first = 0;
		/* hand minfo and a request reference over to the entry;
		 * sa_instantiate() consumes them in statahead thread context
		 */
		entry->se_minfo = minfo;
		entry->se_req = ptlrpc_request_addref(req);
		/* the lock handle saved above lets sa_instantiate() later
		 * revalidate the ibits lock for the child
		 */
		entry->se_handle = handle;
		if (!sa_has_callback(sai))
			first = 1;

		list_add_tail(&entry->se_list, &sai->sai_interim_entries);

		/* wake the statahead thread only on empty->non-empty */
		if (first && sai->sai_task)
			wake_up_process(sai->sai_task);
	}
	sai->sai_replied++;

	spin_unlock(&lli->lli_sa_lock);

	return rc;
}
 711
 712/* finish async stat RPC arguments */
 713static void sa_fini_data(struct md_enqueue_info *minfo)
 714{
 715        iput(minfo->mi_dir);
 716        kfree(minfo);
 717}
 718
 719/**
 720 * prepare arguments for async stat RPC.
 721 */
 722static struct md_enqueue_info *
 723sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
 724{
 725        struct md_enqueue_info   *minfo;
 726        struct ldlm_enqueue_info *einfo;
 727        struct md_op_data       *op_data;
 728
 729        minfo = kzalloc(sizeof(*minfo), GFP_NOFS);
 730        if (!minfo)
 731                return ERR_PTR(-ENOMEM);
 732
 733        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, NULL, 0, 0,
 734                                     LUSTRE_OPC_ANY, NULL);
 735        if (IS_ERR(op_data)) {
 736                kfree(minfo);
 737                return (struct md_enqueue_info *)op_data;
 738        }
 739
 740        if (!child)
 741                op_data->op_fid2 = entry->se_fid;
 742
 743        minfo->mi_it.it_op = IT_GETATTR;
 744        minfo->mi_dir = igrab(dir);
 745        minfo->mi_cb = ll_statahead_interpret;
 746        minfo->mi_cbdata = entry;
 747
 748        einfo = &minfo->mi_einfo;
 749        einfo->ei_type   = LDLM_IBITS;
 750        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
 751        einfo->ei_cb_bl  = ll_md_blocking_ast;
 752        einfo->ei_cb_cp  = ldlm_completion_ast;
 753        einfo->ei_cb_gl  = NULL;
 754        einfo->ei_cbdata = NULL;
 755
 756        return minfo;
 757}
 758
 759/* async stat for file not found in dcache */
 760static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 761{
 762        struct md_enqueue_info   *minfo;
 763        int                    rc;
 764
 765        minfo = sa_prep_data(dir, NULL, entry);
 766        if (IS_ERR(minfo))
 767                return PTR_ERR(minfo);
 768
 769        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
 770        if (rc)
 771                sa_fini_data(minfo);
 772
 773        return rc;
 774}
 775
 776/**
 777 * async stat for file found in dcache, similar to .revalidate
 778 *
 779 * \retval      1 dentry valid, no RPC sent
 780 * \retval      0 dentry invalid, will send async stat RPC
 781 * \retval      negative number upon error
 782 */
 783static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 784                         struct dentry *dentry)
 785{
 786        struct inode         *inode = d_inode(dentry);
 787        struct lookup_intent      it = { .it_op = IT_GETATTR,
 788                                         .it_lock_handle = 0 };
 789        struct md_enqueue_info   *minfo;
 790        int rc;
 791
 792        if (unlikely(!inode))
 793                return 1;
 794
 795        if (d_mountpoint(dentry))
 796                return 1;
 797
 798        entry->se_inode = igrab(inode);
 799        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
 800                                NULL);
 801        if (rc == 1) {
 802                entry->se_handle = it.it_lock_handle;
 803                ll_intent_release(&it);
 804                return 1;
 805        }
 806
 807        minfo = sa_prep_data(dir, inode, entry);
 808        if (IS_ERR(minfo)) {
 809                entry->se_inode = NULL;
 810                iput(inode);
 811                return PTR_ERR(minfo);
 812        }
 813
 814        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
 815        if (rc) {
 816                entry->se_inode = NULL;
 817                iput(inode);
 818                sa_fini_data(minfo);
 819        }
 820
 821        return rc;
 822}
 823
 824/* async stat for file with @name */
 825static void sa_statahead(struct dentry *parent, const char *name, int len,
 826                         const struct lu_fid *fid)
 827{
 828        struct inode         *dir    = d_inode(parent);
 829        struct ll_inode_info     *lli    = ll_i2info(dir);
 830        struct ll_statahead_info *sai    = lli->lli_sai;
 831        struct dentry       *dentry = NULL;
 832        struct sa_entry *entry;
 833        int                    rc;
 834
 835        entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
 836        if (IS_ERR(entry))
 837                return;
 838
 839        dentry = d_lookup(parent, &entry->se_qstr);
 840        if (!dentry) {
 841                rc = sa_lookup(dir, entry);
 842        } else {
 843                rc = sa_revalidate(dir, entry, dentry);
 844                if (rc == 1 && agl_should_run(sai, d_inode(dentry)))
 845                        ll_agl_add(sai, d_inode(dentry), entry->se_index);
 846        }
 847
 848        if (dentry)
 849                dput(dentry);
 850
 851        if (rc)
 852                sa_make_ready(sai, entry, rc);
 853        else
 854                sai->sai_sent++;
 855
 856        sai->sai_index++;
 857}
 858
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
	struct dentry *parent = arg;
	struct inode *dir = d_inode(parent);
	struct ll_inode_info *plli = ll_i2info(dir);
	struct ll_inode_info *clli;
	/* We already own this reference, so it is safe to take it without a lock. */
	struct ll_statahead_info *sai = plli->lli_sai;

	CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
	       sai, parent);

	while (!kthread_should_stop()) {

		spin_lock(&plli->lli_agl_lock);
		/* The statahead thread maybe help to process AGL entries,
		 * so check whether list empty again.
		 */
		if (!list_empty(&sai->sai_agls)) {
			clli = list_entry(sai->sai_agls.next,
					  struct ll_inode_info, lli_agl_list);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&plli->lli_agl_lock);
			/* ll_agl_trigger() consumes the inode reference */
			ll_agl_trigger(&clli->lli_vfs_inode, sai);
		} else {
			spin_unlock(&plli->lli_agl_lock);
		}

		/*
		 * setting TASK_IDLE before re-checking the list avoids a
		 * lost wakeup: if ll_agl_add() queued and woke us between the
		 * check and schedule(), wake_up_process() leaves us runnable
		 */
		set_current_state(TASK_IDLE);
		if (list_empty(&sai->sai_agls) &&
		    !kthread_should_stop())
			schedule();
		__set_current_state(TASK_RUNNING);
	}

	/* on shutdown, drop queued inodes without glimpsing them */
	spin_lock(&plli->lli_agl_lock);
	sai->sai_agl_valid = 0;
	while (!list_empty(&sai->sai_agls)) {
		clli = list_entry(sai->sai_agls.next,
				  struct ll_inode_info, lli_agl_list);
		list_del_init(&clli->lli_agl_list);
		/* drop the lock across iput(); re-take to re-check the list */
		spin_unlock(&plli->lli_agl_lock);
		clli->lli_agl_index = 0;
		iput(&clli->lli_vfs_inode);
		spin_lock(&plli->lli_agl_lock);
	}
	spin_unlock(&plli->lli_agl_lock);
	CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
	       sai, parent);
	/* drop the sai reference taken for this thread in ll_start_agl() */
	ll_sai_put(sai);
	return 0;
}
 912
 913/* start agl thread */
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
{
	struct ll_inode_info  *plli;
	struct task_struct *task;

	CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
	       sai, parent);

	plli = ll_i2info(d_inode(parent));
	/* kthread_create() returns the task in a stopped state; it only runs
	 * after the wake_up_process() below, so all setup happens first.
	 */
	task = kthread_create(ll_agl_thread, parent, "ll_agl_%u",
			      plli->lli_opendir_pid);
	if (IS_ERR(task)) {
		/* AGL is an optimization; statahead proceeds without it */
		CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
		return;
	}

	sai->sai_agl_task = task;
	atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
	/* mark AGL usable; readers/writers of this flag use lli_agl_lock */
	spin_lock(&plli->lli_agl_lock);
	sai->sai_agl_valid = 1;
	spin_unlock(&plli->lli_agl_lock);
	/* Get an extra reference that the thread holds */
	ll_sai_get(d_inode(parent));

	wake_up_process(task);
}
 940
 941/* statahead thread main function */
static int ll_statahead_thread(void *arg)
{
	struct dentry       *parent = arg;
	struct inode         *dir    = d_inode(parent);
	struct ll_inode_info     *lli   = ll_i2info(dir);
	struct ll_sb_info       *sbi    = ll_i2sbi(dir);
	struct ll_statahead_info *sai = lli->lli_sai;
	struct page           *page = NULL;
	__u64                pos    = 0;
	int                    first  = 0;
	int                    rc     = 0;
	struct md_op_data *op_data;

	CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
	       sai, parent);

	op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
				     LUSTRE_OPC_ANY, dir);
	if (IS_ERR(op_data)) {
		rc = PTR_ERR(op_data);
		goto out;
	}

	op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;

	/*
	 * Scan the directory pages in hash order, issuing an async stat for
	 * each entry, until the whole directory is covered or we are told to
	 * stop (sai_task cleared, e.g. by ll_deauthorize_statahead()).
	 */
	while (pos != MDS_DIR_END_OFF && sai->sai_task) {
		struct lu_dirpage *dp;
		struct lu_dirent  *ent;

		/* flag readpage-in-progress so revalidate can help with
		 * callbacks instead of blocking on us
		 */
		sai->sai_in_readpage = 1;
		page = ll_get_dir_page(dir, op_data, pos);
		sai->sai_in_readpage = 0;
		if (IS_ERR(page)) {
			rc = PTR_ERR(page);
			CDEBUG(D_READA, "error reading dir " DFID " at %llu/%llu: opendir_pid = %u: rc = %d\n",
			       PFID(ll_inode2fid(dir)), pos, sai->sai_index,
			       lli->lli_opendir_pid, rc);
			break;
		}

		dp = page_address(page);
		for (ent = lu_dirent_start(dp);
		     ent && sai->sai_task && !sa_low_hit(sai);
		     ent = lu_dirent_next(ent)) {
			struct lu_fid fid;
			__u64 hash;
			int namelen;
			char *name;

			hash = le64_to_cpu(ent->lde_hash);
			if (unlikely(hash < pos))
				/*
				 * Skip until we find target hash value.
				 */
				continue;

			namelen = le16_to_cpu(ent->lde_namelen);
			if (unlikely(namelen == 0))
				/*
				 * Skip dummy record.
				 */
				continue;

			name = ent->lde_name;
			if (name[0] == '.') {
				if (namelen == 1) {
					/*
					 * skip "."
					 */
					continue;
				} else if (name[1] == '.' && namelen == 2) {
					/*
					 * skip ".."
					 */
					continue;
				} else if (!sai->sai_ls_all) {
					/*
					 * skip hidden files.
					 */
					sai->sai_skip_hidden++;
					continue;
				}
			}

			/*
			 * don't stat-ahead first entry.
			 */
			if (unlikely(++first == 1))
				continue;

			fid_le_to_cpu(&fid, &ent->lde_fid);

			/*
			 * Throttle: while the statahead window is full, make
			 * ourselves useful by finishing RPC callbacks and
			 * processing queued AGL entries, then sleep until a
			 * slot frees up or we are asked to stop.
			 */
			do {
				sa_handle_callback(sai);

				spin_lock(&lli->lli_agl_lock);
				while (sa_sent_full(sai) &&
				       !agl_list_empty(sai)) {
					struct ll_inode_info *clli;

					clli = list_entry(sai->sai_agls.next,
							  struct ll_inode_info,
							  lli_agl_list);
					list_del_init(&clli->lli_agl_list);
					/* drop lock across the glimpse call */
					spin_unlock(&lli->lli_agl_lock);

					ll_agl_trigger(&clli->lli_vfs_inode,
						       sai);

					spin_lock(&lli->lli_agl_lock);
				}
				spin_unlock(&lli->lli_agl_lock);

				/*
				 * set state before re-checking so a wakeup
				 * between the checks and schedule() is not
				 * lost
				 */
				set_current_state(TASK_IDLE);
				if (sa_sent_full(sai) &&
				    !sa_has_callback(sai) &&
				    agl_list_empty(sai) &&
				    sai->sai_task)
					/* wait for spare statahead window */
					schedule();
				__set_current_state(TASK_RUNNING);
			} while (sa_sent_full(sai) && sai->sai_task);

			sa_statahead(parent, name, namelen, &fid);
		}

		pos = le64_to_cpu(dp->ldp_hash_end);
		ll_release_page(dir, page,
				le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);

		/* give up if the user's access pattern rarely matches ours */
		if (sa_low_hit(sai)) {
			rc = -EFAULT;
			atomic_inc(&sbi->ll_sa_wrong);
			CDEBUG(D_READA, "Statahead for dir " DFID " hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
			       PFID(&lli->lli_fid), sai->sai_hit,
			       sai->sai_miss, sai->sai_sent,
			       sai->sai_replied, current_pid());
			break;
		}
	}
	ll_finish_md_op_data(op_data);

	if (rc < 0) {
		/* on error, stop ourselves and disable further statahead */
		spin_lock(&lli->lli_sa_lock);
		sai->sai_task = NULL;
		lli->lli_sa_enabled = 0;
		spin_unlock(&lli->lli_sa_lock);
	}

	/*
	 * statahead is finished, but statahead entries need to be cached, wait
	 * for file release to stop me.
	 */
	while (sai->sai_task) {
		sa_handle_callback(sai);

		set_current_state(TASK_IDLE);
		if (!sa_has_callback(sai) &&
		    sai->sai_task)
			schedule();
		__set_current_state(TASK_RUNNING);
	}
out:
	/* stop the AGL helper thread first, if one was started */
	if (sai->sai_agl_task) {
		kthread_stop(sai->sai_agl_task);

		CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
		       sai, (unsigned int)sai->sai_agl_task->pid);
		sai->sai_agl_task = NULL;
	}
	/*
	 * wait for inflight statahead RPCs to finish, and then we can free sai
	 * safely because statahead RPC will access sai data
	 */
	while (sai->sai_sent != sai->sai_replied) {
		/* in case we're not woken up, timeout wait */
		schedule_timeout_idle(HZ>>3);
	}

	/* release resources held by statahead RPCs */
	sa_handle_callback(sai);

	CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
	       sai, parent);

	spin_lock(&lli->lli_sa_lock);
	sai->sai_task = NULL;
	spin_unlock(&lli->lli_sa_lock);

	/* wake anyone waiting in revalidate_statahead_dentry() */
	wake_up(&sai->sai_waitq);
	/* drop the reference this thread holds; may free sai */
	ll_sai_put(sai);

	do_exit(rc);
}
1136
1137/* authorize opened dir handle @key to statahead */
1138void ll_authorize_statahead(struct inode *dir, void *key)
1139{
1140        struct ll_inode_info *lli = ll_i2info(dir);
1141
1142        spin_lock(&lli->lli_sa_lock);
1143        if (!lli->lli_opendir_key && !lli->lli_sai) {
1144                /*
1145                 * if lli_sai is not NULL, it means previous statahead is not
1146                 * finished yet, we'd better not start a new statahead for now.
1147                 */
1148                LASSERT(!lli->lli_opendir_pid);
1149                lli->lli_opendir_key = key;
1150                lli->lli_opendir_pid = current_pid();
1151                lli->lli_sa_enabled = 1;
1152        }
1153        spin_unlock(&lli->lli_sa_lock);
1154}
1155
1156/*
1157 * deauthorize opened dir handle @key to statahead, but statahead thread may
1158 * still be running, notify it to quit.
1159 */
void ll_deauthorize_statahead(struct inode *dir, void *key)
{
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai;

	LASSERT(lli->lli_opendir_key == key);
	LASSERT(lli->lli_opendir_pid);

	CDEBUG(D_READA, "deauthorize statahead for " DFID "\n",
	       PFID(&lli->lli_fid));

	spin_lock(&lli->lli_sa_lock);
	/* release ownership of statahead for this dir handle */
	lli->lli_opendir_key = NULL;
	lli->lli_opendir_pid = 0;
	lli->lli_sa_enabled = 0;
	sai = lli->lli_sai;
	if (sai && sai->sai_task) {
		/*
		 * statahead thread may not quit yet because it needs to cache
		 * entries, now it's time to tell it to quit.
		 *
		 * Wake the thread BEFORE clearing sai_task: the task pointer
		 * is only guaranteed valid while sai_task is still set, since
		 * the thread exits once it observes sai_task == NULL.
		 */
		wake_up_process(sai->sai_task);
		sai->sai_task = NULL;
	}
	spin_unlock(&lli->lli_sa_lock);
}
1186
/* classification of @dentry's position in its parent dir, as returned by
 * is_first_dirent()
 */
enum {
	/**
	 * not the first dirent, or is "."
	 */
	LS_NOT_FIRST_DE = 0,
	/**
	 * the first non-hidden dirent
	 */
	LS_FIRST_DE,
	/**
	 * the first dirent is hidden, i.e. its name starts with '.'
	 */
	LS_FIRST_DOT_DE
};
1201
1202/* file is first dirent under @dir */
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
	const struct qstr  *target = &dentry->d_name;
	struct md_op_data *op_data;
	struct page       *page;
	__u64            pos    = 0;
	int                dot_de;
	int rc = LS_NOT_FIRST_DE;

	op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
				     LUSTRE_OPC_ANY, dir);
	if (IS_ERR(op_data))
		return PTR_ERR(op_data);
	/**
	 * FIXME choose the start offset of the readdir
	 */
	op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;

	page = ll_get_dir_page(dir, op_data, pos);

	/*
	 * Walk dir pages in hash order until we hit the first "interesting"
	 * dirent (skipping ".", ".." and dummy records), then classify
	 * @dentry against it: LS_FIRST_DE / LS_FIRST_DOT_DE if it matches,
	 * LS_NOT_FIRST_DE otherwise.
	 */
	while (1) {
		struct lu_dirpage *dp;
		struct lu_dirent  *ent;

		if (IS_ERR(page)) {
			struct ll_inode_info *lli = ll_i2info(dir);

			rc = PTR_ERR(page);
			CERROR("%s: error reading dir " DFID " at %llu: opendir_pid = %u : rc = %d\n",
			       ll_get_fsname(dir->i_sb, NULL, 0),
			       PFID(ll_inode2fid(dir)), pos,
			       lli->lli_opendir_pid, rc);
			break;
		}

		dp = page_address(page);
		for (ent = lu_dirent_start(dp); ent;
		     ent = lu_dirent_next(ent)) {
			__u64 hash;
			int namelen;
			char *name;

			hash = le64_to_cpu(ent->lde_hash);
			/* The ll_get_dir_page() can return any page containing
			 * the given hash which may be not the start hash.
			 */
			if (unlikely(hash < pos))
				continue;

			namelen = le16_to_cpu(ent->lde_namelen);
			if (unlikely(namelen == 0))
				/*
				 * skip dummy record.
				 */
				continue;

			name = ent->lde_name;
			if (name[0] == '.') {
				if (namelen == 1)
					/*
					 * skip "."
					 */
					continue;
				else if (name[1] == '.' && namelen == 2)
					/*
					 * skip ".."
					 */
					continue;
				else
					dot_de = 1;
			} else {
				dot_de = 0;
			}

			/* a hidden first entry only counts if @dentry is
			 * hidden too; otherwise keep scanning
			 */
			if (dot_de && target->name[0] != '.') {
				CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
				       target->len, target->name,
				       namelen, name);
				continue;
			}

			/* first candidate found: compare names and classify */
			if (target->len != namelen ||
			    memcmp(target->name, name, namelen) != 0)
				rc = LS_NOT_FIRST_DE;
			else if (!dot_de)
				rc = LS_FIRST_DE;
			else
				rc = LS_FIRST_DOT_DE;

			ll_release_page(dir, page, false);
			goto out;
		}
		pos = le64_to_cpu(dp->ldp_hash_end);
		if (pos == MDS_DIR_END_OFF) {
			/*
			 * End of directory reached.
			 */
			ll_release_page(dir, page, false);
			goto out;
		} else {
			/*
			 * chain is exhausted
			 * Normal case: continue to the next page.
			 */
			ll_release_page(dir, page,
					le32_to_cpu(dp->ldp_flags) &
					LDF_COLLIDE);
			page = ll_get_dir_page(dir, op_data, pos);
		}
	}
out:
	ll_finish_md_op_data(op_data);
	return rc;
}
1317
1318/**
1319 * revalidate @dentryp from statahead cache
1320 *
1321 * \param[in]  dir      parent directory
1322 * \param[in]  sai      sai structure
1323 * \param[out] dentryp  pointer to dentry which will be revalidated
1324 * \param[in]  unplug   unplug statahead window only (normally for negative
1325 *                      dentry)
1326 * \retval              1 on success, dentry is saved in @dentryp
1327 * \retval              0 if revalidation failed (no proper lock on client)
1328 * \retval              negative number upon error
1329 */
static int revalidate_statahead_dentry(struct inode *dir,
				       struct ll_statahead_info *sai,
				       struct dentry **dentryp,
				       bool unplug)
{
	struct ll_inode_info *lli = ll_i2info(dir);
	struct sa_entry *entry = NULL;
	struct ll_dentry_data *ldd;
	int rc = 0;

	if ((*dentryp)->d_name.name[0] == '.') {
		if (sai->sai_ls_all ||
		    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
			/*
			 * Hidden dentry is the first one, or statahead
			 * thread does not skip so many hidden dentries
			 * before "sai_ls_all" enabled as below.
			 */
		} else {
			if (!sai->sai_ls_all)
				/*
				 * It maybe because hidden dentry is not
				 * the first one, "sai_ls_all" was not
				 * set, then "ls -al" missed. Enable
				 * "sai_ls_all" for such case.
				 */
				sai->sai_ls_all = 1;

			/*
			 * Such "getattr" has been skipped before
			 * "sai_ls_all" enabled as above.
			 */
			sai->sai_miss_hidden++;
			return -EAGAIN;
		}
	}

	if (unplug) {
		/* caller only wants the statahead window unplugged */
		rc = 1;
		goto out_unplug;
	}

	entry = sa_get(sai, &(*dentryp)->d_name);
	if (!entry) {
		/* no cached entry for this name; caller stats it itself */
		rc = -EAGAIN;
		goto out_unplug;
	}

	/* if statahead is busy in readdir, help it do post-work */
	if (!sa_ready(entry) && sai->sai_in_readpage)
		sa_handle_callback(sai);

	if (!sa_ready(entry)) {
		/* record which entry we wait for, then sleep (bounded) until
		 * the statahead thread marks it ready and wakes sai_waitq
		 */
		spin_lock(&lli->lli_sa_lock);
		sai->sai_index_wait = entry->se_index;
		spin_unlock(&lli->lli_sa_lock);
		if (0 == wait_event_idle_timeout(sai->sai_waitq,
						 sa_ready(entry), 30 * HZ)) {
			/*
			 * entry may not be ready, so it may be used by inflight
			 * statahead RPC, don't free it.
			 */
			entry = NULL;
			rc = -EAGAIN;
			goto out_unplug;
		}
	}

	if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode) {
		struct inode *inode = entry->se_inode;
		struct lookup_intent it = { .it_op = IT_GETATTR,
					    .it_lock_handle = entry->se_handle };
		__u64 bits;

		/* confirm the lock the statahead RPC took is still valid */
		rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
					ll_inode2fid(inode), &bits);
		if (rc == 1) {
			if (!(*dentryp)->d_inode) {
				struct dentry *alias;

				alias = ll_splice_alias(inode, *dentryp);
				if (IS_ERR(alias)) {
					ll_intent_release(&it);
					rc = PTR_ERR(alias);
					goto out_unplug;
				}
				*dentryp = alias;
				/**
				 * statahead prepared this inode, transfer inode
				 * refcount from sa_entry to dentry
				 */
				entry->se_inode = NULL;
			} else if ((*dentryp)->d_inode != inode) {
				/* revalidate, but inode is recreated */
				CDEBUG(D_READA,
				       "%s: stale dentry %pd inode " DFID ", statahead inode " DFID "\n",
				       ll_get_fsname((*dentryp)->d_inode->i_sb,
						     NULL, 0),
				       *dentryp,
				       PFID(ll_inode2fid((*dentryp)->d_inode)),
				       PFID(ll_inode2fid(inode)));
				ll_intent_release(&it);
				rc = -ESTALE;
				goto out_unplug;
			}

			if ((bits & MDS_INODELOCK_LOOKUP) &&
			    d_lustre_invalid(*dentryp))
				d_lustre_revalidate(*dentryp);
			ll_intent_release(&it);
		}
	}
out_unplug:
	/*
	 * statahead cached sa_entry can be used only once, and will be killed
	 * right after use, so if lookup/revalidate accessed statahead cache,
	 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
	 * stat this file again, we know we've done statahead before, see
	 * dentry_may_statahead().
	 */
	ldd = ll_d2d(*dentryp);
	/* NOTE(review): ll_d2d() result is dereferenced unchecked here --
	 * presumably every llite dentry has lld set by this point; verify
	 */
	ldd->lld_sa_generation = lli->lli_sa_generation;
	sa_put(sai, entry, lli);
	return rc;
}
1455
1456/**
1457 * start statahead thread
1458 *
1459 * \param[in] dir       parent directory
1460 * \param[in] dentry    dentry that triggers statahead, normally the first
1461 *                      dirent under @dir
1462 * \retval              -EAGAIN on success, because when this function is
1463 *                      called, it's already in lookup call, so client should
1464 *                      do it itself instead of waiting for statahead thread
1465 *                      to do it asynchronously.
1466 * \retval              negative number upon error
1467 */
1468static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
1469{
1470        struct ll_inode_info *lli = ll_i2info(dir);
1471        struct ll_statahead_info *sai = NULL;
1472        struct task_struct *task;
1473        struct dentry *parent = dentry->d_parent;
1474        int rc;
1475
1476        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1477        rc = is_first_dirent(dir, dentry);
1478        if (rc == LS_NOT_FIRST_DE) {
1479                /* It is not "ls -{a}l" operation, no need statahead for it. */
1480                rc = -EFAULT;
1481                goto out;
1482        }
1483
1484        sai = ll_sai_alloc(parent);
1485        if (!sai) {
1486                rc = -ENOMEM;
1487                goto out;
1488        }
1489
1490        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1491        /*
1492         * if current lli_opendir_key was deauthorized, or dir re-opened by
1493         * another process, don't start statahead, otherwise the newly spawned
1494         * statahead thread won't be notified to quit.
1495         */
1496        spin_lock(&lli->lli_sa_lock);
1497        if (unlikely(lli->lli_sai || lli->lli_opendir_key ||
1498                     lli->lli_opendir_pid != current->pid)) {
1499                spin_unlock(&lli->lli_sa_lock);
1500                rc = -EPERM;
1501                goto out;
1502        }
1503        lli->lli_sai = sai;
1504        spin_unlock(&lli->lli_sa_lock);
1505
1506        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
1507
1508        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1509               current_pid(), parent);
1510
1511        task = kthread_create(ll_statahead_thread, parent, "ll_sa_%u",
1512                              lli->lli_opendir_pid);
1513        if (IS_ERR(task)) {
1514                rc = PTR_ERR(task);
1515                CERROR("can't start ll_sa thread, rc : %d\n", rc);
1516                goto out;
1517        }
1518
1519        if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED)
1520                ll_start_agl(parent, sai);
1521
1522        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
1523        sai->sai_task = task;
1524
1525        wake_up_process(task);
1526
1527        /*
1528         * We don't stat-ahead for the first dirent since we are already in
1529         * lookup.
1530         */
1531        return -EAGAIN;
1532
1533out:
1534        /*
1535         * once we start statahead thread failed, disable statahead so
1536         * that subsequent stat won't waste time to try it.
1537         */
1538        spin_lock(&lli->lli_sa_lock);
1539        lli->lli_sa_enabled = 0;
1540        lli->lli_sai = NULL;
1541        spin_unlock(&lli->lli_sa_lock);
1542        if (sai)
1543                ll_sai_free(sai);
1544        return rc;
1545}
1546
1547/**
1548 * statahead entry function, this is called when client getattr on a file, it
1549 * will start statahead thread if this is the first dir entry, else revalidate
1550 * dentry from statahead cache.
1551 *
1552 * \param[in]  dir      parent directory
1553 * \param[out] dentryp  dentry to getattr
1554 * \param[in]  unplug   unplug statahead window only (normally for negative
1555 *                      dentry)
1556 * \retval              1 on success
1557 * \retval              0 revalidation from statahead cache failed, caller needs
1558 *                      to getattr from server directly
1559 * \retval              negative number on error, caller often ignores this and
1560 *                      then getattr from server
1561 */
1562int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
1563{
1564        struct ll_statahead_info *sai;
1565
1566        sai = ll_sai_get(dir);
1567        if (sai) {
1568                int rc;
1569
1570                rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1571                CDEBUG(D_READA, "revalidate statahead %pd: %d.\n",
1572                       *dentryp, rc);
1573                ll_sai_put(sai);
1574                return rc;
1575        }
1576        return start_statahead_thread(dir, *dentryp);
1577}
1578