linux/drivers/staging/lustre/lustre/obdclass/cl_page.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Client Lustre Page.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_CLASS
  42
  43#include <linux/libcfs/libcfs.h>
  44#include <obd_class.h>
  45#include <obd_support.h>
  46#include <linux/list.h>
  47
  48#include <cl_object.h>
  49#include "cl_internal.h"
  50
  51static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
  52                            int radix);
  53
  54# define PASSERT(env, page, expr)                                      \
  55  do {                                                              \
  56          if (unlikely(!(expr))) {                                    \
  57                  CL_PAGE_DEBUG(D_ERROR, (env), (page), #expr "\n");    \
  58                  LASSERT(0);                                      \
  59          }                                                          \
  60  } while (0)
  61
  62# define PINVRNT(env, page, exp) \
  63        ((void)sizeof(env), (void)sizeof(page), (void)sizeof !!(exp))
  64
   65/* Disable page statistics by default due to the huge performance penalty. */
  66#define CS_PAGE_INC(o, item)
  67#define CS_PAGE_DEC(o, item)
  68#define CS_PAGESTATE_INC(o, state)
  69#define CS_PAGESTATE_DEC(o, state)
  70
  71/**
   72 * Internal version of cl_page_top(). It may be called only when the page
   73 * is known not to be freed: say, with a page reference held, with the
   74 * radix tree lock held, or with the page owned.
  75 */
  76static struct cl_page *cl_page_top_trusted(struct cl_page *page)
  77{
  78        while (page->cp_parent != NULL)
  79                page = page->cp_parent;
  80        return page;
  81}
  82
  83/**
  84 * Internal version of cl_page_get().
  85 *
   86 * This function can be used to obtain an initial reference to a previously
   87 * unreferenced cached object. It can be called only if concurrent page
   88 * reclamation is somehow prevented, e.g., by locking the page radix-tree
   89 * (cl_object_header::coh_page_guard), or by holding a lock on the VM page
   90 * associated with \a page.
  91 *
  92 * Use with care! Not exported.
  93 */
  94static void cl_page_get_trust(struct cl_page *page)
  95{
  96        LASSERT(atomic_read(&page->cp_ref) > 0);
  97        atomic_inc(&page->cp_ref);
  98}
  99
 100/**
 101 * Returns a slice within a page, corresponding to the given layer in the
 102 * device stack.
 103 *
 104 * \see cl_lock_at()
 105 */
 106static const struct cl_page_slice *
 107cl_page_at_trusted(const struct cl_page *page,
 108                   const struct lu_device_type *dtype)
 109{
 110        const struct cl_page_slice *slice;
 111        ENTRY;
 112
 113        page = cl_page_top_trusted((struct cl_page *)page);
 114        do {
 115                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
 116                        if (slice->cpl_obj->co_lu.lo_dev->ld_type == dtype)
 117                                RETURN(slice);
 118                }
 119                page = page->cp_child;
 120        } while (page != NULL);
 121        RETURN(NULL);
 122}
 123
 124/**
 125 * Returns a page with given index in the given object, or NULL if no page is
  126 * found. Acquires a reference on the returned page.
 127 *
 128 * Locking: called under cl_object_header::coh_page_guard spin-lock.
 129 */
 130struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index)
 131{
 132        struct cl_page *page;
 133
 134        LASSERT(spin_is_locked(&hdr->coh_page_guard));
 135
 136        page = radix_tree_lookup(&hdr->coh_tree, index);
 137        if (page != NULL)
 138                cl_page_get_trust(page);
 139        return page;
 140}
 141EXPORT_SYMBOL(cl_page_lookup);
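
/*
 * Illustrative sketch (not part of the original code): a lookup by index
 * under cl_object_header::coh_page_guard, as required above. It assumes the
 * caller already has a valid "env" and "hdr"; the extra reference taken by
 * cl_page_lookup() is dropped with cl_page_put() when the page is no longer
 * needed.
 *
 *	struct cl_page *page;
 *
 *	spin_lock(&hdr->coh_page_guard);
 *	page = cl_page_lookup(hdr, index);
 *	spin_unlock(&hdr->coh_page_guard);
 *	if (page != NULL) {
 *		... use the page ...
 *		cl_page_put(env, page);
 *	}
 */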
 142
 143/**
  144 * Invokes the callback \a cb, with \a cbdata, for each page of \a obj in
  145 * the index range [start, end]. Iteration stops once the callback returns
  146 * something other than CLP_GANG_OKAY.
  147 *
  148 * If the lookup would otherwise hog the CPU for too long, it gives up and
  149 * returns CLP_GANG_RESCHED; the caller is then expected to reschedule and
  150 * retry.
  151 *
  152 * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
  153 * crucial in the face of [offset, EOF] locks.
 154 */
 155int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
 156                        struct cl_io *io, pgoff_t start, pgoff_t end,
 157                        cl_page_gang_cb_t cb, void *cbdata)
 158{
 159        struct cl_object_header *hdr;
 160        struct cl_page    *page;
 161        struct cl_page   **pvec;
 162        const struct cl_page_slice  *slice;
 163        const struct lu_device_type *dtype;
 164        pgoff_t           idx;
 165        unsigned int         nr;
 166        unsigned int         i;
 167        unsigned int         j;
 168        int                   res = CLP_GANG_OKAY;
 169        int                   tree_lock = 1;
 170        ENTRY;
 171
 172        idx = start;
 173        hdr = cl_object_header(obj);
 174        pvec = cl_env_info(env)->clt_pvec;
 175        dtype = cl_object_top(obj)->co_lu.lo_dev->ld_type;
 176        spin_lock(&hdr->coh_page_guard);
 177        while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
 178                                            idx, CLT_PVEC_SIZE)) > 0) {
 179                int end_of_region = 0;
 180                idx = pvec[nr - 1]->cp_index + 1;
 181                for (i = 0, j = 0; i < nr; ++i) {
 182                        page = pvec[i];
 183                        pvec[i] = NULL;
 184
 185                        LASSERT(page->cp_type == CPT_CACHEABLE);
 186                        if (page->cp_index > end) {
 187                                end_of_region = 1;
 188                                break;
 189                        }
 190                        if (page->cp_state == CPS_FREEING)
 191                                continue;
 192
 193                        slice = cl_page_at_trusted(page, dtype);
 194                        /*
  195                          * Pages of an lsm-less file have no underlying sub-page
  196                          * for osc, in case of ...
 197                         */
 198                        PASSERT(env, page, slice != NULL);
 199
 200                        page = slice->cpl_page;
 201                        /*
 202                         * Can safely call cl_page_get_trust() under
 203                         * radix-tree spin-lock.
 204                         *
 205                         * XXX not true, because @page is from object another
 206                         * than @hdr and protected by different tree lock.
 207                         */
 208                        cl_page_get_trust(page);
 209                        lu_ref_add_atomic(&page->cp_reference,
 210                                          "gang_lookup", current);
 211                        pvec[j++] = page;
 212                }
 213
 214                /*
 215                 * Here a delicate locking dance is performed. Current thread
  216                 * holds a reference to a page, but has to own it before it
  217                 * can be placed into a queue. Owning may imply waiting, so
  218                 * the radix-tree lock has to be released. After the wait one
  219                 * has to check that the pages weren't truncated (cl_page_own()
  220                 * returns an error in that case).
 221                 */
 222                spin_unlock(&hdr->coh_page_guard);
 223                tree_lock = 0;
 224
 225                for (i = 0; i < j; ++i) {
 226                        page = pvec[i];
 227                        if (res == CLP_GANG_OKAY)
 228                                res = (*cb)(env, io, page, cbdata);
 229                        lu_ref_del(&page->cp_reference,
 230                                   "gang_lookup", current);
 231                        cl_page_put(env, page);
 232                }
 233                if (nr < CLT_PVEC_SIZE || end_of_region)
 234                        break;
 235
 236                if (res == CLP_GANG_OKAY && need_resched())
 237                        res = CLP_GANG_RESCHED;
 238                if (res != CLP_GANG_OKAY)
 239                        break;
 240
 241                spin_lock(&hdr->coh_page_guard);
 242                tree_lock = 1;
 243        }
 244        if (tree_lock)
 245                spin_unlock(&hdr->coh_page_guard);
 246        RETURN(res);
 247}
 248EXPORT_SYMBOL(cl_page_gang_lookup);
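
/*
 * Illustrative sketch (not part of the original code): the retry loop a
 * caller of cl_page_gang_lookup() is expected to implement, mirroring
 * cl_pages_prune() below. "my_cb" is a hypothetical cl_page_gang_cb_t.
 *
 *	int res;
 *
 *	do {
 *		res = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
 *					  my_cb, NULL);
 *		if (res == CLP_GANG_RESCHED)
 *			cond_resched();
 *	} while (res != CLP_GANG_OKAY);
 */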
 249
 250static void cl_page_free(const struct lu_env *env, struct cl_page *page)
 251{
 252        struct cl_object *obj  = page->cp_obj;
 253        int pagesize = cl_object_header(obj)->coh_page_bufsize;
 254
 255        PASSERT(env, page, list_empty(&page->cp_batch));
 256        PASSERT(env, page, page->cp_owner == NULL);
 257        PASSERT(env, page, page->cp_req == NULL);
 258        PASSERT(env, page, page->cp_parent == NULL);
 259        PASSERT(env, page, page->cp_state == CPS_FREEING);
 260
 261        ENTRY;
 262        might_sleep();
 263        while (!list_empty(&page->cp_layers)) {
 264                struct cl_page_slice *slice;
 265
 266                slice = list_entry(page->cp_layers.next,
 267                                       struct cl_page_slice, cpl_linkage);
 268                list_del_init(page->cp_layers.next);
 269                slice->cpl_ops->cpo_fini(env, slice);
 270        }
 271        CS_PAGE_DEC(obj, total);
 272        CS_PAGESTATE_DEC(obj, page->cp_state);
 273        lu_object_ref_del_at(&obj->co_lu, page->cp_obj_ref, "cl_page", page);
 274        cl_object_put(env, obj);
 275        lu_ref_fini(&page->cp_reference);
 276        OBD_FREE(page, pagesize);
 277        EXIT;
 278}
 279
 280/**
 281 * Helper function updating page state. This is the only place in the code
 282 * where cl_page::cp_state field is mutated.
 283 */
 284static inline void cl_page_state_set_trust(struct cl_page *page,
 285                                           enum cl_page_state state)
 286{
 287        /* bypass const. */
 288        *(enum cl_page_state *)&page->cp_state = state;
 289}
 290
 291static struct cl_page *cl_page_alloc(const struct lu_env *env,
 292                struct cl_object *o, pgoff_t ind, struct page *vmpage,
 293                enum cl_page_type type)
 294{
 295        struct cl_page    *page;
 296        struct lu_object_header *head;
 297
 298        ENTRY;
 299        OBD_ALLOC_GFP(page, cl_object_header(o)->coh_page_bufsize,
 300                        __GFP_IO);
 301        if (page != NULL) {
 302                int result = 0;
 303                atomic_set(&page->cp_ref, 1);
 304                if (type == CPT_CACHEABLE) /* for radix tree */
 305                        atomic_inc(&page->cp_ref);
 306                page->cp_obj = o;
 307                cl_object_get(o);
  308                page->cp_obj_ref = lu_object_ref_add(&o->co_lu, "cl_page", page);
 309                page->cp_index = ind;
 310                cl_page_state_set_trust(page, CPS_CACHED);
 311                page->cp_type = type;
 312                INIT_LIST_HEAD(&page->cp_layers);
 313                INIT_LIST_HEAD(&page->cp_batch);
 314                INIT_LIST_HEAD(&page->cp_flight);
 315                mutex_init(&page->cp_mutex);
 316                lu_ref_init(&page->cp_reference);
 317                head = o->co_lu.lo_header;
 318                list_for_each_entry(o, &head->loh_layers,
 319                                        co_lu.lo_linkage) {
 320                        if (o->co_ops->coo_page_init != NULL) {
 321                                result = o->co_ops->coo_page_init(env, o,
 322                                                                  page, vmpage);
 323                                if (result != 0) {
 324                                        cl_page_delete0(env, page, 0);
 325                                        cl_page_free(env, page);
 326                                        page = ERR_PTR(result);
 327                                        break;
 328                                }
 329                        }
 330                }
 331                if (result == 0) {
 332                        CS_PAGE_INC(o, total);
 333                        CS_PAGE_INC(o, create);
 334                        CS_PAGESTATE_DEC(o, CPS_CACHED);
 335                }
 336        } else {
 337                page = ERR_PTR(-ENOMEM);
 338        }
 339        RETURN(page);
 340}
 341
 342/**
 343 * Returns a cl_page with index \a idx at the object \a o, and associated with
 344 * the VM page \a vmpage.
 345 *
 346 * This is the main entry point into the cl_page caching interface. First, a
  347 * cache (implemented as a per-object radix tree) is consulted. If the page
  348 * is found there, it is returned immediately. Otherwise a new page is
  349 * allocated and returned. In either case, an additional reference is acquired.
 350 *
 351 * \see cl_object_find(), cl_lock_find()
 352 */
 353static struct cl_page *cl_page_find0(const struct lu_env *env,
 354                                     struct cl_object *o,
 355                                     pgoff_t idx, struct page *vmpage,
 356                                     enum cl_page_type type,
 357                                     struct cl_page *parent)
 358{
 359        struct cl_page    *page = NULL;
 360        struct cl_page    *ghost = NULL;
 361        struct cl_object_header *hdr;
 362        int err;
 363
 364        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
 365        might_sleep();
 366
 367        ENTRY;
 368
 369        hdr = cl_object_header(o);
 370        CS_PAGE_INC(o, lookup);
 371
 372        CDEBUG(D_PAGE, "%lu@"DFID" %p %lx %d\n",
 373               idx, PFID(&hdr->coh_lu.loh_fid), vmpage, vmpage->private, type);
 374        /* fast path. */
 375        if (type == CPT_CACHEABLE) {
 376                /* vmpage lock is used to protect the child/parent
 377                 * relationship */
 378                KLASSERT(PageLocked(vmpage));
 379                /*
 380                 * cl_vmpage_page() can be called here without any locks as
 381                 *
 382                 *     - "vmpage" is locked (which prevents ->private from
 383                 *       concurrent updates), and
 384                 *
 385                 *     - "o" cannot be destroyed while current thread holds a
 386                 *       reference on it.
 387                 */
 388                page = cl_vmpage_page(vmpage, o);
 389                PINVRNT(env, page,
 390                        ergo(page != NULL,
 391                             cl_page_vmpage(env, page) == vmpage &&
 392                             (void *)radix_tree_lookup(&hdr->coh_tree,
 393                                                       idx) == page));
 394        }
 395
 396        if (page != NULL) {
 397                CS_PAGE_INC(o, hit);
 398                RETURN(page);
 399        }
 400
 401        /* allocate and initialize cl_page */
 402        page = cl_page_alloc(env, o, idx, vmpage, type);
 403        if (IS_ERR(page))
 404                RETURN(page);
 405
 406        if (type == CPT_TRANSIENT) {
 407                if (parent) {
 408                        LASSERT(page->cp_parent == NULL);
 409                        page->cp_parent = parent;
 410                        parent->cp_child = page;
 411                }
 412                RETURN(page);
 413        }
 414
 415        /*
 416         * XXX optimization: use radix_tree_preload() here, and change tree
 417         * gfp mask to GFP_KERNEL in cl_object_header_init().
 418         */
 419        spin_lock(&hdr->coh_page_guard);
 420        err = radix_tree_insert(&hdr->coh_tree, idx, page);
 421        if (err != 0) {
 422                ghost = page;
 423                /*
 424                 * Noted by Jay: a lock on \a vmpage protects cl_page_find()
 425                 * from this race, but
 426                 *
 427                 *     0. it's better to have cl_page interface "locally
 428                 *     consistent" so that its correctness can be reasoned
 429                 *     about without appealing to the (obscure world of) VM
 430                 *     locking.
 431                 *
 432                 *     1. handling this race allows ->coh_tree to remain
 433                 *     consistent even when VM locking is somehow busted,
 434                 *     which is very useful during diagnosing and debugging.
 435                 */
 436                page = ERR_PTR(err);
 437                CL_PAGE_DEBUG(D_ERROR, env, ghost,
 438                              "fail to insert into radix tree: %d\n", err);
 439        } else {
 440                if (parent) {
 441                        LASSERT(page->cp_parent == NULL);
 442                        page->cp_parent = parent;
 443                        parent->cp_child = page;
 444                }
 445                hdr->coh_pages++;
 446        }
 447        spin_unlock(&hdr->coh_page_guard);
 448
 449        if (unlikely(ghost != NULL)) {
 450                cl_page_delete0(env, ghost, 0);
 451                cl_page_free(env, ghost);
 452        }
 453        RETURN(page);
 454}
 455
 456struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *o,
 457                             pgoff_t idx, struct page *vmpage,
 458                             enum cl_page_type type)
 459{
 460        return cl_page_find0(env, o, idx, vmpage, type, NULL);
 461}
 462EXPORT_SYMBOL(cl_page_find);
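
/*
 * Illustrative sketch (not part of the original code): typical use of
 * cl_page_find() for a cacheable page. The VM page is assumed to be locked,
 * as cl_page_find0() requires for CPT_CACHEABLE, and the reference obtained
 * here is eventually dropped with cl_page_put().
 *
 *	struct cl_page *page;
 *
 *	page = cl_page_find(env, obj, idx, vmpage, CPT_CACHEABLE);
 *	if (IS_ERR(page))
 *		return PTR_ERR(page);
 *	... use the page ...
 *	cl_page_put(env, page);
 */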
 463
 464
 465struct cl_page *cl_page_find_sub(const struct lu_env *env, struct cl_object *o,
 466                                 pgoff_t idx, struct page *vmpage,
 467                                 struct cl_page *parent)
 468{
 469        return cl_page_find0(env, o, idx, vmpage, parent->cp_type, parent);
 470}
 471EXPORT_SYMBOL(cl_page_find_sub);
 472
 473static inline int cl_page_invariant(const struct cl_page *pg)
 474{
 475        struct cl_object_header *header;
 476        struct cl_page    *parent;
 477        struct cl_page    *child;
 478        struct cl_io        *owner;
 479
 480        /*
 481         * Page invariant is protected by a VM lock.
 482         */
 483        LINVRNT(cl_page_is_vmlocked(NULL, pg));
 484
 485        header = cl_object_header(pg->cp_obj);
 486        parent = pg->cp_parent;
 487        child  = pg->cp_child;
 488        owner  = pg->cp_owner;
 489
 490        return cl_page_in_use(pg) &&
 491                ergo(parent != NULL, parent->cp_child == pg) &&
 492                ergo(child != NULL, child->cp_parent == pg) &&
 493                ergo(child != NULL, pg->cp_obj != child->cp_obj) &&
 494                ergo(parent != NULL, pg->cp_obj != parent->cp_obj) &&
 495                ergo(owner != NULL && parent != NULL,
 496                     parent->cp_owner == pg->cp_owner->ci_parent) &&
 497                ergo(owner != NULL && child != NULL,
 498                     child->cp_owner->ci_parent == owner) &&
 499                /*
 500                 * Either page is early in initialization (has neither child
 501                 * nor parent yet), or it is in the object radix tree.
 502                 */
 503                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
 504                     (void *)radix_tree_lookup(&header->coh_tree,
 505                                               pg->cp_index) == pg ||
 506                     (child == NULL && parent == NULL));
 507}
 508
 509static void cl_page_state_set0(const struct lu_env *env,
 510                               struct cl_page *page, enum cl_page_state state)
 511{
 512        enum cl_page_state old;
 513
 514        /*
 515         * Matrix of allowed state transitions [old][new], for sanity
 516         * checking.
 517         */
 518        static const int allowed_transitions[CPS_NR][CPS_NR] = {
 519                [CPS_CACHED] = {
 520                        [CPS_CACHED]  = 0,
 521                        [CPS_OWNED]   = 1, /* io finds existing cached page */
 522                        [CPS_PAGEIN]  = 0,
 523                        [CPS_PAGEOUT] = 1, /* write-out from the cache */
 524                        [CPS_FREEING] = 1, /* eviction on the memory pressure */
 525                },
 526                [CPS_OWNED] = {
 527                        [CPS_CACHED]  = 1, /* release to the cache */
 528                        [CPS_OWNED]   = 0,
 529                        [CPS_PAGEIN]  = 1, /* start read immediately */
 530                        [CPS_PAGEOUT] = 1, /* start write immediately */
 531                        [CPS_FREEING] = 1, /* lock invalidation or truncate */
 532                },
 533                [CPS_PAGEIN] = {
 534                        [CPS_CACHED]  = 1, /* io completion */
 535                        [CPS_OWNED]   = 0,
 536                        [CPS_PAGEIN]  = 0,
 537                        [CPS_PAGEOUT] = 0,
 538                        [CPS_FREEING] = 0,
 539                },
 540                [CPS_PAGEOUT] = {
 541                        [CPS_CACHED]  = 1, /* io completion */
 542                        [CPS_OWNED]   = 0,
 543                        [CPS_PAGEIN]  = 0,
 544                        [CPS_PAGEOUT] = 0,
 545                        [CPS_FREEING] = 0,
 546                },
 547                [CPS_FREEING] = {
 548                        [CPS_CACHED]  = 0,
 549                        [CPS_OWNED]   = 0,
 550                        [CPS_PAGEIN]  = 0,
 551                        [CPS_PAGEOUT] = 0,
 552                        [CPS_FREEING] = 0,
 553                }
 554        };
 555
 556        ENTRY;
 557        old = page->cp_state;
 558        PASSERT(env, page, allowed_transitions[old][state]);
 559        CL_PAGE_HEADER(D_TRACE, env, page, "%d -> %d\n", old, state);
 560        for (; page != NULL; page = page->cp_child) {
 561                PASSERT(env, page, page->cp_state == old);
 562                PASSERT(env, page,
 563                        equi(state == CPS_OWNED, page->cp_owner != NULL));
 564
 565                CS_PAGESTATE_DEC(page->cp_obj, page->cp_state);
 566                CS_PAGESTATE_INC(page->cp_obj, state);
 567                cl_page_state_set_trust(page, state);
 568        }
 569        EXIT;
 570}
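
/*
 * For reference, one write-back life cycle permitted by the transition
 * matrix above is
 *
 *	CPS_CACHED -> CPS_OWNED -> CPS_PAGEOUT -> CPS_CACHED -> CPS_FREEING
 *
 * i.e., an io owns a cached page, starts write-out, the page returns to the
 * cache on completion, and is eventually freed.
 */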
 571
 572static void cl_page_state_set(const struct lu_env *env,
 573                              struct cl_page *page, enum cl_page_state state)
 574{
 575        cl_page_state_set0(env, page, state);
 576}
 577
 578/**
 579 * Acquires an additional reference to a page.
 580 *
 581 * This can be called only by caller already possessing a reference to \a
 582 * page.
 583 *
 584 * \see cl_object_get(), cl_lock_get().
 585 */
 586void cl_page_get(struct cl_page *page)
 587{
 588        ENTRY;
 589        cl_page_get_trust(page);
 590        EXIT;
 591}
 592EXPORT_SYMBOL(cl_page_get);
 593
 594/**
 595 * Releases a reference to a page.
 596 *
 597 * When last reference is released, page is returned to the cache, unless it
 598 * is in cl_page_state::CPS_FREEING state, in which case it is immediately
 599 * destroyed.
 600 *
 601 * \see cl_object_put(), cl_lock_put().
 602 */
 603void cl_page_put(const struct lu_env *env, struct cl_page *page)
 604{
 605        PASSERT(env, page, atomic_read(&page->cp_ref) > !!page->cp_parent);
 606
 607        ENTRY;
 608        CL_PAGE_HEADER(D_TRACE, env, page, "%d\n",
 609                       atomic_read(&page->cp_ref));
 610
 611        if (atomic_dec_and_test(&page->cp_ref)) {
 612                LASSERT(page->cp_state == CPS_FREEING);
 613
 614                LASSERT(atomic_read(&page->cp_ref) == 0);
 615                PASSERT(env, page, page->cp_owner == NULL);
 616                PASSERT(env, page, list_empty(&page->cp_batch));
 617                /*
 618                 * Page is no longer reachable by other threads. Tear
 619                 * it down.
 620                 */
 621                cl_page_free(env, page);
 622        }
 623
 624        EXIT;
 625}
 626EXPORT_SYMBOL(cl_page_put);
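
/*
 * Illustrative sketch (not part of the original code): cl_page_get() and
 * cl_page_put() are used in balanced pairs by a caller that already holds a
 * reference, e.g., to keep the page pinned across an operation that might
 * otherwise drop the last reference:
 *
 *	cl_page_get(page);
 *	... operate on the page ...
 *	cl_page_put(env, page);
 */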
 627
 628/**
 629 * Returns a VM page associated with a given cl_page.
 630 */
 631struct page *cl_page_vmpage(const struct lu_env *env, struct cl_page *page)
 632{
 633        const struct cl_page_slice *slice;
 634
 635        /*
 636         * Find uppermost layer with ->cpo_vmpage() method, and return its
 637         * result.
 638         */
 639        page = cl_page_top(page);
 640        do {
 641                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
 642                        if (slice->cpl_ops->cpo_vmpage != NULL)
 643                                RETURN(slice->cpl_ops->cpo_vmpage(env, slice));
 644                }
 645                page = page->cp_child;
 646        } while (page != NULL);
 647        LBUG(); /* ->cpo_vmpage() has to be defined somewhere in the stack */
 648}
 649EXPORT_SYMBOL(cl_page_vmpage);
 650
 651/**
 652 * Returns a cl_page associated with a VM page, and given cl_object.
 653 */
 654struct cl_page *cl_vmpage_page(struct page *vmpage, struct cl_object *obj)
 655{
 656        struct cl_page *top;
 657        struct cl_page *page;
 658
 659        ENTRY;
 660        KLASSERT(PageLocked(vmpage));
 661
 662        /*
 663         * NOTE: absence of races and liveness of data are guaranteed by page
  664         *       lock on a "vmpage". That works because object destruction
  665         *       proceeds bottom-to-top.
 666         */
 667
 668        /*
 669         * This loop assumes that ->private points to the top-most page. This
 670         * can be rectified easily.
 671         */
 672        top = (struct cl_page *)vmpage->private;
 673        if (top == NULL)
 674                RETURN(NULL);
 675
 676        for (page = top; page != NULL; page = page->cp_child) {
 677                if (cl_object_same(page->cp_obj, obj)) {
 678                        cl_page_get_trust(page);
 679                        break;
 680                }
 681        }
 682        LASSERT(ergo(page, page->cp_type == CPT_CACHEABLE));
 683        RETURN(page);
 684}
 685EXPORT_SYMBOL(cl_vmpage_page);
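
/*
 * Illustrative sketch (not part of the original code): translating a VM page
 * into the cl_page of a particular cl_object. The PageLocked() requirement
 * comes from cl_vmpage_page() itself; the sketch assumes the caller does not
 * already hold the vmpage lock.
 *
 *	struct cl_page *page;
 *
 *	lock_page(vmpage);
 *	page = cl_vmpage_page(vmpage, obj);
 *	unlock_page(vmpage);
 *	if (page != NULL) {
 *		... use the page ...
 *		cl_page_put(env, page);
 *	}
 */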
 686
 687/**
 688 * Returns the top-page for a given page.
 689 *
 690 * \see cl_object_top(), cl_io_top()
 691 */
 692struct cl_page *cl_page_top(struct cl_page *page)
 693{
 694        return cl_page_top_trusted(page);
 695}
 696EXPORT_SYMBOL(cl_page_top);
 697
 698const struct cl_page_slice *cl_page_at(const struct cl_page *page,
 699                                       const struct lu_device_type *dtype)
 700{
 701        return cl_page_at_trusted(page, dtype);
 702}
 703EXPORT_SYMBOL(cl_page_at);
 704
 705#define CL_PAGE_OP(opname) offsetof(struct cl_page_operations, opname)
 706
 707#define CL_PAGE_INVOKE(_env, _page, _op, _proto, ...)              \
 708({                                                                    \
 709        const struct lu_env     *__env  = (_env);                   \
 710        struct cl_page       *__page = (_page);            \
 711        const struct cl_page_slice *__scan;                          \
 712        int                      __result;                         \
 713        ptrdiff_t                  __op   = (_op);                   \
 714        int                    (*__method)_proto;                   \
 715                                                                        \
 716        __result = 0;                                              \
 717        __page = cl_page_top(__page);                              \
 718        do {                                                        \
 719                list_for_each_entry(__scan, &__page->cp_layers,     \
 720                                        cpl_linkage) {            \
 721                        __method = *(void **)((char *)__scan->cpl_ops + \
 722                                              __op);                \
 723                        if (__method != NULL) {                  \
 724                                __result = (*__method)(__env, __scan,   \
 725                                                       ## __VA_ARGS__); \
 726                                if (__result != 0)                    \
 727                                        break;                    \
 728                        }                                              \
 729                }                                                      \
 730                __page = __page->cp_child;                            \
 731        } while (__page != NULL && __result == 0);                    \
 732        if (__result > 0)                                              \
 733                __result = 0;                                      \
 734        __result;                                                      \
 735})
 736
 737#define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)              \
 738do {                                                                \
 739        const struct lu_env     *__env  = (_env);                   \
 740        struct cl_page       *__page = (_page);            \
 741        const struct cl_page_slice *__scan;                          \
 742        ptrdiff_t                  __op   = (_op);                   \
 743        void                  (*__method)_proto;                    \
 744                                                                        \
 745        __page = cl_page_top(__page);                              \
 746        do {                                                        \
 747                list_for_each_entry(__scan, &__page->cp_layers,     \
 748                                        cpl_linkage) {            \
 749                        __method = *(void **)((char *)__scan->cpl_ops + \
 750                                              __op);                \
 751                        if (__method != NULL)                      \
 752                                (*__method)(__env, __scan,            \
 753                                            ## __VA_ARGS__);        \
 754                }                                                      \
 755                __page = __page->cp_child;                            \
 756        } while (__page != NULL);                                      \
 757} while (0)
 758
 759#define CL_PAGE_INVOID_REVERSE(_env, _page, _op, _proto, ...)          \
 760do {                                                                    \
 761        const struct lu_env     *__env  = (_env);                       \
 762        struct cl_page       *__page = (_page);                \
 763        const struct cl_page_slice *__scan;                              \
 764        ptrdiff_t                  __op   = (_op);                       \
 765        void                  (*__method)_proto;                        \
 766                                                                            \
 767        /* get to the bottom page. */                                  \
 768        while (__page->cp_child != NULL)                                    \
 769                __page = __page->cp_child;                                \
 770        do {                                                            \
 771                list_for_each_entry_reverse(__scan, &__page->cp_layers, \
 772                                                cpl_linkage) {        \
 773                        __method = *(void **)((char *)__scan->cpl_ops +     \
 774                                              __op);                    \
 775                        if (__method != NULL)                          \
 776                                (*__method)(__env, __scan,                \
 777                                            ## __VA_ARGS__);            \
 778                }                                                          \
 779                __page = __page->cp_parent;                              \
 780        } while (__page != NULL);                                          \
 781} while (0)
 782
 783static int cl_page_invoke(const struct lu_env *env,
 784                          struct cl_io *io, struct cl_page *page, ptrdiff_t op)
 785
 786{
 787        PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
 788        ENTRY;
 789        RETURN(CL_PAGE_INVOKE(env, page, op,
 790                              (const struct lu_env *,
 791                               const struct cl_page_slice *, struct cl_io *),
 792                              io));
 793}
 794
 795static void cl_page_invoid(const struct lu_env *env,
 796                           struct cl_io *io, struct cl_page *page, ptrdiff_t op)
 797
 798{
 799        PINVRNT(env, page, cl_object_same(page->cp_obj, io->ci_obj));
 800        ENTRY;
 801        CL_PAGE_INVOID(env, page, op,
 802                       (const struct lu_env *,
 803                        const struct cl_page_slice *, struct cl_io *), io);
 804        EXIT;
 805}
 806
 807static void cl_page_owner_clear(struct cl_page *page)
 808{
 809        ENTRY;
 810        for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
 811                if (page->cp_owner != NULL) {
 812                        LASSERT(page->cp_owner->ci_owned_nr > 0);
 813                        page->cp_owner->ci_owned_nr--;
 814                        page->cp_owner = NULL;
 815                        page->cp_task = NULL;
 816                }
 817        }
 818        EXIT;
 819}
 820
 821static void cl_page_owner_set(struct cl_page *page)
 822{
 823        ENTRY;
 824        for (page = cl_page_top(page); page != NULL; page = page->cp_child) {
 825                LASSERT(page->cp_owner != NULL);
 826                page->cp_owner->ci_owned_nr++;
 827        }
 828        EXIT;
 829}
 830
 831void cl_page_disown0(const struct lu_env *env,
 832                     struct cl_io *io, struct cl_page *pg)
 833{
 834        enum cl_page_state state;
 835
 836        ENTRY;
 837        state = pg->cp_state;
 838        PINVRNT(env, pg, state == CPS_OWNED || state == CPS_FREEING);
 839        PINVRNT(env, pg, cl_page_invariant(pg));
 840        cl_page_owner_clear(pg);
 841
 842        if (state == CPS_OWNED)
 843                cl_page_state_set(env, pg, CPS_CACHED);
 844        /*
 845         * Completion call-backs are executed in the bottom-up order, so that
  846         * the uppermost layer (llite), responsible for VFS/VM interaction,
  847         * runs last and can release locks safely.
 848         */
 849        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_disown),
 850                               (const struct lu_env *,
 851                                const struct cl_page_slice *, struct cl_io *),
 852                               io);
 853        EXIT;
 854}
 855
 856/**
  857 * Returns true iff the page is owned by the given io.
 858 */
 859int cl_page_is_owned(const struct cl_page *pg, const struct cl_io *io)
 860{
 861        LINVRNT(cl_object_same(pg->cp_obj, io->ci_obj));
 862        ENTRY;
 863        RETURN(pg->cp_state == CPS_OWNED && pg->cp_owner == io);
 864}
 865EXPORT_SYMBOL(cl_page_is_owned);
 866
 867/**
 868 * Try to own a page by IO.
 869 *
  870 * Waits until the page is in cl_page_state::CPS_CACHED state, and then
  871 * switches it into cl_page_state::CPS_OWNED state.
 872 *
 873 * \pre  !cl_page_is_owned(pg, io)
 874 * \post result == 0 iff cl_page_is_owned(pg, io)
 875 *
 876 * \retval 0   success
 877 *
  878 * \retval -ve failure, e.g., the page was destroyed (and landed in
  879 *           cl_page_state::CPS_FREEING instead of cl_page_state::CPS_CACHED),
  880 *           or the page was owned by another thread, or is in IO.
 881 *
 882 * \see cl_page_disown()
 883 * \see cl_page_operations::cpo_own()
 884 * \see cl_page_own_try()
 885 * \see cl_page_own
 886 */
 887static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
 888                        struct cl_page *pg, int nonblock)
 889{
 890        int result;
 891
 892        PINVRNT(env, pg, !cl_page_is_owned(pg, io));
 893
 894        ENTRY;
 895        pg = cl_page_top(pg);
 896        io = cl_io_top(io);
 897
 898        if (pg->cp_state == CPS_FREEING) {
 899                result = -ENOENT;
 900        } else {
 901                result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
 902                                        (const struct lu_env *,
 903                                         const struct cl_page_slice *,
 904                                         struct cl_io *, int),
 905                                        io, nonblock);
 906                if (result == 0) {
 907                        PASSERT(env, pg, pg->cp_owner == NULL);
 908                        PASSERT(env, pg, pg->cp_req == NULL);
 909                        pg->cp_owner = io;
 910                        pg->cp_task  = current;
 911                        cl_page_owner_set(pg);
 912                        if (pg->cp_state != CPS_FREEING) {
 913                                cl_page_state_set(env, pg, CPS_OWNED);
 914                        } else {
 915                                cl_page_disown0(env, io, pg);
 916                                result = -ENOENT;
 917                        }
 918                }
 919        }
 920        PINVRNT(env, pg, ergo(result == 0, cl_page_invariant(pg)));
 921        RETURN(result);
 922}
 923
 924/**
  925 * Own a page; the caller might block.
 926 *
 927 * \see cl_page_own0()
 928 */
 929int cl_page_own(const struct lu_env *env, struct cl_io *io, struct cl_page *pg)
 930{
 931        return cl_page_own0(env, io, pg, 0);
 932}
 933EXPORT_SYMBOL(cl_page_own);
 934
 935/**
 936 * Nonblock version of cl_page_own().
 937 *
 938 * \see cl_page_own0()
 939 */
 940int cl_page_own_try(const struct lu_env *env, struct cl_io *io,
 941                    struct cl_page *pg)
 942{
 943        return cl_page_own0(env, io, pg, 1);
 944}
 945EXPORT_SYMBOL(cl_page_own_try);
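
/*
 * Illustrative sketch (not part of the original code): owning a page for the
 * duration of an operation and releasing ownership afterwards, following the
 * pattern of page_prune_cb() below.
 *
 *	if (cl_page_own(env, io, pg) == 0) {
 *		... operate on the owned page ...
 *		cl_page_disown(env, io, pg);
 *	}
 */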
 946
 947
 948/**
 949 * Assume page ownership.
 950 *
 951 * Called when page is already locked by the hosting VM.
 952 *
 953 * \pre !cl_page_is_owned(pg, io)
 954 * \post cl_page_is_owned(pg, io)
 955 *
 956 * \see cl_page_operations::cpo_assume()
 957 */
 958void cl_page_assume(const struct lu_env *env,
 959                    struct cl_io *io, struct cl_page *pg)
 960{
 961        PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
 962
 963        ENTRY;
 964        pg = cl_page_top(pg);
 965        io = cl_io_top(io);
 966
 967        cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_assume));
 968        PASSERT(env, pg, pg->cp_owner == NULL);
 969        pg->cp_owner = io;
 970        pg->cp_task = current;
 971        cl_page_owner_set(pg);
 972        cl_page_state_set(env, pg, CPS_OWNED);
 973        EXIT;
 974}
 975EXPORT_SYMBOL(cl_page_assume);
 976
 977/**
 978 * Releases page ownership without unlocking the page.
 979 *
 980 * Moves page into cl_page_state::CPS_CACHED without releasing a lock on the
 981 * underlying VM page (as VM is supposed to do this itself).
 982 *
 983 * \pre   cl_page_is_owned(pg, io)
 984 * \post !cl_page_is_owned(pg, io)
 985 *
 986 * \see cl_page_assume()
 987 */
 988void cl_page_unassume(const struct lu_env *env,
 989                      struct cl_io *io, struct cl_page *pg)
 990{
 991        PINVRNT(env, pg, cl_page_is_owned(pg, io));
 992        PINVRNT(env, pg, cl_page_invariant(pg));
 993
 994        ENTRY;
 995        pg = cl_page_top(pg);
 996        io = cl_io_top(io);
 997        cl_page_owner_clear(pg);
 998        cl_page_state_set(env, pg, CPS_CACHED);
 999        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(cpo_unassume),
1000                               (const struct lu_env *,
1001                                const struct cl_page_slice *, struct cl_io *),
1002                               io);
1003        EXIT;
1004}
1005EXPORT_SYMBOL(cl_page_unassume);
1006
1007/**
1008 * Releases page ownership.
1009 *
1010 * Moves page into cl_page_state::CPS_CACHED.
1011 *
1012 * \pre   cl_page_is_owned(pg, io)
1013 * \post !cl_page_is_owned(pg, io)
1014 *
1015 * \see cl_page_own()
1016 * \see cl_page_operations::cpo_disown()
1017 */
1018void cl_page_disown(const struct lu_env *env,
1019                    struct cl_io *io, struct cl_page *pg)
1020{
1021        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1022
1023        ENTRY;
1024        pg = cl_page_top(pg);
1025        io = cl_io_top(io);
1026        cl_page_disown0(env, io, pg);
1027        EXIT;
1028}
1029EXPORT_SYMBOL(cl_page_disown);
1030
1031/**
1032 * Called when page is to be removed from the object, e.g., as a result of
1033 * truncate.
1034 *
1035 * Calls cl_page_operations::cpo_discard() top-to-bottom.
1036 *
1037 * \pre cl_page_is_owned(pg, io)
1038 *
1039 * \see cl_page_operations::cpo_discard()
1040 */
1041void cl_page_discard(const struct lu_env *env,
1042                     struct cl_io *io, struct cl_page *pg)
1043{
1044        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1045        PINVRNT(env, pg, cl_page_invariant(pg));
1046
1047        cl_page_invoid(env, io, pg, CL_PAGE_OP(cpo_discard));
1048}
1049EXPORT_SYMBOL(cl_page_discard);
1050
1051/**
 1052 * Version of cl_page_delete() that can be called for not fully constructed
 1053 * pages, e.g., in an error-handling cl_page_find()->cl_page_delete0()
 1054 * path. Doesn't check the page invariant.
1055 */
1056static void cl_page_delete0(const struct lu_env *env, struct cl_page *pg,
1057                            int radix)
1058{
1059        struct cl_page *tmp = pg;
1060        ENTRY;
1061
1062        PASSERT(env, pg, pg == cl_page_top(pg));
1063        PASSERT(env, pg, pg->cp_state != CPS_FREEING);
1064
1065        /*
 1066         * Sever all ways to obtain new pointers to @pg.
1067         */
1068        cl_page_owner_clear(pg);
1069
1070        /*
 1071         * Unexport the page before freeing it, so that its content is
 1072         * considered invalid.
 1073         * We have to do this because a CPS_FREEING cl_page may NOT be
 1074         * under the protection of a cl_lock.
 1075         * Afterwards, if this page is found by other threads, it will
 1076         * be forced to be re-read.
1077         */
1078        cl_page_export(env, pg, 0);
1079        cl_page_state_set0(env, pg, CPS_FREEING);
1080
1081        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_delete),
1082                       (const struct lu_env *, const struct cl_page_slice *));
1083
1084        if (tmp->cp_type == CPT_CACHEABLE) {
1085                if (!radix)
1086                        /* !radix means that @pg is not yet in the radix tree,
1087                         * skip removing it.
1088                         */
1089                        tmp = pg->cp_child;
1090                for (; tmp != NULL; tmp = tmp->cp_child) {
1091                        void                *value;
1092                        struct cl_object_header *hdr;
1093
1094                        hdr = cl_object_header(tmp->cp_obj);
1095                        spin_lock(&hdr->coh_page_guard);
1096                        value = radix_tree_delete(&hdr->coh_tree,
1097                                                  tmp->cp_index);
1098                        PASSERT(env, tmp, value == tmp);
1099                        PASSERT(env, tmp, hdr->coh_pages > 0);
1100                        hdr->coh_pages--;
1101                        spin_unlock(&hdr->coh_page_guard);
1102                        cl_page_put(env, tmp);
1103                }
1104        }
1105
1106        EXIT;
1107}
1108
1109/**
1110 * Called when a decision is made to throw page out of memory.
1111 *
1112 * Notifies all layers about page destruction by calling
1113 * cl_page_operations::cpo_delete() method top-to-bottom.
1114 *
1115 * Moves page into cl_page_state::CPS_FREEING state (this is the only place
1116 * where transition to this state happens).
1117 *
1118 * Eliminates all venues through which new references to the page can be
1119 * obtained:
1120 *
1121 *     - removes page from the radix trees,
1122 *
1123 *     - breaks linkage from VM page to cl_page.
1124 *
1125 * Once page reaches cl_page_state::CPS_FREEING, all remaining references will
1126 * drain after some time, at which point page will be recycled.
1127 *
1128 * \pre  pg == cl_page_top(pg)
1129 * \pre  VM page is locked
1130 * \post pg->cp_state == CPS_FREEING
1131 *
1132 * \see cl_page_operations::cpo_delete()
1133 */
1134void cl_page_delete(const struct lu_env *env, struct cl_page *pg)
1135{
1136        PINVRNT(env, pg, cl_page_invariant(pg));
1137        ENTRY;
1138        cl_page_delete0(env, pg, 1);
1139        EXIT;
1140}
1141EXPORT_SYMBOL(cl_page_delete);
1142
1143/**
1144 * Unmaps page from user virtual memory.
1145 *
1146 * Calls cl_page_operations::cpo_unmap() through all layers top-to-bottom. The
1147 * layer responsible for VM interaction has to unmap page from user space
1148 * virtual memory.
1149 *
1150 * \see cl_page_operations::cpo_unmap()
1151 */
1152int cl_page_unmap(const struct lu_env *env,
1153                  struct cl_io *io, struct cl_page *pg)
1154{
1155        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1156        PINVRNT(env, pg, cl_page_invariant(pg));
1157
1158        return cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_unmap));
1159}
1160EXPORT_SYMBOL(cl_page_unmap);
1161
1162/**
1163 * Marks page up-to-date.
1164 *
1165 * Call cl_page_operations::cpo_export() through all layers top-to-bottom. The
1166 * layer responsible for VM interaction has to mark/clear page as up-to-date
1167 * by the \a uptodate argument.
1168 *
1169 * \see cl_page_operations::cpo_export()
1170 */
1171void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate)
1172{
1173        PINVRNT(env, pg, cl_page_invariant(pg));
1174        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_export),
1175                       (const struct lu_env *,
1176                        const struct cl_page_slice *, int), uptodate);
1177}
1178EXPORT_SYMBOL(cl_page_export);
1179
1180/**
1181 * Returns true, iff \a pg is VM locked in a suitable sense by the calling
1182 * thread.
1183 */
1184int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg)
1185{
1186        int result;
1187        const struct cl_page_slice *slice;
1188
1189        ENTRY;
1190        pg = cl_page_top_trusted((struct cl_page *)pg);
1191        slice = container_of(pg->cp_layers.next,
1192                             const struct cl_page_slice, cpl_linkage);
1193        PASSERT(env, pg, slice->cpl_ops->cpo_is_vmlocked != NULL);
1194        /*
1195         * Call ->cpo_is_vmlocked() directly instead of going through
1196         * CL_PAGE_INVOKE(), because cl_page_is_vmlocked() is used by
1197         * cl_page_invariant().
1198         */
1199        result = slice->cpl_ops->cpo_is_vmlocked(env, slice);
1200        PASSERT(env, pg, result == -EBUSY || result == -ENODATA);
1201        RETURN(result == -EBUSY);
1202}
1203EXPORT_SYMBOL(cl_page_is_vmlocked);
1204
1205static enum cl_page_state cl_req_type_state(enum cl_req_type crt)
1206{
1207        ENTRY;
1208        RETURN(crt == CRT_WRITE ? CPS_PAGEOUT : CPS_PAGEIN);
1209}
1210
1211static void cl_page_io_start(const struct lu_env *env,
1212                             struct cl_page *pg, enum cl_req_type crt)
1213{
1214        /*
1215         * Page is queued for IO, change its state.
1216         */
1217        ENTRY;
1218        cl_page_owner_clear(pg);
1219        cl_page_state_set(env, pg, cl_req_type_state(crt));
1220        EXIT;
1221}
1222
1223/**
1224 * Prepares page for immediate transfer. cl_page_operations::cpo_prep() is
1225 * called top-to-bottom. Every layer either agrees to submit this page (by
1226 * returning 0), or requests to omit this page (by returning -EALREADY). Layer
1227 * handling interactions with the VM also has to inform VM that page is under
1228 * transfer now.
1229 */
1230int cl_page_prep(const struct lu_env *env, struct cl_io *io,
1231                 struct cl_page *pg, enum cl_req_type crt)
1232{
1233        int result;
1234
1235        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1236        PINVRNT(env, pg, cl_page_invariant(pg));
1237        PINVRNT(env, pg, crt < CRT_NR);
1238
1239        /*
1240         * XXX this has to be called bottom-to-top, so that llite can set up
1241         * PG_writeback without risking other layers deciding to skip this
1242         * page.
1243         */
1244        if (crt >= CRT_NR)
1245                return -EINVAL;
1246        result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_prep));
1247        if (result == 0)
1248                cl_page_io_start(env, pg, crt);
1249
1250        KLASSERT(ergo(crt == CRT_WRITE && pg->cp_type == CPT_CACHEABLE,
1251                      equi(result == 0,
1252                           PageWriteback(cl_page_vmpage(env, pg)))));
1253        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1254        return result;
1255}
1256EXPORT_SYMBOL(cl_page_prep);
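
/*
 * Illustrative sketch (not part of the original code): preparing an owned
 * page for write-out. A return value of -EALREADY means some layer asked to
 * omit the page, which a caller may treat as "nothing to do" rather than as
 * an error.
 *
 *	int rc;
 *
 *	rc = cl_page_prep(env, io, pg, CRT_WRITE);
 *	if (rc == 0)
 *		... the page is now in CPS_PAGEOUT and queued for IO ...
 *	else if (rc == -EALREADY)
 *		... skip this page ...
 */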
1257
1258/**
1259 * Notify layers about transfer completion.
1260 *
1261 * Invoked by transfer sub-system (which is a part of osc) to notify layers
 1262 * that a transfer, of which this page is a part, has completed.
 1263 *
 1264 * Completion call-backs are executed in the bottom-up order, so that the
 1265 * uppermost layer (llite), responsible for the VFS/VM interaction, runs
 1266 * last and can release locks safely.
1267 *
1268 * \pre  pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1269 * \post pg->cp_state == CPS_CACHED
1270 *
1271 * \see cl_page_operations::cpo_completion()
1272 */
1273void cl_page_completion(const struct lu_env *env,
1274                        struct cl_page *pg, enum cl_req_type crt, int ioret)
1275{
1276        struct cl_sync_io *anchor = pg->cp_sync_io;
1277
1278        PASSERT(env, pg, crt < CRT_NR);
1279        /* cl_page::cp_req already cleared by the caller (osc_completion()) */
1280        PASSERT(env, pg, pg->cp_req == NULL);
1281        PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
1282
1283        ENTRY;
1284        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
1285        if (crt == CRT_READ && ioret == 0) {
1286                PASSERT(env, pg, !(pg->cp_flags & CPF_READ_COMPLETED));
1287                pg->cp_flags |= CPF_READ_COMPLETED;
1288        }
1289
1290        cl_page_state_set(env, pg, CPS_CACHED);
1291        if (crt >= CRT_NR)
1292                return;
1293        CL_PAGE_INVOID_REVERSE(env, pg, CL_PAGE_OP(io[crt].cpo_completion),
1294                               (const struct lu_env *,
1295                                const struct cl_page_slice *, int), ioret);
1296        if (anchor) {
1297                LASSERT(cl_page_is_vmlocked(env, pg));
1298                LASSERT(pg->cp_sync_io == anchor);
1299                pg->cp_sync_io = NULL;
1300        }
1301        /*
1302         * As page->cp_obj is pinned by a reference from page->cp_req, it is
1303         * safe to call cl_page_put() without risking object destruction in a
1304         * non-blocking context.
1305         */
1306        cl_page_put(env, pg);
1307
1308        if (anchor)
1309                cl_sync_io_note(anchor, ioret);
1310
1311        EXIT;
1312}
1313EXPORT_SYMBOL(cl_page_completion);
1314
1315/**
1316 * Notify layers that transfer formation engine decided to yank this page from
1317 * the cache and to make it a part of a transfer.
1318 *
1319 * \pre  pg->cp_state == CPS_CACHED
1320 * \post pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT
1321 *
1322 * \see cl_page_operations::cpo_make_ready()
1323 */
1324int cl_page_make_ready(const struct lu_env *env, struct cl_page *pg,
1325                       enum cl_req_type crt)
1326{
1327        int result;
1328
1329        PINVRNT(env, pg, crt < CRT_NR);
1330
1331        ENTRY;
1332        if (crt >= CRT_NR)
1333                RETURN(-EINVAL);
1334        result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(io[crt].cpo_make_ready),
1335                                (const struct lu_env *,
1336                                 const struct cl_page_slice *));
1337        if (result == 0) {
1338                PASSERT(env, pg, pg->cp_state == CPS_CACHED);
1339                cl_page_io_start(env, pg, crt);
1340        }
1341        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1342        RETURN(result);
1343}
1344EXPORT_SYMBOL(cl_page_make_ready);
1345
1346/**
1347 * Notify layers that high level io decided to place this page into a cache
1348 * for future transfer.
1349 *
1350 * The layer implementing transfer engine (osc) has to register this page in
1351 * its queues.
1352 *
1353 * \pre  cl_page_is_owned(pg, io)
1354 * \post cl_page_is_owned(pg, io)
1355 *
1356 * \see cl_page_operations::cpo_cache_add()
1357 */
1358int cl_page_cache_add(const struct lu_env *env, struct cl_io *io,
1359                      struct cl_page *pg, enum cl_req_type crt)
1360{
1361        const struct cl_page_slice *scan;
1362        int result = 0;
1363
1364        PINVRNT(env, pg, crt < CRT_NR);
1365        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1366        PINVRNT(env, pg, cl_page_invariant(pg));
1367
1368        ENTRY;
1369
1370        if (crt >= CRT_NR)
1371                RETURN(-EINVAL);
1372
1373        list_for_each_entry(scan, &pg->cp_layers, cpl_linkage) {
1374                if (scan->cpl_ops->io[crt].cpo_cache_add == NULL)
1375                        continue;
1376
1377                result = scan->cpl_ops->io[crt].cpo_cache_add(env, scan, io);
1378                if (result != 0)
1379                        break;
1380        }
1381        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result);
1382        RETURN(result);
1383}
1384EXPORT_SYMBOL(cl_page_cache_add);
1385
1386/**
 1387 * Called when a page is being written back at the kernel's initiative.
1388 *
1389 * \pre  cl_page_is_owned(pg, io)
1390 * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT)
1391 *
1392 * \see cl_page_operations::cpo_flush()
1393 */
1394int cl_page_flush(const struct lu_env *env, struct cl_io *io,
1395                  struct cl_page *pg)
1396{
1397        int result;
1398
1399        PINVRNT(env, pg, cl_page_is_owned(pg, io));
1400        PINVRNT(env, pg, cl_page_invariant(pg));
1401
1402        ENTRY;
1403
1404        result = cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_flush));
1405
1406        CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result);
1407        RETURN(result);
1408}
1409EXPORT_SYMBOL(cl_page_flush);
1410
1411/**
 1412 * Checks whether the page is protected by any extent lock of at least the
 1413 * required mode.
1414 *
1415 * \return the same as in cl_page_operations::cpo_is_under_lock() method.
1416 * \see cl_page_operations::cpo_is_under_lock()
1417 */
1418int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
1419                          struct cl_page *page)
1420{
1421        int rc;
1422
1423        PINVRNT(env, page, cl_page_invariant(page));
1424
1425        ENTRY;
1426        rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
1427                            (const struct lu_env *,
1428                             const struct cl_page_slice *, struct cl_io *),
1429                            io);
1430        PASSERT(env, page, rc != 0);
1431        RETURN(rc);
1432}
1433EXPORT_SYMBOL(cl_page_is_under_lock);
1434
1435static int page_prune_cb(const struct lu_env *env, struct cl_io *io,
1436                         struct cl_page *page, void *cbdata)
1437{
1438        cl_page_own(env, io, page);
1439        cl_page_unmap(env, io, page);
1440        cl_page_discard(env, io, page);
1441        cl_page_disown(env, io, page);
1442        return CLP_GANG_OKAY;
1443}
1444
1445/**
 1446 * Purges all cached pages belonging to the object \a clobj.
1447 */
1448int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
1449{
1450        struct cl_thread_info   *info;
1451        struct cl_object        *obj = cl_object_top(clobj);
1452        struct cl_io        *io;
1453        int                   result;
1454
1455        ENTRY;
1456        info  = cl_env_info(env);
1457        io    = &info->clt_io;
1458
1459        /*
 1460         * Initialize the io. This is ugly since we never do IO in this
 1461         * function; we just make the cl_page_list functions happy. -jay
1462         */
1463        io->ci_obj = obj;
1464        io->ci_ignore_layout = 1;
1465        result = cl_io_init(env, io, CIT_MISC, obj);
1466        if (result != 0) {
1467                cl_io_fini(env, io);
1468                RETURN(io->ci_result);
1469        }
1470
1471        do {
1472                result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
1473                                             page_prune_cb, NULL);
1474                if (result == CLP_GANG_RESCHED)
1475                        cond_resched();
1476        } while (result != CLP_GANG_OKAY);
1477
1478        cl_io_fini(env, io);
1479        RETURN(result);
1480}
1481EXPORT_SYMBOL(cl_pages_prune);
1482
1483/**
1484 * Tells transfer engine that only part of a page is to be transmitted.
1485 *
1486 * \see cl_page_operations::cpo_clip()
1487 */
1488void cl_page_clip(const struct lu_env *env, struct cl_page *pg,
1489                  int from, int to)
1490{
1491        PINVRNT(env, pg, cl_page_invariant(pg));
1492
1493        CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", from, to);
1494        CL_PAGE_INVOID(env, pg, CL_PAGE_OP(cpo_clip),
1495                       (const struct lu_env *,
 1496                        const struct cl_page_slice *, int, int),
1497                       from, to);
1498}
1499EXPORT_SYMBOL(cl_page_clip);
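
/*
 * Illustrative sketch (not part of the original code): a partial-page write
 * that dirties only the first "bytes" bytes of the page can clip the
 * transfer accordingly before the page is submitted:
 *
 *	cl_page_clip(env, pg, 0, bytes);
 */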
1500
1501/**
 1502 * Prints a human-readable representation of \a pg through \a printer.
1503 */
1504void cl_page_header_print(const struct lu_env *env, void *cookie,
1505                          lu_printer_t printer, const struct cl_page *pg)
1506{
1507        (*printer)(env, cookie,
1508                   "page@%p[%d %p:%lu ^%p_%p %d %d %d %p %p %#x]\n",
1509                   pg, atomic_read(&pg->cp_ref), pg->cp_obj,
1510                   pg->cp_index, pg->cp_parent, pg->cp_child,
1511                   pg->cp_state, pg->cp_error, pg->cp_type,
1512                   pg->cp_owner, pg->cp_req, pg->cp_flags);
1513}
1514EXPORT_SYMBOL(cl_page_header_print);
1515
1516/**
 1517 * Prints a human-readable representation of \a pg through \a printer.
1518 */
1519void cl_page_print(const struct lu_env *env, void *cookie,
1520                   lu_printer_t printer, const struct cl_page *pg)
1521{
1522        struct cl_page *scan;
1523
1524        for (scan = cl_page_top((struct cl_page *)pg);
1525             scan != NULL; scan = scan->cp_child)
1526                cl_page_header_print(env, cookie, printer, scan);
1527        CL_PAGE_INVOKE(env, (struct cl_page *)pg, CL_PAGE_OP(cpo_print),
1528                       (const struct lu_env *env,
1529                        const struct cl_page_slice *slice,
1530                        void *cookie, lu_printer_t p), cookie, printer);
1531        (*printer)(env, cookie, "end page@%p\n", pg);
1532}
1533EXPORT_SYMBOL(cl_page_print);
1534
1535/**
1536 * Cancel a page which is still in a transfer.
1537 */
1538int cl_page_cancel(const struct lu_env *env, struct cl_page *page)
1539{
1540        return CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_cancel),
1541                              (const struct lu_env *,
1542                               const struct cl_page_slice *));
1543}
1544EXPORT_SYMBOL(cl_page_cancel);
1545
1546/**
 1547 * Converts a page index within object \a obj into a byte offset.
1548 */
1549loff_t cl_offset(const struct cl_object *obj, pgoff_t idx)
1550{
1551        /*
1552         * XXX for now.
1553         */
1554        return (loff_t)idx << PAGE_CACHE_SHIFT;
1555}
1556EXPORT_SYMBOL(cl_offset);
1557
1558/**
 1559 * Converts a byte offset within object \a obj into a page index.
1560 */
1561pgoff_t cl_index(const struct cl_object *obj, loff_t offset)
1562{
1563        /*
1564         * XXX for now.
1565         */
1566        return offset >> PAGE_CACHE_SHIFT;
1567}
1568EXPORT_SYMBOL(cl_index);
1569
1570int cl_page_size(const struct cl_object *obj)
1571{
1572        return 1 << PAGE_CACHE_SHIFT;
1573}
1574EXPORT_SYMBOL(cl_page_size);
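
/*
 * Worked example (not part of the original code): with the usual 4KB pages
 * (PAGE_CACHE_SHIFT == 12), cl_offset(obj, 3) == 12288, and any offset in
 * [12288, 16383], e.g. cl_index(obj, 12388), maps back to page index 3.
 */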
1575
1576/**
1577 * Adds page slice to the compound page.
1578 *
1579 * This is called by cl_object_operations::coo_page_init() methods to add a
1580 * per-layer state to the page. New state is added at the end of
1581 * cl_page::cp_layers list, that is, it is at the bottom of the stack.
1582 *
1583 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
1584 */
1585void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
1586                       struct cl_object *obj,
1587                       const struct cl_page_operations *ops)
1588{
1589        ENTRY;
1590        list_add_tail(&slice->cpl_linkage, &page->cp_layers);
1591        slice->cpl_obj  = obj;
1592        slice->cpl_ops  = ops;
1593        slice->cpl_page = page;
1594        EXIT;
1595}
1596EXPORT_SYMBOL(cl_page_slice_add);
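
/*
 * Illustrative sketch (not part of the original code): a layer's
 * cl_object_operations::coo_page_init() method typically embeds its slice in
 * a per-layer page structure and registers it with cl_page_slice_add().
 * "my_page", "mp_cl" and "my_page_ops" are hypothetical names.
 *
 *	static int my_page_init(const struct lu_env *env, struct cl_object *obj,
 *				struct cl_page *page, struct page *vmpage)
 *	{
 *		struct my_page *mp = ...allocate or locate the layer's page...;
 *
 *		cl_page_slice_add(page, &mp->mp_cl, obj, &my_page_ops);
 *		return 0;
 *	}
 */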
1597
1598int  cl_page_init(void)
1599{
1600        return 0;
1601}
1602
1603void cl_page_fini(void)
1604{
1605}
1606