linux/drivers/staging/lustre/lustre/osc/osc_cache.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2015, Intel Corporation.
 *
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * osc cache management.
 *
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"
#include "osc_internal.h"

static int extent_debug; /* set it to be true for more debug */

static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
                              struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd);
static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd);
static int osc_io_unplug_async(const struct lu_env *env,
                               struct client_obd *cli, struct osc_object *osc);
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
                           unsigned int lost_grant);

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line);
#define osc_extent_tree_dump(lvl, obj) \
        osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)

/** \addtogroup osc
 *  @{
 */

/* ------------------ osc extent ------------------ */
static inline char *ext_flags(struct osc_extent *ext, char *flags)
{
        char *buf = flags;
        *buf++ = ext->oe_rw ? 'r' : 'w';
        if (ext->oe_intree)
                *buf++ = 'i';
        if (ext->oe_srvlock)
                *buf++ = 's';
        if (ext->oe_hp)
                *buf++ = 'h';
        if (ext->oe_urgent)
                *buf++ = 'u';
        if (ext->oe_memalloc)
                *buf++ = 'm';
        if (ext->oe_trunc_pending)
                *buf++ = 't';
        if (ext->oe_fsync_wait)
                *buf++ = 'Y';
        *buf = 0;
        return flags;
}
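
/*
 * Illustration: ext_flags() encodes the state bits above into a short
 * string for the debug output macros below; e.g. a write extent that is
 * in the tree and marked urgent would print as "wiu".
 */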

static inline char list_empty_marker(struct list_head *list)
{
        return list_empty(list) ? '-' : '+';
}

#define EXTSTR       "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
static const char *oes_strings[] = {
        "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };

#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                           \
        struct osc_extent *__ext = (extent);                                  \
        char __buf[16];                                                       \
                                                                              \
        CDEBUG(lvl,                                                           \
                "extent %p@{" EXTSTR ", "                                     \
                "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
                /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
                atomic_read(&__ext->oe_refc),                                 \
                atomic_read(&__ext->oe_users),                                \
                list_empty_marker(&__ext->oe_link),                           \
                oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
                __ext->oe_obj,                                                \
                /* ----- part 2 ----- */                                      \
                __ext->oe_grants, __ext->oe_nr_pages,                         \
                list_empty_marker(&__ext->oe_pages),                          \
                waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
                __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner,           \
                /* ----- part 3 ----- */                                      \
                ## __VA_ARGS__);                                              \
} while (0)
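
/*
 * Usage sketch (hypothetical call): OSC_EXTENT_DUMP(D_CACHE, ext,
 * "trimmed by %u pages.\n", nr); the 16-byte __buf above is large enough
 * for the longest flag string ext_flags() can produce.
 */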

#undef EASSERTF
#define EASSERTF(expr, ext, fmt, args...) do {                          \
        if (!(expr)) {                                                  \
                OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
                osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
                LASSERT(expr);                                          \
        }                                                               \
} while (0)

#undef EASSERT
#define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")

static inline struct osc_extent *rb_extent(struct rb_node *n)
{
        if (!n)
                return NULL;

        return container_of(n, struct osc_extent, oe_node);
}

static inline struct osc_extent *next_extent(struct osc_extent *ext)
{
        if (!ext)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_next(&ext->oe_node));
}

static inline struct osc_extent *prev_extent(struct osc_extent *ext)
{
        if (!ext)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_prev(&ext->oe_node));
}

static inline struct osc_extent *first_extent(struct osc_object *obj)
{
        return rb_extent(rb_first(&obj->oo_root));
}

/* object must be locked by caller. */
static int osc_extent_sanity_check0(struct osc_extent *ext,
                                    const char *func, const int line)
{
        struct osc_object *obj = ext->oe_obj;
        struct osc_async_page *oap;
        int page_count;
        int rc = 0;

        if (!osc_object_is_locked(obj)) {
                rc = 9;
                goto out;
        }

        if (ext->oe_state >= OES_STATE_MAX) {
                rc = 10;
                goto out;
        }

        if (atomic_read(&ext->oe_refc) <= 0) {
                rc = 20;
                goto out;
        }

        if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users)) {
                rc = 30;
                goto out;
        }

        switch (ext->oe_state) {
        case OES_INV:
                if (ext->oe_nr_pages > 0 || !list_empty(&ext->oe_pages))
                        rc = 35;
                else
                        rc = 0;
                goto out;
        case OES_ACTIVE:
                if (atomic_read(&ext->oe_users) == 0) {
                        rc = 40;
                        goto out;
                }
                if (ext->oe_hp) {
                        rc = 50;
                        goto out;
                }
                if (ext->oe_fsync_wait && !ext->oe_urgent) {
                        rc = 55;
                        goto out;
                }
                break;
        case OES_CACHE:
                if (ext->oe_grants == 0) {
                        rc = 60;
                        goto out;
                }
                if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) {
                        rc = 65;
                        goto out;
                }
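                /* fall through */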
        default:
                if (atomic_read(&ext->oe_users) > 0) {
                        rc = 70;
                        goto out;
                }
        }

        if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start) {
                rc = 80;
                goto out;
        }

        if (!ext->oe_osclock && ext->oe_grants > 0) {
                rc = 90;
                goto out;
        }

        if (ext->oe_osclock) {
                struct cl_lock_descr *descr;

                descr = &ext->oe_osclock->cll_descr;
                if (!(descr->cld_start <= ext->oe_start &&
                      descr->cld_end >= ext->oe_max_end)) {
                        rc = 100;
                        goto out;
                }
        }

        if (ext->oe_nr_pages > ext->oe_mppr) {
                rc = 105;
                goto out;
        }

        /* Do not verify page list if extent is in RPC. This is because an
         * in-RPC extent is supposed to be exclusively accessible w/o lock.
         */
        if (ext->oe_state > OES_CACHE) {
                rc = 0;
                goto out;
        }

        if (!extent_debug) {
                rc = 0;
                goto out;
        }

        page_count = 0;
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                pgoff_t index = oap2cl_page(oap)->cp_index;
                ++page_count;
                if (index > ext->oe_end || index < ext->oe_start) {
                        rc = 110;
                        goto out;
                }
        }
        if (page_count != ext->oe_nr_pages) {
                rc = 120;
                goto out;
        }

out:
        if (rc != 0)
                OSC_EXTENT_DUMP(D_ERROR, ext,
                                "%s:%d sanity check %p failed with rc = %d\n",
                                func, line, ext, rc);
        return rc;
}

#define sanity_check_nolock(ext) \
        osc_extent_sanity_check0(ext, __func__, __LINE__)

#define sanity_check(ext) ({                                            \
        int __res;                                                      \
        osc_object_lock((ext)->oe_obj);                                 \
        __res = sanity_check_nolock(ext);                               \
        osc_object_unlock((ext)->oe_obj);                               \
        __res;                                                          \
})
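
/*
 * Note: sanity_check() takes the object lock around the check itself; use
 * sanity_check_nolock() only when the caller already holds the lock.
 */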

/**
 * Sanity check: make sure there are no overlapping extents in the tree.
 */
static int osc_extent_is_overlapped(struct osc_object *obj,
                                    struct osc_extent *ext)
{
        struct osc_extent *tmp;

        LASSERT(osc_object_is_locked(obj));

        if (!extent_debug)
                return 0;

        for (tmp = first_extent(obj); tmp; tmp = next_extent(tmp)) {
                if (tmp == ext)
                        continue;
                if (tmp->oe_end >= ext->oe_start &&
                    tmp->oe_start <= ext->oe_end)
                        return 1;
        }
        return 0;
}

static void osc_extent_state_set(struct osc_extent *ext, int state)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(state >= OES_INV && state < OES_STATE_MAX);

        /* Never try to sanity check a state changing extent :-) */
        /* LASSERT(sanity_check_nolock(ext) == 0); */

        /* TODO: validate the state machine */
        ext->oe_state = state;
        wake_up_all(&ext->oe_waitq);
}

static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
{
        struct osc_extent *ext;

        ext = kmem_cache_zalloc(osc_extent_kmem, GFP_NOFS);
        if (!ext)
                return NULL;

        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
        atomic_set(&ext->oe_refc, 1);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
        INIT_LIST_HEAD(&ext->oe_pages);
        init_waitqueue_head(&ext->oe_waitq);
        ext->oe_osclock = NULL;

        return ext;
}

static void osc_extent_free(struct osc_extent *ext)
{
        kmem_cache_free(osc_extent_kmem, ext);
}

static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) >= 0);
        atomic_inc(&ext->oe_refc);
        return ext;
}

static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 0);
        if (atomic_dec_and_test(&ext->oe_refc)) {
                LASSERT(list_empty(&ext->oe_link));
                LASSERT(atomic_read(&ext->oe_users) == 0);
                LASSERT(ext->oe_state == OES_INV);
                LASSERT(!ext->oe_intree);

                if (ext->oe_osclock) {
                        cl_lock_put(env, ext->oe_osclock);
                        ext->oe_osclock = NULL;
                }
                osc_extent_free(ext);
        }
}

/**
 * osc_extent_put_trust() is a special version of osc_extent_put() for use
 * when it's known that the caller is not the last user. This addresses the
 * problem of not having a lu_env at hand ;-).
 */
static void osc_extent_put_trust(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 1);
        LASSERT(osc_object_is_locked(ext->oe_obj));
        atomic_dec(&ext->oe_refc);
}

/**
 * Return the extent which includes pgoff @index, or return the greatest
 * previous extent in the tree.
 */
static struct osc_extent *osc_extent_search(struct osc_object *obj,
                                            pgoff_t index)
{
        struct rb_node *n = obj->oo_root.rb_node;
        struct osc_extent *tmp, *p = NULL;

        LASSERT(osc_object_is_locked(obj));
        while (n) {
                tmp = rb_extent(n);
                if (index < tmp->oe_start) {
                        n = n->rb_left;
                } else if (index > tmp->oe_end) {
                        p = rb_extent(n);
                        n = n->rb_right;
                } else {
                        return tmp;
                }
        }
        return p;
}
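
/*
 * Example (hypothetical tree): with extents [0, 15] and [32, 47] in the
 * tree, osc_extent_search(obj, 40) returns [32, 47], which contains the
 * index, while osc_extent_search(obj, 20) returns [0, 15], the greatest
 * extent preceding it.
 */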

/*
 * Return the extent covering @index, or NULL if no extent does.
 * The caller must hold the object lock.
 */
static struct osc_extent *osc_extent_lookup(struct osc_object *obj,
                                            pgoff_t index)
{
        struct osc_extent *ext;

        ext = osc_extent_search(obj, index);
        if (ext && ext->oe_start <= index && index <= ext->oe_end)
                return osc_extent_get(ext);
        return NULL;
}

/* caller must have held object lock. */
static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
{
        struct rb_node **n = &obj->oo_root.rb_node;
        struct rb_node *parent = NULL;
        struct osc_extent *tmp;

        LASSERT(ext->oe_intree == 0);
        LASSERT(ext->oe_obj == obj);
        LASSERT(osc_object_is_locked(obj));
        while (*n) {
                tmp = rb_extent(*n);
                parent = *n;

                if (ext->oe_end < tmp->oe_start)
                        n = &(*n)->rb_left;
                else if (ext->oe_start > tmp->oe_end)
                        n = &(*n)->rb_right;
                else
                        EASSERTF(0, tmp, EXTSTR"\n", EXTPARA(ext));
        }
        rb_link_node(&ext->oe_node, parent, n);
        rb_insert_color(&ext->oe_node, &obj->oo_root);
        osc_extent_get(ext);
        ext->oe_intree = 1;
}

/* caller must have held object lock. */
static void osc_extent_erase(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(osc_object_is_locked(obj));
        if (ext->oe_intree) {
                rb_erase(&ext->oe_node, &obj->oo_root);
                ext->oe_intree = 0;
                /* rbtree held a refcount */
                osc_extent_put_trust(ext);
        }
}

static struct osc_extent *osc_extent_hold(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(osc_object_is_locked(obj));
        LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
        if (ext->oe_state == OES_CACHE) {
                osc_extent_state_set(ext, OES_ACTIVE);
                osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages);
        }
        atomic_inc(&ext->oe_users);
        list_del_init(&ext->oe_link);
        return osc_extent_get(ext);
}

static void __osc_extent_remove(struct osc_extent *ext)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(list_empty(&ext->oe_pages));
        osc_extent_erase(ext);
        list_del_init(&ext->oe_link);
        osc_extent_state_set(ext, OES_INV);
        OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n");
}

static void osc_extent_remove(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        osc_object_lock(obj);
        __osc_extent_remove(ext);
        osc_object_unlock(obj);
}

/**
 * This function is used to merge extents to get better performance. It checks
 * if @cur and @victim are contiguous at chunk level.
 */
static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                            struct osc_extent *victim)
{
        struct osc_object *obj = cur->oe_obj;
        pgoff_t chunk_start;
        pgoff_t chunk_end;
        int ppc_bits;

        LASSERT(cur->oe_state == OES_CACHE);
        LASSERT(osc_object_is_locked(obj));
        if (!victim)
                return -EINVAL;

        if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
                return -EBUSY;

        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;

        LASSERT(cur->oe_osclock == victim->oe_osclock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end = cur->oe_end >> ppc_bits;
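        /*
         * The extents must be contiguous at chunk granularity: victim has
         * to end in the chunk just before cur, or start in the chunk just
         * after it. E.g. with ppc_bits = 4 (hypothetical numbers), a cur
         * spanning chunks [4, 5] can merge a victim ending in chunk 3 or
         * starting in chunk 6.
         */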
        if (chunk_start != (victim->oe_end >> ppc_bits) + 1 &&
            chunk_end + 1 != victim->oe_start >> ppc_bits)
                return -ERANGE;

        OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);

        cur->oe_start = min(cur->oe_start, victim->oe_start);
        cur->oe_end = max(cur->oe_end, victim->oe_end);
        cur->oe_grants += victim->oe_grants;
        cur->oe_nr_pages += victim->oe_nr_pages;
        /* only the following bits are needed to merge */
        cur->oe_urgent |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
        list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;

        osc_extent_get(victim);
        __osc_extent_remove(victim);
        osc_extent_put(env, victim);

        OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim);
        return 0;
}

/**
 * Drop user count of osc_extent, and unplug IO asynchronously.
 */
void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(atomic_read(&ext->oe_users) > 0);
        LASSERT(sanity_check(ext) == 0);
        LASSERT(ext->oe_grants > 0);

        if (atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) {
                LASSERT(ext->oe_state == OES_ACTIVE);
                if (ext->oe_trunc_pending) {
                        /* a truncate process is waiting for this extent.
                         * This may happen due to a race, check
                         * osc_cache_truncate_start().
                         */
                        osc_extent_state_set(ext, OES_TRUNC);
                        ext->oe_trunc_pending = 0;
                } else {
                        osc_extent_state_set(ext, OES_CACHE);
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           ext->oe_nr_pages);

                        /* try to merge the previous and next extent. */
                        osc_extent_merge(env, ext, prev_extent(ext));
                        osc_extent_merge(env, ext, next_extent(ext));

                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
                }
                osc_object_unlock(obj);

                osc_io_unplug_async(env, osc_cli(obj), obj);
        }
        osc_extent_put(env, ext);
}

static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
{
        return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
}

/**
 * Find or create an extent which includes @index; this is the core function
 * for extent tree management.
 */
static struct osc_extent *osc_extent_find(const struct lu_env *env,
                                          struct osc_object *obj, pgoff_t index,
                                          int *grants)
{
        struct client_obd *cli = osc_cli(obj);
        struct cl_lock *lock;
        struct osc_extent *cur;
        struct osc_extent *ext;
        struct osc_extent *conflict = NULL;
        struct osc_extent *found = NULL;
        pgoff_t chunk;
        pgoff_t max_end;
        int max_pages; /* max_pages_per_rpc */
        int chunksize;
        int ppc_bits; /* pages per chunk bits */
        int chunk_mask;
        int rc;

        cur = osc_extent_alloc(obj);
        if (!cur)
                return ERR_PTR(-ENOMEM);

        lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0);
        LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);

        LASSERT(cli->cl_chunkbits >= PAGE_SHIFT);
        ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize = 1 << cli->cl_chunkbits;
        chunk = index >> ppc_bits;
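        /*
         * Worked example (hypothetical numbers): with 64KiB chunks and 4KiB
         * pages, cl_chunkbits = 16 and PAGE_SHIFT = 12, so ppc_bits = 4,
         * chunk_mask = ~0xf and chunksize = 65536; page index 0x2a then
         * belongs to chunk 0x2.
         */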

        /* align end to the RPC boundary; RPC size may not be a power-of-2
         * number of pages.
         */
        max_pages = cli->cl_max_pages_per_rpc;
        LASSERT((max_pages & ~chunk_mask) == 0);
        max_end = index - (index % max_pages) + max_pages - 1;
        max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end);
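        /*
         * E.g. with max_pages = 256 and index = 300 (hypothetical numbers),
         * max_end = 300 - 44 + 255 = 511, the last page of the second
         * 256-page RPC slot, after clamping to the lock's cld_end.
         */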

        /* initialize new extent by parameters so far */
        cur->oe_max_end = max_end;
        cur->oe_start = index & chunk_mask;
        cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
        if (cur->oe_start < lock->cll_descr.cld_start)
                cur->oe_start = lock->cll_descr.cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
        cur->oe_osclock = lock;
        cur->oe_grants = 0;
        cur->oe_mppr = max_pages;

        /* grant has been allocated by the caller */
        LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
                 "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
        LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n",
                 EXTPARA(cur));

restart:
        osc_object_lock(obj);
        ext = osc_extent_search(obj, cur->oe_start);
        if (!ext)
                ext = first_extent(obj);
        while (ext) {
                loff_t ext_chk_start = ext->oe_start >> ppc_bits;
                loff_t ext_chk_end = ext->oe_end >> ppc_bits;

                LASSERT(sanity_check_nolock(ext) == 0);
                if (chunk > ext_chk_end + 1)
                        break;

                /* if covering by different locks, no chance to match */
                if (lock != ext->oe_osclock) {
                        EASSERTF(!overlapped(ext, cur), ext,
                                 EXTSTR"\n", EXTPARA(cur));

                        ext = next_extent(ext);
                        continue;
                }

                /* discontiguous chunks? */
                if (chunk + 1 < ext_chk_start) {
                        ext = next_extent(ext);
                        continue;
                }

                /* ok, from now on, ext and cur have these attrs:
                 * 1. covered by the same lock
                 * 2. contiguous at chunk level or overlapping.
                 */

                if (overlapped(ext, cur)) {
                        /* cur is the minimum unit, so overlapping means
                         * full containment.
                         */
                        EASSERTF((ext->oe_start <= cur->oe_start &&
                                  ext->oe_end >= cur->oe_end),
                                 ext, EXTSTR"\n", EXTPARA(cur));

                        if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
                                /* for simplicity, we wait for this extent to
                                 * finish before going forward.
                                 */
                                conflict = osc_extent_get(ext);
                                break;
                        }

                        found = osc_extent_hold(ext);
                        break;
                }

                /* non-overlapped extent */
                if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) {
                        /* we can't do anything for a non OES_CACHE extent, or
                         * if there is someone waiting for this extent to be
                         * flushed, try next one.
                         */
                        ext = next_extent(ext);
                        continue;
                }

                /* check if they belong to the same RPC slot before trying to
                 * merge; to get here, the extents must be non-overlapping and
                 * contiguous at the chunk level.
                 */
                if (ext->oe_max_end != max_end) {
                        /* if they don't belong to the same RPC slot or
                         * max_pages_per_rpc has ever changed, do not merge.
                         */
                        ext = next_extent(ext);
                        continue;
                }

                /* an extent is required to be contiguous at the chunk level
                 * so that we know the whole extent is covered by grant
                 * (the pages in the extent are NOT required to be contiguous).
                 * Otherwise, it would be too difficult to know which
                 * chunks have grants allocated.
                 */

                /* try to do front merge - extend ext's start */
                if (chunk + 1 == ext_chk_start) {
                        /* ext must be chunk size aligned */
                        EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);

                        /* pull ext's start back to cover cur */
                        ext->oe_start = cur->oe_start;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        found = osc_extent_hold(ext);
                } else if (chunk == ext_chk_end + 1) {
                        /* rear merge */
                        ext->oe_end = cur->oe_end;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        /* try to merge with the next one because we just
                         * filled in a gap
                         */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_extent_tax;

                        found = osc_extent_hold(ext);
                }
                if (found)
                        break;

                ext = next_extent(ext);
        }

        osc_extent_tree_dump(D_CACHE, obj);
        if (found) {
                LASSERT(!conflict);
                if (!IS_ERR(found)) {
                        LASSERT(found->oe_osclock == cur->oe_osclock);
                        OSC_EXTENT_DUMP(D_CACHE, found,
                                        "found caching ext for %lu.\n", index);
                }
        } else if (!conflict) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
                cur->oe_grants = chunksize + cli->cl_extent_tax;
                *grants -= cur->oe_grants;
                LASSERT(*grants >= 0);

                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
                osc_extent_insert(obj, cur);
                OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
                                index, lock->cll_descr.cld_end);
        }
        osc_object_unlock(obj);

        if (conflict) {
                LASSERT(!found);

                /* waiting for IO to finish. Note that it's impossible for
                 * this to be an OES_TRUNC extent.
                 */
                rc = osc_extent_wait(env, conflict, OES_INV);
                osc_extent_put(env, conflict);
                conflict = NULL;
                if (rc < 0) {
                        found = ERR_PTR(rc);
                        goto out;
                }

                goto restart;
        }

out:
        osc_extent_put(env, cur);
        LASSERT(*grants >= 0);
        return found;
}

/**
 * Called when IO is finished to an extent.
 */
int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc)
{
        struct client_obd *cli = osc_cli(ext->oe_obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int nr_pages = ext->oe_nr_pages;
        int lost_grant = 0;
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        __u64 last_off = 0;
        int last_count = -1;

        OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");

        ext->oe_rc = rc ?: ext->oe_nr_pages;
        EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
                list_del_init(&oap->oap_rpc_item);
                list_del_init(&oap->oap_pending_item);
                if (last_off <= oap->oap_obj_off) {
                        last_off = oap->oap_obj_off;
                        last_count = oap->oap_count;
                }

                --ext->oe_nr_pages;
                osc_ap_completion(env, cli, oap, sent, rc);
        }
        EASSERT(ext->oe_nr_pages == 0, ext);

        if (!sent) {
                lost_grant = ext->oe_grants;
        } else if (blocksize < PAGE_SIZE &&
                   last_count != PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check.
                 */
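                /*
                 * Worked example (hypothetical numbers, 4KiB pages): with a
                 * 1024-byte OST block size and a 100-byte write at page
                 * offset 0, count rounds up to 1024, so 4096 - 1024 = 3072
                 * bytes of the page's grant are reported as lost.
                 */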
                /* use last_off/last_count recorded above; oap is not a
                 * valid page once the list iteration has completed.
                 */
                int offset = last_off & ~CFS_PAGE_MASK;
                int count = last_count + (offset & (blocksize - 1));
                int end = (offset + last_count) & (blocksize - 1);

                if (end)
                        count += blocksize - end;

                lost_grant = PAGE_SIZE - count;
        }
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant);

        osc_extent_remove(ext);
        /* put the refcount for RPC */
        osc_extent_put(env, ext);
        return 0;
}

static int extent_wait_cb(struct osc_extent *ext, int state)
{
        int ret;

        osc_object_lock(ext->oe_obj);
        ret = ext->oe_state == state;
        osc_object_unlock(ext->oe_obj);

        return ret;
}

/**
 * Wait for the extent's state to become @state.
 */
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state)
{
        struct osc_object *obj = ext->oe_obj;
        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
                                                  LWI_ON_SIGNAL_NOOP, NULL);
        int rc = 0;

        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        /* `Kick' this extent only if the caller is waiting for it to be
         * written out.
         */
        if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp &&
            !ext->oe_trunc_pending) {
                if (ext->oe_state == OES_ACTIVE) {
                        ext->oe_urgent = 1;
                } else if (ext->oe_state == OES_CACHE) {
                        ext->oe_urgent = 1;
                        osc_extent_hold(ext);
                        rc = 1;
                }
        }
        osc_object_unlock(obj);
        if (rc == 1)
                osc_extent_release(env, ext);

        /* wait for the extent until its state becomes @state */
        rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
        if (rc == -ETIMEDOUT) {
                OSC_EXTENT_DUMP(D_ERROR, ext,
                        "%s: wait ext to %d timedout, recovery in progress?\n",
                        osc_export(obj)->exp_obd->obd_name, state);

                lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
                                  &lwi);
        }
        if (rc == 0 && ext->oe_rc < 0)
                rc = ext->oe_rc;
        return rc;
}

/**
 * Discard pages with index greater than @trunc_index. If @ext straddles
 * @trunc_index, a partial truncate happens.
 */
static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                               bool partial)
{
        struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_io *io;
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int pages_in_chunk = 0;
        int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        __u64 trunc_chunk = trunc_index >> ppc_bits;
        int grants = 0;
        int nr_pages = 0;
        int rc = 0;

        LASSERT(sanity_check(ext) == 0);
        EASSERT(ext->oe_state == OES_TRUNC, ext);
        EASSERT(!ext->oe_urgent, ext);

        /* Request a new lu_env.
         * We can't use the env from osc_cache_truncate_start() because
         * it comes from lov_io_sub and is not fully initialized.
         */
        env = cl_env_nested_get(&nest);
        io  = &osc_env_info(env)->oti_io;
        io->ci_obj = cl_object_top(osc2cl(obj));
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                goto out;

        /* discard all pages with index greater than trunc_index */
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
                struct cl_page *sub = oap2cl_page(oap);
                struct cl_page *page = cl_page_top(sub);

                LASSERT(list_empty(&oap->oap_rpc_item));

                /* only discard the pages with their index greater than
                 * trunc_index, and ...
                 */
                if (sub->cp_index < trunc_index ||
                    (sub->cp_index == trunc_index && partial)) {
                        /* account for how many pages remain in the chunk
                         * so that we can calculate grants correctly.
                         */
                        if (sub->cp_index >> ppc_bits == trunc_chunk)
                                ++pages_in_chunk;
                        continue;
                }

                list_del_init(&oap->oap_pending_item);

                cl_page_get(page);
                lu_ref_add(&page->cp_reference, "truncate", current);

                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
                        LASSERT(page->cp_state == CPS_FREEING);
                        LASSERT(0);
                }

                lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);

                --ext->oe_nr_pages;
                ++nr_pages;
        }
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
                ext, "trunc_index %lu, partial %d\n", trunc_index, partial);

        osc_object_lock(obj);
        if (ext->oe_nr_pages == 0) {
                LASSERT(pages_in_chunk == 0);
                grants = ext->oe_grants;
                ext->oe_grants = 0;
        } else { /* calculate how many grants we can free */
                int chunks = (ext->oe_end >> ppc_bits) - trunc_chunk;
                pgoff_t last_index;

                /* if there are no pages in this chunk, we can also free
                 * grants for the last chunk
                 */
                if (pages_in_chunk == 0) {
                        /* if this is the 1st chunk and no pages in this chunk,
                         * ext->oe_nr_pages must be zero, so we should be in
                         * the other if-clause.
                         */
                        LASSERT(trunc_chunk > 0);
                        --trunc_chunk;
                        ++chunks;
                }

                /* this is what we can free from this extent */
                grants = chunks << cli->cl_chunkbits;
                ext->oe_grants -= grants;
                last_index = ((trunc_chunk + 1) << ppc_bits) - 1;
                ext->oe_end = min(last_index, ext->oe_max_end);
                LASSERT(ext->oe_end >= ext->oe_start);
                LASSERT(ext->oe_grants > 0);
        }
        osc_object_unlock(obj);

        if (grants > 0 || nr_pages > 0)
                osc_free_grant(cli, nr_pages, grants);

out:
        cl_io_fini(env, io);
        cl_env_nested_put(&nest, env);
        return rc;
}

/**
 * This function is used to make the extent prepared for transfer.
 * A race with flushing page - ll_writepage() has to be handled cautiously.
 */
static int osc_extent_make_ready(const struct lu_env *env,
                                 struct osc_extent *ext)
{
        struct osc_async_page *oap;
        struct osc_async_page *last = NULL;
        struct osc_object *obj = ext->oe_obj;
        int page_count = 0;
        int rc;

        /* we're going to grab page lock, so object lock must not be taken. */
        LASSERT(sanity_check(ext) == 0);
        /* in locking state, any process should not touch this extent. */
        EASSERT(ext->oe_state == OES_LOCKING, ext);
        EASSERT(ext->oe_owner, ext);

        OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n");

        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                ++page_count;
                if (!last || last->oap_obj_off < oap->oap_obj_off)
                        last = oap;

                /* checking ASYNC_READY is race safe */
                if ((oap->oap_async_flags & ASYNC_READY) != 0)
                        continue;

                rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
                switch (rc) {
                case 0:
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_READY;
                        spin_unlock(&oap->oap_lock);
                        break;
                case -EALREADY:
                        LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
                        break;
                default:
                        LASSERTF(0, "unknown return code: %d\n", rc);
                }
        }

        LASSERT(page_count == ext->oe_nr_pages);
        LASSERT(last);
        /* the last page is the only one whose transfer count may need
         * refreshing against the size of the file.
         */
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last->oap_count > 0);
                LASSERT(last->oap_page_off + last->oap_count <= PAGE_SIZE);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
        }

        /* for the rest of the pages, we don't need to call
         * osc_refresh_count() because they are known not to be the last page
         */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count = PAGE_SIZE - oap->oap_page_off;
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                }
        }

        osc_object_lock(obj);
        osc_extent_state_set(ext, OES_RPC);
        osc_object_unlock(obj);
        /* get a refcount for RPC. */
        osc_extent_get(ext);

        return 0;
}

/**
 * Quick and simple version of osc_extent_find(). This function is frequently
 * called to expand the extent for the same IO. To expand the extent, the
 * page index must be in the same chunk as, or the chunk right after,
 * ext->oe_end.
 */
static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
{
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
        int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
        int chunksize = 1 << cli->cl_chunkbits;
        int rc = 0;

        LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        end_chunk = ext->oe_end >> ppc_bits;
        if (chunk > end_chunk + 1) {
                rc = -ERANGE;
                goto out;
        }

        if (end_chunk >= chunk) {
                rc = 0;
                goto out;
        }

        LASSERT(end_chunk + 1 == chunk);
        /* try to expand this extent to cover @index */
        end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);

        next = next_extent(ext);
        if (next && next->oe_start <= end_index) {
                /* complex mode - overlapped with the next extent,
                 * this case will be handled by osc_extent_find()
                 */
                rc = -EAGAIN;
                goto out;
        }

        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
        *grants -= chunksize;
        LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);

out:
        osc_object_unlock(obj);
        return rc;
}

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line)
{
        struct osc_extent *ext;
        int cnt;

        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);

        /* osc_object_lock(obj); */
        cnt = 1;
        for (ext = first_extent(obj); ext; ext = next_extent(ext))
                OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_hp_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_reading_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++);
        /* osc_object_unlock(obj); */
}

/* ------------------ osc extent end ------------------ */

static inline int osc_is_ready(struct osc_object *osc)
{
        return !list_empty(&osc->oo_ready_item) ||
               !list_empty(&osc->oo_hp_ready_item);
}

#define OSC_IO_DEBUG(OSC, STR, args...)                                        \
        CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR,     \
               (OSC), osc_is_ready(OSC),                                       \
               list_empty_marker(&(OSC)->oo_hp_ready_item),                    \
               list_empty_marker(&(OSC)->oo_ready_item),                       \
               atomic_read(&(OSC)->oo_nr_writes),                              \
               list_empty_marker(&(OSC)->oo_hp_exts),                          \
               list_empty_marker(&(OSC)->oo_urgent_exts),                      \
               atomic_read(&(OSC)->oo_nr_reads),                               \
               list_empty_marker(&(OSC)->oo_reading_exts),                     \
               ##args)

static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
{
        struct osc_page *opg = oap2osc_page(oap);
        struct cl_page *page = cl_page_top(oap2cl_page(oap));
        int result;

        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */

        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
                opg->ops_submit_time = cfs_time_current();
        return result;
}

static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd)
{
        struct osc_page *opg = oap2osc_page(oap);
        struct cl_page *page = oap2cl_page(oap);
        struct cl_object *obj;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;

        int result;
        loff_t kms;

        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(!(cmd & OBD_BRW_READ));
        obj = opg->ops_cl.cpl_obj;

        cl_object_attr_lock(obj);
        result = cl_object_attr_get(env, obj, attr);
        cl_object_attr_unlock(obj);
        if (result < 0)
                return result;
        kms = attr->cat_kms;
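        /*
         * Worked example (hypothetical numbers, 4KiB pages): if kms = 10000,
         * page index 2 covers bytes [8192, 12288); 12288 > 10000 >= 8192, so
         * the transfer count for that page is kms % PAGE_SIZE = 1808 bytes.
         */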
        if (cl_offset(obj, page->cp_index) >= kms)
                /* catch race with truncate */
                return 0;
        else if (cl_offset(obj, page->cp_index + 1) > kms)
                /* catch sub-page write at end of file */
                return kms % PAGE_SIZE;
        else
                return PAGE_SIZE;
}

static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd, int rc)
{
        struct osc_page *opg = oap2osc_page(oap);
        struct cl_page *page = cl_page_top(oap2cl_page(oap));
        struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type crt;
        int srvlock;

        cmd &= ~OBD_BRW_NOQUOTA;
        LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
        LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
        LASSERT(opg->ops_transfer_pinned);

        /*
         * page->cp_req can be NULL if io submission failed before
         * cl_req was allocated.
         */
        if (page->cp_req)
                cl_req_page_done(env, page);
        LASSERT(!page->cp_req);

        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;

        spin_lock(&obj->oo_seatbelt);
        LASSERT(opg->ops_submitter);
        LASSERT(!list_empty(&opg->ops_inflight));
        list_del_init(&opg->ops_inflight);
        opg->ops_submitter = NULL;
        spin_unlock(&obj->oo_seatbelt);

        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;

        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                int bytes = oap->oap_count;

                if (crt == CRT_READ)
                        stats->os_lockless_reads += bytes;
                else
                        stats->os_lockless_writes += bytes;
        }

        /*
         * This has to be the last operation with the page, as locks are
         * released in cl_page_completion() and nothing except for the
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);

        cl_page_completion(env, page, crt, rc);

        return 0;
}

#define OSC_DUMP_GRANT(cli, fmt, args...) do {                                \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
               "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               atomic_read(&obd_dirty_pages), obd_max_dirty_pages,            \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
} while (0)

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        assert_spin_locked(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               PAGE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        assert_spin_locked(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT))
                return;

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= PAGE_SIZE;
        }
}
/**
 * To avoid sleeping with the object lock held, it's good for us to allocate
 * enough grant before entering the critical section.
 *
 * client_obd_list_lock held by caller
 */
static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes)
{
        int rc = -EDQUOT;

        if (cli->cl_avail_grant >= bytes) {
                cli->cl_avail_grant -= bytes;
                cli->cl_reserved_grant += bytes;
                rc = 0;
        }
        return rc;
}

static void __osc_unreserve_grant(struct client_obd *cli,
                                  unsigned int reserved, unsigned int unused)
{
1428        /* It's quite normal for us to get more grant than reserved.
1429         * Consider the case where two extents are merged by adding a new
1430         * chunk: we save one extent tax. If the extent tax is greater
1431         * than one chunk, adding a new chunk saves even more grant.
1432         */
1433        cli->cl_reserved_grant -= reserved;
1434        if (unused > reserved) {
1435                cli->cl_avail_grant += reserved;
1436                cli->cl_lost_grant  += unused - reserved;
1437        } else {
1438                cli->cl_avail_grant += unused;
1439        }
1440}
1441
1442static void osc_unreserve_grant(struct client_obd *cli,
1443                                unsigned int reserved, unsigned int unused)
1444{
1445        client_obd_list_lock(&cli->cl_loi_list_lock);
1446        __osc_unreserve_grant(cli, reserved, unused);
1447        if (unused > 0)
1448                osc_wake_cache_waiters(cli);
1449        client_obd_list_unlock(&cli->cl_loi_list_lock);
1450}
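    /*
     * Illustrative usage sketch (editor's addition, not part of the
     * driver; locking elided): a caller reserves the worst case up front
     * and gives back whatever was not consumed, much as
     * osc_queue_async_io() below does around osc_extent_expand():
     *
     *         int want = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
     *         int left = want;
     *
     *         if (osc_reserve_grant(cli, want) == 0 &&
     *             osc_extent_expand(ext, index, &left) >= 0)
     *                 osc_unreserve_grant(cli, want, left);
     *
     * On success @left holds the grant the expansion did not need, e.g.
     * the whole extent tax when two extents were merged, and it flows
     * back to cl_avail_grant through __osc_unreserve_grant() above.
     */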
1451
1452/**
1453 * Free grant after IO is finished or canceled.
1454 *
1455 * @lost_grant is used to remember how much grant we have allocated but
1456 * not used; we should return these grants to the OST. There are two cases
1457 * where grant can be lost:
1458 * 1. truncate;
1459 * 2. the blocksize at the OST is less than PAGE_SIZE and a partial page
1460 *    was written. In this case the OST may use fewer chunks to serve the
1461 *    partial write. OSTs don't actually know the page size on the client
1462 *    side, so clients have to calculate lost grant by the blocksize on
1463 *    the OST. See filter_grant_check() for details.
1464 */
1465static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
1466                           unsigned int lost_grant)
1467{
1468        int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
1469
1470        client_obd_list_lock(&cli->cl_loi_list_lock);
1471        atomic_sub(nr_pages, &obd_dirty_pages);
1472        cli->cl_dirty -= nr_pages << PAGE_SHIFT;
1473        cli->cl_lost_grant += lost_grant;
1474        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
1475                /* borrow some grant from truncate to avoid the case where
1476                 * truncate uses up all the available grant
1477                 */
1478                cli->cl_lost_grant -= grant;
1479                cli->cl_avail_grant += grant;
1480        }
1481        osc_wake_cache_waiters(cli);
1482        client_obd_list_unlock(&cli->cl_loi_list_lock);
1483        CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1484               lost_grant, cli->cl_lost_grant,
1485               cli->cl_avail_grant, cli->cl_dirty);
1486}
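    /*
     * Worked example (editor's addition): with a hypothetical 64 KiB
     * chunk size and an extent tax of one chunk, @grant above is 128 KiB.
     * If truncate has driven cl_avail_grant below 128 KiB while at least
     * that much sits in cl_lost_grant, 128 KiB is moved from "lost" back
     * to "avail" so that writers are not starved before the lost grant is
     * returned to the OST.
     */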
1487
1488/**
1489 * The companion to osc_enter_cache(), called when @oap is no longer part of
1490 * the dirty accounting due to error.
1491 */
1492static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
1493{
1494        client_obd_list_lock(&cli->cl_loi_list_lock);
1495        osc_release_write_grant(cli, &oap->oap_brw_page);
1496        client_obd_list_unlock(&cli->cl_loi_list_lock);
1497}
1498
1499/**
1500 * Non-blocking version of osc_enter_cache() that consumes grant only when it
1501 * is available.
1502 */
1503static int osc_enter_cache_try(struct client_obd *cli,
1504                               struct osc_async_page *oap,
1505                               int bytes, int transient)
1506{
1507        int rc;
1508
1509        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1510
1511        rc = osc_reserve_grant(cli, bytes);
1512        if (rc < 0)
1513                return 0;
1514
1515        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1516            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
1517                osc_consume_write_grant(cli, &oap->oap_brw_page);
1518                if (transient) {
1519                        cli->cl_dirty_transit += PAGE_SIZE;
1520                        atomic_inc(&obd_dirty_transit_pages);
1521                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
1522                }
1523                rc = 1;
1524        } else {
1525                __osc_unreserve_grant(cli, bytes, bytes);
1526                rc = 0;
1527        }
1528        return rc;
1529}
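    /*
     * Illustrative sketch (editor's addition): osc_enter_cache_try() must
     * be called with the loi list lock held, so the non-blocking pattern
     * used by osc_queue_async_io() below looks like:
     *
     *         client_obd_list_lock(&cli->cl_loi_list_lock);
     *         granted = osc_enter_cache_try(cli, oap, grants, 0);
     *         client_obd_list_unlock(&cli->cl_loi_list_lock);
     *         if (!granted)
     *                 rc = osc_enter_cache(env, cli, oap, grants);
     *
     * A zero return from the _try variant means no grant or dirty room is
     * available right now; the blocking osc_enter_cache() below sleeps
     * until a cache waiter is woken or the wait is interrupted.
     */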
1530
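    /*
     * A cache waiter counts as processed once it has been unlinked from
     * cl_cache_waiters (normally by osc_wake_cache_waiters(), which also
     * stores the verdict in ocw_rc); this is the wake-up condition for
     * l_wait_event() in osc_enter_cache() below.
     */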
1531static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1532{
1533        int rc;
1534
1535        client_obd_list_lock(&cli->cl_loi_list_lock);
1536        rc = list_empty(&ocw->ocw_entry);
1537        client_obd_list_unlock(&cli->cl_loi_list_lock);
1538        return rc;
1539}
1540
1541/**
1542 * The main entry to reserve dirty page accounting. Usually the grant reserved
1543 * in this function will be freed in bulk in osc_free_grant(); if adding the
1544 * page to the osc cache fails, it will be freed in osc_exit_cache() instead.
1545 *
1546 * The process is put to sleep if it has already run out of grant.
1547 */
1548static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
1549                           struct osc_async_page *oap, int bytes)
1550{
1551        struct osc_object *osc = oap->oap_obj;
1552        struct lov_oinfo *loi = osc->oo_oinfo;
1553        struct osc_cache_waiter ocw;
1554        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1555        int rc = -EDQUOT;
1556
1557        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1558
1559        client_obd_list_lock(&cli->cl_loi_list_lock);
1560
1561        /* force the caller to try sync io.  this can jump the list
1562         * of queued writes and create a discontiguous rpc stream
1563         */
1564        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
1565            cli->cl_dirty_max < PAGE_SIZE     ||
1566            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
1567                rc = -EDQUOT;
1568                goto out;
1569        }
1570
1571        /* Hopefully normal case - cache space and write credits available */
1572        if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1573                rc = 0;
1574                goto out;
1575        }
1576
1577        /* We can get here for two reasons: too many dirty pages in the
1578         * cache, or a shortage of grant. In both cases we should write
1579         * dirty pages out. Adding a cache waiter will trigger urgent
1580         * write-out no matter what the RPC size will be.
1581         * The exit condition is no available grant and no dirty pages
1582         * cached; that really means there is no space on the OST.
1583         */
1584        init_waitqueue_head(&ocw.ocw_waitq);
1585        ocw.ocw_oap   = oap;
1586        ocw.ocw_grant = bytes;
1587        while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
1588                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1589                ocw.ocw_rc = 0;
1590                client_obd_list_unlock(&cli->cl_loi_list_lock);
1591
1592                osc_io_unplug_async(env, cli, NULL);
1593
1594                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
1595                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
1596
1597                rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1598
1599                client_obd_list_lock(&cli->cl_loi_list_lock);
1600
1601                /* l_wait_event was interrupted by a signal */
1602                if (rc < 0) {
1603                        list_del_init(&ocw.ocw_entry);
1604                        goto out;
1605                }
1606
1607                LASSERT(list_empty(&ocw.ocw_entry));
1608                rc = ocw.ocw_rc;
1609
1610                if (rc != -EDQUOT)
1611                        goto out;
1612                if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1613                        rc = 0;
1614                        goto out;
1615                }
1616        }
1617out:
1618        client_obd_list_unlock(&cli->cl_loi_list_lock);
1619        OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
1620        return rc;
1621}
1622
1623/* caller must hold loi_list_lock */
1624void osc_wake_cache_waiters(struct client_obd *cli)
1625{
1626        struct list_head *l, *tmp;
1627        struct osc_cache_waiter *ocw;
1628
1629        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
1630                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
1631                list_del_init(&ocw->ocw_entry);
1632
1633                ocw->ocw_rc = -EDQUOT;
1634                /* we can't dirty more */
1635                if ((cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) ||
1636                    (atomic_read(&obd_dirty_pages) + 1 >
1637                     obd_max_dirty_pages)) {
1638                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
1639                               cli->cl_dirty,
1640                               cli->cl_dirty_max, obd_max_dirty_pages);
1641                        goto wakeup;
1642                }
1643
1644                ocw->ocw_rc = 0;
1645                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
1646                        ocw->ocw_rc = -EDQUOT;
1647
1648wakeup:
1649                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
1650                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
1651
1652                wake_up(&ocw->ocw_waitq);
1653        }
1654}
1655
1656static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
1657{
1658        int hprpc = !list_empty(&osc->oo_hp_exts);
1659
1660        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
1661}
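    /*
     * Worked example (editor's addition): with cl_max_rpcs_in_flight == 8
     * and a high-priority extent queued on @osc, hprpc == 1, so the object
     * may still start a 9th RPC while objects without HP extents are
     * capped at 8.
     */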
1662
1663/* This checks whether the pending pages queued on a given object warrant
1664 * sending an RPC.  It is used by osc_check_rpcs()->osc_next_obj() and
1665 * osc_list_maint() to quickly find objects that are ready to send an RPC.
1666 */
1667static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
1668                         int cmd)
1669{
1670        int invalid_import = 0;
1671
1672        /* if we have an invalid import we want to drain the queued pages
1673         * by forcing them through rpcs that immediately fail and complete
1674         * the pages.  recovery relies on this to empty the queued pages
1675         * before canceling the locks and evicting down the llite pages
1676         */
1677        if (!cli->cl_import || cli->cl_import->imp_invalid)
1678                invalid_import = 1;
1679
1680        if (cmd & OBD_BRW_WRITE) {
1681                if (atomic_read(&osc->oo_nr_writes) == 0)
1682                        return 0;
1683                if (invalid_import) {
1684                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1685                        return 1;
1686                }
1687                if (!list_empty(&osc->oo_hp_exts)) {
1688                        CDEBUG(D_CACHE, "high prio request forcing RPC\n");
1689                        return 1;
1690                }
1691                if (!list_empty(&osc->oo_urgent_exts)) {
1692                        CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1693                        return 1;
1694                }
1695                /* trigger a write rpc stream as long as there are dirtiers
1696                 * waiting for space.  As they're waiting, they're not going to
1697                 * create more pages to coalesce with what's already waiting.
1698                 */
1699                if (!list_empty(&cli->cl_cache_waiters)) {
1700                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1701                        return 1;
1702                }
1703                if (atomic_read(&osc->oo_nr_writes) >=
1704                    cli->cl_max_pages_per_rpc)
1705                        return 1;
1706        } else {
1707                if (atomic_read(&osc->oo_nr_reads) == 0)
1708                        return 0;
1709                if (invalid_import) {
1710                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1711                        return 1;
1712                }
1713                /* all reads are urgent. */
1714                if (!list_empty(&osc->oo_reading_exts))
1715                        return 1;
1716        }
1717
1718        return 0;
1719}
1720
1721static void osc_update_pending(struct osc_object *obj, int cmd, int delta)
1722{
1723        struct client_obd *cli = osc_cli(obj);
1724
1725        if (cmd & OBD_BRW_WRITE) {
1726                atomic_add(delta, &obj->oo_nr_writes);
1727                atomic_add(delta, &cli->cl_pending_w_pages);
1728                LASSERT(atomic_read(&obj->oo_nr_writes) >= 0);
1729        } else {
1730                atomic_add(delta, &obj->oo_nr_reads);
1731                atomic_add(delta, &cli->cl_pending_r_pages);
1732                LASSERT(atomic_read(&obj->oo_nr_reads) >= 0);
1733        }
1734        OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
1735}
1736
1737static int osc_makes_hprpc(struct osc_object *obj)
1738{
1739        return !list_empty(&obj->oo_hp_exts);
1740}
1741
1742static void on_list(struct list_head *item, struct list_head *list,
                        int should_be_on)
1743{
1744        if (list_empty(item) && should_be_on)
1745                list_add_tail(item, list);
1746        else if (!list_empty(item) && !should_be_on)
1747                list_del_init(item);
1748}
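    /*
     * on_list() is idempotent: it links @item only when it should be on
     * @list but is not, and unlinks it only in the opposite case, so
     * __osc_list_maint() below may call it unconditionally on every pass.
     */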
1749
1750/* maintain the osc's cli list membership invariants so that osc_check_rpcs()
1751 * can find pages to build into rpcs quickly
1752 */
1753static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1754{
1755        if (osc_makes_hprpc(osc)) {
1756                /* HP rpc */
1757                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
1758                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1759        } else {
1760                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1761                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
1762                        osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
1763                        osc_makes_rpc(cli, osc, OBD_BRW_READ));
1764        }
1765
1766        on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
1767                atomic_read(&osc->oo_nr_writes) > 0);
1768
1769        on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
1770                atomic_read(&osc->oo_nr_reads) > 0);
1771
1772        return osc_is_ready(osc);
1773}
1774
1775static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1776{
1777        int is_ready;
1778
1779        client_obd_list_lock(&cli->cl_loi_list_lock);
1780        is_ready = __osc_list_maint(cli, osc);
1781        client_obd_list_unlock(&cli->cl_loi_list_lock);
1782
1783        return is_ready;
1784}
1785
1786/* this is trying to propagate async writeback errors back up to the
1787 * application.  When an async write fails we record the error code for later
1788 * in case the app does an fsync.  As long as errors persist we force future
1789 * rpcs to be sync so that the app can get a sync error and break the cycle
1790 * of queueing pages for which writeback will fail.
1791 */
1792static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1793                           int rc)
1794{
1795        if (rc) {
1796                if (!ar->ar_rc)
1797                        ar->ar_rc = rc;
1798
1799                ar->ar_force_sync = 1;
1800                ar->ar_min_xid = ptlrpc_sample_next_xid();
1801                return;
1802        }
1804
1805        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1806                ar->ar_force_sync = 0;
1807}
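    /*
     * Illustrative timeline (editor's addition): suppose the write with
     * xid 100 fails with -ENOSPC.  osc_process_ar() records ar_rc, sets
     * ar_force_sync and samples ar_min_xid (say 101).  Every later write
     * is then forced sync until one with xid >= 101 completes successfully,
     * which clears ar_force_sync; the saved ar_rc is still reported to the
     * application on a later fsync.
     */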
1808
1809/* This tears down oap_request, maintains async_flags and records async
1810 * write errors; the loi list lock is taken internally around the
1811 * osc_process_ar() calls, so the caller must not already hold it */
1812static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
1813                              struct osc_async_page *oap, int sent, int rc)
1814{
1815        struct osc_object *osc = oap->oap_obj;
1816        struct lov_oinfo *loi = osc->oo_oinfo;
1817        __u64 xid = 0;
1818
1819        if (oap->oap_request) {
1820                xid = ptlrpc_req_xid(oap->oap_request);
1821                ptlrpc_req_finished(oap->oap_request);
1822                oap->oap_request = NULL;
1823        }
1824
1825        /* As the transfer for this page is being done, clear the flags */
1826        spin_lock(&oap->oap_lock);
1827        oap->oap_async_flags = 0;
1828        spin_unlock(&oap->oap_lock);
1829        oap->oap_interrupted = 0;
1830
1831        if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
1832                client_obd_list_lock(&cli->cl_loi_list_lock);
1833                osc_process_ar(&cli->cl_ar, xid, rc);
1834                osc_process_ar(&loi->loi_ar, xid, rc);
1835                client_obd_list_unlock(&cli->cl_loi_list_lock);
1836        }
1837
1838        rc = osc_completion(env, oap, oap->oap_cmd, rc);
1839        if (rc)
1840                CERROR("completion on oap %p obj %p returns %d.\n",
1841                       oap, osc, rc);
1842}
1843
1844/**
1845 * Try to add extent to one RPC. We need to think about the following things:
1846 * - # of pages must not be over max_pages_per_rpc
1847 * - extent must be compatible with previous ones
1848 */
1849static int try_to_add_extent_for_io(struct client_obd *cli,
1850                                    struct osc_extent *ext, struct list_head *rpclist,
1851                                    int *pc, unsigned int *max_pages)
1852{
1853        struct osc_extent *tmp;
1854        struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
1855                                                      struct osc_async_page,
1856                                                      oap_pending_item);
1857
1858        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
1859                ext);
1860
1861        *max_pages = max(ext->oe_mppr, *max_pages);
1862        if (*pc + ext->oe_nr_pages > *max_pages)
1863                return 0;
1864
1865        list_for_each_entry(tmp, rpclist, oe_link) {
1866                struct osc_async_page *oap2;
1867
1868                oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
1869                                        oap_pending_item);
1870                EASSERT(tmp->oe_owner == current, tmp);
1871                if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
1872                        CDEBUG(D_CACHE,
1873                               "Do not permit different types of IO in one RPC\n");
1874                        return 0;
1875                }
1876
1877                if (tmp->oe_srvlock != ext->oe_srvlock ||
1878                    !tmp->oe_grants != !ext->oe_grants)
1879                        return 0;
1880
1881                /* only the first extent in the rpclist is checked;
                     * remove this break for a strict check of them all */
1882                break;
1883        }
1884
1885        *pc += ext->oe_nr_pages;
1886        list_move_tail(&ext->oe_link, rpclist);
1887        ext->oe_owner = current;
1888        return 1;
1889}
1890
1891/**
1892 * In order to prevent multiple ptlrpcd threads from breaking contiguous
1893 * extents, get_write_extents() takes all appropriate extents atomically.
1894 *
1895 * The following policy is used to collect extents for IO:
1896 * 1. Add as many HP extents as possible;
1897 * 2. Add the first urgent extent in the urgent extent list and take it out
1898 *    of that list;
1899 * 3. Add subsequent extents of this urgent extent;
1900 * 4. If the urgent list is not empty, goto 2;
1901 * 5. Traverse the extent tree from the 1st extent;
1902 * 6. Each of the above steps exits once there is no space left in this RPC.
1903 */
1904static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
1905{
1906        struct client_obd *cli = osc_cli(obj);
1907        struct osc_extent *ext;
1908        struct osc_extent *temp;
1909        int page_count = 0;
1910        unsigned int max_pages = cli->cl_max_pages_per_rpc;
1911
1912        LASSERT(osc_object_is_locked(obj));
1913        list_for_each_entry_safe(ext, temp, &obj->oo_hp_exts, oe_link) {
1914                LASSERT(ext->oe_state == OES_CACHE);
1915                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1916                                              &max_pages))
1917                        return page_count;
1918                EASSERT(ext->oe_nr_pages <= max_pages, ext);
1919        }
1920        if (page_count == max_pages)
1921                return page_count;
1922
1923        while (!list_empty(&obj->oo_urgent_exts)) {
1924                ext = list_entry(obj->oo_urgent_exts.next,
1925                                 struct osc_extent, oe_link);
1926                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1927                                              &max_pages))
1928                        return page_count;
1929
1930                if (!ext->oe_intree)
1931                        continue;
1932
1933                while ((ext = next_extent(ext)) != NULL) {
1934                        if ((ext->oe_state != OES_CACHE) ||
1935                            (!list_empty(&ext->oe_link) &&
1936                             ext->oe_owner))
1937                                continue;
1938
1939                        if (!try_to_add_extent_for_io(cli, ext, rpclist,
1940                                                      &page_count, &max_pages))
1941                                return page_count;
1942                }
1943        }
1944        if (page_count == max_pages)
1945                return page_count;
1946
1947        ext = first_extent(obj);
1948        while (ext) {
1949                if ((ext->oe_state != OES_CACHE) ||
1950                    /* this extent may be already in current rpclist */
1951                    (!list_empty(&ext->oe_link) && ext->oe_owner)) {
1952                        ext = next_extent(ext);
1953                        continue;
1954                }
1955
1956                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1957                                              &max_pages))
1958                        return page_count;
1959
1960                ext = next_extent(ext);
1961        }
1962        return page_count;
1963}
1964
1965static int
1966osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
1967                   struct osc_object *osc)
1968        __must_hold(osc)
1969{
1970        LIST_HEAD(rpclist);
1971        struct osc_extent *ext;
1972        struct osc_extent *tmp;
1973        struct osc_extent *first = NULL;
1974        u32 page_count = 0;
1975        int srvlock = 0;
1976        int rc = 0;
1977
1978        LASSERT(osc_object_is_locked(osc));
1979
1980        page_count = get_write_extents(osc, &rpclist);
1981        LASSERT(equi(page_count == 0, list_empty(&rpclist)));
1982
1983        if (list_empty(&rpclist))
1984                return 0;
1985
1986        osc_update_pending(osc, OBD_BRW_WRITE, -page_count);
1987
1988        list_for_each_entry(ext, &rpclist, oe_link) {
1989                LASSERT(ext->oe_state == OES_CACHE ||
1990                        ext->oe_state == OES_LOCK_DONE);
1991                if (ext->oe_state == OES_CACHE)
1992                        osc_extent_state_set(ext, OES_LOCKING);
1993                else
1994                        osc_extent_state_set(ext, OES_RPC);
1995        }
1996
1997        /* we're going to grab page lock, so release object lock because
1998         * lock order is page lock -> object lock.
1999         */
2000        osc_object_unlock(osc);
2001
2002        list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) {
2003                if (ext->oe_state == OES_LOCKING) {
2004                        rc = osc_extent_make_ready(env, ext);
2005                        if (unlikely(rc < 0)) {
2006                                list_del_init(&ext->oe_link);
2007                                osc_extent_finish(env, ext, 0, rc);
2008                                continue;
2009                        }
2010                }
2011                if (!first) {
2012                        first = ext;
2013                        srvlock = ext->oe_srvlock;
2014                } else {
2015                        LASSERT(srvlock == ext->oe_srvlock);
2016                }
2017        }
2018
2019        if (!list_empty(&rpclist)) {
2020                LASSERT(page_count > 0);
2021                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
2022                LASSERT(list_empty(&rpclist));
2023        }
2024
2025        osc_object_lock(osc);
2026        return rc;
2027}
2028
2029/**
2030 * Prepare pages for ASYNC read io and put them in the send queue.
2031 *
2032 * \return zero if no page was added to the send queue.
2033 * \return 1 if pages were successfully added to the send queue.
2034 * \return negative on errors.
2038 */
2039static int
2040osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
2041                  struct osc_object *osc)
2042        __must_hold(osc)
2043{
2044        struct osc_extent *ext;
2045        struct osc_extent *next;
2046        LIST_HEAD(rpclist);
2047        int page_count = 0;
2048        unsigned int max_pages = cli->cl_max_pages_per_rpc;
2049        int rc = 0;
2050
2051        LASSERT(osc_object_is_locked(osc));
2052        list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
2053                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
2054                if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
2055                                              &max_pages))
2056                        break;
2057                osc_extent_state_set(ext, OES_RPC);
2058                EASSERT(ext->oe_nr_pages <= max_pages, ext);
2059        }
2060        LASSERT(page_count <= max_pages);
2061
2062        osc_update_pending(osc, OBD_BRW_READ, -page_count);
2063
2064        if (!list_empty(&rpclist)) {
2065                osc_object_unlock(osc);
2066
2067                LASSERT(page_count > 0);
2068                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
2069                LASSERT(list_empty(&rpclist));
2070
2071                osc_object_lock(osc);
2072        }
2073        return rc;
2074}
2075
2076#define list_to_obj(list, item) ({                                            \
2077        struct list_head *__tmp = (list)->next;                               \
2078        list_del_init(__tmp);                                                 \
2079        list_entry(__tmp, struct osc_object, oo_##item);                      \
2080})
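    /*
     * For example (editor's addition),
     *
     *         osc = list_to_obj(&cli->cl_loi_ready_list, ready_item);
     *
     * unlinks the head of cl_loi_ready_list and evaluates to the
     * osc_object containing it through its oo_ready_item member
     * (list_entry() here is container_of()).
     */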
2081
2082/* This is called by osc_check_rpcs() to find which objects have pages that
2083 * we could be sending.  These lists are maintained by __osc_list_maint().
2084 */
2085static struct osc_object *osc_next_obj(struct client_obd *cli)
2086{
2087        /* First return objects that have blocked locks so that they
2088         * will be flushed quickly and other clients can get the lock,
2089         * then objects which have pages ready to be stuffed into RPCs
2090         */
2091        if (!list_empty(&cli->cl_loi_hp_ready_list))
2092                return list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item);
2093        if (!list_empty(&cli->cl_loi_ready_list))
2094                return list_to_obj(&cli->cl_loi_ready_list, ready_item);
2095
2096        /* then if we have cache waiters, return all objects with queued
2097         * writes.  This is especially important when many small files
2098         * have filled up the cache but have not been fired into rpcs
2099         * because they don't pass the nr_pending/object threshold.
2100         */
2101        if (!list_empty(&cli->cl_cache_waiters) &&
2102            !list_empty(&cli->cl_loi_write_list))
2103                return list_to_obj(&cli->cl_loi_write_list, write_item);
2104
2105        /* then return all queued objects when we have an invalid import
2106         * so that they get flushed
2107         */
2108        if (!cli->cl_import || cli->cl_import->imp_invalid) {
2109                if (!list_empty(&cli->cl_loi_write_list))
2110                        return list_to_obj(&cli->cl_loi_write_list, write_item);
2111                if (!list_empty(&cli->cl_loi_read_list))
2112                        return list_to_obj(&cli->cl_loi_read_list, read_item);
2113        }
2114        return NULL;
2115}
2116
2117/* called with the loi list lock held */
2118static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2119        __must_hold(&cli->cl_loi_list_lock)
2120{
2121        struct osc_object *osc;
2122        int rc = 0;
2123
2124        while ((osc = osc_next_obj(cli)) != NULL) {
2125                struct cl_object *obj = osc2cl(osc);
2126                struct lu_ref_link link;
2127
2128                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
2129
2130                if (osc_max_rpc_in_flight(cli, osc)) {
2131                        __osc_list_maint(cli, osc);
2132                        break;
2133                }
2134
2135                cl_object_get(obj);
2136                client_obd_list_unlock(&cli->cl_loi_list_lock);
2137                lu_object_ref_add_at(&obj->co_lu, &link, "check",
2138                                     current);
2139
2140                /* attempt some read/write balancing by alternating between
2141                 * reads and writes in an object.  The osc_makes_rpc() checks
2142                 * here would be redundant if we were getting read/write work
2143                 * items instead of objects.  We don't want the write path to
2144                 * drain a partial read pending queue when we're given this
2145                 * object to do io on for writes while there are cache waiters.
2146                 */
2147                osc_object_lock(osc);
2148                if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
2149                        rc = osc_send_write_rpc(env, cli, osc);
2150                        if (rc < 0) {
2151                                CERROR("Write request failed with %d\n", rc);
2152
2153                                /* osc_send_write_rpc failed, mostly because of
2154                                 * memory pressure.
2155                                 *
2156                                 * We can't break here, because if:
2157                                 *  - a page was submitted by osc_io_submit,
2158                                 *    so the page is locked;
2159                                 *  - no request is in flight; and
2160                                 *  - no subsequent request is made,
2161                                 * then the system ends up in a livelock,
2162                                 * because there is no further chance to
2163                                 * call osc_io_unplug() and osc_check_rpcs().
2164                                 * pdflush can't help in this case either,
2165                                 * because it might be blocked grabbing the
2166                                 * page lock as mentioned above.
2167                                 *
2168                                 * Anyway, continue to drain pages.
2169                                 */
2170                                /* break; */
2171                        }
2172                }
2173                if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
2174                        rc = osc_send_read_rpc(env, cli, osc);
2175                        if (rc < 0)
2176                                CERROR("Read request failed with %d\n", rc);
2177                }
2178                osc_object_unlock(osc);
2179
2180                osc_list_maint(cli, osc);
2181                lu_object_ref_del_at(&obj->co_lu, &link, "check",
2182                                     current);
2183                cl_object_put(env, obj);
2184
2185                client_obd_list_lock(&cli->cl_loi_list_lock);
2186        }
2187}
2188
2189static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
2190                          struct osc_object *osc, int async)
2191{
2192        int rc = 0;
2193
2194        if (osc && osc_list_maint(cli, osc) == 0)
2195                return 0;
2196
2197        if (!async) {
2198                /* disable osc_lru_shrink() temporarily to avoid a
2199                 * potential stack overrun problem (LU-2859)
2200                 */
2201                atomic_inc(&cli->cl_lru_shrinkers);
2202                client_obd_list_lock(&cli->cl_loi_list_lock);
2203                osc_check_rpcs(env, cli);
2204                client_obd_list_unlock(&cli->cl_loi_list_lock);
2205                atomic_dec(&cli->cl_lru_shrinkers);
2206        } else {
2207                CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
2208                LASSERT(cli->cl_writeback_work);
2209                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
2210        }
2211        return rc;
2212}
2213
2214static int osc_io_unplug_async(const struct lu_env *env,
2215                               struct client_obd *cli, struct osc_object *osc)
2216{
2217        return osc_io_unplug0(env, cli, osc, 1);
2218}
2219
2220void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
2221                   struct osc_object *osc)
2222{
2223        (void)osc_io_unplug0(env, cli, osc, 0);
2224}
2225
2226int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
2227                        struct page *page, loff_t offset)
2228{
2229        struct obd_export *exp = osc_export(osc);
2230        struct osc_async_page *oap = &ops->ops_oap;
2231
2232        if (!page)
2233                return cfs_size_round(sizeof(*oap));
2234
2235        oap->oap_magic = OAP_MAGIC;
2236        oap->oap_cli = &exp->exp_obd->u.cli;
2237        oap->oap_obj = osc;
2238
2239        oap->oap_page = page;
2240        oap->oap_obj_off = offset;
2241        LASSERT(!(offset & ~CFS_PAGE_MASK));
2242
2243        if (!client_is_remote(exp) && capable(CFS_CAP_SYS_RESOURCE))
2244                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2245
2246        INIT_LIST_HEAD(&oap->oap_pending_item);
2247        INIT_LIST_HEAD(&oap->oap_rpc_item);
2248
2249        spin_lock_init(&oap->oap_lock);
2250        CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
2251               oap, page, oap->oap_obj_off);
2252        return 0;
2253}
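    /*
     * Usage note (editor's addition): with a NULL @page the function only
     * reports the rounded size of the embedded osc_async_page, e.g.
     *
     *         int oap_size = osc_prep_async_page(osc, ops, NULL, 0);
     *
     * while with a real page it initializes ops->ops_oap and returns 0.
     */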
2254
2255int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
2256                       struct osc_page *ops)
2257{
2258        struct osc_io *oio = osc_env_io(env);
2259        struct osc_extent *ext = NULL;
2260        struct osc_async_page *oap = &ops->ops_oap;
2261        struct client_obd *cli = oap->oap_cli;
2262        struct osc_object *osc = oap->oap_obj;
2263        pgoff_t index;
2264        int grants = 0;
2265        int brw_flags = OBD_BRW_ASYNC;
2266        int cmd = OBD_BRW_WRITE;
2267        int need_release = 0;
2268        int rc = 0;
2269
2270        if (oap->oap_magic != OAP_MAGIC)
2271                return -EINVAL;
2272
2273        if (!cli->cl_import || cli->cl_import->imp_invalid)
2274                return -EIO;
2275
2276        if (!list_empty(&oap->oap_pending_item) ||
2277            !list_empty(&oap->oap_rpc_item))
2278                return -EBUSY;
2279
2280        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
2281        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
2282        if (!client_is_remote(osc_export(osc)) &&
2283            capable(CFS_CAP_SYS_RESOURCE)) {
2284                brw_flags |= OBD_BRW_NOQUOTA;
2285                cmd |= OBD_BRW_NOQUOTA;
2286        }
2287
2288        /* check if the file's owner/group is over quota */
2289        if (!(cmd & OBD_BRW_NOQUOTA)) {
2290                struct cl_object *obj;
2291                struct cl_attr *attr;
2292                unsigned int qid[MAXQUOTAS];
2293
2294                obj = cl_object_top(&osc->oo_cl);
2295                attr = &osc_env_info(env)->oti_attr;
2296
2297                cl_object_attr_lock(obj);
2298                rc = cl_object_attr_get(env, obj, attr);
2299                cl_object_attr_unlock(obj);
2300
2301                qid[USRQUOTA] = attr->cat_uid;
2302                qid[GRPQUOTA] = attr->cat_gid;
2303                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
2304                        rc = -EDQUOT;
2305                if (rc)
2306                        return rc;
2307        }
2308
2309        oap->oap_cmd = cmd;
2310        oap->oap_page_off = ops->ops_from;
2311        oap->oap_count = ops->ops_to - ops->ops_from;
2312        oap->oap_async_flags = 0;
2313        oap->oap_brw_flags = brw_flags;
2314
2315        OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
2316                     oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
2317
2318        index = oap2cl_page(oap)->cp_index;
2319
2320        /* Add this page into an extent by the following steps:
2321         * 1. if there exists an active extent for this IO, the page can
2322         *    usually be added to it, though sometimes we need to expand
2323         *    the extent to accommodate this page;
2324         * 2. otherwise, a new extent will be allocated.
2325         */
2326
2327        ext = oio->oi_active;
2328        if (ext && ext->oe_start <= index && ext->oe_max_end >= index) {
2329                /* one chunk plus extent overhead must be enough to write this
2330                 * page
2331                 */
2332                grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2333                if (ext->oe_end >= index)
2334                        grants = 0;
2335
2336                /* it doesn't need any grant to dirty this page */
2337                client_obd_list_lock(&cli->cl_loi_list_lock);
2338                rc = osc_enter_cache_try(cli, oap, grants, 0);
2339                client_obd_list_unlock(&cli->cl_loi_list_lock);
2340                if (rc == 0) { /* try failed */
2341                        grants = 0;
2342                        need_release = 1;
2343                } else if (ext->oe_end < index) {
2344                        int tmp = grants;
2345                        /* try to expand this extent */
2346                        rc = osc_extent_expand(ext, index, &tmp);
2347                        if (rc < 0) {
2348                                need_release = 1;
2349                                /* don't free reserved grant */
2350                        } else {
2351                                OSC_EXTENT_DUMP(D_CACHE, ext,
2352                                                "expanded for %lu.\n", index);
2353                                osc_unreserve_grant(cli, grants, tmp);
2354                                grants = 0;
2355                        }
2356                }
2357                rc = 0;
2358        } else if (ext) {
2359                /* index is located outside of active extent */
2360                need_release = 1;
2361        }
2362        if (need_release) {
2363                osc_extent_release(env, ext);
2364                oio->oi_active = NULL;
2365                ext = NULL;
2366        }
2367
2368        if (!ext) {
2369                int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2370
2371                /* try to find new extent to cover this page */
2372                LASSERT(!oio->oi_active);
2373                /* we may have allocated grant for this page if we failed
2374                 * to expand the previous active extent.
2375                 */
2376                LASSERT(ergo(grants > 0, grants >= tmp));
2377
2378                rc = 0;
2379                if (grants == 0) {
2380                        /* we haven't allocated grant for this page. */
2381                        rc = osc_enter_cache(env, cli, oap, tmp);
2382                        if (rc == 0)
2383                                grants = tmp;
2384                }
2385
2386                tmp = grants;
2387                if (rc == 0) {
2388                        ext = osc_extent_find(env, osc, index, &tmp);
2389                        if (IS_ERR(ext)) {
2390                                LASSERT(tmp == grants);
2391                                osc_exit_cache(cli, oap);
2392                                rc = PTR_ERR(ext);
2393                                ext = NULL;
2394                        } else {
2395                                oio->oi_active = ext;
2396                        }
2397                }
2398                if (grants > 0)
2399                        osc_unreserve_grant(cli, grants, tmp);
2400        }
2401
2402        LASSERT(ergo(rc == 0, ext));
2403        if (ext) {
2404                EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
2405                         ext, "index = %lu.\n", index);
2406                LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);
2407
2408                osc_object_lock(osc);
2409                if (ext->oe_nr_pages == 0)
2410                        ext->oe_srvlock = ops->ops_srvlock;
2411                else
2412                        LASSERT(ext->oe_srvlock == ops->ops_srvlock);
2413                ++ext->oe_nr_pages;
2414                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
2415                osc_object_unlock(osc);
2416        }
2417        return rc;
2418}
2419
2420int osc_teardown_async_page(const struct lu_env *env,
2421                            struct osc_object *obj, struct osc_page *ops)
2422{
2423        struct osc_async_page *oap = &ops->ops_oap;
2424        struct osc_extent *ext = NULL;
2425        int rc = 0;
2426
2427        LASSERT(oap->oap_magic == OAP_MAGIC);
2428
2429        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
2430               oap, ops, oap2cl_page(oap)->cp_index);
2431
2432        osc_object_lock(obj);
2433        if (!list_empty(&oap->oap_rpc_item)) {
2434                CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
2435                rc = -EBUSY;
2436        } else if (!list_empty(&oap->oap_pending_item)) {
2437                ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
2438                /* only truncated pages are allowed to be taken out.
2439                 * See osc_extent_truncate() and osc_cache_truncate_start()
2440                 * for details.
2441                 */
2442                if (ext && ext->oe_state != OES_TRUNC) {
2443                        OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
2444                                        oap2cl_page(oap)->cp_index);
2445                        rc = -EBUSY;
2446                }
2447        }
2448        osc_object_unlock(obj);
2449        if (ext)
2450                osc_extent_put(env, ext);
2451        return rc;
2452}
2453
2454/**
2455 * This is called when a page is picked up by the kernel to write out.
2456 *
2457 * We should find the corresponding extent and add the whole extent
2458 * into the urgent list. The extent may be being truncated or used, so
2459 * handle it carefully.
2460 */
2461int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
2462                         struct osc_page *ops)
2463{
2464        struct osc_extent *ext = NULL;
2465        struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
2466        struct cl_page *cp = ops->ops_cl.cpl_page;
2467        pgoff_t index = cp->cp_index;
2468        struct osc_async_page *oap = &ops->ops_oap;
2469        bool unplug = false;
2470        int rc = 0;
2471
2472        osc_object_lock(obj);
2473        ext = osc_extent_lookup(obj, index);
2474        if (!ext) {
2475                osc_extent_tree_dump(D_ERROR, obj);
2476                LASSERTF(0, "page index %lu is NOT covered.\n", index);
2477        }
2478
2479        switch (ext->oe_state) {
2480        case OES_RPC:
2481        case OES_LOCK_DONE:
2482                CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
2483                              "flush an in-rpc page?\n");
2484                LASSERT(0);
2485                break;
2486        case OES_LOCKING:
2487                /* If we know this extent is being written out, we should abort
2488                 * so that the writer can make this page ready. Otherwise, there
2489                 * exists a deadlock problem because another process can hold
2490                 * the page lock while waiting for writeback; meanwhile in
2491                 * vvp_page_make_ready(), we need to grab page lock before
2492                 * really sending the RPC.
2493                 */
2494        case OES_TRUNC:
2495                /* race with truncate, page will be redirtied */
2496        case OES_ACTIVE:
2497                /* The extent is active so we need to abort and let the caller
2498                 * re-dirty the page. If we continued on here, and we were the
2499                 * one making the extent active, we could deadlock waiting for
2500                 * the page writeback to clear but it won't because the extent
2501                 * is active and won't be written out.
2502                 */
2503                rc = -EAGAIN;
2504                goto out;
2505        default:
2506                break;
2507        }
2508
2509        rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
2510        if (rc)
2511                goto out;
2512
2513        spin_lock(&oap->oap_lock);
2514        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
2515        spin_unlock(&oap->oap_lock);
2516
2517        if (memory_pressure_get())
2518                ext->oe_memalloc = 1;
2519
2520        ext->oe_urgent = 1;
2521        if (ext->oe_state == OES_CACHE) {
2522                OSC_EXTENT_DUMP(D_CACHE, ext,
2523                                "flush page %p make it urgent.\n", oap);
2524                if (list_empty(&ext->oe_link))
2525                        list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2526                unplug = true;
2527        }
2528        rc = 0;
2529
2530out:
2531        osc_object_unlock(obj);
2532        osc_extent_put(env, ext);
2533        if (unplug)
2534                osc_io_unplug_async(env, osc_cli(obj), obj);
2535        return rc;
2536}
2537
2538/**
2539 * This is called when a sync waiter receives an interruption.  Its job is to
2540 * get the caller woken as soon as possible.  If its page hasn't been put in
2541 * an rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
2542 * as desiring interruption, which will forcefully complete the rpc once the
2543 * rpc has timed out.
2544 */
2545int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
2546{
2547        struct osc_async_page *oap = &ops->ops_oap;
2548        struct osc_object *obj = oap->oap_obj;
2549        struct client_obd *cli = osc_cli(obj);
2550        struct osc_extent *ext;
2551        struct osc_extent *found = NULL;
2552        struct list_head *plist;
2553        pgoff_t index = oap2cl_page(oap)->cp_index;
2554        int rc = -EBUSY;
2555        int cmd;
2556
2557        LASSERT(!oap->oap_interrupted);
2558        oap->oap_interrupted = 1;
2559
2560        /* Find out the caching extent */
2561        osc_object_lock(obj);
2562        if (oap->oap_cmd & OBD_BRW_WRITE) {
2563                plist = &obj->oo_urgent_exts;
2564                cmd = OBD_BRW_WRITE;
2565        } else {
2566                plist = &obj->oo_reading_exts;
2567                cmd = OBD_BRW_READ;
2568        }
2569        list_for_each_entry(ext, plist, oe_link) {
2570                if (ext->oe_start <= index && ext->oe_end >= index) {
2571                        LASSERT(ext->oe_state == OES_LOCK_DONE);
2572                        /* an extent in OES_LOCK_DONE state already holds
2573                         * a refcount for the RPC.
2574                         */
2575                        found = osc_extent_get(ext);
2576                        break;
2577                }
2578        }
2579        if (found) {
2580                list_del_init(&found->oe_link);
2581                osc_update_pending(obj, cmd, -found->oe_nr_pages);
2582                osc_object_unlock(obj);
2583
2584                osc_extent_finish(env, found, 0, -EINTR);
2585                osc_extent_put(env, found);
2586                rc = 0;
2587        } else {
2588                osc_object_unlock(obj);
2589                /* ok, it's been put in an rpc. only one oap gets a request
2590                 * reference
2591                 */
2592                if (oap->oap_request) {
2593                        ptlrpc_mark_interrupted(oap->oap_request);
2594                        ptlrpcd_wake(oap->oap_request);
2595                        ptlrpc_req_finished(oap->oap_request);
2596                        oap->oap_request = NULL;
2597                }
2598        }
2599
2600        osc_list_maint(cli, obj);
2601        return rc;
2602}
2603
2604int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
2605                         struct list_head *list, int cmd, int brw_flags)
2606{
2607        struct client_obd *cli = osc_cli(obj);
2608        struct osc_extent *ext;
2609        struct osc_async_page *oap, *tmp;
2610        int page_count = 0;
2611        int mppr = cli->cl_max_pages_per_rpc;
2612        pgoff_t start = CL_PAGE_EOF;
2613        pgoff_t end = 0;
2614
2615        list_for_each_entry(oap, list, oap_pending_item) {
2616                struct cl_page *cp = oap2cl_page(oap);
2617
2618                if (cp->cp_index > end)
2619                        end = cp->cp_index;
2620                if (cp->cp_index < start)
2621                        start = cp->cp_index;
2622                ++page_count;
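                    /* double mppr whenever page_count outgrows it, so the
                     * final oe_mppr is cl_max_pages_per_rpc scaled by a
                     * power of two large enough to cover page_count
                     */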
2623                mppr <<= (page_count > mppr);
2624        }
2625
2626        ext = osc_extent_alloc(obj);
2627        if (!ext) {
2628                list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
2629                        list_del_init(&oap->oap_pending_item);
2630                        osc_ap_completion(env, cli, oap, 0, -ENOMEM);
2631                }
2632                return -ENOMEM;
2633        }
2634
2635        ext->oe_rw = !!(cmd & OBD_BRW_READ);
2636        ext->oe_urgent = 1;
2637        ext->oe_start = start;
2638        ext->oe_end = ext->oe_max_end = end;
2639        ext->oe_obj = obj;
2640        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
2641        ext->oe_nr_pages = page_count;
2642        ext->oe_mppr = mppr;
2643        list_splice_init(list, &ext->oe_pages);
2644
2645        osc_object_lock(obj);
2646        /* Reuse the initial refcount for RPC, don't drop it */
2647        osc_extent_state_set(ext, OES_LOCK_DONE);
2648        if (cmd & OBD_BRW_WRITE) {
2649                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2650                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
2651        } else {
2652                list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
2653                osc_update_pending(obj, OBD_BRW_READ, page_count);
2654        }
2655        osc_object_unlock(obj);
2656
2657        osc_io_unplug_async(env, cli, obj);
2658        return 0;
2659}
2660
2661/**
2662 * Called by osc_io_setattr_start() to freeze and destroy covering extents.
2663 */
2664int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
2665                             struct osc_object *obj, __u64 size)
2666{
2667        struct client_obd *cli = osc_cli(obj);
2668        struct osc_extent *ext;
2669        struct osc_extent *temp;
2670        struct osc_extent *waiting = NULL;
2671        pgoff_t index;
2672        LIST_HEAD(list);
2673        int result = 0;
2674        bool partial;
2675
2676        /* pages with index greater or equal to index will be truncated. */
2677        index = cl_index(osc2cl(obj), size);
2678        partial = size > cl_offset(osc2cl(obj), index);
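            /* e.g. with 4 KiB pages (an assumed size) and size == 10000,
             * index == 2 and partial is true since 10000 > 8192: page 2
             * is truncated partially, pages 3 and beyond entirely
             * (editor's example)
             */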
2679
2680again:
2681        osc_object_lock(obj);
2682        ext = osc_extent_search(obj, index);
2683        if (!ext)
2684                ext = first_extent(obj);
2685        else if (ext->oe_end < index)
2686                ext = next_extent(ext);
2687        while (ext) {
2688                EASSERT(ext->oe_state != OES_TRUNC, ext);
2689
2690                if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
2691                        /* if ext is in urgent state, it means there must be
2692                         * a page that has already been flushed by write_page().
2693                         * We have to wait for this extent because we can't
2694                         * truncate that page.
2695                         */
2696                        LASSERT(!ext->oe_hp);
2697                        OSC_EXTENT_DUMP(D_CACHE, ext,
2698                                        "waiting for busy extent\n");
2699                        waiting = osc_extent_get(ext);
2700                        break;
2701                }
2702
2703                OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
2704
2705                osc_extent_get(ext);
2706                if (ext->oe_state == OES_ACTIVE) {
2707                        /* we grab the inode mutex on the write path, but we
2708                         * release it before releasing the extent (in
2709                         * osc_io_end()), so there is a race window where an
2710                         * extent is still in OES_ACTIVE when truncate starts.
2711                         */
2712                        LASSERT(!ext->oe_trunc_pending);
2713                        ext->oe_trunc_pending = 1;
2714                } else {
2715                        EASSERT(ext->oe_state == OES_CACHE, ext);
2716                        osc_extent_state_set(ext, OES_TRUNC);
2717                        osc_update_pending(obj, OBD_BRW_WRITE,
2718                                           -ext->oe_nr_pages);
2719                }
2720                EASSERT(list_empty(&ext->oe_link), ext);
2721                list_add_tail(&ext->oe_link, &list);
2722
2723                ext = next_extent(ext);
2724        }
2725        osc_object_unlock(obj);
2726
2727        osc_list_maint(cli, obj);
2728
2729        list_for_each_entry_safe(ext, temp, &list, oe_link) {
2730                int rc;
2731
2732                list_del_init(&ext->oe_link);
2733
2734                /* extent may be in OES_ACTIVE state because inode mutex
2735                 * is released before osc_io_end() in file write case
2736                 */
2737                if (ext->oe_state != OES_TRUNC)
2738                        osc_extent_wait(env, ext, OES_TRUNC);
2739
2740                rc = osc_extent_truncate(ext, index, partial);
2741                if (rc < 0) {
2742                        if (result == 0)
2743                                result = rc;
2744
2745                        OSC_EXTENT_DUMP(D_ERROR, ext,
2746                                        "truncate error %d\n", rc);
2747                } else if (ext->oe_nr_pages == 0) {
2748                        osc_extent_remove(ext);
2749                } else {
2750                        /* this must be an overlapped extent which means only
2751                         * part of the pages in this extent have been truncated.
2752                         */
2753                        EASSERTF(ext->oe_start <= index, ext,
2754                                 "trunc index = %lu/%d.\n", index, partial);
2755                        /* fix index to skip this partially truncated extent */
2756                        index = ext->oe_end + 1;
2757                        partial = false;
2758
2759                        /* we need to hold this extent in OES_TRUNC state so
2760                         * that no writeback will happen. This is to avoid
2761                         * BUG 17397.
2762                         */
2763                        LASSERT(!oio->oi_trunc);
2764                        oio->oi_trunc = osc_extent_get(ext);
2765                        OSC_EXTENT_DUMP(D_CACHE, ext,
2766                                        "trunc at %llu\n", size);
2767                }
2768                osc_extent_put(env, ext);
2769        }
2770        if (waiting) {
2771                int rc;
2772
2773                /* ignore the result of osc_extent_wait; the write initiator
2774                 * should take care of it.
2775                 */
                rc = osc_extent_wait(env, waiting, OES_INV);
                if (rc < 0)
                        OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);

                osc_extent_put(env, waiting);
                waiting = NULL;
                goto again;
        }
        return result;
}

/**
 * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
 */
void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
                            struct osc_object *obj)
{
        struct osc_extent *ext = oio->oi_trunc;

        oio->oi_trunc = NULL;
        if (ext) {
                bool unplug = false;

                EASSERT(ext->oe_nr_pages > 0, ext);
                EASSERT(ext->oe_state == OES_TRUNC, ext);
                EASSERT(!ext->oe_urgent, ext);

                OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
                osc_object_lock(obj);
                osc_extent_state_set(ext, OES_CACHE);
                if (ext->oe_fsync_wait && !ext->oe_urgent) {
                        ext->oe_urgent = 1;
                        list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
                        unplug = true;
                }
                osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
                osc_object_unlock(obj);
                osc_extent_put(env, ext);

                if (unplug)
                        osc_io_unplug_async(env, osc_cli(obj), obj);
        }
}
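
/*
 * A minimal sketch of how the two truncate entry points pair up when driven
 * from the setattr IO path (the control flow below is illustrative, not
 * copied from osc_io.c):
 *
 *      rc = osc_cache_truncate_start(env, oio, obj, size);
 *      if (rc == 0)
 *              ... issue the setattr RPC ...
 *      ... then, from osc_io_setattr_end() ...
 *      osc_cache_truncate_end(env, oio, obj);
 *
 * Any partially truncated extent stashed in oio->oi_trunc is held in
 * OES_TRUNC between the two calls, so no writeback can race with the size
 * change.
 */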

/**
 * Wait for extents in a specific range to be written out.
 * The caller must have called osc_cache_writeback_range() to issue IO;
 * otherwise it will take a long time for this function to finish.
 *
 * The caller must hold the inode mutex, or cancel the exclusive DLM lock, so
 * that nobody else can dirty this range of the file while we're waiting for
 * the extents to be written.
 */
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
                         pgoff_t start, pgoff_t end)
{
        struct osc_extent *ext;
        pgoff_t index = start;
        int result = 0;

again:
        osc_object_lock(obj);
        ext = osc_extent_search(obj, index);
        if (!ext)
                ext = first_extent(obj);
        else if (ext->oe_end < index)
                ext = next_extent(ext);
        while (ext) {
                int rc;

                if (ext->oe_start > end)
                        break;

                if (!ext->oe_fsync_wait) {
                        ext = next_extent(ext);
                        continue;
                }

                EASSERT(ergo(ext->oe_state == OES_CACHE,
                             ext->oe_hp || ext->oe_urgent), ext);
                EASSERT(ergo(ext->oe_state == OES_ACTIVE,
                             !ext->oe_hp && ext->oe_urgent), ext);

                index = ext->oe_end + 1;
                osc_extent_get(ext);
                osc_object_unlock(obj);

                rc = osc_extent_wait(env, ext, OES_INV);
                if (result == 0)
                        result = rc;
                osc_extent_put(env, ext);
                goto again;
        }
        osc_object_unlock(obj);

        OSC_IO_DEBUG(obj, "sync file range.\n");
        return result;
}
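
/*
 * A sketch of the intended usage from an fsync-style caller (the error
 * handling here is illustrative only):
 *
 *      result = osc_cache_writeback_range(env, obj, start, end, 0, 0);
 *      if (result >= 0)
 *              result = osc_cache_wait_range(env, obj, start, end);
 *
 * The write-out call marks the extents urgent and kicks off the IO; only
 * after that is the per-extent wait above guaranteed to finish promptly.
 */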

/**
 * Called to write out a range of an osc object.
 *
 * @hp     : should be set if this is caused by lock cancellation;
 * @discard: is set if dirty pages should be dropped - the file will be
 *         deleted or truncated; this implies there are no partially
 *         discarded extents.
 *
 * Returns how many pages will be issued, or an error code if an error
 * occurred.
 */
int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                              pgoff_t start, pgoff_t end, int hp, int discard)
{
        struct osc_extent *ext;
        LIST_HEAD(discard_list);
        bool unplug = false;
        int result = 0;

        osc_object_lock(obj);
        ext = osc_extent_search(obj, start);
        if (!ext)
                ext = first_extent(obj);
        else if (ext->oe_end < start)
                ext = next_extent(ext);
        while (ext) {
                if (ext->oe_start > end)
                        break;

                ext->oe_fsync_wait = 1;
                switch (ext->oe_state) {
                case OES_CACHE:
                        result += ext->oe_nr_pages;
                        if (!discard) {
                                struct list_head *list = NULL;

                                if (hp) {
                                        EASSERT(!ext->oe_hp, ext);
                                        ext->oe_hp = 1;
                                        list = &obj->oo_hp_exts;
                                } else if (!ext->oe_urgent) {
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
                                if (list)
                                        list_move_tail(&ext->oe_link, list);
                                unplug = true;
                        } else {
                                /* The only discarder is lock cancellation,
                                 * so [start, end] must contain this extent.
                                 */
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
                                ext->oe_owner = current;
                                list_move_tail(&ext->oe_link, &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,
                                                   -ext->oe_nr_pages);
                        }
                        break;
                case OES_ACTIVE:
                        /* It's pretty bad to wait for ACTIVE extents, because
                         * we don't know how long we will wait for them to be
                         * flushed; they may be blocked waiting for more
                         * grants. We do this for the correctness of fsync.
                         */
                        LASSERT(hp == 0 && discard == 0);
                        ext->oe_urgent = 1;
                        break;
                case OES_TRUNC:
                        /* This extent is being truncated, so we can't do
                         * anything for it now. It will be set to urgent
                         * after truncate is finished in
                         * osc_cache_truncate_end().
                         */
                default:
                        break;
                }
                ext = next_extent(ext);
        }
        osc_object_unlock(obj);

        LASSERT(ergo(!discard, list_empty(&discard_list)));
        if (!list_empty(&discard_list)) {
                struct osc_extent *tmp;
                int rc;

                osc_list_maint(osc_cli(obj), obj);
                list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
                        list_del_init(&ext->oe_link);
                        EASSERT(ext->oe_state == OES_LOCKING, ext);

                        /* Discard cached pages. We don't actually write this
                         * extent out, but we complete it as if we did.
                         */
                        rc = osc_extent_make_ready(env, ext);
                        if (unlikely(rc < 0)) {
                                OSC_EXTENT_DUMP(D_ERROR, ext,
                                                "make_ready returned %d\n", rc);
                                if (result >= 0)
                                        result = rc;
                        }

                        /* finish the extent as if the pages were sent */
                        osc_extent_finish(env, ext, 0, 0);
                }
        }

        if (unplug)
                osc_io_unplug(env, osc_cli(obj), obj);

        if (hp || discard) {
                int rc;

                rc = osc_cache_wait_range(env, obj, start, end);
                if (result >= 0 && rc < 0)
                        result = rc;
        }

        OSC_IO_DEBUG(obj, "cache page out.\n");
        return result;
}
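
/*
 * A rough sketch of how the (hp, discard) arguments map onto callers; the
 * actual call sites live outside this file, so the pairing shown is an
 * assumption for illustration:
 *
 *      osc_cache_writeback_range(env, obj, start, end, 0, 0); // fsync/writeback
 *      osc_cache_writeback_range(env, obj, start, end, 1, 0); // lock cancel, flush data
 *      osc_cache_writeback_range(env, obj, start, end, 1, 1); // lock cancel, discard pages
 */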

/** @} osc */
