linux/drivers/staging/lustre/lustre/llite/rw.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * lustre/llite/rw.c
  37 *
  38 * Lustre Lite I/O page cache routines shared by different kernel revs
  39 */
  40
  41#include <linux/kernel.h>
  42#include <linux/mm.h>
  43#include <linux/string.h>
  44#include <linux/stat.h>
  45#include <linux/errno.h>
  46#include <linux/unistd.h>
  47#include <linux/writeback.h>
  48#include <asm/uaccess.h>
  49
  50#include <linux/fs.h>
  51#include <linux/pagemap.h>
  52/* current_is_kswapd() */
  53#include <linux/swap.h>
  54
  55#define DEBUG_SUBSYSTEM S_LLITE
  56
  57#include <lustre_lite.h>
  58#include <obd_cksum.h>
  59#include "llite_internal.h"
  60#include <linux/lustre_compat25.h>
  61
  62/**
  63 * Finalizes cl-data before exiting typical address_space operation. Dual to
  64 * ll_cl_init().
  65 */
  66static void ll_cl_fini(struct ll_cl_context *lcc)
  67{
  68        struct lu_env  *env  = lcc->lcc_env;
  69        struct cl_io   *io   = lcc->lcc_io;
  70        struct cl_page *page = lcc->lcc_page;
  71
  72        LASSERT(lcc->lcc_cookie == current);
  73        LASSERT(env != NULL);
  74
  75        if (page != NULL) {
  76                lu_ref_del(&page->cp_reference, "cl_io", io);
  77                cl_page_put(env, page);
  78        }
  79
  80        if (io && lcc->lcc_created) {
  81                cl_io_end(env, io);
  82                cl_io_unlock(env, io);
  83                cl_io_iter_fini(env, io);
  84                cl_io_fini(env, io);
  85        }
  86        cl_env_put(env, &lcc->lcc_refcheck);
  87}
  88
  89/**
  90 * Initializes common cl-data at the typical address_space operation entry
  91 * point.
  92 */
  93static struct ll_cl_context *ll_cl_init(struct file *file,
  94                                        struct page *vmpage, int create)
  95{
  96        struct ll_cl_context *lcc;
  97        struct lu_env    *env;
  98        struct cl_io     *io;
  99        struct cl_object *clob;
 100        struct ccc_io    *cio;
 101
 102        int refcheck;
 103        int result = 0;
 104
 105        clob = ll_i2info(vmpage->mapping->host)->lli_clob;
 106        LASSERT(clob != NULL);
 107
 108        env = cl_env_get(&refcheck);
 109        if (IS_ERR(env))
 110                return ERR_CAST(env);
 111
 112        lcc = &vvp_env_info(env)->vti_io_ctx;
 113        memset(lcc, 0, sizeof(*lcc));
 114        lcc->lcc_env = env;
 115        lcc->lcc_refcheck = refcheck;
 116        lcc->lcc_cookie = current;
 117
 118        cio = ccc_env_io(env);
 119        io = cio->cui_cl.cis_io;
 120        if (io == NULL && create) {
 121                struct inode *inode = vmpage->mapping->host;
 122                loff_t pos;
 123
 124                if (mutex_trylock(&inode->i_mutex)) {
 125                        mutex_unlock(&(inode)->i_mutex);
 126
 127                        /* this is too bad. Someone is trying to write the
 128                         * page w/o holding inode mutex. This means we can
 129                         * add dirty pages into cache during truncate */
 130                        CERROR("Proc %s is dirting page w/o inode lock, this"
 131                               "will break truncate.\n", current->comm);
 132                        dump_stack();
 133                        LBUG();
 134                        return ERR_PTR(-EIO);
 135                }
 136
 137                /*
 138                 * Loop-back driver calls ->prepare_write() and ->sendfile()
 139                 * methods directly, bypassing file system ->write() operation,
 140                 * so cl_io has to be created here.
 141                 */
 142                io = ccc_env_thread_io(env);
 143                ll_io_init(io, file, 1);
 144
 145                /* No lock at all for this kind of IO - we can't do it because
 146                 * we have held page lock, it would cause deadlock.
 147                 * XXX: This causes poor performance to loop device - One page
 148                 *      per RPC.
 149                 *      In order to get better performance, users should use
 150                 *      lloop driver instead.
 151                 */
 152                io->ci_lockreq = CILR_NEVER;
 153
 154                pos = (vmpage->index << PAGE_CACHE_SHIFT);
 155
 156                /* Create a temp IO to serve write. */
 157                result = cl_io_rw_init(env, io, CIT_WRITE, pos, PAGE_CACHE_SIZE);
 158                if (result == 0) {
 159                        cio->cui_fd = LUSTRE_FPRIVATE(file);
 160                        cio->cui_iov = NULL;
 161                        cio->cui_nrsegs = 0;
 162                        result = cl_io_iter_init(env, io);
 163                        if (result == 0) {
 164                                result = cl_io_lock(env, io);
 165                                if (result == 0)
 166                                        result = cl_io_start(env, io);
 167                        }
 168                } else
 169                        result = io->ci_result;
 170                lcc->lcc_created = 1;
 171        }
 172
 173        lcc->lcc_io = io;
 174        if (io == NULL)
 175                result = -EIO;
 176        if (result == 0) {
 177                struct cl_page   *page;
 178
 179                LASSERT(io != NULL);
 180                LASSERT(io->ci_state == CIS_IO_GOING);
 181                LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
 182                page = cl_page_find(env, clob, vmpage->index, vmpage,
 183                                    CPT_CACHEABLE);
 184                if (!IS_ERR(page)) {
 185                        lcc->lcc_page = page;
 186                        lu_ref_add(&page->cp_reference, "cl_io", io);
 187                        result = 0;
 188                } else
 189                        result = PTR_ERR(page);
 190        }
 191        if (result) {
 192                ll_cl_fini(lcc);
 193                lcc = ERR_PTR(result);
 194        }
 195
 196        CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n",
 197               vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
 198               env, io);
 199        return lcc;
 200}
 201
 202static struct ll_cl_context *ll_cl_get(void)
 203{
 204        struct ll_cl_context *lcc;
 205        struct lu_env *env;
 206        int refcheck;
 207
 208        env = cl_env_get(&refcheck);
 209        LASSERT(!IS_ERR(env));
 210        lcc = &vvp_env_info(env)->vti_io_ctx;
 211        LASSERT(env == lcc->lcc_env);
 212        LASSERT(current == lcc->lcc_cookie);
 213        cl_env_put(env, &refcheck);
 214
 215        /* env has got in ll_cl_init, so it is still usable. */
 216        return lcc;
 217}
 218
 219/**
 220 * ->prepare_write() address space operation called by generic_file_write()
 221 * for every page during write.
 222 */
 223int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
 224                     unsigned to)
 225{
 226        struct ll_cl_context *lcc;
 227        int result;
 228
 229        lcc = ll_cl_init(file, vmpage, 1);
 230        if (!IS_ERR(lcc)) {
 231                struct lu_env  *env = lcc->lcc_env;
 232                struct cl_io   *io  = lcc->lcc_io;
 233                struct cl_page *page = lcc->lcc_page;
 234
 235                cl_page_assume(env, io, page);
 236
 237                result = cl_io_prepare_write(env, io, page, from, to);
 238                if (result == 0) {
 239                        /*
 240                         * Add a reference, so that page is not evicted from
 241                         * the cache until ->commit_write() is called.
 242                         */
 243                        cl_page_get(page);
 244                        lu_ref_add(&page->cp_reference, "prepare_write",
 245                                   current);
 246                } else {
 247                        cl_page_unassume(env, io, page);
 248                        ll_cl_fini(lcc);
 249                }
 250                /* returning 0 in prepare assumes commit must be called
 251                 * afterwards */
 252        } else {
 253                result = PTR_ERR(lcc);
 254        }
 255        return result;
 256}
 257
 258int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
 259                    unsigned to)
 260{
 261        struct ll_cl_context *lcc;
 262        struct lu_env    *env;
 263        struct cl_io     *io;
 264        struct cl_page   *page;
 265        int result = 0;
 266
 267        lcc  = ll_cl_get();
 268        env  = lcc->lcc_env;
 269        page = lcc->lcc_page;
 270        io   = lcc->lcc_io;
 271
 272        LASSERT(cl_page_is_owned(page, io));
 273        LASSERT(from <= to);
 274        if (from != to) /* handle short write case. */
 275                result = cl_io_commit_write(env, io, page, from, to);
 276        if (cl_page_is_owned(page, io))
 277                cl_page_unassume(env, io, page);
 278
 279        /*
 280         * Release reference acquired by ll_prepare_write().
 281         */
 282        lu_ref_del(&page->cp_reference, "prepare_write", current);
 283        cl_page_put(env, page);
 284        ll_cl_fini(lcc);
 285        return result;
 286}
 287
 288struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
 289{
 290        __u64 opc;
 291
 292        opc = crt == CRT_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
 293        return ll_osscapa_get(inode, opc);
 294}
 295
 296static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
 297
 298/**
 299 * Get readahead pages from the filesystem readahead pool of the client for a
 300 * thread.
 301 *
 302 * /param sbi superblock for filesystem readahead state ll_ra_info
 303 * /param ria per-thread readahead state
 304 * /param pages number of pages requested for readahead for the thread.
 305 *
 306 * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
 307 * It should work well if the ra_max_pages is much greater than the single
 308 * file's read-ahead window, and not too many threads contending for
 309 * these readahead pages.
 310 *
 311 * TODO: There may be a 'global sync problem' if many threads are trying
 312 * to get an ra budget that is larger than the remaining readahead pages
 313 * and reach here at exactly the same time. They will compute /a ret to
 314 * consume the remaining pages, but will fail at atomic_add_return() and
 315 * get a zero ra window, although there is still ra space remaining. - Jay */
 316
 317static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
 318                                     struct ra_io_arg *ria,
 319                                     unsigned long pages)
 320{
 321        struct ll_ra_info *ra = &sbi->ll_ra_info;
 322        long ret;
 323
 324        /* If read-ahead pages left are less than 1M, do not do read-ahead,
 325         * otherwise it will form small read RPC(< 1M), which hurt server
 326         * performance a lot. */
 327        ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
 328        if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
 329                GOTO(out, ret = 0);
 330
 331        /* If the non-strided (ria_pages == 0) readahead window
 332         * (ria_start + ret) has grown across an RPC boundary, then trim
 333         * readahead size by the amount beyond the RPC so it ends on an
 334         * RPC boundary. If the readahead window is already ending on
 335         * an RPC boundary (beyond_rpc == 0), or smaller than a full
 336         * RPC (beyond_rpc < ret) the readahead size is unchanged.
 337         * The (beyond_rpc != 0) check is skipped since the conditional
 338         * branch is more expensive than subtracting zero from the result.
 339         *
 340         * Strided read is left unaligned to avoid small fragments beyond
 341         * the RPC boundary from needing an extra read RPC. */
 342        if (ria->ria_pages == 0) {
 343                long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;
 344                if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
 345                        ret -= beyond_rpc;
 346        }
 347
 348        if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
 349                atomic_sub(ret, &ra->ra_cur_pages);
 350                ret = 0;
 351        }
 352
 353out:
 354        return ret;
 355}
 356
 357void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
 358{
 359        struct ll_ra_info *ra = &sbi->ll_ra_info;
 360        atomic_sub(len, &ra->ra_cur_pages);
 361}
 362
 363static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
 364{
 365        LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
 366        lprocfs_counter_incr(sbi->ll_ra_stats, which);
 367}
 368
 369void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
 370{
 371        struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
 372        ll_ra_stats_inc_sbi(sbi, which);
 373}
 374
 375#define RAS_CDEBUG(ras) \
 376        CDEBUG(D_READA,                                               \
 377               "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu"    \
 378               "csr %lu sf %lu sp %lu sl %lu \n",                           \
 379               ras->ras_last_readpage, ras->ras_consecutive_requests,   \
 380               ras->ras_consecutive_pages, ras->ras_window_start,           \
 381               ras->ras_window_len, ras->ras_next_readahead,             \
 382               ras->ras_requests, ras->ras_request_index,                   \
 383               ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
 384               ras->ras_stride_pages, ras->ras_stride_length)
 385
 386static int index_in_window(unsigned long index, unsigned long point,
 387                           unsigned long before, unsigned long after)
 388{
 389        unsigned long start = point - before, end = point + after;
 390
 391        if (start > point)
 392               start = 0;
 393        if (end < point)
 394               end = ~0;
 395
 396        return start <= index && index <= end;
 397}
 398
 399static struct ll_readahead_state *ll_ras_get(struct file *f)
 400{
 401        struct ll_file_data       *fd;
 402
 403        fd = LUSTRE_FPRIVATE(f);
 404        return &fd->fd_ras;
 405}
 406
 407void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
 408{
 409        struct ll_readahead_state *ras;
 410
 411        ras = ll_ras_get(f);
 412
 413        spin_lock(&ras->ras_lock);
 414        ras->ras_requests++;
 415        ras->ras_request_index = 0;
 416        ras->ras_consecutive_requests++;
 417        rar->lrr_reader = current;
 418
 419        list_add(&rar->lrr_linkage, &ras->ras_read_beads);
 420        spin_unlock(&ras->ras_lock);
 421}
 422
 423void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
 424{
 425        struct ll_readahead_state *ras;
 426
 427        ras = ll_ras_get(f);
 428
 429        spin_lock(&ras->ras_lock);
 430        list_del_init(&rar->lrr_linkage);
 431        spin_unlock(&ras->ras_lock);
 432}
 433
 434static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
 435{
 436        struct ll_ra_read *scan;
 437
 438        list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
 439                if (scan->lrr_reader == current)
 440                        return scan;
 441        }
 442        return NULL;
 443}
 444
 445struct ll_ra_read *ll_ra_read_get(struct file *f)
 446{
 447        struct ll_readahead_state *ras;
 448        struct ll_ra_read        *bead;
 449
 450        ras = ll_ras_get(f);
 451
 452        spin_lock(&ras->ras_lock);
 453        bead = ll_ra_read_get_locked(ras);
 454        spin_unlock(&ras->ras_lock);
 455        return bead;
 456}
 457
 458static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
 459                              struct cl_page_list *queue, struct cl_page *page,
 460                              struct page *vmpage)
 461{
 462        struct ccc_page *cp;
 463        int           rc;
 464
 465        rc = 0;
 466        cl_page_assume(env, io, page);
 467        lu_ref_add(&page->cp_reference, "ra", current);
 468        cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
 469        if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
 470                rc = cl_page_is_under_lock(env, io, page);
 471                if (rc == -EBUSY) {
 472                        cp->cpg_defer_uptodate = 1;
 473                        cp->cpg_ra_used = 0;
 474                        cl_page_list_add(queue, page);
 475                        rc = 1;
 476                } else {
 477                        cl_page_delete(env, page);
 478                        rc = -ENOLCK;
 479                }
 480        } else {
 481                /* skip completed pages */
 482                cl_page_unassume(env, io, page);
 483        }
 484        lu_ref_del(&page->cp_reference, "ra", current);
 485        cl_page_put(env, page);
 486        return rc;
 487}
 488
 489/**
 490 * Initiates read-ahead of a page with given index.
 491 *
 492 * \retval     +ve: page was added to \a queue.
 493 *
 494 * \retval -ENOLCK: there is no extent lock for this part of a file, stop
 495 *                read-ahead.
 496 *
 497 * \retval  -ve, 0: page wasn't added to \a queue for other reason.
 498 */
 499static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
 500                              struct cl_page_list *queue,
 501                              pgoff_t index, struct address_space *mapping)
 502{
 503        struct page      *vmpage;
 504        struct cl_object *clob  = ll_i2info(mapping->host)->lli_clob;
 505        struct cl_page   *page;
 506        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
 507        unsigned int      gfp_mask;
 508        int            rc    = 0;
 509        const char       *msg   = NULL;
 510
 511        gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
 512#ifdef __GFP_NOWARN
 513        gfp_mask |= __GFP_NOWARN;
 514#endif
 515        vmpage = grab_cache_page_nowait(mapping, index);
 516        if (vmpage != NULL) {
 517                /* Check if vmpage was truncated or reclaimed */
 518                if (vmpage->mapping == mapping) {
 519                        page = cl_page_find(env, clob, vmpage->index,
 520                                            vmpage, CPT_CACHEABLE);
 521                        if (!IS_ERR(page)) {
 522                                rc = cl_read_ahead_page(env, io, queue,
 523                                                        page, vmpage);
 524                                if (rc == -ENOLCK) {
 525                                        which = RA_STAT_FAILED_MATCH;
 526                                        msg   = "lock match failed";
 527                                }
 528                        } else {
 529                                which = RA_STAT_FAILED_GRAB_PAGE;
 530                                msg   = "cl_page_find failed";
 531                        }
 532                } else {
 533                        which = RA_STAT_WRONG_GRAB_PAGE;
 534                        msg   = "g_c_p_n returned invalid page";
 535                }
 536                if (rc != 1)
 537                        unlock_page(vmpage);
 538                page_cache_release(vmpage);
 539        } else {
 540                which = RA_STAT_FAILED_GRAB_PAGE;
 541                msg   = "g_c_p_n failed";
 542        }
 543        if (msg != NULL) {
 544                ll_ra_stats_inc(mapping, which);
 545                CDEBUG(D_READA, "%s\n", msg);
 546        }
 547        return rc;
 548}
 549
 550#define RIA_DEBUG(ria)                                                 \
 551        CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
 552        ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
 553        ria->ria_pages)
 554
 555/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't
 556 * know what the actual RPC size is.  If this needs to change, it makes more
 557 * sense to tune the i_blkbits value for the file based on the OSTs it is
 558 * striped over, rather than having a constant value for all files here. */
 559
  560/* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - PAGE_CACHE_SHIFT)).
  561 * Temporarily set RAS_INCREASE_STEP to 1MB. After 4MB RPC is enabled
  562 * by default, this should be adjusted to correspond with max_read_ahead_mb
  563 * and max_read_ahead_per_file_mb, otherwise the readahead budget can be used
  564 * up quickly, which will affect read performance significantly. See LU-2816 */
 565#define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> PAGE_CACHE_SHIFT)
 566
 567static inline int stride_io_mode(struct ll_readahead_state *ras)
 568{
 569        return ras->ras_consecutive_stride_requests > 1;
 570}
 571/* The function calculates how much pages will be read in
 572 * [off, off + length], in such stride IO area,
 573 * stride_offset = st_off, stride_lengh = st_len,
 574 * stride_pages = st_pgs
 575 *
 576 *   |------------------|*****|------------------|*****|------------|*****|....
 577 * st_off
 578 *   |--- st_pgs     ---|
 579 *   |-----     st_len   -----|
 580 *
 581 *            How many pages it should read in such pattern
 582 *            |-------------------------------------------------------------|
 583 *            off
 584 *            |<------            length                      ------->|
 585 *
 586 *        =   |<----->|  +  |-------------------------------------| +   |---|
 587 *           start_left          st_pgs * i                 end_left
 588 */
 589static unsigned long
 590stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
 591                unsigned long off, unsigned long length)
 592{
 593        __u64 start = off > st_off ? off - st_off : 0;
 594        __u64 end = off + length > st_off ? off + length - st_off : 0;
 595        unsigned long start_left = 0;
 596        unsigned long end_left = 0;
 597        unsigned long pg_count;
 598
 599        if (st_len == 0 || length == 0 || end == 0)
 600                return length;
 601
 602        start_left = do_div(start, st_len);
 603        if (start_left < st_pgs)
 604                start_left = st_pgs - start_left;
 605        else
 606                start_left = 0;
 607
 608        end_left = do_div(end, st_len);
 609        if (end_left > st_pgs)
 610                end_left = st_pgs;
 611
 612        CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n",
 613               start, end, start_left, end_left);
 614
 615        if (start == end)
 616                pg_count = end_left - (st_pgs - start_left);
 617        else
 618                pg_count = start_left + st_pgs * (end - start - 1) + end_left;
 619
 620        CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu"
 621               "pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
 622
 623        return pg_count;
 624}
 625
 626static int ria_page_count(struct ra_io_arg *ria)
 627{
 628        __u64 length = ria->ria_end >= ria->ria_start ?
 629                       ria->ria_end - ria->ria_start + 1 : 0;
 630
 631        return stride_pg_count(ria->ria_stoff, ria->ria_length,
 632                               ria->ria_pages, ria->ria_start,
 633                               length);
 634}
 635
 636/*Check whether the index is in the defined ra-window */
 637static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
 638{
 639        /* If ria_length == ria_pages, it means non-stride I/O mode,
 640         * idx should always inside read-ahead window in this case
 641         * For stride I/O mode, just check whether the idx is inside
 642         * the ria_pages. */
 643        return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
 644               (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
 645                ria->ria_length < ria->ria_pages);
 646}
 647
 648static int ll_read_ahead_pages(const struct lu_env *env,
 649                               struct cl_io *io, struct cl_page_list *queue,
 650                               struct ra_io_arg *ria,
 651                               unsigned long *reserved_pages,
 652                               struct address_space *mapping,
 653                               unsigned long *ra_end)
 654{
 655        int rc, count = 0, stride_ria;
 656        unsigned long page_idx;
 657
 658        LASSERT(ria != NULL);
 659        RIA_DEBUG(ria);
 660
 661        stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
 662        for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
 663                        *reserved_pages > 0; page_idx++) {
 664                if (ras_inside_ra_window(page_idx, ria)) {
 665                        /* If the page is inside the read-ahead window*/
 666                        rc = ll_read_ahead_page(env, io, queue,
 667                                                page_idx, mapping);
 668                        if (rc == 1) {
 669                                (*reserved_pages)--;
 670                                count ++;
 671                        } else if (rc == -ENOLCK)
 672                                break;
 673                } else if (stride_ria) {
 674                        /* If it is not in the read-ahead window, and it is
 675                         * read-ahead mode, then check whether it should skip
 676                         * the stride gap */
 677                        pgoff_t offset;
 678                        /* FIXME: This assertion only is valid when it is for
 679                         * forward read-ahead, it will be fixed when backward
 680                         * read-ahead is implemented */
 681                        LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu"
 682                                "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
 683                                ria->ria_start, ria->ria_end, ria->ria_stoff,
 684                                ria->ria_length, ria->ria_pages);
 685                        offset = page_idx - ria->ria_stoff;
 686                        offset = offset % (ria->ria_length);
 687                        if (offset > ria->ria_pages) {
 688                                page_idx += ria->ria_length - offset;
 689                                CDEBUG(D_READA, "i %lu skip %lu \n", page_idx,
 690                                       ria->ria_length - offset);
 691                                continue;
 692                        }
 693                }
 694        }
 695        *ra_end = page_idx;
 696        return count;
 697}
 698
 699int ll_readahead(const struct lu_env *env, struct cl_io *io,
 700                 struct ll_readahead_state *ras, struct address_space *mapping,
 701                 struct cl_page_list *queue, int flags)
 702{
 703        struct vvp_io *vio = vvp_env_io(env);
 704        struct vvp_thread_info *vti = vvp_env_info(env);
 705        struct cl_attr *attr = ccc_env_thread_attr(env);
 706        unsigned long start = 0, end = 0, reserved;
 707        unsigned long ra_end, len;
 708        struct inode *inode;
 709        struct ll_ra_read *bead;
 710        struct ra_io_arg *ria = &vti->vti_ria;
 711        struct ll_inode_info *lli;
 712        struct cl_object *clob;
 713        int ret = 0;
 714        __u64 kms;
 715
 716        inode = mapping->host;
 717        lli = ll_i2info(inode);
 718        clob = lli->lli_clob;
 719
 720        memset(ria, 0, sizeof(*ria));
 721
 722        cl_object_attr_lock(clob);
 723        ret = cl_object_attr_get(env, clob, attr);
 724        cl_object_attr_unlock(clob);
 725
 726        if (ret != 0)
 727                return ret;
 728        kms = attr->cat_kms;
 729        if (kms == 0) {
 730                ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
 731                return 0;
 732        }
 733
 734        spin_lock(&ras->ras_lock);
 735        if (vio->cui_ra_window_set)
 736                bead = &vio->cui_bead;
 737        else
 738                bead = NULL;
 739
 740        /* Enlarge the RA window to encompass the full read */
 741        if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
 742            bead->lrr_start + bead->lrr_count) {
 743                ras->ras_window_len = bead->lrr_start + bead->lrr_count -
 744                                      ras->ras_window_start;
 745        }
 746        /* Reserve a part of the read-ahead window that we'll be issuing */
 747        if (ras->ras_window_len) {
 748                start = ras->ras_next_readahead;
 749                end = ras->ras_window_start + ras->ras_window_len - 1;
 750        }
 751        if (end != 0) {
 752                unsigned long rpc_boundary;
 753                /*
 754                 * Align RA window to an optimal boundary.
 755                 *
 756                 * XXX This would be better to align to cl_max_pages_per_rpc
 757                 * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
 758                 * be aligned to the RAID stripe size in the future and that
 759                 * is more important than the RPC size.
 760                 */
 761                /* Note: we only trim the RPC, instead of extending the RPC
 762                 * to the boundary, so to avoid reading too much pages during
 763                 * random reading. */
 764                rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1)));
 765                if (rpc_boundary > 0)
 766                        rpc_boundary--;
 767
 768                if (rpc_boundary  > start)
 769                        end = rpc_boundary;
 770
 771                /* Truncate RA window to end of file */
 772                end = min(end, (unsigned long)((kms - 1) >> PAGE_CACHE_SHIFT));
 773
 774                ras->ras_next_readahead = max(end, end + 1);
 775                RAS_CDEBUG(ras);
 776        }
 777        ria->ria_start = start;
 778        ria->ria_end = end;
 779        /* If stride I/O mode is detected, get stride window*/
 780        if (stride_io_mode(ras)) {
 781                ria->ria_stoff = ras->ras_stride_offset;
 782                ria->ria_length = ras->ras_stride_length;
 783                ria->ria_pages = ras->ras_stride_pages;
 784        }
 785        spin_unlock(&ras->ras_lock);
 786
 787        if (end == 0) {
 788                ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
 789                return 0;
 790        }
 791        len = ria_page_count(ria);
 792        if (len == 0)
 793                return 0;
 794
 795        reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
 796        if (reserved < len)
 797                ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
 798
 799        CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
 800               atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
 801               ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
 802
 803        ret = ll_read_ahead_pages(env, io, queue,
 804                                  ria, &reserved, mapping, &ra_end);
 805
 806        LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
 807        if (reserved != 0)
 808                ll_ra_count_put(ll_i2sbi(inode), reserved);
 809
 810        if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
 811                ll_ra_stats_inc(mapping, RA_STAT_EOF);
 812
 813        /* if we didn't get to the end of the region we reserved from
 814         * the ras we need to go back and update the ras so that the
 815         * next read-ahead tries from where we left off.  we only do so
 816         * if the region we failed to issue read-ahead on is still ahead
 817         * of the app and behind the next index to start read-ahead from */
 818        CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
 819               ra_end, end, ria->ria_end);
 820
 821        if (ra_end != end + 1) {
 822                spin_lock(&ras->ras_lock);
 823                if (ra_end < ras->ras_next_readahead &&
 824                    index_in_window(ra_end, ras->ras_window_start, 0,
 825                                    ras->ras_window_len)) {
 826                        ras->ras_next_readahead = ra_end;
 827                        RAS_CDEBUG(ras);
 828                }
 829                spin_unlock(&ras->ras_lock);
 830        }
 831
 832        return ret;
 833}
 834
 835static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
 836                          unsigned long index)
 837{
 838        ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
 839}
 840
 841/* called with the ras_lock held or from places where it doesn't matter */
 842static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
 843                      unsigned long index)
 844{
 845        ras->ras_last_readpage = index;
 846        ras->ras_consecutive_requests = 0;
 847        ras->ras_consecutive_pages = 0;
 848        ras->ras_window_len = 0;
 849        ras_set_start(inode, ras, index);
 850        ras->ras_next_readahead = max(ras->ras_window_start, index);
 851
 852        RAS_CDEBUG(ras);
 853}
 854
 855/* called with the ras_lock held or from places where it doesn't matter */
 856static void ras_stride_reset(struct ll_readahead_state *ras)
 857{
 858        ras->ras_consecutive_stride_requests = 0;
 859        ras->ras_stride_length = 0;
 860        ras->ras_stride_pages = 0;
 861        RAS_CDEBUG(ras);
 862}
 863
 864void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
 865{
 866        spin_lock_init(&ras->ras_lock);
 867        ras_reset(inode, ras, 0);
 868        ras->ras_requests = 0;
 869        INIT_LIST_HEAD(&ras->ras_read_beads);
 870}
 871
 872/*
 873 * Check whether the read request is in the stride window.
 874 * If it is in the stride window, return 1, otherwise return 0.
 875 */
 876static int index_in_stride_window(struct ll_readahead_state *ras,
 877                                  unsigned long index)
 878{
 879        unsigned long stride_gap;
 880
 881        if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
 882            ras->ras_stride_pages == ras->ras_stride_length)
 883                return 0;
 884
 885        stride_gap = index - ras->ras_last_readpage - 1;
 886
 887        /* If it is contiguous read */
 888        if (stride_gap == 0)
 889                return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
 890
 891        /* Otherwise check the stride by itself */
 892        return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
 893                ras->ras_consecutive_pages == ras->ras_stride_pages;
 894}
 895
 896static void ras_update_stride_detector(struct ll_readahead_state *ras,
 897                                       unsigned long index)
 898{
 899        unsigned long stride_gap = index - ras->ras_last_readpage - 1;
 900
 901        if (!stride_io_mode(ras) && (stride_gap != 0 ||
 902             ras->ras_consecutive_stride_requests == 0)) {
 903                ras->ras_stride_pages = ras->ras_consecutive_pages;
 904                ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
 905        }
 906        LASSERT(ras->ras_request_index == 0);
 907        LASSERT(ras->ras_consecutive_stride_requests == 0);
 908
 909        if (index <= ras->ras_last_readpage) {
 910                /*Reset stride window for forward read*/
 911                ras_stride_reset(ras);
 912                return;
 913        }
 914
 915        ras->ras_stride_pages = ras->ras_consecutive_pages;
 916        ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
 917
 918        RAS_CDEBUG(ras);
 919        return;
 920}
 921
 922static unsigned long
 923stride_page_count(struct ll_readahead_state *ras, unsigned long len)
 924{
 925        return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
 926                               ras->ras_stride_pages, ras->ras_stride_offset,
 927                               len);
 928}
 929
 930/* Stride Read-ahead window will be increased inc_len according to
 931 * stride I/O pattern */
 932static void ras_stride_increase_window(struct ll_readahead_state *ras,
 933                                       struct ll_ra_info *ra,
 934                                       unsigned long inc_len)
 935{
 936        unsigned long left, step, window_len;
 937        unsigned long stride_len;
 938
 939        LASSERT(ras->ras_stride_length > 0);
 940        LASSERTF(ras->ras_window_start + ras->ras_window_len
 941                 >= ras->ras_stride_offset, "window_start %lu, window_len %lu"
 942                 " stride_offset %lu\n", ras->ras_window_start,
 943                 ras->ras_window_len, ras->ras_stride_offset);
 944
 945        stride_len = ras->ras_window_start + ras->ras_window_len -
 946                     ras->ras_stride_offset;
 947
 948        left = stride_len % ras->ras_stride_length;
 949        window_len = ras->ras_window_len - left;
 950
 951        if (left < ras->ras_stride_pages)
 952                left += inc_len;
 953        else
 954                left = ras->ras_stride_pages + inc_len;
 955
 956        LASSERT(ras->ras_stride_pages != 0);
 957
 958        step = left / ras->ras_stride_pages;
 959        left %= ras->ras_stride_pages;
 960
 961        window_len += step * ras->ras_stride_length + left;
 962
 963        if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file)
 964                ras->ras_window_len = window_len;
 965
 966        RAS_CDEBUG(ras);
 967}
 968
 969static void ras_increase_window(struct inode *inode,
 970                                struct ll_readahead_state *ras,
 971                                struct ll_ra_info *ra)
 972{
 973        /* The stretch of ra-window should be aligned with max rpc_size
 974         * but current clio architecture does not support retrieve such
 975         * information from lower layer. FIXME later
 976         */
 977        if (stride_io_mode(ras))
 978                ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
 979        else
 980                ras->ras_window_len = min(ras->ras_window_len +
 981                                          RAS_INCREASE_STEP(inode),
 982                                          ra->ra_max_pages_per_file);
 983}
 984
 985void ras_update(struct ll_sb_info *sbi, struct inode *inode,
 986                struct ll_readahead_state *ras, unsigned long index,
 987                unsigned hit)
 988{
 989        struct ll_ra_info *ra = &sbi->ll_ra_info;
 990        int zero = 0, stride_detect = 0, ra_miss = 0;
 991
 992        spin_lock(&ras->ras_lock);
 993
 994        ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
 995
 996        /* reset the read-ahead window in two cases.  First when the app seeks
 997         * or reads to some other part of the file.  Secondly if we get a
 998         * read-ahead miss that we think we've previously issued.  This can
 999         * be a symptom of there being so many read-ahead pages that the VM is
1000         * reclaiming it before we get to it. */
1001        if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
1002                zero = 1;
1003                ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
1004        } else if (!hit && ras->ras_window_len &&
1005                   index < ras->ras_next_readahead &&
1006                   index_in_window(index, ras->ras_window_start, 0,
1007                                   ras->ras_window_len)) {
1008                ra_miss = 1;
1009                ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
1010        }
1011
1012        /* On the second access to a file smaller than the tunable
1013         * ra_max_read_ahead_whole_pages trigger RA on all pages in the
1014         * file up to ra_max_pages_per_file.  This is simply a best effort
1015         * and only occurs once per open file.  Normal RA behavior is reverted
1016         * to for subsequent IO.  The mmap case does not increment
1017         * ras_requests and thus can never trigger this behavior. */
1018        if (ras->ras_requests == 2 && !ras->ras_request_index) {
1019                __u64 kms_pages;
1020
1021                kms_pages = (i_size_read(inode) + PAGE_CACHE_SIZE - 1) >>
1022                            PAGE_CACHE_SHIFT;
1023
1024                CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
1025                       ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
1026
1027                if (kms_pages &&
1028                    kms_pages <= ra->ra_max_read_ahead_whole_pages) {
1029                        ras->ras_window_start = 0;
1030                        ras->ras_last_readpage = 0;
1031                        ras->ras_next_readahead = 0;
1032                        ras->ras_window_len = min(ra->ra_max_pages_per_file,
1033                                ra->ra_max_read_ahead_whole_pages);
1034                        GOTO(out_unlock, 0);
1035                }
1036        }
1037        if (zero) {
1038                /* check whether it is in stride I/O mode*/
1039                if (!index_in_stride_window(ras, index)) {
1040                        if (ras->ras_consecutive_stride_requests == 0 &&
1041                            ras->ras_request_index == 0) {
1042                                ras_update_stride_detector(ras, index);
1043                                ras->ras_consecutive_stride_requests++;
1044                        } else {
1045                                ras_stride_reset(ras);
1046                        }
1047                        ras_reset(inode, ras, index);
1048                        ras->ras_consecutive_pages++;
1049                        GOTO(out_unlock, 0);
1050                } else {
1051                        ras->ras_consecutive_pages = 0;
1052                        ras->ras_consecutive_requests = 0;
1053                        if (++ras->ras_consecutive_stride_requests > 1)
1054                                stride_detect = 1;
1055                        RAS_CDEBUG(ras);
1056                }
1057        } else {
1058                if (ra_miss) {
1059                        if (index_in_stride_window(ras, index) &&
1060                            stride_io_mode(ras)) {
1061                                /*If stride-RA hit cache miss, the stride dector
1062                                 *will not be reset to avoid the overhead of
1063                                 *redetecting read-ahead mode */
1064                                if (index != ras->ras_last_readpage + 1)
1065                                        ras->ras_consecutive_pages = 0;
1066                                ras_reset(inode, ras, index);
1067                                RAS_CDEBUG(ras);
1068                        } else {
1069                                /* Reset both stride window and normal RA
1070                                 * window */
1071                                ras_reset(inode, ras, index);
1072                                ras->ras_consecutive_pages++;
1073                                ras_stride_reset(ras);
1074                                GOTO(out_unlock, 0);
1075                        }
1076                } else if (stride_io_mode(ras)) {
1077                        /* If this is contiguous read but in stride I/O mode
1078                         * currently, check whether stride step still is valid,
1079                         * if invalid, it will reset the stride ra window*/
1080                        if (!index_in_stride_window(ras, index)) {
1081                                /* Shrink stride read-ahead window to be zero */
1082                                ras_stride_reset(ras);
1083                                ras->ras_window_len = 0;
1084                                ras->ras_next_readahead = index;
1085                        }
1086                }
1087        }
1088        ras->ras_consecutive_pages++;
1089        ras->ras_last_readpage = index;
1090        ras_set_start(inode, ras, index);
1091
1092        if (stride_io_mode(ras))
1093                /* Since stride readahead is sentivite to the offset
1094                 * of read-ahead, so we use original offset here,
1095                 * instead of ras_window_start, which is RPC aligned */
1096                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
1097        else
1098                ras->ras_next_readahead = max(ras->ras_window_start,
1099                                              ras->ras_next_readahead);
1100        RAS_CDEBUG(ras);
1101
1102        /* Trigger RA in the mmap case where ras_consecutive_requests
1103         * is not incremented and thus can't be used to trigger RA */
1104        if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
1105                ras->ras_window_len = RAS_INCREASE_STEP(inode);
1106                GOTO(out_unlock, 0);
1107        }
1108
1109        /* Initially reset the stride window offset to next_readahead*/
1110        if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
1111                /**
1112                 * Once stride IO mode is detected, next_readahead should be
1113                 * reset to make sure next_readahead > stride offset
1114                 */
1115                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
1116                ras->ras_stride_offset = index;
1117                ras->ras_window_len = RAS_INCREASE_STEP(inode);
1118        }
1119
1120        /* The initial ras_window_len is set to the request size.  To avoid
1121         * uselessly reading and discarding pages for random IO the window is
1122         * only increased once per consecutive request received. */
1123        if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
1124            !ras->ras_request_index)
1125                ras_increase_window(inode, ras, ra);
1126out_unlock:
1127        RAS_CDEBUG(ras);
1128        ras->ras_request_index++;
1129        spin_unlock(&ras->ras_lock);
1130        return;
1131}
1132
1133int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
1134{
1135        struct inode           *inode = vmpage->mapping->host;
1136        struct ll_inode_info   *lli   = ll_i2info(inode);
1137        struct lu_env     *env;
1138        struct cl_io       *io;
1139        struct cl_page   *page;
1140        struct cl_object       *clob;
1141        struct cl_env_nest      nest;
1142        bool redirtied = false;
1143        bool unlocked = false;
1144        int result;
1145
1146        LASSERT(PageLocked(vmpage));
1147        LASSERT(!PageWriteback(vmpage));
1148
1149        LASSERT(ll_i2dtexp(inode) != NULL);
1150
1151        env = cl_env_nested_get(&nest);
1152        if (IS_ERR(env))
1153                GOTO(out, result = PTR_ERR(env));
1154
1155        clob  = ll_i2info(inode)->lli_clob;
1156        LASSERT(clob != NULL);
1157
1158        io = ccc_env_thread_io(env);
1159        io->ci_obj = clob;
1160        io->ci_ignore_layout = 1;
1161        result = cl_io_init(env, io, CIT_MISC, clob);
1162        if (result == 0) {
1163                page = cl_page_find(env, clob, vmpage->index,
1164                                    vmpage, CPT_CACHEABLE);
1165                if (!IS_ERR(page)) {
1166                        lu_ref_add(&page->cp_reference, "writepage",
1167                                   current);
1168                        cl_page_assume(env, io, page);
1169                        result = cl_page_flush(env, io, page);
1170                        if (result != 0) {
1171                                /*
1172                                 * Re-dirty page on error so it retries write,
1173                                 * but not in case when IO has actually
1174                                 * occurred and completed with an error.
1175                                 */
1176                                if (!PageError(vmpage)) {
1177                                        redirty_page_for_writepage(wbc, vmpage);
1178                                        result = 0;
1179                                        redirtied = true;
1180                                }
1181                        }
1182                        cl_page_disown(env, io, page);
1183                        unlocked = true;
1184                        lu_ref_del(&page->cp_reference,
1185                                   "writepage", current);
1186                        cl_page_put(env, page);
1187                } else {
1188                        result = PTR_ERR(page);
1189                }
1190        }
1191        cl_io_fini(env, io);
1192
1193        if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
1194                loff_t offset = cl_offset(clob, vmpage->index);
1195
1196                /* Flush page failed because the extent is being written out.
1197                 * Wait for the write of extent to be finished to avoid
1198                 * breaking kernel which assumes ->writepage should mark
1199                 * PageWriteback or clean the page. */
1200                result = cl_sync_file_range(inode, offset,
1201                                            offset + PAGE_CACHE_SIZE - 1,
1202                                            CL_FSYNC_LOCAL, 1);
1203                if (result > 0) {
1204                        /* actually we may have written more than one page.
1205                         * decreasing this page because the caller will count
1206                         * it. */
1207                        wbc->nr_to_write -= result - 1;
1208                        result = 0;
1209                }
1210        }
1211
1212        cl_env_nested_put(&nest, env);
1213        GOTO(out, result);
1214
1215out:
1216        if (result < 0) {
1217                if (!lli->lli_async_rc)
1218                        lli->lli_async_rc = result;
1219                SetPageError(vmpage);
1220                if (!unlocked)
1221                        unlock_page(vmpage);
1222        }
1223        return result;
1224}
1225
1226int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
1227{
1228        struct inode *inode = mapping->host;
1229        struct ll_sb_info *sbi = ll_i2sbi(inode);
1230        loff_t start;
1231        loff_t end;
1232        enum cl_fsync_mode mode;
1233        int range_whole = 0;
1234        int result;
1235        int ignore_layout = 0;
1236
1237        if (wbc->range_cyclic) {
1238                start = mapping->writeback_index << PAGE_CACHE_SHIFT;
1239                end = OBD_OBJECT_EOF;
1240        } else {
1241                start = wbc->range_start;
1242                end = wbc->range_end;
1243                if (end == LLONG_MAX) {
1244                        end = OBD_OBJECT_EOF;
1245                        range_whole = start == 0;
1246                }
1247        }
1248
1249        mode = CL_FSYNC_NONE;
1250        if (wbc->sync_mode == WB_SYNC_ALL)
1251                mode = CL_FSYNC_LOCAL;
1252
1253        if (sbi->ll_umounting)
1254                /* if the mountpoint is being umounted, all pages have to be
1255                 * evicted to avoid hitting LBUG when truncate_inode_pages()
1256                 * is called later on. */
1257                ignore_layout = 1;
1258        result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
1259        if (result > 0) {
1260                wbc->nr_to_write -= result;
1261                result = 0;
1262         }
1263
1264        if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
1265                if (end == OBD_OBJECT_EOF)
1266                        end = i_size_read(inode);
1267                mapping->writeback_index = (end >> PAGE_CACHE_SHIFT) + 1;
1268        }
1269        return result;
1270}
1271
1272int ll_readpage(struct file *file, struct page *vmpage)
1273{
1274        struct ll_cl_context *lcc;
1275        int result;
1276
1277        lcc = ll_cl_init(file, vmpage, 0);
1278        if (!IS_ERR(lcc)) {
1279                struct lu_env  *env  = lcc->lcc_env;
1280                struct cl_io   *io   = lcc->lcc_io;
1281                struct cl_page *page = lcc->lcc_page;
1282
1283                LASSERT(page->cp_type == CPT_CACHEABLE);
1284                if (likely(!PageUptodate(vmpage))) {
1285                        cl_page_assume(env, io, page);
1286                        result = cl_io_read_page(env, io, page);
1287                } else {
1288                        /* Page from a non-object file. */
1289                        unlock_page(vmpage);
1290                        result = 0;
1291                }
1292                ll_cl_fini(lcc);
1293        } else {
1294                unlock_page(vmpage);
1295                result = PTR_ERR(lcc);
1296        }
1297        return result;
1298}
1299