linux/drivers/staging/lustre/lustre/lov/lov_io.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Implementation of cl_io for LOV layer.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  40 */
  41
  42#define DEBUG_SUBSYSTEM S_LOV
  43
  44#include "lov_cl_internal.h"
  45
  46/** \addtogroup lov
  47 *  @{
  48 */
  49
  50static inline void lov_sub_enter(struct lov_io_sub *sub)
  51{
  52        sub->sub_reenter++;
  53}
  54static inline void lov_sub_exit(struct lov_io_sub *sub)
  55{
  56        sub->sub_reenter--;
  57}
  58
  59static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
  60                            struct lov_io_sub *sub)
  61{
  62        if (sub->sub_io != NULL) {
  63                if (sub->sub_io_initialized) {
  64                        lov_sub_enter(sub);
  65                        cl_io_fini(sub->sub_env, sub->sub_io);
  66                        lov_sub_exit(sub);
  67                        sub->sub_io_initialized = 0;
  68                        lio->lis_active_subios--;
  69                }
  70                if (sub->sub_stripe == lio->lis_single_subio_index)
  71                        lio->lis_single_subio_index = -1;
  72                else if (!sub->sub_borrowed)
  73                        OBD_FREE_PTR(sub->sub_io);
  74                sub->sub_io = NULL;
  75        }
  76        if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
  77                if (!sub->sub_borrowed)
  78                        cl_env_put(sub->sub_env, &sub->sub_refcheck);
  79                sub->sub_env = NULL;
  80        }
  81}
  82
  83static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
  84                               int stripe, loff_t start, loff_t end)
  85{
  86        struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
  87        struct cl_io     *parent = lio->lis_cl.cis_io;
  88
  89        switch(io->ci_type) {
  90        case CIT_SETATTR: {
  91                io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
  92                io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
  93                io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
  94                if (cl_io_is_trunc(io)) {
  95                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
  96
  97                        new_size = lov_size_to_stripe(lsm, new_size, stripe);
  98                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
  99                }
 100                break;
 101        }
 102        case CIT_FAULT: {
 103                struct cl_object *obj = parent->ci_obj;
 104                loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
 105
 106                io->u.ci_fault = parent->u.ci_fault;
 107                off = lov_size_to_stripe(lsm, off, stripe);
 108                io->u.ci_fault.ft_index = cl_index(obj, off);
 109                break;
 110        }
 111        case CIT_FSYNC: {
 112                io->u.ci_fsync.fi_start = start;
 113                io->u.ci_fsync.fi_end = end;
 114                io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
 115                io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
 116                io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
 117                break;
 118        }
 119        case CIT_READ:
 120        case CIT_WRITE: {
 121                io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
 122                if (cl_io_is_append(parent)) {
 123                        io->u.ci_wr.wr_append = 1;
 124                } else {
 125                        io->u.ci_rw.crw_pos = start;
 126                        io->u.ci_rw.crw_count = end - start;
 127                }
 128                break;
 129        }
 130        default:
 131                break;
 132        }
 133}
 134
 135static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
 136                           struct lov_io_sub *sub)
 137{
 138        struct lov_object *lov = lio->lis_object;
 139        struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
 140        struct cl_io      *sub_io;
 141        struct cl_object  *sub_obj;
 142        struct cl_io      *io  = lio->lis_cl.cis_io;
 143
 144        int stripe = sub->sub_stripe;
 145        int result;
 146
 147        LASSERT(sub->sub_io == NULL);
 148        LASSERT(sub->sub_env == NULL);
 149        LASSERT(sub->sub_stripe < lio->lis_stripe_count);
 150
 151        result = 0;
 152        sub->sub_io_initialized = 0;
 153        sub->sub_borrowed = 0;
 154
 155        if (lio->lis_mem_frozen) {
 156                LASSERT(mutex_is_locked(&ld->ld_mutex));
 157                sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
 158                sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
 159                sub->sub_borrowed = 1;
 160        } else {
 161                void *cookie;
 162
 163                /* obtain new environment */
 164                cookie = cl_env_reenter();
 165                sub->sub_env = cl_env_get(&sub->sub_refcheck);
 166                cl_env_reexit(cookie);
 167                if (IS_ERR(sub->sub_env))
 168                        result = PTR_ERR(sub->sub_env);
 169
 170                if (result == 0) {
 171                        /*
 172                         * First sub-io. Use ->lis_single_subio to
 173                         * avoid dynamic allocation.
 174                         */
 175                        if (lio->lis_active_subios == 0) {
 176                                sub->sub_io = &lio->lis_single_subio;
 177                                lio->lis_single_subio_index = stripe;
 178                        } else {
 179                                OBD_ALLOC_PTR(sub->sub_io);
 180                                if (sub->sub_io == NULL)
 181                                        result = -ENOMEM;
 182                        }
 183                }
 184        }
 185
 186        if (result == 0) {
 187                sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
 188                sub_io  = sub->sub_io;
 189
 190                sub_io->ci_obj    = sub_obj;
 191                sub_io->ci_result = 0;
 192
 193                sub_io->ci_parent  = io;
 194                sub_io->ci_lockreq = io->ci_lockreq;
 195                sub_io->ci_type    = io->ci_type;
 196                sub_io->ci_no_srvlock = io->ci_no_srvlock;
 197
 198                lov_sub_enter(sub);
 199                result = cl_io_sub_init(sub->sub_env, sub_io,
 200                                        io->ci_type, sub_obj);
 201                lov_sub_exit(sub);
 202                if (result >= 0) {
 203                        lio->lis_active_subios++;
 204                        sub->sub_io_initialized = 1;
 205                        result = 0;
 206                }
 207        }
 208        if (result != 0)
 209                lov_io_sub_fini(env, lio, sub);
 210        return result;
 211}
 212
 213struct lov_io_sub *lov_sub_get(const struct lu_env *env,
 214                               struct lov_io *lio, int stripe)
 215{
 216        int rc;
 217        struct lov_io_sub *sub = &lio->lis_subs[stripe];
 218
 219        LASSERT(stripe < lio->lis_stripe_count);
 220
 221        if (!sub->sub_io_initialized) {
 222                sub->sub_stripe = stripe;
 223                rc = lov_io_sub_init(env, lio, sub);
 224        } else
 225                rc = 0;
 226        if (rc == 0)
 227                lov_sub_enter(sub);
 228        else
 229                sub = ERR_PTR(rc);
 230        return sub;
 231}
 232
 233void lov_sub_put(struct lov_io_sub *sub)
 234{
 235        lov_sub_exit(sub);
 236}
 237
 238/*****************************************************************************
 239 *
 240 * Lov io operations.
 241 *
 242 */
 243
 244static int lov_page_stripe(const struct cl_page *page)
 245{
 246        struct lovsub_object *subobj;
 247
 248        subobj = lu2lovsub(
 249                lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
 250                                 &lovsub_device_type));
 251        LASSERT(subobj != NULL);
 252        return subobj->lso_index;
 253}
 254
 255struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
 256                                  const struct cl_page_slice *slice)
 257{
 258        struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
 259        struct cl_page       *page = slice->cpl_page;
 260        int stripe;
 261
 262        LASSERT(lio->lis_cl.cis_io != NULL);
 263        LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
 264        LASSERT(lsm != NULL);
 265        LASSERT(lio->lis_nr_subios > 0);
 266
 267        stripe = lov_page_stripe(page);
 268        return lov_sub_get(env, lio, stripe);
 269}
 270
 271
 272static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
 273                             struct cl_io *io)
 274{
 275        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 276        int result;
 277
 278        LASSERT(lio->lis_object != NULL);
 279
 280        /*
 281         * Need to be optimized, we can't afford to allocate a piece of memory
 282         * when writing a page. -jay
 283         */
 284        OBD_ALLOC_LARGE(lio->lis_subs,
 285                        lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
 286        if (lio->lis_subs != NULL) {
 287                lio->lis_nr_subios = lio->lis_stripe_count;
 288                lio->lis_single_subio_index = -1;
 289                lio->lis_active_subios = 0;
 290                result = 0;
 291        } else
 292                result = -ENOMEM;
 293        return result;
 294}
 295
 296static void lov_io_slice_init(struct lov_io *lio,
 297                              struct lov_object *obj, struct cl_io *io)
 298{
 299        io->ci_result = 0;
 300        lio->lis_object = obj;
 301
 302        LASSERT(obj->lo_lsm != NULL);
 303        lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
 304
 305        switch (io->ci_type) {
 306        case CIT_READ:
 307        case CIT_WRITE:
 308                lio->lis_pos = io->u.ci_rw.crw_pos;
 309                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
 310                lio->lis_io_endpos = lio->lis_endpos;
 311                if (cl_io_is_append(io)) {
 312                        LASSERT(io->ci_type == CIT_WRITE);
 313                        lio->lis_pos = 0;
 314                        lio->lis_endpos = OBD_OBJECT_EOF;
 315                }
 316                break;
 317
 318        case CIT_SETATTR:
 319                if (cl_io_is_trunc(io))
 320                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
 321                else
 322                        lio->lis_pos = 0;
 323                lio->lis_endpos = OBD_OBJECT_EOF;
 324                break;
 325
 326        case CIT_FAULT: {
 327                pgoff_t index = io->u.ci_fault.ft_index;
 328                lio->lis_pos = cl_offset(io->ci_obj, index);
 329                lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
 330                break;
 331        }
 332
 333        case CIT_FSYNC: {
 334                lio->lis_pos = io->u.ci_fsync.fi_start;
 335                lio->lis_endpos = io->u.ci_fsync.fi_end;
 336                break;
 337        }
 338
 339        case CIT_MISC:
 340                lio->lis_pos = 0;
 341                lio->lis_endpos = OBD_OBJECT_EOF;
 342                break;
 343
 344        default:
 345                LBUG();
 346        }
 347}
 348
 349static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 350{
 351        struct lov_io *lio = cl2lov_io(env, ios);
 352        struct lov_object *lov = cl2lov(ios->cis_obj);
 353        int i;
 354
 355        if (lio->lis_subs != NULL) {
 356                for (i = 0; i < lio->lis_nr_subios; i++)
 357                        lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
 358                OBD_FREE_LARGE(lio->lis_subs,
 359                         lio->lis_nr_subios * sizeof lio->lis_subs[0]);
 360                lio->lis_nr_subios = 0;
 361        }
 362
 363        LASSERT(atomic_read(&lov->lo_active_ios) > 0);
 364        if (atomic_dec_and_test(&lov->lo_active_ios))
 365                wake_up_all(&lov->lo_waitq);
 366}
 367
 368static obd_off lov_offset_mod(obd_off val, int delta)
 369{
 370        if (val != OBD_OBJECT_EOF)
 371                val += delta;
 372        return val;
 373}
 374
 375static int lov_io_iter_init(const struct lu_env *env,
 376                            const struct cl_io_slice *ios)
 377{
 378        struct lov_io   *lio = cl2lov_io(env, ios);
 379        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 380        struct lov_io_sub    *sub;
 381        obd_off endpos;
 382        obd_off start;
 383        obd_off end;
 384        int stripe;
 385        int rc = 0;
 386
 387        endpos = lov_offset_mod(lio->lis_endpos, -1);
 388        for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
 389                if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
 390                                           endpos, &start, &end))
 391                        continue;
 392
 393                end = lov_offset_mod(end, +1);
 394                sub = lov_sub_get(env, lio, stripe);
 395                if (!IS_ERR(sub)) {
 396                        lov_io_sub_inherit(sub->sub_io, lio, stripe,
 397                                           start, end);
 398                        rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
 399                        lov_sub_put(sub);
 400                        CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
 401                               stripe, start, end);
 402                } else
 403                        rc = PTR_ERR(sub);
 404
 405                if (!rc)
 406                        list_add_tail(&sub->sub_linkage, &lio->lis_active);
 407                else
 408                        break;
 409        }
 410        return rc;
 411}
 412
 413static int lov_io_rw_iter_init(const struct lu_env *env,
 414                               const struct cl_io_slice *ios)
 415{
 416        struct lov_io   *lio = cl2lov_io(env, ios);
 417        struct cl_io     *io  = ios->cis_io;
 418        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 419        __u64 start = io->u.ci_rw.crw_pos;
 420        loff_t next;
 421        unsigned long ssize = lsm->lsm_stripe_size;
 422
 423        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
 424
 425        /* fast path for common case. */
 426        if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
 427
 428                lov_do_div64(start, ssize);
 429                next = (start + 1) * ssize;
 430                if (next <= start * ssize)
 431                        next = ~0ull;
 432
 433                io->ci_continue = next < lio->lis_io_endpos;
 434                io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
 435                                              next) - io->u.ci_rw.crw_pos;
 436                lio->lis_pos    = io->u.ci_rw.crw_pos;
 437                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
 438                CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
 439                       LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
 440                       (__u64)lio->lis_io_endpos);
 441        }
 442        /*
 443         * XXX The following call should be optimized: we know, that
 444         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
 445         */
 446        return lov_io_iter_init(env, ios);
 447}
 448
 449static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
 450                       int (*iofunc)(const struct lu_env *, struct cl_io *))
 451{
 452        struct cl_io *parent = lio->lis_cl.cis_io;
 453        struct lov_io_sub *sub;
 454        int rc = 0;
 455
 456        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
 457                lov_sub_enter(sub);
 458                rc = iofunc(sub->sub_env, sub->sub_io);
 459                lov_sub_exit(sub);
 460                if (rc)
 461                        break;
 462
 463                if (parent->ci_result == 0)
 464                        parent->ci_result = sub->sub_io->ci_result;
 465        }
 466        return rc;
 467}
 468
 469static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
 470{
 471        return lov_io_call(env, cl2lov_io(env, ios), cl_io_lock);
 472}
 473
 474static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
 475{
 476        return lov_io_call(env, cl2lov_io(env, ios), cl_io_start);
 477}
 478
 479static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
 480{
 481        /*
 482         * It's possible that lov_io_start() wasn't called against this
 483         * sub-io, either because previous sub-io failed, or upper layer
 484         * completed IO.
 485         */
 486        if (io->ci_state == CIS_IO_GOING)
 487                cl_io_end(env, io);
 488        else
 489                io->ci_state = CIS_IO_FINISHED;
 490        return 0;
 491}
 492
 493static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
 494{
 495        cl_io_iter_fini(env, io);
 496        return 0;
 497}
 498
 499static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
 500{
 501        cl_io_unlock(env, io);
 502        return 0;
 503}
 504
 505static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
 506{
 507        int rc;
 508
 509        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
 510        LASSERT(rc == 0);
 511}
 512
 513static void lov_io_iter_fini(const struct lu_env *env,
 514                             const struct cl_io_slice *ios)
 515{
 516        struct lov_io *lio = cl2lov_io(env, ios);
 517        int rc;
 518
 519        rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
 520        LASSERT(rc == 0);
 521        while (!list_empty(&lio->lis_active))
 522                list_del_init(lio->lis_active.next);
 523}
 524
 525static void lov_io_unlock(const struct lu_env *env,
 526                          const struct cl_io_slice *ios)
 527{
 528        int rc;
 529
 530        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
 531        LASSERT(rc == 0);
 532}
 533
 534
 535static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
 536                                              struct cl_page_list *qin,
 537                                              int idx, int alloc)
 538{
 539        return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
 540}
 541
 542/**
 543 * lov implementation of cl_operations::cio_submit() method. It takes a list
 544 * of pages in \a queue, splits it into per-stripe sub-lists, invokes
 545 * cl_io_submit() on underlying devices to submit sub-lists, and then splices
 546 * everything back.
 547 *
 548 * Major complication of this function is a need to handle memory cleansing:
 549 * cl_io_submit() is called to write out pages as a part of VM memory
 550 * reclamation, and hence it may not fail due to memory shortages (system
 551 * dead-locks otherwise). To deal with this, some resources (sub-lists,
 552 * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
 553 * not-memory cleansing context), and in case of memory shortage, these
 554 * pre-allocated resources are used by lov_io_submit() under
 555 * lov_device::ld_mutex mutex.
 556 */
 557static int lov_io_submit(const struct lu_env *env,
 558                         const struct cl_io_slice *ios,
 559                         enum cl_req_type crt, struct cl_2queue *queue)
 560{
 561        struct lov_io     *lio = cl2lov_io(env, ios);
 562        struct lov_object      *obj = lio->lis_object;
 563        struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
 564        struct cl_page_list    *qin = &queue->c2_qin;
 565        struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
 566        struct cl_page_list *stripes_qin = NULL;
 567        struct cl_page *page;
 568        struct cl_page *tmp;
 569        int stripe;
 570
 571#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
 572
 573        int rc = 0;
 574        int alloc =
 575                !(current->flags & PF_MEMALLOC);
 576
 577        if (lio->lis_active_subios == 1) {
 578                int idx = lio->lis_single_subio_index;
 579                struct lov_io_sub *sub;
 580
 581                LASSERT(idx < lio->lis_nr_subios);
 582                sub = lov_sub_get(env, lio, idx);
 583                LASSERT(!IS_ERR(sub));
 584                LASSERT(sub->sub_io == &lio->lis_single_subio);
 585                rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
 586                                     crt, queue);
 587                lov_sub_put(sub);
 588                return rc;
 589        }
 590
 591        LASSERT(lio->lis_subs != NULL);
 592        if (alloc) {
 593                OBD_ALLOC_LARGE(stripes_qin,
 594                                sizeof(*stripes_qin) * lio->lis_nr_subios);
 595                if (stripes_qin == NULL)
 596                        return -ENOMEM;
 597
 598                for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
 599                        cl_page_list_init(&stripes_qin[stripe]);
 600        } else {
 601                /*
 602                 * If we get here, it means pageout & swap doesn't help.
 603                 * In order to not make things worse, even don't try to
 604                 * allocate the memory with __GFP_NOWARN. -jay
 605                 */
 606                mutex_lock(&ld->ld_mutex);
 607                lio->lis_mem_frozen = 1;
 608        }
 609
 610        cl_2queue_init(cl2q);
 611        cl_page_list_for_each_safe(page, tmp, qin) {
 612                stripe = lov_page_stripe(page);
 613                cl_page_list_move(QIN(stripe), qin, page);
 614        }
 615
 616        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
 617                struct lov_io_sub   *sub;
 618                struct cl_page_list *sub_qin = QIN(stripe);
 619
 620                if (list_empty(&sub_qin->pl_pages))
 621                        continue;
 622
 623                cl_page_list_splice(sub_qin, &cl2q->c2_qin);
 624                sub = lov_sub_get(env, lio, stripe);
 625                if (!IS_ERR(sub)) {
 626                        rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
 627                                             crt, cl2q);
 628                        lov_sub_put(sub);
 629                } else
 630                        rc = PTR_ERR(sub);
 631                cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
 632                cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
 633                if (rc != 0)
 634                        break;
 635        }
 636
 637        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
 638                struct cl_page_list *sub_qin = QIN(stripe);
 639
 640                if (list_empty(&sub_qin->pl_pages))
 641                        continue;
 642
 643                cl_page_list_splice(sub_qin, qin);
 644        }
 645
 646        if (alloc) {
 647                OBD_FREE_LARGE(stripes_qin,
 648                         sizeof(*stripes_qin) * lio->lis_nr_subios);
 649        } else {
 650                int i;
 651
 652                for (i = 0; i < lio->lis_nr_subios; i++) {
 653                        struct cl_io *cio = lio->lis_subs[i].sub_io;
 654
 655                        if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
 656                                lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
 657                }
 658                lio->lis_mem_frozen = 0;
 659                mutex_unlock(&ld->ld_mutex);
 660        }
 661
 662        return rc;
 663#undef QIN
 664}
 665
 666static int lov_io_prepare_write(const struct lu_env *env,
 667                                const struct cl_io_slice *ios,
 668                                const struct cl_page_slice *slice,
 669                                unsigned from, unsigned to)
 670{
 671        struct lov_io     *lio      = cl2lov_io(env, ios);
 672        struct cl_page    *sub_page = lov_sub_page(slice);
 673        struct lov_io_sub *sub;
 674        int result;
 675
 676        sub = lov_page_subio(env, lio, slice);
 677        if (!IS_ERR(sub)) {
 678                result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
 679                                             sub_page, from, to);
 680                lov_sub_put(sub);
 681        } else
 682                result = PTR_ERR(sub);
 683        return result;
 684}
 685
 686static int lov_io_commit_write(const struct lu_env *env,
 687                               const struct cl_io_slice *ios,
 688                               const struct cl_page_slice *slice,
 689                               unsigned from, unsigned to)
 690{
 691        struct lov_io     *lio      = cl2lov_io(env, ios);
 692        struct cl_page    *sub_page = lov_sub_page(slice);
 693        struct lov_io_sub *sub;
 694        int result;
 695
 696        sub = lov_page_subio(env, lio, slice);
 697        if (!IS_ERR(sub)) {
 698                result = cl_io_commit_write(sub->sub_env, sub->sub_io,
 699                                            sub_page, from, to);
 700                lov_sub_put(sub);
 701        } else
 702                result = PTR_ERR(sub);
 703        return result;
 704}
 705
 706static int lov_io_fault_start(const struct lu_env *env,
 707                              const struct cl_io_slice *ios)
 708{
 709        struct cl_fault_io *fio;
 710        struct lov_io      *lio;
 711        struct lov_io_sub  *sub;
 712
 713        fio = &ios->cis_io->u.ci_fault;
 714        lio = cl2lov_io(env, ios);
 715        sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
 716        sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
 717        lov_sub_put(sub);
 718        return lov_io_start(env, ios);
 719}
 720
 721static void lov_io_fsync_end(const struct lu_env *env,
 722                             const struct cl_io_slice *ios)
 723{
 724        struct lov_io *lio = cl2lov_io(env, ios);
 725        struct lov_io_sub *sub;
 726        unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
 727
 728        *written = 0;
 729        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
 730                struct cl_io *subio = sub->sub_io;
 731
 732                lov_sub_enter(sub);
 733                lov_io_end_wrapper(sub->sub_env, subio);
 734                lov_sub_exit(sub);
 735
 736                if (subio->ci_result == 0)
 737                        *written += subio->u.ci_fsync.fi_nr_written;
 738        }
 739}
 740
 741static const struct cl_io_operations lov_io_ops = {
 742        .op = {
 743                [CIT_READ] = {
 744                        .cio_fini      = lov_io_fini,
 745                        .cio_iter_init = lov_io_rw_iter_init,
 746                        .cio_iter_fini = lov_io_iter_fini,
 747                        .cio_lock      = lov_io_lock,
 748                        .cio_unlock    = lov_io_unlock,
 749                        .cio_start     = lov_io_start,
 750                        .cio_end       = lov_io_end
 751                },
 752                [CIT_WRITE] = {
 753                        .cio_fini      = lov_io_fini,
 754                        .cio_iter_init = lov_io_rw_iter_init,
 755                        .cio_iter_fini = lov_io_iter_fini,
 756                        .cio_lock      = lov_io_lock,
 757                        .cio_unlock    = lov_io_unlock,
 758                        .cio_start     = lov_io_start,
 759                        .cio_end       = lov_io_end
 760                },
 761                [CIT_SETATTR] = {
 762                        .cio_fini      = lov_io_fini,
 763                        .cio_iter_init = lov_io_iter_init,
 764                        .cio_iter_fini = lov_io_iter_fini,
 765                        .cio_lock      = lov_io_lock,
 766                        .cio_unlock    = lov_io_unlock,
 767                        .cio_start     = lov_io_start,
 768                        .cio_end       = lov_io_end
 769                },
 770                [CIT_FAULT] = {
 771                        .cio_fini      = lov_io_fini,
 772                        .cio_iter_init = lov_io_iter_init,
 773                        .cio_iter_fini = lov_io_iter_fini,
 774                        .cio_lock      = lov_io_lock,
 775                        .cio_unlock    = lov_io_unlock,
 776                        .cio_start     = lov_io_fault_start,
 777                        .cio_end       = lov_io_end
 778                },
 779                [CIT_FSYNC] = {
 780                        .cio_fini      = lov_io_fini,
 781                        .cio_iter_init = lov_io_iter_init,
 782                        .cio_iter_fini = lov_io_iter_fini,
 783                        .cio_lock      = lov_io_lock,
 784                        .cio_unlock    = lov_io_unlock,
 785                        .cio_start     = lov_io_start,
 786                        .cio_end       = lov_io_fsync_end
 787                },
 788                [CIT_MISC] = {
 789                        .cio_fini   = lov_io_fini
 790                }
 791        },
 792        .req_op = {
 793                 [CRT_READ] = {
 794                         .cio_submit    = lov_io_submit
 795                 },
 796                 [CRT_WRITE] = {
 797                         .cio_submit    = lov_io_submit
 798                 }
 799         },
 800        .cio_prepare_write = lov_io_prepare_write,
 801        .cio_commit_write  = lov_io_commit_write
 802};
 803
 804/*****************************************************************************
 805 *
 806 * Empty lov io operations.
 807 *
 808 */
 809
 810static void lov_empty_io_fini(const struct lu_env *env,
 811                              const struct cl_io_slice *ios)
 812{
 813        struct lov_object *lov = cl2lov(ios->cis_obj);
 814
 815        if (atomic_dec_and_test(&lov->lo_active_ios))
 816                wake_up_all(&lov->lo_waitq);
 817}
 818
 819static void lov_empty_impossible(const struct lu_env *env,
 820                                 struct cl_io_slice *ios)
 821{
 822        LBUG();
 823}
 824
 825#define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
 826
 827/**
 828 * An io operation vector for files without stripes.
 829 */
 830static const struct cl_io_operations lov_empty_io_ops = {
 831        .op = {
 832                [CIT_READ] = {
 833                        .cio_fini       = lov_empty_io_fini,
 834#if 0
 835                        .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
 836                        .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
 837                        .cio_start      = LOV_EMPTY_IMPOSSIBLE,
 838                        .cio_end        = LOV_EMPTY_IMPOSSIBLE
 839#endif
 840                },
 841                [CIT_WRITE] = {
 842                        .cio_fini      = lov_empty_io_fini,
 843                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 844                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 845                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 846                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 847                },
 848                [CIT_SETATTR] = {
 849                        .cio_fini      = lov_empty_io_fini,
 850                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 851                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 852                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 853                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 854                },
 855                [CIT_FAULT] = {
 856                        .cio_fini      = lov_empty_io_fini,
 857                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 858                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 859                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 860                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 861                },
 862                [CIT_FSYNC] = {
 863                        .cio_fini   = lov_empty_io_fini
 864                },
 865                [CIT_MISC] = {
 866                        .cio_fini   = lov_empty_io_fini
 867                }
 868        },
 869        .req_op = {
 870                 [CRT_READ] = {
 871                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
 872                 },
 873                 [CRT_WRITE] = {
 874                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
 875                 }
 876         },
 877        .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
 878};
 879
 880int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
 881                      struct cl_io *io)
 882{
 883        struct lov_io       *lio = lov_env_io(env);
 884        struct lov_object   *lov = cl2lov(obj);
 885
 886        INIT_LIST_HEAD(&lio->lis_active);
 887        lov_io_slice_init(lio, lov, io);
 888        if (io->ci_result == 0) {
 889                io->ci_result = lov_io_subio_init(env, lio, io);
 890                if (io->ci_result == 0) {
 891                        cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
 892                        atomic_inc(&lov->lo_active_ios);
 893                }
 894        }
 895        return io->ci_result;
 896}
 897
 898int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
 899                      struct cl_io *io)
 900{
 901        struct lov_object *lov = cl2lov(obj);
 902        struct lov_io *lio = lov_env_io(env);
 903        int result;
 904
 905        lio->lis_object = lov;
 906        switch (io->ci_type) {
 907        default:
 908                LBUG();
 909        case CIT_MISC:
 910        case CIT_READ:
 911                result = 0;
 912                break;
 913        case CIT_FSYNC:
 914        case CIT_SETATTR:
 915                result = +1;
 916                break;
 917        case CIT_WRITE:
 918                result = -EBADF;
 919                break;
 920        case CIT_FAULT:
 921                result = -EFAULT;
 922                CERROR("Page fault on a file without stripes: "DFID"\n",
 923                       PFID(lu_object_fid(&obj->co_lu)));
 924                break;
 925        }
 926        if (result == 0) {
 927                cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
 928                atomic_inc(&lov->lo_active_ios);
 929        }
 930
 931        io->ci_result = result < 0 ? result : 0;
 932        return result != 0;
 933}
 934
 935int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
 936                        struct cl_io *io)
 937{
 938        struct lov_object *lov = cl2lov(obj);
 939        struct lov_io *lio = lov_env_io(env);
 940        int result;
 941
 942        LASSERT(lov->lo_lsm != NULL);
 943        lio->lis_object = lov;
 944
 945        switch (io->ci_type) {
 946        default:
 947                LASSERTF(0, "invalid type %d\n", io->ci_type);
 948        case CIT_MISC:
 949        case CIT_FSYNC:
 950                result = +1;
 951                break;
 952        case CIT_SETATTR:
 953        case CIT_READ:
 954        case CIT_WRITE:
 955        case CIT_FAULT:
 956                /* TODO: need to restore the file. */
 957                result = -EBADF;
 958                break;
 959        }
 960        if (result == 0) {
 961                cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
 962                atomic_inc(&lov->lo_active_ios);
 963        }
 964
 965        io->ci_result = result < 0 ? result : 0;
 966        return result != 0;
 967}
 968/** @} lov */
 969