linux/drivers/staging/lustre/lustre/obdclass/cl_page.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Client Lustre Page.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_CLASS
  42
  43#include "../../include/linux/libcfs/libcfs.h"
  44#include "../include/obd_class.h"
  45#include "../include/obd_support.h"
  46#include <linux/list.h>
  47
  48#include "../include/cl_object.h"
  49#include "cl_internal.h"
  50
  51static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
  52                            int radix);
  53
  54# define PASSERT(env, page, expr)                                          \
  55        do {                                                               \
  56                if (unlikely(!(expr))) {                                   \
  57                        CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n"); \
  58                        LASSERT(0);                                        \
  59                }                                                          \
  60        } while (0)
  61
  62# define PINVRNT(env, page, exp) \
  63        ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
  64
  65/**
   66 * Internal version of cl_page_top(); it may only be called when the page
   67 * is known not to be freed, e.g., when the page is referenced, the radix
   68 * tree lock is held, or the page is owned.
  69 */
  70static struct cl_page *cl_page_top_trusted(struct cl_page *page)
  71{
  72        while (page->cp_parent != NULL)
  73                page = page->cp_parent;
  74        return page;
  75}
  76
  77/**
  78 * Internal version of cl_page_get().
  79 *
   80 * This function can be used to obtain an initial reference to a previously
   81 * unreferenced cached object. It can be called only if concurrent page
   82 * reclamation is somehow prevented, e.g., by locking the page radix-tree
   83 * (cl_object_header::coh_page_guard), or by keeping a lock on the VM page
   84 * associated with \a page.
  85 *
  86 * Use with care! Not exported.
  87 */
  88static void cl_page_get_trust(struct cl_page *page)
  89{
  90        LASSERT(atomic_read(&page->cp_ref) > 0);
  91        atomic_inc(&page->cp_ref);
  92}
  93
  94/**
  95 * Returns a slice within a page, corresponding to the given layer in the
  96 * device stack.
  97 *
  98 * \see cl_lock_at()
  99 */
 100static const struct cl_page_slice *
 101cl_page_at_trusted(const struct cl_page *page,
 102                   const struct lu_device_type *dtype)
 103{
 104        const struct cl_page_slice *slice;
 105
 106        page = cl_page_top_trusted((struct cl_page *)page);
 107        do {
 108                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
 109                        if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
 110                                return slice;
 111                }
 112                page = page->cp_child;
 113        } while (page != NULL);
 114        return NULL;
 115}
 116
 117/**
 118 * Returns a page with given index in the given object, or NULL if no page is
 119 * found. Acquires a reference on \a page.
 120 *
 121 * Locking: called under cl_object_header::coh_page_guard spin-lock.
 122 */
 123struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
 124{
 125        struct cl_page *page;
 126
 127        assert_spin_locked(&hdr->coh_page_guard);
 128
 129        page = radix_tree_lookup(&hdr->coh_tree, index);
 130        if (page != NULL)
 131                cl_page_get_trust(page);
 132        return page;
 133}
 134EXPORT_SYMBOL(cl_page_lookup);
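
/*
 * Illustrative sketch (not from the original source): cl_page_lookup() must
 * be called with cl_object_header::coh_page_guard held, and the reference it
 * acquires is dropped with cl_page_put() when the caller is done.  "env",
 * "obj" and "index" below are assumed to exist in the caller's context.
 *
 *	struct cl_object_header *hdr = cl_object_header(obj);
 *	struct cl_page *page;
 *
 *	spin_lock(&hdr->coh_page_guard);
 *	page = cl_page_lookup(hdr, index);
 *	spin_unlock(&hdr->coh_page_guard);
 *	if (page != NULL) {
 *		... use the page ...
 *		cl_page_put(env, page);
 *	}
 */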
 135
 136/**
  137 * Invokes \a cb on every cached page of \a obj whose index lies in
  138 * [start, end], passing \a cbdata to the callback. At least one covered
  139 * page is passed to \a cb unless none is cached.
  140 *
  141 * If the scan gives up before hogging the CPU for too long, it returns
  142 * CLP_GANG_RESCHED; the caller should then implement retry logic (see
  143 * cl_pages_prune() for an example).
  144 *
  145 * The gang tree lookup (radix_tree_gang_lookup()) optimization is
  146 * absolutely crucial in the face of [offset, EOF] locks.
 147 */
 148int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
 149                        struct cl_io *io, pgoff_t start, pgoff_t end,
 150                        cl_page_gang_cb_t cb, void *cbdata)
 151{
 152        struct cl_object_header *hdr;
 153        struct cl_page    *page;
 154        struct cl_page   **pvec;
 155        const struct cl_page_slice  *slice;
 156        const struct lu_device_type *dtype;
 157        pgoff_t           idx;
 158        unsigned int         nr;
 159        unsigned int         i;
 160        unsigned int         j;
 161        int                   res = CLP_GANG_OKAY;
 162        int                   tree_lock = 1;
 163
 164        idx = start;
 165        hdr = cl_object_header(obj);
 166        pvec = cl_env_info(env)->clt_pvec;
 167        dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type;
 168        spin_lock(&hdr->coh_page_guard);
 169        while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
 170                                            idx, CLT_PVEC_SIZE)) > 0) {
 171                int end_of_region = 0;
 172
 173                idx = pvec[nr - 1]->cp_index + 1;
 174                for (i = 0, j = 0; i < nr; ++i) {
 175                        page = pvec[i];
 176                        pvec[i] = NULL;
 177
 178                        LASSERT(page->cp_type == CPT_CACHEABLE);
 179                        if (page->cp_index > end) {
 180                                end_of_region = 1;
 181                                break;
 182                        }
 183                        if (page->cp_state == CPS_FREEING)
 184                                continue;
 185
 186                        slice = cl_page_at_trusted(page, dtype);
 187                        /*
  188                         * Pages for an lsm-less file have no underlying
  189                         * sub-page for osc, in case of ...
 190                         */
 191                        PASSERT(env, page, slice != NULL);
 192
 193                        page = slice->cpl_page;
 194                        /*
 195                         * Can safely call cl_page_get_trust() under
 196                         * radix-tree spin-lock.
 197                         *
  198                         * XXX not true, because @page comes from an object
  199                         * other than @hdr's and is guarded by a different tree lock.
 200                         */
 201                        cl_page_get_trust(page);
 202                        lu_ref_add_atomic(&page->cp_reference,
 203                                          "gang_lookup", current);
 204                        pvec[j++] = page;
 205                }
 206
 207                /*
  208                 * Here a delicate locking dance is performed. The current
  209                 * thread holds a reference to a page, but has to own it
  210                 * before it can be passed to the callback. Owning implies
  211                 * waiting, so the radix-tree lock has to be released. After
  212                 * the wait one has to check that the pages weren't truncated
  213                 * (cl_page_own() returns an error in that case).
 214                 */
 215                spin_unlock(&hdr->coh_page_guard);
 216                tree_lock = 0;
 217
 218                for (i = 0; i < j; ++i) {
 219                        page = pvec[i];
 220                        if (res == CLP_GANG_OKAY)
 221                                res = (*cb)(env, io, page, cbdata);
 222                        lu_ref_del(&page->cp_reference,
 223                                   "gang_lookup", current);
 224                        cl_page_put(env, page);
 225                }
 226                if (nr < CLT_PVEC_SIZE || end_of_region)
 227                        break;
 228
 229                if (res == CLP_GANG_OKAY && need_resched())
 230                        res = CLP_GANG_RESCHED;
 231                if (res != CLP_GANG_OKAY)
 232                        break;
 233
 234                spin_lock(&hdr->coh_page_guard);
 235                tree_lock = 1;
 236        }
 237        if (tree_lock)
 238                spin_unlock(&hdr->coh_page_guard);
 239        return res;
 240}
 241EXPORT_SYMBOL(cl_page_gang_lookup);
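
/*
 * Illustrative sketch (not from the original source): a caller of
 * cl_page_gang_lookup() supplies a cl_page_gang_cb_t callback and retries
 * when CLP_GANG_RESCHED is returned, as cl_pages_prune() below does.  The
 * callback name and body here are hypothetical.
 *
 *	static int my_page_cb(const struct lu_env *env, struct cl_io *io,
 *			      struct cl_page *page, void *cbdata)
 *	{
 *		... inspect or own the page ...
 *		return CLP_GANG_OKAY;
 *	}
 *
 *	do {
 *		rc = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
 *					 my_page_cb, NULL);
 *		if (rc == CLP_GANG_RESCHED)
 *			cond_resched();
 *	} while (rc != CLP_GANG_OKAY);
 */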
 242
 243static void cl_page_free(const struct lu_env *env, struct cl_page *page)
 244{
 245        struct cl_object *obj  = page->cp_obj;
 246
 247        PASSERT(env, page, list_empty(&page->cp_batch));
 248        PASSERT(env, page, page->cp_owner == NULL);
 249        PASSERT(env, page, page->cp_req == NULL);
 250        PASSERT(env, page, page->cp_parent == NULL);
 251        PASSERT(env, page, page->cp_state == CPS_FREEING);
 252
 253        might_sleep();
 254        while (!list_empty(&page->cp_layers)) {
 255                struct cl_page_slice *slice;
 256
 257                slice = list_entry(page->cp_layers.next,
 258                                       struct cl_page_slice, cpl_linkage);
 259                list_del_init(page->cp_layers.next);
 260                slice->cpl_ops->cpo_fini(env, slice);
 261        }
 262        lu_object_ref_del_at(&obj->co_lu, &page->cp_obj_ref, "cl_page", page);
 263        cl_object_put(env, obj);
 264        lu_ref_fini(&page->cp_reference);
 265        kfree(page);
 266}
 267
 268/**
 269 * Helper function updating page state. This is the only place in the code
 270 * where cl_page::cp_state field is mutated.
 271 */
 272static inline void cl_page_state_set_trust(struct cl_page *page,
 273                                           enum cl_page_state state)
 274{
 275        /* bypass const. */
 276        *(enum cl_page_state *)&page->cp_state = state;
 277}
 278
 279static struct cl_page *cl_page_alloc(const struct lu_env *env,
 280                struct cl_object *o, pgoff_t ind, struct page *vmpage,
 281                enum cl_page_type type)
 282{
 283        struct cl_page    *page;
 284        struct lu_object_header *head;
 285
 286        OBD_ALLOC_GFP(page, cl_object_header(o)->coh_page_bufsize,
 287                        GFP_NOFS);
 288        if (page != NULL) {
 289                int result = 0;
 290
 291                atomic_set(&page->cp_ref, 1);
 292                if (type == CPT_CACHEABLE) /* for radix tree */
 293                        atomic_inc(&page->cp_ref);
 294                page->cp_obj = o;
 295                cl_object_get(o);
 296                lu_object_ref_add_at(&o->co_lu, &page->cp_obj_ref, "cl_page",
 297                                     page);
 298                page->cp_index = ind;
 299                cl_page_state_set_trust(page, CPS_CACHED);
 300                page->cp_type = type;
 301                INIT_LIST_HEAD(&page->cp_layers);
 302                INIT_LIST_HEAD(&page->cp_batch);
 303                INIT_LIST_HEAD(&page->cp_flight);
 304                mutex_init(&page->cp_mutex);
 305                lu_ref_init(&page->cp_reference);
 306                head = o->co_lu.lo_header;
 307                list_for_each_entry(o, &head->loh_layers,
 308                                        co_lu.lo_linkage) {
 309                        if (o->co_ops->coo_page_init != NULL) {
 310                                result = o->co_ops->coo_page_init(env, o,
 311                                                                  page, vmpage);
 312                                if (result != 0) {
 313                                        cl_page_delete0(env, page, 0);
 314                                        cl_page_free(env, page);
 315                                        page = ERR_PTR(result);
 316                                        break;
 317                                }
 318                        }
 319                }
 320        } else {
 321                page = ERR_PTR(-ENOMEM);
 322        }
 323        return page;
 324}
 325
 326/**
 327 * Returns a cl_page with index \a idx at the object \a o, and associated with
 328 * the VM page \a vmpage.
 329 *
 330 * This is the main entry point into the cl_page caching interface. First, a
  331 * cache (implemented as a per-object radix tree) is consulted. If the page
  332 * is found there, it is returned immediately. Otherwise a new page is
  333 * allocated and returned. In either case, an additional reference is acquired.
 334 *
 335 * \see cl_object_find(), cl_lock_find()
 336 */
 337static struct cl_page *cl_page_find0(const struct lu_env *env,
 338                                     struct cl_object *o,
 339                                     pgoff_t idx, struct page *vmpage,
 340                                     enum cl_page_type type,
 341                                     struct cl_page *parent)
 342{
 343        struct cl_page    *page = NULL;
 344        struct cl_page    *ghost = NULL;
 345        struct cl_object_header *hdr;
 346        int err;
 347
 348        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
 349        might_sleep();
 350
 351        hdr = cl_object_header(o);
 352
 353        CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
 354               idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
 355        /* fast path. */
 356        if (type == CPT_CACHEABLE) {
 357                /*
 358                 * vmpage lock is used to protect the child/parent
 359                 * relationship
 360                 */
 361                KLASSERT(PageLocked(vmpage));
 362                /*
 363                 * cl_vmpage_page() can be called here without any locks as
 364                 *
 365                 *     - "vmpage" is locked (which prevents ->private from
 366                 *       concurrent updates), and
 367                 *
 368                 *     - "o" cannot be destroyed while current thread holds a
 369                 *       reference on it.
 370                 */
 371                page = cl_vmpage_page(vmpage, o);
 372                PINVRNT(env, page,
 373                        ergo(page != NULL,
 374                             cl_page_vmpage(env, page) == vmpage &&
 375                             (void *)radix_tree_lookup(&hdr->coh_tree,
 376                                                       idx) == page));
 377        }
 378
 379        if (page != NULL)
 380                return page;
 381
 382        /* allocate and initialize cl_page */
 383        page = cl_page_alloc(env, o, idx, vmpage, type);
 384        if (IS_ERR(page))
 385                return page;
 386
 387        if (type == CPT_TRANSIENT) {
 388                if (parent) {
 389                        LASSERT(page->cp_parent == NULL);
 390                        page->cp_parent = parent;
 391                        parent->cp_child = page;
 392                }
 393                return page;
 394        }
 395
 396        /*
 397         * XXX optimization: use radix_tree_preload() here, and change tree
 398         * gfp mask to GFP_KERNEL in cl_object_header_init().
 399         */
 400        spin_lock(&hdr->coh_page_guard);
 401        err = radix_tree_insert(&hdr->coh_tree, idx, page);
 402        if (err != 0) {
 403                ghost = page;
 404                /*
 405                 * Noted by Jay: a lock on \a vmpage protects cl_page_find()
 406                 * from this race, but
 407                 *
 408                 *     0. it's better to have cl_page interface "locally
 409                 *     consistent" so that its correctness can be reasoned
 410                 *     about without appealing to the (obscure world of) VM
 411                 *     locking.
 412                 *
 413                 *     1. handling this race allows ->coh_tree to remain
 414                 *     consistent even when VM locking is somehow busted,
 415                 *     which is very useful during diagnosing and debugging.
 416                 */
 417                page = ERR_PTR(err);
 418                CL_PAGE_DEBUG(D_ERROR, env, ghost,
 419                              "fail to insert into radix tree: %d\n", err);
 420        } else {
 421                if (parent) {
 422                        LASSERT(page->cp_parent == NULL);
 423                        page->cp_parent = parent;
 424                        parent->cp_child = page;
 425                }
 426                hdr->coh_pages++;
 427        }
 428        spin_unlock(&hdr->coh_page_guard);
 429
 430        if (unlikely(ghost != NULL)) {
 431                cl_page_delete0(env, ghost, 0);
 432                cl_page_free(env, ghost);
 433        }
 434        return page;
 435}
 436
 437struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o,
 438                             pgoff_t idx, struct page *vmpage,
 439                             enum cl_page_type type)
 440{
 441        return cl_page_find0(env, o, idx, vmpage, type, NULL);
 442}
 443EXPORT_SYMBOL(cl_page_find);
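
/*
 * Illustrative sketch (not from the original source): the usual calling
 * convention for cl_page_find() with a cacheable page. The VM page must be
 * locked so that vmpage->private and the child/parent linkage stay stable.
 * "env", "obj" and "vmpage" are assumed to be set up by the caller.
 *
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_page_find(env, obj, vmpage->index, vmpage, CPT_CACHEABLE);
 *	unlock_page(vmpage);
 *	if (IS_ERR(page))
 *		return PTR_ERR(page);
 *	... use the page ...
 *	cl_page_put(env, page);
 */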
 444
 445
 446struct cl_page *cl_page_find_sub(const struct lu_env *env, struct cl_object *o,
 447                                 pgoff_t idx, struct page *vmpage,
 448                                 struct cl_page *parent)
 449{
 450        return cl_page_find0(env, o, idx, vmpage, parent->cp_type, parent);
 451}
 452EXPORT_SYMBOL(cl_page_find_sub);
 453
 454static inline int cl_page_invariant(const struct cl_page *pg)
 455{
 456        struct cl_object_header *header;
 457        struct cl_page    *parent;
 458        struct cl_page    *child;
 459        struct cl_io        *owner;
 460
 461        /*
 462         * Page invariant is protected by a VM lock.
 463         */
 464        LINVRNT(cl_page_is_vmlocked(NULL, pg));
 465
 466        header = cl_object_header(pg->cp_obj);
 467        parent = pg->cp_parent;
 468        child  = pg->cp_child;
 469        owner  = pg->cp_owner;
 470
 471        return cl_page_in_use(pg) &&
 472                ergo(parent != NULL, parent->cp_child == pg) &&
 473                ergo(child != NULL, child->cp_parent == pg) &&
 474                ergo(child != NULL, pg->cp_obj != child->cp_obj) &&
 475                ergo(parent != NULL, pg->cp_obj != parent->cp_obj) &&
 476                ergo(owner != NULL && parent != NULL,
 477                     parent->cp_owner == pg->cp_owner->ci_parent) &&
 478                ergo(owner != NULL && child != NULL,
 479                     child->cp_owner->ci_parent == owner) &&
 480                /*
 481                 * Either page is early in initialization (has neither child
 482                 * nor parent yet), or it is in the object radix tree.
 483                 */
 484                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
 485                     (void *)radix_tree_lookup(&header->coh_tree,
 486                                               pg->cp_index) == pg ||
 487                     (child == NULL && parent == NULL));
 488}
 489
 490static void cl_page_state_set0(const struct lu_env *env,
 491                               struct cl_page *page, enum cl_page_state state)
 492{
 493        enum cl_page_state old;
 494
 495        /*
 496         * Matrix of allowed state transitions [old][new], for sanity
 497         * checking.
 498         */
 499        static const int allowed_transitions[CPS_NR][CPS_NR] = {
 500                [CPS_CACHED] = {
 501                        [CPS_CACHED]  = 0,
 502                        [CPS_OWNED]   = 1, /* io finds existing cached page */
 503                        [CPS_PAGEIN]  = 0,
 504                        [CPS_PAGEOUT] = 1, /* write-out from the cache */
 505                        [CPS_FREEING] = 1, /* eviction on the memory pressure */
 506                },
 507                [CPS_OWNED] = {
 508                        [CPS_CACHED]  = 1, /* release to the cache */
 509                        [CPS_OWNED]   = 0,
 510                        [CPS_PAGEIN]  = 1, /* start read immediately */
 511                        [CPS_PAGEOUT] = 1, /* start write immediately */
 512                        [CPS_FREEING] = 1, /* lock invalidation or truncate */
 513                },
 514                [CPS_PAGEIN] = {
 515                        [CPS_CACHED]  = 1, /* io completion */
 516                        [CPS_OWNED]   = 0,
 517                        [CPS_PAGEIN]  = 0,
 518                        [CPS_PAGEOUT] = 0,
 519                        [CPS_FREEING] = 0,
 520                },
 521                [CPS_PAGEOUT] = {
 522                        [CPS_CACHED]  = 1, /* io completion */
 523                        [CPS_OWNED]   = 0,
 524                        [CPS_PAGEIN]  = 0,
 525                        [CPS_PAGEOUT] = 0,
 526                        [CPS_FREEING] = 0,
 527                },
 528                [CPS_FREEING] = {
 529                        [CPS_CACHED]  = 0,
 530                        [CPS_OWNED]   = 0,
 531                        [CPS_PAGEIN]  = 0,
 532                        [CPS_PAGEOUT] = 0,
 533                        [CPS_FREEING] = 0,
 534                }
 535        };
 536
 537        old = page->cp_state;
 538        PASSERT(env, page, allowed_transitions[old][state]);
 539        CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
 540        for (; page != NULL; page = page->cp_child) {
 541                PASSERT(env, page, page->cp_state == old);
 542                PASSERT(env, page,
 543                        equi(state == CPS_OWNED, page->cp_owner != NULL));
 544
 545                cl_page_state_set_trust(page, state);
 546        }
 547}
 548
 549static void cl_page_state_set(const struct lu_env *env,
 550                              struct cl_page *page, enum cl_page_state state)
 551{
 552        cl_page_state_set0(env, page, state);
 553}
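
/*
 * Illustrative note (not from the original source): per the transition
 * matrix above, a typical write-back round trip for a cached page is
 *
 *	CPS_CACHED  -> CPS_OWNED    (io finds the cached page)
 *	CPS_OWNED   -> CPS_PAGEOUT  (transfer starts)
 *	CPS_PAGEOUT -> CPS_CACHED   (io completion)
 *
 * and a page is ultimately retired through CPS_FREEING, which is reachable
 * only from CPS_CACHED or CPS_OWNED.
 */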
 554
 555/**
 556 * Acquires an additional reference to a page.
 557 *
  558 * This can only be called by a caller that already possesses a reference
  559 * to \a page.
 560 *
 561 * \see cl_object_get(), cl_lock_get().
 562 */
 563void cl_page_get(struct cl_page *page)
 564{
 565        cl_page_get_trust(page);
 566}
 567EXPORT_SYMBOL(cl_page_get);
 568
 569/**
 570 * Releases a reference to a page.
 571 *
 572 * When last reference is released, page is returned to the cache, unless it
 573 * is in cl_page_state::CPS_FREEING state, in which case it is immediately
 574 * destroyed.
 575 *
 576 * \see cl_object_put(), cl_lock_put().
 577 */
 578void cl_page_put(const struct lu_env *env, struct cl_page *page)
 579{
 580        PASSERT(env, page, atomic_read(&page->cp_ref) > !!page->cp_parent);
 581
 582        CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
 583                       atomic_read(&page->cp_ref));
 584
 585        if (atomic_dec_and_test(&page->cp_ref)) {
 586                LASSERT(page->cp_state == CPS_FREEING);
 587
 588                LASSERT(atomic_read(&page->cp_ref) == 0);
 589                PASSERT(env, page, page->cp_owner == NULL);
 590                PASSERT(env, page, list_empty(&page->cp_batch));
 591                /*
 592                 * Page is no longer reachable by other threads. Tear
 593                 * it down.
 594                 */
 595                cl_page_free(env, page);
 596        }
 597}
 598EXPORT_SYMBOL(cl_page_put);
 599
 600/**
 601 * Returns a VM page associated with a given cl_page.
 602 */
 603struct page *cl_page_vmpage(const struct lu_env *env, struct cl_page *page)
 604{
 605        const struct cl_page_slice *slice;
 606
 607        /*
 608         * Find uppermost layer with ->cpo_vmpage() method, and return its
 609         * result.
 610         */
 611        page = cl_page_top(page);
 612        do {
 613                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
 614                        if (slice->cpl_ops->cpo_vmpage != NULL)
 615                                return slice->cpl_ops->cpo_vmpage(env, slice);
 616                }
 617                page = page->cp_child;
 618        } while (page != NULL);
 619        LBUG(); /* ->cpo_vmpage() has to be defined somewhere in the stack */
 620}
 621EXPORT_SYMBOL(cl_page_vmpage);
 622
 623/**
 624 * Returns a cl_page associated with a VM page, and given cl_object.
 625 */
 626struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
 627{
 628        struct cl_page *top;
 629        struct cl_page *page;
 630
 631        KLASSERT(PageLocked(vmpage));
 632
 633        /*
 634         * NOTE: absence of races and liveness of data are guaranteed by page
  635         *       lock on a "vmpage". That works because object destruction
  636         *       proceeds bottom-to-top.
 637         */
 638
 639        /*
 640         * This loop assumes that ->private points to the top-most page. This
 641         * can be rectified easily.
 642         */
 643        top = (struct cl_page *)vmpage->private;
 644        if (top == NULL)
 645                return NULL;
 646
 647        for (page = top; page != NULL; page = page->cp_child) {
 648                if (cl_object_same(page->cp_obj, obj)) {
 649                        cl_page_get_trust(page);
 650                        break;
 651                }
 652        }
 653        LASSERT(ergo(page, page->cp_type == CPT_CACHEABLE));
 654        return page;
 655}
 656EXPORT_SYMBOL(cl_vmpage_page);
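
/*
 * Illustrative sketch (not from the original source): cl_vmpage_page() maps
 * a locked VM page back to the cl_page slice belonging to a particular
 * object; the returned reference is dropped with cl_page_put().  "env",
 * "obj" and "vmpage" are assumed to come from the caller.
 *
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_vmpage_page(vmpage, obj);
 *	unlock_page(vmpage);
 *	if (page != NULL) {
 *		... use the page ...
 *		cl_page_put(env, page);
 *	}
 */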
 657
 658/**
 659 * Returns the top-page for a given page.
 660 *
 661 * \see cl_object_top(), cl_io_top()
 662 */
 663struct cl_page *cl_page_top(struct cl_page *page)
 664{
 665        return cl_page_top_trusted(page);
 666}
 667EXPORT_SYMBOL(cl_page_top);
 668
 669const struct cl_page_slice *cl_page_at(const struct cl_page *page,
 670                                       const struct lu_device_type *dtype)
 671{
 672        return cl_page_at_trusted(page, dtype);
 673}
 674EXPORT_SYMBOL(cl_page_at);
 675
 676#define CL_PAGE_OP(opname) offsetof(struct cl_page_operations, opname)
 677
 678#define CL_PAGE_INVOKE(_env, _page, _op, _proto, ...)              \
 679({                                                                    \
 680        const struct lu_env     *__env  = (_env);                   \
 681        struct cl_page       *__page = (_page);            \
 682        const struct cl_page_slice *__scan;                          \
 683        int                      __result;                         \
 684        ptrdiff_t                  __op   = (_op);                   \
 685        int                    (*__method)_proto;                   \
 686                                                                        \
 687        __result = 0;                                              \
 688        __page = cl_page_top(__page);                              \
 689        do {                                                        \
 690                list_for_each_entry(__scan, &__page->cp_layers,     \
 691                                        cpl_linkage) {            \
 692                        __method = *(void **)((char *)__scan->cpl_ops + \
 693                                              __op);                \
 694                        if (__method != NULL) {                  \
 695                                __result = (*__method)(__env, __scan,   \
 696                                                       ## __VA_ARGS__); \
 697                                if (__result != 0)                    \
 698                                        break;                    \
 699                        }                                              \
 700                }                                                      \
 701                __page = __page->cp_child;                            \
 702        } while (__page != NULL && __result == 0);                    \
 703        if (__result > 0)                                              \
 704                __result = 0;                                      \
 705        __result;                                                      \
 706})
 707
 708#define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)              \
 709do {                                                                \
 710        const struct lu_env     *__env  = (_env);                   \
 711        struct cl_page       *__page = (_page);            \
 712        const struct cl_page_slice *__scan;                          \
 713        ptrdiff_t                  __op   = (_op);                   \
 714        void                  (*__method)_proto;                    \
 715                                                                        \
 716        __page = cl_page_top(__page);                              \
 717        do {                                                        \
 718                list_for_each_entry(__scan, &__page->cp_layers,     \
 719                                        cpl_linkage) {            \
 720                        __method = *(void **)((char *)__scan->cpl_ops + \
 721                                              __op);                \
 722                        if (__method != NULL)                      \
 723                                (*__method)(__env, __scan,            \
 724                                            ## __VA_ARGS__);        \
 725                }                                                      \
 726                __page = __page->cp_child;                            \
 727        } while (__page != NULL);                                      \
 728} while (0)
 729
 730#define CL_PAGE_INVOID_REVERSE(_env, _page, _op, _proto, ...)          \
 731do {                                                                    \
 732        const struct lu_env     *__env  = (_env);                       \
 733        struct cl_page       *__page = (_page);                \
 734        const struct cl_page_slice *__scan;                              \
 735        ptrdiff_t                  __op   = (_op);                       \
 736        void                  (*__method)_proto;                        \
 737                                                                            \
 738        /* get to the bottom page. */                                  \
 739        while (__page->cp_child != NULL)                                    \
 740                __page = __page->cp_child;                                \
 741        do {                                                            \
 742                list_for_each_entry_reverse(__scan, &__page->cp_layers, \
 743                                                cpl_linkage) {        \
 744                        __method = *(void **)((char *)__scan->cpl_ops +     \
 745                                              __op);                    \
 746                        if (__method != NULL)                          \
 747                                (*__method)(__env, __scan,                \
 748                                            ## __VA_ARGS__);            \
 749                }                                                          \
 750                __page = __page->cp_parent;                              \
 751        } while (__page != NULL);                                          \
 752} while (0)
 753
 754static int cl_page_invoke(const struct lu_env *env,
 755                          struct cl_io *io, struct cl_page *page, ptrdiff_t op)
 756
 757{
 758        PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
 759        return CL_PAGE_INVOKE(env, page, op,
 760                              (const struct lu_env *,
 761                               const struct cl_page_slice *, struct cl_io *),
 762                              io);
 763}
 764
 765static void cl_page_invoid(const struct lu_env *env,
 766                           struct cl_io *io, struct cl_page *page, ptrdiff_t op)
 767
 768{
 769        PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
 770        CL_PAGE_INVOID(env, page, op,
 771                       (const struct lu_env *,
 772                        const struct cl_page_slice *, struct cl_io *), io);
 773}
 774
 775static void cl_page_owner_clear(struct cl_page *page)
 776{
 777        for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
 778                if (page->cp_owner != NULL) {
 779                        LASSERT(page->cp_owner->ci_owned_nr > 0);
 780                        page->cp_owner->ci_owned_nr--;
 781                        page->cp_owner = NULL;
 782                        page->cp_task = NULL;
 783                }
 784        }
 785}
 786
 787static void cl_page_owner_set(struct cl_page *page)
 788{
 789        for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
 790                LASSERT(page->cp_owner != NULL);
 791                page->cp_owner->ci_owned_nr++;
 792        }
 793}
 794
 795void cl_page_disown0(const struct lu_env *env,
 796                     struct cl_io *io, struct cl_page *pg)
 797{
 798        enum cl_page_state state;
 799
 800        state = pg->cp_state;
 801        PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
 802        PINVRNT(env, pg, cl_page_invariant(pg));
 803        cl_page_owner_clear(pg);
 804
 805        if (state == CPS_OWNED)
 806                cl_page_state_set(env, pg, CPS_CACHED);
 807        /*
 808         * Completion call-backs are executed in the bottom-up order, so that
 809         * uppermost layer (llite), responsible for VFS/VM interaction runs
 810         * last and can release locks safely.
 811         */
 812        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_disown),
 813                               (const struct lu_env *,
 814                                const struct cl_page_slice *, struct cl_io *),
 815                               io);
 816}
 817
 818/**
  819 * Returns true iff the page is owned by the given io.
 820 */
 821int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
 822{
 823        LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
 824        return pg->cp_state == CPS_OWNED && pg->cp_owner == io;
 825}
 826EXPORT_SYMBOL(cl_page_is_owned);
 827
 828/**
 829 * Try to own a page by IO.
 830 *
 831 * Waits until page is in cl_page_state::CPS_CACHED state, and then switch it
 832 * into cl_page_state::CPS_OWNED state.
 833 *
 834 * \pre  !cl_page_is_owned(pg, io)
 835 * \post result == 0 iff cl_page_is_owned(pg, io)
 836 *
 837 * \retval 0   success
 838 *
  839 * \retval -ve failure, e.g., the page was destroyed (and landed in
  840 *           cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
  841 *           or the page was owned by another thread, or is in IO.
 842 *
 843 * \see cl_page_disown()
 844 * \see cl_page_operations::cpo_own()
 845 * \see cl_page_own_try()
 846 * \see cl_page_own
 847 */
 848static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
 849                        struct cl_page *pg, int nonblock)
 850{
 851        int result;
 852
 853        PINVRNT(env, pg, !cl_page_is_owned(pg, io));
 854
 855        pg = cl_page_top(pg);
 856        io = cl_io_top(io);
 857
 858        if (pg->cp_state == CPS_FREEING) {
 859                result = -ENOENT;
 860        } else {
 861                result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
 862                                        (const struct lu_env *,
 863                                         const struct cl_page_slice *,
 864                                         struct cl_io *, int),
 865                                        io, nonblock);
 866                if (result == 0) {
 867                        PASSERT(env, pg, pg->cp_owner == NULL);
 868                        PASSERT(env, pg, pg->cp_req == NULL);
 869                        pg->cp_owner = io;
 870                        pg->cp_task  = current;
 871                        cl_page_owner_set(pg);
 872                        if (pg->cp_state != CPS_FREEING) {
 873                                cl_page_state_set(env, pg, CPS_OWNED);
 874                        } else {
 875                                cl_page_disown0(env, io, pg);
 876                                result = -ENOENT;
 877                        }
 878                }
 879        }
 880        PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
 881        return result;
 882}
 883
 884/**
 885 * Own a page, might be blocked.
 886 *
 887 * \see cl_page_own0()
 888 */
 889int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
 890{
 891        return cl_page_own0(env, io, pg, 0);
 892}
 893EXPORT_SYMBOL(cl_page_own);
 894
 895/**
 896 * Nonblock version of cl_page_own().
 897 *
 898 * \see cl_page_own0()
 899 */
 900int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
 901                    struct cl_page *pg)
 902{
 903        return cl_page_own0(env, io, pg, 1);
 904}
 905EXPORT_SYMBOL(cl_page_own_try);
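
/*
 * Illustrative sketch (not from the original source): the own/disown pattern
 * used by callers such as page_prune_cb() below.  cl_page_own() may block,
 * while cl_page_own_try() fails instead of waiting.
 *
 *	if (cl_page_own(env, io, pg) == 0) {
 *		... the page is in CPS_OWNED, operate on it ...
 *		cl_page_disown(env, io, pg);
 *	} else {
 *		... the page was freed or is owned elsewhere ...
 *	}
 */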
 906
 907
 908/**
 909 * Assume page ownership.
 910 *
 911 * Called when page is already locked by the hosting VM.
 912 *
 913 * \pre !cl_page_is_owned(pg, io)
 914 * \post cl_page_is_owned(pg, io)
 915 *
 916 * \see cl_page_operations::cpo_assume()
 917 */
 918void cl_page_assume(const struct lu_env *env,
 919                    struct cl_io *io, struct cl_page *pg)
 920{
 921        PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
 922
 923        pg = cl_page_top(pg);
 924        io = cl_io_top(io);
 925
 926        cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_assume));
 927        PASSERT(env, pg, pg->cp_owner == NULL);
 928        pg->cp_owner = io;
 929        pg->cp_task = current;
 930        cl_page_owner_set(pg);
 931        cl_page_state_set(env, pg, CPS_OWNED);
 932}
 933EXPORT_SYMBOL(cl_page_assume);
 934
 935/**
 936 * Releases page ownership without unlocking the page.
 937 *
 938 * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
 939 * underlying VM page (as VM is supposed to do this itself).
 940 *
 941 * \pre   cl_page_is_owned(pg, io)
 942 * \post !cl_page_is_owned(pg, io)
 943 *
 944 * \see cl_page_assume()
 945 */
 946void cl_page_unassume(const struct lu_env *env,
 947                      struct cl_io *io, struct cl_page *pg)
 948{
 949        PINVRNT(env, pg, cl_page_is_owned(pg, io));
 950        PINVRNT(env, pg, cl_page_invariant(pg));
 951
 952        pg = cl_page_top(pg);
 953        io = cl_io_top(io);
 954        cl_page_owner_clear(pg);
 955        cl_page_state_set(env, pg, CPS_CACHED);
 956        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_unassume),
 957                               (const struct lu_env *,
 958                                const struct cl_page_slice *, struct cl_io *),
 959                               io);
 960}
 961EXPORT_SYMBOL(cl_page_unassume);
 962
 963/**
 964 * Releases page ownership.
 965 *
 966 * Moves page into cl_page_state::CPS_CACHED.
 967 *
 968 * \pre   cl_page_is_owned(pg, io)
 969 * \post !cl_page_is_owned(pg, io)
 970 *
 971 * \see cl_page_own()
 972 * \see cl_page_operations::cpo_disown()
 973 */
 974void cl_page_disown(const struct lu_env *env,
 975                    struct cl_io *io, struct cl_page *pg)
 976{
 977        PINVRNT(env, pg, cl_page_is_owned(pg, io));
 978
 979        pg = cl_page_top(pg);
 980        io = cl_io_top(io);
 981        cl_page_disown0(env, io, pg);
 982}
 983EXPORT_SYMBOL(cl_page_disown);
 984
 985/**
 986 * Called when page is to be removed from the object, e.g., as a result of
 987 * truncate.
 988 *
 989 * Calls cl_page_operations::cpo_discard() top-to-bottom.
 990 *
 991 * \pre cl_page_is_owned(pg, io)
 992 *
 993 * \see cl_page_operations::cpo_discard()
 994 */
 995void cl_page_discard(const struct lu_env *env,
 996                     struct cl_io *io, struct cl_page *pg)
 997{
 998        PINVRNT(env, pg, cl_page_is_owned(pg, io));
 999        PINVRNT(env, pg, cl_page_invariant(pg));
1000
1001        cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_discard));
1002}
1003EXPORT_SYMBOL(cl_page_discard);
1004
1005/**
1006 * Version of cl_page_delete() that can be called for not fully constructed
 1007 * pages, e.g., in an error-handling cl_page_find()->cl_page_delete0()
1008 * path. Doesn't check page invariant.
1009 */
1010static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
1011                            int radix)
1012{
1013        struct cl_page *tmp = pg;
1014
1015        PASSERT(env, pg, pg == cl_page_top(pg));
1016        PASSERT(env, pg, pg->cp_state != CPS_FREEING);
1017
1018        /*
 1019         * Sever all ways to obtain new pointers to @pg.
1020         */
1021        cl_page_owner_clear(pg);
1022
1023        /*
 1024         * Unexport the page before freeing it so that its content is
 1025         * considered invalid.
 1026         * We have to do this because a CPS_FREEING cl_page may NOT be
 1027         * under the protection of a cl_lock.
 1028         * If this page is later found by other threads, it will be
 1029         * forced to be re-read.
1030         */
1031        cl_page_export(env, pg, 0);
1032        cl_page_state_set0(env, pg, CPS_FREEING);
1033
1034        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
1035                       (const struct lu_env *, const struct cl_page_slice *));
1036
1037        if (tmp->cp_type == CPT_CACHEABLE) {
1038                if (!radix)
1039                        /* !radix means that @pg is not yet in the radix tree,
1040                         * skip removing it.
1041                         */
1042                        tmp = pg->cp_child;
1043                for (; tmp != NULL; tmp = tmp->cp_child) {
1044                        void                *value;
1045                        struct cl_object_header *hdr;
1046
1047                        hdr = cl_object_header(tmp->cp_obj);
1048                        spin_lock(&hdr->coh_page_guard);
1049                        value = radix_tree_delete(&hdr->coh_tree,
1050                                                  tmp->cp_index);
1051                        PASSERT(env, tmp, value == tmp);
1052                        PASSERT(env, tmp, hdr->coh_pages > 0);
1053                        hdr->coh_pages--;
1054                        spin_unlock(&hdr->coh_page_guard);
1055                        cl_page_put(env, tmp);
1056                }
1057        }
1058}
1059
1060/**
1061 * Called when a decision is made to throw page out of memory.
1062 *
1063 * Notifies all layers about page destruction by calling
1064 * cl_page_operations::cpo_delete() method top-to-bottom.
1065 *
1066 * Moves page into cl_page_state::CPS_FREEING state (this is the only place
1067 * where transition to this state happens).
1068 *
1069 * Eliminates all venues through which new references to the page can be
1070 * obtained:
1071 *
1072 *     - removes page from the radix trees,
1073 *
1074 *     - breaks linkage from VM page to cl_page.
1075 *
1076 * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
1077 * drain after some time, at which point page will be recycled.
1078 *
1079 * \pre  pg == cl_page_top(pg)
1080 * \pre  VM page is locked
1081 * \post pg->cp_state == CPS_FREEING
1082 *
1083 * \see cl_page_operations::cpo_delete()
1084 */
1085void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
1086{
1087        PINVRNT(env, pg, cl_page_invariant(pg));
1088        cl_page_delete0(env, pg, 1);
1089}
1090EXPORT_SYMBOL(cl_page_delete);
1091
1092/**
1093 * Unmaps page from user virtual memory.
1094 *
1095 * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The
1096 * layer responsible for VM interaction has to unmap page from user space
1097 * virtual memory.
1098 *
1099 * \see cl_page_operations::cpo_unmap()
1100 */
1101int cl_page_unmap(const struct lu_env *env,
1102                  struct cl_io *io, struct cl_page *pg)
1103{
1104        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1105        PINVRNT(env, pg, cl_page_invariant(pg));
1106
1107        return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap));
1108}
1109EXPORT_SYMBOL(cl_page_unmap);
1110
1111/**
1112 * Marks page up-to-date.
1113 *
1114 * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
1115 * layer responsible for VM interaction has to mark/clear page as up-to-date
1116 * by the \a uptodate argument.
1117 *
1118 * \see cl_page_operations::cpo_export()
1119 */
1120void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
1121{
1122        PINVRNT(env, pg, cl_page_invariant(pg));
1123        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_export),
1124                       (const struct lu_env *,
1125                        const struct cl_page_slice *, int), uptodate);
1126}
1127EXPORT_SYMBOL(cl_page_export);
1128
1129/**
1130 * Returns true, iff \a pg is VM locked in a suitable sense by the calling
1131 * thread.
1132 */
1133int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
1134{
1135        int result;
1136        const struct cl_page_slice *slice;
1137
1138        pg = cl_page_top_trusted((struct cl_page *)pg);
1139        slice = container_of(pg->cp_layers.next,
1140                             const struct cl_page_slice, cpl_linkage);
1141        PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
1142        /*
1143         * Call ->cpo_is_vmlocked() directly instead of going through
1144         * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
1145         * cl_page_invariant().
1146         */
1147        result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
1148        PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
1149        return result == -EBUSY;
1150}
1151EXPORT_SYMBOL(cl_page_is_vmlocked);
1152
1153static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
1154{
1155        return crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN;
1156}
1157
1158static void cl_page_io_start(const struct lu_env *env,
1159                             struct cl_page *pg, enum cl_req_type crt)
1160{
1161        /*
1162         * Page is queued for IO, change its state.
1163         */
1164        cl_page_owner_clear(pg);
1165        cl_page_state_set(env, pg, cl_req_type_state(crt));
1166}
1167
1168/**
1169 * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
1170 * called top-to-bottom. Every layer either agrees to submit this page (by
 1171 * returning 0), or requests to omit this page (by returning -EALREADY). The
 1172 * layer handling interactions with the VM also has to inform the VM that the
 1173 * page is now under transfer.
1174 */
1175int cl_page_prep(const struct lu_env *env, struct cl_io *io,
1176                 struct cl_page *pg, enum cl_req_type crt)
1177{
1178        int result;
1179
1180        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1181        PINVRNT(env, pg, cl_page_invariant(pg));
1182        PINVRNT(env, pg, crt < CRT_NR);
1183
1184        /*
1185         * XXX this has to be called bottom-to-top, so that llite can set up
1186         * PG_writeback without risking other layers deciding to skip this
1187         * page.
1188         */
1189        if (crt >= CRT_NR)
1190                return -EINVAL;
1191        result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_prep));
1192        if (result == 0)
1193                cl_page_io_start(env, pg, crt);
1194
1195        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1196        return result;
1197}
1198EXPORT_SYMBOL(cl_page_prep);
1199
1200/**
1201 * Notify layers about transfer completion.
1202 *
1203 * Invoked by transfer sub-system (which is a part of osc) to notify layers
 1204 * that a transfer, of which this page is a part, has completed.
1205 *
1206 * Completion call-backs are executed in the bottom-up order, so that
1207 * uppermost layer (llite), responsible for the VFS/VM interaction runs last
1208 * and can release locks safely.
1209 *
1210 * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1211 * \post pg->cp_state == CPS_CACHED
1212 *
1213 * \see cl_page_operations::cpo_completion()
1214 */
1215void cl_page_completion(const struct lu_env *env,
1216                        struct cl_page *pg, enum cl_req_type crt, int ioret)
1217{
1218        struct cl_sync_io *anchor = pg->cp_sync_io;
1219
1220        PASSERT(env, pg, crt < CRT_NR);
1221        /* cl_page::cp_req already cleared by the caller (osc_completion()) */
1222        PASSERT(env, pg, pg->cp_req == NULL);
1223        PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
1224
1225        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
1226        if (crt == CRT_READ && ioret == 0) {
1227                PASSERT(env, pg, !(pg->cp_flags & CPF_READ_COMPLETED));
1228                pg->cp_flags |= CPF_READ_COMPLETED;
1229        }
1230
1231        cl_page_state_set(env, pg, CPS_CACHED);
1232        if (crt >= CRT_NR)
1233                return;
1234        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
1235                               (const struct lu_env *,
1236                                const struct cl_page_slice *, int), ioret);
1237        if (anchor) {
1238                LASSERT(cl_page_is_vmlocked(env, pg));
1239                LASSERT(pg->cp_sync_io == anchor);
1240                pg->cp_sync_io = NULL;
1241        }
1242        /*
1243         * As page->cp_obj is pinned by a reference from page->cp_req, it is
1244         * safe to call cl_page_put() without risking object destruction in a
1245         * non-blocking context.
1246         */
1247        cl_page_put(env, pg);
1248
1249        if (anchor)
1250                cl_sync_io_note(anchor, ioret);
1251}
1252EXPORT_SYMBOL(cl_page_completion);
1253
1254/**
1255 * Notify layers that transfer formation engine decided to yank this page from
1256 * the cache and to make it a part of a transfer.
1257 *
1258 * \pre  pg->cp_state == CPS_CACHED
1259 * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1260 *
1261 * \see cl_page_operations::cpo_make_ready()
1262 */
1263int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
1264                       enum cl_req_type crt)
1265{
1266        int result;
1267
1268        PINVRNT(env, pg, crt < CRT_NR);
1269
1270        if (crt >= CRT_NR)
1271                return -EINVAL;
1272        result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(io[crt].cpo_make_ready),
1273                                (const struct lu_env *,
1274                                 const struct cl_page_slice *));
1275        if (result == 0) {
1276                PASSERT(env, pg, pg->cp_state == CPS_CACHED);
1277                cl_page_io_start(env, pg, crt);
1278        }
1279        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1280        return result;
1281}
1282EXPORT_SYMBOL(cl_page_make_ready);
1283
1284/**
1285 * Notify layers that high level io decided to place this page into a cache
1286 * for future transfer.
1287 *
1288 * The layer implementing transfer engine (osc) has to register this page in
1289 * its queues.
1290 *
1291 * \pre  cl_page_is_owned(pg, io)
1292 * \post cl_page_is_owned(pg, io)
1293 *
1294 * \see cl_page_operations::cpo_cache_add()
1295 */
1296int cl_page_cache_add(const struct lu_env *env, struct cl_io *io,
1297                      struct cl_page *pg, enum cl_req_type crt)
1298{
1299        const struct cl_page_slice *scan;
1300        int result = 0;
1301
1302        PINVRNT(env, pg, crt < CRT_NR);
1303        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1304        PINVRNT(env, pg, cl_page_invariant(pg));
1305
1306        if (crt >= CRT_NR)
1307                return -EINVAL;
1308
1309        list_for_each_entry(scan, &pg->cp_layers, cpl_linkage) {
1310                if (scan->cpl_ops->io[crt].cpo_cache_add == NULL)
1311                        continue;
1312
1313                result = scan->cpl_ops->io[crt].cpo_cache_add(env, scan, io);
1314                if (result != 0)
1315                        break;
1316        }
1317        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1318        return result;
1319}
1320EXPORT_SYMBOL(cl_page_cache_add);
1321
1322/**
 1323 * Called when a page is being written back at the kernel's request.
1324 *
1325 * \pre  cl_page_is_owned(pg, io)
1326 * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
1327 *
1328 * \see cl_page_operations::cpo_flush()
1329 */
1330int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1331                  struct cl_page *pg)
1332{
1333        int result;
1334
1335        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1336        PINVRNT(env, pg, cl_page_invariant(pg));
1337
1338        result = cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_flush));
1339
1340        CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
1341        return result;
1342}
1343EXPORT_SYMBOL(cl_page_flush);
1344
1345/**
 1346 * Checks whether the page is protected by any extent lock of at least the
 1347 * required mode.
1348 *
1349 * \return the same as in cl_page_operations::cpo_is_under_lock() method.
1350 * \see cl_page_operations::cpo_is_under_lock()
1351 */
1352int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
1353                          struct cl_page *page)
1354{
1355        int rc;
1356
1357        PINVRNT(env, page, cl_page_invariant(page));
1358
1359        rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
1360                            (const struct lu_env *,
1361                             const struct cl_page_slice *, struct cl_io *),
1362                            io);
1363        PASSERT(env, page, rc != 0);
1364        return rc;
1365}
1366EXPORT_SYMBOL(cl_page_is_under_lock);
1367
1368static int page_prune_cb(const struct lu_env *env, struct cl_io *io,
1369                         struct cl_page *page, void *cbdata)
1370{
1371        cl_page_own(env, io, page);
1372        cl_page_unmap(env, io, page);
1373        cl_page_discard(env, io, page);
1374        cl_page_disown(env, io, page);
1375        return CLP_GANG_OKAY;
1376}
1377
1378/**
1379 * Purges all cached pages belonging to the object \a obj.
1380 */
1381int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
1382{
1383        struct cl_thread_info   *info;
1384        struct cl_object        *obj = cl_object_top(clobj);
1385        struct cl_io        *io;
1386        int                   result;
1387
1388        info  = cl_env_info(env);
1389        io    = &info->clt_io;
1390
1391        /*
1392         * initialize the io. This is ugly since we never do IO in this
1393         * function, we just make cl_page_list functions happy. -jay
1394         */
1395        io->ci_obj = obj;
1396        io->ci_ignore_layout = 1;
1397        result = cl_io_init(env, io, CIT_MISC, obj);
1398        if (result != 0) {
1399                cl_io_fini(env, io);
1400                return io->ci_result;
1401        }
1402
1403        do {
1404                result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
1405                                             page_prune_cb, NULL);
1406                if (result == CLP_GANG_RESCHED)
1407                        cond_resched();
1408        } while (result != CLP_GANG_OKAY);
1409
1410        cl_io_fini(env, io);
1411        return result;
1412}
1413EXPORT_SYMBOL(cl_pages_prune);
1414
1415/**
1416 * Tells transfer engine that only part of a page is to be transmitted.
1417 *
1418 * \see cl_page_operations::cpo_clip()
1419 */
1420void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1421                  int from, int to)
1422{
1423        PINVRNT(env, pg, cl_page_invariant(pg));
1424
1425        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1426        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_clip),
1427                       (const struct lu_env *,
1428                        const struct cl_page_slice *, int, int),
1429                       from, to);
1430}
1431EXPORT_SYMBOL(cl_page_clip);
1432
1433/**
 1434 * Prints a human-readable representation of \a pg through \a printer.
1435 */
1436void cl_page_header_print(const struct lu_env *env, void *cookie,
1437                          lu_printer_t printer, const struct cl_page *pg)
1438{
1439        (*printer)(env, cookie,
1440                   "page@%p[%d %p:%lu ^%p_%p %d %d %d %p %p %#x]\n",
1441                   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1442                   pg->cp_index, pg->cp_parent, pg->cp_child,
1443                   pg->cp_state, pg->cp_error, pg->cp_type,
1444                   pg->cp_owner, pg->cp_req, pg->cp_flags);
1445}
1446EXPORT_SYMBOL(cl_page_header_print);
1447
1448/**
 1449 * Prints a human-readable representation of \a pg through \a printer.
1450 */
1451void cl_page_print(const struct lu_env *env, void *cookie,
1452                   lu_printer_t printer, const struct cl_page *pg)
1453{
1454        struct cl_page *scan;
1455
1456        for (scan = cl_page_top((struct cl_page *)pg);
1457             scan != NULL; scan = scan->cp_child)
1458                cl_page_header_print(env, cookie, printer, scan);
1459        CL_PAGE_INVOKE(env, (struct cl_page *)pg, CL_PAGE_OP(cpo_print),
1460                       (const struct lu_env *env,
1461                        const struct cl_page_slice *slice,
1462                        void *cookie, lu_printer_t p), cookie, printer);
1463        (*printer)(env, cookie, "end page@%p\n", pg);
1464}
1465EXPORT_SYMBOL(cl_page_print);
1466
1467/**
1468 * Cancel a page which is still in a transfer.
1469 */
1470int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1471{
1472        return CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_cancel),
1473                              (const struct lu_env *,
1474                               const struct cl_page_slice *));
1475}
1476EXPORT_SYMBOL(cl_page_cancel);
1477
1478/**
 1479 * Converts a page index into a byte offset within object \a obj.
1480 */
1481loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1482{
1483        /*
1484         * XXX for now.
1485         */
1486        return (loff_t)idx << PAGE_CACHE_SHIFT;
1487}
1488EXPORT_SYMBOL(cl_offset);
1489
1490/**
 1491 * Converts a byte offset within object \a obj into a page index.
1492 */
1493pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1494{
1495        /*
1496         * XXX for now.
1497         */
1498        return offset >> PAGE_CACHE_SHIFT;
1499}
1500EXPORT_SYMBOL(cl_index);
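
/*
 * Illustrative note (not from the original source): with the usual 4KB page
 * size (PAGE_CACHE_SHIFT == 12), cl_offset(obj, 3) == 12288 and
 * cl_index(obj, 12288) == 3; a non-page-aligned offset such as 12345 still
 * maps back to index 3.
 */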
1501
1502int cl_page_size(const struct cl_object *obj)
1503{
1504        return 1 << PAGE_CACHE_SHIFT;
1505}
1506EXPORT_SYMBOL(cl_page_size);
1507
1508/**
1509 * Adds page slice to the compound page.
1510 *
1511 * This is called by cl_object_operations::coo_page_init() methods to add a
1512 * per-layer state to the page. New state is added at the end of
1513 * cl_page::cp_layers list, that is, it is at the bottom of the stack.
1514 *
1515 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1516 */
1517void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1518                       struct cl_object *obj,
1519                       const struct cl_page_operations *ops)
1520{
1521        list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1522        slice->cpl_obj  = obj;
1523        slice->cpl_ops  = ops;
1524        slice->cpl_page = page;
1525}
1526EXPORT_SYMBOL(cl_page_slice_add);
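
/*
 * Illustrative sketch (not from the original source): a layer's
 * cl_object_operations::coo_page_init() method typically embeds a
 * cl_page_slice in its own per-page structure and registers it here.  The
 * structure, field and function names below are hypothetical.
 *
 *	static int my_page_init(const struct lu_env *env, struct cl_object *obj,
 *				struct cl_page *page, struct page *vmpage)
 *	{
 *		struct my_page *mp = ...allocate per-layer state...;
 *
 *		cl_page_slice_add(page, &mp->mp_cl, obj, &my_page_ops);
 *		return 0;
 *	}
 */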
1527
1528int  cl_page_init(void)
1529{
1530        return 0;
1531}
1532
1533void cl_page_fini(void)
1534{
1535}
1536