linux/drivers/staging/lustre/lustre/osc/osc_cache.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 *
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * osc cache management.
 *
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"
#include "osc_internal.h"

static int extent_debug; /* set it to be true for more debug */

static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
                              struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd);
static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd);
static int osc_io_unplug_async(const struct lu_env *env,
                               struct client_obd *cli, struct osc_object *osc);
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
                           unsigned int lost_grant);

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line);
#define osc_extent_tree_dump(lvl, obj) \
        osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)

/** \addtogroup osc
 *  @{
 */

/* ------------------ osc extent ------------------ */
static inline char *ext_flags(struct osc_extent *ext, char *flags)
{
        char *buf = flags;
        *buf++ = ext->oe_rw ? 'r' : 'w';
        if (ext->oe_intree)
                *buf++ = 'i';
        if (ext->oe_srvlock)
                *buf++ = 's';
        if (ext->oe_hp)
                *buf++ = 'h';
        if (ext->oe_urgent)
                *buf++ = 'u';
        if (ext->oe_memalloc)
                *buf++ = 'm';
        if (ext->oe_trunc_pending)
                *buf++ = 't';
        if (ext->oe_fsync_wait)
                *buf++ = 'Y';
        *buf = 0;
        return flags;
}
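
/*
 * Illustrative note (not from the original source): with the legend above, a
 * dump tag of "wiu" would describe a write extent (oe_rw clear, assuming
 * oe_rw marks reads) that is in the object's rb-tree and marked urgent,
 * while "rs" would be a read extent under server-side locking.
 */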

static inline char list_empty_marker(struct list_head *list)
{
        return list_empty(list) ? '-' : '+';
}

#define EXTSTR       "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
static const char *oes_strings[] = {
        "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };

#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                           \
        struct osc_extent *__ext = (extent);                                  \
        char __buf[16];                                                       \
                                                                              \
        CDEBUG(lvl,                                                           \
                "extent %p@{" EXTSTR ", "                                     \
                "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
                /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
                atomic_read(&__ext->oe_refc),                                 \
                atomic_read(&__ext->oe_users),                                \
                list_empty_marker(&__ext->oe_link),                           \
                oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
                __ext->oe_obj,                                                \
                /* ----- part 2 ----- */                                      \
                __ext->oe_grants, __ext->oe_nr_pages,                         \
                list_empty_marker(&__ext->oe_pages),                          \
                waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
                __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner,           \
                /* ----- part 4 ----- */                                      \
                ## __VA_ARGS__);                                              \
} while (0)

#undef EASSERTF
#define EASSERTF(expr, ext, fmt, args...) do {                          \
        if (!(expr)) {                                                  \
                OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
                osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
                LASSERT(expr);                                          \
        }                                                               \
} while (0)

#undef EASSERT
#define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")

static inline struct osc_extent *rb_extent(struct rb_node *n)
{
        if (n == NULL)
                return NULL;

        return container_of(n, struct osc_extent, oe_node);
}

static inline struct osc_extent *next_extent(struct osc_extent *ext)
{
        if (ext == NULL)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_next(&ext->oe_node));
}

static inline struct osc_extent *prev_extent(struct osc_extent *ext)
{
        if (ext == NULL)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_prev(&ext->oe_node));
}

static inline struct osc_extent *first_extent(struct osc_object *obj)
{
        return rb_extent(rb_first(&obj->oo_root));
}

/* object must be locked by caller. */
static int osc_extent_sanity_check0(struct osc_extent *ext,
                                    const char *func, const int line)
{
        struct osc_object *obj = ext->oe_obj;
        struct osc_async_page *oap;
        int page_count;
        int rc = 0;

        if (!osc_object_is_locked(obj)) {
                rc = 9;
                goto out;
        }

        if (ext->oe_state >= OES_STATE_MAX) {
                rc = 10;
                goto out;
        }

        if (atomic_read(&ext->oe_refc) <= 0) {
                rc = 20;
                goto out;
        }

        if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users)) {
                rc = 30;
                goto out;
        }

        switch (ext->oe_state) {
        case OES_INV:
                if (ext->oe_nr_pages > 0 || !list_empty(&ext->oe_pages))
                        rc = 35;
                else
                        rc = 0;
                goto out;
        case OES_ACTIVE:
                if (atomic_read(&ext->oe_users) == 0) {
                        rc = 40;
                        goto out;
                }
                if (ext->oe_hp) {
                        rc = 50;
                        goto out;
                }
                if (ext->oe_fsync_wait && !ext->oe_urgent) {
                        rc = 55;
                        goto out;
                }
                break;
        case OES_CACHE:
                if (ext->oe_grants == 0) {
                        rc = 60;
                        goto out;
                }
                if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) {
                        rc = 65;
                        goto out;
                }
                /* fall through */
        default:
                if (atomic_read(&ext->oe_users) > 0) {
                        rc = 70;
                        goto out;
                }
        }

        if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start) {
                rc = 80;
                goto out;
        }

        if (ext->oe_osclock == NULL && ext->oe_grants > 0) {
                rc = 90;
                goto out;
        }

        if (ext->oe_osclock) {
                struct cl_lock_descr *descr;

                descr = &ext->oe_osclock->cll_descr;
                if (!(descr->cld_start <= ext->oe_start &&
                      descr->cld_end >= ext->oe_max_end)) {
                        rc = 100;
                        goto out;
                }
        }

        if (ext->oe_nr_pages > ext->oe_mppr) {
                rc = 105;
                goto out;
        }

        /* Do not verify page list if extent is in RPC. This is because an
         * in-RPC extent is supposed to be exclusively accessible w/o lock. */
        if (ext->oe_state > OES_CACHE) {
                rc = 0;
                goto out;
        }

        if (!extent_debug) {
                rc = 0;
                goto out;
        }

        page_count = 0;
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                pgoff_t index = oap2cl_page(oap)->cp_index;

                ++page_count;
                if (index > ext->oe_end || index < ext->oe_start) {
                        rc = 110;
                        goto out;
                }
        }
        if (page_count != ext->oe_nr_pages) {
                rc = 120;
                goto out;
        }

out:
        if (rc != 0)
                OSC_EXTENT_DUMP(D_ERROR, ext,
                                "%s:%d sanity check %p failed with rc = %d\n",
                                func, line, ext, rc);
        return rc;
}

#define sanity_check_nolock(ext) \
        osc_extent_sanity_check0(ext, __func__, __LINE__)

#define sanity_check(ext) ({                                                  \
        int __res;                                                            \
        osc_object_lock((ext)->oe_obj);                                       \
        __res = sanity_check_nolock(ext);                                     \
        osc_object_unlock((ext)->oe_obj);                                     \
        __res;                                                                \
})

/**
 * Sanity check: make sure there are no overlapping extents in the tree.
 */
static int osc_extent_is_overlapped(struct osc_object *obj,
                                    struct osc_extent *ext)
{
        struct osc_extent *tmp;

        LASSERT(osc_object_is_locked(obj));

        if (!extent_debug)
                return 0;

        for (tmp = first_extent(obj); tmp != NULL; tmp = next_extent(tmp)) {
                if (tmp == ext)
                        continue;
                if (tmp->oe_end >= ext->oe_start &&
                    tmp->oe_start <= ext->oe_end)
                        return 1;
        }
        return 0;
}

static void osc_extent_state_set(struct osc_extent *ext, int state)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(state >= OES_INV && state < OES_STATE_MAX);

        /* Never try to sanity check a state changing extent :-) */
        /* LASSERT(sanity_check_nolock(ext) == 0); */

        /* TODO: validate the state machine */
        ext->oe_state = state;
        wake_up_all(&ext->oe_waitq);
}
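
/*
 * Informal lifecycle sketch (derived from the helpers in this file, not an
 * authoritative state machine): a new extent starts as OES_INV, is inserted
 * as OES_CACHE, becomes OES_ACTIVE while held by writers, returns to
 * OES_CACHE on release, then passes through OES_LOCKING/OES_LOCKDONE and
 * OES_RPC when flushed (or OES_TRUNC when claimed by truncate), and ends as
 * OES_INV again once removed.
 */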

static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
{
        struct osc_extent *ext;

        OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
        if (ext == NULL)
                return NULL;

        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
        atomic_set(&ext->oe_refc, 1);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
        INIT_LIST_HEAD(&ext->oe_pages);
        init_waitqueue_head(&ext->oe_waitq);
        ext->oe_osclock = NULL;

        return ext;
}

static void osc_extent_free(struct osc_extent *ext)
{
        OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
}

static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) >= 0);
        atomic_inc(&ext->oe_refc);
        return ext;
}

static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 0);
        if (atomic_dec_and_test(&ext->oe_refc)) {
                LASSERT(list_empty(&ext->oe_link));
                LASSERT(atomic_read(&ext->oe_users) == 0);
                LASSERT(ext->oe_state == OES_INV);
                LASSERT(!ext->oe_intree);

                if (ext->oe_osclock) {
                        cl_lock_put(env, ext->oe_osclock);
                        ext->oe_osclock = NULL;
                }
                osc_extent_free(ext);
        }
}
/**
 * osc_extent_put_trust() is a special version of osc_extent_put() for use
 * when it's known that the caller is not the last user. This addresses the
 * problem of lacking a lu_env ;-).
 */
static void osc_extent_put_trust(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 1);
        LASSERT(osc_object_is_locked(ext->oe_obj));
        atomic_dec(&ext->oe_refc);
}

/**
 * Return the extent which includes pgoff @index, or return the greatest
 * previous extent in the tree.
 */
static struct osc_extent *osc_extent_search(struct osc_object *obj,
                                            pgoff_t index)
{
        struct rb_node    *n = obj->oo_root.rb_node;
        struct osc_extent *tmp, *p = NULL;

        LASSERT(osc_object_is_locked(obj));
        while (n != NULL) {
                tmp = rb_extent(n);
                if (index < tmp->oe_start) {
                        n = n->rb_left;
                } else if (index > tmp->oe_end) {
                        p = rb_extent(n);
                        n = n->rb_right;
                } else {
                        return tmp;
                }
        }
        return p;
}
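
/*
 * Illustrative example (not from the original source): if the tree holds
 * extents [0 -> 15] and [32 -> 47], osc_extent_search(obj, 40) returns
 * [32 -> 47] because it covers index 40, while osc_extent_search(obj, 20)
 * returns [0 -> 15], the greatest extent preceding the uncovered index.
 */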

/*
 * Return the extent covering @index, otherwise return NULL.
 * caller must have held object lock.
 */
static struct osc_extent *osc_extent_lookup(struct osc_object *obj,
                                            pgoff_t index)
{
        struct osc_extent *ext;

        ext = osc_extent_search(obj, index);
        if (ext != NULL && ext->oe_start <= index && index <= ext->oe_end)
                return osc_extent_get(ext);
        return NULL;
}

/* caller must have held object lock. */
static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
{
        struct rb_node   **n      = &obj->oo_root.rb_node;
        struct rb_node    *parent = NULL;
        struct osc_extent *tmp;

        LASSERT(ext->oe_intree == 0);
        LASSERT(ext->oe_obj == obj);
        LASSERT(osc_object_is_locked(obj));
        while (*n != NULL) {
                tmp = rb_extent(*n);
                parent = *n;

                if (ext->oe_end < tmp->oe_start)
                        n = &(*n)->rb_left;
                else if (ext->oe_start > tmp->oe_end)
                        n = &(*n)->rb_right;
                else
                        EASSERTF(0, tmp, EXTSTR, EXTPARA(ext));
        }
        rb_link_node(&ext->oe_node, parent, n);
        rb_insert_color(&ext->oe_node, &obj->oo_root);
        osc_extent_get(ext);
        ext->oe_intree = 1;
}

/* caller must have held object lock. */
static void osc_extent_erase(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(osc_object_is_locked(obj));
        if (ext->oe_intree) {
                rb_erase(&ext->oe_node, &obj->oo_root);
                ext->oe_intree = 0;
                /* rbtree held a refcount */
                osc_extent_put_trust(ext);
        }
}

static struct osc_extent *osc_extent_hold(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(osc_object_is_locked(obj));
        LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
        if (ext->oe_state == OES_CACHE) {
                osc_extent_state_set(ext, OES_ACTIVE);
                osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages);
        }
        atomic_inc(&ext->oe_users);
        list_del_init(&ext->oe_link);
        return osc_extent_get(ext);
}

static void __osc_extent_remove(struct osc_extent *ext)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(list_empty(&ext->oe_pages));
        osc_extent_erase(ext);
        list_del_init(&ext->oe_link);
        osc_extent_state_set(ext, OES_INV);
        OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n");
}

static void osc_extent_remove(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        osc_object_lock(obj);
        __osc_extent_remove(ext);
        osc_object_unlock(obj);
}

/**
 * This function is used to merge extents to get better performance. It checks
 * if @cur and @victim are contiguous at chunk level.
 */
static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                            struct osc_extent *victim)
{
        struct osc_object *obj = cur->oe_obj;
        pgoff_t chunk_start;
        pgoff_t chunk_end;
        int ppc_bits;

        LASSERT(cur->oe_state == OES_CACHE);
        LASSERT(osc_object_is_locked(obj));
        if (victim == NULL)
                return -EINVAL;

        if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
                return -EBUSY;

        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;

        LASSERT(cur->oe_osclock == victim->oe_osclock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
            chunk_end + 1 != victim->oe_start >> ppc_bits)
                return -ERANGE;
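
        /* Worked example (illustrative, assuming ppc_bits = 4, i.e. 16 pages
         * per chunk): cur [16 -> 31] spans chunk 1 and victim [32 -> 47]
         * spans chunk 2; chunk_end + 1 == 2 matches victim's start chunk, so
         * the two extents are contiguous at chunk level and the merge can
         * proceed. */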

        OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);

        cur->oe_start     = min(cur->oe_start, victim->oe_start);
        cur->oe_end       = max(cur->oe_end,   victim->oe_end);
        cur->oe_grants   += victim->oe_grants;
        cur->oe_nr_pages += victim->oe_nr_pages;
        /* only the following bits are needed to merge */
        cur->oe_urgent   |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
        list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;

        osc_extent_get(victim);
        __osc_extent_remove(victim);
        osc_extent_put(env, victim);

        OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim);
        return 0;
}

/**
 * Drop user count of osc_extent, and unplug IO asynchronously.
 */
void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(atomic_read(&ext->oe_users) > 0);
        LASSERT(sanity_check(ext) == 0);
        LASSERT(ext->oe_grants > 0);

        if (atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) {
                LASSERT(ext->oe_state == OES_ACTIVE);
                if (ext->oe_trunc_pending) {
                        /* a truncate process is waiting for this extent.
                         * This may happen due to a race, check
                         * osc_cache_truncate_start(). */
                        osc_extent_state_set(ext, OES_TRUNC);
                        ext->oe_trunc_pending = 0;
                } else {
                        osc_extent_state_set(ext, OES_CACHE);
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           ext->oe_nr_pages);

                        /* try to merge the previous and next extent. */
                        osc_extent_merge(env, ext, prev_extent(ext));
                        osc_extent_merge(env, ext, next_extent(ext));

                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
                }
                osc_object_unlock(obj);

                osc_io_unplug_async(env, osc_cli(obj), obj);
        }
        osc_extent_put(env, ext);
}

static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
{
        return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
}

/**
 * Find or create an extent which includes @index, core function to manage
 * extent tree.
 */
struct osc_extent *osc_extent_find(const struct lu_env *env,
                                   struct osc_object *obj, pgoff_t index,
                                   int *grants)
{
        struct client_obd *cli = osc_cli(obj);
        struct cl_lock    *lock;
        struct osc_extent *cur;
        struct osc_extent *ext;
        struct osc_extent *conflict = NULL;
        struct osc_extent *found = NULL;
        pgoff_t    chunk;
        pgoff_t    max_end;
        int     max_pages; /* max_pages_per_rpc */
        int     chunksize;
        int     ppc_bits; /* pages per chunk bits */
        int     chunk_mask;
        int     rc;

        cur = osc_extent_alloc(obj);
        if (cur == NULL)
                return ERR_PTR(-ENOMEM);

        lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0);
        LASSERT(lock != NULL);
        LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);

        LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
        ppc_bits   = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize  = 1 << cli->cl_chunkbits;
        chunk      = index >> ppc_bits;

        /* align end to RPC edge; RPC size may not be a power-of-2 integer. */
        max_pages = cli->cl_max_pages_per_rpc;
        LASSERT((max_pages & ~chunk_mask) == 0);
        max_end = index - (index % max_pages) + max_pages - 1;
        max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end);

        /* initialize new extent by parameters so far */
        cur->oe_max_end = max_end;
        cur->oe_start   = index & chunk_mask;
        cur->oe_end     = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
        if (cur->oe_start < lock->cll_descr.cld_start)
                cur->oe_start = lock->cll_descr.cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
        cur->oe_osclock = lock;
        cur->oe_grants  = 0;
        cur->oe_mppr    = max_pages;

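        /* Worked example (illustrative, assuming PAGE_CACHE_SHIFT = 12 and
         * cl_chunkbits = 16): ppc_bits = 4 and chunk_mask = ~15. For
         * index = 70 with max_pages = 256: chunk = 4, oe_start = 70 & ~15
         * = 64, oe_end = ((70 + 16) & ~15) - 1 = 79, and max_end =
         * 70 - (70 % 256) + 255 = 255, the last page of the RPC slot. */
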
        /* grants have been allocated by the caller */
        LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
                 "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
        LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR, EXTPARA(cur));

restart:
        osc_object_lock(obj);
        ext = osc_extent_search(obj, cur->oe_start);
        if (ext == NULL)
                ext = first_extent(obj);
        while (ext != NULL) {
                loff_t ext_chk_start = ext->oe_start >> ppc_bits;
                loff_t ext_chk_end   = ext->oe_end   >> ppc_bits;

                LASSERT(sanity_check_nolock(ext) == 0);
                if (chunk > ext_chk_end + 1)
                        break;

                /* if covered by different locks, no chance to match */
                if (lock != ext->oe_osclock) {
                        EASSERTF(!overlapped(ext, cur), ext,
                                 EXTSTR, EXTPARA(cur));

                        ext = next_extent(ext);
                        continue;
                }

                /* discontiguous chunks? */
                if (chunk + 1 < ext_chk_start) {
                        ext = next_extent(ext);
                        continue;
                }

                /* ok, from now on, ext and cur have these attrs:
                 * 1. covered by the same lock
                 * 2. contiguous at chunk level or overlapping. */

                if (overlapped(ext, cur)) {
                        /* cur is the minimum unit, so overlapping means
                         * full containment. */
                        EASSERTF((ext->oe_start <= cur->oe_start &&
                                  ext->oe_end >= cur->oe_end),
                                 ext, EXTSTR, EXTPARA(cur));

                        if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
                                /* for simplicity, we wait for this extent to
                                 * finish before going forward. */
                                conflict = osc_extent_get(ext);
                                break;
                        }

                        found = osc_extent_hold(ext);
                        break;
                }

                /* non-overlapped extent */
                if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) {
                        /* we can't do anything for a non OES_CACHE extent, or
                         * if there is someone waiting for this extent to be
                         * flushed, try next one. */
                        ext = next_extent(ext);
                        continue;
                }

                /* check if they belong to the same rpc slot before trying to
                 * merge. the extents are not overlapped and contiguous at
                 * chunk level to get here. */
                if (ext->oe_max_end != max_end) {
                        /* if they don't belong to the same RPC slot or
                         * max_pages_per_rpc has ever changed, do not merge. */
                        ext = next_extent(ext);
                        continue;
                }

                /* it's required that an extent must be contiguous at chunk
                 * level so that we know the whole extent is covered by grant
                 * (the pages in the extent are NOT required to be contiguous).
                 * Otherwise, it would be too difficult to know which
                 * chunks have grants allocated. */

                /* try to do front merge - extend ext's start */
                if (chunk + 1 == ext_chk_start) {
                        /* ext must be chunk size aligned */
                        EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);

                        /* pull ext's start back to cover cur */
                        ext->oe_start   = cur->oe_start;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        found = osc_extent_hold(ext);
                } else if (chunk == ext_chk_end + 1) {
                        /* rear merge */
                        ext->oe_end     = cur->oe_end;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        /* try to merge with the next one because we just fill
                         * in a gap */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_extent_tax;

                        found = osc_extent_hold(ext);
                }
                if (found != NULL)
                        break;

                ext = next_extent(ext);
        }

        osc_extent_tree_dump(D_CACHE, obj);
        if (found != NULL) {
                LASSERT(conflict == NULL);
                if (!IS_ERR(found)) {
                        LASSERT(found->oe_osclock == cur->oe_osclock);
                        OSC_EXTENT_DUMP(D_CACHE, found,
                                        "found caching ext for %lu.\n", index);
                }
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
                cur->oe_grants = chunksize + cli->cl_extent_tax;
                *grants -= cur->oe_grants;
                LASSERT(*grants >= 0);

                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
                osc_extent_insert(obj, cur);
                OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
                                index, lock->cll_descr.cld_end);
        }
        osc_object_unlock(obj);

        if (conflict != NULL) {
                LASSERT(found == NULL);

                /* waiting for IO to finish. Please notice that it's impossible
                 * to be an OES_TRUNC extent. */
                rc = osc_extent_wait(env, conflict, OES_INV);
                osc_extent_put(env, conflict);
                conflict = NULL;
                if (rc < 0) {
                        found = ERR_PTR(rc);
                        goto out;
                }

                goto restart;
        }

out:
        osc_extent_put(env, cur);
        LASSERT(*grants >= 0);
        return found;
}

/**
 * Called when IO is finished to an extent.
 */
int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc)
{
        struct client_obd *cli = osc_cli(ext->oe_obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int nr_pages = ext->oe_nr_pages;
        int lost_grant = 0;
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        __u64 last_off = 0;
        int last_count = -1;

        OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");

        ext->oe_rc = rc ?: ext->oe_nr_pages;
        EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                 oap_pending_item) {
                list_del_init(&oap->oap_rpc_item);
                list_del_init(&oap->oap_pending_item);
                if (last_off <= oap->oap_obj_off) {
                        last_off = oap->oap_obj_off;
                        last_count = oap->oap_count;
                }

                --ext->oe_nr_pages;
                osc_ap_completion(env, cli, oap, sent, rc);
        }
        EASSERT(ext->oe_nr_pages == 0, ext);

        if (!sent) {
                lost_grant = ext->oe_grants;
        } else if (blocksize < PAGE_CACHE_SIZE &&
                   last_count != PAGE_CACHE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check.
                 *
                 * Use last_off/last_count recorded in the loop above; @oap
                 * is no longer a valid list entry once the walk of oe_pages
                 * has completed. */
                int offset = last_off & ~CFS_PAGE_MASK;
                int count = last_count + (offset & (blocksize - 1));
                int end = (offset + last_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                lost_grant = PAGE_CACHE_SIZE - count;
        }
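
        /* Worked example (illustrative, assuming PAGE_CACHE_SIZE = 4096 and
         * an OST blocksize of 1024): a short write with last_count = 1536 at
         * page offset 0 gives end = 512, so count rounds up to 2048 (two
         * full OST blocks) and lost_grant = 4096 - 2048 = 2048 bytes of this
         * page's grant are reported as lost. */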
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant);

        osc_extent_remove(ext);
        /* put the refcount for RPC */
        osc_extent_put(env, ext);
        return 0;
}

static int extent_wait_cb(struct osc_extent *ext, int state)
{
        int ret;

        osc_object_lock(ext->oe_obj);
        ret = ext->oe_state == state;
        osc_object_unlock(ext->oe_obj);

        return ret;
}

/**
 * Wait for the extent's state to become @state.
 */
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state)
{
        struct osc_object *obj = ext->oe_obj;
        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
                                                  LWI_ON_SIGNAL_NOOP, NULL);
        int rc = 0;

        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        /* `Kick' this extent only if the caller is waiting for it to be
         * written out. */
        if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp &&
            !ext->oe_trunc_pending) {
                if (ext->oe_state == OES_ACTIVE) {
                        ext->oe_urgent = 1;
                } else if (ext->oe_state == OES_CACHE) {
                        ext->oe_urgent = 1;
                        osc_extent_hold(ext);
                        rc = 1;
                }
        }
        osc_object_unlock(obj);
        if (rc == 1)
                osc_extent_release(env, ext);

        /* wait for the extent until its state becomes @state */
        rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
        if (rc == -ETIMEDOUT) {
                OSC_EXTENT_DUMP(D_ERROR, ext,
                        "%s: wait ext to %d timedout, recovery in progress?\n",
                        osc_export(obj)->exp_obd->obd_name, state);

                lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
                                  &lwi);
        }
        if (rc == 0 && ext->oe_rc < 0)
                rc = ext->oe_rc;
        return rc;
}

/**
 * Discard pages with index greater than @trunc_index. If @ext is overlapped
 * with @trunc_index, then partial truncate happens.
 */
static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                               bool partial)
{
        struct cl_env_nest     nest;
        struct lu_env         *env;
        struct cl_io          *io;
        struct osc_object     *obj = ext->oe_obj;
        struct client_obd     *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int                    pages_in_chunk = 0;
        int                    ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
        int                    rc       = 0;

        LASSERT(sanity_check(ext) == 0);
        EASSERT(ext->oe_state == OES_TRUNC, ext);
        EASSERT(!ext->oe_urgent, ext);

        /* Request new lu_env.
         * We can't use that env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized. */
        env = cl_env_nested_get(&nest);
        io  = &osc_env_info(env)->oti_io;
        io->ci_obj = cl_object_top(osc2cl(obj));
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                goto out;

        /* discard all pages with index greater than trunc_index */
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                 oap_pending_item) {
                struct cl_page  *sub  = oap2cl_page(oap);
                struct cl_page  *page = cl_page_top(sub);

                LASSERT(list_empty(&oap->oap_rpc_item));

                /* only discard the pages with their index greater than
                 * trunc_index, and ... */
                if (sub->cp_index < trunc_index ||
                    (sub->cp_index == trunc_index && partial)) {
                        /* account how many pages remain in the chunk
                         * so that we can calculate grants correctly. */
                        if (sub->cp_index >> ppc_bits == trunc_chunk)
                                ++pages_in_chunk;
                        continue;
                }

                list_del_init(&oap->oap_pending_item);

                cl_page_get(page);
                lu_ref_add(&page->cp_reference, "truncate", current);

                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
                        LASSERT(page->cp_state == CPS_FREEING);
                        LASSERT(0);
                }

                lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);

                --ext->oe_nr_pages;
                ++nr_pages;
        }
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
                 ext, "trunc_index %lu, partial %d\n", trunc_index, partial);

        osc_object_lock(obj);
        if (ext->oe_nr_pages == 0) {
                LASSERT(pages_in_chunk == 0);
                grants = ext->oe_grants;
                ext->oe_grants = 0;
        } else { /* calculate how many grants we can free */
                int     chunks = (ext->oe_end >> ppc_bits) - trunc_chunk;
                pgoff_t last_index;

                /* if there are no pages in this chunk, we can also free grants
                 * for the last chunk */
                if (pages_in_chunk == 0) {
                        /* if this is the 1st chunk and no pages in this chunk,
                         * ext->oe_nr_pages must be zero, so we should be in
                         * the other if-clause. */
                        LASSERT(trunc_chunk > 0);
                        --trunc_chunk;
                        ++chunks;
                }

                /* this is what we can free from this extent */
                grants    = chunks << cli->cl_chunkbits;
                ext->oe_grants -= grants;
                last_index      = ((trunc_chunk + 1) << ppc_bits) - 1;
                ext->oe_end     = min(last_index, ext->oe_max_end);
                LASSERT(ext->oe_end >= ext->oe_start);
                LASSERT(ext->oe_grants > 0);
        }
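
        /* Worked example (illustrative, assuming ppc_bits = 4): truncating
         * extent [0 -> 31] (chunks 0 and 1) at trunc_index = 5 leaves pages
         * in chunk 0, so chunks = (31 >> 4) - 0 = 1 chunk's worth of grant
         * is freed and oe_end shrinks to last_index = ((0 + 1) << 4) - 1
         * = 15. */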
        osc_object_unlock(obj);

        if (grants > 0 || nr_pages > 0)
                osc_free_grant(cli, nr_pages, grants);

out:
        cl_io_fini(env, io);
        cl_env_nested_put(&nest, env);
        return rc;
}

/**
 * This function prepares the extent for transfer.
 * A race with flushing a page via ll_writepage() has to be handled cautiously.
 */
static int osc_extent_make_ready(const struct lu_env *env,
                                 struct osc_extent *ext)
{
        struct osc_async_page *oap;
        struct osc_async_page *last = NULL;
        struct osc_object *obj = ext->oe_obj;
        int page_count = 0;
        int rc;

        /* we're going to grab page lock, so object lock must not be taken. */
        LASSERT(sanity_check(ext) == 0);
        /* in locking state, any process should not touch this extent. */
        EASSERT(ext->oe_state == OES_LOCKING, ext);
        EASSERT(ext->oe_owner != NULL, ext);

        OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n");

        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                ++page_count;
                if (last == NULL || last->oap_obj_off < oap->oap_obj_off)
                        last = oap;

                /* checking ASYNC_READY is race safe */
                if ((oap->oap_async_flags & ASYNC_READY) != 0)
                        continue;

                rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
                switch (rc) {
                case 0:
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_READY;
                        spin_unlock(&oap->oap_lock);
                        break;
                case -EALREADY:
                        LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
                        break;
                default:
                        LASSERTF(0, "unknown return code: %d\n", rc);
                }
        }

        LASSERT(page_count == ext->oe_nr_pages);
        LASSERT(last != NULL);
        /* the last page is the only one whose count may need to be refreshed
         * against the file size. */
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last->oap_count > 0);
                LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
        }

        /* for the rest of the pages, we don't need to call osc_refresh_count()
         * because it's known they are not the last page */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                }
        }

        osc_object_lock(obj);
        osc_extent_state_set(ext, OES_RPC);
        osc_object_unlock(obj);
        /* get a refcount for RPC. */
        osc_extent_get(ext);

        return 0;
}

/**
 * Quick and simple version of osc_extent_find(). This function is frequently
 * called to expand the extent for the same IO. To expand the extent, the
 * page index must be in the same or next chunk of ext->oe_end.
 */
static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
{
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
        int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
        int chunksize = 1 << cli->cl_chunkbits;
        int rc = 0;

        LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        end_chunk = ext->oe_end >> ppc_bits;
        if (chunk > end_chunk + 1) {
                rc = -ERANGE;
                goto out;
        }

        if (end_chunk >= chunk) {
                rc = 0;
                goto out;
        }

        LASSERT(end_chunk + 1 == chunk);
        /* try to expand this extent to cover @index */
        end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);

        next = next_extent(ext);
        if (next != NULL && next->oe_start <= end_index) {
                /* complex mode - overlapped with the next extent,
                 * this case will be handled by osc_extent_find() */
                rc = -EAGAIN;
                goto out;
        }

        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
        *grants -= chunksize;
        LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);

out:
        osc_object_unlock(obj);
        return rc;
}

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line)
{
        struct osc_extent *ext;
        int cnt;

        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);

        /* osc_object_lock(obj); */
        cnt = 1;
        for (ext = first_extent(obj); ext != NULL; ext = next_extent(ext))
                OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_hp_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_reading_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++);
        /* osc_object_unlock(obj); */
}

/* ------------------ osc extent end ------------------ */

static inline int osc_is_ready(struct osc_object *osc)
{
        return !list_empty(&osc->oo_ready_item) ||
               !list_empty(&osc->oo_hp_ready_item);
}

#define OSC_IO_DEBUG(OSC, STR, args...)                                        \
        CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR,     \
               (OSC), osc_is_ready(OSC),                                       \
               list_empty_marker(&(OSC)->oo_hp_ready_item),                    \
               list_empty_marker(&(OSC)->oo_ready_item),                       \
               atomic_read(&(OSC)->oo_nr_writes),                              \
               list_empty_marker(&(OSC)->oo_hp_exts),                          \
               list_empty_marker(&(OSC)->oo_urgent_exts),                      \
               atomic_read(&(OSC)->oo_nr_reads),                               \
               list_empty_marker(&(OSC)->oo_reading_exts),                     \
               ##args)

static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
{
        struct osc_page *opg  = oap2osc_page(oap);
        struct cl_page  *page = cl_page_top(oap2cl_page(oap));
        int result;

        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */

        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
                opg->ops_submit_time = cfs_time_current();
        return result;
}

static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd)
{
        struct osc_page  *opg = oap2osc_page(oap);
        struct cl_page   *page = oap2cl_page(oap);
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;

        int result;
        loff_t kms;

        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(!(cmd & OBD_BRW_READ));
        LASSERT(opg != NULL);
        obj = opg->ops_cl.cpl_obj;

        cl_object_attr_lock(obj);
        result = cl_object_attr_get(env, obj, attr);
        cl_object_attr_unlock(obj);
        if (result < 0)
                return result;
        kms = attr->cat_kms;
        if (cl_offset(obj, page->cp_index) >= kms)
                /* catch race with truncate */
                return 0;
        else if (cl_offset(obj, page->cp_index + 1) > kms)
                /* catch sub-page write at end of file */
                return kms % PAGE_CACHE_SIZE;
        else
                return PAGE_CACHE_SIZE;
}
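
/*
 * Worked example (illustrative, assuming PAGE_CACHE_SIZE = 4096): with
 * kms = 10000 bytes, page index 2 (bytes 8192..12287) straddles the known
 * minimum size, so osc_refresh_count() returns kms % 4096 = 1808; page
 * index 3 starts at byte 12288 >= kms and returns 0 (a racing truncate),
 * while any earlier page gets the full 4096.
 */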

static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd, int rc)
{
        struct osc_page   *opg  = oap2osc_page(oap);
        struct cl_page    *page = cl_page_top(oap2cl_page(oap));
        struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type   crt;
        int srvlock;

        cmd &= ~OBD_BRW_NOQUOTA;
        LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
        LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
        LASSERT(opg->ops_transfer_pinned);

        /*
         * page->cp_req can be NULL if io submission failed before
         * cl_req was allocated.
         */
        if (page->cp_req != NULL)
                cl_req_page_done(env, page);
        LASSERT(page->cp_req == NULL);

        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;

        spin_lock(&obj->oo_seatbelt);
        LASSERT(opg->ops_submitter != NULL);
        LASSERT(!list_empty(&opg->ops_inflight));
        list_del_init(&opg->ops_inflight);
        opg->ops_submitter = NULL;
        spin_unlock(&obj->oo_seatbelt);

        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;

        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                int bytes = oap->oap_count;

                if (crt == CRT_READ)
                        stats->os_lockless_reads += bytes;
                else
                        stats->os_lockless_writes += bytes;
        }

        /*
         * This has to be the last operation with the page, as locks are
         * released in cl_page_completion() and nothing except for the
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);

        cl_page_completion(env, page, crt, rc);

        return 0;
}

#define OSC_DUMP_GRANT(cli, fmt, args...) do {                                \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
               "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               atomic_read(&obd_dirty_pages), obd_max_dirty_pages,            \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
} while (0)

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        assert_spin_locked(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += PAGE_CACHE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               PAGE_CACHE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has completed.
 * Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        assert_spin_locked(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT))
                return;

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= PAGE_CACHE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= PAGE_CACHE_SIZE;
        }
}

/**
 * To avoid sleeping with the object lock held, it's good for us to allocate
 * enough grants before entering the critical section.
 *
 * client_obd_list_lock held by caller
 */
static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes)
{
        int rc = -EDQUOT;

        if (cli->cl_avail_grant >= bytes) {
                cli->cl_avail_grant    -= bytes;
                cli->cl_reserved_grant += bytes;
                rc = 0;
        }
        return rc;
}
1405
1406static void __osc_unreserve_grant(struct client_obd *cli,
1407                                  unsigned int reserved, unsigned int unused)
1408{
1409        /* it's quite normal for us to get back more grant than we reserved.
1410         * Consider the case where two extents are merged by adding a new
1411         * chunk: we save one extent tax. If the extent tax is greater than
1412         * one chunk, adding that chunk saves more grant than it costs. */
1413        cli->cl_reserved_grant -= reserved;
1414        if (unused > reserved) {
1415                cli->cl_avail_grant += reserved;
1416                cli->cl_lost_grant  += unused - reserved;
1417        } else {
1418                cli->cl_avail_grant += unused;
1419        }
1420}
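
/*
 * Editorial sketch (not part of the original source): osc_reserve_grant()
 * and __osc_unreserve_grant() keep "cl_avail_grant + cl_reserved_grant"
 * constant while an operation is in flight.  Assuming a hypothetical
 * reservation of 65536 bytes of which only 4096 end up consumed:
 *
 *	osc_reserve_grant(cli, 65536);	  avail -= 65536, reserved += 65536
 *	... hand 4096 bytes to the extent ...
 *	__osc_unreserve_grant(cli, 65536, 61440);
 *					  reserved -= 65536, avail += 61440
 *
 * leaving exactly the 4096 consumed bytes accounted.  If "unused" exceeded
 * "reserved" (e.g. an extent merge saved a whole extent tax), the surplus
 * would go to cl_lost_grant, to be returned to the OST later.
 */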
1421
1422void osc_unreserve_grant(struct client_obd *cli,
1423                         unsigned int reserved, unsigned int unused)
1424{
1425        client_obd_list_lock(&cli->cl_loi_list_lock);
1426        __osc_unreserve_grant(cli, reserved, unused);
1427        if (unused > 0)
1428                osc_wake_cache_waiters(cli);
1429        client_obd_list_unlock(&cli->cl_loi_list_lock);
1430}
1431
1432/**
1433 * Free grant after IO is finished or canceled.
1434 *
1435 * @lost_grant is used to remember how much grant we have allocated but not
1436 * used; we should return this grant to the OST. There are two cases where
1437 * grant can be lost:
1438 * 1. truncate;
1439 * 2. the blocksize at the OST is less than PAGE_CACHE_SIZE and a partial
1440 *    page was written. In this case the OST may use fewer chunks to serve
1441 *    the partial write, and since OSTs don't actually know the page size on
1442 *    the client side, clients have to calculate the lost grant using the
1443 *    blocksize on the OST. See filter_grant_check() for details.
1444 */
1445static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
1446                           unsigned int lost_grant)
1447{
1448        int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
1449
1450        client_obd_list_lock(&cli->cl_loi_list_lock);
1451        atomic_sub(nr_pages, &obd_dirty_pages);
1452        cli->cl_dirty -= nr_pages << PAGE_CACHE_SHIFT;
1453        cli->cl_lost_grant += lost_grant;
1454        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
1455                /* borrow some grant from truncate to avoid the case that
1456                 * truncate uses up all avail grant */
1457                cli->cl_lost_grant -= grant;
1458                cli->cl_avail_grant += grant;
1459        }
1460        osc_wake_cache_waiters(cli);
1461        client_obd_list_unlock(&cli->cl_loi_list_lock);
1462        CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1463               lost_grant, cli->cl_lost_grant,
1464               cli->cl_avail_grant, cli->cl_dirty);
1465}
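
/*
 * Editorial example of case 2 above, with hypothetical sizes: a client
 * with a 4K PAGE_CACHE_SIZE dirties only the first 1K of a page.  The
 * client consumed a full 4K of grant for that page, but an OST with 1K
 * blocks charges only 1K for the blocks actually written, so the client
 * reports 4K - 1K = 3K as lost_grant to keep both sides' accounting in
 * sync.
 */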
1466
1467/**
1468 * The companion to osc_enter_cache(), called when @oap is no longer part of
1469 * the dirty accounting due to error.
1470 */
1471static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
1472{
1473        client_obd_list_lock(&cli->cl_loi_list_lock);
1474        osc_release_write_grant(cli, &oap->oap_brw_page);
1475        client_obd_list_unlock(&cli->cl_loi_list_lock);
1476}
1477
1478/**
1479 * Non-blocking version of osc_enter_cache() that consumes grant only when it
1480 * is available.
1481 */
1482static int osc_enter_cache_try(struct client_obd *cli,
1483                               struct osc_async_page *oap,
1484                               int bytes, int transient)
1485{
1486        int rc;
1487
1488        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1489
1490        rc = osc_reserve_grant(cli, bytes);
1491        if (rc < 0)
1492                return 0;
1493
1494        if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
1495            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
1496                osc_consume_write_grant(cli, &oap->oap_brw_page);
1497                if (transient) {
1498                        cli->cl_dirty_transit += PAGE_CACHE_SIZE;
1499                        atomic_inc(&obd_dirty_transit_pages);
1500                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
1501                }
1502                rc = 1;
1503        } else {
1504                __osc_unreserve_grant(cli, bytes, bytes);
1505                rc = 0;
1506        }
1507        return rc;
1508}
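
/*
 * Sketch of the typical caller pattern; this mirrors the later use in
 * osc_queue_async_io() and is an illustration, not a new code path.  The
 * try-version must run under cl_loi_list_lock and returns 1 on success:
 *
 *	client_obd_list_lock(&cli->cl_loi_list_lock);
 *	if (osc_enter_cache_try(cli, oap, bytes, 0))
 *		rc = 0;		(grant reserved, page accounted dirty)
 *	else
 *		rc = -EDQUOT;	(fall back to blocking osc_enter_cache())
 *	client_obd_list_unlock(&cli->cl_loi_list_lock);
 */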
1509
1510static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1511{
1512        int rc;
1513        client_obd_list_lock(&cli->cl_loi_list_lock);
1514        rc = list_empty(&ocw->ocw_entry);
1515        client_obd_list_unlock(&cli->cl_loi_list_lock);
1516        return rc;
1517}
1518
1519/**
1520 * The main entry to reserve dirty page accounting. Usually the grant
1521 * reserved here is freed in bulk in osc_free_grant(); if adding the page
1522 * to the osc cache fails, it is freed in osc_exit_cache() instead.
1523 *
1524 * The process will be put to sleep if it has already run out of grant.
1525 */
1526static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
1527                           struct osc_async_page *oap, int bytes)
1528{
1529        struct osc_object *osc = oap->oap_obj;
1530        struct lov_oinfo  *loi = osc->oo_oinfo;
1531        struct osc_cache_waiter ocw;
1532        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1533        int rc = -EDQUOT;
1534
1535        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1536
1537        client_obd_list_lock(&cli->cl_loi_list_lock);
1538
1539        /* force the caller to try sync io.  this can jump the list
1540         * of queued writes and create a discontiguous rpc stream */
1541        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
1542            cli->cl_dirty_max < PAGE_CACHE_SIZE     ||
1543            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
1544                rc = -EDQUOT;
1545                goto out;
1546        }
1547
1548        /* Hopefully normal case - cache space and write credits available */
1549        if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1550                rc = 0;
1551                goto out;
1552        }
1553
1554        /* We can get here for two reasons: too many dirty pages in cache, or
1555         * we have run out of grant. In both cases we should write dirty pages
1556         * out. Adding a cache waiter will trigger urgent write-out no matter
1557         * what the RPC size will be.
1558         * The exit condition is that there is no available grant and no dirty
1559         * page cached, which really means there is no space on the OST. */
1560        init_waitqueue_head(&ocw.ocw_waitq);
1561        ocw.ocw_oap   = oap;
1562        ocw.ocw_grant = bytes;
1563        while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
1564                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1565                ocw.ocw_rc = 0;
1566                client_obd_list_unlock(&cli->cl_loi_list_lock);
1567
1568                osc_io_unplug_async(env, cli, NULL);
1569
1570                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
1571                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
1572
1573                rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1574
1575                client_obd_list_lock(&cli->cl_loi_list_lock);
1576
1577                /* l_wait_event is interrupted by signal */
1578                if (rc < 0) {
1579                        list_del_init(&ocw.ocw_entry);
1580                        goto out;
1581                }
1582
1583                LASSERT(list_empty(&ocw.ocw_entry));
1584                rc = ocw.ocw_rc;
1585
1586                if (rc != -EDQUOT)
1587                        goto out;
1588                if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1589                        rc = 0;
1590                        goto out;
1591                }
1592        }
1593out:
1594        client_obd_list_unlock(&cli->cl_loi_list_lock);
1595        OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
1596        return rc;
1597}
1598
1599/* caller must hold loi_list_lock */
1600void osc_wake_cache_waiters(struct client_obd *cli)
1601{
1602        struct list_head *l, *tmp;
1603        struct osc_cache_waiter *ocw;
1604
1605        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
1606                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
1607                list_del_init(&ocw->ocw_entry);
1608
1609                ocw->ocw_rc = -EDQUOT;
1610                /* we can't dirty more */
1611                if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
1612                    (atomic_read(&obd_dirty_pages) + 1 >
1613                     obd_max_dirty_pages)) {
1614                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
1615                               cli->cl_dirty,
1616                               cli->cl_dirty_max, obd_max_dirty_pages);
1617                        goto wakeup;
1618                }
1619
1620                ocw->ocw_rc = 0;
1621                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
1622                        ocw->ocw_rc = -EDQUOT;
1623
1624wakeup:
1625                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
1626                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
1627
1628                wake_up(&ocw->ocw_waitq);
1629        }
1630}
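
/*
 * Summary of the waiter handshake above (editorial note): a sleeper in
 * osc_enter_cache() links its osc_cache_waiter onto cl_cache_waiters and
 * waits for the entry to become unlinked.  osc_wake_cache_waiters() is the
 * only path that unlinks it on wake-up, setting ocw_rc to 0 when
 * osc_enter_cache_try() consumed grant on the waiter's behalf, or to
 * -EDQUOT when there is still no room; ocw_granted() therefore only has
 * to test list_empty(&ocw->ocw_entry) under the list lock.  A sleeper
 * interrupted by a signal unlinks itself instead.
 */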
1631
1632static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
1633{
1634        int hprpc = !!list_empty(&osc->oo_hp_exts);
1635        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
1636}
1637
1638/* This decides whether an RPC should be made for the pending read/write
1639 * pages of a given object.  osc_check_rpcs()->osc_next_obj() and
1640 * osc_list_maint() use it to quickly find objects ready to send an RPC. */
1641static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
1642                         int cmd)
1643{
1644        int invalid_import = 0;
1645
1646        /* if we have an invalid import we want to drain the queued pages
1647         * by forcing them through rpcs that immediately fail and complete
1648         * the pages.  recovery relies on this to empty the queued pages
1649         * before canceling the locks and evicting the llite pages */
1650        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1651                invalid_import = 1;
1652
1653        if (cmd & OBD_BRW_WRITE) {
1654                if (atomic_read(&osc->oo_nr_writes) == 0)
1655                        return 0;
1656                if (invalid_import) {
1657                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1658                        return 1;
1659                }
1660                if (!list_empty(&osc->oo_hp_exts)) {
1661                        CDEBUG(D_CACHE, "high prio request forcing RPC\n");
1662                        return 1;
1663                }
1664                if (!list_empty(&osc->oo_urgent_exts)) {
1665                        CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1666                        return 1;
1667                }
1668                /* trigger a write rpc stream as long as there are dirtiers
1669                 * waiting for space.  As they're waiting, they're not going to
1670                 * create more pages to coalesce with what's already waiting. */
1671                if (!list_empty(&cli->cl_cache_waiters)) {
1672                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1673                        return 1;
1674                }
1675                if (atomic_read(&osc->oo_nr_writes) >=
1676                    cli->cl_max_pages_per_rpc)
1677                        return 1;
1678        } else {
1679                if (atomic_read(&osc->oo_nr_reads) == 0)
1680                        return 0;
1681                if (invalid_import) {
1682                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1683                        return 1;
1684                }
1685                /* all reads are urgent. */
1686                if (!list_empty(&osc->oo_reading_exts))
1687                        return 1;
1688        }
1689
1690        return 0;
1691}
1692
1693static void osc_update_pending(struct osc_object *obj, int cmd, int delta)
1694{
1695        struct client_obd *cli = osc_cli(obj);
1696        if (cmd & OBD_BRW_WRITE) {
1697                atomic_add(delta, &obj->oo_nr_writes);
1698                atomic_add(delta, &cli->cl_pending_w_pages);
1699                LASSERT(atomic_read(&obj->oo_nr_writes) >= 0);
1700        } else {
1701                atomic_add(delta, &obj->oo_nr_reads);
1702                atomic_add(delta, &cli->cl_pending_r_pages);
1703                LASSERT(atomic_read(&obj->oo_nr_reads) >= 0);
1704        }
1705        OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
1706}
1707
1708static int osc_makes_hprpc(struct osc_object *obj)
1709{
1710        return !list_empty(&obj->oo_hp_exts);
1711}
1712
1713static void on_list(struct list_head *item, struct list_head *list, int should_be_on)
1714{
1715        if (list_empty(item) && should_be_on)
1716                list_add_tail(item, list);
1717        else if (!list_empty(item) && !should_be_on)
1718                list_del_init(item);
1719}
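
/*
 * on_list() keeps list membership idempotently in sync with a predicate;
 * for instance (taken from __osc_list_maint() below):
 *
 *	on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
 *		atomic_read(&osc->oo_nr_writes) > 0);
 *
 * links the object while it has pending writes and unlinks it once the
 * count drops to zero, however many times the call is repeated.
 */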
1720
1721/* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
1722 * can find pages to build into rpcs quickly */
1723static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1724{
1725        if (osc_makes_hprpc(osc)) {
1726                /* HP rpc */
1727                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
1728                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1729        } else {
1730                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1731                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
1732                        osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
1733                        osc_makes_rpc(cli, osc, OBD_BRW_READ));
1734        }
1735
1736        on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
1737                atomic_read(&osc->oo_nr_writes) > 0);
1738
1739        on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
1740                atomic_read(&osc->oo_nr_reads) > 0);
1741
1742        return osc_is_ready(osc);
1743}
1744
1745static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1746{
1747        int is_ready;
1748
1749        client_obd_list_lock(&cli->cl_loi_list_lock);
1750        is_ready = __osc_list_maint(cli, osc);
1751        client_obd_list_unlock(&cli->cl_loi_list_lock);
1752
1753        return is_ready;
1754}
1755
1756/* this is trying to propagate async writeback errors back up to the
1757 * application.  When an async write fails we record the error code so the
1758 * app can pick it up on a later fsync.  As long as errors persist we force
1759 * future rpcs to be sync so that the app gets a sync error and breaks the
1760 * cycle of queueing pages for which writeback will fail. */
1761static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1762                           int rc)
1763{
1764        if (rc) {
1765                if (!ar->ar_rc)
1766                        ar->ar_rc = rc;
1767
1768                ar->ar_force_sync = 1;
1769                ar->ar_min_xid = ptlrpc_sample_next_xid();
1770                return;
1772        }
1773
1774        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1775                ar->ar_force_sync = 0;
1776}
1777
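/*
 * Worked example with hypothetical xids: a write with xid 100 fails, so
 * ar_rc records the error, ar_force_sync is set and ar_min_xid is sampled
 * as, say, 120.  Completions with xid < 120 may have been queued before
 * the failure was noticed and so do not clear the flag; the first
 * successful completion with xid >= 120 proves a full round trip after
 * the error and lifts force_sync again.
 */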
1778
1779/* this must be called holding the loi list lock to give coverage to exit_cache,
1780 * async_flag maintenance, and oap_request */
1781static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
1782                              struct osc_async_page *oap, int sent, int rc)
1783{
1784        struct osc_object *osc = oap->oap_obj;
1785        struct lov_oinfo  *loi = osc->oo_oinfo;
1786        __u64 xid = 0;
1787
1788        if (oap->oap_request != NULL) {
1789                xid = ptlrpc_req_xid(oap->oap_request);
1790                ptlrpc_req_finished(oap->oap_request);
1791                oap->oap_request = NULL;
1792        }
1793
1794        /* As the transfer for this page is being done, clear the flags */
1795        spin_lock(&oap->oap_lock);
1796        oap->oap_async_flags = 0;
1797        spin_unlock(&oap->oap_lock);
1798        oap->oap_interrupted = 0;
1799
1800        if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
1801                client_obd_list_lock(&cli->cl_loi_list_lock);
1802                osc_process_ar(&cli->cl_ar, xid, rc);
1803                osc_process_ar(&loi->loi_ar, xid, rc);
1804                client_obd_list_unlock(&cli->cl_loi_list_lock);
1805        }
1806
1807        rc = osc_completion(env, oap, oap->oap_cmd, rc);
1808        if (rc)
1809                CERROR("completion on oap %p obj %p returns %d.\n",
1810                       oap, osc, rc);
1811}
1812
1813/**
1814 * Try to add extent to one RPC. We need to think about the following things:
1815 * - # of pages must not be over max_pages_per_rpc
1816 * - extent must be compatible with previous ones
1817 */
1818static int try_to_add_extent_for_io(struct client_obd *cli,
1819                                    struct osc_extent *ext, struct list_head *rpclist,
1820                                    int *pc, unsigned int *max_pages)
1821{
1822        struct osc_extent *tmp;
1823
1824        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
1825                ext);
1826
1827        *max_pages = max(ext->oe_mppr, *max_pages);
1828        if (*pc + ext->oe_nr_pages > *max_pages)
1829                return 0;
1830
1831        list_for_each_entry(tmp, rpclist, oe_link) {
1832                EASSERT(tmp->oe_owner == current, tmp);
1833#if 0
1834                if (overlapped(tmp, ext)) {
1835                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
1836                        EASSERT(0, ext);
1837                }
1838#endif
1839
1840                if (tmp->oe_srvlock != ext->oe_srvlock ||
1841                    !tmp->oe_grants != !ext->oe_grants)
1842                        return 0;
1843
1844                /* remove this break for a strict check of all list members */
1845                break;
1846        }
1847
1848        *pc += ext->oe_nr_pages;
1849        list_move_tail(&ext->oe_link, rpclist);
1850        ext->oe_owner = current;
1851        return 1;
1852}
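
/*
 * Editorial note on "compatible with previous ones": the loop above only
 * admits extents that agree on oe_srvlock (lockless and lock-protected
 * I/O cannot share an RPC) and on whether they carry grants (!oe_grants
 * must match).  Because of the trailing break only the head of rpclist
 * is actually compared; removing that break, as the in-line comment
 * suggests, would give a strict check against every member.
 */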
1853
1854/**
1855 * In order to prevent multiple ptlrpcd threads from breaking contiguous
1856 * extents, get_write_extents() takes all appropriate extents atomically.
1857 *
1858 * The following policy is used to collect extents for IO:
1859 * 1. Add as many HP extents as possible;
1860 * 2. Add the first urgent extent in urgent extent list and take it out of
1861 *    urgent list;
1862 * 3. Add subsequent extents of this urgent extent;
1863 * 4. If urgent list is not empty, goto 2;
1864 * 5. Traverse the extent tree from the 1st extent;
1865 * 6. Above steps exit if there is no space in this RPC.
1866 */
1867static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
1868{
1869        struct client_obd *cli = osc_cli(obj);
1870        struct osc_extent *ext;
1871        int page_count = 0;
1872        unsigned int max_pages = cli->cl_max_pages_per_rpc;
1873
1874        LASSERT(osc_object_is_locked(obj));
1875        while (!list_empty(&obj->oo_hp_exts)) {
1876                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
1877                                     oe_link);
1878                LASSERT(ext->oe_state == OES_CACHE);
1879                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1880                                              &max_pages))
1881                        return page_count;
1882                EASSERT(ext->oe_nr_pages <= max_pages, ext);
1883        }
1884        if (page_count == max_pages)
1885                return page_count;
1886
1887        while (!list_empty(&obj->oo_urgent_exts)) {
1888                ext = list_entry(obj->oo_urgent_exts.next,
1889                                     struct osc_extent, oe_link);
1890                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1891                                              &max_pages))
1892                        return page_count;
1893
1894                if (!ext->oe_intree)
1895                        continue;
1896
1897                while ((ext = next_extent(ext)) != NULL) {
1898                        if ((ext->oe_state != OES_CACHE) ||
1899                            (!list_empty(&ext->oe_link) &&
1900                             ext->oe_owner != NULL))
1901                                continue;
1902
1903                        if (!try_to_add_extent_for_io(cli, ext, rpclist,
1904                                                      &page_count, &max_pages))
1905                                return page_count;
1906                }
1907        }
1908        if (page_count == max_pages)
1909                return page_count;
1910
1911        ext = first_extent(obj);
1912        while (ext != NULL) {
1913                if ((ext->oe_state != OES_CACHE) ||
1914                    /* this extent may be already in current rpclist */
1915                    (!list_empty(&ext->oe_link) && ext->oe_owner != NULL)) {
1916                        ext = next_extent(ext);
1917                        continue;
1918                }
1919
1920                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1921                                              &max_pages))
1922                        return page_count;
1923
1924                ext = next_extent(ext);
1925        }
1926        return page_count;
1927}
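
/*
 * Illustrative walk-through of the policy above (all extents are
 * hypothetical): with max_pages = 256, HP extents A(64 pages) and B(64),
 * and an urgent extent C(64) whose in-tree successor is D(32), the RPC
 * list is built as A, B (step 1), then C (step 2), then D (step 3);
 * step 5 finally scans the tree from the first extent for further
 * cacheable extents until the 256-page budget is spent.
 */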
1928
1929static int
1930osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
1931                   struct osc_object *osc, pdl_policy_t pol)
1932{
1933        LIST_HEAD(rpclist);
1934        struct osc_extent *ext;
1935        struct osc_extent *tmp;
1936        struct osc_extent *first = NULL;
1937        u32 page_count = 0;
1938        int srvlock = 0;
1939        int rc = 0;
1940
1941        LASSERT(osc_object_is_locked(osc));
1942
1943        page_count = get_write_extents(osc, &rpclist);
1944        LASSERT(equi(page_count == 0, list_empty(&rpclist)));
1945
1946        if (list_empty(&rpclist))
1947                return 0;
1948
1949        osc_update_pending(osc, OBD_BRW_WRITE, -page_count);
1950
1951        list_for_each_entry(ext, &rpclist, oe_link) {
1952                LASSERT(ext->oe_state == OES_CACHE ||
1953                        ext->oe_state == OES_LOCK_DONE);
1954                if (ext->oe_state == OES_CACHE)
1955                        osc_extent_state_set(ext, OES_LOCKING);
1956                else
1957                        osc_extent_state_set(ext, OES_RPC);
1958        }
1959
1960        /* we're going to grab page lock, so release object lock because
1961         * lock order is page lock -> object lock. */
1962        osc_object_unlock(osc);
1963
1964        list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) {
1965                if (ext->oe_state == OES_LOCKING) {
1966                        rc = osc_extent_make_ready(env, ext);
1967                        if (unlikely(rc < 0)) {
1968                                list_del_init(&ext->oe_link);
1969                                osc_extent_finish(env, ext, 0, rc);
1970                                continue;
1971                        }
1972                }
1973                if (first == NULL) {
1974                        first = ext;
1975                        srvlock = ext->oe_srvlock;
1976                } else {
1977                        LASSERT(srvlock == ext->oe_srvlock);
1978                }
1979        }
1980
1981        if (!list_empty(&rpclist)) {
1982                LASSERT(page_count > 0);
1983                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, pol);
1984                LASSERT(list_empty(&rpclist));
1985        }
1986
1987        osc_object_lock(osc);
1988        return rc;
1989}
1990
1991/**
1992 * prepare pages for ASYNC io and put pages in send queue.
1993 *
1994 * \param cmd OBD_BRW_* macros
1995 * \param lop pending pages
1996 *
1997 * \return zero if no page added to send queue.
1998 * \return 1 if pages successfully added to send queue.
1999 * \return negative on errors.
2000 */
2001static int
2002osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
2003                  struct osc_object *osc, pdl_policy_t pol)
2004{
2005        struct osc_extent *ext;
2006        struct osc_extent *next;
2007        LIST_HEAD(rpclist);
2008        int page_count = 0;
2009        unsigned int max_pages = cli->cl_max_pages_per_rpc;
2010        int rc = 0;
2011
2012        LASSERT(osc_object_is_locked(osc));
2013        list_for_each_entry_safe(ext, next,
2014                                     &osc->oo_reading_exts, oe_link) {
2015                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
2016                if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
2017                                              &max_pages))
2018                        break;
2019                osc_extent_state_set(ext, OES_RPC);
2020                EASSERT(ext->oe_nr_pages <= max_pages, ext);
2021        }
2022        LASSERT(page_count <= max_pages);
2023
2024        osc_update_pending(osc, OBD_BRW_READ, -page_count);
2025
2026        if (!list_empty(&rpclist)) {
2027                osc_object_unlock(osc);
2028
2029                LASSERT(page_count > 0);
2030                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ, pol);
2031                LASSERT(list_empty(&rpclist));
2032
2033                osc_object_lock(osc);
2034        }
2035        return rc;
2036}
2037
2038#define list_to_obj(list, item) ({                                            \
2039        struct list_head *__tmp = (list)->next;                               \
2040        list_del_init(__tmp);                                         \
2041        list_entry(__tmp, struct osc_object, oo_##item);                      \
2042})
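
/*
 * Usage sketch, mirroring osc_next_obj() below: popping the head of a
 * ready list and recovering its owning object, e.g.
 *
 *	osc = list_to_obj(&cli->cl_loi_ready_list, ready_item);
 *
 * deletes the first entry from cl_loi_ready_list and returns the
 * osc_object that embeds it through its oo_ready_item member.
 */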
2043
2044/* This is called by osc_check_rpcs() to find which objects have pages that
2045 * we could be sending.  These lists are maintained by osc_makes_rpc(). */
2046static struct osc_object *osc_next_obj(struct client_obd *cli)
2047{
2048        /* First return objects that have blocked locks so that they
2049         * will be flushed quickly and other clients can get the lock,
2050         * then objects which have pages ready to be stuffed into RPCs */
2051        if (!list_empty(&cli->cl_loi_hp_ready_list))
2052                return list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item);
2053        if (!list_empty(&cli->cl_loi_ready_list))
2054                return list_to_obj(&cli->cl_loi_ready_list, ready_item);
2055
2056        /* then if we have cache waiters, return all objects with queued
2057         * writes.  This is especially important when many small files
2058         * have filled up the cache and not been fired into rpcs because
2059         * they don't pass the nr_pending/object threshold */
2060        if (!list_empty(&cli->cl_cache_waiters) &&
2061            !list_empty(&cli->cl_loi_write_list))
2062                return list_to_obj(&cli->cl_loi_write_list, write_item);
2063
2064        /* then return all queued objects when we have an invalid import
2065         * so that they get flushed */
2066        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2067                if (!list_empty(&cli->cl_loi_write_list))
2068                        return list_to_obj(&cli->cl_loi_write_list, write_item);
2069                if (!list_empty(&cli->cl_loi_read_list))
2070                        return list_to_obj(&cli->cl_loi_read_list, read_item);
2071        }
2072        return NULL;
2073}
2074
2075/* called with the loi list lock held */
2076static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
2077                           pdl_policy_t pol)
2078{
2079        struct osc_object *osc;
2080        int rc = 0;
2081
2082        while ((osc = osc_next_obj(cli)) != NULL) {
2083                struct cl_object *obj = osc2cl(osc);
2084                struct lu_ref_link link;
2085
2086                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
2087
2088                if (osc_max_rpc_in_flight(cli, osc)) {
2089                        __osc_list_maint(cli, osc);
2090                        break;
2091                }
2092
2093                cl_object_get(obj);
2094                client_obd_list_unlock(&cli->cl_loi_list_lock);
2095                lu_object_ref_add_at(&obj->co_lu, &link, "check",
2096                                     current);
2097
2098                /* attempt some read/write balancing by alternating between
2099                 * reads and writes in an object.  The makes_rpc checks here
2100                 * would be redundant if we were getting read/write work items
2101                 * instead of objects.  We don't want send_oap_rpc to drain a
2102                 * partial read pending queue when we're given this object to
2103                 * do io on writes while there are cache waiters */
2104                osc_object_lock(osc);
2105                if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
2106                        rc = osc_send_write_rpc(env, cli, osc, pol);
2107                        if (rc < 0) {
2108                                CERROR("Write request failed with %d\n", rc);
2109
2110                                /* osc_send_write_rpc failed, mostly because of
2111                                 * memory pressure.
2112                                 *
2113                                 * We can't break out here, because if:
2114                                 *  - a page was submitted by osc_io_submit,
2115                                 *    so that page is locked;
2116                                 *  - no request is in flight; and
2117                                 *  - no subsequent request is made,
2118                                 * then the system would be in a live-lock
2119                                 * state, because there would be no chance
2120                                 * to call osc_io_unplug() and
2121                                 * osc_check_rpcs() any more. pdflush can't
2122                                 * help here, because it might be blocked
2123                                 * grabbing the page lock, as mentioned above.
2124                                 *
2125                                 * Anyway, continue to drain pages. */
2126                                /* break; */
2127                        }
2128                }
2129                if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
2130                        rc = osc_send_read_rpc(env, cli, osc, pol);
2131                        if (rc < 0)
2132                                CERROR("Read request failed with %d\n", rc);
2133                }
2134                osc_object_unlock(osc);
2135
2136                osc_list_maint(cli, osc);
2137                lu_object_ref_del_at(&obj->co_lu, &link, "check",
2138                                     current);
2139                cl_object_put(env, obj);
2140
2141                client_obd_list_lock(&cli->cl_loi_list_lock);
2142        }
2143}
2144
2145static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
2146                          struct osc_object *osc, pdl_policy_t pol, int async)
2147{
2148        int rc = 0;
2149
2150        if (osc != NULL && osc_list_maint(cli, osc) == 0)
2151                return 0;
2152
2153        if (!async) {
2154                /* disable osc_lru_shrink() temporarily to avoid
2155                 * a potential stack overrun problem; see LU-2859 */
2156                atomic_inc(&cli->cl_lru_shrinkers);
2157                client_obd_list_lock(&cli->cl_loi_list_lock);
2158                osc_check_rpcs(env, cli, pol);
2159                client_obd_list_unlock(&cli->cl_loi_list_lock);
2160                atomic_dec(&cli->cl_lru_shrinkers);
2161        } else {
2162                CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
2163                LASSERT(cli->cl_writeback_work != NULL);
2164                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
2165        }
2166        return rc;
2167}
2168
2169static int osc_io_unplug_async(const struct lu_env *env,
2170                                struct client_obd *cli, struct osc_object *osc)
2171{
2172        /* XXX: the policy argument is actually unused. */
2173        return osc_io_unplug0(env, cli, osc, PDL_POLICY_ROUND, 1);
2174}
2175
2176void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
2177                   struct osc_object *osc, pdl_policy_t pol)
2178{
2179        (void)osc_io_unplug0(env, cli, osc, pol, 0);
2180}
2181
2182int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
2183                        struct page *page, loff_t offset)
2184{
2185        struct obd_export     *exp = osc_export(osc);
2186        struct osc_async_page *oap = &ops->ops_oap;
2187
2188        if (!page)
2189                return cfs_size_round(sizeof(*oap));
2190
2191        oap->oap_magic = OAP_MAGIC;
2192        oap->oap_cli = &exp->exp_obd->u.cli;
2193        oap->oap_obj = osc;
2194
2195        oap->oap_page = page;
2196        oap->oap_obj_off = offset;
2197        LASSERT(!(offset & ~CFS_PAGE_MASK));
2198
2199        if (!client_is_remote(exp) && capable(CFS_CAP_SYS_RESOURCE))
2200                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2201
2202        INIT_LIST_HEAD(&oap->oap_pending_item);
2203        INIT_LIST_HEAD(&oap->oap_rpc_item);
2204
2205        spin_lock_init(&oap->oap_lock);
2206        CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
2207               oap, page, oap->oap_obj_off);
2208        return 0;
2209}
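
/*
 * Note that this function doubles as a size query; a sketch of both modes
 * (an editorial illustration, not a prescribed calling sequence):
 *
 *	size = osc_prep_async_page(osc, ops, NULL, 0);
 *		(returns the rounded size of struct osc_async_page)
 *	rc = osc_prep_async_page(osc, ops, page, offset);
 *		(performs the real initialization and returns 0)
 */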
2210
2211int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
2212                       struct osc_page *ops)
2213{
2214        struct osc_io *oio = osc_env_io(env);
2215        struct osc_extent     *ext = NULL;
2216        struct osc_async_page *oap = &ops->ops_oap;
2217        struct client_obd     *cli = oap->oap_cli;
2218        struct osc_object     *osc = oap->oap_obj;
2219        pgoff_t index;
2220        int    grants = 0;
2221        int    brw_flags = OBD_BRW_ASYNC;
2222        int    cmd = OBD_BRW_WRITE;
2223        int    need_release = 0;
2224        int    rc = 0;
2225
2226        if (oap->oap_magic != OAP_MAGIC)
2227                return -EINVAL;
2228
2229        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2230                return -EIO;
2231
2232        if (!list_empty(&oap->oap_pending_item) ||
2233            !list_empty(&oap->oap_rpc_item))
2234                return -EBUSY;
2235
2236        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
2237        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
2238        if (!client_is_remote(osc_export(osc)) &&
2239            capable(CFS_CAP_SYS_RESOURCE)) {
2240                brw_flags |= OBD_BRW_NOQUOTA;
2241                cmd |= OBD_BRW_NOQUOTA;
2242        }
2243
2244        /* check if the file's owner/group is over quota */
2245        if (!(cmd & OBD_BRW_NOQUOTA)) {
2246                struct cl_object *obj;
2247                struct cl_attr   *attr;
2248                unsigned int qid[MAXQUOTAS];
2249
2250                obj = cl_object_top(&osc->oo_cl);
2251                attr = &osc_env_info(env)->oti_attr;
2252
2253                cl_object_attr_lock(obj);
2254                rc = cl_object_attr_get(env, obj, attr);
2255                cl_object_attr_unlock(obj);
2256
2257                qid[USRQUOTA] = attr->cat_uid;
2258                qid[GRPQUOTA] = attr->cat_gid;
2259                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
2260                        rc = -EDQUOT;
2261                if (rc)
2262                        return rc;
2263        }
2264
2265        oap->oap_cmd = cmd;
2266        oap->oap_page_off = ops->ops_from;
2267        oap->oap_count = ops->ops_to - ops->ops_from;
2268        oap->oap_async_flags = 0;
2269        oap->oap_brw_flags = brw_flags;
2270
2271        OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
2272                     oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
2273
2274        index = oap2cl_page(oap)->cp_index;
2275
2276        /* Add this page to an extent by the following steps:
2277         * 1. if there exists an active extent for this IO, this page can
2278         *    usually be added to it, though sometimes we need to expand
2279         *    the extent to accommodate the page;
2280         * 2. otherwise, a new extent will be allocated. */
2281
2282        ext = oio->oi_active;
2283        if (ext != NULL && ext->oe_start <= index && ext->oe_max_end >= index) {
2284                /* one chunk plus extent overhead must be enough to write this
2285                 * page */
2286                grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2287                if (ext->oe_end >= index)
2288                        grants = 0;
2289
2290                /* it doesn't need any grant to dirty this page */
2291                client_obd_list_lock(&cli->cl_loi_list_lock);
2292                rc = osc_enter_cache_try(cli, oap, grants, 0);
2293                client_obd_list_unlock(&cli->cl_loi_list_lock);
2294                if (rc == 0) { /* try failed */
2295                        grants = 0;
2296                        need_release = 1;
2297                } else if (ext->oe_end < index) {
2298                        int tmp = grants;
2299                        /* try to expand this extent */
2300                        rc = osc_extent_expand(ext, index, &tmp);
2301                        if (rc < 0) {
2302                                need_release = 1;
2303                                /* don't free reserved grant */
2304                        } else {
2305                                OSC_EXTENT_DUMP(D_CACHE, ext,
2306                                                "expanded for %lu.\n", index);
2307                                osc_unreserve_grant(cli, grants, tmp);
2308                                grants = 0;
2309                        }
2310                }
2311                rc = 0;
2312        } else if (ext != NULL) {
2313                /* index is located outside of active extent */
2314                need_release = 1;
2315        }
2316        if (need_release) {
2317                osc_extent_release(env, ext);
2318                oio->oi_active = NULL;
2319                ext = NULL;
2320        }
2321
2322        if (ext == NULL) {
2323                int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2324
2325                /* try to find new extent to cover this page */
2326                LASSERT(oio->oi_active == NULL);
2327                /* we may have allocated grant for this page if we failed
2328                 * to expand the previous active extent. */
2329                LASSERT(ergo(grants > 0, grants >= tmp));
2330
2331                rc = 0;
2332                if (grants == 0) {
2333                        /* we haven't allocated grant for this page. */
2334                        rc = osc_enter_cache(env, cli, oap, tmp);
2335                        if (rc == 0)
2336                                grants = tmp;
2337                }
2338
2339                tmp = grants;
2340                if (rc == 0) {
2341                        ext = osc_extent_find(env, osc, index, &tmp);
2342                        if (IS_ERR(ext)) {
2343                                LASSERT(tmp == grants);
2344                                osc_exit_cache(cli, oap);
2345                                rc = PTR_ERR(ext);
2346                                ext = NULL;
2347                        } else {
2348                                oio->oi_active = ext;
2349                        }
2350                }
2351                if (grants > 0)
2352                        osc_unreserve_grant(cli, grants, tmp);
2353        }
2354
2355        LASSERT(ergo(rc == 0, ext != NULL));
2356        if (ext != NULL) {
2357                EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
2358                         ext, "index = %lu.\n", index);
2359                LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);
2360
2361                osc_object_lock(osc);
2362                if (ext->oe_nr_pages == 0)
2363                        ext->oe_srvlock = ops->ops_srvlock;
2364                else
2365                        LASSERT(ext->oe_srvlock == ops->ops_srvlock);
2366                ++ext->oe_nr_pages;
2367                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
2368                osc_object_unlock(osc);
2369        }
2370        return rc;
2371}
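
/*
 * Grant lifecycle in the function above, summarized: a page needs "one
 * chunk + extent tax" worth of grant only when it cannot reuse a chunk of
 * the active extent.  Whatever was reserved is either handed over to the
 * extent (osc_extent_find()/osc_extent_expand()) or given back through
 * osc_unreserve_grant(); if no extent can be found, the dirty page
 * accounting is rolled back via osc_exit_cache().
 */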
2372
2373int osc_teardown_async_page(const struct lu_env *env,
2374                            struct osc_object *obj, struct osc_page *ops)
2375{
2376        struct osc_async_page *oap = &ops->ops_oap;
2377        struct osc_extent     *ext = NULL;
2378        int rc = 0;
2379
2380        LASSERT(oap->oap_magic == OAP_MAGIC);
2381
2382        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
2383               oap, ops, oap2cl_page(oap)->cp_index);
2384
2385        osc_object_lock(obj);
2386        if (!list_empty(&oap->oap_rpc_item)) {
2387                CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
2388                rc = -EBUSY;
2389        } else if (!list_empty(&oap->oap_pending_item)) {
2390                ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
2391                /* only truncated pages are allowed to be taken out.
2392                 * See osc_extent_truncate() and osc_cache_truncate_start()
2393                 * for details. */
2394                if (ext != NULL && ext->oe_state != OES_TRUNC) {
2395                        OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
2396                                        oap2cl_page(oap)->cp_index);
2397                        rc = -EBUSY;
2398                }
2399        }
2400        osc_object_unlock(obj);
2401        if (ext != NULL)
2402                osc_extent_put(env, ext);
2403        return rc;
2404}
2405
2406/**
2407 * This is called when a page is picked up by the kernel to be written out.
2408 *
2409 * We should find the corresponding extent and add the whole extent to the
2410 * urgent list. The extent may be being truncated or used; handle it
2411 * carefully.
2412 */
2413int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
2414                         struct osc_page *ops)
2415{
2416        struct osc_extent *ext   = NULL;
2417        struct osc_object *obj   = cl2osc(ops->ops_cl.cpl_obj);
2418        struct cl_page    *cp    = ops->ops_cl.cpl_page;
2419        pgoff_t     index = cp->cp_index;
2420        struct osc_async_page *oap = &ops->ops_oap;
2421        bool unplug = false;
2422        int rc = 0;
2423
2424        osc_object_lock(obj);
2425        ext = osc_extent_lookup(obj, index);
2426        if (ext == NULL) {
2427                osc_extent_tree_dump(D_ERROR, obj);
2428                LASSERTF(0, "page index %lu is NOT covered.\n", index);
2429        }
2430
2431        switch (ext->oe_state) {
2432        case OES_RPC:
2433        case OES_LOCK_DONE:
2434                CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
2435                              "flush an in-rpc page?\n");
2436                LASSERT(0);
2437                break;
2438        case OES_LOCKING:
2439                /* If we know this extent is being written out, we should abort
2440                 * so that the writer can make this page ready. Otherwise there
2441                 * is a deadlock, because another process can wait for the page
2442                 * writeback bit while holding the page lock, and meanwhile in
2443                 * vvp_page_make_ready() we need to grab the page lock before
2444                 * really sending the RPC. */
2445        case OES_TRUNC:
2446                /* race with truncate, page will be redirtied */
2447        case OES_ACTIVE:
2448                /* The extent is active so we need to abort and let the caller
2449                 * re-dirty the page. If we continued on here, and we were the
2450                 * one making the extent active, we could deadlock waiting for
2451                 * the page writeback to clear but it won't because the extent
2452                 * is active and won't be written out. */
2453                rc = -EAGAIN;
2454                goto out;
2455        default:
2456                break;
2457        }
2458
2459        rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
2460        if (rc)
2461                goto out;
2462
2463        spin_lock(&oap->oap_lock);
2464        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
2465        spin_unlock(&oap->oap_lock);
2466
2467        if (memory_pressure_get())
2468                ext->oe_memalloc = 1;
2469
2470        ext->oe_urgent = 1;
2471        if (ext->oe_state == OES_CACHE) {
2472                OSC_EXTENT_DUMP(D_CACHE, ext,
2473                                "flush page %p make it urgent.\n", oap);
2474                if (list_empty(&ext->oe_link))
2475                        list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2476                unplug = true;
2477        }
2478        rc = 0;
2479
2480out:
2481        osc_object_unlock(obj);
2482        osc_extent_put(env, ext);
2483        if (unplug)
2484                osc_io_unplug_async(env, osc_cli(obj), obj);
2485        return rc;
2486}
2487
2488/**
2489 * this is called when a sync waiter receives an interruption.  Its job is to
2490 * get the caller woken as soon as possible.  If its page hasn't been put in an
2491 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2492 * desiring interruption which will forcefully complete the rpc once the rpc
2493 * has timed out.
2494 */
2495int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
2496{
2497        struct osc_async_page *oap = &ops->ops_oap;
2498        struct osc_object     *obj = oap->oap_obj;
2499        struct client_obd     *cli = osc_cli(obj);
2500        struct osc_extent     *ext;
2501        struct osc_extent     *found = NULL;
2502        struct list_head            *plist;
2503        pgoff_t index = oap2cl_page(oap)->cp_index;
2504        int     rc = -EBUSY;
2505        int     cmd;
2506
2507        LASSERT(!oap->oap_interrupted);
2508        oap->oap_interrupted = 1;
2509
2510        /* Find out the caching extent */
2511        osc_object_lock(obj);
2512        if (oap->oap_cmd & OBD_BRW_WRITE) {
2513                plist = &obj->oo_urgent_exts;
2514                cmd   = OBD_BRW_WRITE;
2515        } else {
2516                plist = &obj->oo_reading_exts;
2517                cmd   = OBD_BRW_READ;
2518        }
2519        list_for_each_entry(ext, plist, oe_link) {
2520                if (ext->oe_start <= index && ext->oe_end >= index) {
2521                        LASSERT(ext->oe_state == OES_LOCK_DONE);
2522                        /* An extent in OES_LOCK_DONE state already holds
2523                         * a refcount for the RPC. */
2524                        found = osc_extent_get(ext);
2525                        break;
2526                }
2527        }
2528        if (found != NULL) {
2529                list_del_init(&found->oe_link);
2530                osc_update_pending(obj, cmd, -found->oe_nr_pages);
2531                osc_object_unlock(obj);
2532
2533                osc_extent_finish(env, found, 0, -EINTR);
2534                osc_extent_put(env, found);
2535                rc = 0;
2536        } else {
2537                osc_object_unlock(obj);
2538                /* ok, it's been put in an rpc. only one oap gets a request
2539                 * reference */
2540                if (oap->oap_request != NULL) {
2541                        ptlrpc_mark_interrupted(oap->oap_request);
2542                        ptlrpcd_wake(oap->oap_request);
2543                        ptlrpc_req_finished(oap->oap_request);
2544                        oap->oap_request = NULL;
2545                }
2546        }
2547
2548        osc_list_maint(cli, obj);
2549        return rc;
2550}
2551
2552int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
2553                         struct list_head *list, int cmd, int brw_flags)
2554{
2555        struct client_obd     *cli = osc_cli(obj);
2556        struct osc_extent     *ext;
2557        struct osc_async_page *oap, *tmp;
2558        int     page_count = 0;
2559        int     mppr       = cli->cl_max_pages_per_rpc;
2560        pgoff_t start      = CL_PAGE_EOF;
2561        pgoff_t end     = 0;
2562
2563        list_for_each_entry(oap, list, oap_pending_item) {
2564                struct cl_page *cp = oap2cl_page(oap);
2565                if (cp->cp_index > end)
2566                        end = cp->cp_index;
2567                if (cp->cp_index < start)
2568                        start = cp->cp_index;
2569                ++page_count;
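                /* double mppr whenever page_count outgrows it, so oe_mppr
                 * ends up as the smallest cl_max_pages_per_rpc << k that
                 * covers page_count */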
2570                mppr <<= (page_count > mppr);
2571        }
2572
2573        ext = osc_extent_alloc(obj);
2574        if (ext == NULL) {
2575                list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
2576                        list_del_init(&oap->oap_pending_item);
2577                        osc_ap_completion(env, cli, oap, 0, -ENOMEM);
2578                }
2579                return -ENOMEM;
2580        }
2581
2582        ext->oe_rw = !!(cmd & OBD_BRW_READ);
2583        ext->oe_urgent = 1;
2584        ext->oe_start = start;
2585        ext->oe_end = ext->oe_max_end = end;
2586        ext->oe_obj = obj;
2587        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
2588        ext->oe_nr_pages = page_count;
2589        ext->oe_mppr = mppr;
2590        list_splice_init(list, &ext->oe_pages);
2591
2592        osc_object_lock(obj);
2593        /* Reuse the initial refcount for RPC, don't drop it */
2594        osc_extent_state_set(ext, OES_LOCK_DONE);
2595        if (cmd & OBD_BRW_WRITE) {
2596                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2597                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
2598        } else {
2599                list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
2600                osc_update_pending(obj, OBD_BRW_READ, page_count);
2601        }
2602        osc_object_unlock(obj);
2603
2604        osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
2605        return 0;
2606}
2607
2608/**
2609 * Called by osc_io_setattr_start() to freeze and destroy covering extents.
2610 */
2611int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
2612                             struct osc_object *obj, __u64 size)
2613{
2614        struct client_obd *cli = osc_cli(obj);
2615        struct osc_extent *ext;
2616        struct osc_extent *waiting = NULL;
2617        pgoff_t index;
2618        LIST_HEAD(list);
2619        int result = 0;
2620        bool partial;
2621
2622        /* pages with index greater than or equal to index will be truncated. */
2623        index = cl_index(osc2cl(obj), size);
2624        partial = size > cl_offset(osc2cl(obj), index);
2625
2626again:
2627        osc_object_lock(obj);
2628        ext = osc_extent_search(obj, index);
2629        if (ext == NULL)
2630                ext = first_extent(obj);
2631        else if (ext->oe_end < index)
2632                ext = next_extent(ext);
2633        while (ext != NULL) {
2634                EASSERT(ext->oe_state != OES_TRUNC, ext);
2635
2636                if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
2637                        /* if ext is in urgent state, there must exist a page
2638                         * that has already been flushed by write_page().
2639                         * We have to wait for this extent because we can't
2640                         * truncate that page. */
2641                        LASSERT(!ext->oe_hp);
2642                        OSC_EXTENT_DUMP(D_CACHE, ext,
2643                                        "waiting for busy extent\n");
2644                        waiting = osc_extent_get(ext);
2645                        break;
2646                }
2647
2648                OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
2649
2650                osc_extent_get(ext);
2651                if (ext->oe_state == OES_ACTIVE) {
2652                        /* though we grab the inode mutex on the write path,
2653                         * we release it before releasing the extent (in
2654                         * osc_io_end()), so there is a race window in which
2655                         * an extent is still OES_ACTIVE when truncate starts. */
2656                        LASSERT(!ext->oe_trunc_pending);
2657                        ext->oe_trunc_pending = 1;
2658                } else {
2659                        EASSERT(ext->oe_state == OES_CACHE, ext);
2660                        osc_extent_state_set(ext, OES_TRUNC);
2661                        osc_update_pending(obj, OBD_BRW_WRITE,
2662                                           -ext->oe_nr_pages);
2663                }
2664                EASSERT(list_empty(&ext->oe_link), ext);
2665                list_add_tail(&ext->oe_link, &list);
2666
2667                ext = next_extent(ext);
2668        }
2669        osc_object_unlock(obj);
2670
2671        osc_list_maint(cli, obj);
2672
2673        while (!list_empty(&list)) {
2674                int rc;
2675
2676                ext = list_entry(list.next, struct osc_extent, oe_link);
2677                list_del_init(&ext->oe_link);
2678
2679                /* extent may be in OES_ACTIVE state because inode mutex
2680                 * is released before osc_io_end() in file write case */
2681                if (ext->oe_state != OES_TRUNC)
2682                        osc_extent_wait(env, ext, OES_TRUNC);
2683
2684                rc = osc_extent_truncate(ext, index, partial);
2685                if (rc < 0) {
2686                        if (result == 0)
2687                                result = rc;
2688
2689                        OSC_EXTENT_DUMP(D_ERROR, ext,
2690                                        "truncate error %d\n", rc);
2691                } else if (ext->oe_nr_pages == 0) {
2692                        osc_extent_remove(ext);
2693                } else {
2694                        /* this must be an overlapped extent which means only
2695                         * part of the pages in this extent have been truncated.
2696                         */
2697                        EASSERTF(ext->oe_start <= index, ext,
2698                                 "trunc index = %lu/%d.\n", index, partial);
2699                        /* fix index to skip this partially truncated extent */
2700                        index = ext->oe_end + 1;
2701                        partial = false;
2702
2703                        /* we need to hold this extent in OES_TRUNC state so
2704                         * that no writeback will happen. This is to avoid
2705                         * BUG 17397. */
2706                        LASSERT(oio->oi_trunc == NULL);
2707                        oio->oi_trunc = osc_extent_get(ext);
2708                        OSC_EXTENT_DUMP(D_CACHE, ext,
2709                                        "trunc at %llu\n", size);
2710                }
2711                osc_extent_put(env, ext);
2712        }
2713        if (waiting != NULL) {
2714                int rc;
2715
2716                /* ignore the result of osc_extent_wait; the write initiator
2717                 * should take care of it. */
2718                rc = osc_extent_wait(env, waiting, OES_INV);
2719                if (rc < 0)
2720                        OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
2721
2722                osc_extent_put(env, waiting);
2723                waiting = NULL;
2724                goto again;
2725        }
2726        return result;
2727}
2728
2729/**
2730 * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
2731 */
2732void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
2733                            struct osc_object *obj)
2734{
2735        struct osc_extent *ext = oio->oi_trunc;
2736
2737        oio->oi_trunc = NULL;
2738        if (ext != NULL) {
2739                bool unplug = false;
2740
2741                EASSERT(ext->oe_nr_pages > 0, ext);
2742                EASSERT(ext->oe_state == OES_TRUNC, ext);
2743                EASSERT(!ext->oe_urgent, ext);
2744
2745                OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
2746                osc_object_lock(obj);
2747                osc_extent_state_set(ext, OES_CACHE);
2748                if (ext->oe_fsync_wait && !ext->oe_urgent) {
2749                        ext->oe_urgent = 1;
2750                        list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
2751                        unplug = true;
2752                }
2753                osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
2754                osc_object_unlock(obj);
2755                osc_extent_put(env, ext);
2756
2757                if (unplug)
2758                        osc_io_unplug_async(env, osc_cli(obj), obj);
2759        }
2760}
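
/*
 * A usage sketch (illustrative, not a verbatim caller from this file):
 * a truncate IO path is expected to bracket the server-side operation
 * roughly as follows, where send_punch_rpc() is a hypothetical helper:
 *
 *	rc = osc_cache_truncate_start(env, oio, obj, size);
 *	if (rc == 0)
 *		rc = send_punch_rpc(env, obj, size);
 *	osc_cache_truncate_end(env, oio, obj);
 *
 * Any extent pinned in oio->oi_trunc by the start half is returned to
 * OES_CACHE by the end half above.
 */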
2761
2762/**
2763 * Wait for extents in a specific range to be written out.
2764 * The caller must have called osc_cache_writeback_range() to issue IO
2765 * first; otherwise this function can take a long time to finish.
2766 *
2767 * The caller must hold the inode mutex, or cancel the exclusive DLM
2768 * lock, so that nobody else can dirty this range of the file while
2769 * we're waiting for the extents to be written.
2770 */
2771int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
2772                         pgoff_t start, pgoff_t end)
2773{
2774        struct osc_extent *ext;
2775        pgoff_t index = start;
2776        int     result = 0;
2777
2778again:
2779        osc_object_lock(obj);
2780        ext = osc_extent_search(obj, index);
2781        if (ext == NULL)
2782                ext = first_extent(obj);
2783        else if (ext->oe_end < index)
2784                ext = next_extent(ext);
2785        while (ext != NULL) {
2786                int rc;
2787
2788                if (ext->oe_start > end)
2789                        break;
2790
2791                if (!ext->oe_fsync_wait) {
2792                        ext = next_extent(ext);
2793                        continue;
2794                }
2795
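                /* These invariants come from osc_cache_writeback_range():
                 * a cached extent tagged for fsync must have been moved to
                 * the HP or urgent list, while an active extent can only
                 * have been marked urgent there. */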
2796                EASSERT(ergo(ext->oe_state == OES_CACHE,
2797                             ext->oe_hp || ext->oe_urgent), ext);
2798                EASSERT(ergo(ext->oe_state == OES_ACTIVE,
2799                             !ext->oe_hp && ext->oe_urgent), ext);
2800
2801                index = ext->oe_end + 1;
2802                osc_extent_get(ext);
2803                osc_object_unlock(obj);
2804
2805                rc = osc_extent_wait(env, ext, OES_INV);
2806                if (result == 0)
2807                        result = rc;
2808                osc_extent_put(env, ext);
2809                goto again;
2810        }
2811        osc_object_unlock(obj);
2812
2813        OSC_IO_DEBUG(obj, "sync file range.\n");
2814        return result;
2815}
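
/*
 * A minimal fsync-style sketch (assumed caller, not code from this
 * file): writeback must be started for the range before waiting on it,
 * as required above:
 *
 *	rc = osc_cache_writeback_range(env, obj, start, end, 0, 0);
 *	if (rc >= 0)
 *		rc = osc_cache_wait_range(env, obj, start, end);
 *
 * Waiting without issuing the IO first would only return once
 * background writeback happened to clean the range.
 */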
2816
2817/**
2818 * Called to write out a range of an osc object.
2819 *
2820 * @hp     : should be set if this is caused by lock cancellation;
2821 * @discard: is set if dirty pages should be dropped - the file is being
2822 *         deleted or truncated, so no extent can be partially discarded.
2823 *
2824 * Return how many pages will be issued, or an error code on failure.
2825 */
2826int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
2827                              pgoff_t start, pgoff_t end, int hp, int discard)
2828{
2829        struct osc_extent *ext;
2830        LIST_HEAD(discard_list);
2831        bool unplug = false;
2832        int result = 0;
2833
2834        osc_object_lock(obj);
2835        ext = osc_extent_search(obj, start);
2836        if (ext == NULL)
2837                ext = first_extent(obj);
2838        else if (ext->oe_end < start)
2839                ext = next_extent(ext);
2840        while (ext != NULL) {
2841                if (ext->oe_start > end)
2842                        break;
2843
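                /* tag the extent so that osc_cache_wait_range() will wait on
                 * it until it has been flushed or discarded */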
2844                ext->oe_fsync_wait = 1;
2845                switch (ext->oe_state) {
2846                case OES_CACHE:
2847                        result += ext->oe_nr_pages;
2848                        if (!discard) {
2849                                struct list_head *list = NULL;
2850                                if (hp) {
2851                                        EASSERT(!ext->oe_hp, ext);
2852                                        ext->oe_hp = 1;
2853                                        list = &obj->oo_hp_exts;
2854                                } else if (!ext->oe_urgent) {
2855                                        ext->oe_urgent = 1;
2856                                        list = &obj->oo_urgent_exts;
2857                                }
2858                                if (list != NULL)
2859                                        list_move_tail(&ext->oe_link, list);
2860                                unplug = true;
2861                        } else {
2862                                /* the only discarder is lock cancellation,
2863                                 * so [start, end] must contain this extent */
2864                                EASSERT(ext->oe_start >= start &&
2865                                        ext->oe_max_end <= end, ext);
2866                                osc_extent_state_set(ext, OES_LOCKING);
2867                                ext->oe_owner = current;
2868                                list_move_tail(&ext->oe_link,
2869                                               &discard_list);
2870                                osc_update_pending(obj, OBD_BRW_WRITE,
2871                                                   -ext->oe_nr_pages);
2872                        }
2873                        break;
2874                case OES_ACTIVE:
2875                        /* It's pretty bad to wait for ACTIVE extents, because
2876                         * we don't know how long we will have to wait for them
2877                         * to be flushed; they may be blocked waiting for more
2878                         * grants. We do this for the correctness of fsync. */
2879                        LASSERT(hp == 0 && discard == 0);
2880                        ext->oe_urgent = 1;
2881                        break;
2882                case OES_TRUNC:
2883                        /* this extent is being truncated, so we can't do
2884                         * anything for it now; it will be set to urgent after
2885                         * the truncate finishes in osc_cache_truncate_end(). */
2886                default:
2887                        break;
2888                }
2889                ext = next_extent(ext);
2890        }
2891        osc_object_unlock(obj);
2892
2893        LASSERT(ergo(!discard, list_empty(&discard_list)));
2894        if (!list_empty(&discard_list)) {
2895                struct osc_extent *tmp;
2896                int rc;
2897
2898                osc_list_maint(osc_cli(obj), obj);
2899                list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
2900                        list_del_init(&ext->oe_link);
2901                        EASSERT(ext->oe_state == OES_LOCKING, ext);
2902
2903                        /* Discard cached pages. We don't actually write this
2904                         * extent out, but we complete it as if we did. */
2905                        rc = osc_extent_make_ready(env, ext);
2906                        if (unlikely(rc < 0)) {
2907                                OSC_EXTENT_DUMP(D_ERROR, ext,
2908                                                "make_ready returned %d\n", rc);
2909                                if (result >= 0)
2910                                        result = rc;
2911                        }
2912
2913                        /* finish the extent as if the pages were sent */
2914                        osc_extent_finish(env, ext, 0, 0);
2915                }
2916        }
2917
2918        if (unplug)
2919                osc_io_unplug(env, osc_cli(obj), obj, PDL_POLICY_ROUND);
2920
2921        if (hp || discard) {
2922                int rc;
2923                rc = osc_cache_wait_range(env, obj, start, end);
2924                if (result >= 0 && rc < 0)
2925                        result = rc;
2926        }
2927
2928        OSC_IO_DEBUG(obj, "cache page out.\n");
2929        return result;
2930}
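
/*
 * Callers are expected to use the flags roughly as follows (a sketch,
 * not verbatim code from this file): a lock-cancel path flushes with
 * high priority, while deleting or truncating a file discards the
 * dirty pages instead:
 *
 *	rc = osc_cache_writeback_range(env, obj, start, end, 1, 0);
 *	rc = osc_cache_writeback_range(env, obj, start, end, 0, 1);
 *
 * In either case the call returns only after osc_cache_wait_range()
 * has seen the range go clean.
 */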
2931
2932/** @} osc */
2933