linux/drivers/staging/lustre/lustre/llite/rw.c
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * GPL HEADER START
   4 *
   5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   6 *
   7 * This program is free software; you can redistribute it and/or modify
   8 * it under the terms of the GNU General Public License version 2 only,
   9 * as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License version 2 for more details (a copy is included
  15 * in the LICENSE file that accompanied this code).
  16 *
  17 * You should have received a copy of the GNU General Public License
  18 * version 2 along with this program; If not, see
  19 * http://www.gnu.org/licenses/gpl-2.0.html
  20 *
  21 * GPL HEADER END
  22 */
  23/*
  24 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  25 * Use is subject to license terms.
  26 *
  27 * Copyright (c) 2011, 2015, Intel Corporation.
  28 */
  29/*
  30 * This file is part of Lustre, http://www.lustre.org/
  31 * Lustre is a trademark of Sun Microsystems, Inc.
  32 *
  33 * lustre/llite/rw.c
  34 *
  35 * Lustre Lite I/O page cache routines shared by different kernel revs
  36 */
  37
  38#include <linux/kernel.h>
  39#include <linux/mm.h>
  40#include <linux/string.h>
  41#include <linux/stat.h>
  42#include <linux/errno.h>
  43#include <linux/unistd.h>
  44#include <linux/writeback.h>
  45#include <linux/uaccess.h>
  46
  47#include <linux/fs.h>
  48#include <linux/pagemap.h>
  49/* current_is_kswapd() */
  50#include <linux/swap.h>
  51#include <linux/bvec.h>
  52
  53#define DEBUG_SUBSYSTEM S_LLITE
  54
  55#include <obd_cksum.h>
  56#include "llite_internal.h"
  57
  58static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
  59
  60/**
  61 * Get readahead pages from the filesystem readahead pool of the client for a
  62 * thread.
  63 *
   64 * \param sbi superblock for filesystem readahead state ll_ra_info
   65 * \param ria per-thread readahead state
   66 * \param pages number of pages requested for readahead for the thread.
   67 *
   68 * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
   69 * It should work well if ra_max_pages is much greater than the single
   70 * file's read-ahead window, and there are not too many threads contending
   71 * for these readahead pages.
   72 *
   73 * TODO: There may be a 'global sync problem' if many threads are trying
   74 * to get an ra budget that is larger than the remaining readahead pages
   75 * and reach here at exactly the same time. They will compute \a ret to
   76 * consume the remaining pages, but will fail at atomic_add_return() and
   77 * get a zero ra window, although there is still ra space remaining. - Jay
  78 */
  79static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
  80                                     struct ra_io_arg *ria,
  81                                     unsigned long pages, unsigned long min)
  82{
  83        struct ll_ra_info *ra = &sbi->ll_ra_info;
  84        long ret;
  85
   86        /* If the read-ahead pages left are less than 1M, do not do
   87         * read-ahead, otherwise it will form small read RPCs (< 1M), which
   88         * hurt server performance a lot.
  89         */
  90        ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
  91        if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages)) {
  92                ret = 0;
  93                goto out;
  94        }
  95
  96        if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
  97                atomic_sub(ret, &ra->ra_cur_pages);
  98                ret = 0;
  99        }
 100
 101out:
 102        if (ret < min) {
 103                /* override ra limit for maximum performance */
 104                atomic_add(min - ret, &ra->ra_cur_pages);
 105                ret = min;
 106        }
 107        return ret;
 108}
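     /*
      * Illustrative walk-through of the budget math above (not from the
      * original source; it assumes PTLRPC_MAX_BRW_PAGES == 256, i.e. 1 MiB
      * of 4 KiB pages, and uses round numbers picked only for the sketch):
      *
      *   ra_max_pages = 1024, ra_cur_pages = 900, pages = 512, min = 32
      *
      *   ret = min(1024 - 900, 512) = 124
      *   124 < min_t(long, 256, 512)  =>  ret = 0 (avoid a sub-1M read RPC)
      *   at "out": ret (0) < min (32) =>  ra_cur_pages += 32, ret = 32
      *
      * i.e. the caller always gets at least \a min pages charged against the
      * pool, even when the global readahead budget is nearly exhausted.
      */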
 109
 110void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
 111{
 112        struct ll_ra_info *ra = &sbi->ll_ra_info;
 113
 114        atomic_sub(len, &ra->ra_cur_pages);
 115}
 116
 117static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
 118{
 119        LASSERTF(which < _NR_RA_STAT, "which: %u\n", which);
 120        lprocfs_counter_incr(sbi->ll_ra_stats, which);
 121}
 122
 123void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
 124{
 125        struct ll_sb_info *sbi = ll_i2sbi(inode);
 126
 127        ll_ra_stats_inc_sbi(sbi, which);
 128}
 129
 130#define RAS_CDEBUG(ras) \
 131        CDEBUG(D_READA,                                               \
 132               "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu rpc %lu "        \
 133               "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu\n",                \
 134               ras->ras_last_readpage, ras->ras_consecutive_requests,   \
 135               ras->ras_consecutive_pages, ras->ras_window_start,           \
 136               ras->ras_window_len, ras->ras_next_readahead,             \
 137               ras->ras_rpc_size,                                            \
 138               ras->ras_requests, ras->ras_request_index,                   \
 139               ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
 140               ras->ras_stride_pages, ras->ras_stride_length)
 141
 142static int index_in_window(unsigned long index, unsigned long point,
 143                           unsigned long before, unsigned long after)
 144{
 145        unsigned long start = point - before, end = point + after;
 146
 147        if (start > point)
 148                start = 0;
 149        if (end < point)
 150                end = ~0;
 151
 152        return start <= index && index <= end;
 153}
 154
 155void ll_ras_enter(struct file *f)
 156{
 157        struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
 158        struct ll_readahead_state *ras = &fd->fd_ras;
 159
 160        spin_lock(&ras->ras_lock);
 161        ras->ras_requests++;
 162        ras->ras_request_index = 0;
 163        ras->ras_consecutive_requests++;
 164        spin_unlock(&ras->ras_lock);
 165}
 166
 167/**
 168 * Initiates read-ahead of a page with given index.
 169 *
 170 * \retval +ve: page was already uptodate so it will be skipped
 171 *              from being added;
 172 * \retval -ve: page wasn't added to \a queue for error;
 173 * \retval   0: page was added into \a queue for read ahead.
 174 */
 175static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
 176                              struct cl_page_list *queue, pgoff_t index)
 177{
 178        enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
 179        struct cl_object *clob = io->ci_obj;
 180        struct inode *inode = vvp_object_inode(clob);
 181        const char *msg = NULL;
 182        struct cl_page *page;
 183        struct vvp_page *vpg;
 184        struct page *vmpage;
 185        int rc = 0;
 186
 187        vmpage = grab_cache_page_nowait(inode->i_mapping, index);
 188        if (!vmpage) {
 189                which = RA_STAT_FAILED_GRAB_PAGE;
 190                msg = "g_c_p_n failed";
 191                rc = -EBUSY;
 192                goto out;
 193        }
 194
 195        /* Check if vmpage was truncated or reclaimed */
 196        if (vmpage->mapping != inode->i_mapping) {
 197                which = RA_STAT_WRONG_GRAB_PAGE;
 198                msg = "g_c_p_n returned invalid page";
 199                rc = -EBUSY;
 200                goto out;
 201        }
 202
 203        page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
 204        if (IS_ERR(page)) {
 205                which = RA_STAT_FAILED_GRAB_PAGE;
 206                msg = "cl_page_find failed";
 207                rc = PTR_ERR(page);
 208                goto out;
 209        }
 210
 211        lu_ref_add(&page->cp_reference, "ra", current);
 212        cl_page_assume(env, io, page);
 213        vpg = cl2vvp_page(cl_object_page_slice(clob, page));
 214        if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
 215                vpg->vpg_defer_uptodate = 1;
 216                vpg->vpg_ra_used = 0;
 217                cl_page_list_add(queue, page);
 218        } else {
 219                /* skip completed pages */
 220                cl_page_unassume(env, io, page);
  221                /* This page is already uptodate; return a positive number
  222                 * to tell the caller about this
 223                 */
 224                rc = 1;
 225        }
 226
 227        lu_ref_del(&page->cp_reference, "ra", current);
 228        cl_page_put(env, page);
 229out:
 230        if (vmpage) {
 231                if (rc)
 232                        unlock_page(vmpage);
 233                put_page(vmpage);
 234        }
 235        if (msg) {
 236                ll_ra_stats_inc(inode, which);
 237                CDEBUG(D_READA, "%s\n", msg);
 238        }
 239        return rc;
 240}
 241
 242#define RIA_DEBUG(ria)                                                 \
 243        CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
 244        ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
 245        ria->ria_pages)
 246
 247static inline int stride_io_mode(struct ll_readahead_state *ras)
 248{
 249        return ras->ras_consecutive_stride_requests > 1;
 250}
 251
  252/* The function calculates how many pages will be read in
  253 * [off, off + length] within such a stride I/O area, where
 254 * stride_offset = st_off, stride_length = st_len,
 255 * stride_pages = st_pgs
 256 *
 257 *   |------------------|*****|------------------|*****|------------|*****|....
 258 * st_off
 259 *   |--- st_pgs     ---|
 260 *   |-----     st_len   -----|
 261 *
 262 *            How many pages it should read in such pattern
 263 *            |-------------------------------------------------------------|
 264 *            off
 265 *            |<------            length                      ------->|
 266 *
 267 *        =   |<----->|  +  |-------------------------------------| +   |---|
 268 *           start_left          st_pgs * i                 end_left
 269 */
 270static unsigned long
 271stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
 272                unsigned long off, unsigned long length)
 273{
 274        __u64 start = off > st_off ? off - st_off : 0;
 275        __u64 end = off + length > st_off ? off + length - st_off : 0;
 276        unsigned long start_left = 0;
 277        unsigned long end_left = 0;
 278        unsigned long pg_count;
 279
 280        if (st_len == 0 || length == 0 || end == 0)
 281                return length;
 282
 283        start_left = do_div(start, st_len);
 284        if (start_left < st_pgs)
 285                start_left = st_pgs - start_left;
 286        else
 287                start_left = 0;
 288
 289        end_left = do_div(end, st_len);
 290        if (end_left > st_pgs)
 291                end_left = st_pgs;
 292
 293        CDEBUG(D_READA, "start %llu, end %llu start_left %lu end_left %lu\n",
 294               start, end, start_left, end_left);
 295
 296        if (start == end)
 297                pg_count = end_left - (st_pgs - start_left);
 298        else
 299                pg_count = start_left + st_pgs * (end - start - 1) + end_left;
 300
 301        CDEBUG(D_READA,
 302               "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu pgcount %lu\n",
 303               st_off, st_len, st_pgs, off, length, pg_count);
 304
 305        return pg_count;
 306}
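     /*
      * Worked example for the diagram above (illustrative only, not from the
      * original source): with st_off = 0, st_len = 16, st_pgs = 4, off = 2,
      * length = 30 the covered index range is [2, 31]:
      *
      *   start = 2,  do_div -> start = 0, start_left = 4 - 2 = 2
      *   end   = 32, do_div -> end   = 2, end_left   = 0
      *   pg_count = 2 + 4 * (2 - 0 - 1) + 0 = 6
      *
      * which matches counting by hand: pages 2-3 of the first chunk plus
      * pages 16-19 of the second chunk.
      */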
 307
 308static int ria_page_count(struct ra_io_arg *ria)
 309{
 310        __u64 length = ria->ria_end >= ria->ria_start ?
 311                       ria->ria_end - ria->ria_start + 1 : 0;
 312
 313        return stride_pg_count(ria->ria_stoff, ria->ria_length,
 314                               ria->ria_pages, ria->ria_start,
 315                               length);
 316}
 317
 318static unsigned long ras_align(struct ll_readahead_state *ras,
 319                               unsigned long index,
 320                               unsigned long *remainder)
 321{
 322        unsigned long rem = index % ras->ras_rpc_size;
 323
 324        if (remainder)
 325                *remainder = rem;
 326        return index - rem;
 327}
 328
  329/* Check whether the index is in the defined ra-window */
 330static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
 331{
 332        /* If ria_length == ria_pages, it means non-stride I/O mode,
  333         * idx should always be inside the read-ahead window in this case.
 334         * For stride I/O mode, just check whether the idx is inside
 335         * the ria_pages.
 336         */
 337        return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
 338               (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
 339                ria->ria_length < ria->ria_pages);
 340}
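     /*
      * For instance (illustrative numbers only, not from the original
      * source): with ria_stoff = 4, ria_length = 16 and ria_pages = 4,
      * index 21 is inside the window ((21 - 4) % 16 == 1 < 4), while
      * index 9 falls in the stride gap ((9 - 4) % 16 == 5 >= 4).
      */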
 341
 342static unsigned long
 343ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
 344                    struct cl_page_list *queue, struct ll_readahead_state *ras,
 345                    struct ra_io_arg *ria)
 346{
 347        struct cl_read_ahead ra = { 0 };
 348        unsigned long ra_end = 0;
 349        bool stride_ria;
 350        pgoff_t page_idx;
 351        int rc;
 352
 353        LASSERT(ria);
 354        RIA_DEBUG(ria);
 355
 356        stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
 357        for (page_idx = ria->ria_start;
 358             page_idx <= ria->ria_end && ria->ria_reserved > 0; page_idx++) {
 359                if (ras_inside_ra_window(page_idx, ria)) {
 360                        if (!ra.cra_end || ra.cra_end < page_idx) {
 361                                unsigned long end;
 362
 363                                cl_read_ahead_release(env, &ra);
 364
 365                                rc = cl_io_read_ahead(env, io, page_idx, &ra);
 366                                if (rc < 0)
 367                                        break;
 368
 369                                CDEBUG(D_READA, "idx: %lu, ra: %lu, rpc: %lu\n",
 370                                       page_idx, ra.cra_end, ra.cra_rpc_size);
 371                                LASSERTF(ra.cra_end >= page_idx,
  372                                         "object: %p, indices %lu / %lu\n",
 373                                         io->ci_obj, ra.cra_end, page_idx);
 374                                /*
 375                                 * update read ahead RPC size.
 376                                 * NB: it's racy but doesn't matter
 377                                 */
 378                                if (ras->ras_rpc_size > ra.cra_rpc_size &&
 379                                    ra.cra_rpc_size > 0)
 380                                        ras->ras_rpc_size = ra.cra_rpc_size;
 381                                /* trim it to align with optimal RPC size */
 382                                end = ras_align(ras, ria->ria_end + 1, NULL);
 383                                if (end > 0 && !ria->ria_eof)
 384                                        ria->ria_end = end - 1;
 385                                if (ria->ria_end < ria->ria_end_min)
 386                                        ria->ria_end = ria->ria_end_min;
 387                                if (ria->ria_end > ra.cra_end)
 388                                        ria->ria_end = ra.cra_end;
 389                        }
 390
 391                        /* If the page is inside the read-ahead window */
 392                        rc = ll_read_ahead_page(env, io, queue, page_idx);
 393                        if (rc < 0)
 394                                break;
 395
 396                        ra_end = page_idx;
 397                        if (!rc)
 398                                ria->ria_reserved--;
 399                } else if (stride_ria) {
  400                        /* If it is not in the read-ahead window, and it is
  401                         * in stride read-ahead mode, then check whether it
  402                         * should skip the stride gap.
 403                         */
 404                        pgoff_t offset;
  405                        /* FIXME: This assertion is only valid for forward
  406                         * read-ahead; it will be fixed when backward
  407                         * read-ahead is implemented
 408                         */
 409                        LASSERTF(page_idx >= ria->ria_stoff,
 410                                 "Invalid page_idx %lu rs %lu re %lu ro %lu rl %lu rp %lu\n",
 411                                 page_idx,
 412                                 ria->ria_start, ria->ria_end, ria->ria_stoff,
 413                                 ria->ria_length, ria->ria_pages);
 414                        offset = page_idx - ria->ria_stoff;
 415                        offset = offset % (ria->ria_length);
 416                        if (offset > ria->ria_pages) {
 417                                page_idx += ria->ria_length - offset;
 418                                CDEBUG(D_READA, "i %lu skip %lu\n", page_idx,
 419                                       ria->ria_length - offset);
 420                                continue;
 421                        }
 422                }
 423        }
 424        cl_read_ahead_release(env, &ra);
 425
 426        return ra_end;
 427}
 428
 429static int ll_readahead(const struct lu_env *env, struct cl_io *io,
 430                        struct cl_page_list *queue,
 431                        struct ll_readahead_state *ras, bool hit)
 432{
 433        struct vvp_io *vio = vvp_env_io(env);
 434        struct ll_thread_info *lti = ll_env_info(env);
 435        struct cl_attr *attr = vvp_env_thread_attr(env);
 436        unsigned long len, mlen = 0;
 437        pgoff_t ra_end, start = 0, end = 0;
 438        struct inode *inode;
 439        struct ra_io_arg *ria = &lti->lti_ria;
 440        struct cl_object *clob;
 441        int ret = 0;
 442        __u64 kms;
 443
 444        clob = io->ci_obj;
 445        inode = vvp_object_inode(clob);
 446
 447        memset(ria, 0, sizeof(*ria));
 448
 449        cl_object_attr_lock(clob);
 450        ret = cl_object_attr_get(env, clob, attr);
 451        cl_object_attr_unlock(clob);
 452
 453        if (ret != 0)
 454                return ret;
 455        kms = attr->cat_kms;
 456        if (kms == 0) {
 457                ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
 458                return 0;
 459        }
 460
 461        spin_lock(&ras->ras_lock);
 462
  463        /*
  464         * Note: another thread might roll back ras_next_readahead if it
  465         * cannot get the full size of prepared pages; see the end of this
  466         * function. For stride read-ahead, it needs to make sure the offset
  467         * is no less than ras_stride_offset, so that stride read-ahead can
  468         * work correctly.
 469         */
 470        if (stride_io_mode(ras))
 471                start = max(ras->ras_next_readahead, ras->ras_stride_offset);
 472        else
 473                start = ras->ras_next_readahead;
 474
 475        if (ras->ras_window_len > 0)
 476                end = ras->ras_window_start + ras->ras_window_len - 1;
 477
 478        /* Enlarge the RA window to encompass the full read */
 479        if (vio->vui_ra_valid &&
 480            end < vio->vui_ra_start + vio->vui_ra_count - 1)
 481                end = vio->vui_ra_start + vio->vui_ra_count - 1;
 482
 483        if (end) {
 484                unsigned long end_index;
 485
 486                /* Truncate RA window to end of file */
 487                end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
 488                if (end_index <= end) {
 489                        end = end_index;
 490                        ria->ria_eof = true;
 491                }
 492
 493                ras->ras_next_readahead = max(end, end + 1);
 494                RAS_CDEBUG(ras);
 495        }
 496        ria->ria_start = start;
 497        ria->ria_end = end;
  498        /* If stride I/O mode is detected, get stride window */
 499        if (stride_io_mode(ras)) {
 500                ria->ria_stoff = ras->ras_stride_offset;
 501                ria->ria_length = ras->ras_stride_length;
 502                ria->ria_pages = ras->ras_stride_pages;
 503        }
 504        spin_unlock(&ras->ras_lock);
 505
 506        if (end == 0) {
 507                ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
 508                return 0;
 509        }
 510        len = ria_page_count(ria);
 511        if (len == 0) {
 512                ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
 513                return 0;
 514        }
 515
 516        CDEBUG(D_READA, DFID ": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
 517               PFID(lu_object_fid(&clob->co_lu)),
 518               ria->ria_start, ria->ria_end,
 519               vio->vui_ra_valid ? vio->vui_ra_start : 0,
 520               vio->vui_ra_valid ? vio->vui_ra_count : 0,
 521               hit);
 522
  523        /* at least extend the readahead window to cover the current read */
 524        if (!hit && vio->vui_ra_valid &&
 525            vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) {
 526                unsigned long remainder;
 527
 528                /* to the end of current read window. */
 529                mlen = vio->vui_ra_start + vio->vui_ra_count - ria->ria_start;
 530                /* trim to RPC boundary */
 531                ras_align(ras, ria->ria_start, &remainder);
 532                mlen = min(mlen, ras->ras_rpc_size - remainder);
 533                ria->ria_end_min = ria->ria_start + mlen;
 534        }
 535
 536        ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
 537        if (ria->ria_reserved < len)
 538                ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
 539
 540        CDEBUG(D_READA, "reserved pages %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
 541               ria->ria_reserved, len, mlen,
 542               atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
 543               ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
 544
 545        ra_end = ll_read_ahead_pages(env, io, queue, ras, ria);
 546
 547        if (ria->ria_reserved)
 548                ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
 549
 550        if (ra_end == end && ra_end == (kms >> PAGE_SHIFT))
 551                ll_ra_stats_inc(inode, RA_STAT_EOF);
 552
 553        /* if we didn't get to the end of the region we reserved from
 554         * the ras we need to go back and update the ras so that the
 555         * next read-ahead tries from where we left off.  we only do so
 556         * if the region we failed to issue read-ahead on is still ahead
 557         * of the app and behind the next index to start read-ahead from
 558         */
 559        CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
 560               ra_end, end, ria->ria_end, ret);
 561
 562        if (ra_end > 0 && ra_end != end) {
 563                ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
 564                spin_lock(&ras->ras_lock);
 565                if (ra_end <= ras->ras_next_readahead &&
 566                    index_in_window(ra_end, ras->ras_window_start, 0,
 567                                    ras->ras_window_len)) {
 568                        ras->ras_next_readahead = ra_end + 1;
 569                        RAS_CDEBUG(ras);
 570                }
 571                spin_unlock(&ras->ras_lock);
 572        }
 573
 574        return ret;
 575}
 576
 577static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
 578                          unsigned long index)
 579{
 580        ras->ras_window_start = ras_align(ras, index, NULL);
 581}
 582
 583/* called with the ras_lock held or from places where it doesn't matter */
 584static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
 585                      unsigned long index)
 586{
 587        ras->ras_last_readpage = index;
 588        ras->ras_consecutive_requests = 0;
 589        ras->ras_consecutive_pages = 0;
 590        ras->ras_window_len = 0;
 591        ras_set_start(inode, ras, index);
 592        ras->ras_next_readahead = max(ras->ras_window_start, index + 1);
 593
 594        RAS_CDEBUG(ras);
 595}
 596
 597/* called with the ras_lock held or from places where it doesn't matter */
 598static void ras_stride_reset(struct ll_readahead_state *ras)
 599{
 600        ras->ras_consecutive_stride_requests = 0;
 601        ras->ras_stride_length = 0;
 602        ras->ras_stride_pages = 0;
 603        RAS_CDEBUG(ras);
 604}
 605
 606void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
 607{
 608        spin_lock_init(&ras->ras_lock);
 609        ras->ras_rpc_size = PTLRPC_MAX_BRW_PAGES;
 610        ras_reset(inode, ras, 0);
 611        ras->ras_requests = 0;
 612}
 613
 614/*
 615 * Check whether the read request is in the stride window.
 616 * If it is in the stride window, return 1, otherwise return 0.
 617 */
 618static int index_in_stride_window(struct ll_readahead_state *ras,
 619                                  unsigned long index)
 620{
 621        unsigned long stride_gap;
 622
 623        if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
 624            ras->ras_stride_pages == ras->ras_stride_length)
 625                return 0;
 626
 627        stride_gap = index - ras->ras_last_readpage - 1;
 628
  629        /* If it is a contiguous read */
 630        if (stride_gap == 0)
 631                return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
 632
 633        /* Otherwise check the stride by itself */
 634        return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
 635                ras->ras_consecutive_pages == ras->ras_stride_pages;
 636}
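     /*
      * A sketch of the detection (numbers are illustrative, not from the
      * original source): with ras_stride_length = 16, ras_stride_pages = 4,
      * ras_last_readpage = 19 and ras_consecutive_pages = 4, a read of index
      * 32 gives stride_gap = 12 == 16 - 4 with a full chunk just completed,
      * so it matches the stride; a read of index 20 (stride_gap == 0) only
      * matches while ras_consecutive_pages + 1 <= ras_stride_pages.
      */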
 637
 638static void ras_update_stride_detector(struct ll_readahead_state *ras,
 639                                       unsigned long index)
 640{
 641        unsigned long stride_gap = index - ras->ras_last_readpage - 1;
 642
 643        if ((stride_gap != 0 || ras->ras_consecutive_stride_requests == 0) &&
 644            !stride_io_mode(ras)) {
 645                ras->ras_stride_pages = ras->ras_consecutive_pages;
 646                ras->ras_stride_length = ras->ras_consecutive_pages +
 647                                         stride_gap;
 648        }
 649        LASSERT(ras->ras_request_index == 0);
 650        LASSERT(ras->ras_consecutive_stride_requests == 0);
 651
 652        if (index <= ras->ras_last_readpage) {
  653                /* Reset the stride window for a non-forward read */
 654                ras_stride_reset(ras);
 655                return;
 656        }
 657
 658        ras->ras_stride_pages = ras->ras_consecutive_pages;
 659        ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;
 660
 661        RAS_CDEBUG(ras);
 662}
 663
  664/* The stride read-ahead window will be increased by inc_len according to
  665 * the stride I/O pattern.
 666 */
 667static void ras_stride_increase_window(struct ll_readahead_state *ras,
 668                                       struct ll_ra_info *ra,
 669                                       unsigned long inc_len)
 670{
 671        unsigned long left, step, window_len;
 672        unsigned long stride_len;
 673
 674        LASSERT(ras->ras_stride_length > 0);
 675        LASSERTF(ras->ras_window_start + ras->ras_window_len >=
 676                 ras->ras_stride_offset,
 677                 "window_start %lu, window_len %lu stride_offset %lu\n",
 678                 ras->ras_window_start,
 679                 ras->ras_window_len, ras->ras_stride_offset);
 680
 681        stride_len = ras->ras_window_start + ras->ras_window_len -
 682                     ras->ras_stride_offset;
 683
 684        left = stride_len % ras->ras_stride_length;
 685        window_len = ras->ras_window_len - left;
 686
 687        if (left < ras->ras_stride_pages)
 688                left += inc_len;
 689        else
 690                left = ras->ras_stride_pages + inc_len;
 691
 692        LASSERT(ras->ras_stride_pages != 0);
 693
 694        step = left / ras->ras_stride_pages;
 695        left %= ras->ras_stride_pages;
 696
 697        window_len += step * ras->ras_stride_length + left;
 698
 699        if (stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
 700                            ras->ras_stride_pages, ras->ras_stride_offset,
 701                            window_len) <= ra->ra_max_pages_per_file)
 702                ras->ras_window_len = window_len;
 703
 704        RAS_CDEBUG(ras);
 705}
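     /*
      * Illustrative run of the window growth above (numbers are made up for
      * the sketch, not taken from the original source): stride_offset = 0,
      * stride_length = 16, stride_pages = 4, window_start = 0,
      * window_len = 20, inc_len = 16:
      *
      *   stride_len = 20, left = 20 % 16 = 4, window_len = 20 - 4 = 16
      *   left is not < stride_pages, so left = 4 + 16 = 20
      *   step = 20 / 4 = 5, left = 0, window_len = 16 + 5 * 16 = 96
      *
      * and the new 96-page window is kept only if its 24 stride data pages
      * fit within ra_max_pages_per_file.
      */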
 706
 707static void ras_increase_window(struct inode *inode,
 708                                struct ll_readahead_state *ras,
 709                                struct ll_ra_info *ra)
 710{
  711        /* The stretch of the ra-window should be aligned with the max
  712         * rpc_size, but the current clio architecture does not support
  713         * retrieving such information from the lower layer. FIXME later
 714         */
 715        if (stride_io_mode(ras)) {
 716                ras_stride_increase_window(ras, ra, ras->ras_rpc_size);
 717        } else {
 718                unsigned long wlen;
 719
 720                wlen = min(ras->ras_window_len + ras->ras_rpc_size,
 721                           ra->ra_max_pages_per_file);
 722                ras->ras_window_len = ras_align(ras, wlen, NULL);
 723        }
 724}
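     /*
      * In the non-stride branch above, e.g. (illustrative values only) with
      * ras_rpc_size = 256, ras_window_len = 300 and
      * ra_max_pages_per_file = 8192: wlen = min(300 + 256, 8192) = 556,
      * which ras_align() trims down to 512, i.e. two full RPCs.
      */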
 725
 726static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
 727                       struct ll_readahead_state *ras, unsigned long index,
 728                       enum ras_update_flags flags)
 729{
 730        struct ll_ra_info *ra = &sbi->ll_ra_info;
 731        int zero = 0, stride_detect = 0, ra_miss = 0;
 732        bool hit = flags & LL_RAS_HIT;
 733
 734        spin_lock(&ras->ras_lock);
 735
 736        if (!hit)
 737                CDEBUG(D_READA, DFID " pages at %lu miss.\n",
 738                       PFID(ll_inode2fid(inode)), index);
 739
 740        ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
 741
 742        /* reset the read-ahead window in two cases.  First when the app seeks
 743         * or reads to some other part of the file.  Secondly if we get a
 744         * read-ahead miss that we think we've previously issued.  This can
 745         * be a symptom of there being so many read-ahead pages that the VM is
  746         * reclaiming them before we get to them.
 747         */
 748        if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
 749                zero = 1;
 750                ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
 751        } else if (!hit && ras->ras_window_len &&
 752                   index < ras->ras_next_readahead &&
 753                   index_in_window(index, ras->ras_window_start, 0,
 754                                   ras->ras_window_len)) {
 755                ra_miss = 1;
 756                ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
 757        }
 758
  759        /* On the second access to a file smaller than the tunable
  760         * ra_max_read_ahead_whole_pages, trigger RA on all pages in the
  761         * file up to ra_max_pages_per_file.  This is simply a best effort
  762         * and only occurs once per open file.  Subsequent IO reverts to
  763         * normal RA behavior.  The mmap case does not increment
 764         * ras_requests and thus can never trigger this behavior.
 765         */
 766        if (ras->ras_requests >= 2 && !ras->ras_request_index) {
 767                __u64 kms_pages;
 768
 769                kms_pages = (i_size_read(inode) + PAGE_SIZE - 1) >>
 770                            PAGE_SHIFT;
 771
 772                CDEBUG(D_READA, "kmsp %llu mwp %lu mp %lu\n", kms_pages,
 773                       ra->ra_max_read_ahead_whole_pages,
 774                       ra->ra_max_pages_per_file);
 775
 776                if (kms_pages &&
 777                    kms_pages <= ra->ra_max_read_ahead_whole_pages) {
 778                        ras->ras_window_start = 0;
 779                        ras->ras_next_readahead = index + 1;
 780                        ras->ras_window_len = min(ra->ra_max_pages_per_file,
 781                                ra->ra_max_read_ahead_whole_pages);
 782                        goto out_unlock;
 783                }
 784        }
 785        if (zero) {
  786                /* check whether it is in stride I/O mode */
 787                if (!index_in_stride_window(ras, index)) {
 788                        if (ras->ras_consecutive_stride_requests == 0 &&
 789                            ras->ras_request_index == 0) {
 790                                ras_update_stride_detector(ras, index);
 791                                ras->ras_consecutive_stride_requests++;
 792                        } else {
 793                                ras_stride_reset(ras);
 794                        }
 795                        ras_reset(inode, ras, index);
 796                        ras->ras_consecutive_pages++;
 797                        goto out_unlock;
 798                } else {
 799                        ras->ras_consecutive_pages = 0;
 800                        ras->ras_consecutive_requests = 0;
 801                        if (++ras->ras_consecutive_stride_requests > 1)
 802                                stride_detect = 1;
 803                        RAS_CDEBUG(ras);
 804                }
 805        } else {
 806                if (ra_miss) {
 807                        if (index_in_stride_window(ras, index) &&
 808                            stride_io_mode(ras)) {
 809                                if (index != ras->ras_last_readpage + 1)
 810                                        ras->ras_consecutive_pages = 0;
 811                                ras_reset(inode, ras, index);
 812
  813                                /* If stride-RA hit a cache miss, the stride
  814                                 * detector is not reset, to avoid the
  815                                 * overhead of re-detecting read-ahead mode,
  816                                 * but only on the condition that the stride
  817                                 * window still intersects with the normal
  818                                 * sequential read-ahead window.
 819                                 */
 820                                if (ras->ras_window_start <
 821                                    ras->ras_stride_offset)
 822                                        ras_stride_reset(ras);
 823                                RAS_CDEBUG(ras);
 824                        } else {
 825                                /* Reset both stride window and normal RA
 826                                 * window
 827                                 */
 828                                ras_reset(inode, ras, index);
 829                                ras->ras_consecutive_pages++;
 830                                ras_stride_reset(ras);
 831                                goto out_unlock;
 832                        }
 833                } else if (stride_io_mode(ras)) {
  834                        /* If this is a contiguous read but we are currently
  835                         * in stride I/O mode, check whether the stride step
  836                         * is still valid; if not, reset the stride ra window
 837                         */
 838                        if (!index_in_stride_window(ras, index)) {
 839                                /* Shrink stride read-ahead window to be zero */
 840                                ras_stride_reset(ras);
 841                                ras->ras_window_len = 0;
 842                                ras->ras_next_readahead = index;
 843                        }
 844                }
 845        }
 846        ras->ras_consecutive_pages++;
 847        ras->ras_last_readpage = index;
 848        ras_set_start(inode, ras, index);
 849
 850        if (stride_io_mode(ras)) {
  851                /* Since stride readahead is sensitive to the read-ahead
  852                 * offset, use the original offset here instead of
  853                 * ras_window_start, which is RPC aligned.
 854                 */
 855                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
 856                ras->ras_window_start = max(ras->ras_stride_offset,
 857                                            ras->ras_window_start);
 858        } else {
 859                if (ras->ras_next_readahead < ras->ras_window_start)
 860                        ras->ras_next_readahead = ras->ras_window_start;
 861                if (!hit)
 862                        ras->ras_next_readahead = index + 1;
 863        }
 864        RAS_CDEBUG(ras);
 865
 866        /* Trigger RA in the mmap case where ras_consecutive_requests
 867         * is not incremented and thus can't be used to trigger RA
 868         */
 869        if (ras->ras_consecutive_pages >= 4 && flags & LL_RAS_MMAP) {
 870                ras_increase_window(inode, ras, ra);
 871                /*
 872                 * reset consecutive pages so that the readahead window can
 873                 * grow gradually.
 874                 */
 875                ras->ras_consecutive_pages = 0;
 876                goto out_unlock;
 877        }
 878
  879        /* Initially reset the stride window offset to next_readahead */
 880        if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
 881                /**
 882                 * Once stride IO mode is detected, next_readahead should be
 883                 * reset to make sure next_readahead > stride offset
 884                 */
 885                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
 886                ras->ras_stride_offset = index;
 887                ras->ras_window_start = max(index, ras->ras_window_start);
 888        }
 889
 890        /* The initial ras_window_len is set to the request size.  To avoid
 891         * uselessly reading and discarding pages for random IO the window is
 892         * only increased once per consecutive request received.
 893         */
 894        if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
 895            !ras->ras_request_index)
 896                ras_increase_window(inode, ras, ra);
 897out_unlock:
 898        RAS_CDEBUG(ras);
 899        ras->ras_request_index++;
 900        spin_unlock(&ras->ras_lock);
 901}
 902
 903int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
 904{
 905        struct inode           *inode = vmpage->mapping->host;
 906        struct ll_inode_info   *lli   = ll_i2info(inode);
 907        struct lu_env     *env;
 908        struct cl_io       *io;
 909        struct cl_page   *page;
 910        struct cl_object       *clob;
 911        bool redirtied = false;
 912        bool unlocked = false;
 913        int result;
 914        u16 refcheck;
 915
 916        LASSERT(PageLocked(vmpage));
 917        LASSERT(!PageWriteback(vmpage));
 918
 919        LASSERT(ll_i2dtexp(inode));
 920
 921        env = cl_env_get(&refcheck);
 922        if (IS_ERR(env)) {
 923                result = PTR_ERR(env);
 924                goto out;
 925        }
 926
 927        clob  = ll_i2info(inode)->lli_clob;
 928        LASSERT(clob);
 929
 930        io = vvp_env_thread_io(env);
 931        io->ci_obj = clob;
 932        io->ci_ignore_layout = 1;
 933        result = cl_io_init(env, io, CIT_MISC, clob);
 934        if (result == 0) {
 935                page = cl_page_find(env, clob, vmpage->index,
 936                                    vmpage, CPT_CACHEABLE);
 937                if (!IS_ERR(page)) {
 938                        lu_ref_add(&page->cp_reference, "writepage",
 939                                   current);
 940                        cl_page_assume(env, io, page);
 941                        result = cl_page_flush(env, io, page);
 942                        if (result != 0) {
 943                                /*
 944                                 * Re-dirty page on error so it retries write,
 945                                 * but not in case when IO has actually
 946                                 * occurred and completed with an error.
 947                                 */
 948                                if (!PageError(vmpage)) {
 949                                        redirty_page_for_writepage(wbc, vmpage);
 950                                        result = 0;
 951                                        redirtied = true;
 952                                }
 953                        }
 954                        cl_page_disown(env, io, page);
 955                        unlocked = true;
 956                        lu_ref_del(&page->cp_reference,
 957                                   "writepage", current);
 958                        cl_page_put(env, page);
 959                } else {
 960                        result = PTR_ERR(page);
 961                }
 962        }
 963        cl_io_fini(env, io);
 964
 965        if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
 966                loff_t offset = cl_offset(clob, vmpage->index);
 967
 968                /* Flush page failed because the extent is being written out.
  969                 * Wait for the write of the extent to finish to avoid
  970                 * breaking the kernel, which assumes ->writepage should
  971                 * mark PageWriteback or clean the page.
 972                 */
 973                result = cl_sync_file_range(inode, offset,
 974                                            offset + PAGE_SIZE - 1,
 975                                            CL_FSYNC_LOCAL, 1);
 976                if (result > 0) {
  977                        /* We may actually have written more than one page;
  978                         * do not count this page here because the caller
  979                         * will count it.
 980                         */
 981                        wbc->nr_to_write -= result - 1;
 982                        result = 0;
 983                }
 984        }
 985
 986        cl_env_put(env, &refcheck);
 987        goto out;
 988
 989out:
 990        if (result < 0) {
 991                if (!lli->lli_async_rc)
 992                        lli->lli_async_rc = result;
 993                SetPageError(vmpage);
 994                if (!unlocked)
 995                        unlock_page(vmpage);
 996        }
 997        return result;
 998}
 999
1000int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
1001{
1002        struct inode *inode = mapping->host;
1003        struct ll_sb_info *sbi = ll_i2sbi(inode);
1004        loff_t start;
1005        loff_t end;
1006        enum cl_fsync_mode mode;
1007        int range_whole = 0;
1008        int result;
1009        int ignore_layout = 0;
1010
1011        if (wbc->range_cyclic) {
1012                start = mapping->writeback_index << PAGE_SHIFT;
1013                end = OBD_OBJECT_EOF;
1014        } else {
1015                start = wbc->range_start;
1016                end = wbc->range_end;
1017                if (end == LLONG_MAX) {
1018                        end = OBD_OBJECT_EOF;
1019                        range_whole = start == 0;
1020                }
1021        }
1022
1023        mode = CL_FSYNC_NONE;
1024        if (wbc->sync_mode == WB_SYNC_ALL)
1025                mode = CL_FSYNC_LOCAL;
1026
1027        if (sbi->ll_umounting)
1028                /* if the mountpoint is being umounted, all pages have to be
1029                 * evicted to avoid hitting LBUG when truncate_inode_pages()
1030                 * is called later on.
1031                 */
1032                ignore_layout = 1;
1033
1034        if (!ll_i2info(inode)->lli_clob)
1035                return 0;
1036
1037        result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
1038        if (result > 0) {
1039                wbc->nr_to_write -= result;
1040                result = 0;
1041        }
1042
1043        if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
1044                if (end == OBD_OBJECT_EOF)
1045                        mapping->writeback_index = 0;
1046                else
1047                        mapping->writeback_index = (end >> PAGE_SHIFT) + 1;
1048        }
1049        return result;
1050}
1051
1052struct ll_cl_context *ll_cl_find(struct file *file)
1053{
1054        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1055        struct ll_cl_context *lcc;
1056        struct ll_cl_context *found = NULL;
1057
1058        read_lock(&fd->fd_lock);
1059        list_for_each_entry(lcc, &fd->fd_lccs, lcc_list) {
1060                if (lcc->lcc_cookie == current) {
1061                        found = lcc;
1062                        break;
1063                }
1064        }
1065        read_unlock(&fd->fd_lock);
1066
1067        return found;
1068}
1069
1070void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io)
1071{
1072        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1073        struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1074
1075        memset(lcc, 0, sizeof(*lcc));
1076        INIT_LIST_HEAD(&lcc->lcc_list);
1077        lcc->lcc_cookie = current;
1078        lcc->lcc_env = env;
1079        lcc->lcc_io = io;
1080
1081        write_lock(&fd->fd_lock);
1082        list_add(&lcc->lcc_list, &fd->fd_lccs);
1083        write_unlock(&fd->fd_lock);
1084}
1085
1086void ll_cl_remove(struct file *file, const struct lu_env *env)
1087{
1088        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1089        struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1090
1091        write_lock(&fd->fd_lock);
1092        list_del_init(&lcc->lcc_list);
1093        write_unlock(&fd->fd_lock);
1094}
1095
1096static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
1097                           struct cl_page *page)
1098{
1099        struct inode *inode = vvp_object_inode(page->cp_obj);
1100        struct ll_file_data *fd = vvp_env_io(env)->vui_fd;
1101        struct ll_readahead_state *ras = &fd->fd_ras;
1102        struct cl_2queue *queue  = &io->ci_queue;
1103        struct ll_sb_info *sbi = ll_i2sbi(inode);
1104        struct vvp_page *vpg;
1105        bool uptodate;
1106        int rc = 0;
1107
1108        vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
1109        uptodate = vpg->vpg_defer_uptodate;
1110
1111        if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
1112            sbi->ll_ra_info.ra_max_pages > 0) {
1113                struct vvp_io *vio = vvp_env_io(env);
1114                enum ras_update_flags flags = 0;
1115
1116                if (uptodate)
1117                        flags |= LL_RAS_HIT;
1118                if (!vio->vui_ra_valid)
1119                        flags |= LL_RAS_MMAP;
1120                ras_update(sbi, inode, ras, vvp_index(vpg), flags);
1121        }
1122
1123        cl_2queue_init(queue);
1124        if (uptodate) {
1125                vpg->vpg_ra_used = 1;
1126                cl_page_export(env, page, 1);
1127                cl_page_disown(env, io, page);
1128        } else {
1129                cl_page_list_add(&queue->c2_qin, page);
1130        }
1131
1132        if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
1133            sbi->ll_ra_info.ra_max_pages > 0) {
1134                int rc2;
1135
1136                rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
1137                                   uptodate);
1138                CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
1139                       PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
1140        }
1141
1142        if (queue->c2_qin.pl_nr > 0)
1143                rc = cl_io_submit_rw(env, io, CRT_READ, queue);
1144
1145        /*
1146         * Unlock unsent pages in case of error.
1147         */
1148        cl_page_list_disown(env, io, &queue->c2_qin);
1149        cl_2queue_fini(env, queue);
1150
1151        return rc;
1152}
1153
1154int ll_readpage(struct file *file, struct page *vmpage)
1155{
1156        struct cl_object *clob = ll_i2info(file_inode(file))->lli_clob;
1157        struct ll_cl_context *lcc;
1158        const struct lu_env  *env;
1159        struct cl_io   *io;
1160        struct cl_page *page;
1161        int result;
1162
1163        lcc = ll_cl_find(file);
1164        if (!lcc) {
1165                unlock_page(vmpage);
1166                return -EIO;
1167        }
1168
1169        env = lcc->lcc_env;
1170        io = lcc->lcc_io;
1171        LASSERT(io->ci_state == CIS_IO_GOING);
1172        page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
1173        if (!IS_ERR(page)) {
1174                LASSERT(page->cp_type == CPT_CACHEABLE);
1175                if (likely(!PageUptodate(vmpage))) {
1176                        cl_page_assume(env, io, page);
1177                        result = ll_io_read_page(env, io, page);
1178                } else {
1179                        /* Page from a non-object file. */
1180                        unlock_page(vmpage);
1181                        result = 0;
1182                }
1183                cl_page_put(env, page);
1184        } else {
1185                unlock_page(vmpage);
1186                result = PTR_ERR(page);
1187        }
1188        return result;
1189}
1190
1191int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
1192                    struct cl_page *page, enum cl_req_type crt)
1193{
1194        struct cl_2queue  *queue;
1195        int result;
1196
1197        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
1198
1199        queue = &io->ci_queue;
1200        cl_2queue_init_page(queue, page);
1201
1202        result = cl_io_submit_sync(env, io, crt, queue, 0);
1203        LASSERT(cl_page_is_owned(page, io));
1204
1205        if (crt == CRT_READ)
1206                /*
1207                 * in CRT_WRITE case page is left locked even in case of
1208                 * error.
1209                 */
1210                cl_page_list_disown(env, io, &queue->c2_qin);
1211        cl_2queue_fini(env, queue);
1212
1213        return result;
1214}
1215