linux/drivers/staging/lustre/lustre/obdclass/cl_io.c
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2015, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Client IO.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 */
  40
  41#define DEBUG_SUBSYSTEM S_CLASS
  42
  43#include "../include/obd_class.h"
  44#include "../include/obd_support.h"
  45#include "../include/lustre_fid.h"
  46#include <linux/list.h>
  47#include <linux/sched.h>
  48#include "../include/cl_object.h"
  49#include "cl_internal.h"
  50
  51/*****************************************************************************
  52 *
  53 * cl_io interface.
  54 *
  55 */
  56
  57#define cl_io_for_each(slice, io) \
  58        list_for_each_entry((slice), &io->ci_layers, cis_linkage)
  59#define cl_io_for_each_reverse(slice, io)                \
  60        list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)
  61
  62static inline int cl_io_type_is_valid(enum cl_io_type type)
  63{
  64        return CIT_READ <= type && type < CIT_OP_NR;
  65}
  66
  67static inline int cl_io_is_loopable(const struct cl_io *io)
  68{
  69        return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
  70}
  71
  72/**
  73 * Returns true iff there is an IO ongoing in the given environment.
  74 */
  75int cl_io_is_going(const struct lu_env *env)
  76{
  77        return cl_env_info(env)->clt_current_io != NULL;
  78}
  79EXPORT_SYMBOL(cl_io_is_going);
  80
  81/**
  82 * cl_io invariant that holds at all times when exported cl_io_*() functions
  83 * are entered and left.
  84 */
  85static int cl_io_invariant(const struct cl_io *io)
  86{
  87        struct cl_io *up;
  88
  89        up = io->ci_parent;
  90        return
  91                /*
  92                 * io can own pages only when it is ongoing. Sub-io might
  93                 * still be in CIS_LOCKED state when top-io is in
  94                 * CIS_IO_GOING.
  95                 */
  96                ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
  97                     (io->ci_state == CIS_LOCKED && up));
  98}
  99
 100/**
 101 * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
 102 */
 103void cl_io_fini(const struct lu_env *env, struct cl_io *io)
 104{
 105        struct cl_io_slice    *slice;
 106        struct cl_thread_info *info;
 107
 108        LINVRNT(cl_io_type_is_valid(io->ci_type));
 109        LINVRNT(cl_io_invariant(io));
 110
 111        while (!list_empty(&io->ci_layers)) {
 112                slice = container_of(io->ci_layers.prev, struct cl_io_slice,
 113                                     cis_linkage);
 114                list_del_init(&slice->cis_linkage);
 115                if (slice->cis_iop->op[io->ci_type].cio_fini)
 116                        slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
 117                /*
 118                 * Invalidate slice to catch use after free. This assumes that
 119                 * slices are allocated within session and can be touched
 120                 * after ->cio_fini() returns.
 121                 */
 122                slice->cis_io = NULL;
 123        }
 124        io->ci_state = CIS_FINI;
 125        info = cl_env_info(env);
 126        if (info->clt_current_io == io)
 127                info->clt_current_io = NULL;
 128
 129        /* sanity check for layout change */
 130        switch (io->ci_type) {
 131        case CIT_READ:
 132        case CIT_WRITE:
 133                break;
 134        case CIT_FAULT:
 135        case CIT_FSYNC:
 136                LASSERT(!io->ci_need_restart);
 137                break;
 138        case CIT_SETATTR:
 139        case CIT_MISC:
 140                /* Check ignore layout change conf */
 141                LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
 142                             !io->ci_need_restart));
 143                break;
 144        default:
 145                LBUG();
 146        }
 147}
 148EXPORT_SYMBOL(cl_io_fini);
 149
 150static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
 151                       enum cl_io_type iot, struct cl_object *obj)
 152{
 153        struct cl_object *scan;
 154        int result;
 155
 156        LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
 157        LINVRNT(cl_io_type_is_valid(iot));
 158        LINVRNT(cl_io_invariant(io));
 159
 160        io->ci_type = iot;
 161        INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
 162        INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
 163        INIT_LIST_HEAD(&io->ci_lockset.cls_done);
 164        INIT_LIST_HEAD(&io->ci_layers);
 165
 166        result = 0;
 167        cl_object_for_each(scan, obj) {
 168                if (scan->co_ops->coo_io_init) {
 169                        result = scan->co_ops->coo_io_init(env, scan, io);
 170                        if (result != 0)
 171                                break;
 172                }
 173        }
 174        if (result == 0)
 175                io->ci_state = CIS_INIT;
 176        return result;
 177}
 178
 179/**
 180 * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
 181 *
 182 * \pre obj != cl_object_top(obj)
 183 */
 184int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
 185                   enum cl_io_type iot, struct cl_object *obj)
 186{
 187        struct cl_thread_info *info = cl_env_info(env);
 188
 189        LASSERT(obj != cl_object_top(obj));
 190        if (!info->clt_current_io)
 191                info->clt_current_io = io;
 192        return cl_io_init0(env, io, iot, obj);
 193}
 194EXPORT_SYMBOL(cl_io_sub_init);
 195
 196/**
 197 * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
 198 *
 199 * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
 200 * what the latter returned.
 201 *
 202 * \pre obj == cl_object_top(obj)
 203 * \pre cl_io_type_is_valid(iot)
 204 * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
 205 */
 206int cl_io_init(const struct lu_env *env, struct cl_io *io,
 207               enum cl_io_type iot, struct cl_object *obj)
 208{
 209        struct cl_thread_info *info = cl_env_info(env);
 210
 211        LASSERT(obj == cl_object_top(obj));
 212        LASSERT(!info->clt_current_io);
 213
 214        info->clt_current_io = io;
 215        return cl_io_init0(env, io, iot, obj);
 216}
 217EXPORT_SYMBOL(cl_io_init);
 218
 219/**
 220 * Initialize read or write io.
 221 *
 222 * \pre iot == CIT_READ || iot == CIT_WRITE
 223 */
 224int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
 225                  enum cl_io_type iot, loff_t pos, size_t count)
 226{
 227        LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
 228        LINVRNT(io->ci_obj);
 229
 230        LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
 231                         "io range: %u [%llu, %llu) %u %u\n",
 232                         iot, (__u64)pos, (__u64)pos + count,
 233                         io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
 234        io->u.ci_rw.crw_pos    = pos;
 235        io->u.ci_rw.crw_count  = count;
 236        return cl_io_init(env, io, iot, io->ci_obj);
 237}
 238EXPORT_SYMBOL(cl_io_rw_init);
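
/*
 * Example (editorial sketch, not part of the original file): a typical
 * top-level caller, loosely modelled on the llite read/write path, binds
 * the io to the file's top cl_object, pumps it with cl_io_loop(), and
 * always finalizes it. Here "env", "io", "clob", "pos", "count" and
 * "result" are assumed to come from the caller's context.
 *
 *        io->ci_obj = clob;
 *        if (cl_io_rw_init(env, io, CIT_READ, pos, count) == 0)
 *                result = cl_io_loop(env, io);
 *        else
 *                result = io->ci_result;
 *        cl_io_fini(env, io);
 */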
 239
 240static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
 241                              const struct cl_lock_descr *d1)
 242{
 243        return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
 244                          lu_object_fid(&d1->cld_obj->co_lu)) ?:
 245                __diff_normalize(d0->cld_start, d1->cld_start);
 246}
 247
 248static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
 249                             const struct cl_lock_descr *d1)
 250{
 251        int ret;
 252
 253        ret = lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
 254                         lu_object_fid(&d1->cld_obj->co_lu));
 255        if (ret)
 256                return ret;
 257        if (d0->cld_end < d1->cld_start)
 258                return -1;
  259        if (d0->cld_start > d1->cld_end)
 260                return 1;
 261        return 0;
 262}
 263
 264static void cl_lock_descr_merge(struct cl_lock_descr *d0,
 265                                const struct cl_lock_descr *d1)
 266{
 267        d0->cld_start = min(d0->cld_start, d1->cld_start);
 268        d0->cld_end = max(d0->cld_end, d1->cld_end);
 269
 270        if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
 271                d0->cld_mode = CLM_WRITE;
 272
 273        if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
 274                d0->cld_mode = CLM_GROUP;
 275}
 276
 277/*
 278 * Sort locks in lexicographical order of their (fid, start-offset) pairs.
 279 */
 280static void cl_io_locks_sort(struct cl_io *io)
 281{
 282        int done = 0;
 283
 284        /* hidden treasure: bubble sort for now. */
 285        do {
 286                struct cl_io_lock_link *curr;
 287                struct cl_io_lock_link *prev;
 288                struct cl_io_lock_link *temp;
 289
 290                done = 1;
 291                prev = NULL;
 292
 293                list_for_each_entry_safe(curr, temp,
 294                                         &io->ci_lockset.cls_todo,
 295                                         cill_linkage) {
 296                        if (prev) {
 297                                switch (cl_lock_descr_sort(&prev->cill_descr,
 298                                                           &curr->cill_descr)) {
 299                                case 0:
 300                                        /*
 301                                         * IMPOSSIBLE: Identical locks are
 302                                         *           already removed at
 303                                         *           this point.
 304                                         */
 305                                default:
 306                                        LBUG();
 307                                case 1:
 308                                        list_move_tail(&curr->cill_linkage,
 309                                                       &prev->cill_linkage);
 310                                        done = 0;
 311                                        continue; /* don't change prev: it's
 312                                                   * still "previous"
 313                                                   */
 314                                case -1: /* already in order */
 315                                        break;
 316                                }
 317                        }
 318                        prev = curr;
 319                }
 320        } while (!done);
 321}
 322
 323/**
 324 * Check whether \a queue contains locks matching \a need.
 325 *
 326 * \retval +ve there is a matching lock in the \a queue
 327 * \retval   0 there are no matching locks in the \a queue
 328 */
 329int cl_queue_match(const struct list_head *queue,
 330                   const struct cl_lock_descr *need)
 331{
 332        struct cl_io_lock_link *scan;
 333
 334        list_for_each_entry(scan, queue, cill_linkage) {
 335                if (cl_lock_descr_match(&scan->cill_descr, need))
 336                        return 1;
 337        }
 338        return 0;
 339}
 340EXPORT_SYMBOL(cl_queue_match);
 341
 342static int cl_queue_merge(const struct list_head *queue,
 343                          const struct cl_lock_descr *need)
 344{
 345        struct cl_io_lock_link *scan;
 346
 347        list_for_each_entry(scan, queue, cill_linkage) {
 348                if (cl_lock_descr_cmp(&scan->cill_descr, need))
 349                        continue;
 350                cl_lock_descr_merge(&scan->cill_descr, need);
 351                CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
 352                       scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
 353                       scan->cill_descr.cld_end);
 354                return 1;
 355        }
 356        return 0;
 357}
 358
 359static int cl_lockset_match(const struct cl_lockset *set,
 360                            const struct cl_lock_descr *need)
 361{
 362        return cl_queue_match(&set->cls_curr, need) ||
 363               cl_queue_match(&set->cls_done, need);
 364}
 365
 366static int cl_lockset_merge(const struct cl_lockset *set,
 367                            const struct cl_lock_descr *need)
 368{
 369        return cl_queue_merge(&set->cls_todo, need) ||
 370               cl_lockset_match(set, need);
 371}
 372
 373static int cl_lockset_lock_one(const struct lu_env *env,
 374                               struct cl_io *io, struct cl_lockset *set,
 375                               struct cl_io_lock_link *link)
 376{
 377        struct cl_lock *lock;
 378        int          result;
 379
 380        lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
 381
 382        if (!IS_ERR(lock)) {
 383                link->cill_lock = lock;
 384                list_move(&link->cill_linkage, &set->cls_curr);
 385                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
 386                        result = cl_wait(env, lock);
 387                        if (result == 0)
 388                                list_move(&link->cill_linkage, &set->cls_done);
 389                } else
 390                        result = 0;
 391        } else
 392                result = PTR_ERR(lock);
 393        return result;
 394}
 395
 396static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
 397                              struct cl_io_lock_link *link)
 398{
 399        struct cl_lock *lock = link->cill_lock;
 400
 401        list_del_init(&link->cill_linkage);
 402        if (lock) {
 403                cl_lock_release(env, lock, "io", io);
 404                link->cill_lock = NULL;
 405        }
 406        if (link->cill_fini)
 407                link->cill_fini(env, link);
 408}
 409
 410static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
 411                           struct cl_lockset *set)
 412{
 413        struct cl_io_lock_link *link;
 414        struct cl_io_lock_link *temp;
 415        struct cl_lock   *lock;
 416        int result;
 417
 418        result = 0;
 419        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
 420                if (!cl_lockset_match(set, &link->cill_descr)) {
 421                        /* XXX some locking to guarantee that locks aren't
 422                         * expanded in between.
 423                         */
 424                        result = cl_lockset_lock_one(env, io, set, link);
 425                        if (result != 0)
 426                                break;
 427                } else
 428                        cl_lock_link_fini(env, io, link);
 429        }
 430        if (result == 0) {
 431                list_for_each_entry_safe(link, temp,
 432                                         &set->cls_curr, cill_linkage) {
 433                        lock = link->cill_lock;
 434                        result = cl_wait(env, lock);
 435                        if (result == 0)
 436                                list_move(&link->cill_linkage, &set->cls_done);
 437                        else
 438                                break;
 439                }
 440        }
 441        return result;
 442}
 443
 444/**
 445 * Takes locks necessary for the current iteration of io.
 446 *
 447 * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
  448 * by layers for the current iteration. Then sorts the locks (to avoid
  449 * deadlocks) and acquires them.
 450 */
 451int cl_io_lock(const struct lu_env *env, struct cl_io *io)
 452{
 453        const struct cl_io_slice *scan;
 454        int result = 0;
 455
 456        LINVRNT(cl_io_is_loopable(io));
 457        LINVRNT(io->ci_state == CIS_IT_STARTED);
 458        LINVRNT(cl_io_invariant(io));
 459
 460        cl_io_for_each(scan, io) {
 461                if (!scan->cis_iop->op[io->ci_type].cio_lock)
 462                        continue;
 463                result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
 464                if (result != 0)
 465                        break;
 466        }
 467        if (result == 0) {
 468                cl_io_locks_sort(io);
 469                result = cl_lockset_lock(env, io, &io->ci_lockset);
 470        }
 471        if (result != 0)
 472                cl_io_unlock(env, io);
 473        else
 474                io->ci_state = CIS_LOCKED;
 475        return result;
 476}
 477EXPORT_SYMBOL(cl_io_lock);
 478
 479/**
  480 * Releases locks taken by io.
 481 */
 482void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
 483{
 484        struct cl_lockset       *set;
 485        struct cl_io_lock_link   *link;
 486        struct cl_io_lock_link   *temp;
 487        const struct cl_io_slice *scan;
 488
 489        LASSERT(cl_io_is_loopable(io));
 490        LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
 491        LINVRNT(cl_io_invariant(io));
 492
 493        set = &io->ci_lockset;
 494
 495        list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
 496                cl_lock_link_fini(env, io, link);
 497
 498        list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
 499                cl_lock_link_fini(env, io, link);
 500
 501        list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
 502                cl_unuse(env, link->cill_lock);
 503                cl_lock_link_fini(env, io, link);
 504        }
 505        cl_io_for_each_reverse(scan, io) {
 506                if (scan->cis_iop->op[io->ci_type].cio_unlock)
 507                        scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
 508        }
 509        io->ci_state = CIS_UNLOCKED;
 510        LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
 511}
 512EXPORT_SYMBOL(cl_io_unlock);
 513
 514/**
 515 * Prepares next iteration of io.
 516 *
 517 * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
 518 * layers a chance to modify io parameters, e.g., so that lov can restrict io
 519 * to a single stripe.
 520 */
 521int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
 522{
 523        const struct cl_io_slice *scan;
 524        int result;
 525
 526        LINVRNT(cl_io_is_loopable(io));
 527        LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
 528        LINVRNT(cl_io_invariant(io));
 529
 530        result = 0;
 531        cl_io_for_each(scan, io) {
 532                if (!scan->cis_iop->op[io->ci_type].cio_iter_init)
 533                        continue;
 534                result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
 535                                                                      scan);
 536                if (result != 0)
 537                        break;
 538        }
 539        if (result == 0)
 540                io->ci_state = CIS_IT_STARTED;
 541        return result;
 542}
 543EXPORT_SYMBOL(cl_io_iter_init);
 544
 545/**
 546 * Finalizes io iteration.
 547 *
 548 * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
 549 */
 550void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
 551{
 552        const struct cl_io_slice *scan;
 553
 554        LINVRNT(cl_io_is_loopable(io));
 555        LINVRNT(io->ci_state == CIS_UNLOCKED);
 556        LINVRNT(cl_io_invariant(io));
 557
 558        cl_io_for_each_reverse(scan, io) {
 559                if (scan->cis_iop->op[io->ci_type].cio_iter_fini)
 560                        scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
 561        }
 562        io->ci_state = CIS_IT_ENDED;
 563}
 564EXPORT_SYMBOL(cl_io_iter_fini);
 565
 566/**
 567 * Records that read or write io progressed \a nob bytes forward.
 568 */
 569static void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io,
 570                             size_t nob)
 571{
 572        const struct cl_io_slice *scan;
 573
 574        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
 575                nob == 0);
 576        LINVRNT(cl_io_is_loopable(io));
 577        LINVRNT(cl_io_invariant(io));
 578
 579        io->u.ci_rw.crw_pos   += nob;
 580        io->u.ci_rw.crw_count -= nob;
 581
 582        /* layers have to be notified. */
 583        cl_io_for_each_reverse(scan, io) {
 584                if (scan->cis_iop->op[io->ci_type].cio_advance)
 585                        scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
 586                                                                   nob);
 587        }
 588}
 589
 590/**
 591 * Adds a lock to a lockset.
 592 */
 593int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
 594                   struct cl_io_lock_link *link)
 595{
 596        int result;
 597
 598        if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
 599                result = 1;
 600        else {
 601                list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
 602                result = 0;
 603        }
 604        return result;
 605}
 606EXPORT_SYMBOL(cl_io_lock_add);
 607
 608static void cl_free_io_lock_link(const struct lu_env *env,
 609                                 struct cl_io_lock_link *link)
 610{
 611        kfree(link);
 612}
 613
 614/**
 615 * Allocates new lock link, and uses it to add a lock to a lockset.
 616 */
 617int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
 618                         struct cl_lock_descr *descr)
 619{
 620        struct cl_io_lock_link *link;
 621        int result;
 622
 623        link = kzalloc(sizeof(*link), GFP_NOFS);
 624        if (link) {
 625                link->cill_descr     = *descr;
 626                link->cill_fini      = cl_free_io_lock_link;
 627                result = cl_io_lock_add(env, io, link);
 628                if (result) /* lock match */
 629                        link->cill_fini(env, link);
 630        } else
 631                result = -ENOMEM;
 632
 633        return result;
 634}
 635EXPORT_SYMBOL(cl_io_lock_alloc_add);
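
/*
 * Example (editorial sketch, not part of the original file): a layer's
 * cio_lock() method would typically describe the byte range it needs as a
 * cl_lock_descr and queue it here. The foo_io_rw_lock() name is a
 * placeholder; the descriptor fields mirror the ones manipulated elsewhere
 * in this file, and cl_index() is assumed to convert a file offset into a
 * page index.
 *
 *        static int foo_io_rw_lock(const struct lu_env *env,
 *                                  const struct cl_io_slice *ios)
 *        {
 *                struct cl_io *io = ios->cis_io;
 *                struct cl_lock_descr descr = {
 *                        .cld_obj   = io->ci_obj,
 *                        .cld_mode  = io->ci_type == CIT_WRITE ?
 *                                     CLM_WRITE : CLM_READ,
 *                        .cld_start = cl_index(io->ci_obj,
 *                                              io->u.ci_rw.crw_pos),
 *                        .cld_end   = cl_index(io->ci_obj,
 *                                              io->u.ci_rw.crw_pos +
 *                                              io->u.ci_rw.crw_count - 1),
 *                };
 *
 *                return cl_io_lock_alloc_add(env, io, &descr);
 *        }
 */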
 636
 637/**
 638 * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
 639 */
 640int cl_io_start(const struct lu_env *env, struct cl_io *io)
 641{
 642        const struct cl_io_slice *scan;
 643        int result = 0;
 644
 645        LINVRNT(cl_io_is_loopable(io));
 646        LINVRNT(io->ci_state == CIS_LOCKED);
 647        LINVRNT(cl_io_invariant(io));
 648
 649        io->ci_state = CIS_IO_GOING;
 650        cl_io_for_each(scan, io) {
 651                if (!scan->cis_iop->op[io->ci_type].cio_start)
 652                        continue;
 653                result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
 654                if (result != 0)
 655                        break;
 656        }
 657        if (result >= 0)
 658                result = 0;
 659        return result;
 660}
 661EXPORT_SYMBOL(cl_io_start);
 662
 663/**
 664 * Wait until current io iteration is finished by calling
 665 * cl_io_operations::cio_end() bottom-to-top.
 666 */
 667void cl_io_end(const struct lu_env *env, struct cl_io *io)
 668{
 669        const struct cl_io_slice *scan;
 670
 671        LINVRNT(cl_io_is_loopable(io));
 672        LINVRNT(io->ci_state == CIS_IO_GOING);
 673        LINVRNT(cl_io_invariant(io));
 674
 675        cl_io_for_each_reverse(scan, io) {
 676                if (scan->cis_iop->op[io->ci_type].cio_end)
 677                        scan->cis_iop->op[io->ci_type].cio_end(env, scan);
 678                /* TODO: error handling. */
 679        }
 680        io->ci_state = CIS_IO_FINISHED;
 681}
 682EXPORT_SYMBOL(cl_io_end);
 683
 684static const struct cl_page_slice *
 685cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
 686{
 687        const struct cl_page_slice *slice;
 688
 689        slice = cl_page_at(page, ios->cis_obj->co_lu.lo_dev->ld_type);
 690        LINVRNT(slice);
 691        return slice;
 692}
 693
 694/**
 695 * True iff \a page is within \a io range.
 696 */
 697static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
 698{
 699        int     result = 1;
 700        loff_t  start;
 701        loff_t  end;
 702        pgoff_t idx;
 703
 704        idx = page->cp_index;
 705        switch (io->ci_type) {
 706        case CIT_READ:
 707        case CIT_WRITE:
 708                /*
 709                 * check that [start, end) and [pos, pos + count) extents
 710                 * overlap.
 711                 */
 712                if (!cl_io_is_append(io)) {
 713                        const struct cl_io_rw_common *crw = &(io->u.ci_rw);
 714
 715                        start = cl_offset(page->cp_obj, idx);
 716                        end   = cl_offset(page->cp_obj, idx + 1);
 717                        result = crw->crw_pos < end &&
 718                                 start < crw->crw_pos + crw->crw_count;
 719                }
 720                break;
 721        case CIT_FAULT:
 722                result = io->u.ci_fault.ft_index == idx;
 723                break;
 724        default:
 725                LBUG();
 726        }
 727        return result;
 728}
 729
 730/**
  731 * Called by read io when a page has to be read from the server.
 732 *
 733 * \see cl_io_operations::cio_read_page()
 734 */
 735int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
 736                    struct cl_page *page)
 737{
 738        const struct cl_io_slice *scan;
 739        struct cl_2queue         *queue;
 740        int                    result = 0;
 741
 742        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
 743        LINVRNT(cl_page_is_owned(page, io));
 744        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 745        LINVRNT(cl_page_in_io(page, io));
 746        LINVRNT(cl_io_invariant(io));
 747
 748        queue = &io->ci_queue;
 749
 750        cl_2queue_init(queue);
 751        /*
 752         * ->cio_read_page() methods called in the loop below are supposed to
 753         * never block waiting for network (the only subtle point is the
 754         * creation of new pages for read-ahead that might result in cache
 755         * shrinking, but currently only clean pages are shrunk and this
 756         * requires no network io).
 757         *
  758         * Should this ever start blocking, a retry loop would be needed for
 759         * "parallel io" (see CLO_REPEAT loops in cl_lock.c).
 760         */
 761        cl_io_for_each(scan, io) {
 762                if (scan->cis_iop->cio_read_page) {
 763                        const struct cl_page_slice *slice;
 764
 765                        slice = cl_io_slice_page(scan, page);
 766                        LINVRNT(slice);
 767                        result = scan->cis_iop->cio_read_page(env, scan, slice);
 768                        if (result != 0)
 769                                break;
 770                }
 771        }
 772        if (result == 0)
 773                result = cl_io_submit_rw(env, io, CRT_READ, queue);
 774        /*
 775         * Unlock unsent pages in case of error.
 776         */
 777        cl_page_list_disown(env, io, &queue->c2_qin);
 778        cl_2queue_fini(env, queue);
 779        return result;
 780}
 781EXPORT_SYMBOL(cl_io_read_page);
 782
 783/**
  784 * Called by write io to prepare a page to receive data from a user buffer.
 785 *
 786 * \see cl_io_operations::cio_prepare_write()
 787 */
 788int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
 789                        struct cl_page *page, unsigned from, unsigned to)
 790{
 791        const struct cl_io_slice *scan;
 792        int result = 0;
 793
 794        LINVRNT(io->ci_type == CIT_WRITE);
 795        LINVRNT(cl_page_is_owned(page, io));
 796        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 797        LINVRNT(cl_io_invariant(io));
 798        LASSERT(cl_page_in_io(page, io));
 799
 800        cl_io_for_each_reverse(scan, io) {
 801                if (scan->cis_iop->cio_prepare_write) {
 802                        const struct cl_page_slice *slice;
 803
 804                        slice = cl_io_slice_page(scan, page);
 805                        result = scan->cis_iop->cio_prepare_write(env, scan,
 806                                                                  slice,
 807                                                                  from, to);
 808                        if (result != 0)
 809                                break;
 810                }
 811        }
 812        return result;
 813}
 814EXPORT_SYMBOL(cl_io_prepare_write);
 815
 816/**
 817 * Called by write io after user data were copied into a page.
 818 *
 819 * \see cl_io_operations::cio_commit_write()
 820 */
 821int cl_io_commit_write(const struct lu_env *env, struct cl_io *io,
 822                       struct cl_page *page, unsigned from, unsigned to)
 823{
 824        const struct cl_io_slice *scan;
 825        int result = 0;
 826
 827        LINVRNT(io->ci_type == CIT_WRITE);
 828        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
 829        LINVRNT(cl_io_invariant(io));
 830        /*
 831         * XXX Uh... not nice. Top level cl_io_commit_write() call (vvp->lov)
 832         * already called cl_page_cache_add(), moving page into CPS_CACHED
 833         * state. Better (and more general) way of dealing with such situation
 834         * is needed.
 835         */
 836        LASSERT(cl_page_is_owned(page, io) || page->cp_parent);
 837        LASSERT(cl_page_in_io(page, io));
 838
 839        cl_io_for_each(scan, io) {
 840                if (scan->cis_iop->cio_commit_write) {
 841                        const struct cl_page_slice *slice;
 842
 843                        slice = cl_io_slice_page(scan, page);
 844                        result = scan->cis_iop->cio_commit_write(env, scan,
 845                                                                 slice,
 846                                                                 from, to);
 847                        if (result != 0)
 848                                break;
 849                }
 850        }
 851        LINVRNT(result <= 0);
 852        return result;
 853}
 854EXPORT_SYMBOL(cl_io_commit_write);
 855
 856/**
 857 * Submits a list of pages for immediate io.
 858 *
  859 * After the function returns, the submitted pages are moved to the
  860 * queue->c2_qout queue, while queue->c2_qin contains both the pages that did
  861 * not need to be submitted and the pages that failed to submit.
 862 *
 863 * \returns 0 if at least one page was submitted, error code otherwise.
 864 * \see cl_io_operations::cio_submit()
 865 */
 866int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
 867                    enum cl_req_type crt, struct cl_2queue *queue)
 868{
 869        const struct cl_io_slice *scan;
 870        int result = 0;
 871
 872        LINVRNT(crt < ARRAY_SIZE(scan->cis_iop->req_op));
 873
 874        cl_io_for_each(scan, io) {
 875                if (!scan->cis_iop->req_op[crt].cio_submit)
 876                        continue;
 877                result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
 878                                                               queue);
 879                if (result != 0)
 880                        break;
 881        }
 882        /*
 883         * If ->cio_submit() failed, no pages were sent.
 884         */
 885        LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
 886        return result;
 887}
 888EXPORT_SYMBOL(cl_io_submit_rw);
 889
 890/**
  891 * Submit a sync_io and wait until the IO finishes or an error happens.
  892 * If \a timeout is zero, wait for the IO unconditionally.
 893 */
 894int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
 895                      enum cl_req_type iot, struct cl_2queue *queue,
 896                      long timeout)
 897{
 898        struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
 899        struct cl_page *pg;
 900        int rc;
 901
 902        cl_page_list_for_each(pg, &queue->c2_qin) {
 903                LASSERT(!pg->cp_sync_io);
 904                pg->cp_sync_io = anchor;
 905        }
 906
 907        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
 908        rc = cl_io_submit_rw(env, io, iot, queue);
 909        if (rc == 0) {
 910                /*
 911                 * If some pages weren't sent for any reason (e.g.,
 912                 * read found up-to-date pages in the cache, or write found
 913                 * clean pages), count them as completed to avoid infinite
 914                 * wait.
 915                 */
 916                cl_page_list_for_each(pg, &queue->c2_qin) {
 917                        pg->cp_sync_io = NULL;
 918                        cl_sync_io_note(anchor, 1);
 919                }
 920
 921                /* wait for the IO to be finished. */
 922                rc = cl_sync_io_wait(env, io, &queue->c2_qout,
 923                                     anchor, timeout);
 924        } else {
 925                LASSERT(list_empty(&queue->c2_qout.pl_pages));
 926                cl_page_list_for_each(pg, &queue->c2_qin)
 927                        pg->cp_sync_io = NULL;
 928        }
 929        return rc;
 930}
 931EXPORT_SYMBOL(cl_io_submit_sync);
 932
 933/**
 934 * Cancel an IO which has been submitted by cl_io_submit_rw.
 935 */
 936static int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
 937                        struct cl_page_list *queue)
 938{
 939        struct cl_page *page;
 940        int result = 0;
 941
 942        CERROR("Canceling ongoing page transmission\n");
 943        cl_page_list_for_each(page, queue) {
 944                int rc;
 945
 946                LINVRNT(cl_page_in_io(page, io));
 947                rc = cl_page_cancel(env, page);
 948                result = result ?: rc;
 949        }
 950        return result;
 951}
 952
 953/**
 954 * Main io loop.
 955 *
 956 * Pumps io through iterations calling
 957 *
 958 *    - cl_io_iter_init()
 959 *
 960 *    - cl_io_lock()
 961 *
 962 *    - cl_io_start()
 963 *
 964 *    - cl_io_end()
 965 *
 966 *    - cl_io_unlock()
 967 *
 968 *    - cl_io_iter_fini()
 969 *
 970 * repeatedly until there is no more io to do.
 971 */
 972int cl_io_loop(const struct lu_env *env, struct cl_io *io)
 973{
 974        int result   = 0;
 975
 976        LINVRNT(cl_io_is_loopable(io));
 977
 978        do {
 979                size_t nob;
 980
 981                io->ci_continue = 0;
 982                result = cl_io_iter_init(env, io);
 983                if (result == 0) {
 984                        nob    = io->ci_nob;
 985                        result = cl_io_lock(env, io);
 986                        if (result == 0) {
 987                                /*
  988                                 * Notify layers that locks have been taken,
  989                                 * and do the actual i/o.
 990                                 *
 991                                 *   - llite: kms, short read;
 992                                 *   - llite: generic_file_read();
 993                                 */
 994                                result = cl_io_start(env, io);
 995                                /*
 996                                 * Send any remaining pending
 997                                 * io, etc.
 998                                 *
 999                                 *   - llite: ll_rw_stats_tally.
1000                                 */
1001                                cl_io_end(env, io);
1002                                cl_io_unlock(env, io);
1003                                cl_io_rw_advance(env, io, io->ci_nob - nob);
1004                        }
1005                }
1006                cl_io_iter_fini(env, io);
1007        } while (result == 0 && io->ci_continue);
1008        if (result == 0)
1009                result = io->ci_result;
1010        return result < 0 ? result : 0;
1011}
1012EXPORT_SYMBOL(cl_io_loop);
1013
1014/**
1015 * Adds io slice to the cl_io.
1016 *
1017 * This is called by cl_object_operations::coo_io_init() methods to add a
1018 * per-layer state to the io. New state is added at the end of
1019 * cl_io::ci_layers list, that is, it is at the bottom of the stack.
1020 *
1021 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
1022 */
1023void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
1024                     struct cl_object *obj,
1025                     const struct cl_io_operations *ops)
1026{
1027        struct list_head *linkage = &slice->cis_linkage;
1028
1029        LASSERT((!linkage->prev && !linkage->next) ||
1030                list_empty(linkage));
1031
1032        list_add_tail(linkage, &io->ci_layers);
1033        slice->cis_io  = io;
1034        slice->cis_obj = obj;
1035        slice->cis_iop = ops;
1036}
1037EXPORT_SYMBOL(cl_io_slice_add);
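
/*
 * Example (editorial sketch, not part of the original file): a layer's
 * cl_object_operations::coo_io_init() method registers its per-io slice
 * with the stack. The foo_* names are placeholders; the cl_io_operations
 * table is indexed by cl_io_type, matching how op[io->ci_type] is
 * dereferenced throughout this file.
 *
 *        static const struct cl_io_operations foo_io_ops = {
 *                .op = {
 *                        [CIT_READ] = {
 *                                .cio_fini  = foo_io_fini,
 *                                .cio_lock  = foo_io_read_lock,
 *                                .cio_start = foo_io_read_start,
 *                        },
 *                },
 *        };
 *
 *        static int foo_io_init(const struct lu_env *env,
 *                               struct cl_object *obj, struct cl_io *io)
 *        {
 *                struct foo_io *fio = foo_env_io(env);
 *
 *                cl_io_slice_add(io, &fio->fi_cl, obj, &foo_io_ops);
 *                return 0;
 *        }
 */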
1038
1039/**
1040 * Initializes page list.
1041 */
1042void cl_page_list_init(struct cl_page_list *plist)
1043{
1044        plist->pl_nr = 0;
1045        INIT_LIST_HEAD(&plist->pl_pages);
1046        plist->pl_owner = current;
1047}
1048EXPORT_SYMBOL(cl_page_list_init);
1049
1050/**
1051 * Adds a page to a page list.
1052 */
1053void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
1054{
1055        /* it would be better to check that page is owned by "current" io, but
1056         * it is not passed here.
1057         */
1058        LASSERT(page->cp_owner);
1059        LINVRNT(plist->pl_owner == current);
1060
1061        lockdep_off();
1062        mutex_lock(&page->cp_mutex);
1063        lockdep_on();
1064        LASSERT(list_empty(&page->cp_batch));
1065        list_add_tail(&page->cp_batch, &plist->pl_pages);
1066        ++plist->pl_nr;
1067        lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1068        cl_page_get(page);
1069}
1070EXPORT_SYMBOL(cl_page_list_add);
1071
1072/**
1073 * Removes a page from a page list.
1074 */
1075static void cl_page_list_del(const struct lu_env *env,
1076                             struct cl_page_list *plist, struct cl_page *page)
1077{
1078        LASSERT(plist->pl_nr > 0);
1079        LINVRNT(plist->pl_owner == current);
1080
1081        list_del_init(&page->cp_batch);
1082        lockdep_off();
1083        mutex_unlock(&page->cp_mutex);
1084        lockdep_on();
1085        --plist->pl_nr;
1086        lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1087        cl_page_put(env, page);
1088}
1089
1090/**
1091 * Moves a page from one page list to another.
1092 */
1093void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1094                       struct cl_page *page)
1095{
1096        LASSERT(src->pl_nr > 0);
1097        LINVRNT(dst->pl_owner == current);
1098        LINVRNT(src->pl_owner == current);
1099
1100        list_move_tail(&page->cp_batch, &dst->pl_pages);
1101        --src->pl_nr;
1102        ++dst->pl_nr;
1103        lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1104                      src, dst);
1105}
1106EXPORT_SYMBOL(cl_page_list_move);
1107
1108/**
 1109 * Splices all pages from \a list onto \a head, just as list_splice() does
 1110 * for list_heads.
1110 */
1111void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1112{
1113        struct cl_page *page;
1114        struct cl_page *tmp;
1115
1116        LINVRNT(list->pl_owner == current);
1117        LINVRNT(head->pl_owner == current);
1118
1119        cl_page_list_for_each_safe(page, tmp, list)
1120                cl_page_list_move(head, list, page);
1121}
1122EXPORT_SYMBOL(cl_page_list_splice);
1123
1124void cl_page_disown0(const struct lu_env *env,
1125                     struct cl_io *io, struct cl_page *pg);
1126
1127/**
1128 * Disowns pages in a queue.
1129 */
1130void cl_page_list_disown(const struct lu_env *env,
1131                         struct cl_io *io, struct cl_page_list *plist)
1132{
1133        struct cl_page *page;
1134        struct cl_page *temp;
1135
1136        LINVRNT(plist->pl_owner == current);
1137
1138        cl_page_list_for_each_safe(page, temp, plist) {
1139                LASSERT(plist->pl_nr > 0);
1140
1141                list_del_init(&page->cp_batch);
1142                lockdep_off();
1143                mutex_unlock(&page->cp_mutex);
1144                lockdep_on();
1145                --plist->pl_nr;
1146                /*
1147                 * cl_page_disown0 rather than usual cl_page_disown() is used,
1148                 * because pages are possibly in CPS_FREEING state already due
1149                 * to the call to cl_page_list_discard().
1150                 */
1151                /*
1152                 * XXX cl_page_disown0() will fail if page is not locked.
1153                 */
1154                cl_page_disown0(env, io, page);
1155                lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1156                              plist);
1157                cl_page_put(env, page);
1158        }
1159}
1160EXPORT_SYMBOL(cl_page_list_disown);
1161
1162/**
1163 * Releases pages from queue.
1164 */
1165static void cl_page_list_fini(const struct lu_env *env,
1166                              struct cl_page_list *plist)
1167{
1168        struct cl_page *page;
1169        struct cl_page *temp;
1170
1171        LINVRNT(plist->pl_owner == current);
1172
1173        cl_page_list_for_each_safe(page, temp, plist)
1174                cl_page_list_del(env, plist, page);
1175        LASSERT(plist->pl_nr == 0);
1176}
1177
1178/**
 1179 * Assumes (takes ownership of) all pages in a queue.
1180 */
1181static void cl_page_list_assume(const struct lu_env *env,
1182                                struct cl_io *io, struct cl_page_list *plist)
1183{
1184        struct cl_page *page;
1185
1186        LINVRNT(plist->pl_owner == current);
1187
1188        cl_page_list_for_each(page, plist)
1189                cl_page_assume(env, io, page);
1190}
1191
1192/**
1193 * Discards all pages in a queue.
1194 */
1195static void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1196                                 struct cl_page_list *plist)
1197{
1198        struct cl_page *page;
1199
1200        LINVRNT(plist->pl_owner == current);
1201        cl_page_list_for_each(page, plist)
1202                cl_page_discard(env, io, page);
1203}
1204
1205/**
1206 * Initialize dual page queue.
1207 */
1208void cl_2queue_init(struct cl_2queue *queue)
1209{
1210        cl_page_list_init(&queue->c2_qin);
1211        cl_page_list_init(&queue->c2_qout);
1212}
1213EXPORT_SYMBOL(cl_2queue_init);
1214
1215/**
1216 * Disown pages in both lists of a 2-queue.
1217 */
1218void cl_2queue_disown(const struct lu_env *env,
1219                      struct cl_io *io, struct cl_2queue *queue)
1220{
1221        cl_page_list_disown(env, io, &queue->c2_qin);
1222        cl_page_list_disown(env, io, &queue->c2_qout);
1223}
1224EXPORT_SYMBOL(cl_2queue_disown);
1225
1226/**
1227 * Discard (truncate) pages in both lists of a 2-queue.
1228 */
1229void cl_2queue_discard(const struct lu_env *env,
1230                       struct cl_io *io, struct cl_2queue *queue)
1231{
1232        cl_page_list_discard(env, io, &queue->c2_qin);
1233        cl_page_list_discard(env, io, &queue->c2_qout);
1234}
1235EXPORT_SYMBOL(cl_2queue_discard);
1236
1237/**
1238 * Finalize both page lists of a 2-queue.
1239 */
1240void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1241{
1242        cl_page_list_fini(env, &queue->c2_qout);
1243        cl_page_list_fini(env, &queue->c2_qin);
1244}
1245EXPORT_SYMBOL(cl_2queue_fini);
1246
1247/**
1248 * Initialize a 2-queue to contain \a page in its incoming page list.
1249 */
1250void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1251{
1252        cl_2queue_init(queue);
1253        /*
1254         * Add a page to the incoming page list of 2-queue.
1255         */
1256        cl_page_list_add(&queue->c2_qin, page);
1257}
1258EXPORT_SYMBOL(cl_2queue_init_page);
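
/*
 * Example (editorial sketch, not part of the original file): synchronous
 * transfer of a single, already-owned page, roughly the pattern llite uses
 * for page-at-a-time io. The 2-queue embedded in the io is reused, unsent
 * pages are disowned explicitly, and both lists are finalized afterwards.
 *
 *        struct cl_2queue *queue = &io->ci_queue;
 *
 *        cl_2queue_init_page(queue, page);
 *        rc = cl_io_submit_sync(env, io, CRT_WRITE, queue, 0);
 *        cl_page_list_disown(env, io, &queue->c2_qin);
 *        cl_2queue_fini(env, queue);
 */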
1259
1260/**
1261 * Returns top-level io.
1262 *
1263 * \see cl_object_top(), cl_page_top().
1264 */
1265struct cl_io *cl_io_top(struct cl_io *io)
1266{
1267        while (io->ci_parent)
1268                io = io->ci_parent;
1269        return io;
1270}
1271EXPORT_SYMBOL(cl_io_top);
1272
1273/**
1274 * Adds request slice to the compound request.
1275 *
1276 * This is called by cl_device_operations::cdo_req_init() methods to add a
1277 * per-layer state to the request. New state is added at the end of
1278 * cl_req::crq_layers list, that is, it is at the bottom of the stack.
1279 *
1280 * \see cl_lock_slice_add(), cl_page_slice_add(), cl_io_slice_add()
1281 */
1282void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
1283                      struct cl_device *dev,
1284                      const struct cl_req_operations *ops)
1285{
1286        list_add_tail(&slice->crs_linkage, &req->crq_layers);
1287        slice->crs_dev = dev;
1288        slice->crs_ops = ops;
1289        slice->crs_req = req;
1290}
1291EXPORT_SYMBOL(cl_req_slice_add);
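
/*
 * Example (editorial sketch, not part of the original file): a device's
 * cl_device_operations::cdo_req_init() method, as invoked from
 * cl_req_init() below, allocates its per-request state and attaches it
 * here. The foo_* names are placeholders.
 *
 *        static int foo_req_init(const struct lu_env *env,
 *                                struct cl_device *dev, struct cl_req *req)
 *        {
 *                struct foo_req *fr;
 *
 *                fr = kzalloc(sizeof(*fr), GFP_NOFS);
 *                if (!fr)
 *                        return -ENOMEM;
 *                cl_req_slice_add(req, &fr->fr_cl, dev, &foo_req_ops);
 *                return 0;
 *        }
 */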
1292
1293static void cl_req_free(const struct lu_env *env, struct cl_req *req)
1294{
1295        unsigned i;
1296
1297        LASSERT(list_empty(&req->crq_pages));
1298        LASSERT(req->crq_nrpages == 0);
1299        LINVRNT(list_empty(&req->crq_layers));
1300        LINVRNT(equi(req->crq_nrobjs > 0, req->crq_o));
1301
1302        if (req->crq_o) {
1303                for (i = 0; i < req->crq_nrobjs; ++i) {
1304                        struct cl_object *obj = req->crq_o[i].ro_obj;
1305
1306                        if (obj) {
1307                                lu_object_ref_del_at(&obj->co_lu,
1308                                                     &req->crq_o[i].ro_obj_ref,
1309                                                     "cl_req", req);
1310                                cl_object_put(env, obj);
1311                        }
1312                }
1313                kfree(req->crq_o);
1314        }
1315        kfree(req);
1316}
1317
1318static int cl_req_init(const struct lu_env *env, struct cl_req *req,
1319                       struct cl_page *page)
1320{
1321        struct cl_device     *dev;
1322        struct cl_page_slice *slice;
1323        int result;
1324
1325        result = 0;
1326        page = cl_page_top(page);
1327        do {
1328                list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
1329                        dev = lu2cl_dev(slice->cpl_obj->co_lu.lo_dev);
1330                        if (dev->cd_ops->cdo_req_init) {
1331                                result = dev->cd_ops->cdo_req_init(env,
1332                                                                   dev, req);
1333                                if (result != 0)
1334                                        break;
1335                        }
1336                }
1337                page = page->cp_child;
1338        } while (page && result == 0);
1339        return result;
1340}
1341
1342/**
1343 * Invokes per-request transfer completion call-backs
1344 * (cl_req_operations::cro_completion()) bottom-to-top.
1345 */
1346void cl_req_completion(const struct lu_env *env, struct cl_req *req, int rc)
1347{
1348        struct cl_req_slice *slice;
1349
1350        /*
1351         * for the lack of list_for_each_entry_reverse_safe()...
1352         */
1353        while (!list_empty(&req->crq_layers)) {
1354                slice = list_entry(req->crq_layers.prev,
1355                                   struct cl_req_slice, crs_linkage);
1356                list_del_init(&slice->crs_linkage);
1357                if (slice->crs_ops->cro_completion)
1358                        slice->crs_ops->cro_completion(env, slice, rc);
1359        }
1360        cl_req_free(env, req);
1361}
1362EXPORT_SYMBOL(cl_req_completion);
1363
1364/**
1365 * Allocates new transfer request.
1366 */
1367struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
1368                            enum cl_req_type crt, int nr_objects)
1369{
1370        struct cl_req *req;
1371
1372        LINVRNT(nr_objects > 0);
1373
1374        req = kzalloc(sizeof(*req), GFP_NOFS);
1375        if (req) {
1376                int result;
1377
1378                req->crq_type = crt;
1379                INIT_LIST_HEAD(&req->crq_pages);
1380                INIT_LIST_HEAD(&req->crq_layers);
1381
1382                req->crq_o = kcalloc(nr_objects, sizeof(req->crq_o[0]),
1383                                     GFP_NOFS);
1384                if (req->crq_o) {
1385                        req->crq_nrobjs = nr_objects;
1386                        result = cl_req_init(env, req, page);
1387                } else
1388                        result = -ENOMEM;
1389                if (result != 0) {
1390                        cl_req_completion(env, req, result);
1391                        req = ERR_PTR(result);
1392                }
1393        } else
1394                req = ERR_PTR(-ENOMEM);
1395        return req;
1396}
1397EXPORT_SYMBOL(cl_req_alloc);
1398
1399/**
1400 * Adds a page to a request.
1401 */
1402void cl_req_page_add(const struct lu_env *env,
1403                     struct cl_req *req, struct cl_page *page)
1404{
1405        struct cl_object  *obj;
1406        struct cl_req_obj *rqo;
1407        int i;
1408
1409        page = cl_page_top(page);
1410
1411        LASSERT(list_empty(&page->cp_flight));
1412        LASSERT(!page->cp_req);
1413
1414        CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
1415                      req, req->crq_type, req->crq_nrpages);
1416
1417        list_add_tail(&page->cp_flight, &req->crq_pages);
1418        ++req->crq_nrpages;
1419        page->cp_req = req;
1420        obj = cl_object_top(page->cp_obj);
1421        for (i = 0, rqo = req->crq_o; obj != rqo->ro_obj; ++i, ++rqo) {
1422                if (!rqo->ro_obj) {
1423                        rqo->ro_obj = obj;
1424                        cl_object_get(obj);
1425                        lu_object_ref_add_at(&obj->co_lu, &rqo->ro_obj_ref,
1426                                             "cl_req", req);
1427                        break;
1428                }
1429        }
1430        LASSERT(i < req->crq_nrobjs);
1431}
1432EXPORT_SYMBOL(cl_req_page_add);
1433
1434/**
1435 * Removes a page from a request.
1436 */
1437void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
1438{
1439        struct cl_req *req = page->cp_req;
1440
1441        page = cl_page_top(page);
1442
1443        LASSERT(!list_empty(&page->cp_flight));
1444        LASSERT(req->crq_nrpages > 0);
1445
1446        list_del_init(&page->cp_flight);
1447        --req->crq_nrpages;
1448        page->cp_req = NULL;
1449}
1450EXPORT_SYMBOL(cl_req_page_done);
1451
1452/**
1453 * Notifies layers that request is about to depart by calling
1454 * cl_req_operations::cro_prep() top-to-bottom.
1455 */
1456int cl_req_prep(const struct lu_env *env, struct cl_req *req)
1457{
1458        int i;
1459        int result;
1460        const struct cl_req_slice *slice;
1461
1462        /*
1463         * Check that the caller of cl_req_alloc() didn't lie about the number
1464         * of objects.
1465         */
1466        for (i = 0; i < req->crq_nrobjs; ++i)
1467                LASSERT(req->crq_o[i].ro_obj);
1468
1469        result = 0;
1470        list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1471                if (slice->crs_ops->cro_prep) {
1472                        result = slice->crs_ops->cro_prep(env, slice);
1473                        if (result != 0)
1474                                break;
1475                }
1476        }
1477        return result;
1478}
1479EXPORT_SYMBOL(cl_req_prep);
1480
1481/**
 1482 * Fills in attributes that are passed to the server together with the
 1483 * transfer. Only attributes from \a flags may be touched. This can be
 1484 * called multiple times for the same request.
1485 */
1486void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
1487                     struct cl_req_attr *attr, u64 flags)
1488{
1489        const struct cl_req_slice *slice;
1490        struct cl_page      *page;
1491        int i;
1492
1493        LASSERT(!list_empty(&req->crq_pages));
1494
1495        /* Take any page to use as a model. */
1496        page = list_entry(req->crq_pages.next, struct cl_page, cp_flight);
1497
1498        for (i = 0; i < req->crq_nrobjs; ++i) {
1499                list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1500                        const struct cl_page_slice *scan;
1501                        const struct cl_object     *obj;
1502
1503                        scan = cl_page_at(page,
1504                                          slice->crs_dev->cd_lu_dev.ld_type);
1505                        obj = scan->cpl_obj;
1506                        if (slice->crs_ops->cro_attr_set)
1507                                slice->crs_ops->cro_attr_set(env, slice, obj,
1508                                                             attr + i, flags);
1509                }
1510        }
1511}
1512EXPORT_SYMBOL(cl_req_attr_set);
1513
1514
1515/**
1516 * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
1517 */
1518void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
1519{
1520        init_waitqueue_head(&anchor->csi_waitq);
1521        atomic_set(&anchor->csi_sync_nr, nrpages);
1522        atomic_set(&anchor->csi_barrier, nrpages > 0);
1523        anchor->csi_sync_rc = 0;
1524}
1525EXPORT_SYMBOL(cl_sync_io_init);
1526
1527/**
 1528 * Wait until all transfers complete. The transfer completion routine has to
 1529 * call cl_sync_io_note() for every page.
1530 */
1531int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
1532                    struct cl_page_list *queue, struct cl_sync_io *anchor,
1533                    long timeout)
1534{
1535        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1536                                                  NULL, NULL, NULL);
1537        int rc;
1538
1539        LASSERT(timeout >= 0);
1540
1541        rc = l_wait_event(anchor->csi_waitq,
1542                          atomic_read(&anchor->csi_sync_nr) == 0,
1543                          &lwi);
1544        if (rc < 0) {
1545                CERROR("SYNC IO failed with error: %d, try to cancel %d remaining pages\n",
1546                       rc, atomic_read(&anchor->csi_sync_nr));
1547
1548                (void)cl_io_cancel(env, io, queue);
1549
1550                lwi = (struct l_wait_info) { 0 };
1551                (void)l_wait_event(anchor->csi_waitq,
1552                                   atomic_read(&anchor->csi_sync_nr) == 0,
1553                                   &lwi);
1554        } else {
1555                rc = anchor->csi_sync_rc;
1556        }
1557        LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1558        cl_page_list_assume(env, io, queue);
1559
1560        /* wait until cl_sync_io_note() has done wakeup */
1561        while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1562                cpu_relax();
1563        }
1564
1565        POISON(anchor, 0x5a, sizeof(*anchor));
1566        return rc;
1567}
1568EXPORT_SYMBOL(cl_sync_io_wait);
1569
1570/**
1571 * Indicate that transfer of a single page completed.
1572 */
1573void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
1574{
1575        if (anchor->csi_sync_rc == 0 && ioret < 0)
1576                anchor->csi_sync_rc = ioret;
1577        /*
 1578         * Synchronous IO done without releasing page lock (e.g., as part of
 1579         * ->{prepare,commit}_write()). Completion is used to signal the end
 1580         * of IO.
1581         */
1582        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1583        if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1584                wake_up_all(&anchor->csi_waitq);
1585                /* it's safe to nuke or reuse anchor now */
1586                atomic_set(&anchor->csi_barrier, 0);
1587        }
1588}
1589EXPORT_SYMBOL(cl_sync_io_note);
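
/*
 * Example (editorial sketch, not part of the original file): the intended
 * anchor life cycle, mirroring what cl_io_submit_sync() above does. The
 * submitter initializes the anchor for the number of pages in flight and
 * waits; the completion path of each page then calls cl_sync_io_note()
 * exactly once, and the last note wakes the waiter up.
 *
 *        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
 *        rc = cl_io_submit_rw(env, io, CRT_READ, queue);
 *        if (rc == 0)
 *                rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor, 0);
 *
 * and, from the per-page transfer completion handler:
 *
 *        cl_sync_io_note(anchor, ioret);
 */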
1590