linux/drivers/staging/lustre/lustre/llite/statahead.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 */
  32
  33#include <linux/fs.h>
  34#include <linux/sched.h>
  35#include <linux/mm.h>
  36#include <linux/highmem.h>
  37#include <linux/pagemap.h>
  38
  39#define DEBUG_SUBSYSTEM S_LLITE
  40
  41#include "../include/obd_support.h"
  42#include "../include/lustre_dlm.h"
  43#include "llite_internal.h"
  44
  45#define SA_OMITTED_ENTRY_MAX 8ULL
  46
  47enum se_stat {
  48        /** negative values are for error cases */
  49        SA_ENTRY_INIT = 0,      /** init entry */
  50        SA_ENTRY_SUCC = 1,      /** stat succeed */
  51        SA_ENTRY_INVA = 2,      /** invalid entry */
  52};
  53
  54/*
  55 * sa_entry is not refcounted: statahead thread allocates it and do async stat,
  56 * and in async stat callback ll_statahead_interpret() will add it into
  57 * sai_interim_entries, later statahead thread will call sa_handle_callback() to
  58 * instantiate entry and move it into sai_entries, and then only scanner process
  59 * can access and free it.
  60 */
  61struct sa_entry {
  62        /* link into sai_interim_entries or sai_entries */
  63        struct list_head              se_list;
  64        /* link into sai hash table locally */
  65        struct list_head              se_hash;
  66        /* entry index in the sai */
  67        __u64              se_index;
  68        /* low layer ldlm lock handle */
  69        __u64              se_handle;
  70        /* entry status */
  71        enum se_stat            se_state;
  72        /* entry size, contains name */
  73        int                  se_size;
  74        /* pointer to async getattr enqueue info */
  75        struct md_enqueue_info *se_minfo;
  76        /* pointer to the async getattr request */
  77        struct ptlrpc_request  *se_req;
  78        /* pointer to the target inode */
  79        struct inode       *se_inode;
  80        /* entry name */
  81        struct qstr          se_qstr;
  82};
  83
  84static unsigned int sai_generation;
  85static DEFINE_SPINLOCK(sai_generation_lock);
  86
  87/* sa_entry is ready to use */
  88static inline int sa_ready(struct sa_entry *entry)
  89{
  90        smp_rmb();
  91        return (entry->se_state != SA_ENTRY_INIT);
  92}
  93
  94/* hash value to put in sai_cache */
  95static inline int sa_hash(int val)
  96{
  97        return val & LL_SA_CACHE_MASK;
  98}
  99
 100/* hash entry into sai_cache */
 101static inline void
 102sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
 103{
 104        int i = sa_hash(entry->se_qstr.hash);
 105
 106        spin_lock(&sai->sai_cache_lock[i]);
 107        list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
 108        spin_unlock(&sai->sai_cache_lock[i]);
 109}
 110
 111/*
 112 * Remove entry from SA table.
 113 */
 114static inline void
 115sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
 116{
 117        int i = sa_hash(entry->se_qstr.hash);
 118
 119        spin_lock(&sai->sai_cache_lock[i]);
 120        list_del_init(&entry->se_hash);
 121        spin_unlock(&sai->sai_cache_lock[i]);
 122}
 123
 124static inline int agl_should_run(struct ll_statahead_info *sai,
 125                                 struct inode *inode)
 126{
 127        return (inode && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
 128}
 129
 130/* statahead window is full */
 131static inline int sa_sent_full(struct ll_statahead_info *sai)
 132{
 133        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 134}
 135
 136/* got async stat replies */
 137static inline int sa_has_callback(struct ll_statahead_info *sai)
 138{
 139        return !list_empty(&sai->sai_interim_entries);
 140}
 141
 142static inline int agl_list_empty(struct ll_statahead_info *sai)
 143{
 144        return list_empty(&sai->sai_agls);
 145}
 146
 147/**
 148 * (1) hit ratio less than 80%
 149 * or
 150 * (2) consecutive miss more than 8
 151 * then means low hit.
 152 */
 153static inline int sa_low_hit(struct ll_statahead_info *sai)
 154{
 155        return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
 156                (sai->sai_consecutive_miss > 8));
 157}
 158
 159/*
 160 * if the given index is behind of statahead window more than
 161 * SA_OMITTED_ENTRY_MAX, then it is old.
 162 */
 163static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 164{
 165        return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
 166                 sai->sai_index);
 167}
 168
 169/* allocate sa_entry and hash it to allow scanner process to find it */
 170static struct sa_entry *
 171sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 172         const char *name, int len)
 173{
 174        struct ll_inode_info *lli;
 175        struct sa_entry   *entry;
 176        int                entry_size;
 177        char             *dname;
 178
 179        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
 180        entry = kzalloc(entry_size, GFP_NOFS);
 181        if (unlikely(!entry))
 182                return ERR_PTR(-ENOMEM);
 183
 184        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
 185               len, name, entry, index);
 186
 187        entry->se_index = index;
 188        entry->se_state = SA_ENTRY_INIT;
 189        entry->se_size = entry_size;
 190        dname = (char *)entry + sizeof(struct sa_entry);
 191        memcpy(dname, name, len);
 192        dname[len] = 0;
 193
 194        entry->se_qstr.hash = full_name_hash(parent, name, len);
 195        entry->se_qstr.len = len;
 196        entry->se_qstr.name = dname;
 197
 198        lli = ll_i2info(sai->sai_dentry->d_inode);
 199        spin_lock(&lli->lli_sa_lock);
 200        INIT_LIST_HEAD(&entry->se_list);
 201        sa_rehash(sai, entry);
 202        spin_unlock(&lli->lli_sa_lock);
 203
 204        atomic_inc(&sai->sai_cache_count);
 205
 206        return entry;
 207}
 208
 209/* free sa_entry, which should have been unhashed and not in any list */
 210static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 211{
 212        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
 213               entry->se_qstr.len, entry->se_qstr.name, entry,
 214               entry->se_index);
 215
 216        LASSERT(list_empty(&entry->se_list));
 217        LASSERT(list_empty(&entry->se_hash));
 218
 219        kfree(entry);
 220        atomic_dec(&sai->sai_cache_count);
 221}
 222
 223/*
 224 * find sa_entry by name, used by directory scanner, lock is not needed because
 225 * only scanner can remove the entry from cache.
 226 */
 227static struct sa_entry *
 228sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
 229{
 230        struct sa_entry *entry;
 231        int i = sa_hash(qstr->hash);
 232
 233        list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
 234                if (entry->se_qstr.hash == qstr->hash &&
 235                    entry->se_qstr.len == qstr->len &&
 236                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
 237                        return entry;
 238        }
 239        return NULL;
 240}
 241
 242/* unhash and unlink sa_entry, and then free it */
 243static inline void
 244sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
 245{
 246        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 247
 248        LASSERT(!list_empty(&entry->se_hash));
 249        LASSERT(!list_empty(&entry->se_list));
 250        LASSERT(sa_ready(entry));
 251
 252        sa_unhash(sai, entry);
 253
 254        spin_lock(&lli->lli_sa_lock);
 255        list_del_init(&entry->se_list);
 256        spin_unlock(&lli->lli_sa_lock);
 257
 258        if (entry->se_inode)
 259                iput(entry->se_inode);
 260
 261        sa_free(sai, entry);
 262}
 263
 264/* called by scanner after use, sa_entry will be killed */
 265static void
 266sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
 267{
 268        struct sa_entry *tmp, *next;
 269
 270        if (entry && entry->se_state == SA_ENTRY_SUCC) {
 271                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 272
 273                sai->sai_hit++;
 274                sai->sai_consecutive_miss = 0;
 275                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
 276        } else {
 277                sai->sai_miss++;
 278                sai->sai_consecutive_miss++;
 279        }
 280
 281        if (entry)
 282                sa_kill(sai, entry);
 283
 284        /*
 285         * kill old completed entries, only scanner process does this, no need
 286         * to lock
 287         */
 288        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
 289                if (!is_omitted_entry(sai, tmp->se_index))
 290                        break;
 291                sa_kill(sai, tmp);
 292        }
 293
 294        wake_up(&sai->sai_thread.t_ctl_waitq);
 295}
 296
 297/*
 298 * update state and sort add entry to sai_entries by index, return true if
 299 * scanner is waiting on this entry.
 300 */
 301static bool
 302__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 303{
 304        struct list_head *pos = &sai->sai_entries;
 305        __u64 index = entry->se_index;
 306        struct sa_entry *se;
 307
 308        LASSERT(!sa_ready(entry));
 309        LASSERT(list_empty(&entry->se_list));
 310
 311        list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
 312                if (se->se_index < entry->se_index) {
 313                        pos = &se->se_list;
 314                        break;
 315                }
 316        }
 317        list_add(&entry->se_list, pos);
 318        entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
 319
 320        return (index == sai->sai_index_wait);
 321}
 322
 323/*
 324 * release resources used in async stat RPC, update entry state and wakeup if
 325 * scanner process it waiting on this entry.
 326 */
 327static void
 328sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 329{
 330        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 331        struct md_enqueue_info *minfo = entry->se_minfo;
 332        struct ptlrpc_request *req = entry->se_req;
 333        bool wakeup;
 334
 335        /* release resources used in RPC */
 336        if (minfo) {
 337                entry->se_minfo = NULL;
 338                ll_intent_release(&minfo->mi_it);
 339                iput(minfo->mi_dir);
 340                kfree(minfo);
 341        }
 342
 343        if (req) {
 344                entry->se_req = NULL;
 345                ptlrpc_req_finished(req);
 346        }
 347
 348        spin_lock(&lli->lli_sa_lock);
 349        wakeup = __sa_make_ready(sai, entry, ret);
 350        spin_unlock(&lli->lli_sa_lock);
 351
 352        if (wakeup)
 353                wake_up(&sai->sai_waitq);
 354}
 355
 356/* Insert inode into the list of sai_agls. */
 357static void ll_agl_add(struct ll_statahead_info *sai,
 358                       struct inode *inode, int index)
 359{
 360        struct ll_inode_info *child  = ll_i2info(inode);
 361        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
 362        int                added  = 0;
 363
 364        spin_lock(&child->lli_agl_lock);
 365        if (child->lli_agl_index == 0) {
 366                child->lli_agl_index = index;
 367                spin_unlock(&child->lli_agl_lock);
 368
 369                LASSERT(list_empty(&child->lli_agl_list));
 370
 371                igrab(inode);
 372                spin_lock(&parent->lli_agl_lock);
 373                if (list_empty(&sai->sai_agls))
 374                        added = 1;
 375                list_add_tail(&child->lli_agl_list, &sai->sai_agls);
 376                spin_unlock(&parent->lli_agl_lock);
 377        } else {
 378                spin_unlock(&child->lli_agl_lock);
 379        }
 380
 381        if (added > 0)
 382                wake_up(&sai->sai_agl_thread.t_ctl_waitq);
 383}
 384
 385/* allocate sai */
 386static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 387{
 388        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
 389        struct ll_statahead_info *sai;
 390        int                    i;
 391
 392        sai = kzalloc(sizeof(*sai), GFP_NOFS);
 393        if (!sai)
 394                return NULL;
 395
 396        sai->sai_dentry = dget(dentry);
 397        atomic_set(&sai->sai_refcount, 1);
 398
 399        sai->sai_max = LL_SA_RPC_MIN;
 400        sai->sai_index = 1;
 401        init_waitqueue_head(&sai->sai_waitq);
 402        init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
 403        init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq);
 404
 405        INIT_LIST_HEAD(&sai->sai_interim_entries);
 406        INIT_LIST_HEAD(&sai->sai_entries);
 407        INIT_LIST_HEAD(&sai->sai_agls);
 408
 409        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
 410                INIT_LIST_HEAD(&sai->sai_cache[i]);
 411                spin_lock_init(&sai->sai_cache_lock[i]);
 412        }
 413        atomic_set(&sai->sai_cache_count, 0);
 414
 415        spin_lock(&sai_generation_lock);
 416        lli->lli_sa_generation = ++sai_generation;
 417        if (unlikely(!sai_generation))
 418                lli->lli_sa_generation = ++sai_generation;
 419        spin_unlock(&sai_generation_lock);
 420
 421        return sai;
 422}
 423
 424/* free sai */
 425static inline void ll_sai_free(struct ll_statahead_info *sai)
 426{
 427        LASSERT(sai->sai_dentry);
 428        dput(sai->sai_dentry);
 429        kfree(sai);
 430}
 431
 432/*
 433 * take refcount of sai if sai for @dir exists, which means statahead is on for
 434 * this directory.
 435 */
 436static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
 437{
 438        struct ll_inode_info *lli = ll_i2info(dir);
 439        struct ll_statahead_info *sai = NULL;
 440
 441        spin_lock(&lli->lli_sa_lock);
 442        sai = lli->lli_sai;
 443        if (sai)
 444                atomic_inc(&sai->sai_refcount);
 445        spin_unlock(&lli->lli_sa_lock);
 446
 447        return sai;
 448}
 449
 450/*
 451 * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
 452 * attached to it.
 453 */
 454static void ll_sai_put(struct ll_statahead_info *sai)
 455{
 456        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 457
 458        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
 459                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 460                struct sa_entry *entry, *next;
 461
 462                lli->lli_sai = NULL;
 463                spin_unlock(&lli->lli_sa_lock);
 464
 465                LASSERT(thread_is_stopped(&sai->sai_thread));
 466                LASSERT(thread_is_stopped(&sai->sai_agl_thread));
 467                LASSERT(sai->sai_sent == sai->sai_replied);
 468                LASSERT(!sa_has_callback(sai));
 469
 470                list_for_each_entry_safe(entry, next, &sai->sai_entries,
 471                                         se_list)
 472                        sa_kill(sai, entry);
 473
 474                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
 475                LASSERT(list_empty(&sai->sai_agls));
 476
 477                ll_sai_free(sai);
 478                atomic_dec(&sbi->ll_sa_running);
 479        }
 480}
 481
 482/* Do NOT forget to drop inode refcount when into sai_agls. */
 483static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
 484{
 485        struct ll_inode_info *lli   = ll_i2info(inode);
 486        __u64            index = lli->lli_agl_index;
 487        int                rc;
 488
 489        LASSERT(list_empty(&lli->lli_agl_list));
 490
 491        /* AGL maybe fall behind statahead with one entry */
 492        if (is_omitted_entry(sai, index + 1)) {
 493                lli->lli_agl_index = 0;
 494                iput(inode);
 495                return;
 496        }
 497
 498        /* Someone is in glimpse (sync or async), do nothing. */
 499        rc = down_write_trylock(&lli->lli_glimpse_sem);
 500        if (rc == 0) {
 501                lli->lli_agl_index = 0;
 502                iput(inode);
 503                return;
 504        }
 505
 506        /*
 507         * Someone triggered glimpse within 1 sec before.
 508         * 1) The former glimpse succeeded with glimpse lock granted by OST, and
 509         *    if the lock is still cached on client, AGL needs to do nothing. If
 510         *    it is cancelled by other client, AGL maybe cannot obtain new lock
 511         *    for no glimpse callback triggered by AGL.
 512         * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
 513         *    Under such case, it is quite possible that the OST will not grant
 514         *    glimpse lock for AGL also.
 515         * 3) The former glimpse failed, compared with other two cases, it is
 516         *    relative rare. AGL can ignore such case, and it will not muchly
 517         *    affect the performance.
 518         */
 519        if (lli->lli_glimpse_time != 0 &&
 520            time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
 521                up_write(&lli->lli_glimpse_sem);
 522                lli->lli_agl_index = 0;
 523                iput(inode);
 524                return;
 525        }
 526
 527        CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
 528               DFID", idx = %llu\n", PFID(&lli->lli_fid), index);
 529
 530        cl_agl(inode);
 531        lli->lli_agl_index = 0;
 532        lli->lli_glimpse_time = cfs_time_current();
 533        up_write(&lli->lli_glimpse_sem);
 534
 535        CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
 536               DFID", idx = %llu, rc = %d\n",
 537               PFID(&lli->lli_fid), index, rc);
 538
 539        iput(inode);
 540}
 541
 542/*
 543 * prepare inode for sa entry, add it into agl list, now sa_entry is ready
 544 * to be used by scanner process.
 545 */
 546static void sa_instantiate(struct ll_statahead_info *sai,
 547                           struct sa_entry *entry)
 548{
 549        struct inode *dir = sai->sai_dentry->d_inode;
 550        struct inode       *child;
 551        struct md_enqueue_info *minfo;
 552        struct lookup_intent   *it;
 553        struct ptlrpc_request  *req;
 554        struct mdt_body *body;
 555        int                  rc    = 0;
 556
 557        LASSERT(entry->se_handle != 0);
 558
 559        minfo = entry->se_minfo;
 560        it = &minfo->mi_it;
 561        req = entry->se_req;
 562        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
 563        if (!body) {
 564                rc = -EFAULT;
 565                goto out;
 566        }
 567
 568        child = entry->se_inode;
 569        if (!child) {
 570                /*
 571                 * lookup.
 572                 */
 573                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
 574
 575                /* XXX: No fid in reply, this is probably cross-ref case.
 576                 * SA can't handle it yet.
 577                 */
 578                if (body->mbo_valid & OBD_MD_MDS) {
 579                        rc = -EAGAIN;
 580                        goto out;
 581                }
 582        } else {
 583                /*
 584                 * revalidate.
 585                 */
 586                /* unlinked and re-created with the same name */
 587                if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
 588                        entry->se_inode = NULL;
 589                        iput(child);
 590                        child = NULL;
 591                }
 592        }
 593
 594        it->it_lock_handle = entry->se_handle;
 595        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
 596        if (rc != 1) {
 597                rc = -EAGAIN;
 598                goto out;
 599        }
 600
 601        rc = ll_prep_inode(&child, req, dir->i_sb, it);
 602        if (rc)
 603                goto out;
 604
 605        CDEBUG(D_READA, "%s: setting %.*s" DFID " l_data to inode %p\n",
 606               ll_get_fsname(child->i_sb, NULL, 0),
 607               entry->se_qstr.len, entry->se_qstr.name,
 608               PFID(ll_inode2fid(child)), child);
 609        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
 610
 611        entry->se_inode = child;
 612
 613        if (agl_should_run(sai, child))
 614                ll_agl_add(sai, child, entry->se_index);
 615
 616out:
 617        /*
 618         * sa_make_ready() will drop ldlm ibits lock refcount by calling
 619         * ll_intent_drop_lock() in spite of failures. Do not worry about
 620         * calling ll_intent_drop_lock() more than once.
 621         */
 622        sa_make_ready(sai, entry, rc);
 623}
 624
 625/* once there are async stat replies, instantiate sa_entry from replies */
 626static void sa_handle_callback(struct ll_statahead_info *sai)
 627{
 628        struct ll_inode_info *lli;
 629
 630        lli = ll_i2info(sai->sai_dentry->d_inode);
 631
 632        while (sa_has_callback(sai)) {
 633                struct sa_entry *entry;
 634
 635                spin_lock(&lli->lli_sa_lock);
 636                if (unlikely(!sa_has_callback(sai))) {
 637                        spin_unlock(&lli->lli_sa_lock);
 638                        break;
 639                }
 640                entry = list_entry(sai->sai_interim_entries.next,
 641                                   struct sa_entry, se_list);
 642                list_del_init(&entry->se_list);
 643                spin_unlock(&lli->lli_sa_lock);
 644
 645                sa_instantiate(sai, entry);
 646        }
 647}
 648
 649/*
 650 * callback for async stat, because this is called in ptlrpcd context, we only
 651 * put sa_entry in sai_cb_entries list, and let sa_handle_callback() to really
 652 * prepare inode and instantiate sa_entry later.
 653 */
 654static int ll_statahead_interpret(struct ptlrpc_request *req,
 655                                  struct md_enqueue_info *minfo, int rc)
 656{
 657        struct lookup_intent     *it  = &minfo->mi_it;
 658        struct inode         *dir = minfo->mi_dir;
 659        struct ll_inode_info     *lli = ll_i2info(dir);
 660        struct ll_statahead_info *sai = lli->lli_sai;
 661        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
 662        wait_queue_head_t *waitq = NULL;
 663        __u64 handle = 0;
 664
 665        if (it_disposition(it, DISP_LOOKUP_NEG))
 666                rc = -ENOENT;
 667
 668        /*
 669         * because statahead thread will wait for all inflight RPC to finish,
 670         * sai should be always valid, no need to refcount
 671         */
 672        LASSERT(sai);
 673        LASSERT(!thread_is_stopped(&sai->sai_thread));
 674        LASSERT(entry);
 675
 676        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
 677               entry->se_qstr.len, entry->se_qstr.name, rc);
 678
 679        if (rc) {
 680                ll_intent_release(it);
 681                iput(dir);
 682                kfree(minfo);
 683        } else {
 684                /*
 685                 * release ibits lock ASAP to avoid deadlock when statahead
 686                 * thread enqueues lock on parent in readdir and another
 687                 * process enqueues lock on child with parent lock held, eg.
 688                 * unlink.
 689                 */
 690                handle = it->it_lock_handle;
 691                ll_intent_drop_lock(it);
 692        }
 693
 694        spin_lock(&lli->lli_sa_lock);
 695        if (rc) {
 696                if (__sa_make_ready(sai, entry, rc))
 697                        waitq = &sai->sai_waitq;
 698        } else {
 699                entry->se_minfo = minfo;
 700                entry->se_req = ptlrpc_request_addref(req);
 701                /*
 702                 * Release the async ibits lock ASAP to avoid deadlock
 703                 * when statahead thread tries to enqueue lock on parent
 704                 * for readpage and other tries to enqueue lock on child
 705                 * with parent's lock held, for example: unlink.
 706                 */
 707                entry->se_handle = handle;
 708                if (!sa_has_callback(sai))
 709                        waitq = &sai->sai_thread.t_ctl_waitq;
 710
 711                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
 712        }
 713        sai->sai_replied++;
 714
 715        if (waitq)
 716                wake_up(waitq);
 717        spin_unlock(&lli->lli_sa_lock);
 718
 719        return rc;
 720}
 721
 722/* finish async stat RPC arguments */
 723static void sa_fini_data(struct md_enqueue_info *minfo,
 724                         struct ldlm_enqueue_info *einfo)
 725{
 726        LASSERT(minfo && einfo);
 727        iput(minfo->mi_dir);
 728        kfree(minfo);
 729        kfree(einfo);
 730}
 731
 732/**
 733 * prepare arguments for async stat RPC.
 734 */
 735static int sa_prep_data(struct inode *dir, struct inode *child,
 736                        struct sa_entry *entry, struct md_enqueue_info **pmi,
 737                        struct ldlm_enqueue_info **pei)
 738{
 739        const struct qstr      *qstr = &entry->se_qstr;
 740        struct md_enqueue_info   *minfo;
 741        struct ldlm_enqueue_info *einfo;
 742        struct md_op_data       *op_data;
 743
 744        einfo = kzalloc(sizeof(*einfo), GFP_NOFS);
 745        if (!einfo)
 746                return -ENOMEM;
 747
 748        minfo = kzalloc(sizeof(*minfo), GFP_NOFS);
 749        if (!minfo) {
 750                kfree(einfo);
 751                return -ENOMEM;
 752        }
 753
 754        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
 755                                     qstr->len, 0, LUSTRE_OPC_ANY, NULL);
 756        if (IS_ERR(op_data)) {
 757                kfree(einfo);
 758                kfree(minfo);
 759                return PTR_ERR(op_data);
 760        }
 761
 762        minfo->mi_it.it_op = IT_GETATTR;
 763        minfo->mi_dir = igrab(dir);
 764        minfo->mi_cb = ll_statahead_interpret;
 765        minfo->mi_cbdata = entry;
 766
 767        einfo->ei_type   = LDLM_IBITS;
 768        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
 769        einfo->ei_cb_bl  = ll_md_blocking_ast;
 770        einfo->ei_cb_cp  = ldlm_completion_ast;
 771        einfo->ei_cb_gl  = NULL;
 772        einfo->ei_cbdata = NULL;
 773
 774        *pmi = minfo;
 775        *pei = einfo;
 776
 777        return 0;
 778}
 779
 780/* async stat for file not found in dcache */
 781static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 782{
 783        struct md_enqueue_info   *minfo;
 784        struct ldlm_enqueue_info *einfo;
 785        int                    rc;
 786
 787        rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo);
 788        if (rc)
 789                return rc;
 790
 791        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
 792        if (rc)
 793                sa_fini_data(minfo, einfo);
 794
 795        return rc;
 796}
 797
 798/**
 799 * async stat for file found in dcache, similar to .revalidate
 800 *
 801 * \retval      1 dentry valid, no RPC sent
 802 * \retval      0 dentry invalid, will send async stat RPC
 803 * \retval      negative number upon error
 804 */
 805static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 806                         struct dentry *dentry)
 807{
 808        struct inode         *inode = d_inode(dentry);
 809        struct lookup_intent      it = { .it_op = IT_GETATTR,
 810                                         .it_lock_handle = 0 };
 811        struct md_enqueue_info   *minfo;
 812        struct ldlm_enqueue_info *einfo;
 813        int rc;
 814
 815        if (unlikely(!inode))
 816                return 1;
 817
 818        if (d_mountpoint(dentry))
 819                return 1;
 820
 821        entry->se_inode = igrab(inode);
 822        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
 823                                NULL);
 824        if (rc == 1) {
 825                entry->se_handle = it.it_lock_handle;
 826                ll_intent_release(&it);
 827                return 1;
 828        }
 829
 830        rc = sa_prep_data(dir, inode, entry, &minfo, &einfo);
 831        if (rc) {
 832                entry->se_inode = NULL;
 833                iput(inode);
 834                return rc;
 835        }
 836
 837        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
 838        if (rc) {
 839                entry->se_inode = NULL;
 840                iput(inode);
 841                sa_fini_data(minfo, einfo);
 842        }
 843
 844        return rc;
 845}
 846
 847/* async stat for file with @name */
 848static void sa_statahead(struct dentry *parent, const char *name, int len)
 849{
 850        struct inode         *dir    = d_inode(parent);
 851        struct ll_inode_info     *lli    = ll_i2info(dir);
 852        struct ll_statahead_info *sai    = lli->lli_sai;
 853        struct dentry       *dentry = NULL;
 854        struct sa_entry *entry;
 855        int                    rc;
 856
 857        entry = sa_alloc(parent, sai, sai->sai_index, name, len);
 858        if (IS_ERR(entry))
 859                return;
 860
 861        dentry = d_lookup(parent, &entry->se_qstr);
 862        if (!dentry) {
 863                rc = sa_lookup(dir, entry);
 864        } else {
 865                rc = sa_revalidate(dir, entry, dentry);
 866                if (rc == 1 && agl_should_run(sai, d_inode(dentry)))
 867                        ll_agl_add(sai, d_inode(dentry), entry->se_index);
 868        }
 869
 870        if (dentry)
 871                dput(dentry);
 872
 873        if (rc)
 874                sa_make_ready(sai, entry, rc);
 875        else
 876                sai->sai_sent++;
 877
 878        sai->sai_index++;
 879}
 880
 881/* async glimpse (agl) thread main function */
 882static int ll_agl_thread(void *arg)
 883{
 884        struct dentry       *parent = arg;
 885        struct inode         *dir    = d_inode(parent);
 886        struct ll_inode_info     *plli   = ll_i2info(dir);
 887        struct ll_inode_info     *clli;
 888        struct ll_sb_info       *sbi    = ll_i2sbi(dir);
 889        struct ll_statahead_info *sai;
 890        struct ptlrpc_thread *thread;
 891        struct l_wait_info      lwi    = { 0 };
 892
 893        sai = ll_sai_get(dir);
 894        thread = &sai->sai_agl_thread;
 895        thread->t_pid = current_pid();
 896        CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
 897               sai, parent);
 898
 899        atomic_inc(&sbi->ll_agl_total);
 900        spin_lock(&plli->lli_agl_lock);
 901        sai->sai_agl_valid = 1;
 902        if (thread_is_init(thread))
 903                /* If someone else has changed the thread state
 904                 * (e.g. already changed to SVC_STOPPING), we can't just
 905                 * blindly overwrite that setting.
 906                 */
 907                thread_set_flags(thread, SVC_RUNNING);
 908        spin_unlock(&plli->lli_agl_lock);
 909        wake_up(&thread->t_ctl_waitq);
 910
 911        while (1) {
 912                l_wait_event(thread->t_ctl_waitq,
 913                             !list_empty(&sai->sai_agls) ||
 914                             !thread_is_running(thread),
 915                             &lwi);
 916
 917                if (!thread_is_running(thread))
 918                        break;
 919
 920                spin_lock(&plli->lli_agl_lock);
 921                /* The statahead thread maybe help to process AGL entries,
 922                 * so check whether list empty again.
 923                 */
 924                if (!list_empty(&sai->sai_agls)) {
 925                        clli = list_entry(sai->sai_agls.next,
 926                                          struct ll_inode_info, lli_agl_list);
 927                        list_del_init(&clli->lli_agl_list);
 928                        spin_unlock(&plli->lli_agl_lock);
 929                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
 930                } else {
 931                        spin_unlock(&plli->lli_agl_lock);
 932                }
 933        }
 934
 935        spin_lock(&plli->lli_agl_lock);
 936        sai->sai_agl_valid = 0;
 937        while (!list_empty(&sai->sai_agls)) {
 938                clli = list_entry(sai->sai_agls.next,
 939                                  struct ll_inode_info, lli_agl_list);
 940                list_del_init(&clli->lli_agl_list);
 941                spin_unlock(&plli->lli_agl_lock);
 942                clli->lli_agl_index = 0;
 943                iput(&clli->lli_vfs_inode);
 944                spin_lock(&plli->lli_agl_lock);
 945        }
 946        thread_set_flags(thread, SVC_STOPPED);
 947        spin_unlock(&plli->lli_agl_lock);
 948        wake_up(&thread->t_ctl_waitq);
 949        ll_sai_put(sai);
 950        CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
 951               sai, parent);
 952        return 0;
 953}
 954
 955/* start agl thread */
 956static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
 957{
 958        struct ptlrpc_thread *thread = &sai->sai_agl_thread;
 959        struct l_wait_info    lwi    = { 0 };
 960        struct ll_inode_info  *plli;
 961        struct task_struct *task;
 962
 963        CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
 964               sai, parent);
 965
 966        plli = ll_i2info(d_inode(parent));
 967        task = kthread_run(ll_agl_thread, parent, "ll_agl_%u",
 968                           plli->lli_opendir_pid);
 969        if (IS_ERR(task)) {
 970                CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
 971                thread_set_flags(thread, SVC_STOPPED);
 972                return;
 973        }
 974
 975        l_wait_event(thread->t_ctl_waitq,
 976                     thread_is_running(thread) || thread_is_stopped(thread),
 977                     &lwi);
 978}
 979
 980/* statahead thread main function */
 981static int ll_statahead_thread(void *arg)
 982{
 983        struct dentry       *parent = arg;
 984        struct inode         *dir    = d_inode(parent);
 985        struct ll_inode_info     *lli   = ll_i2info(dir);
 986        struct ll_sb_info       *sbi    = ll_i2sbi(dir);
 987        struct ll_statahead_info *sai;
 988        struct ptlrpc_thread *sa_thread;
 989        struct ptlrpc_thread *agl_thread;
 990        struct page           *page = NULL;
 991        __u64                pos    = 0;
 992        int                    first  = 0;
 993        int                    rc     = 0;
 994        struct md_op_data *op_data;
 995        struct l_wait_info      lwi    = { 0 };
 996
 997        sai = ll_sai_get(dir);
 998        sa_thread = &sai->sai_thread;
 999        agl_thread = &sai->sai_agl_thread;
1000        sa_thread->t_pid = current_pid();
1001        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1002               sai, parent);
1003
1004        op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1005                                     LUSTRE_OPC_ANY, dir);
1006        if (IS_ERR(op_data)) {
1007                rc = PTR_ERR(op_data);
1008                goto out;
1009        }
1010
1011        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
1012
1013        if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
1014                ll_start_agl(parent, sai);
1015
1016        atomic_inc(&sbi->ll_sa_total);
1017        spin_lock(&lli->lli_sa_lock);
1018        if (thread_is_init(sa_thread))
1019                /* If someone else has changed the thread state
1020                 * (e.g. already changed to SVC_STOPPING), we can't just
1021                 * blindly overwrite that setting.
1022                 */
1023                thread_set_flags(sa_thread, SVC_RUNNING);
1024        spin_unlock(&lli->lli_sa_lock);
1025        wake_up(&sa_thread->t_ctl_waitq);
1026
1027        while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
1028                struct lu_dirpage *dp;
1029                struct lu_dirent  *ent;
1030
1031                sai->sai_in_readpage = 1;
1032                page = ll_get_dir_page(dir, op_data, pos);
1033                sai->sai_in_readpage = 0;
1034                if (IS_ERR(page)) {
1035                        rc = PTR_ERR(page);
1036                        CDEBUG(D_READA, "error reading dir "DFID" at %llu/%llu: opendir_pid = %u: rc = %d\n",
1037                               PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1038                               lli->lli_opendir_pid, rc);
1039                        break;
1040                }
1041
1042                dp = page_address(page);
1043                for (ent = lu_dirent_start(dp);
1044                     ent && thread_is_running(sa_thread) && !sa_low_hit(sai);
1045                     ent = lu_dirent_next(ent)) {
1046                        __u64 hash;
1047                        int namelen;
1048                        char *name;
1049
1050                        hash = le64_to_cpu(ent->lde_hash);
1051                        if (unlikely(hash < pos))
1052                                /*
1053                                 * Skip until we find target hash value.
1054                                 */
1055                                continue;
1056
1057                        namelen = le16_to_cpu(ent->lde_namelen);
1058                        if (unlikely(namelen == 0))
1059                                /*
1060                                 * Skip dummy record.
1061                                 */
1062                                continue;
1063
1064                        name = ent->lde_name;
1065                        if (name[0] == '.') {
1066                                if (namelen == 1) {
1067                                        /*
1068                                         * skip "."
1069                                         */
1070                                        continue;
1071                                } else if (name[1] == '.' && namelen == 2) {
1072                                        /*
1073                                         * skip ".."
1074                                         */
1075                                        continue;
1076                                } else if (!sai->sai_ls_all) {
1077                                        /*
1078                                         * skip hidden files.
1079                                         */
1080                                        sai->sai_skip_hidden++;
1081                                        continue;
1082                                }
1083                        }
1084
1085                        /*
1086                         * don't stat-ahead first entry.
1087                         */
1088                        if (unlikely(++first == 1))
1089                                continue;
1090
1091                        /* wait for spare statahead window */
1092                        do {
1093                                l_wait_event(sa_thread->t_ctl_waitq,
1094                                             !sa_sent_full(sai) ||
1095                                             sa_has_callback(sai) ||
1096                                             !list_empty(&sai->sai_agls) ||
1097                                             !thread_is_running(sa_thread),
1098                                             &lwi);
1099                                sa_handle_callback(sai);
1100
1101                                spin_lock(&lli->lli_agl_lock);
1102                                while (sa_sent_full(sai) &&
1103                                       !agl_list_empty(sai)) {
1104                                        struct ll_inode_info *clli;
1105
1106                                        clli = list_entry(sai->sai_agls.next,
1107                                                          struct ll_inode_info, lli_agl_list);
1108                                        list_del_init(&clli->lli_agl_list);
1109                                        spin_unlock(&lli->lli_agl_lock);
1110
1111                                        ll_agl_trigger(&clli->lli_vfs_inode,
1112                                                       sai);
1113
1114                                        spin_lock(&lli->lli_agl_lock);
1115                                }
1116                                spin_unlock(&lli->lli_agl_lock);
1117                        } while (sa_sent_full(sai) &&
1118                                 thread_is_running(sa_thread));
1119
1120                        sa_statahead(parent, name, namelen);
1121                }
1122
1123                pos = le64_to_cpu(dp->ldp_hash_end);
1124                ll_release_page(dir, page,
1125                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1126
1127                if (sa_low_hit(sai)) {
1128                        rc = -EFAULT;
1129                        atomic_inc(&sbi->ll_sa_wrong);
1130                        CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1131                               PFID(&lli->lli_fid), sai->sai_hit,
1132                               sai->sai_miss, sai->sai_sent,
1133                               sai->sai_replied, current_pid());
1134                        break;
1135                }
1136        }
1137        ll_finish_md_op_data(op_data);
1138
1139        if (rc < 0) {
1140                spin_lock(&lli->lli_sa_lock);
1141                thread_set_flags(sa_thread, SVC_STOPPING);
1142                lli->lli_sa_enabled = 0;
1143                spin_unlock(&lli->lli_sa_lock);
1144        }
1145
1146        /*
1147         * statahead is finished, but statahead entries need to be cached, wait
1148         * for file release to stop me.
1149         */
1150        while (thread_is_running(sa_thread)) {
1151                l_wait_event(sa_thread->t_ctl_waitq,
1152                             sa_has_callback(sai) ||
1153                             !agl_list_empty(sai) ||
1154                             !thread_is_running(sa_thread),
1155                             &lwi);
1156
1157                sa_handle_callback(sai);
1158        }
1159out:
1160        if (sai->sai_agl_valid) {
1161                spin_lock(&lli->lli_agl_lock);
1162                thread_set_flags(agl_thread, SVC_STOPPING);
1163                spin_unlock(&lli->lli_agl_lock);
1164                wake_up(&agl_thread->t_ctl_waitq);
1165
1166                CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1167                       sai, (unsigned int)agl_thread->t_pid);
1168                l_wait_event(agl_thread->t_ctl_waitq,
1169                             thread_is_stopped(agl_thread),
1170                             &lwi);
1171        } else {
1172                /* Set agl_thread flags anyway. */
1173                thread_set_flags(agl_thread, SVC_STOPPED);
1174        }
1175
1176        /*
1177         * wait for inflight statahead RPCs to finish, and then we can free sai
1178         * safely because statahead RPC will access sai data
1179         */
1180        while (sai->sai_sent != sai->sai_replied) {
1181                /* in case we're not woken up, timeout wait */
1182                lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
1183                                  NULL, NULL);
1184                l_wait_event(sa_thread->t_ctl_waitq,
1185                             sai->sai_sent == sai->sai_replied, &lwi);
1186        }
1187
1188        /* release resources held by statahead RPCs */
1189        sa_handle_callback(sai);
1190
1191        spin_lock(&lli->lli_sa_lock);
1192        thread_set_flags(sa_thread, SVC_STOPPED);
1193        spin_unlock(&lli->lli_sa_lock);
1194
1195        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
1196               sai, parent);
1197
1198        wake_up(&sai->sai_waitq);
1199        wake_up(&sa_thread->t_ctl_waitq);
1200        ll_sai_put(sai);
1201
1202        return rc;
1203}
1204
1205/* authorize opened dir handle @key to statahead */
1206void ll_authorize_statahead(struct inode *dir, void *key)
1207{
1208        struct ll_inode_info *lli = ll_i2info(dir);
1209
1210        spin_lock(&lli->lli_sa_lock);
1211        if (!lli->lli_opendir_key && !lli->lli_sai) {
1212                /*
1213                 * if lli_sai is not NULL, it means previous statahead is not
1214                 * finished yet, we'd better not start a new statahead for now.
1215                 */
1216                LASSERT(!lli->lli_opendir_pid);
1217                lli->lli_opendir_key = key;
1218                lli->lli_opendir_pid = current_pid();
1219                lli->lli_sa_enabled = 1;
1220        }
1221        spin_unlock(&lli->lli_sa_lock);
1222}
1223
1224/*
1225 * deauthorize opened dir handle @key to statahead, but statahead thread may
1226 * still be running, notify it to quit.
1227 */
1228void ll_deauthorize_statahead(struct inode *dir, void *key)
1229{
1230        struct ll_inode_info *lli = ll_i2info(dir);
1231        struct ll_statahead_info *sai;
1232
1233        LASSERT(lli->lli_opendir_key == key);
1234        LASSERT(lli->lli_opendir_pid);
1235
1236        CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1237               PFID(&lli->lli_fid));
1238
1239        spin_lock(&lli->lli_sa_lock);
1240        lli->lli_opendir_key = NULL;
1241        lli->lli_opendir_pid = 0;
1242        lli->lli_sa_enabled = 0;
1243        sai = lli->lli_sai;
1244        if (sai && thread_is_running(&sai->sai_thread)) {
1245                /*
1246                 * statahead thread may not quit yet because it needs to cache
1247                 * entries, now it's time to tell it to quit.
1248                 */
1249                thread_set_flags(&sai->sai_thread, SVC_STOPPING);
1250                wake_up(&sai->sai_thread.t_ctl_waitq);
1251        }
1252        spin_unlock(&lli->lli_sa_lock);
1253}
1254
/* result codes of is_first_dirent() */
enum {
	/**
	 * the target is not the first eligible dirent (also the value
	 * is_first_dirent() starts from before anything is compared)
	 */
	LS_NOT_FIRST_DE = 0,
	/**
	 * the target is the first eligible dirent and its name does not
	 * start with '.'
	 */
	LS_FIRST_DE,
	/**
	 * the target is the first eligible dirent and it is a hidden one,
	 * i.e. its name starts with '.' ("." and ".." themselves are always
	 * skipped)
	 */
	LS_FIRST_DOT_DE
};
1269
1270/* file is first dirent under @dir */
1271static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1272{
1273        const struct qstr  *target = &dentry->d_name;
1274        struct md_op_data *op_data;
1275        struct page       *page;
1276        __u64            pos    = 0;
1277        int                dot_de;
1278        int rc = LS_NOT_FIRST_DE;
1279
1280        op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1281                                     LUSTRE_OPC_ANY, dir);
1282        if (IS_ERR(op_data))
1283                return PTR_ERR(op_data);
1284        /**
1285         * FIXME choose the start offset of the readdir
1286         */
1287        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
1288
1289        page = ll_get_dir_page(dir, op_data, pos);
1290
1291        while (1) {
1292                struct lu_dirpage *dp;
1293                struct lu_dirent  *ent;
1294
1295                if (IS_ERR(page)) {
1296                        struct ll_inode_info *lli = ll_i2info(dir);
1297
1298                        rc = PTR_ERR(page);
1299                        CERROR("%s: error reading dir "DFID" at %llu: opendir_pid = %u : rc = %d\n",
1300                               ll_get_fsname(dir->i_sb, NULL, 0),
1301                               PFID(ll_inode2fid(dir)), pos,
1302                               lli->lli_opendir_pid, rc);
1303                        break;
1304                }
1305
1306                dp = page_address(page);
1307                for (ent = lu_dirent_start(dp); ent;
1308                     ent = lu_dirent_next(ent)) {
1309                        __u64 hash;
1310                        int namelen;
1311                        char *name;
1312
1313                        hash = le64_to_cpu(ent->lde_hash);
1314                        /* The ll_get_dir_page() can return any page containing
1315                         * the given hash which may be not the start hash.
1316                         */
1317                        if (unlikely(hash < pos))
1318                                continue;
1319
1320                        namelen = le16_to_cpu(ent->lde_namelen);
1321                        if (unlikely(namelen == 0))
1322                                /*
1323                                 * skip dummy record.
1324                                 */
1325                                continue;
1326
1327                        name = ent->lde_name;
1328                        if (name[0] == '.') {
1329                                if (namelen == 1)
1330                                        /*
1331                                         * skip "."
1332                                         */
1333                                        continue;
1334                                else if (name[1] == '.' && namelen == 2)
1335                                        /*
1336                                         * skip ".."
1337                                         */
1338                                        continue;
1339                                else
1340                                        dot_de = 1;
1341                        } else {
1342                                dot_de = 0;
1343                        }
1344
1345                        if (dot_de && target->name[0] != '.') {
1346                                CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1347                                       target->len, target->name,
1348                                       namelen, name);
1349                                continue;
1350                        }
1351
1352                        if (target->len != namelen ||
1353                            memcmp(target->name, name, namelen) != 0)
1354                                rc = LS_NOT_FIRST_DE;
1355                        else if (!dot_de)
1356                                rc = LS_FIRST_DE;
1357                        else
1358                                rc = LS_FIRST_DOT_DE;
1359
1360                        ll_release_page(dir, page, false);
1361                        goto out;
1362                }
1363                pos = le64_to_cpu(dp->ldp_hash_end);
1364                if (pos == MDS_DIR_END_OFF) {
1365                        /*
1366                         * End of directory reached.
1367                         */
1368                        ll_release_page(dir, page, false);
1369                        goto out;
1370                } else {
1371                        /*
1372                         * chain is exhausted
1373                         * Normal case: continue to the next page.
1374                         */
1375                        ll_release_page(dir, page,
1376                                        le32_to_cpu(dp->ldp_flags) &
1377                                        LDF_COLLIDE);
1378                        page = ll_get_dir_page(dir, op_data, pos);
1379                }
1380        }
1381out:
1382        ll_finish_md_op_data(op_data);
1383        return rc;
1384}
1385
1386/**
1387 * revalidate @dentryp from statahead cache
1388 *
1389 * \param[in]  dir      parent directory
1390 * \param[in]  sai      sai structure
1391 * \param[out] dentryp  pointer to dentry which will be revalidated
1392 * \param[in]  unplug   unplug statahead window only (normally for negative
1393 *                      dentry)
1394 * \retval              1 on success, dentry is saved in @dentryp
1395 * \retval              0 if revalidation failed (no proper lock on client)
1396 * \retval              negative number upon error
1397 */
1398static int revalidate_statahead_dentry(struct inode *dir,
1399                                       struct ll_statahead_info *sai,
1400                                       struct dentry **dentryp,
1401                                       bool unplug)
1402{
1403        struct ll_inode_info *lli = ll_i2info(dir);
1404        struct sa_entry *entry = NULL;
1405        struct l_wait_info lwi = { 0 };
1406        struct ll_dentry_data *ldd;
1407        int rc = 0;
1408
1409        if ((*dentryp)->d_name.name[0] == '.') {
1410                if (sai->sai_ls_all ||
1411                    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1412                        /*
1413                         * Hidden dentry is the first one, or statahead
1414                         * thread does not skip so many hidden dentries
1415                         * before "sai_ls_all" enabled as below.
1416                         */
1417                } else {
1418                        if (!sai->sai_ls_all)
1419                                /*
1420                                 * It maybe because hidden dentry is not
1421                                 * the first one, "sai_ls_all" was not
1422                                 * set, then "ls -al" missed. Enable
1423                                 * "sai_ls_all" for such case.
1424                                 */
1425                                sai->sai_ls_all = 1;
1426
1427                        /*
1428                         * Such "getattr" has been skipped before
1429                         * "sai_ls_all" enabled as above.
1430                         */
1431                        sai->sai_miss_hidden++;
1432                        return -EAGAIN;
1433                }
1434        }
1435
1436        if (unplug) {
1437                rc = 1;
1438                goto out_unplug;
1439        }
1440
1441        entry = sa_get(sai, &(*dentryp)->d_name);
1442        if (!entry) {
1443                rc = -EAGAIN;
1444                goto out_unplug;
1445        }
1446
1447        /* if statahead is busy in readdir, help it do post-work */
1448        if (!sa_ready(entry) && sai->sai_in_readpage)
1449                sa_handle_callback(sai);
1450
1451        if (!sa_ready(entry)) {
1452                spin_lock(&lli->lli_sa_lock);
1453                sai->sai_index_wait = entry->se_index;
1454                spin_unlock(&lli->lli_sa_lock);
1455                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
1456                                       LWI_ON_SIGNAL_NOOP, NULL);
1457                rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
1458                if (rc < 0) {
1459                        /*
1460                         * entry may not be ready, so it may be used by inflight
1461                         * statahead RPC, don't free it.
1462                         */
1463                        entry = NULL;
1464                        rc = -EAGAIN;
1465                        goto out_unplug;
1466                }
1467        }
1468
1469        if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode) {
1470                struct inode *inode = entry->se_inode;
1471                struct lookup_intent it = { .it_op = IT_GETATTR,
1472                                            .it_lock_handle = entry->se_handle };
1473                __u64 bits;
1474
1475                rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1476                                        ll_inode2fid(inode), &bits);
1477                if (rc == 1) {
1478                        if (!(*dentryp)->d_inode) {
1479                                struct dentry *alias;
1480
1481                                alias = ll_splice_alias(inode, *dentryp);
1482                                if (IS_ERR(alias)) {
1483                                        ll_intent_release(&it);
1484                                        rc = PTR_ERR(alias);
1485                                        goto out_unplug;
1486                                }
1487                                *dentryp = alias;
1488                                /**
1489                                 * statahead prepared this inode, transfer inode
1490                                 * refcount from sa_entry to dentry
1491                                 */
1492                                entry->se_inode = NULL;
1493                        } else if ((*dentryp)->d_inode != inode) {
1494                                /* revalidate, but inode is recreated */
1495                                CDEBUG(D_READA,
1496                                       "%s: stale dentry %pd inode "DFID", statahead inode "DFID"\n",
1497                                       ll_get_fsname((*dentryp)->d_inode->i_sb,
1498                                                     NULL, 0),
1499                                       *dentryp,
1500                                       PFID(ll_inode2fid((*dentryp)->d_inode)),
1501                                       PFID(ll_inode2fid(inode)));
1502                                ll_intent_release(&it);
1503                                rc = -ESTALE;
1504                                goto out_unplug;
1505                        }
1506
1507                        if ((bits & MDS_INODELOCK_LOOKUP) &&
1508                            d_lustre_invalid(*dentryp))
1509                                d_lustre_revalidate(*dentryp);
1510                        ll_intent_release(&it);
1511                }
1512        }
1513out_unplug:
1514        /*
1515         * statahead cached sa_entry can be used only once, and will be killed
1516         * right after use, so if lookup/revalidate accessed statahead cache,
1517         * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
1518         * stat this file again, we know we've done statahead before, see
1519         * dentry_may_statahead().
1520         */
1521        ldd = ll_d2d(*dentryp);
1522        ldd->lld_sa_generation = lli->lli_sa_generation;
1523        sa_put(sai, entry);
1524        return rc;
1525}
1526
1527/**
1528 * start statahead thread
1529 *
1530 * \param[in] dir       parent directory
1531 * \param[in] dentry    dentry that triggers statahead, normally the first
1532 *                      dirent under @dir
1533 * \retval              -EAGAIN on success, because when this function is
1534 *                      called, it's already in lookup call, so client should
1535 *                      do it itself instead of waiting for statahead thread
1536 *                      to do it asynchronously.
1537 * \retval              negative number upon error
1538 */
1539static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
1540{
1541        struct ll_inode_info *lli = ll_i2info(dir);
1542        struct ll_statahead_info *sai = NULL;
1543        struct l_wait_info lwi = { 0 };
1544        struct ptlrpc_thread *thread;
1545        struct task_struct *task;
1546        struct dentry *parent = dentry->d_parent;
1547        int rc;
1548
1549        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1550        rc = is_first_dirent(dir, dentry);
1551        if (rc == LS_NOT_FIRST_DE) {
1552                /* It is not "ls -{a}l" operation, no need statahead for it. */
1553                rc = -EFAULT;
1554                goto out;
1555        }
1556
1557        sai = ll_sai_alloc(parent);
1558        if (!sai) {
1559                rc = -ENOMEM;
1560                goto out;
1561        }
1562
1563        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1564        /*
1565         * if current lli_opendir_key was deauthorized, or dir re-opened by
1566         * another process, don't start statahead, otherwise the newly spawned
1567         * statahead thread won't be notified to quit.
1568         */
1569        spin_lock(&lli->lli_sa_lock);
1570        if (unlikely(lli->lli_sai || lli->lli_opendir_key ||
1571                     lli->lli_opendir_pid != current->pid)) {
1572                spin_unlock(&lli->lli_sa_lock);
1573                rc = -EPERM;
1574                goto out;
1575        }
1576        lli->lli_sai = sai;
1577        spin_unlock(&lli->lli_sa_lock);
1578
1579        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
1580
1581        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1582               current_pid(), parent);
1583
1584        task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
1585                           lli->lli_opendir_pid);
1586        thread = &sai->sai_thread;
1587        if (IS_ERR(task)) {
1588                rc = PTR_ERR(task);
1589                CERROR("can't start ll_sa thread, rc : %d\n", rc);
1590                goto out;
1591        }
1592
1593        l_wait_event(thread->t_ctl_waitq,
1594                     thread_is_running(thread) || thread_is_stopped(thread),
1595                     &lwi);
1596        ll_sai_put(sai);
1597
1598        /*
1599         * We don't stat-ahead for the first dirent since we are already in
1600         * lookup.
1601         */
1602        return -EAGAIN;
1603
1604out:
1605        /*
1606         * once we start statahead thread failed, disable statahead so
1607         * that subsequent stat won't waste time to try it.
1608         */
1609        spin_lock(&lli->lli_sa_lock);
1610        lli->lli_sa_enabled = 0;
1611        lli->lli_sai = NULL;
1612        spin_unlock(&lli->lli_sa_lock);
1613        if (sai)
1614                ll_sai_free(sai);
1615        return rc;
1616}
1617
1618/**
1619 * statahead entry function, this is called when client getattr on a file, it
1620 * will start statahead thread if this is the first dir entry, else revalidate
1621 * dentry from statahead cache.
1622 *
1623 * \param[in]  dir      parent directory
1624 * \param[out] dentryp  dentry to getattr
1625 * \param[in]  unplug   unplug statahead window only (normally for negative
1626 *                      dentry)
1627 * \retval              1 on success
1628 * \retval              0 revalidation from statahead cache failed, caller needs
1629 *                      to getattr from server directly
1630 * \retval              negative number on error, caller often ignores this and
1631 *                      then getattr from server
1632 */
1633int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
1634{
1635        struct ll_statahead_info *sai;
1636
1637        sai = ll_sai_get(dir);
1638        if (sai) {
1639                int rc;
1640
1641                rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1642                CDEBUG(D_READA, "revalidate statahead %pd: %d.\n",
1643                       *dentryp, rc);
1644                ll_sai_put(sai);
1645                return rc;
1646        }
1647        return start_statahead_thread(dir, *dentryp);
1648}
1649