linux/drivers/staging/lustre/lustre/obdclass/cl_io.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Client IO.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_CLASS
  42
  43#include <obd_class.h>
  44#include <obd_support.h>
  45#include <lustre_fid.h>
  46#include <linux/list.h>
  47#include <cl_object.h>
  48#include "cl_internal.h"
  49
  50/*****************************************************************************
  51 *
  52 * cl_io interface.
  53 *
  54 */
  55
  56#define cl_io_for_each(slice, io) \
  57        list_for_each_entry((slice), &io->ci_layers, cis_linkage)
  58#define cl_io_for_each_reverse(slice, io)                \
  59        list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)
  60
  61static inline int cl_io_type_is_valid(enum cl_io_type type)
  62{
  63        return CIT_READ <= type && type < CIT_OP_NR;
  64}
  65
  66static inline int cl_io_is_loopable(const struct cl_io *io)
  67{
  68        return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
  69}
  70
  71/**
  72 * Returns true iff there is an IO ongoing in the given environment.
  73 */
  74int cl_io_is_going(const struct lu_env *env)
  75{
  76        return cl_env_info(env)->clt_current_io != NULL;
  77}
  78EXPORT_SYMBOL(cl_io_is_going);
  79
  80/**
  81 * cl_io invariant that holds at all times when exported cl_io_*() functions
  82 * are entered and left.
  83 */
  84static int cl_io_invariant(const struct cl_io *io)
  85{
  86        struct cl_io *up;
  87
  88        up = io->ci_parent;
  89        return
  90                /*
  91                 * io can own pages only when it is ongoing. Sub-io might
  92                 * still be in CIS_LOCKED state when top-io is in
  93                 * CIS_IO_GOING.
  94                 */
  95                ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
  96                     (io->ci_state == CIS_LOCKED && up != NULL));
  97}
  98
  99/**
 100 * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
 101 */
 102void cl_io_fini(const struct lu_env *env, struct cl_io *io)
 103{
 104        struct cl_io_slice    *slice;
 105        struct cl_thread_info *info;
 106
 107        LINVRNT(cl_io_type_is_valid(io->ci_type));
 108        LINVRNT(cl_io_invariant(io));
 109
 110        while (!list_empty(&io->ci_layers)) {
 111                slice = container_of(io->ci_layers.prev, struct cl_io_slice,
 112                                     cis_linkage);
 113                list_del_init(&slice->cis_linkage);
 114                if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
 115                        slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
 116                /*
 117                 * Invalidate slice to catch use after free. This assumes that
 118                 * slices are allocated within session and can be touched
 119                 * after ->cio_fini() returns.
 120                 */
 121                slice->cis_io = NULL;
 122        }
 123        io->ci_state = CIS_FINI;
 124        info = cl_env_info(env);
 125        if (info->clt_current_io == io)
 126                info->clt_current_io = NULL;
 127
 128        /* sanity check for layout change */
  129        switch (io->ci_type) {
 130        case CIT_READ:
 131        case CIT_WRITE:
 132                break;
 133        case CIT_FAULT:
 134        case CIT_FSYNC:
 135                LASSERT(!io->ci_need_restart);
 136                break;
 137        case CIT_SETATTR:
 138        case CIT_MISC:
 139                /* Check ignore layout change conf */
 140                LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
 141                                !io->ci_need_restart));
 142                break;
 143        default:
 144                LBUG();
 145        }
 146}
 147EXPORT_SYMBOL(cl_io_fini);
 148
 149static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
 150                       enum cl_io_type iot, struct cl_object *obj)
 151{
 152        struct cl_object *scan;
 153        int result;
 154
 155        LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
 156        LINVRNT(cl_io_type_is_valid(iot));
 157        LINVRNT(cl_io_invariant(io));
 158
 159        io->ci_type = iot;
 160        INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
 161        INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
 162        INIT_LIST_HEAD(&io->ci_lockset.cls_done);
 163        INIT_LIST_HEAD(&io->ci_layers);
 164
 165        result = 0;
 166        cl_object_for_each(scan, obj) {
 167                if (scan->co_ops->coo_io_init != NULL) {
 168                        result = scan->co_ops->coo_io_init(env, scan, io);
 169                        if (result != 0)
 170                                break;
 171                }
 172        }
 173        if (result == 0)
 174                io->ci_state = CIS_INIT;
 175        return result;
 176}
 177
 178/**
 179 * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
 180 *
 181 * \pre obj != cl_object_top(obj)
 182 */
 183int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
 184                   enum cl_io_type iot, struct cl_object *obj)
 185{
 186        struct cl_thread_info *info = cl_env_info(env);
 187
 188        LASSERT(obj != cl_object_top(obj));
 189        if (info->clt_current_io == NULL)
 190                info->clt_current_io = io;
 191        return cl_io_init0(env, io, iot, obj);
 192}
 193EXPORT_SYMBOL(cl_io_sub_init);
 194
 195/**
 196 * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
 197 *
 198 * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
 199 * what the latter returned.
 200 *
 201 * \pre obj == cl_object_top(obj)
 202 * \pre cl_io_type_is_valid(iot)
 203 * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
 204 */
 205int cl_io_init(const struct lu_env *env, struct cl_io *io,
 206               enum cl_io_type iot, struct cl_object *obj)
 207{
 208        struct cl_thread_info *info = cl_env_info(env);
 209
 210        LASSERT(obj == cl_object_top(obj));
 211        LASSERT(info->clt_current_io == NULL);
 212
 213        info->clt_current_io = io;
 214        return cl_io_init0(env, io, iot, obj);
 215}
 216EXPORT_SYMBOL(cl_io_init);
 217
 218/**
 219 * Initialize read or write io.
 220 *
 221 * \pre iot == CIT_READ || iot == CIT_WRITE
 222 */
 223int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
 224                  enum cl_io_type iot, loff_t pos, size_t count)
 225{
 226        LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
 227        LINVRNT(io->ci_obj != NULL);
 228
 229        LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
 230                         "io range: %u ["LPU64", "LPU64") %u %u\n",
 231                         iot, (__u64)pos, (__u64)pos + count,
 232                         io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
 233        io->u.ci_rw.crw_pos    = pos;
 234        io->u.ci_rw.crw_count  = count;
 235        return cl_io_init(env, io, iot, io->ci_obj);
 236}
 237EXPORT_SYMBOL(cl_io_rw_init);
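
/*
 * Typical usage (an illustrative sketch only; the real callers live in the
 * llite layer and the environment/object setup is elided): a read or write
 * is driven by initializing the io against its top object, pumping the main
 * loop, and always finalizing, whatever cl_io_rw_init() returned:
 *
 *	io->ci_obj = cl_object_top(obj);
 *	result = cl_io_rw_init(env, io, CIT_READ, pos, count);
 *	if (result == 0)
 *		result = cl_io_loop(env, io);
 *	cl_io_fini(env, io);
 */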
 238
 239static inline const struct lu_fid *
 240cl_lock_descr_fid(const struct cl_lock_descr *descr)
 241{
 242        return lu_object_fid(&descr->cld_obj->co_lu);
 243}
 244
 245static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
 246                              const struct cl_lock_descr *d1)
 247{
 248        return lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1)) ?:
 249                __diff_normalize(d0->cld_start, d1->cld_start);
 250}
 251
 252static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
 253                             const struct cl_lock_descr *d1)
 254{
 255        int ret;
 256
 257        ret = lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1));
 258        if (ret)
 259                return ret;
 260        if (d0->cld_end < d1->cld_start)
 261                return -1;
  262        if (d0->cld_start > d1->cld_end)
 263                return 1;
 264        return 0;
 265}
 266
 267static void cl_lock_descr_merge(struct cl_lock_descr *d0,
 268                                const struct cl_lock_descr *d1)
 269{
 270        d0->cld_start = min(d0->cld_start, d1->cld_start);
 271        d0->cld_end = max(d0->cld_end, d1->cld_end);
 272
 273        if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
 274                d0->cld_mode = CLM_WRITE;
 275
 276        if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
 277                d0->cld_mode = CLM_GROUP;
 278}
 279
 280/*
 281 * Sort locks in lexicographical order of their (fid, start-offset) pairs.
 282 */
 283static void cl_io_locks_sort(struct cl_io *io)
 284{
 285        int done = 0;
 286
 287        /* hidden treasure: bubble sort for now. */
 288        do {
 289                struct cl_io_lock_link *curr;
 290                struct cl_io_lock_link *prev;
 291                struct cl_io_lock_link *temp;
 292
 293                done = 1;
 294                prev = NULL;
 295
 296                list_for_each_entry_safe(curr, temp,
 297                                             &io->ci_lockset.cls_todo,
 298                                             cill_linkage) {
 299                        if (prev != NULL) {
 300                                switch (cl_lock_descr_sort(&prev->cill_descr,
 301                                                          &curr->cill_descr)) {
 302                                case 0:
 303                                        /*
 304                                         * IMPOSSIBLE: Identical locks are
 305                                         *           already removed at
 306                                         *           this point.
 307                                         */
 308                                default:
 309                                        LBUG();
 310                                case +1:
 311                                        list_move_tail(&curr->cill_linkage,
 312                                                           &prev->cill_linkage);
 313                                        done = 0;
 314                                        continue; /* don't change prev: it's
 315                                                   * still "previous" */
 316                                case -1: /* already in order */
 317                                        break;
 318                                }
 319                        }
 320                        prev = curr;
 321                }
 322        } while (!done);
 323}
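
/*
 * For illustration, with three queued locks on objects whose fids satisfy
 * F1 < F2 (hypothetical values), the sort above orders cls_todo as
 *
 *	(F1, [0, 4095]) -> (F1, [8192, 12287]) -> (F2, [0, 4095])
 *
 * i.e. primarily by fid and secondarily by start offset, so that all threads
 * enqueue conflicting extents in the same global order.
 */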
 324
 325/**
 326 * Check whether \a queue contains locks matching \a need.
 327 *
 328 * \retval +ve there is a matching lock in the \a queue
 329 * \retval   0 there are no matching locks in the \a queue
 330 */
 331int cl_queue_match(const struct list_head *queue,
 332                   const struct cl_lock_descr *need)
 333{
  334        struct cl_io_lock_link *scan;
  335
  336        list_for_each_entry(scan, queue, cill_linkage) {
  337                if (cl_lock_descr_match(&scan->cill_descr, need))
  338                        return +1;
  339        }
  340        return 0;
  341}
 342EXPORT_SYMBOL(cl_queue_match);
 343
 344static int cl_queue_merge(const struct list_head *queue,
 345                          const struct cl_lock_descr *need)
 346{
  347        struct cl_io_lock_link *scan;
  348
  349        list_for_each_entry(scan, queue, cill_linkage) {
  350                if (cl_lock_descr_cmp(&scan->cill_descr, need))
  351                        continue;
  352                cl_lock_descr_merge(&scan->cill_descr, need);
  353                CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
  354                       scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
  355                       scan->cill_descr.cld_end);
  356                return +1;
  357        }
  358        return 0;
  359
  360}
 361
 362static int cl_lockset_match(const struct cl_lockset *set,
 363                            const struct cl_lock_descr *need)
 364{
 365        return cl_queue_match(&set->cls_curr, need) ||
 366               cl_queue_match(&set->cls_done, need);
 367}
 368
 369static int cl_lockset_merge(const struct cl_lockset *set,
 370                            const struct cl_lock_descr *need)
 371{
 372        return cl_queue_merge(&set->cls_todo, need) ||
 373               cl_lockset_match(set, need);
 374}
 375
 376static int cl_lockset_lock_one(const struct lu_env *env,
 377                               struct cl_io *io, struct cl_lockset *set,
 378                               struct cl_io_lock_link *link)
 379{
 380        struct cl_lock *lock;
 381        int          result;
 382
 383        lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
 384
 385        if (!IS_ERR(lock)) {
 386                link->cill_lock = lock;
 387                list_move(&link->cill_linkage, &set->cls_curr);
 388                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
 389                        result = cl_wait(env, lock);
 390                        if (result == 0)
 391                                list_move(&link->cill_linkage,
 392                                              &set->cls_done);
 393                } else
 394                        result = 0;
 395        } else
 396                result = PTR_ERR(lock);
 397        return result;
 398}
 399
 400static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
 401                              struct cl_io_lock_link *link)
 402{
 403        struct cl_lock *lock = link->cill_lock;
 404
 405        list_del_init(&link->cill_linkage);
 406        if (lock != NULL) {
 407                cl_lock_release(env, lock, "io", io);
 408                link->cill_lock = NULL;
 409        }
 410        if (link->cill_fini != NULL)
 411                link->cill_fini(env, link);
 412}
 413
 414static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
 415                           struct cl_lockset *set)
 416{
 417        struct cl_io_lock_link *link;
 418        struct cl_io_lock_link *temp;
 419        struct cl_lock   *lock;
 420        int result;
 421
 422        result = 0;
 423        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
 424                if (!cl_lockset_match(set, &link->cill_descr)) {
 425                        /* XXX some locking to guarantee that locks aren't
 426                         * expanded in between. */
 427                        result = cl_lockset_lock_one(env, io, set, link);
 428                        if (result != 0)
 429                                break;
 430                } else
 431                        cl_lock_link_fini(env, io, link);
 432        }
 433        if (result == 0) {
 434                list_for_each_entry_safe(link, temp,
 435                                             &set->cls_curr, cill_linkage) {
 436                        lock = link->cill_lock;
 437                        result = cl_wait(env, lock);
 438                        if (result == 0)
 439                                list_move(&link->cill_linkage,
 440                                              &set->cls_done);
 441                        else
 442                                break;
 443                }
 444        }
 445        return result;
 446}
 447
 448/**
 449 * Takes locks necessary for the current iteration of io.
 450 *
 451 * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
  452 * by layers for the current iteration. Then sorts the locks (to avoid
  453 * deadlocks) and acquires them.
 454 */
 455int cl_io_lock(const struct lu_env *env, struct cl_io *io)
 456{
 457        const struct cl_io_slice *scan;
 458        int result = 0;
 459
 460        LINVRNT(cl_io_is_loopable(io));
 461        LINVRNT(io->ci_state == CIS_IT_STARTED);
 462        LINVRNT(cl_io_invariant(io));
 463
 464        cl_io_for_each(scan, io) {
 465                if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
 466                        continue;
 467                result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
 468                if (result != 0)
 469                        break;
 470        }
 471        if (result == 0) {
 472                cl_io_locks_sort(io);
 473                result = cl_lockset_lock(env, io, &io->ci_lockset);
 474        }
 475        if (result != 0)
 476                cl_io_unlock(env, io);
 477        else
 478                io->ci_state = CIS_LOCKED;
 479        return result;
 480}
 481EXPORT_SYMBOL(cl_io_lock);
 482
 483/**
  484 * Releases locks taken by io.
 485 */
 486void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
 487{
 488        struct cl_lockset       *set;
 489        struct cl_io_lock_link   *link;
 490        struct cl_io_lock_link   *temp;
 491        const struct cl_io_slice *scan;
 492
 493        LASSERT(cl_io_is_loopable(io));
 494        LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
 495        LINVRNT(cl_io_invariant(io));
 496
 497        set = &io->ci_lockset;
 498
 499        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
 500                cl_lock_link_fini(env, io, link);
 501
 502        list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
 503                cl_lock_link_fini(env, io, link);
 504
 505        list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
 506                cl_unuse(env, link->cill_lock);
 507                cl_lock_link_fini(env, io, link);
 508        }
 509        cl_io_for_each_reverse(scan, io) {
 510                if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
 511                        scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
 512        }
 513        io->ci_state = CIS_UNLOCKED;
 514        LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
 515}
 516EXPORT_SYMBOL(cl_io_unlock);
 517
 518/**
 519 * Prepares next iteration of io.
 520 *
 521 * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
 522 * layers a chance to modify io parameters, e.g., so that lov can restrict io
 523 * to a single stripe.
 524 */
 525int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
 526{
 527        const struct cl_io_slice *scan;
 528        int result;
 529
 530        LINVRNT(cl_io_is_loopable(io));
 531        LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
 532        LINVRNT(cl_io_invariant(io));
 533
 534        result = 0;
 535        cl_io_for_each(scan, io) {
 536                if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
 537                        continue;
 538                result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
 539                                                                      scan);
 540                if (result != 0)
 541                        break;
 542        }
 543        if (result == 0)
 544                io->ci_state = CIS_IT_STARTED;
 545        return result;
 546}
 547EXPORT_SYMBOL(cl_io_iter_init);
 548
 549/**
 550 * Finalizes io iteration.
 551 *
 552 * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
 553 */
 554void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
 555{
 556        const struct cl_io_slice *scan;
 557
 558        LINVRNT(cl_io_is_loopable(io));
 559        LINVRNT(io->ci_state == CIS_UNLOCKED);
 560        LINVRNT(cl_io_invariant(io));
 561
 562        cl_io_for_each_reverse(scan, io) {
 563                if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
 564                        scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
 565        }
 566        io->ci_state = CIS_IT_ENDED;
 567}
 568EXPORT_SYMBOL(cl_io_iter_fini);
 569
 570/**
 571 * Records that read or write io progressed \a nob bytes forward.
 572 */
 573void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
 574{
 575        const struct cl_io_slice *scan;
 576
 577        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
 578                nob == 0);
 579        LINVRNT(cl_io_is_loopable(io));
 580        LINVRNT(cl_io_invariant(io));
 581
 582        io->u.ci_rw.crw_pos   += nob;
 583        io->u.ci_rw.crw_count -= nob;
 584
 585        /* layers have to be notified. */
 586        cl_io_for_each_reverse(scan, io) {
 587                if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
 588                        scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
 589                                                                   nob);
 590        }
 591}
 592EXPORT_SYMBOL(cl_io_rw_advance);
 593
 594/**
 595 * Adds a lock to a lockset.
 596 */
 597int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
 598                   struct cl_io_lock_link *link)
 599{
 600        int result;
 601
 602        if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
 603                result = +1;
 604        else {
 605                list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
 606                result = 0;
 607        }
 608        return result;
 609}
 610EXPORT_SYMBOL(cl_io_lock_add);
 611
 612static void cl_free_io_lock_link(const struct lu_env *env,
 613                                 struct cl_io_lock_link *link)
 614{
 615        OBD_FREE_PTR(link);
 616}
 617
 618/**
 619 * Allocates new lock link, and uses it to add a lock to a lockset.
 620 */
 621int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
 622                         struct cl_lock_descr *descr)
 623{
 624        struct cl_io_lock_link *link;
 625        int result;
 626
 627        OBD_ALLOC_PTR(link);
 628        if (link != NULL) {
 629                link->cill_descr     = *descr;
 630                link->cill_fini      = cl_free_io_lock_link;
 631                result = cl_io_lock_add(env, io, link);
 632                if (result) /* lock match */
 633                        link->cill_fini(env, link);
 634        } else
 635                result = -ENOMEM;
 636
 637        return result;
 638}
 639EXPORT_SYMBOL(cl_io_lock_alloc_add);
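
/*
 * A layer's ->cio_lock() method would typically build a descriptor and feed
 * it through this interface, e.g. (a sketch only, with made-up extent values;
 * see the lov and osc layers for the real users):
 *
 *	struct cl_lock_descr descr = {
 *		.cld_obj       = io->ci_obj,
 *		.cld_mode      = CLM_WRITE,
 *		.cld_start     = start,
 *		.cld_end       = end,
 *		.cld_enq_flags = 0,
 *	};
 *
 *	result = cl_io_lock_alloc_add(env, io, &descr);
 *
 * A positive return means the descriptor was merged into a lock already in
 * the set, and no new link was queued.
 */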
 640
 641/**
 642 * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
 643 */
 644int cl_io_start(const struct lu_env *env, struct cl_io *io)
 645{
 646        const struct cl_io_slice *scan;
 647        int result = 0;
 648
 649        LINVRNT(cl_io_is_loopable(io));
 650        LINVRNT(io->ci_state == CIS_LOCKED);
 651        LINVRNT(cl_io_invariant(io));
 652
 653        io->ci_state = CIS_IO_GOING;
 654        cl_io_for_each(scan, io) {
 655                if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
 656                        continue;
 657                result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
 658                if (result != 0)
 659                        break;
 660        }
 661        if (result >= 0)
 662                result = 0;
 663        return result;
 664}
 665EXPORT_SYMBOL(cl_io_start);
 666
 667/**
 668 * Wait until current io iteration is finished by calling
 669 * cl_io_operations::cio_end() bottom-to-top.
 670 */
 671void cl_io_end(const struct lu_env *env, struct cl_io *io)
 672{
 673        const struct cl_io_slice *scan;
 674
 675        LINVRNT(cl_io_is_loopable(io));
 676        LINVRNT(io->ci_state == CIS_IO_GOING);
 677        LINVRNT(cl_io_invariant(io));
 678
 679        cl_io_for_each_reverse(scan, io) {
 680                if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
 681                        scan->cis_iop->op[io->ci_type].cio_end(env, scan);
 682                /* TODO: error handling. */
 683        }
 684        io->ci_state = CIS_IO_FINISHED;
 685}
 686EXPORT_SYMBOL(cl_io_end);
 687
 688static const struct cl_page_slice *
 689cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
 690{
 691        const struct cl_page_slice *slice;
 692
 693        slice = cl_page_at(page, ios->cis_obj->co_lu.lo_dev->ld_type);
 694        LINVRNT(slice != NULL);
 695        return slice;
 696}
 697
 698/**
 699 * True iff \a page is within \a io range.
 700 */
 701static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
 702{
 703        int     result = 1;
 704        loff_t  start;
 705        loff_t  end;
 706        pgoff_t idx;
 707
 708        idx = page->cp_index;
 709        switch (io->ci_type) {
 710        case CIT_READ:
 711        case CIT_WRITE:
 712                /*
 713                 * check that [start, end) and [pos, pos + count) extents
 714                 * overlap.
 715                 */
 716                if (!cl_io_is_append(io)) {
 717                        const struct cl_io_rw_common *crw = &(io->u.ci_rw);
 718                        start = cl_offset(page->cp_obj, idx);
 719                        end   = cl_offset(page->cp_obj, idx + 1);
 720                        result = crw->crw_pos < end &&
 721                                 start < crw->crw_pos + crw->crw_count;
 722                }
 723                break;
 724        case CIT_FAULT:
 725                result = io->u.ci_fault.ft_index == idx;
 726                break;
 727        default:
 728                LBUG();
 729        }
 730        return result;
 731}
 732
 733/**
 734 * Called by read io, when page has to be read from the server.
 735 *
 736 * \see cl_io_operations::cio_read_page()
 737 */
 738int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
 739                    struct cl_page *page)
 740{
 741        const struct cl_io_slice *scan;
 742        struct cl_2queue         *queue;
 743        int                    result = 0;
 744
 745        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
 746        LINVRNT(cl_page_is_owned(page, io));
 747        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 748        LINVRNT(cl_page_in_io(page, io));
 749        LINVRNT(cl_io_invariant(io));
 750
 751        queue = &io->ci_queue;
 752
 753        cl_2queue_init(queue);
 754        /*
 755         * ->cio_read_page() methods called in the loop below are supposed to
 756         * never block waiting for network (the only subtle point is the
 757         * creation of new pages for read-ahead that might result in cache
 758         * shrinking, but currently only clean pages are shrunk and this
 759         * requires no network io).
 760         *
  761         * Should this ever start blocking, a retry loop would be needed for
 762         * "parallel io" (see CLO_REPEAT loops in cl_lock.c).
 763         */
 764        cl_io_for_each(scan, io) {
 765                if (scan->cis_iop->cio_read_page != NULL) {
 766                        const struct cl_page_slice *slice;
 767
 768                        slice = cl_io_slice_page(scan, page);
 769                        LINVRNT(slice != NULL);
 770                        result = scan->cis_iop->cio_read_page(env, scan, slice);
 771                        if (result != 0)
 772                                break;
 773                }
 774        }
 775        if (result == 0)
 776                result = cl_io_submit_rw(env, io, CRT_READ, queue);
 777        /*
 778         * Unlock unsent pages in case of error.
 779         */
 780        cl_page_list_disown(env, io, &queue->c2_qin);
 781        cl_2queue_fini(env, queue);
 782        return result;
 783}
 784EXPORT_SYMBOL(cl_io_read_page);
 785
 786/**
 787 * Called by write io to prepare page to receive data from user buffer.
 788 *
 789 * \see cl_io_operations::cio_prepare_write()
 790 */
 791int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
 792                        struct cl_page *page, unsigned from, unsigned to)
 793{
 794        const struct cl_io_slice *scan;
 795        int result = 0;
 796
 797        LINVRNT(io->ci_type == CIT_WRITE);
 798        LINVRNT(cl_page_is_owned(page, io));
 799        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 800        LINVRNT(cl_io_invariant(io));
 801        LASSERT(cl_page_in_io(page, io));
 802
 803        cl_io_for_each_reverse(scan, io) {
 804                if (scan->cis_iop->cio_prepare_write != NULL) {
 805                        const struct cl_page_slice *slice;
 806
 807                        slice = cl_io_slice_page(scan, page);
 808                        result = scan->cis_iop->cio_prepare_write(env, scan,
 809                                                                  slice,
 810                                                                  from, to);
 811                        if (result != 0)
 812                                break;
 813                }
 814        }
 815        return result;
 816}
 817EXPORT_SYMBOL(cl_io_prepare_write);
 818
 819/**
 820 * Called by write io after user data were copied into a page.
 821 *
 822 * \see cl_io_operations::cio_commit_write()
 823 */
 824int cl_io_commit_write(const struct lu_env *env, struct cl_io *io,
 825                       struct cl_page *page, unsigned from, unsigned to)
 826{
 827        const struct cl_io_slice *scan;
 828        int result = 0;
 829
 830        LINVRNT(io->ci_type == CIT_WRITE);
 831        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 832        LINVRNT(cl_io_invariant(io));
 833        /*
 834         * XXX Uh... not nice. Top level cl_io_commit_write() call (vvp->lov)
 835         * already called cl_page_cache_add(), moving page into CPS_CACHED
 836         * state. Better (and more general) way of dealing with such situation
 837         * is needed.
 838         */
 839        LASSERT(cl_page_is_owned(page, io) || page->cp_parent != NULL);
 840        LASSERT(cl_page_in_io(page, io));
 841
 842        cl_io_for_each(scan, io) {
 843                if (scan->cis_iop->cio_commit_write != NULL) {
 844                        const struct cl_page_slice *slice;
 845
 846                        slice = cl_io_slice_page(scan, page);
 847                        result = scan->cis_iop->cio_commit_write(env, scan,
 848                                                                 slice,
 849                                                                 from, to);
 850                        if (result != 0)
 851                                break;
 852                }
 853        }
 854        LINVRNT(result <= 0);
 855        return result;
 856}
 857EXPORT_SYMBOL(cl_io_commit_write);
 858
 859/**
 860 * Submits a list of pages for immediate io.
 861 *
  862 * After the function returns, the submitted pages are moved to the
  863 * queue->c2_qout queue, while queue->c2_qin contains both the pages that did
  864 * not need to be submitted and the pages that failed to submit.
 865 *
 866 * \returns 0 if at least one page was submitted, error code otherwise.
 867 * \see cl_io_operations::cio_submit()
 868 */
 869int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
 870                    enum cl_req_type crt, struct cl_2queue *queue)
 871{
 872        const struct cl_io_slice *scan;
 873        int result = 0;
 874
 875        LINVRNT(crt < ARRAY_SIZE(scan->cis_iop->req_op));
 876
 877        cl_io_for_each(scan, io) {
 878                if (scan->cis_iop->req_op[crt].cio_submit == NULL)
 879                        continue;
 880                result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
 881                                                               queue);
 882                if (result != 0)
 883                        break;
 884        }
 885        /*
 886         * If ->cio_submit() failed, no pages were sent.
 887         */
 888        LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
 889        return result;
 890}
 891EXPORT_SYMBOL(cl_io_submit_rw);
 892
 893/**
  894 * Submits a sync_io and waits until the IO finishes or an error occurs.
  895 * If \a timeout is zero, the IO is waited for unconditionally.
 896 */
 897int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
 898                      enum cl_req_type iot, struct cl_2queue *queue,
 899                      long timeout)
 900{
 901        struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
 902        struct cl_page *pg;
 903        int rc;
 904
 905        cl_page_list_for_each(pg, &queue->c2_qin) {
 906                LASSERT(pg->cp_sync_io == NULL);
 907                pg->cp_sync_io = anchor;
 908        }
 909
 910        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
 911        rc = cl_io_submit_rw(env, io, iot, queue);
 912        if (rc == 0) {
 913                /*
 914                 * If some pages weren't sent for any reason (e.g.,
 915                 * read found up-to-date pages in the cache, or write found
 916                 * clean pages), count them as completed to avoid infinite
 917                 * wait.
 918                 */
 919                 cl_page_list_for_each(pg, &queue->c2_qin) {
 920                        pg->cp_sync_io = NULL;
 921                        cl_sync_io_note(anchor, +1);
 922                 }
 923
 924                 /* wait for the IO to be finished. */
 925                 rc = cl_sync_io_wait(env, io, &queue->c2_qout,
 926                                      anchor, timeout);
 927        } else {
 928                LASSERT(list_empty(&queue->c2_qout.pl_pages));
 929                cl_page_list_for_each(pg, &queue->c2_qin)
 930                        pg->cp_sync_io = NULL;
 931        }
 932        return rc;
 933}
 934EXPORT_SYMBOL(cl_io_submit_sync);
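
/*
 * As an illustration (details such as the origin of \a page are elided), a
 * caller doing synchronous io on a single owned page could combine this
 * function with the 2-queue helpers defined below:
 *
 *	struct cl_2queue *queue = &io->ci_queue;
 *
 *	cl_2queue_init_page(queue, page);
 *	result = cl_io_submit_sync(env, io, CRT_READ, queue, 0);
 *	cl_2queue_disown(env, io, queue);
 *	cl_2queue_fini(env, queue);
 *
 * A zero timeout waits for completion unconditionally.
 */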
 935
 936/**
 937 * Cancel an IO which has been submitted by cl_io_submit_rw.
 938 */
 939int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
 940                 struct cl_page_list *queue)
 941{
 942        struct cl_page *page;
 943        int result = 0;
 944
  945        CERROR("Canceling ongoing page transmission\n");
 946        cl_page_list_for_each(page, queue) {
 947                int rc;
 948
 949                LINVRNT(cl_page_in_io(page, io));
 950                rc = cl_page_cancel(env, page);
 951                result = result ?: rc;
 952        }
 953        return result;
 954}
 955EXPORT_SYMBOL(cl_io_cancel);
 956
 957/**
 958 * Main io loop.
 959 *
 960 * Pumps io through iterations calling
 961 *
 962 *    - cl_io_iter_init()
 963 *
 964 *    - cl_io_lock()
 965 *
 966 *    - cl_io_start()
 967 *
 968 *    - cl_io_end()
 969 *
 970 *    - cl_io_unlock()
 971 *
 972 *    - cl_io_iter_fini()
 973 *
 974 * repeatedly until there is no more io to do.
 975 */
 976int cl_io_loop(const struct lu_env *env, struct cl_io *io)
 977{
 978        int result   = 0;
 979
 980        LINVRNT(cl_io_is_loopable(io));
 981
 982        do {
 983                size_t nob;
 984
 985                io->ci_continue = 0;
 986                result = cl_io_iter_init(env, io);
 987                if (result == 0) {
 988                        nob    = io->ci_nob;
 989                        result = cl_io_lock(env, io);
 990                        if (result == 0) {
 991                                /*
  992                                 * Notify layers that locks have been taken,
 993                                 * and do actual i/o.
 994                                 *
 995                                 *   - llite: kms, short read;
 996                                 *   - llite: generic_file_read();
 997                                 */
 998                                result = cl_io_start(env, io);
 999                                /*
1000                                 * Send any remaining pending
1001                                 * io, etc.
1002                                 *
1003                                 *   - llite: ll_rw_stats_tally.
1004                                 */
1005                                cl_io_end(env, io);
1006                                cl_io_unlock(env, io);
1007                                cl_io_rw_advance(env, io, io->ci_nob - nob);
1008                        }
1009                }
1010                cl_io_iter_fini(env, io);
1011        } while (result == 0 && io->ci_continue);
1012        if (result == 0)
1013                result = io->ci_result;
1014        return result < 0 ? result : 0;
1015}
1016EXPORT_SYMBOL(cl_io_loop);
1017
1018/**
1019 * Adds io slice to the cl_io.
1020 *
1021 * This is called by cl_object_operations::coo_io_init() methods to add a
1022 * per-layer state to the io. New state is added at the end of
1023 * cl_io::ci_layers list, that is, it is at the bottom of the stack.
1024 *
1025 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
1026 */
1027void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
1028                     struct cl_object *obj,
1029                     const struct cl_io_operations *ops)
1030{
1031        struct list_head *linkage = &slice->cis_linkage;
1032
1033        LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
1034                list_empty(linkage));
1035
1036        list_add_tail(linkage, &io->ci_layers);
1037        slice->cis_io  = io;
1038        slice->cis_obj = obj;
1039        slice->cis_iop = ops;
1040}
1041EXPORT_SYMBOL(cl_io_slice_add);
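
/*
 * Sketch of how a layer's cl_object_operations::coo_io_init() method plugs
 * itself into the stack (the slice container, helper and operations-table
 * names here are hypothetical; see the vvp, lov and osc layers for the real
 * ones):
 *
 *	static int foo_io_init(const struct lu_env *env,
 *			       struct cl_object *obj, struct cl_io *io)
 *	{
 *		struct foo_io *fio = foo_env_io(env);
 *
 *		cl_io_slice_add(io, &fio->fi_cl, obj, &foo_io_ops);
 *		return 0;
 *	}
 */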
1042
1043
1044/**
1045 * Initializes page list.
1046 */
1047void cl_page_list_init(struct cl_page_list *plist)
1048{
1049        plist->pl_nr = 0;
1050        INIT_LIST_HEAD(&plist->pl_pages);
1051        plist->pl_owner = current;
1052}
1053EXPORT_SYMBOL(cl_page_list_init);
1054
1055/**
1056 * Adds a page to a page list.
1057 */
1058void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
1059{
1060        /* it would be better to check that page is owned by "current" io, but
1061         * it is not passed here. */
1062        LASSERT(page->cp_owner != NULL);
1063        LINVRNT(plist->pl_owner == current);
1064
1065        lockdep_off();
1066        mutex_lock(&page->cp_mutex);
1067        lockdep_on();
1068        LASSERT(list_empty(&page->cp_batch));
1069        list_add_tail(&page->cp_batch, &plist->pl_pages);
1070        ++plist->pl_nr;
1071        lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1072        cl_page_get(page);
1073}
1074EXPORT_SYMBOL(cl_page_list_add);
1075
1076/**
1077 * Removes a page from a page list.
1078 */
1079void cl_page_list_del(const struct lu_env *env,
1080                      struct cl_page_list *plist, struct cl_page *page)
1081{
1082        LASSERT(plist->pl_nr > 0);
1083        LINVRNT(plist->pl_owner == current);
1084
1085        list_del_init(&page->cp_batch);
1086        lockdep_off();
1087        mutex_unlock(&page->cp_mutex);
1088        lockdep_on();
1089        --plist->pl_nr;
1090        lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1091        cl_page_put(env, page);
1092}
1093EXPORT_SYMBOL(cl_page_list_del);
1094
1095/**
1096 * Moves a page from one page list to another.
1097 */
1098void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1099                       struct cl_page *page)
1100{
1101        LASSERT(src->pl_nr > 0);
1102        LINVRNT(dst->pl_owner == current);
1103        LINVRNT(src->pl_owner == current);
1104
1105        list_move_tail(&page->cp_batch, &dst->pl_pages);
1106        --src->pl_nr;
1107        ++dst->pl_nr;
1108        lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1109                      src, dst);
1110}
1111EXPORT_SYMBOL(cl_page_list_move);
1112
1113/**
 1114 * Splices the page list \a list onto the end of \a head, leaving \a list empty.
1115 */
1116void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1117{
1118        struct cl_page *page;
1119        struct cl_page *tmp;
1120
1121        LINVRNT(list->pl_owner == current);
1122        LINVRNT(head->pl_owner == current);
1123
1124        cl_page_list_for_each_safe(page, tmp, list)
1125                cl_page_list_move(head, list, page);
1126}
1127EXPORT_SYMBOL(cl_page_list_splice);
1128
1129void cl_page_disown0(const struct lu_env *env,
1130                     struct cl_io *io, struct cl_page *pg);
1131
1132/**
1133 * Disowns pages in a queue.
1134 */
1135void cl_page_list_disown(const struct lu_env *env,
1136                         struct cl_io *io, struct cl_page_list *plist)
1137{
1138        struct cl_page *page;
1139        struct cl_page *temp;
1140
1141        LINVRNT(plist->pl_owner == current);
1142
1143        cl_page_list_for_each_safe(page, temp, plist) {
1144                LASSERT(plist->pl_nr > 0);
1145
1146                list_del_init(&page->cp_batch);
1147                lockdep_off();
1148                mutex_unlock(&page->cp_mutex);
1149                lockdep_on();
1150                --plist->pl_nr;
1151                /*
1152                 * cl_page_disown0 rather than usual cl_page_disown() is used,
1153                 * because pages are possibly in CPS_FREEING state already due
1154                 * to the call to cl_page_list_discard().
1155                 */
1156                /*
1157                 * XXX cl_page_disown0() will fail if page is not locked.
1158                 */
1159                cl_page_disown0(env, io, page);
1160                lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1161                              plist);
1162                cl_page_put(env, page);
1163        }
1164}
1165EXPORT_SYMBOL(cl_page_list_disown);
1166
1167/**
1168 * Releases pages from queue.
1169 */
1170void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
1171{
1172        struct cl_page *page;
1173        struct cl_page *temp;
1174
1175        LINVRNT(plist->pl_owner == current);
1176
1177        cl_page_list_for_each_safe(page, temp, plist)
1178                cl_page_list_del(env, plist, page);
1179        LASSERT(plist->pl_nr == 0);
1180}
1181EXPORT_SYMBOL(cl_page_list_fini);
1182
1183/**
1184 * Owns all pages in a queue.
1185 */
1186int cl_page_list_own(const struct lu_env *env,
1187                     struct cl_io *io, struct cl_page_list *plist)
1188{
1189        struct cl_page *page;
1190        struct cl_page *temp;
1191        pgoff_t index = 0;
1192        int result;
1193
1194        LINVRNT(plist->pl_owner == current);
1195
1196        result = 0;
1197        cl_page_list_for_each_safe(page, temp, plist) {
1198                LASSERT(index <= page->cp_index);
1199                index = page->cp_index;
1200                if (cl_page_own(env, io, page) == 0)
1201                        result = result ?: page->cp_error;
1202                else
1203                        cl_page_list_del(env, plist, page);
1204        }
1205        return result;
1206}
1207EXPORT_SYMBOL(cl_page_list_own);
1208
1209/**
1210 * Assumes all pages in a queue.
1211 */
1212void cl_page_list_assume(const struct lu_env *env,
1213                         struct cl_io *io, struct cl_page_list *plist)
1214{
1215        struct cl_page *page;
1216
1217        LINVRNT(plist->pl_owner == current);
1218
1219        cl_page_list_for_each(page, plist)
1220                cl_page_assume(env, io, page);
1221}
1222EXPORT_SYMBOL(cl_page_list_assume);
1223
1224/**
1225 * Discards all pages in a queue.
1226 */
1227void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1228                          struct cl_page_list *plist)
1229{
1230        struct cl_page *page;
1231
1232        LINVRNT(plist->pl_owner == current);
1233        cl_page_list_for_each(page, plist)
1234                cl_page_discard(env, io, page);
1235}
1236EXPORT_SYMBOL(cl_page_list_discard);
1237
1238/**
1239 * Unmaps all pages in a queue from user virtual memory.
1240 */
1241int cl_page_list_unmap(const struct lu_env *env, struct cl_io *io,
1242                        struct cl_page_list *plist)
1243{
1244        struct cl_page *page;
1245        int result;
1246
1247        LINVRNT(plist->pl_owner == current);
1248        result = 0;
1249        cl_page_list_for_each(page, plist) {
1250                result = cl_page_unmap(env, io, page);
1251                if (result != 0)
1252                        break;
1253        }
1254        return result;
1255}
1256EXPORT_SYMBOL(cl_page_list_unmap);
1257
1258/**
1259 * Initialize dual page queue.
1260 */
1261void cl_2queue_init(struct cl_2queue *queue)
1262{
1263        cl_page_list_init(&queue->c2_qin);
1264        cl_page_list_init(&queue->c2_qout);
1265}
1266EXPORT_SYMBOL(cl_2queue_init);
1267
1268/**
1269 * Add a page to the incoming page list of 2-queue.
1270 */
1271void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1272{
1273        cl_page_list_add(&queue->c2_qin, page);
1274}
1275EXPORT_SYMBOL(cl_2queue_add);
1276
1277/**
1278 * Disown pages in both lists of a 2-queue.
1279 */
1280void cl_2queue_disown(const struct lu_env *env,
1281                      struct cl_io *io, struct cl_2queue *queue)
1282{
1283        cl_page_list_disown(env, io, &queue->c2_qin);
1284        cl_page_list_disown(env, io, &queue->c2_qout);
1285}
1286EXPORT_SYMBOL(cl_2queue_disown);
1287
1288/**
1289 * Discard (truncate) pages in both lists of a 2-queue.
1290 */
1291void cl_2queue_discard(const struct lu_env *env,
1292                       struct cl_io *io, struct cl_2queue *queue)
1293{
1294        cl_page_list_discard(env, io, &queue->c2_qin);
1295        cl_page_list_discard(env, io, &queue->c2_qout);
1296}
1297EXPORT_SYMBOL(cl_2queue_discard);
1298
1299/**
 1300 * Assumes ownership of the pages in both lists of a 2-queue.
1301 */
1302void cl_2queue_assume(const struct lu_env *env,
1303                      struct cl_io *io, struct cl_2queue *queue)
1304{
1305        cl_page_list_assume(env, io, &queue->c2_qin);
1306        cl_page_list_assume(env, io, &queue->c2_qout);
1307}
1308EXPORT_SYMBOL(cl_2queue_assume);
1309
1310/**
1311 * Finalize both page lists of a 2-queue.
1312 */
1313void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1314{
1315        cl_page_list_fini(env, &queue->c2_qout);
1316        cl_page_list_fini(env, &queue->c2_qin);
1317}
1318EXPORT_SYMBOL(cl_2queue_fini);
1319
1320/**
1321 * Initialize a 2-queue to contain \a page in its incoming page list.
1322 */
1323void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1324{
1325        cl_2queue_init(queue);
1326        cl_2queue_add(queue, page);
1327}
1328EXPORT_SYMBOL(cl_2queue_init_page);
1329
1330/**
1331 * Returns top-level io.
1332 *
1333 * \see cl_object_top(), cl_page_top().
1334 */
1335struct cl_io *cl_io_top(struct cl_io *io)
1336{
1337        while (io->ci_parent != NULL)
1338                io = io->ci_parent;
1339        return io;
1340}
1341EXPORT_SYMBOL(cl_io_top);
1342
1343/**
 1344 * Prints a human readable representation of \a io via \a printer.
1345 */
1346void cl_io_print(const struct lu_env *env, void *cookie,
1347                 lu_printer_t printer, const struct cl_io *io)
1348{
1349}
1350
1351/**
1352 * Adds request slice to the compound request.
1353 *
1354 * This is called by cl_device_operations::cdo_req_init() methods to add a
1355 * per-layer state to the request. New state is added at the end of
1356 * cl_req::crq_layers list, that is, it is at the bottom of the stack.
1357 *
1358 * \see cl_lock_slice_add(), cl_page_slice_add(), cl_io_slice_add()
1359 */
1360void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
1361                      struct cl_device *dev,
1362                      const struct cl_req_operations *ops)
1363{
1364        list_add_tail(&slice->crs_linkage, &req->crq_layers);
1365        slice->crs_dev = dev;
1366        slice->crs_ops = ops;
1367        slice->crs_req = req;
1368}
1369EXPORT_SYMBOL(cl_req_slice_add);
1370
1371static void cl_req_free(const struct lu_env *env, struct cl_req *req)
1372{
1373        unsigned i;
1374
1375        LASSERT(list_empty(&req->crq_pages));
1376        LASSERT(req->crq_nrpages == 0);
1377        LINVRNT(list_empty(&req->crq_layers));
1378        LINVRNT(equi(req->crq_nrobjs > 0, req->crq_o != NULL));
1379
1380        if (req->crq_o != NULL) {
1381                for (i = 0; i < req->crq_nrobjs; ++i) {
1382                        struct cl_object *obj = req->crq_o[i].ro_obj;
1383                        if (obj != NULL) {
1384                                lu_object_ref_del_at(&obj->co_lu,
1385                                                     &req->crq_o[i].ro_obj_ref,
1386                                                     "cl_req", req);
1387                                cl_object_put(env, obj);
1388                        }
1389                }
1390                OBD_FREE(req->crq_o, req->crq_nrobjs * sizeof req->crq_o[0]);
1391        }
1392        OBD_FREE_PTR(req);
1393}
1394
1395static int cl_req_init(const struct lu_env *env, struct cl_req *req,
1396                       struct cl_page *page)
1397{
1398        struct cl_device     *dev;
1399        struct cl_page_slice *slice;
1400        int result;
1401
1402        result = 0;
1403        page = cl_page_top(page);
1404        do {
1405                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
1406                        dev = lu2cl_dev(slice->cpl_obj->co_lu.lo_dev);
1407                        if (dev->cd_ops->cdo_req_init != NULL) {
1408                                result = dev->cd_ops->cdo_req_init(env,
1409                                                                   dev, req);
1410                                if (result != 0)
1411                                        break;
1412                        }
1413                }
1414                page = page->cp_child;
1415        } while (page != NULL && result == 0);
1416        return result;
1417}
1418
1419/**
1420 * Invokes per-request transfer completion call-backs
1421 * (cl_req_operations::cro_completion()) bottom-to-top.
1422 */
1423void cl_req_completion(const struct lu_env *env, struct cl_req *req, int rc)
1424{
1425        struct cl_req_slice *slice;
1426
1427        /*
1428         * for the lack of list_for_each_entry_reverse_safe()...
1429         */
1430        while (!list_empty(&req->crq_layers)) {
1431                slice = list_entry(req->crq_layers.prev,
1432                                       struct cl_req_slice, crs_linkage);
1433                list_del_init(&slice->crs_linkage);
1434                if (slice->crs_ops->cro_completion != NULL)
1435                        slice->crs_ops->cro_completion(env, slice, rc);
1436        }
1437        cl_req_free(env, req);
1438}
1439EXPORT_SYMBOL(cl_req_completion);
1440
1441/**
1442 * Allocates new transfer request.
1443 */
1444struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
1445                            enum cl_req_type crt, int nr_objects)
1446{
1447        struct cl_req *req;
1448
1449        LINVRNT(nr_objects > 0);
1450
1451        OBD_ALLOC_PTR(req);
1452        if (req != NULL) {
1453                int result;
1454
1455                OBD_ALLOC(req->crq_o, nr_objects * sizeof req->crq_o[0]);
1456                if (req->crq_o != NULL) {
1457                        req->crq_nrobjs = nr_objects;
1458                        req->crq_type = crt;
1459                        INIT_LIST_HEAD(&req->crq_pages);
1460                        INIT_LIST_HEAD(&req->crq_layers);
1461                        result = cl_req_init(env, req, page);
1462                } else
1463                        result = -ENOMEM;
1464                if (result != 0) {
1465                        cl_req_completion(env, req, result);
1466                        req = ERR_PTR(result);
1467                }
1468        } else
1469                req = ERR_PTR(-ENOMEM);
1470        return req;
1471}
1472EXPORT_SYMBOL(cl_req_alloc);
1473
1474/**
1475 * Adds a page to a request.
1476 */
1477void cl_req_page_add(const struct lu_env *env,
1478                     struct cl_req *req, struct cl_page *page)
1479{
1480        struct cl_object  *obj;
1481        struct cl_req_obj *rqo;
1482        int i;
1483
1484        page = cl_page_top(page);
1485
1486        LASSERT(list_empty(&page->cp_flight));
1487        LASSERT(page->cp_req == NULL);
1488
1489        CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
1490                      req, req->crq_type, req->crq_nrpages);
1491
1492        list_add_tail(&page->cp_flight, &req->crq_pages);
1493        ++req->crq_nrpages;
1494        page->cp_req = req;
1495        obj = cl_object_top(page->cp_obj);
1496        for (i = 0, rqo = req->crq_o; obj != rqo->ro_obj; ++i, ++rqo) {
1497                if (rqo->ro_obj == NULL) {
1498                        rqo->ro_obj = obj;
1499                        cl_object_get(obj);
1500                        lu_object_ref_add_at(&obj->co_lu, &rqo->ro_obj_ref,
1501                                             "cl_req", req);
1502                        break;
1503                }
1504        }
1505        LASSERT(i < req->crq_nrobjs);
1506}
1507EXPORT_SYMBOL(cl_req_page_add);
1508
1509/**
1510 * Removes a page from a request.
1511 */
1512void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
1513{
1514        struct cl_req *req = page->cp_req;
1515
1516        page = cl_page_top(page);
1517
1518        LASSERT(!list_empty(&page->cp_flight));
1519        LASSERT(req->crq_nrpages > 0);
1520
1521        list_del_init(&page->cp_flight);
1522        --req->crq_nrpages;
1523        page->cp_req = NULL;
1524}
1525EXPORT_SYMBOL(cl_req_page_done);
1526
1527/**
1528 * Notifies layers that request is about to depart by calling
1529 * cl_req_operations::cro_prep() top-to-bottom.
1530 */
1531int cl_req_prep(const struct lu_env *env, struct cl_req *req)
1532{
1533        int i;
1534        int result;
1535        const struct cl_req_slice *slice;
1536
1537        /*
1538         * Check that the caller of cl_req_alloc() didn't lie about the number
1539         * of objects.
1540         */
1541        for (i = 0; i < req->crq_nrobjs; ++i)
1542                LASSERT(req->crq_o[i].ro_obj != NULL);
1543
1544        result = 0;
1545        list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1546                if (slice->crs_ops->cro_prep != NULL) {
1547                        result = slice->crs_ops->cro_prep(env, slice);
1548                        if (result != 0)
1549                                break;
1550                }
1551        }
1552        return result;
1553}
1554EXPORT_SYMBOL(cl_req_prep);
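
/*
 * Putting the transfer request interface together: an rpc-forming layer
 * allocates a request, adds each in-flight page to it, prepares it, sends
 * the actual rpc, and finally signals completion. A sketch only; the real
 * user is the osc/ptlrpc code, and error handling is trimmed:
 *
 *	req = cl_req_alloc(env, page, CRT_WRITE, nr_objects);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	cl_req_page_add(env, req, page);
 *	result = cl_req_prep(env, req);
 *	...
 *	cl_req_completion(env, req, rc);
 */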
1555
1556/**
1557 * Fills in attributes that are passed to server together with transfer. Only
1558 * attributes from \a flags may be touched. This can be called multiple times
1559 * for the same request.
1560 */
1561void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
1562                     struct cl_req_attr *attr, obd_valid flags)
1563{
1564        const struct cl_req_slice *slice;
1565        struct cl_page      *page;
1566        int i;
1567
1568        LASSERT(!list_empty(&req->crq_pages));
1569
1570        /* Take any page to use as a model. */
1571        page = list_entry(req->crq_pages.next, struct cl_page, cp_flight);
1572
1573        for (i = 0; i < req->crq_nrobjs; ++i) {
1574                list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1575                        const struct cl_page_slice *scan;
1576                        const struct cl_object     *obj;
1577
1578                        scan = cl_page_at(page,
1579                                          slice->crs_dev->cd_lu_dev.ld_type);
1580                        LASSERT(scan != NULL);
1581                        obj = scan->cpl_obj;
1582                        if (slice->crs_ops->cro_attr_set != NULL)
1583                                slice->crs_ops->cro_attr_set(env, slice, obj,
1584                                                             attr + i, flags);
1585                }
1586        }
1587}
1588EXPORT_SYMBOL(cl_req_attr_set);
1589
1590/* XXX complete(), init_completion(), and wait_for_completion(), until they are
1591 * implemented in libcfs. */
1592# include <linux/sched.h>
1593
1594/**
1595 * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
1596 */
1597void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
1598{
1599        init_waitqueue_head(&anchor->csi_waitq);
1600        atomic_set(&anchor->csi_sync_nr, nrpages);
1601        atomic_set(&anchor->csi_barrier, nrpages > 0);
1602        anchor->csi_sync_rc = 0;
1603}
1604EXPORT_SYMBOL(cl_sync_io_init);
1605
1606/**
1607 * Wait until all transfer completes. Transfer completion routine has to call
1608 * cl_sync_io_note() for every page.
1609 */
1610int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
1611                    struct cl_page_list *queue, struct cl_sync_io *anchor,
1612                    long timeout)
1613{
1614        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1615                                                  NULL, NULL, NULL);
1616        int rc;
1617
1618        LASSERT(timeout >= 0);
1619
1620        rc = l_wait_event(anchor->csi_waitq,
1621                          atomic_read(&anchor->csi_sync_nr) == 0,
1622                          &lwi);
1623        if (rc < 0) {
1624                CERROR("SYNC IO failed with error: %d, try to cancel "
1625                       "%d remaining pages\n",
1626                       rc, atomic_read(&anchor->csi_sync_nr));
1627
1628                (void)cl_io_cancel(env, io, queue);
1629
1630                lwi = (struct l_wait_info) { 0 };
1631                (void)l_wait_event(anchor->csi_waitq,
1632                                   atomic_read(&anchor->csi_sync_nr) == 0,
1633                                   &lwi);
1634        } else {
1635                rc = anchor->csi_sync_rc;
1636        }
1637        LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1638        cl_page_list_assume(env, io, queue);
1639
1640        /* wait until cl_sync_io_note() has done wakeup */
1641        while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1642                cpu_relax();
1643        }
1644
1645        POISON(anchor, 0x5a, sizeof *anchor);
1646        return rc;
1647}
1648EXPORT_SYMBOL(cl_sync_io_wait);
1649
1650/**
1651 * Indicate that transfer of a single page completed.
1652 */
1653void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
1654{
1655        if (anchor->csi_sync_rc == 0 && ioret < 0)
1656                anchor->csi_sync_rc = ioret;
1657        /*
1658         * Synchronous IO done without releasing page lock (e.g., as a part of
 1659         * ->{prepare,commit}_write()). Completion is used to signal the end of
1660         * IO.
1661         */
1662        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1663        if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1664                wake_up_all(&anchor->csi_waitq);
1665                /* it's safe to nuke or reuse anchor now */
1666                atomic_set(&anchor->csi_barrier, 0);
1667        }
1668}
1669EXPORT_SYMBOL(cl_sync_io_note);
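
/*
 * The anchor protocol in one place (cl_io_submit_sync() above is the
 * canonical in-tree user; this is just an illustrative outline):
 *
 *	cl_sync_io_init(anchor, nrpages);
 *	submit the pages, arranging for every completion path to call
 *	cl_sync_io_note(anchor, ioret);
 *	rc = cl_sync_io_wait(env, io, queue, anchor, timeout);
 *
 * cl_sync_io_wait() returns only after every page has been noted and the
 * final waker has cleared csi_barrier, so the anchor can then be reused or
 * freed safely.
 */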
1670