linux/drivers/staging/lustre/lustre/osc/osc_cache.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 *
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * osc cache management.
 *
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"
#include "osc_internal.h"

static int extent_debug; /* set to non-zero for extra debugging */

static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
                              struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd);
static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd);
static int osc_io_unplug_async(const struct lu_env *env,
                               struct client_obd *cli, struct osc_object *osc);
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
                           unsigned int lost_grant);

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line);
#define osc_extent_tree_dump(lvl, obj) \
        osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)

/** \addtogroup osc
 *  @{
 */

/* ------------------ osc extent ------------------ */
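/*
 * ext_flags() below encodes the extent's state bits as a short string:
 * 'r'/'w' read or write extent, 'i' in the rbtree, 's' srvlock,
 * 'h' high priority, 'u' urgent, 'm' allocated under memory pressure,
 * 't' truncate pending, 'Y' being waited on by fsync.
 */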
static inline char *ext_flags(struct osc_extent *ext, char *flags)
{
        char *buf = flags;
        *buf++ = ext->oe_rw ? 'r' : 'w';
        if (ext->oe_intree)
                *buf++ = 'i';
        if (ext->oe_srvlock)
                *buf++ = 's';
        if (ext->oe_hp)
                *buf++ = 'h';
        if (ext->oe_urgent)
                *buf++ = 'u';
        if (ext->oe_memalloc)
                *buf++ = 'm';
        if (ext->oe_trunc_pending)
                *buf++ = 't';
        if (ext->oe_fsync_wait)
                *buf++ = 'Y';
        *buf = 0;
        return flags;
}

static inline char list_empty_marker(struct list_head *list)
{
        return list_empty(list) ? '-' : '+';
}

#define EXTSTR       "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
static const char *oes_strings[] = {
        "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };

#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                           \
        struct osc_extent *__ext = (extent);                                  \
        char __buf[16];                                                       \
                                                                              \
        CDEBUG(lvl,                                                           \
                "extent %p@{" EXTSTR ", "                                     \
                "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
                /* ----- extent part 0 ----- */                               \
                __ext, EXTPARA(__ext),                                        \
                /* ----- part 1 ----- */                                      \
                atomic_read(&__ext->oe_refc),                                 \
                atomic_read(&__ext->oe_users),                                \
                list_empty_marker(&__ext->oe_link),                           \
                oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
                __ext->oe_obj,                                                \
                /* ----- part 2 ----- */                                      \
                __ext->oe_grants, __ext->oe_nr_pages,                         \
                list_empty_marker(&__ext->oe_pages),                          \
                waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
                __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner,           \
                /* ----- part 3 ----- */                                      \
                ## __VA_ARGS__);                                              \
} while (0)
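
/*
 * Reading a dump line above: extent pointer @{[start -> end/max_end],
 * [refc|users|link?|state|flags|obj], [grants|nr_pages|pages?|waitq?|
 * lock|mppr|owner]}, where '+'/'-' mark a non-empty/empty list and an
 * active/idle wait queue.
 */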

#undef EASSERTF
#define EASSERTF(expr, ext, fmt, args...) do {                          \
        if (!(expr)) {                                                  \
                OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
                osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
                LASSERT(expr);                                          \
        }                                                               \
} while (0)

#undef EASSERT
#define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")

static inline struct osc_extent *rb_extent(struct rb_node *n)
{
        if (n == NULL)
                return NULL;

        return container_of(n, struct osc_extent, oe_node);
}

static inline struct osc_extent *next_extent(struct osc_extent *ext)
{
        if (ext == NULL)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_next(&ext->oe_node));
}

static inline struct osc_extent *prev_extent(struct osc_extent *ext)
{
        if (ext == NULL)
                return NULL;

        LASSERT(ext->oe_intree);
        return rb_extent(rb_prev(&ext->oe_node));
}

static inline struct osc_extent *first_extent(struct osc_object *obj)
{
        return rb_extent(rb_first(&obj->oo_root));
}

/* object must be locked by caller. */
static int osc_extent_sanity_check0(struct osc_extent *ext,
                                    const char *func, const int line)
{
        struct osc_object *obj = ext->oe_obj;
        struct osc_async_page *oap;
        int page_count;
        int rc = 0;

        if (!osc_object_is_locked(obj))
                GOTO(out, rc = 9);

        if (ext->oe_state >= OES_STATE_MAX)
                GOTO(out, rc = 10);

        if (atomic_read(&ext->oe_refc) <= 0)
                GOTO(out, rc = 20);

        if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
                GOTO(out, rc = 30);

        switch (ext->oe_state) {
        case OES_INV:
                if (ext->oe_nr_pages > 0 || !list_empty(&ext->oe_pages))
                        GOTO(out, rc = 35);
                GOTO(out, rc = 0);
                break;
        case OES_ACTIVE:
                if (atomic_read(&ext->oe_users) == 0)
                        GOTO(out, rc = 40);
                if (ext->oe_hp)
                        GOTO(out, rc = 50);
                if (ext->oe_fsync_wait && !ext->oe_urgent)
                        GOTO(out, rc = 55);
                break;
        case OES_CACHE:
                if (ext->oe_grants == 0)
                        GOTO(out, rc = 60);
                if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp)
                        GOTO(out, rc = 65);
                /* fall through */
        default:
                if (atomic_read(&ext->oe_users) > 0)
                        GOTO(out, rc = 70);
        }

        if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start)
                GOTO(out, rc = 80);

        if (ext->oe_osclock == NULL && ext->oe_grants > 0)
                GOTO(out, rc = 90);

        if (ext->oe_osclock) {
                struct cl_lock_descr *descr;
                descr = &ext->oe_osclock->cll_descr;
                if (!(descr->cld_start <= ext->oe_start &&
                      descr->cld_end >= ext->oe_max_end))
                        GOTO(out, rc = 100);
        }

        if (ext->oe_nr_pages > ext->oe_mppr)
                GOTO(out, rc = 105);

        /* Do not verify page list if extent is in RPC. This is because an
         * in-RPC extent is supposed to be exclusively accessible w/o lock. */
        if (ext->oe_state > OES_CACHE)
                GOTO(out, rc = 0);

        if (!extent_debug)
                GOTO(out, rc = 0);

        page_count = 0;
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                pgoff_t index = oap2cl_page(oap)->cp_index;
                ++page_count;
                if (index > ext->oe_end || index < ext->oe_start)
                        GOTO(out, rc = 110);
        }
        if (page_count != ext->oe_nr_pages)
                GOTO(out, rc = 120);

out:
        if (rc != 0)
                OSC_EXTENT_DUMP(D_ERROR, ext,
                                "%s:%d sanity check %p failed with rc = %d\n",
                                func, line, ext, rc);
        return rc;
}

#define sanity_check_nolock(ext) \
        osc_extent_sanity_check0(ext, __func__, __LINE__)

#define sanity_check(ext) ({                                            \
        int __res;                                                      \
        osc_object_lock((ext)->oe_obj);                                 \
        __res = sanity_check_nolock(ext);                               \
        osc_object_unlock((ext)->oe_obj);                               \
        __res;                                                          \
})

/**
 * Sanity check: make sure there are no overlapping extents in the tree.
 */
static int osc_extent_is_overlapped(struct osc_object *obj,
                                    struct osc_extent *ext)
{
        struct osc_extent *tmp;

        LASSERT(osc_object_is_locked(obj));

        if (!extent_debug)
                return 0;

        for (tmp = first_extent(obj); tmp != NULL; tmp = next_extent(tmp)) {
                if (tmp == ext)
                        continue;
                if (tmp->oe_end >= ext->oe_start &&
                    tmp->oe_start <= ext->oe_end)
                        return 1;
        }
        return 0;
}

static void osc_extent_state_set(struct osc_extent *ext, int state)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(state >= OES_INV && state < OES_STATE_MAX);

        /* Never try to sanity check a state changing extent :-) */
        /* LASSERT(sanity_check_nolock(ext) == 0); */

        /* TODO: validate the state machine */
        ext->oe_state = state;
        wake_up_all(&ext->oe_waitq);
}
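
/*
 * Note: every state change above wakes oe_waitq, so that the wait
 * condition in osc_extent_wait() below gets re-evaluated.
 */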

static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
{
        struct osc_extent *ext;

        OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
        if (ext == NULL)
                return NULL;

        RB_CLEAR_NODE(&ext->oe_node);
        ext->oe_obj = obj;
        atomic_set(&ext->oe_refc, 1);
        atomic_set(&ext->oe_users, 0);
        INIT_LIST_HEAD(&ext->oe_link);
        ext->oe_state = OES_INV;
        INIT_LIST_HEAD(&ext->oe_pages);
        init_waitqueue_head(&ext->oe_waitq);
        ext->oe_osclock = NULL;

        return ext;
}

static void osc_extent_free(struct osc_extent *ext)
{
        OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
}

static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) >= 0);
        atomic_inc(&ext->oe_refc);
        return ext;
}

static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 0);
        if (atomic_dec_and_test(&ext->oe_refc)) {
                LASSERT(list_empty(&ext->oe_link));
                LASSERT(atomic_read(&ext->oe_users) == 0);
                LASSERT(ext->oe_state == OES_INV);
                LASSERT(!ext->oe_intree);

                if (ext->oe_osclock) {
                        cl_lock_put(env, ext->oe_osclock);
                        ext->oe_osclock = NULL;
                }
                osc_extent_free(ext);
        }
}

/**
 * osc_extent_put_trust() is a special version of osc_extent_put() for when
 * it's known that the caller is not the last user. This addresses the
 * problem of lacking a lu_env ;-).
 */
static void osc_extent_put_trust(struct osc_extent *ext)
{
        LASSERT(atomic_read(&ext->oe_refc) > 1);
        LASSERT(osc_object_is_locked(ext->oe_obj));
        atomic_dec(&ext->oe_refc);
}

/**
 * Return the extent which includes pgoff @index, or return the greatest
 * previous extent in the tree.
 */
static struct osc_extent *osc_extent_search(struct osc_object *obj,
                                            pgoff_t index)
{
        struct rb_node    *n = obj->oo_root.rb_node;
        struct osc_extent *tmp, *p = NULL;

        LASSERT(osc_object_is_locked(obj));
        while (n != NULL) {
                tmp = rb_extent(n);
                if (index < tmp->oe_start) {
                        n = n->rb_left;
                } else if (index > tmp->oe_end) {
                        p = rb_extent(n);
                        n = n->rb_right;
                } else {
                        return tmp;
                }
        }
        return p;
}
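
/*
 * For example, if the tree holds extents [0 -> 15] and [32 -> 47],
 * osc_extent_search() returns [32 -> 47] for index 35 (the covering
 * extent) but [0 -> 15] for index 20 (the closest previous extent).
 */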

/*
 * Return the extent covering @index, otherwise return NULL.
 * Caller must hold the object lock.
 */
static struct osc_extent *osc_extent_lookup(struct osc_object *obj,
                                            pgoff_t index)
{
        struct osc_extent *ext;

        ext = osc_extent_search(obj, index);
        if (ext != NULL && ext->oe_start <= index && index <= ext->oe_end)
                return osc_extent_get(ext);
        return NULL;
}

/* Caller must hold the object lock. */
static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
{
        struct rb_node   **n      = &obj->oo_root.rb_node;
        struct rb_node    *parent = NULL;
        struct osc_extent *tmp;

        LASSERT(ext->oe_intree == 0);
        LASSERT(ext->oe_obj == obj);
        LASSERT(osc_object_is_locked(obj));
        while (*n != NULL) {
                tmp = rb_extent(*n);
                parent = *n;

                if (ext->oe_end < tmp->oe_start)
                        n = &(*n)->rb_left;
                else if (ext->oe_start > tmp->oe_end)
                        n = &(*n)->rb_right;
                else
                        EASSERTF(0, tmp, EXTSTR, EXTPARA(ext));
        }
        rb_link_node(&ext->oe_node, parent, n);
        rb_insert_color(&ext->oe_node, &obj->oo_root);
        osc_extent_get(ext);
        ext->oe_intree = 1;
}

/* Caller must hold the object lock. */
static void osc_extent_erase(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;
        LASSERT(osc_object_is_locked(obj));
        if (ext->oe_intree) {
                rb_erase(&ext->oe_node, &obj->oo_root);
                ext->oe_intree = 0;
                /* rbtree held a refcount */
                osc_extent_put_trust(ext);
        }
}

static struct osc_extent *osc_extent_hold(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        LASSERT(osc_object_is_locked(obj));
        LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
        if (ext->oe_state == OES_CACHE) {
                osc_extent_state_set(ext, OES_ACTIVE);
                osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages);
        }
        atomic_inc(&ext->oe_users);
        list_del_init(&ext->oe_link);
        return osc_extent_get(ext);
}

static void __osc_extent_remove(struct osc_extent *ext)
{
        LASSERT(osc_object_is_locked(ext->oe_obj));
        LASSERT(list_empty(&ext->oe_pages));
        osc_extent_erase(ext);
        list_del_init(&ext->oe_link);
        osc_extent_state_set(ext, OES_INV);
        OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n");
}

static void osc_extent_remove(struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;

        osc_object_lock(obj);
        __osc_extent_remove(ext);
        osc_object_unlock(obj);
}

/**
 * This function is used to merge extents to get better performance. It checks
 * if @cur and @victim are contiguous at chunk level.
 */
static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
                            struct osc_extent *victim)
{
        struct osc_object *obj = cur->oe_obj;
        pgoff_t chunk_start;
        pgoff_t chunk_end;
        int ppc_bits;

        LASSERT(cur->oe_state == OES_CACHE);
        LASSERT(osc_object_is_locked(obj));
        if (victim == NULL)
                return -EINVAL;

        if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
                return -EBUSY;

        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;

        LASSERT(cur->oe_osclock == victim->oe_osclock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
        chunk_end   = cur->oe_end   >> ppc_bits;
        if (chunk_start   != (victim->oe_end >> ppc_bits) + 1 &&
            chunk_end + 1 != victim->oe_start >> ppc_bits)
                return -ERANGE;

        OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);

        cur->oe_start     = min(cur->oe_start, victim->oe_start);
        cur->oe_end       = max(cur->oe_end,   victim->oe_end);
        cur->oe_grants   += victim->oe_grants;
        cur->oe_nr_pages += victim->oe_nr_pages;
        /* only the following bits are needed to merge */
        cur->oe_urgent   |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
        list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;

        osc_extent_get(victim);
        __osc_extent_remove(victim);
        osc_extent_put(env, victim);

        OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim);
        return 0;
}
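
/*
 * Illustration of the chunk-adjacency rule above, assuming 4KiB pages
 * and 64KiB chunks (ppc_bits == 4): an extent ending at page 31
 * (chunk 1) may merge with a victim starting at page 32 (chunk 2),
 * but not with one starting at page 64 (chunk 4).
 */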

/**
 * Drop user count of osc_extent, and unplug IO asynchronously.
 */
int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
{
        struct osc_object *obj = ext->oe_obj;
        int rc = 0;
        ENTRY;

        LASSERT(atomic_read(&ext->oe_users) > 0);
        LASSERT(sanity_check(ext) == 0);
        LASSERT(ext->oe_grants > 0);

        if (atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) {
                LASSERT(ext->oe_state == OES_ACTIVE);
                if (ext->oe_trunc_pending) {
                        /* a truncate process is waiting for this extent.
                         * This may happen due to a race, check
                         * osc_cache_truncate_start(). */
                        osc_extent_state_set(ext, OES_TRUNC);
                        ext->oe_trunc_pending = 0;
                } else {
                        osc_extent_state_set(ext, OES_CACHE);
                        osc_update_pending(obj, OBD_BRW_WRITE,
                                           ext->oe_nr_pages);

                        /* try to merge the previous and next extent. */
                        osc_extent_merge(env, ext, prev_extent(ext));
                        osc_extent_merge(env, ext, next_extent(ext));

                        if (ext->oe_urgent)
                                list_move_tail(&ext->oe_link,
                                               &obj->oo_urgent_exts);
                }
                osc_object_unlock(obj);

                osc_io_unplug_async(env, osc_cli(obj), obj);
        }
        osc_extent_put(env, ext);
        RETURN(rc);
}

static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
{
        return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
}

/**
 * Find or create an extent which includes @index, core function to manage
 * extent tree.
 */
struct osc_extent *osc_extent_find(const struct lu_env *env,
                                   struct osc_object *obj, pgoff_t index,
                                   int *grants)
{
        struct client_obd *cli = osc_cli(obj);
        struct cl_lock    *lock;
        struct osc_extent *cur;
        struct osc_extent *ext;
        struct osc_extent *conflict = NULL;
        struct osc_extent *found = NULL;
        pgoff_t chunk;
        pgoff_t max_end;
        int     max_pages; /* max_pages_per_rpc */
        int     chunksize;
        int     ppc_bits; /* pages per chunk bits */
        int     chunk_mask;
        int     rc;
        ENTRY;

        cur = osc_extent_alloc(obj);
        if (cur == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0);
        LASSERT(lock != NULL);
        LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);

        LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
        ppc_bits   = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        chunk_mask = ~((1 << ppc_bits) - 1);
        chunksize  = 1 << cli->cl_chunkbits;
        chunk      = index >> ppc_bits;

        /* align end to RPC edge; RPC size may not be a power-of-two number
         * of pages. */
        max_pages = cli->cl_max_pages_per_rpc;
        LASSERT((max_pages & ~chunk_mask) == 0);
        max_end = index - (index % max_pages) + max_pages - 1;
        max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end);

        /* initialize new extent by parameters so far */
        cur->oe_max_end = max_end;
        cur->oe_start   = index & chunk_mask;
        cur->oe_end     = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
        if (cur->oe_start < lock->cll_descr.cld_start)
                cur->oe_start = lock->cll_descr.cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
        cur->oe_osclock = lock;
        cur->oe_grants  = 0;
        cur->oe_mppr    = max_pages;
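
        /*
         * Worked example with illustrative parameters: if
         * PAGE_CACHE_SHIFT == 12 and cl_chunkbits == 16, then
         * ppc_bits == 4 and chunk_mask == ~0xf; for index 37 this gives
         * chunk == 2, oe_start == 32 and oe_end == 47, i.e. cur initially
         * spans exactly the chunk containing the page, before the clipping
         * against the lock extent and max_end above.
         */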

        /* grants have been allocated by the caller */
        LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
                 "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
        LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR, EXTPARA(cur));

restart:
        osc_object_lock(obj);
        ext = osc_extent_search(obj, cur->oe_start);
        if (ext == NULL)
                ext = first_extent(obj);
        while (ext != NULL) {
                loff_t ext_chk_start = ext->oe_start >> ppc_bits;
                loff_t ext_chk_end   = ext->oe_end   >> ppc_bits;

                LASSERT(sanity_check_nolock(ext) == 0);
                if (chunk > ext_chk_end + 1)
                        break;

                /* if covered by different locks, no chance to match */
                if (lock != ext->oe_osclock) {
                        EASSERTF(!overlapped(ext, cur), ext,
                                 EXTSTR, EXTPARA(cur));

                        ext = next_extent(ext);
                        continue;
                }

                /* discontiguous chunks? */
                if (chunk + 1 < ext_chk_start) {
                        ext = next_extent(ext);
                        continue;
                }

                /* ok, from now on, ext and cur have these attrs:
                 * 1. covered by the same lock
                 * 2. contiguous at chunk level or overlapping. */

                if (overlapped(ext, cur)) {
                        /* cur is the minimum unit, so overlapping means
                         * full containment. */
                        EASSERTF((ext->oe_start <= cur->oe_start &&
                                  ext->oe_end >= cur->oe_end),
                                 ext, EXTSTR, EXTPARA(cur));

                        if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
                                /* for simplicity, we wait for this extent to
                                 * finish before going forward. */
                                conflict = osc_extent_get(ext);
                                break;
                        }

                        found = osc_extent_hold(ext);
                        break;
                }

                /* non-overlapped extent */
                if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) {
                        /* we can't do anything for a non OES_CACHE extent, or
                         * if there is someone waiting for this extent to be
                         * flushed, try next one. */
                        ext = next_extent(ext);
                        continue;
                }

                /* check if they belong to the same rpc slot before trying to
                 * merge. The extents are not overlapping and are contiguous
                 * at chunk level to get here. */
                if (ext->oe_max_end != max_end) {
                        /* if they don't belong to the same RPC slot or
                         * max_pages_per_rpc has ever changed, do not merge. */
                        ext = next_extent(ext);
                        continue;
                }

                /* it's required that an extent must be contiguous at chunk
                 * level so that we know the whole extent is covered by grant
                 * (the pages in the extent are NOT required to be contiguous).
                 * Otherwise, it would be too difficult to know which chunks
                 * have grants allocated. */

                /* try to do front merge - extend ext's start */
                if (chunk + 1 == ext_chk_start) {
                        /* ext must be chunk size aligned */
                        EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);

                        /* pull ext's start back to cover cur */
                        ext->oe_start   = cur->oe_start;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        found = osc_extent_hold(ext);
                } else if (chunk == ext_chk_end + 1) {
                        /* rear merge */
                        ext->oe_end     = cur->oe_end;
                        ext->oe_grants += chunksize;
                        *grants -= chunksize;

                        /* try to merge with the next one because we just
                         * filled in a gap */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_extent_tax;

                        found = osc_extent_hold(ext);
                }
                if (found != NULL)
                        break;

                ext = next_extent(ext);
        }

        osc_extent_tree_dump(D_CACHE, obj);
        if (found != NULL) {
                LASSERT(conflict == NULL);
                if (!IS_ERR(found)) {
                        LASSERT(found->oe_osclock == cur->oe_osclock);
                        OSC_EXTENT_DUMP(D_CACHE, found,
                                        "found caching ext for %lu.\n", index);
                }
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
                cur->oe_grants = chunksize + cli->cl_extent_tax;
                *grants -= cur->oe_grants;
                LASSERT(*grants >= 0);

                cur->oe_state = OES_CACHE;
                found = osc_extent_hold(cur);
                osc_extent_insert(obj, cur);
                OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
                                index, lock->cll_descr.cld_end);
        }
        osc_object_unlock(obj);

        if (conflict != NULL) {
                LASSERT(found == NULL);

                /* Wait for IO to finish. Note that it's impossible for this
                 * to be an OES_TRUNC extent. */
                rc = osc_extent_wait(env, conflict, OES_INV);
                osc_extent_put(env, conflict);
                conflict = NULL;
                if (rc < 0)
                        GOTO(out, found = ERR_PTR(rc));

                goto restart;
        }
        EXIT;

out:
        osc_extent_put(env, cur);
        LASSERT(*grants >= 0);
        return found;
}

/**
 * Called when IO is finished to an extent.
 */
int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc)
{
        struct client_obd *cli = osc_cli(ext->oe_obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int nr_pages = ext->oe_nr_pages;
        int lost_grant = 0;
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        __u64 last_off = 0;
        int last_count = -1;
        ENTRY;

        OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");

        ext->oe_rc = rc ?: ext->oe_nr_pages;
        EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                 oap_pending_item) {
                list_del_init(&oap->oap_rpc_item);
                list_del_init(&oap->oap_pending_item);
                if (last_off <= oap->oap_obj_off) {
                        last_off = oap->oap_obj_off;
                        last_count = oap->oap_count;
                }

                --ext->oe_nr_pages;
                osc_ap_completion(env, cli, oap, sent, rc);
        }
        EASSERT(ext->oe_nr_pages == 0, ext);

        if (!sent) {
                lost_grant = ext->oe_grants;
        } else if (blocksize < PAGE_CACHE_SIZE &&
                   last_count != PAGE_CACHE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole chunk on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
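                /*
                 * For instance, with a 512-byte OST blocksize, a 100-byte
                 * write at page offset 0 rounds count up to 512, so
                 * lost_grant is PAGE_CACHE_SIZE - 512 for that page.
                 */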
                /* use last_off/last_count saved in the loop above: the list
                 * iterator oap is no longer a valid entry once the loop has
                 * completed */
                int offset = last_off & ~CFS_PAGE_MASK;
                int count = last_count + (offset & (blocksize - 1));
                int end = (offset + last_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                lost_grant = PAGE_CACHE_SIZE - count;
        }
        if (ext->oe_grants > 0)
                osc_free_grant(cli, nr_pages, lost_grant);

        osc_extent_remove(ext);
        /* put the refcount for RPC */
        osc_extent_put(env, ext);
        RETURN(0);
}

static int extent_wait_cb(struct osc_extent *ext, int state)
{
        int ret;

        osc_object_lock(ext->oe_obj);
        ret = ext->oe_state == state;
        osc_object_unlock(ext->oe_obj);

        return ret;
}

/**
 * Wait for the extent's state to become @state.
 */
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           int state)
{
        struct osc_object *obj = ext->oe_obj;
        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
                                                  LWI_ON_SIGNAL_NOOP, NULL);
        int rc = 0;
        ENTRY;

        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        /* `Kick' this extent only if the caller is waiting for it to be
         * written out. */
        if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp) {
                if (ext->oe_state == OES_ACTIVE) {
                        ext->oe_urgent = 1;
                } else if (ext->oe_state == OES_CACHE) {
                        ext->oe_urgent = 1;
                        osc_extent_hold(ext);
                        rc = 1;
                }
        }
        osc_object_unlock(obj);
        if (rc == 1)
                osc_extent_release(env, ext);

        /* wait for the extent until its state becomes @state */
        rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
        if (rc == -ETIMEDOUT) {
                OSC_EXTENT_DUMP(D_ERROR, ext,
                        "%s: wait ext to %d timed out, recovery in progress?\n",
                        osc_export(obj)->exp_obd->obd_name, state);

                lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
                                  &lwi);
        }
        if (rc == 0 && ext->oe_rc < 0)
                rc = ext->oe_rc;
        RETURN(rc);
}

/**
 * Discard pages with index greater than @trunc_index. If @partial is set,
 * the page at @trunc_index is kept, since truncation falls inside it and
 * only part of that page is discarded.
 */
static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                               bool partial)
{
        struct cl_env_nest     nest;
        struct lu_env         *env;
        struct cl_io          *io;
        struct osc_object     *obj = ext->oe_obj;
        struct client_obd     *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int                    pages_in_chunk = 0;
        int                    ppc_bits    = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        __u64                  trunc_chunk = trunc_index >> ppc_bits;
        int                    grants   = 0;
        int                    nr_pages = 0;
        int                    rc       = 0;
        ENTRY;

        LASSERT(sanity_check(ext) == 0);
        LASSERT(ext->oe_state == OES_TRUNC);
        LASSERT(!ext->oe_urgent);

        /* Request a new lu_env.
         * We can't use the env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized. */
        env = cl_env_nested_get(&nest);
        io  = &osc_env_info(env)->oti_io;
        io->ci_obj = cl_object_top(osc2cl(obj));
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                GOTO(out, rc);

        /* discard all pages with index greater than trunc_index */
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                 oap_pending_item) {
                struct cl_page  *sub  = oap2cl_page(oap);
                struct cl_page  *page = cl_page_top(sub);

                LASSERT(list_empty(&oap->oap_rpc_item));

                /* only discard the pages with their index greater than
                 * trunc_index, and ... */
                if (sub->cp_index < trunc_index ||
                    (sub->cp_index == trunc_index && partial)) {
                        /* account how many pages remain in the chunk
                         * so that we can calculate grants correctly. */
                        if (sub->cp_index >> ppc_bits == trunc_chunk)
                                ++pages_in_chunk;
                        continue;
                }

                list_del_init(&oap->oap_pending_item);

                cl_page_get(page);
                lu_ref_add(&page->cp_reference, "truncate", current);

                if (cl_page_own(env, io, page) == 0) {
                        cl_page_unmap(env, io, page);
                        cl_page_discard(env, io, page);
                        cl_page_disown(env, io, page);
                } else {
                        LASSERT(page->cp_state == CPS_FREEING);
                        LASSERT(0);
                }

                lu_ref_del(&page->cp_reference, "truncate", current);
                cl_page_put(env, page);

                --ext->oe_nr_pages;
                ++nr_pages;
        }
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
                 ext, "trunc_index %lu, partial %d\n", trunc_index, partial);

        osc_object_lock(obj);
        if (ext->oe_nr_pages == 0) {
                LASSERT(pages_in_chunk == 0);
                grants = ext->oe_grants;
                ext->oe_grants = 0;
        } else { /* calculate how many grants we can free */
                int     chunks = (ext->oe_end >> ppc_bits) - trunc_chunk;
                pgoff_t last_index;

                /* if there are no pages in this chunk, we can also free
                 * grants for the last chunk */
                if (pages_in_chunk == 0) {
                        /* if this is the 1st chunk and there are no pages
                         * in it, ext->oe_nr_pages must be zero, so we
                         * should be in the other if-clause. */
                        LASSERT(trunc_chunk > 0);
                        --trunc_chunk;
                        ++chunks;
                }
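
                /*
                 * Example: if trunc_index falls in chunk 3, pages remain
                 * in chunk 3, and the extent ends in chunk 5, then
                 * chunks == 2: two whole chunks of grant are freed below
                 * and oe_end is pulled back to the last page of chunk 3.
                 */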

                /* this is what we can free from this extent */
                grants          = chunks << cli->cl_chunkbits;
                ext->oe_grants -= grants;
                last_index      = ((trunc_chunk + 1) << ppc_bits) - 1;
                ext->oe_end     = min(last_index, ext->oe_max_end);
                LASSERT(ext->oe_end >= ext->oe_start);
                LASSERT(ext->oe_grants > 0);
        }
        osc_object_unlock(obj);

        if (grants > 0 || nr_pages > 0)
                osc_free_grant(cli, nr_pages, grants);

out:
        cl_io_fini(env, io);
        cl_env_nested_put(&nest, env);
        RETURN(rc);
}

/**
 * This function is used to make the extent prepared for transfer.
 * A race with page flushing - ll_writepage() - has to be handled cautiously.
 */
static int osc_extent_make_ready(const struct lu_env *env,
                                 struct osc_extent *ext)
{
        struct osc_async_page *oap;
        struct osc_async_page *last = NULL;
        struct osc_object *obj = ext->oe_obj;
        int page_count = 0;
        int rc;
        ENTRY;

        /* we're going to grab the page lock, so the object lock must not be
         * held. */
        LASSERT(sanity_check(ext) == 0);
        /* in the locking state, no process should touch this extent. */
        EASSERT(ext->oe_state == OES_LOCKING, ext);
        EASSERT(ext->oe_owner != NULL, ext);

        OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n");

        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                ++page_count;
                if (last == NULL || last->oap_obj_off < oap->oap_obj_off)
                        last = oap;

                /* checking ASYNC_READY is race safe */
                if ((oap->oap_async_flags & ASYNC_READY) != 0)
                        continue;

                rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
                switch (rc) {
                case 0:
                        spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_READY;
                        spin_unlock(&oap->oap_lock);
                        break;
                case -EALREADY:
                        LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
                        break;
                default:
                        LASSERTF(0, "unknown return code: %d\n", rc);
                }
        }

        LASSERT(page_count == ext->oe_nr_pages);
        LASSERT(last != NULL);
        /* the last page is the only one whose count needs to be refreshed
         * against the file size. */
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
                last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
                LASSERT(last->oap_count > 0);
                LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
        }

        /* for the rest of the pages, we don't need to call
         * osc_refresh_count() because they are known not to be the last
         * page */
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                }
        }

        osc_object_lock(obj);
        osc_extent_state_set(ext, OES_RPC);
        osc_object_unlock(obj);
        /* get a refcount for RPC. */
        osc_extent_get(ext);

        RETURN(0);
}

/**
 * Quick and simple version of osc_extent_find(). This function is frequently
 * called to expand the extent for the same IO. To expand the extent, the
 * page index must be in the same chunk as, or the chunk next to, ext->oe_end.
 */
static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
{
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
        struct osc_extent *next;
        int ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
        pgoff_t chunk = index >> ppc_bits;
        pgoff_t end_chunk;
        pgoff_t end_index;
        int chunksize = 1 << cli->cl_chunkbits;
        int rc = 0;
        ENTRY;

        LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
        osc_object_lock(obj);
        LASSERT(sanity_check_nolock(ext) == 0);
        end_chunk = ext->oe_end >> ppc_bits;
        if (chunk > end_chunk + 1)
                GOTO(out, rc = -ERANGE);

        if (end_chunk >= chunk)
                GOTO(out, rc = 0);

        LASSERT(end_chunk + 1 == chunk);
        /* try to expand this extent to cover @index */
        end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);

        next = next_extent(ext);
        if (next != NULL && next->oe_start <= end_index)
                /* complex mode - overlaps with the next extent; this case
                 * will be handled by osc_extent_find() */
                GOTO(out, rc = -EAGAIN);

        ext->oe_end = end_index;
        ext->oe_grants += chunksize;
        *grants -= chunksize;
        LASSERT(*grants >= 0);
        EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
                 "overlapped after expanding for %lu.\n", index);
        EXIT;

out:
        osc_object_unlock(obj);
        RETURN(rc);
}
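
/*
 * For example, if ext currently ends in chunk 3: an @index in chunk 3
 * returns 0 without expansion, an @index in chunk 4 extends oe_end to the
 * end of chunk 4 at the cost of one chunk of grant, and an @index in
 * chunk 5 or beyond fails with -ERANGE so that osc_extent_find() handles
 * it instead.
 */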

static void osc_extent_tree_dump0(int level, struct osc_object *obj,
                                  const char *func, int line)
{
        struct osc_extent *ext;
        int cnt;

        CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
               obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);

        /* osc_object_lock(obj); */
        cnt = 1;
        for (ext = first_extent(obj); ext != NULL; ext = next_extent(ext))
                OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_hp_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++);

        cnt = 1;
        list_for_each_entry(ext, &obj->oo_reading_exts, oe_link)
                OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++);
        /* osc_object_unlock(obj); */
}

/* ------------------ osc extent end ------------------ */

static inline int osc_is_ready(struct osc_object *osc)
{
        return !list_empty(&osc->oo_ready_item) ||
               !list_empty(&osc->oo_hp_ready_item);
}

#define OSC_IO_DEBUG(OSC, STR, args...)                                        \
        CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR,     \
               (OSC), osc_is_ready(OSC),                                       \
               list_empty_marker(&(OSC)->oo_hp_ready_item),                    \
               list_empty_marker(&(OSC)->oo_ready_item),                       \
               atomic_read(&(OSC)->oo_nr_writes),                              \
               list_empty_marker(&(OSC)->oo_hp_exts),                          \
               list_empty_marker(&(OSC)->oo_urgent_exts),                      \
               atomic_read(&(OSC)->oo_nr_reads),                               \
               list_empty_marker(&(OSC)->oo_reading_exts),                     \
               ##args)

static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
{
        struct osc_page *opg  = oap2osc_page(oap);
        struct cl_page  *page = cl_page_top(oap2cl_page(oap));
        int result;

        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */

        ENTRY;
        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
                opg->ops_submit_time = cfs_time_current();
        RETURN(result);
}

static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd)
{
        struct osc_page  *opg = oap2osc_page(oap);
        struct cl_page   *page = oap2cl_page(oap);
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;

        int result;
        loff_t kms;

        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(!(cmd & OBD_BRW_READ));
        LASSERT(opg != NULL);
        obj = opg->ops_cl.cpl_obj;

        cl_object_attr_lock(obj);
        result = cl_object_attr_get(env, obj, attr);
        cl_object_attr_unlock(obj);
        if (result < 0)
                return result;
        kms = attr->cat_kms;
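        /*
         * For example, with kms == 10000 and 4KiB pages, page index 2
         * covers [8192, 12288): the write crosses kms, so the stable
         * count is 10000 % 4096 == 1808 bytes rather than a full page.
         */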
        if (cl_offset(obj, page->cp_index) >= kms)
                /* catch race with truncate */
                return 0;
        else if (cl_offset(obj, page->cp_index + 1) > kms)
                /* catch sub-page write at end of file */
                return kms % PAGE_CACHE_SIZE;
        else
                return PAGE_CACHE_SIZE;
}

static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd, int rc)
{
        struct osc_page   *opg  = oap2osc_page(oap);
        struct cl_page    *page = cl_page_top(oap2cl_page(oap));
        struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type   crt;
        int srvlock;

        ENTRY;

        cmd &= ~OBD_BRW_NOQUOTA;
        LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
        LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
        LASSERT(opg->ops_transfer_pinned);

        /*
         * page->cp_req can be NULL if io submission failed before
         * cl_req was allocated.
         */
        if (page->cp_req != NULL)
                cl_req_page_done(env, page);
        LASSERT(page->cp_req == NULL);

        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;

        spin_lock(&obj->oo_seatbelt);
        LASSERT(opg->ops_submitter != NULL);
        LASSERT(!list_empty(&opg->ops_inflight));
        list_del_init(&opg->ops_inflight);
        opg->ops_submitter = NULL;
        spin_unlock(&obj->oo_seatbelt);

        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;

        /* statistics */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                int bytes = oap->oap_count;

                if (crt == CRT_READ)
                        stats->os_lockless_reads += bytes;
                else
                        stats->os_lockless_writes += bytes;
        }

        /*
         * This has to be the last operation with the page, as locks are
         * released in cl_page_completion() and nothing except for the
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);

        cl_page_completion(env, page, crt, rc);

        RETURN(0);
}

#define OSC_DUMP_GRANT(cli, fmt, args...) do {                                \
        struct client_obd *__tmp = (cli);                                     \
        CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d "            \
               "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt,   \
               __tmp->cl_import->imp_obd->obd_name,                           \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
               atomic_read(&obd_dirty_pages), obd_max_dirty_pages,            \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args);      \
} while (0)

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += PAGE_CACHE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               PAGE_CACHE_SIZE, pga, pga->pg);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        ENTRY;

        LASSERT(spin_is_locked(&cli->cl_loi_list_lock.lock));
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= PAGE_CACHE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= PAGE_CACHE_SIZE;
        }
        EXIT;
}

/**
 * To avoid sleeping with the object lock held, it's good to allocate
 * enough grant before entering the critical section.
 *
 * client_obd_list_lock held by caller
 */
static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes)
{
        int rc = -EDQUOT;

        if (cli->cl_avail_grant >= bytes) {
                cli->cl_avail_grant    -= bytes;
                cli->cl_reserved_grant += bytes;
                rc = 0;
        }
        return rc;
}

static void __osc_unreserve_grant(struct client_obd *cli,
                                  unsigned int reserved, unsigned int unused)
{
        /* it's quite normal to get more grant than was reserved.
         * Consider the case where two extents are merged by adding a new
         * chunk: one extent tax is saved. If the extent tax is greater
         * than one chunk, adding a new chunk saves even more grant. */
        cli->cl_reserved_grant -= reserved;
        if (unused > reserved) {
                cli->cl_avail_grant += reserved;
                cli->cl_lost_grant  += unused - reserved;
        } else {
                cli->cl_avail_grant += unused;
        }
}
1393
1394void osc_unreserve_grant(struct client_obd *cli,
1395                         unsigned int reserved, unsigned int unused)
1396{
1397        client_obd_list_lock(&cli->cl_loi_list_lock);
1398        __osc_unreserve_grant(cli, reserved, unused);
1399        if (unused > 0)
1400                osc_wake_cache_waiters(cli);
1401        client_obd_list_unlock(&cli->cl_loi_list_lock);
1402}
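
/*
 * Illustrative sketch only (hypothetical helper, not part of this file):
 * a reserve/unreserve round trip around a critical section.  "bytes" is
 * the same grant unit used by osc_queue_async_io() below: one chunk plus
 * the extent tax.
 */
#if 0
static int example_grant_roundtrip(struct client_obd *cli)
{
        unsigned int bytes = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
        int rc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        rc = osc_reserve_grant(cli, bytes);     /* avail -> reserved */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (rc < 0)
                return rc;      /* -EDQUOT: not enough available grant */

        /* ... critical section; assume no grant was consumed ... */

        /* return the whole reservation: reserved -= bytes, avail += bytes */
        osc_unreserve_grant(cli, bytes, bytes);
        return 0;
}
#endif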
1403
1404/**
1405 * Free grant after IO is finished or canceled.
1406 *
1407 * @lost_grant is used to remember how much grant we have allocated but not
1408 * used; we should return this grant to the OST. There are two cases where
1409 * grant can be lost:
1410 * 1. truncate;
1411 * 2. the blocksize at the OST is less than PAGE_CACHE_SIZE and a partial
1412 *    page was written. In this case the OST may use fewer chunks to serve
1413 *    this partial write. OSTs don't actually know the page size on the
1414 *    client side, so clients have to calculate lost grant from the OST
1415 *    blocksize. See filter_grant_check() for details.
1416 */
1417static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
1418                           unsigned int lost_grant)
1419{
1420        int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
1421
1422        client_obd_list_lock(&cli->cl_loi_list_lock);
1423        atomic_sub(nr_pages, &obd_dirty_pages);
1424        cli->cl_dirty -= nr_pages << PAGE_CACHE_SHIFT;
1425        cli->cl_lost_grant += lost_grant;
1426        if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
1427                /* borrow some grant from truncate to avoid the case where
1428                 * truncate uses up all the available grant */
1429                cli->cl_lost_grant -= grant;
1430                cli->cl_avail_grant += grant;
1431        }
1432        osc_wake_cache_waiters(cli);
1433        client_obd_list_unlock(&cli->cl_loi_list_lock);
1434        CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1435               lost_grant, cli->cl_lost_grant,
1436               cli->cl_avail_grant, cli->cl_dirty);
1437}
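
/*
 * Worked example with made-up numbers (the authoritative calculation is
 * filter_grant_check() on the OST): with 4096-byte client pages and a
 * 1024-byte OST blocksize, a partial write of 1024 bytes into a dirty
 * page consumes only one block worth of grant on the OST, so the client
 * would report the remaining 3072 bytes of that page as lost:
 *
 *        osc_free_grant(cli, 1, 4096 - 1024);
 */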
1438
1439/**
1440 * The companion to osc_enter_cache(), called when @oap is no longer part of
1441 * the dirty accounting due to error.
1442 */
1443static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
1444{
1445        client_obd_list_lock(&cli->cl_loi_list_lock);
1446        osc_release_write_grant(cli, &oap->oap_brw_page);
1447        client_obd_list_unlock(&cli->cl_loi_list_lock);
1448}
1449
1450/**
1451 * Non-blocking version of osc_enter_cache() that consumes grant only when it
1452 * is available.
1453 */
1454static int osc_enter_cache_try(struct client_obd *cli,
1455                               struct osc_async_page *oap,
1456                               int bytes, int transient)
1457{
1458        int rc;
1459
1460        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1461
1462        rc = osc_reserve_grant(cli, bytes);
1463        if (rc < 0)
1464                return 0;
1465
1466        if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
1467            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
1468                osc_consume_write_grant(cli, &oap->oap_brw_page);
1469                if (transient) {
1470                        cli->cl_dirty_transit += PAGE_CACHE_SIZE;
1471                        atomic_inc(&obd_dirty_transit_pages);
1472                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
1473                }
1474                rc = 1;
1475        } else {
1476                __osc_unreserve_grant(cli, bytes, bytes);
1477                rc = 0;
1478        }
1479        return rc;
1480}
1481
1482static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1483{
1484        int rc;
1485        client_obd_list_lock(&cli->cl_loi_list_lock);
1486        rc = list_empty(&ocw->ocw_entry);
1487        client_obd_list_unlock(&cli->cl_loi_list_lock);
1488        return rc;
1489}
1490
1491/**
1492 * The main entry to reserve dirty page accounting. Usually the grant reserved
1493 * in this function will be freed in bulk in osc_free_grant(); if adding the
1494 * page to the osc cache fails, it will be freed in osc_exit_cache() instead.
1495 *
1496 * The process will be put to sleep if it has already run out of grant.
1497 */
1498static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
1499                           struct osc_async_page *oap, int bytes)
1500{
1501        struct osc_object *osc = oap->oap_obj;
1502        struct lov_oinfo  *loi = osc->oo_oinfo;
1503        struct osc_cache_waiter ocw;
1504        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1505        int rc = -EDQUOT;
1506        ENTRY;
1507
1508        OSC_DUMP_GRANT(cli, "need:%d.\n", bytes);
1509
1510        client_obd_list_lock(&cli->cl_loi_list_lock);
1511
1512        /* force the caller to try sync io.  this can jump the list
1513         * of queued writes and create a discontiguous rpc stream */
1514        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
1515            cli->cl_dirty_max < PAGE_CACHE_SIZE     ||
1516            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
1517                GOTO(out, rc = -EDQUOT);
1518
1519        /* Hopefully normal case - cache space and write credits available */
1520        if (osc_enter_cache_try(cli, oap, bytes, 0))
1521                GOTO(out, rc = 0);
1522
1523        /* We can get here for two reasons: too many dirty pages in cache, or
1524         * we have run out of grant. In both cases we should write dirty pages
1525         * out. Adding a cache waiter will trigger urgent write-out no matter
1526         * what the RPC size will be.
1527         * The exit condition is no available grant and no dirty pages cached;
1528         * that really means there is no space on the OST. */
1529        init_waitqueue_head(&ocw.ocw_waitq);
1530        ocw.ocw_oap   = oap;
1531        ocw.ocw_grant = bytes;
1532        while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
1533                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1534                ocw.ocw_rc = 0;
1535                client_obd_list_unlock(&cli->cl_loi_list_lock);
1536
1537                osc_io_unplug_async(env, cli, NULL);
1538
1539                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
1540                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
1541
1542                rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1543
1544                client_obd_list_lock(&cli->cl_loi_list_lock);
1545
1546                /* l_wait_event was interrupted by a signal */
1547                if (rc < 0) {
1548                        list_del_init(&ocw.ocw_entry);
1549                        GOTO(out, rc);
1550                }
1551
1552                LASSERT(list_empty(&ocw.ocw_entry));
1553                rc = ocw.ocw_rc;
1554
1555                if (rc != -EDQUOT)
1556                        GOTO(out, rc);
1557                if (osc_enter_cache_try(cli, oap, bytes, 0))
1558                        GOTO(out, rc = 0);
1559        }
1560        EXIT;
1561out:
1562        client_obd_list_unlock(&cli->cl_loi_list_lock);
1563        OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
1564        RETURN(rc);
1565}
1566
1567/* caller must hold loi_list_lock */
1568void osc_wake_cache_waiters(struct client_obd *cli)
1569{
1570        struct list_head *l, *tmp;
1571        struct osc_cache_waiter *ocw;
1572
1573        ENTRY;
1574        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
1575                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
1576                list_del_init(&ocw->ocw_entry);
1577
1578                ocw->ocw_rc = -EDQUOT;
1579                /* we can't dirty more */
1580                if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
1581                    (atomic_read(&obd_dirty_pages) + 1 >
1582                     obd_max_dirty_pages)) {
1583                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
1584                               "osc max %ld, sys max %d\n", cli->cl_dirty,
1585                               cli->cl_dirty_max, obd_max_dirty_pages);
1586                        goto wakeup;
1587                }
1588
1589                ocw->ocw_rc = 0;
1590                if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
1591                        ocw->ocw_rc = -EDQUOT;
1592
1593wakeup:
1594                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
1595                       ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
1596
1597                wake_up(&ocw->ocw_waitq);
1598        }
1599
1600        EXIT;
1601}
1602
1603static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
1604{
1605        int hprpc = !!list_empty(&osc->oo_hp_exts);
1606        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
1607}
1608
1609/* This maintains the lists of pending pages to read/write for a given
1610 * object.  It is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
1611 * to quickly find objects that are ready to send an RPC. */
1612static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
1613                         int cmd)
1614{
1615        int invalid_import = 0;
1616        ENTRY;
1617
1618        /* if we have an invalid import we want to drain the queued pages
1619         * by forcing them through rpcs that immediately fail and complete
1620         * the pages.  Recovery relies on this to empty the queued pages
1621         * before canceling the locks and evicting the llite pages */
1622        if ((cli->cl_import == NULL || cli->cl_import->imp_invalid))
1623                invalid_import = 1;
1624
1625        if (cmd & OBD_BRW_WRITE) {
1626                if (atomic_read(&osc->oo_nr_writes) == 0)
1627                        RETURN(0);
1628                if (invalid_import) {
1629                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1630                        RETURN(1);
1631                }
1632                if (!list_empty(&osc->oo_hp_exts)) {
1633                        CDEBUG(D_CACHE, "high prio request forcing RPC\n");
1634                        RETURN(1);
1635                }
1636                if (!list_empty(&osc->oo_urgent_exts)) {
1637                        CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1638                        RETURN(1);
1639                }
1640                /* trigger a write rpc stream as long as there are dirtiers
1641                 * waiting for space.  As they're waiting, they're not going
1642                 * to create more pages to coalesce with what's waiting. */
1643                if (!list_empty(&cli->cl_cache_waiters)) {
1644                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1645                        RETURN(1);
1646                }
1647                if (atomic_read(&osc->oo_nr_writes) >=
1648                    cli->cl_max_pages_per_rpc)
1649                        RETURN(1);
1650        } else {
1651                if (atomic_read(&osc->oo_nr_reads) == 0)
1652                        RETURN(0);
1653                if (invalid_import) {
1654                        CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1655                        RETURN(1);
1656                }
1657                /* all reads are urgent. */
1658                if (!list_empty(&osc->oo_reading_exts))
1659                        RETURN(1);
1660        }
1661
1662        RETURN(0);
1663}
1664
1665static void osc_update_pending(struct osc_object *obj, int cmd, int delta)
1666{
1667        struct client_obd *cli = osc_cli(obj);
1668        if (cmd & OBD_BRW_WRITE) {
1669                atomic_add(delta, &obj->oo_nr_writes);
1670                atomic_add(delta, &cli->cl_pending_w_pages);
1671                LASSERT(atomic_read(&obj->oo_nr_writes) >= 0);
1672        } else {
1673                atomic_add(delta, &obj->oo_nr_reads);
1674                atomic_add(delta, &cli->cl_pending_r_pages);
1675                LASSERT(atomic_read(&obj->oo_nr_reads) >= 0);
1676        }
1677        OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
1678}
1679
1680static int osc_makes_hprpc(struct osc_object *obj)
1681{
1682        return !list_empty(&obj->oo_hp_exts);
1683}
1684
1685static void on_list(struct list_head *item, struct list_head *list, int should_be_on)
1686{
1687        if (list_empty(item) && should_be_on)
1688                list_add_tail(item, list);
1689        else if (!list_empty(item) && !should_be_on)
1690                list_del_init(item);
1691}
1692
1693/* maintain the osc's cli list membership invariants so that
1694 * osc_send_{read,write}_rpc() can find pages to build into rpcs quickly */
1695static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1696{
1697        if (osc_makes_hprpc(osc)) {
1698                /* HP rpc */
1699                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
1700                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1701        } else {
1702                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1703                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
1704                        osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
1705                        osc_makes_rpc(cli, osc, OBD_BRW_READ));
1706        }
1707
1708        on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
1709                atomic_read(&osc->oo_nr_writes) > 0);
1710
1711        on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
1712                atomic_read(&osc->oo_nr_reads) > 0);
1713
1714        return osc_is_ready(osc);
1715}
1716
1717static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1718{
1719        int is_ready;
1720
1721        client_obd_list_lock(&cli->cl_loi_list_lock);
1722        is_ready = __osc_list_maint(cli, osc);
1723        client_obd_list_unlock(&cli->cl_loi_list_lock);
1724
1725        return is_ready;
1726}
1727
1728/* This tries to propagate async writeback errors back up to the
1729 * application.  When an async write fails we record the error code for later
1730 * if the app does an fsync.  As long as errors persist we force future rpcs
1731 * to be sync so that the app can get a sync error and break the cycle of
1732 * queueing pages for which writeback will fail. */
1733static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1734                           int rc)
1735{
1736        if (rc) {
1737                if (!ar->ar_rc)
1738                        ar->ar_rc = rc;
1739
1740                ar->ar_force_sync = 1;
1741                ar->ar_min_xid = ptlrpc_sample_next_xid();
1742                return;
1743        }
1745
1746        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1747                ar->ar_force_sync = 0;
1748}
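
/*
 * Behaviour sketch with hypothetical xids: once a write fails, writes
 * whose xid predates the failure cannot clear sync mode; a clean
 * completion at or past ar_min_xid does.
 */
#if 0
        osc_process_ar(ar, 100, -EIO); /* ar_rc = -EIO, ar_force_sync = 1,
                                        * ar_min_xid = next xid, say 101 */
        osc_process_ar(ar, 100, 0);    /* 100 < ar_min_xid: stay in sync mode */
        osc_process_ar(ar, 101, 0);    /* 101 >= ar_min_xid: ar_force_sync = 0 */
#endif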
1749
1751/* this must be called holding the loi list lock to give coverage to exit_cache,
1752 * async_flag maintenance, and oap_request */
1753static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
1754                              struct osc_async_page *oap, int sent, int rc)
1755{
1756        struct osc_object *osc = oap->oap_obj;
1757        struct lov_oinfo  *loi = osc->oo_oinfo;
1758        __u64 xid = 0;
1759
1760        ENTRY;
1761        if (oap->oap_request != NULL) {
1762                xid = ptlrpc_req_xid(oap->oap_request);
1763                ptlrpc_req_finished(oap->oap_request);
1764                oap->oap_request = NULL;
1765        }
1766
1767        /* As the transfer for this page is being done, clear the flags */
1768        spin_lock(&oap->oap_lock);
1769        oap->oap_async_flags = 0;
1770        spin_unlock(&oap->oap_lock);
1771        oap->oap_interrupted = 0;
1772
1773        if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
1774                client_obd_list_lock(&cli->cl_loi_list_lock);
1775                osc_process_ar(&cli->cl_ar, xid, rc);
1776                osc_process_ar(&loi->loi_ar, xid, rc);
1777                client_obd_list_unlock(&cli->cl_loi_list_lock);
1778        }
1779
1780        rc = osc_completion(env, oap, oap->oap_cmd, rc);
1781        if (rc)
1782                CERROR("completion on oap %p obj %p returns %d.\n",
1783                       oap, osc, rc);
1784
1785        EXIT;
1786}
1787
1788/**
1789 * Try to add an extent to one RPC. We need to consider the following:
1790 * - the number of pages must not exceed max_pages_per_rpc;
1791 * - the extent must be compatible with previous ones.
1792 */
1793static int try_to_add_extent_for_io(struct client_obd *cli,
1794                                    struct osc_extent *ext, struct list_head *rpclist,
1795                                    int *pc, unsigned int *max_pages)
1796{
1797        struct osc_extent *tmp;
1798        ENTRY;
1799
1800        EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
1801                ext);
1802
1803        *max_pages = max(ext->oe_mppr, *max_pages);
1804        if (*pc + ext->oe_nr_pages > *max_pages)
1805                RETURN(0);
1806
1807        list_for_each_entry(tmp, rpclist, oe_link) {
1808                EASSERT(tmp->oe_owner == current, tmp);
1809#if 0
1810                if (overlapped(tmp, ext)) {
1811                        OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
1812                        EASSERT(0, ext);
1813                }
1814#endif
1815
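                /* extents in one RPC must agree on srvlock and on whether
                 * they carry grant; the double negation below compares only
                 * grant-ness, not the grant amounts */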
1816                if (tmp->oe_srvlock != ext->oe_srvlock ||
1817                    !tmp->oe_grants != !ext->oe_grants)
1818                        RETURN(0);
1819
1820                /* remove this break to check every extent in the list strictly */
1821                break;
1822        }
1823
1824        *pc += ext->oe_nr_pages;
1825        list_move_tail(&ext->oe_link, rpclist);
1826        ext->oe_owner = current;
1827        RETURN(1);
1828}
1829
1830/**
1831 * In order to prevent multiple ptlrpcd threads from breaking contiguous
1832 * extents, get_write_extents() collects all suitable extents atomically.
1833 *
1834 * The following policy is used to collect extents for IO:
1835 * 1. Add as many HP extents as possible;
1836 * 2. Add the first urgent extent in the urgent extent list and take it out
1837 *    of that list;
1838 * 3. Add subsequent extents of this urgent extent;
1839 * 4. If the urgent list is not empty, goto 2;
1840 * 5. Traverse the extent tree from the 1st extent;
1841 * 6. The above steps exit as soon as there is no space left in this RPC.
1842 */
1843static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
1844{
1845        struct client_obd *cli = osc_cli(obj);
1846        struct osc_extent *ext;
1847        int page_count = 0;
1848        unsigned int max_pages = cli->cl_max_pages_per_rpc;
1849
1850        LASSERT(osc_object_is_locked(obj));
1851        while (!list_empty(&obj->oo_hp_exts)) {
1852                ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
1853                                     oe_link);
1854                LASSERT(ext->oe_state == OES_CACHE);
1855                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1856                                              &max_pages))
1857                        return page_count;
1858                EASSERT(ext->oe_nr_pages <= max_pages, ext);
1859        }
1860        if (page_count == max_pages)
1861                return page_count;
1862
1863        while (!list_empty(&obj->oo_urgent_exts)) {
1864                ext = list_entry(obj->oo_urgent_exts.next,
1865                                     struct osc_extent, oe_link);
1866                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1867                                              &max_pages))
1868                        return page_count;
1869
1870                if (!ext->oe_intree)
1871                        continue;
1872
1873                while ((ext = next_extent(ext)) != NULL) {
1874                        if ((ext->oe_state != OES_CACHE) ||
1875                            (!list_empty(&ext->oe_link) &&
1876                             ext->oe_owner != NULL))
1877                                continue;
1878
1879                        if (!try_to_add_extent_for_io(cli, ext, rpclist,
1880                                                      &page_count, &max_pages))
1881                                return page_count;
1882                }
1883        }
1884        if (page_count == max_pages)
1885                return page_count;
1886
1887        ext = first_extent(obj);
1888        while (ext != NULL) {
1889                if ((ext->oe_state != OES_CACHE) ||
1890                    /* this extent may already be in the current rpclist */
1891                    (!list_empty(&ext->oe_link) && ext->oe_owner != NULL)) {
1892                        ext = next_extent(ext);
1893                        continue;
1894                }
1895
1896                if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1897                                              &max_pages))
1898                        return page_count;
1899
1900                ext = next_extent(ext);
1901        }
1902        return page_count;
1903}
1904
1905static int
1906osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
1907                   struct osc_object *osc, pdl_policy_t pol)
1908{
1909        LIST_HEAD(rpclist);
1910        struct osc_extent *ext;
1911        struct osc_extent *tmp;
1912        struct osc_extent *first = NULL;
1913        obd_count page_count = 0;
1914        int srvlock = 0;
1915        int rc = 0;
1916        ENTRY;
1917
1918        LASSERT(osc_object_is_locked(osc));
1919
1920        page_count = get_write_extents(osc, &rpclist);
1921        LASSERT(equi(page_count == 0, list_empty(&rpclist)));
1922
1923        if (list_empty(&rpclist))
1924                RETURN(0);
1925
1926        osc_update_pending(osc, OBD_BRW_WRITE, -page_count);
1927
1928        list_for_each_entry(ext, &rpclist, oe_link) {
1929                LASSERT(ext->oe_state == OES_CACHE ||
1930                        ext->oe_state == OES_LOCK_DONE);
1931                if (ext->oe_state == OES_CACHE)
1932                        osc_extent_state_set(ext, OES_LOCKING);
1933                else
1934                        osc_extent_state_set(ext, OES_RPC);
1935        }
1936
1937        /* we're going to grab the page lock, so release the object lock
1938         * because the lock order is page lock -> object lock. */
1939        osc_object_unlock(osc);
1940
1941        list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) {
1942                if (ext->oe_state == OES_LOCKING) {
1943                        rc = osc_extent_make_ready(env, ext);
1944                        if (unlikely(rc < 0)) {
1945                                list_del_init(&ext->oe_link);
1946                                osc_extent_finish(env, ext, 0, rc);
1947                                continue;
1948                        }
1949                }
1950                if (first == NULL) {
1951                        first = ext;
1952                        srvlock = ext->oe_srvlock;
1953                } else {
1954                        LASSERT(srvlock == ext->oe_srvlock);
1955                }
1956        }
1957
1958        if (!list_empty(&rpclist)) {
1959                LASSERT(page_count > 0);
1960                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, pol);
1961                LASSERT(list_empty(&rpclist));
1962        }
1963
1964        osc_object_lock(osc);
1965        RETURN(rc);
1966}
1967
1968/**
1969 * Prepare pages for ASYNC io and put pages in the send queue.
1970 *
1971 * \param cmd OBD_BRW_* macros
1972 * \param lop pending pages
1973 *
1974 * \return zero if no page was added to the send queue.
1975 * \return 1 if pages were successfully added to the send queue.
1976 * \return negative on errors.
1977 */
1978static int
1979osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
1980                  struct osc_object *osc, pdl_policy_t pol)
1981{
1982        struct osc_extent *ext;
1983        struct osc_extent *next;
1984        LIST_HEAD(rpclist);
1985        int page_count = 0;
1986        unsigned int max_pages = cli->cl_max_pages_per_rpc;
1987        int rc = 0;
1988        ENTRY;
1989
1990        LASSERT(osc_object_is_locked(osc));
1991        list_for_each_entry_safe(ext, next,
1992                                     &osc->oo_reading_exts, oe_link) {
1993                EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
1994                if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
1995                                              &max_pages))
1996                        break;
1997                osc_extent_state_set(ext, OES_RPC);
1998                EASSERT(ext->oe_nr_pages <= max_pages, ext);
1999        }
2000        LASSERT(page_count <= max_pages);
2001
2002        osc_update_pending(osc, OBD_BRW_READ, -page_count);
2003
2004        if (!list_empty(&rpclist)) {
2005                osc_object_unlock(osc);
2006
2007                LASSERT(page_count > 0);
2008                rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ, pol);
2009                LASSERT(list_empty(&rpclist));
2010
2011                osc_object_lock(osc);
2012        }
2013        RETURN(rc);
2014}
2015
2016#define list_to_obj(list, item) ({                                            \
2017        struct list_head *__tmp = (list)->next;                               \
2018        list_del_init(__tmp);                                         \
2019        list_entry(__tmp, struct osc_object, oo_##item);                      \
2020})
2021
2022/* This is called by osc_check_rpcs() to find which objects have pages that
2023 * we could be sending.  These lists are maintained by osc_makes_rpc(). */
2024static struct osc_object *osc_next_obj(struct client_obd *cli)
2025{
2026        ENTRY;
2027
2028        /* First return objects that have blocked locks so that they
2029         * will be flushed quickly and other clients can get the lock,
2030         * then objects which have pages ready to be stuffed into RPCs */
2031        if (!list_empty(&cli->cl_loi_hp_ready_list))
2032                RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item));
2033        if (!list_empty(&cli->cl_loi_ready_list))
2034                RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item));
2035
2036        /* then if we have cache waiters, return all objects with queued
2037         * writes.  This is especially important when many small files
2038         * have filled up the cache and not been fired into rpcs because
2039         * they don't pass the nr_pending/object threshold */
2040        if (!list_empty(&cli->cl_cache_waiters) &&
2041            !list_empty(&cli->cl_loi_write_list))
2042                RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
2043
2044        /* then return all queued objects when we have an invalid import
2045         * so that they get flushed */
2046        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2047                if (!list_empty(&cli->cl_loi_write_list))
2048                        RETURN(list_to_obj(&cli->cl_loi_write_list,
2049                                           write_item));
2050                if (!list_empty(&cli->cl_loi_read_list))
2051                        RETURN(list_to_obj(&cli->cl_loi_read_list,
2052                                           read_item));
2053        }
2054        RETURN(NULL);
2055}
2056
2057/* called with the loi list lock held */
2058static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
2059                           pdl_policy_t pol)
2060{
2061        struct osc_object *osc;
2062        int rc = 0;
2063        ENTRY;
2064
2065        while ((osc = osc_next_obj(cli)) != NULL) {
2066                struct cl_object *obj = osc2cl(osc);
2067                struct lu_ref_link *link;
2068
2069                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
2070
2071                if (osc_max_rpc_in_flight(cli, osc)) {
2072                        __osc_list_maint(cli, osc);
2073                        break;
2074                }
2075
2076                cl_object_get(obj);
2077                client_obd_list_unlock(&cli->cl_loi_list_lock);
2078                link = lu_object_ref_add(&obj->co_lu, "check", current);
2079
2080                /* attempt some read/write balancing by alternating between
2081                 * reads and writes in an object.  The makes_rpc checks here
2082                 * would be redundant if we were getting read/write work items
2083                 * instead of objects.  We don't want send_oap_rpc to drain a
2084                 * partial read pending queue when we're given this object to
2085                 * do io on writes while there are cache waiters. */
2086                osc_object_lock(osc);
2087                if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
2088                        rc = osc_send_write_rpc(env, cli, osc, pol);
2089                        if (rc < 0) {
2090                                CERROR("Write request failed with %d\n", rc);
2091
2092                                /* osc_send_write_rpc failed, mostly because of
2093                                 * memory pressure.
2094                                 *
2095                                 * We can't break here, because if:
2096                                 *  - a page was submitted by osc_io_submit,
2097                                 *    so the page is locked;
2098                                 *  - no request is in flight;
2099                                 *  - no subsequent request will be sent;
2100                                 * then the system will be in a live-lock
2101                                 * state, because there is no chance to call
2102                                 * osc_io_unplug() and osc_check_rpcs() any
2103                                 * more. pdflush can't help in this case,
2104                                 * because it might be blocked grabbing the
2105                                 * page lock, as mentioned above.
2106                                 *
2107                                 * Anyway, continue to drain pages. */
2108                                /* break; */
2109                        }
2110                }
2111                if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
2112                        rc = osc_send_read_rpc(env, cli, osc, pol);
2113                        if (rc < 0)
2114                                CERROR("Read request failed with %d\n", rc);
2115                }
2116                osc_object_unlock(osc);
2117
2118                osc_list_maint(cli, osc);
2119                lu_object_ref_del_at(&obj->co_lu, link, "check", current);
2120                cl_object_put(env, obj);
2121
2122                client_obd_list_lock(&cli->cl_loi_list_lock);
2123        }
2124}
2125
2126static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
2127                          struct osc_object *osc, pdl_policy_t pol, int async)
2128{
2129        int rc = 0;
2130
2131        if (osc != NULL && osc_list_maint(cli, osc) == 0)
2132                return 0;
2133
2134        if (!async) {
2135                /* disable osc_lru_shrink() temporarily to avoid
2136                 * potential stack overrun problem. LU-2859 */
2137                atomic_inc(&cli->cl_lru_shrinkers);
2138                client_obd_list_lock(&cli->cl_loi_list_lock);
2139                osc_check_rpcs(env, cli, pol);
2140                client_obd_list_unlock(&cli->cl_loi_list_lock);
2141                atomic_dec(&cli->cl_lru_shrinkers);
2142        } else {
2143                CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
2144                LASSERT(cli->cl_writeback_work != NULL);
2145                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
2146        }
2147        return rc;
2148}
2149
2150static int osc_io_unplug_async(const struct lu_env *env,
2151                                struct client_obd *cli, struct osc_object *osc)
2152{
2153        /* XXX: the policy argument is not actually used. */
2154        return osc_io_unplug0(env, cli, osc, PDL_POLICY_ROUND, 1);
2155}
2156
2157void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
2158                   struct osc_object *osc, pdl_policy_t pol)
2159{
2160        (void)osc_io_unplug0(env, cli, osc, pol, 0);
2161}
2162
2163int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
2164                        struct page *page, loff_t offset)
2165{
2166        struct obd_export     *exp = osc_export(osc);
2167        struct osc_async_page *oap = &ops->ops_oap;
2168        ENTRY;
2169
2170        if (!page)
2171                return cfs_size_round(sizeof(*oap));
2172
2173        oap->oap_magic = OAP_MAGIC;
2174        oap->oap_cli = &exp->exp_obd->u.cli;
2175        oap->oap_obj = osc;
2176
2177        oap->oap_page = page;
2178        oap->oap_obj_off = offset;
2179        LASSERT(!(offset & ~CFS_PAGE_MASK));
2180
2181        if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
2182                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2183
2184        INIT_LIST_HEAD(&oap->oap_pending_item);
2185        INIT_LIST_HEAD(&oap->oap_rpc_item);
2186
2187        spin_lock_init(&oap->oap_lock);
2188        CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n",
2189               oap, page, oap->oap_obj_off);
2190        RETURN(0);
2191}
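
/*
 * Usage sketch (hypothetical caller and variables): a NULL page only
 * queries the rounded size of the async-page slice; a real page
 * initializes it at the given file offset:
 *
 *        int size = osc_prep_async_page(osc, ops, NULL, 0);
 *        ...
 *        rc = osc_prep_async_page(osc, ops, vmpage,
 *                                 cl_offset(osc2cl(osc), index));
 */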
2192
2193int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
2194                       struct osc_page *ops)
2195{
2196        struct osc_io *oio = osc_env_io(env);
2197        struct osc_extent     *ext = NULL;
2198        struct osc_async_page *oap = &ops->ops_oap;
2199        struct client_obd     *cli = oap->oap_cli;
2200        struct osc_object     *osc = oap->oap_obj;
2201        pgoff_t index;
2202        int    grants = 0;
2203        int    brw_flags = OBD_BRW_ASYNC;
2204        int    cmd = OBD_BRW_WRITE;
2205        int    need_release = 0;
2206        int    rc = 0;
2207        ENTRY;
2208
2209        if (oap->oap_magic != OAP_MAGIC)
2210                RETURN(-EINVAL);
2211
2212        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2213                RETURN(-EIO);
2214
2215        if (!list_empty(&oap->oap_pending_item) ||
2216            !list_empty(&oap->oap_rpc_item))
2217                RETURN(-EBUSY);
2218
2219        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
2220        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
2221        if (!client_is_remote(osc_export(osc)) &&
2222            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
2223                brw_flags |= OBD_BRW_NOQUOTA;
2224                cmd |= OBD_BRW_NOQUOTA;
2225        }
2226
2227        /* check if the file's owner/group is over quota */
2228        if (!(cmd & OBD_BRW_NOQUOTA)) {
2229                struct cl_object *obj;
2230                struct cl_attr   *attr;
2231                unsigned int qid[MAXQUOTAS];
2232
2233                obj = cl_object_top(&osc->oo_cl);
2234                attr = &osc_env_info(env)->oti_attr;
2235
2236                cl_object_attr_lock(obj);
2237                rc = cl_object_attr_get(env, obj, attr);
2238                cl_object_attr_unlock(obj);
2239
2240                qid[USRQUOTA] = attr->cat_uid;
2241                qid[GRPQUOTA] = attr->cat_gid;
2242                if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
2243                        rc = -EDQUOT;
2244                if (rc)
2245                        RETURN(rc);
2246        }
2247
2248        oap->oap_cmd = cmd;
2249        oap->oap_page_off = ops->ops_from;
2250        oap->oap_count = ops->ops_to - ops->ops_from;
2251        oap->oap_async_flags = 0;
2252        oap->oap_brw_flags = brw_flags;
2253
2254        OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
2255                     oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
2256
2257        index = oap2cl_page(oap)->cp_index;
2258
2259        /* Add this page to an extent using the following steps:
2260         * 1. if there exists an active extent for this IO, mostly this page
2261         *    can be added to the active extent and sometimes we need to
2262         *    expand the extent to accommodate this page;
2263         * 2. otherwise, a new extent will be allocated. */
2264
2265        ext = oio->oi_active;
2266        if (ext != NULL && ext->oe_start <= index && ext->oe_max_end >= index) {
2267                /* one chunk plus extent overhead must be enough to write this
2268                 * page */
2269                grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2270                if (ext->oe_end >= index)
2271                        grants = 0;
2272
2273                /* it doesn't need any grant to dirty this page */
2274                client_obd_list_lock(&cli->cl_loi_list_lock);
2275                rc = osc_enter_cache_try(cli, oap, grants, 0);
2276                client_obd_list_unlock(&cli->cl_loi_list_lock);
2277                if (rc == 0) { /* try failed */
2278                        grants = 0;
2279                        need_release = 1;
2280                } else if (ext->oe_end < index) {
2281                        int tmp = grants;
2282                        /* try to expand this extent */
2283                        rc = osc_extent_expand(ext, index, &tmp);
2284                        if (rc < 0) {
2285                                need_release = 1;
2286                                /* don't free reserved grant */
2287                        } else {
2288                                OSC_EXTENT_DUMP(D_CACHE, ext,
2289                                                "expanded for %lu.\n", index);
2290                                osc_unreserve_grant(cli, grants, tmp);
2291                                grants = 0;
2292                        }
2293                }
2294                rc = 0;
2295        } else if (ext != NULL) {
2296                /* index is located outside of active extent */
2297                need_release = 1;
2298        }
2299        if (need_release) {
2300                osc_extent_release(env, ext);
2301                oio->oi_active = NULL;
2302                ext = NULL;
2303        }
2304
2305        if (ext == NULL) {
2306                int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2307
2308                /* try to find new extent to cover this page */
2309                LASSERT(oio->oi_active == NULL);
2310                /* we may have allocated grant for this page if we failed
2311                 * to expand the previous active extent. */
2312                LASSERT(ergo(grants > 0, grants >= tmp));
2313
2314                rc = 0;
2315                if (grants == 0) {
2316                        /* we haven't allocated grant for this page. */
2317                        rc = osc_enter_cache(env, cli, oap, tmp);
2318                        if (rc == 0)
2319                                grants = tmp;
2320                }
2321
2322                tmp = grants;
2323                if (rc == 0) {
2324                        ext = osc_extent_find(env, osc, index, &tmp);
2325                        if (IS_ERR(ext)) {
2326                                LASSERT(tmp == grants);
2327                                osc_exit_cache(cli, oap);
2328                                rc = PTR_ERR(ext);
2329                                ext = NULL;
2330                        } else {
2331                                oio->oi_active = ext;
2332                        }
2333                }
2334                if (grants > 0)
2335                        osc_unreserve_grant(cli, grants, tmp);
2336        }
2337
2338        LASSERT(ergo(rc == 0, ext != NULL));
2339        if (ext != NULL) {
2340                EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
2341                         ext, "index = %lu.\n", index);
2342                LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);
2343
2344                osc_object_lock(osc);
2345                if (ext->oe_nr_pages == 0)
2346                        ext->oe_srvlock = ops->ops_srvlock;
2347                else
2348                        LASSERT(ext->oe_srvlock == ops->ops_srvlock);
2349                ++ext->oe_nr_pages;
2350                list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
2351                osc_object_unlock(osc);
2352        }
2353        RETURN(rc);
2354}
2355
2356int osc_teardown_async_page(const struct lu_env *env,
2357                            struct osc_object *obj, struct osc_page *ops)
2358{
2359        struct osc_async_page *oap = &ops->ops_oap;
2360        struct osc_extent     *ext = NULL;
2361        int rc = 0;
2362        ENTRY;
2363
2364        LASSERT(oap->oap_magic == OAP_MAGIC);
2365
2366        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
2367               oap, ops, oap2cl_page(oap)->cp_index);
2368
2369        osc_object_lock(obj);
2370        if (!list_empty(&oap->oap_rpc_item)) {
2371                CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
2372                rc = -EBUSY;
2373        } else if (!list_empty(&oap->oap_pending_item)) {
2374                ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index);
2375                /* only truncated pages are allowed to be taken out.
2376                 * See osc_extent_truncate() and osc_cache_truncate_start()
2377                 * for details. */
2378                if (ext != NULL && ext->oe_state != OES_TRUNC) {
2379                        OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
2380                                        oap2cl_page(oap)->cp_index);
2381                        rc = -EBUSY;
2382                }
2383        }
2384        osc_object_unlock(obj);
2385        if (ext != NULL)
2386                osc_extent_put(env, ext);
2387        RETURN(rc);
2388}
2389
2390/**
2391 * This is called when a page is picked up by the kernel to write out.
2392 *
2393 * We should find the corresponding extent and add the whole extent to the
2394 * urgent list. The extent may be being truncated or used; handle it
2395 * carefully.
2396 */
2397int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
2398                         struct osc_page *ops)
2399{
2400        struct osc_extent *ext   = NULL;
2401        struct osc_object *obj   = cl2osc(ops->ops_cl.cpl_obj);
2402        struct cl_page    *cp    = ops->ops_cl.cpl_page;
2403        pgoff_t     index = cp->cp_index;
2404        struct osc_async_page *oap = &ops->ops_oap;
2405        bool unplug = false;
2406        int rc = 0;
2407        ENTRY;
2408
2409        osc_object_lock(obj);
2410        ext = osc_extent_lookup(obj, index);
2411        if (ext == NULL) {
2412                osc_extent_tree_dump(D_ERROR, obj);
2413                LASSERTF(0, "page index %lu is NOT covered.\n", index);
2414        }
2415
2416        switch (ext->oe_state) {
2417        case OES_RPC:
2418        case OES_LOCK_DONE:
2419                CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp),
2420                              "flush an in-rpc page?\n");
2421                LASSERT(0);
2422                break;
2423        case OES_LOCKING:
2424                /* If we know this extent is being written out, we should abort
2425                 * so that the writer can make this page ready. Otherwise there
2426                 * is a deadlock problem, because another process can wait for
2427                 * the page writeback bit while holding the page lock; and
2428                 * meanwhile, in vvp_page_make_ready(), we need to grab the
2429                 * page lock before really sending the RPC. */
2430        case OES_TRUNC:
2431                /* race with truncate, page will be redirtied */
2432                GOTO(out, rc = -EAGAIN);
2433        default:
2434                break;
2435        }
2436
2437        rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE);
2438        if (rc)
2439                GOTO(out, rc);
2440
2441        spin_lock(&oap->oap_lock);
2442        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
2443        spin_unlock(&oap->oap_lock);
2444
2445        if (memory_pressure_get())
2446                ext->oe_memalloc = 1;
2447
2448        ext->oe_urgent = 1;
2449        if (ext->oe_state == OES_CACHE) {
2450                OSC_EXTENT_DUMP(D_CACHE, ext,
2451                                "flush page %p make it urgent.\n", oap);
2452                if (list_empty(&ext->oe_link))
2453                        list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2454                unplug = true;
2455        }
2456        rc = 0;
2457        EXIT;
2458
2459out:
2460        osc_object_unlock(obj);
2461        osc_extent_put(env, ext);
2462        if (unplug)
2463                osc_io_unplug_async(env, osc_cli(obj), obj);
2464        return rc;
2465}
2466
2467/**
2468 * This is called when a sync waiter receives an interruption.  Its job is to
2469 * get the caller woken as soon as possible.  If its page hasn't been put in
2470 * an rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
2471 * as desiring interruption, which will forcefully complete the rpc once the
2472 * rpc has timed out.
2473 */
2474int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
2475{
2476        struct osc_async_page *oap = &ops->ops_oap;
2477        struct osc_object     *obj = oap->oap_obj;
2478        struct client_obd     *cli = osc_cli(obj);
2479        struct osc_extent     *ext;
2480        struct osc_extent     *found = NULL;
2481        struct list_head            *plist;
2482        pgoff_t index = oap2cl_page(oap)->cp_index;
2483        int     rc = -EBUSY;
2484        int     cmd;
2485        ENTRY;
2486
2487        LASSERT(!oap->oap_interrupted);
2488        oap->oap_interrupted = 1;
2489
2490        /* Find out the caching extent */
2491        osc_object_lock(obj);
2492        if (oap->oap_cmd & OBD_BRW_WRITE) {
2493                plist = &obj->oo_urgent_exts;
2494                cmd   = OBD_BRW_WRITE;
2495        } else {
2496                plist = &obj->oo_reading_exts;
2497                cmd   = OBD_BRW_READ;
2498        }
2499        list_for_each_entry(ext, plist, oe_link) {
2500                if (ext->oe_start <= index && ext->oe_end >= index) {
2501                        LASSERT(ext->oe_state == OES_LOCK_DONE);
2502                        /* an extent in the OES_LOCK_DONE state already
2503                         * holds a refcount for the RPC. */
2504                        found = osc_extent_get(ext);
2505                        break;
2506                }
2507        }
2508        if (found != NULL) {
2509                list_del_init(&found->oe_link);
2510                osc_update_pending(obj, cmd, -found->oe_nr_pages);
2511                osc_object_unlock(obj);
2512
2513                osc_extent_finish(env, found, 0, -EINTR);
2514                osc_extent_put(env, found);
2515                rc = 0;
2516        } else {
2517                osc_object_unlock(obj);
2518                /* ok, it's been put in an rpc. only one oap gets a request
2519                 * reference */
2520                if (oap->oap_request != NULL) {
2521                        ptlrpc_mark_interrupted(oap->oap_request);
2522                        ptlrpcd_wake(oap->oap_request);
2523                        ptlrpc_req_finished(oap->oap_request);
2524                        oap->oap_request = NULL;
2525                }
2526        }
2527
2528        osc_list_maint(cli, obj);
2529        RETURN(rc);
2530}
2531
2532int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
2533                         struct list_head *list, int cmd, int brw_flags)
2534{
2535        struct client_obd     *cli = osc_cli(obj);
2536        struct osc_extent     *ext;
2537        struct osc_async_page *oap, *tmp;
2538        int     page_count = 0;
2539        int     mppr       = cli->cl_max_pages_per_rpc;
2540        pgoff_t start      = CL_PAGE_EOF;
2541        pgoff_t end     = 0;
2542        ENTRY;
2543
2544        list_for_each_entry(oap, list, oap_pending_item) {
2545                struct cl_page *cp = oap2cl_page(oap);
2546                if (cp->cp_index > end)
2547                        end = cp->cp_index;
2548                if (cp->cp_index < start)
2549                        start = cp->cp_index;
2550                ++page_count;
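                /* double mppr whenever page_count outgrows it, so that
                 * ext->oe_mppr below ends up covering all of the pages */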
2551                mppr <<= (page_count > mppr);
2552        }
2553
2554        ext = osc_extent_alloc(obj);
2555        if (ext == NULL) {
2556                list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
2557                        list_del_init(&oap->oap_pending_item);
2558                        osc_ap_completion(env, cli, oap, 0, -ENOMEM);
2559                }
2560                RETURN(-ENOMEM);
2561        }
2562
2563        ext->oe_rw = !!(cmd & OBD_BRW_READ);
2564        ext->oe_urgent = 1;
2565        ext->oe_start = start;
2566        ext->oe_end = ext->oe_max_end = end;
2567        ext->oe_obj = obj;
2568        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
2569        ext->oe_nr_pages = page_count;
2570        ext->oe_mppr = mppr;
2571        list_splice_init(list, &ext->oe_pages);
2572
2573        osc_object_lock(obj);
2574        /* Reuse the initial refcount for RPC, don't drop it */
2575        osc_extent_state_set(ext, OES_LOCK_DONE);
2576        if (cmd & OBD_BRW_WRITE) {
2577                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2578                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
2579        } else {
2580                list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
2581                osc_update_pending(obj, OBD_BRW_READ, page_count);
2582        }
2583        osc_object_unlock(obj);
2584
2585        osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
2586        RETURN(0);
2587}
2588
2589/**
2590 * Called by osc_io_setattr_start() to freeze and destroy covering extents.
2591 */
2592int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
2593                             struct osc_object *obj, __u64 size)
2594{
2595        struct client_obd *cli = osc_cli(obj);
2596        struct osc_extent *ext;
2597        struct osc_extent *waiting = NULL;
2598        pgoff_t index;
2599        LIST_HEAD(list);
2600        int result = 0;
2601        bool partial;
2602        ENTRY;
2603
2604        /* pages with index greater than or equal to index will be truncated. */
2605        index = cl_index(osc2cl(obj), size);
2606        partial = size > cl_offset(osc2cl(obj), index);
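
        /* Worked example with hypothetical 4096-byte pages: truncating to
         * size 10000 gives index = 2 and partial = true (10000 > 2 * 4096
         * = 8192), so pages 3 and beyond are dropped entirely while page 2
         * is truncated only partially. */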
2607
2608again:
2609        osc_object_lock(obj);
2610        ext = osc_extent_search(obj, index);
2611        if (ext == NULL)
2612                ext = first_extent(obj);
2613        else if (ext->oe_end < index)
2614                ext = next_extent(ext);
2615        while (ext != NULL) {
2616                EASSERT(ext->oe_state != OES_TRUNC, ext);
2617
2618                if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
2619                        /* if ext is in the urgent state, there must be a page
2620                         * that has already been flushed by write_page().
2621                         * We have to wait for this extent because we can't
2622                         * truncate that page. */
2623                        LASSERT(!ext->oe_hp);
2624                        OSC_EXTENT_DUMP(D_CACHE, ext,
2625                                        "waiting for busy extent\n");
2626                        waiting = osc_extent_get(ext);
2627                        break;
2628                }
2629
2630                OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
2631
2632                osc_extent_get(ext);
2633                if (ext->oe_state == OES_ACTIVE) {
2634                        /* although we grab the inode mutex on the write path,
2635                         * we release it before releasing the extent (in
2636                         * osc_io_end()), so there is a race window in which an
2637                         * extent is still in OES_ACTIVE when truncate starts. */
2638                        LASSERT(!ext->oe_trunc_pending);
2639                        ext->oe_trunc_pending = 1;
2640                } else {
2641                        EASSERT(ext->oe_state == OES_CACHE, ext);
2642                        osc_extent_state_set(ext, OES_TRUNC);
2643                        osc_update_pending(obj, OBD_BRW_WRITE,
2644                                           -ext->oe_nr_pages);
2645                }
2646                EASSERT(list_empty(&ext->oe_link), ext);
2647                list_add_tail(&ext->oe_link, &list);
2648
2649                ext = next_extent(ext);
2650        }
2651        osc_object_unlock(obj);
2652
2653        osc_list_maint(cli, obj);
2654
2655        while (!list_empty(&list)) {
2656                int rc;
2657
2658                ext = list_entry(list.next, struct osc_extent, oe_link);
2659                list_del_init(&ext->oe_link);
2660
2661                /* the extent may be in the OES_ACTIVE state because the inode
2662                 * mutex is released before osc_io_end() in the file write case */
2663                if (ext->oe_state != OES_TRUNC)
2664                        osc_extent_wait(env, ext, OES_TRUNC);
2665
2666                rc = osc_extent_truncate(ext, index, partial);
2667                if (rc < 0) {
2668                        if (result == 0)
2669                                result = rc;
2670
2671                        OSC_EXTENT_DUMP(D_ERROR, ext,
2672                                        "truncate error %d\n", rc);
2673                } else if (ext->oe_nr_pages == 0) {
2674                        osc_extent_remove(ext);
2675                } else {
2676                        /* this must be an overlapped extent, which means only
2677                         * part of the pages in this extent have been truncated.
2678                         */
2679                        EASSERTF(ext->oe_start <= index, ext,
2680                                 "trunc index = %lu/%d.\n", index, partial);
2681                        /* fix index to skip this partially truncated extent */
2682                        index = ext->oe_end + 1;
2683                        partial = false;
2684
2685                        /* we need to hold this extent in OES_TRUNC state so
2686                         * that no writeback will happen. This is to avoid
2687                         * BUG 17397. */
2688                        LASSERT(oio->oi_trunc == NULL);
2689                        oio->oi_trunc = osc_extent_get(ext);
2690                        OSC_EXTENT_DUMP(D_CACHE, ext,
2691                                        "trunc at "LPU64"\n", size);
2692                }
2693                osc_extent_put(env, ext);
2694        }
2695        if (waiting != NULL) {
2696                int rc;
2697
2698                /* ignore the result of osc_extent_wait; the write initiator
2699                 * should take care of it. */
2700                rc = osc_extent_wait(env, waiting, OES_INV);
2701                if (rc < 0)
2702                        OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
2703
2704                osc_extent_put(env, waiting);
2705                waiting = NULL;
2706                goto again;
2707        }
2708        RETURN(result);
2709}
2710
/**
 * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
 */
void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
                            struct osc_object *obj)
{
        struct osc_extent *ext = oio->oi_trunc;

        oio->oi_trunc = NULL;
        if (ext != NULL) {
                bool unplug = false;

                EASSERT(ext->oe_nr_pages > 0, ext);
                EASSERT(ext->oe_state == OES_TRUNC, ext);
                EASSERT(!ext->oe_urgent, ext);

                OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
                osc_object_lock(obj);
                osc_extent_state_set(ext, OES_CACHE);
                if (ext->oe_fsync_wait && !ext->oe_urgent) {
                        ext->oe_urgent = 1;
                        list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
                        unplug = true;
                }
                osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
                osc_object_unlock(obj);
                osc_extent_put(env, ext);

                if (unplug)
                        osc_io_unplug_async(env, osc_cli(obj), obj);
        }
}

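/*
 * Illustrative sketch (not part of the driver): how the truncate start/end
 * pair is meant to bracket a size change. The caller shape below is
 * hypothetical; only osc_cache_truncate_start()/osc_cache_truncate_end()
 * and the oio->oi_trunc hand-off are taken from this file, and "size" is
 * assumed to be the new file size from the setattr:
 *
 *      rc = osc_cache_truncate_start(env, oio, osc, size);
 *      if (rc < 0)
 *              return rc;
 *      ... send the punch/setattr RPC to the OST and wait for it ...
 *      osc_cache_truncate_end(env, oio, osc);
 *
 * Any partially truncated extent is parked in oio->oi_trunc in OES_TRUNC
 * state between the two calls, so writeback cannot race with the size
 * change; osc_cache_truncate_end() moves it back to OES_CACHE.
 */
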
/**
 * Wait for extents in a specific range to be written out.
 * The caller must have called osc_cache_writeback_range() to issue the IO,
 * otherwise this function will take a long time to finish.
 *
 * The caller must hold the inode mutex, or have cancelled the exclusive DLM
 * lock, so that nobody else can dirty this range of the file while we're
 * waiting for extents to be written.
 */
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
                         pgoff_t start, pgoff_t end)
{
        struct osc_extent *ext;
        pgoff_t index = start;
        int     result = 0;
        ENTRY;

again:
        osc_object_lock(obj);
        ext = osc_extent_search(obj, index);
        if (ext == NULL)
                ext = first_extent(obj);
        else if (ext->oe_end < index)
                ext = next_extent(ext);
        while (ext != NULL) {
                int rc;

                if (ext->oe_start > end)
                        break;

                if (!ext->oe_fsync_wait) {
                        ext = next_extent(ext);
                        continue;
                }

                EASSERT(ergo(ext->oe_state == OES_CACHE,
                             ext->oe_hp || ext->oe_urgent), ext);
                EASSERT(ergo(ext->oe_state == OES_ACTIVE,
                             !ext->oe_hp && ext->oe_urgent), ext);

                index = ext->oe_end + 1;
                osc_extent_get(ext);
                osc_object_unlock(obj);

                rc = osc_extent_wait(env, ext, OES_INV);
                if (result == 0)
                        result = rc;
                osc_extent_put(env, ext);
                goto again;
        }
        osc_object_unlock(obj);

        OSC_IO_DEBUG(obj, "sync file range.\n");
        RETURN(result);
}

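/*
 * Illustrative sketch (not part of the driver): osc_cache_wait_range() only
 * waits, so per the comment above the IO must already have been issued. A
 * hypothetical fsync-style caller would pair it with
 * osc_cache_writeback_range() (defined below), with hp and discard both 0:
 *
 *      rc = osc_cache_writeback_range(env, osc, start, end, 0, 0);
 *      if (rc >= 0)    // rc is the number of pages queued for IO
 *              rc = osc_cache_wait_range(env, osc, start, end);
 *
 * With hp == 0 and discard == 0 the writeback call returns without waiting,
 * so the explicit wait is what guarantees the range is clean on return.
 */
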
/**
 * Called to write out a range of an osc object.
 *
 * @hp     : should be set if this writeback is caused by lock cancellation;
 * @discard: should be set if dirty pages are to be dropped - the file will be
 *         deleted or truncated; this implies that no extent is partially
 *         discarded.
 *
 * Returns how many pages will be issued, or an error code if an error
 * occurred.
 */
int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                              pgoff_t start, pgoff_t end, int hp, int discard)
{
        struct osc_extent *ext;
        LIST_HEAD(discard_list);
        bool unplug = false;
        int result = 0;
        ENTRY;

        osc_object_lock(obj);
        ext = osc_extent_search(obj, start);
        if (ext == NULL)
                ext = first_extent(obj);
        else if (ext->oe_end < start)
                ext = next_extent(ext);
        while (ext != NULL) {
                if (ext->oe_start > end)
                        break;

                ext->oe_fsync_wait = 1;
                switch (ext->oe_state) {
                case OES_CACHE:
                        result += ext->oe_nr_pages;
                        if (!discard) {
                                struct list_head *list = NULL;

                                if (hp) {
                                        EASSERT(!ext->oe_hp, ext);
                                        ext->oe_hp = 1;
                                        list = &obj->oo_hp_exts;
                                } else if (!ext->oe_urgent) {
                                        ext->oe_urgent = 1;
                                        list = &obj->oo_urgent_exts;
                                }
                                if (list != NULL)
                                        list_move_tail(&ext->oe_link, list);
                                unplug = true;
                        } else {
                                /* the only discarder is lock cancellation,
                                 * so [start, end] must contain this extent */
                                EASSERT(ext->oe_start >= start &&
                                        ext->oe_max_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
                                ext->oe_owner = current;
                                list_move_tail(&ext->oe_link, &discard_list);
                                osc_update_pending(obj, OBD_BRW_WRITE,
                                                   -ext->oe_nr_pages);
                        }
                        break;
                case OES_ACTIVE:
                        /* It's pretty bad to wait for ACTIVE extents, because
                         * we don't know how long we will have to wait for
                         * them to be flushed: they may be blocked waiting for
                         * more grant. We do this for the correctness of
                         * fsync. */
                        LASSERT(hp == 0 && discard == 0);
                        ext->oe_urgent = 1;
                        break;
                case OES_TRUNC:
                        /* this extent is being truncated, so nothing can be
                         * done for it now. It will be set to urgent after the
                         * truncate finishes, in osc_cache_truncate_end(). */
                default:
                        break;
                }
                ext = next_extent(ext);
        }
        osc_object_unlock(obj);

        LASSERT(ergo(!discard, list_empty(&discard_list)));
        if (!list_empty(&discard_list)) {
                struct osc_extent *tmp;
                int rc;

                osc_list_maint(osc_cli(obj), obj);
                list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
                        list_del_init(&ext->oe_link);
                        EASSERT(ext->oe_state == OES_LOCKING, ext);

                        /* Discard cached pages. We don't actually write this
                         * extent out, but we complete it as if we did. */
                        rc = osc_extent_make_ready(env, ext);
                        if (unlikely(rc < 0)) {
                                OSC_EXTENT_DUMP(D_ERROR, ext,
                                                "make_ready returned %d\n", rc);
                                if (result >= 0)
                                        result = rc;
                        }

                        /* finish the extent as if the pages were sent */
                        osc_extent_finish(env, ext, 0, 0);
                }
        }

        if (unplug)
                osc_io_unplug(env, osc_cli(obj), obj, PDL_POLICY_ROUND);

        if (hp || discard) {
                int rc;

                rc = osc_cache_wait_range(env, obj, start, end);
                if (result >= 0 && rc < 0)
                        result = rc;
        }

        OSC_IO_DEBUG(obj, "cache page out.\n");
        RETURN(result);
}
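
/*
 * Illustrative sketch (not part of the driver): how the flags of
 * osc_cache_writeback_range() above combine for a hypothetical lock-cancel
 * caller; the surrounding DLM plumbing is elided:
 *
 *      // blocking AST: flush the range with high priority, keep the data
 *      rc = osc_cache_writeback_range(env, osc, start, end, 1, 0);
 *
 *      // object being destroyed or truncated away: drop the dirty pages
 *      rc = osc_cache_writeback_range(env, osc, start, end, 1, 1);
 *
 * Since hp or discard is set, the function calls osc_cache_wait_range()
 * itself before returning, so the range is clean (or dropped) on return.
 */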

/** @} osc */