linux/drivers/staging/lustre/lustre/lov/lov_io.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_io for LOV layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

static inline void lov_sub_enter(struct lov_io_sub *sub)
{
        sub->sub_reenter++;
}

static inline void lov_sub_exit(struct lov_io_sub *sub)
{
        sub->sub_reenter--;
}

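/*
 * Release the resources of a single sub-io: finish its cl_io if it was
 * initialized, free the cl_io unless it is the embedded lis_single_subio or
 * was borrowed from the emergency pool, and put the sub-environment unless
 * it too was borrowed.
 */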
static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
                            struct lov_io_sub *sub)
{
        if (sub->sub_io != NULL) {
                if (sub->sub_io_initialized) {
                        lov_sub_enter(sub);
                        cl_io_fini(sub->sub_env, sub->sub_io);
                        lov_sub_exit(sub);
                        sub->sub_io_initialized = 0;
                        lio->lis_active_subios--;
                }
                if (sub->sub_stripe == lio->lis_single_subio_index)
                        lio->lis_single_subio_index = -1;
                else if (!sub->sub_borrowed)
                        kfree(sub->sub_io);
                sub->sub_io = NULL;
        }
        if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
                if (!sub->sub_borrowed)
                        cl_env_put(sub->sub_env, &sub->sub_refcheck);
                sub->sub_env = NULL;
        }
}

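/*
 * Propagate the parameters of the parent io to a per-stripe sub-io,
 * converting file-level offsets (truncate size, fault index, read/write
 * extent) into stripe-local values.
 */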
static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
                               int stripe, loff_t start, loff_t end)
{
        struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
        struct cl_io     *parent = lio->lis_cl.cis_io;

        switch (io->ci_type) {
        case CIT_SETATTR: {
                io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
                io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
                if (cl_io_is_trunc(io)) {
                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;

                        new_size = lov_size_to_stripe(lsm, new_size, stripe);
                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
                }
                break;
        }
        case CIT_FAULT: {
                struct cl_object *obj = parent->ci_obj;
                loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);

                io->u.ci_fault = parent->u.ci_fault;
                off = lov_size_to_stripe(lsm, off, stripe);
                io->u.ci_fault.ft_index = cl_index(obj, off);
                break;
        }
        case CIT_FSYNC: {
                io->u.ci_fsync.fi_start = start;
                io->u.ci_fsync.fi_end = end;
                io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
                io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
                break;
        }
        case CIT_READ:
        case CIT_WRITE: {
                io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
                if (cl_io_is_append(parent)) {
                        io->u.ci_wr.wr_append = 1;
                } else {
                        io->u.ci_rw.crw_pos = start;
                        io->u.ci_rw.crw_count = end - start;
                }
                break;
        }
        default:
                break;
        }
}

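/*
 * Set up the cl_io and environment of one sub-io.  When lis_mem_frozen is
 * set (write-out under memory pressure) the device's pre-allocated emergency
 * io and environment are borrowed under ld_mutex; otherwise a fresh
 * environment is obtained and the first sub-io reuses the embedded
 * lis_single_subio to avoid a dynamic allocation.
 */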
static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                           struct lov_io_sub *sub)
{
        struct lov_object *lov = lio->lis_object;
        struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
        struct cl_io      *sub_io;
        struct cl_object  *sub_obj;
        struct cl_io      *io  = lio->lis_cl.cis_io;

        int stripe = sub->sub_stripe;
        int result;

        LASSERT(sub->sub_io == NULL);
        LASSERT(sub->sub_env == NULL);
        LASSERT(sub->sub_stripe < lio->lis_stripe_count);

        if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
                return -EIO;

        result = 0;
        sub->sub_io_initialized = 0;
        sub->sub_borrowed = 0;

        if (lio->lis_mem_frozen) {
                LASSERT(mutex_is_locked(&ld->ld_mutex));
                sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
                sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
                sub->sub_borrowed = 1;
        } else {
                void *cookie;

                /* obtain new environment */
                cookie = cl_env_reenter();
                sub->sub_env = cl_env_get(&sub->sub_refcheck);
                cl_env_reexit(cookie);
                if (IS_ERR(sub->sub_env))
                        result = PTR_ERR(sub->sub_env);

                if (result == 0) {
                        /*
                         * First sub-io. Use ->lis_single_subio to
                         * avoid dynamic allocation.
                         */
                        if (lio->lis_active_subios == 0) {
                                sub->sub_io = &lio->lis_single_subio;
                                lio->lis_single_subio_index = stripe;
                        } else {
                                sub->sub_io = kzalloc(sizeof(*sub->sub_io),
                                                      GFP_NOFS);
                                if (!sub->sub_io)
                                        result = -ENOMEM;
                        }
                }
        }

        if (result == 0) {
                sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
                sub_io  = sub->sub_io;

                sub_io->ci_obj    = sub_obj;
                sub_io->ci_result = 0;

                sub_io->ci_parent  = io;
                sub_io->ci_lockreq = io->ci_lockreq;
                sub_io->ci_type    = io->ci_type;
                sub_io->ci_no_srvlock = io->ci_no_srvlock;
                sub_io->ci_noatime = io->ci_noatime;

                lov_sub_enter(sub);
                result = cl_io_sub_init(sub->sub_env, sub_io,
                                        io->ci_type, sub_obj);
                lov_sub_exit(sub);
                if (result >= 0) {
                        lio->lis_active_subios++;
                        sub->sub_io_initialized = 1;
                        result = 0;
                }
        }
        if (result != 0)
                lov_io_sub_fini(env, lio, sub);
        return result;
}

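/*
 * Return the sub-io for \a stripe, initializing it on first use.  On
 * success the sub-io is entered; the caller must release it with
 * lov_sub_put().
 */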
struct lov_io_sub *lov_sub_get(const struct lu_env *env,
                               struct lov_io *lio, int stripe)
{
        int rc;
        struct lov_io_sub *sub = &lio->lis_subs[stripe];

        LASSERT(stripe < lio->lis_stripe_count);

        if (!sub->sub_io_initialized) {
                sub->sub_stripe = stripe;
                rc = lov_io_sub_init(env, lio, sub);
        } else
                rc = 0;
        if (rc == 0)
                lov_sub_enter(sub);
        else
                sub = ERR_PTR(rc);
        return sub;
}

void lov_sub_put(struct lov_io_sub *sub)
{
        lov_sub_exit(sub);
}

/*****************************************************************************
 *
 * Lov io operations.
 *
 */

static int lov_page_stripe(const struct cl_page *page)
{
        struct lovsub_object *subobj;

        subobj = lu2lovsub(
                lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
                                 &lovsub_device_type));
        LASSERT(subobj != NULL);
        return subobj->lso_index;
}

struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
                                  const struct cl_page_slice *slice)
{
        struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
        struct cl_page       *page = slice->cpl_page;
        int stripe;

        LASSERT(lio->lis_cl.cis_io != NULL);
        LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
        LASSERT(lsm != NULL);
        LASSERT(lio->lis_nr_subios > 0);

        stripe = lov_page_stripe(page);
        return lov_sub_get(env, lio, stripe);
}

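/*
 * Allocate the per-stripe array of sub-ios; the individual sub-ios are set
 * up lazily by lov_sub_get().
 */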
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
                             struct cl_io *io)
{
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        int result;

        LASSERT(lio->lis_object != NULL);

        /*
         * Need to be optimized, we can't afford to allocate a piece of memory
         * when writing a page. -jay
         */
        lio->lis_subs =
                libcfs_kvzalloc(lsm->lsm_stripe_count *
                                sizeof(lio->lis_subs[0]),
                                GFP_NOFS);
        if (lio->lis_subs != NULL) {
                lio->lis_nr_subios = lio->lis_stripe_count;
                lio->lis_single_subio_index = -1;
                lio->lis_active_subios = 0;
                result = 0;
        } else
                result = -ENOMEM;
        return result;
}

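/*
 * Derive the file extent [lis_pos, lis_endpos) covered by \a io from the
 * type-specific parameters of the top-level io.
 */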
static void lov_io_slice_init(struct lov_io *lio,
                              struct lov_object *obj, struct cl_io *io)
{
        io->ci_result = 0;
        lio->lis_object = obj;

        LASSERT(obj->lo_lsm != NULL);
        lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;

        switch (io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
                lio->lis_pos = io->u.ci_rw.crw_pos;
                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
                lio->lis_io_endpos = lio->lis_endpos;
                if (cl_io_is_append(io)) {
                        LASSERT(io->ci_type == CIT_WRITE);
                        lio->lis_pos = 0;
                        lio->lis_endpos = OBD_OBJECT_EOF;
                }
                break;

        case CIT_SETATTR:
                if (cl_io_is_trunc(io))
                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
                else
                        lio->lis_pos = 0;
                lio->lis_endpos = OBD_OBJECT_EOF;
                break;

        case CIT_FAULT: {
                pgoff_t index = io->u.ci_fault.ft_index;

                lio->lis_pos = cl_offset(io->ci_obj, index);
                lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
                break;
        }

        case CIT_FSYNC: {
                lio->lis_pos = io->u.ci_fsync.fi_start;
                lio->lis_endpos = io->u.ci_fsync.fi_end;
                break;
        }

        case CIT_MISC:
                lio->lis_pos = 0;
                lio->lis_endpos = OBD_OBJECT_EOF;
                break;

        default:
                LBUG();
        }
}

static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
{
        struct lov_io *lio = cl2lov_io(env, ios);
        struct lov_object *lov = cl2lov(ios->cis_obj);
        int i;

        if (lio->lis_subs != NULL) {
                for (i = 0; i < lio->lis_nr_subios; i++)
                        lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
                kvfree(lio->lis_subs);
                lio->lis_nr_subios = 0;
        }

        LASSERT(atomic_read(&lov->lo_active_ios) > 0);
        if (atomic_dec_and_test(&lov->lo_active_ios))
                wake_up_all(&lov->lo_waitq);
}

static u64 lov_offset_mod(u64 val, int delta)
{
        if (val != OBD_OBJECT_EOF)
                val += delta;
        return val;
}

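/*
 * Begin one iteration of the top-level io: for every stripe that intersects
 * [lis_pos, lis_endpos), initialize the sub-io, inherit the parent
 * parameters and add it to the list of active sub-ios.
 */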
static int lov_io_iter_init(const struct lu_env *env,
                            const struct cl_io_slice *ios)
{
        struct lov_io   *lio = cl2lov_io(env, ios);
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        struct lov_io_sub    *sub;
        u64 endpos;
        u64 start;
        u64 end;
        int stripe;
        int rc = 0;

        endpos = lov_offset_mod(lio->lis_endpos, -1);
        for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
                if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
                                           endpos, &start, &end))
                        continue;

                if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
                        if (ios->cis_io->ci_type == CIT_READ ||
                            ios->cis_io->ci_type == CIT_WRITE ||
                            ios->cis_io->ci_type == CIT_FAULT)
                                return -EIO;

                        continue;
                }

                end = lov_offset_mod(end, 1);
                sub = lov_sub_get(env, lio, stripe);
                if (!IS_ERR(sub)) {
                        lov_io_sub_inherit(sub->sub_io, lio, stripe,
                                           start, end);
                        rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
                        lov_sub_put(sub);
                        CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
                               stripe, start, end);
                } else
                        rc = PTR_ERR(sub);

                if (!rc)
                        list_add_tail(&sub->sub_linkage, &lio->lis_active);
                else
                        break;
        }
        return rc;
}

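/*
 * For a striped file (more than one sub-io), clip the current read/write
 * chunk at the next stripe boundary so that lov_io_iter_init() activates a
 * single sub-io per iteration, and set ci_continue when part of the original
 * extent remains.  Appends and single-stripe files skip the clipping.
 */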
static int lov_io_rw_iter_init(const struct lu_env *env,
                               const struct cl_io_slice *ios)
{
        struct lov_io   *lio = cl2lov_io(env, ios);
        struct cl_io     *io  = ios->cis_io;
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        __u64 start = io->u.ci_rw.crw_pos;
        loff_t next;
        unsigned long ssize = lsm->lsm_stripe_size;

        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);

        /* fast path for common case. */
        if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {

                lov_do_div64(start, ssize);
                next = (start + 1) * ssize;
                if (next <= start * ssize)
                        next = ~0ull;

                io->ci_continue = next < lio->lis_io_endpos;
                io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
                                              next) - io->u.ci_rw.crw_pos;
                lio->lis_pos    = io->u.ci_rw.crw_pos;
                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
                CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) %llu\n",
                       (__u64)start, lio->lis_pos, lio->lis_endpos,
                       (__u64)lio->lis_io_endpos);
        }
        /*
         * XXX The following call should be optimized: we know that
         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
         */
        return lov_io_iter_init(env, ios);
}

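/*
 * Call \a iofunc on every active sub-io, stopping at the first failure.
 * The first non-zero sub-io result is copied into the parent io.
 */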
static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
                       int (*iofunc)(const struct lu_env *, struct cl_io *))
{
        struct cl_io *parent = lio->lis_cl.cis_io;
        struct lov_io_sub *sub;
        int rc = 0;

        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
                lov_sub_enter(sub);
                rc = iofunc(sub->sub_env, sub->sub_io);
                lov_sub_exit(sub);
                if (rc)
                        break;

                if (parent->ci_result == 0)
                        parent->ci_result = sub->sub_io->ci_result;
        }
        return rc;
}

static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
{
        return lov_io_call(env, cl2lov_io(env, ios), cl_io_lock);
}

static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
{
        return lov_io_call(env, cl2lov_io(env, ios), cl_io_start);
}

static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
{
        /*
         * It's possible that lov_io_start() wasn't called against this
         * sub-io, either because a previous sub-io failed or the upper layer
         * completed the IO.
         */
        if (io->ci_state == CIS_IO_GOING)
                cl_io_end(env, io);
        else
                io->ci_state = CIS_IO_FINISHED;
        return 0;
}

static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
{
        cl_io_iter_fini(env, io);
        return 0;
}

static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
{
        cl_io_unlock(env, io);
        return 0;
}

static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
{
        int rc;

        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
        LASSERT(rc == 0);
}

static void lov_io_iter_fini(const struct lu_env *env,
                             const struct cl_io_slice *ios)
{
        struct lov_io *lio = cl2lov_io(env, ios);
        int rc;

        rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
        LASSERT(rc == 0);
        while (!list_empty(&lio->lis_active))
                list_del_init(lio->lis_active.next);
}

static void lov_io_unlock(const struct lu_env *env,
                          const struct cl_io_slice *ios)
{
        int rc;

        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
        LASSERT(rc == 0);
}

static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
                                              struct cl_page_list *qin,
                                              int idx, int alloc)
{
        return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
}

/**
 * lov implementation of the cio_submit() method. It takes a list of pages
 * in \a queue, splits it into per-stripe sub-lists, invokes cl_io_submit()
 * on underlying devices to submit the sub-lists, and then splices
 * everything back.
 *
 * The major complication of this function is the need to handle memory
 * cleansing: cl_io_submit() is called to write out pages as a part of VM
 * memory reclamation, and hence it may not fail due to memory shortages
 * (the system dead-locks otherwise). To deal with this, some resources
 * (sub-lists, sub-environments, etc.) are allocated per-device on "startup"
 * (i.e., in a non-memory-cleansing context), and in case of memory shortage
 * these pre-allocated resources are used by lov_io_submit() under the
 * lov_device::ld_mutex mutex.
 */
static int lov_io_submit(const struct lu_env *env,
                         const struct cl_io_slice *ios,
                         enum cl_req_type crt, struct cl_2queue *queue)
{
        struct lov_io     *lio = cl2lov_io(env, ios);
        struct lov_object      *obj = lio->lis_object;
        struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
        struct cl_page_list    *qin = &queue->c2_qin;
        struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
        struct cl_page_list *stripes_qin = NULL;
        struct cl_page *page;
        struct cl_page *tmp;
        int stripe;

#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)

        int rc = 0;
        int alloc =
                !(current->flags & PF_MEMALLOC);

        if (lio->lis_active_subios == 1) {
                int idx = lio->lis_single_subio_index;
                struct lov_io_sub *sub;

                LASSERT(idx < lio->lis_nr_subios);
                sub = lov_sub_get(env, lio, idx);
                LASSERT(!IS_ERR(sub));
                LASSERT(sub->sub_io == &lio->lis_single_subio);
                rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
                                     crt, queue);
                lov_sub_put(sub);
                return rc;
        }

        LASSERT(lio->lis_subs != NULL);
        if (alloc) {
                stripes_qin =
                        libcfs_kvzalloc(sizeof(*stripes_qin) *
                                        lio->lis_nr_subios,
                                        GFP_NOFS);
                if (stripes_qin == NULL)
                        return -ENOMEM;

                for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
                        cl_page_list_init(&stripes_qin[stripe]);
        } else {
                /*
                 * If we get here, it means pageout & swap doesn't help.
                 * In order to not make things worse, don't even try to
                 * allocate the memory with __GFP_NOWARN. -jay
                 */
                mutex_lock(&ld->ld_mutex);
                lio->lis_mem_frozen = 1;
        }

        cl_2queue_init(cl2q);
        cl_page_list_for_each_safe(page, tmp, qin) {
                stripe = lov_page_stripe(page);
                cl_page_list_move(QIN(stripe), qin, page);
        }

        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
                struct lov_io_sub   *sub;
                struct cl_page_list *sub_qin = QIN(stripe);

                if (list_empty(&sub_qin->pl_pages))
                        continue;

                cl_page_list_splice(sub_qin, &cl2q->c2_qin);
                sub = lov_sub_get(env, lio, stripe);
                if (!IS_ERR(sub)) {
                        rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
                                             crt, cl2q);
                        lov_sub_put(sub);
                } else
                        rc = PTR_ERR(sub);
                cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
                cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
                if (rc != 0)
                        break;
        }

        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
                struct cl_page_list *sub_qin = QIN(stripe);

                if (list_empty(&sub_qin->pl_pages))
                        continue;

                cl_page_list_splice(sub_qin, qin);
        }

        if (alloc) {
                kvfree(stripes_qin);
        } else {
                int i;

                for (i = 0; i < lio->lis_nr_subios; i++) {
                        struct cl_io *cio = lio->lis_subs[i].sub_io;

                        if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
                                lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
                }
                lio->lis_mem_frozen = 0;
                mutex_unlock(&ld->ld_mutex);
        }

        return rc;
#undef QIN
}

static int lov_io_prepare_write(const struct lu_env *env,
                                const struct cl_io_slice *ios,
                                const struct cl_page_slice *slice,
                                unsigned from, unsigned to)
{
        struct lov_io     *lio      = cl2lov_io(env, ios);
        struct cl_page    *sub_page = lov_sub_page(slice);
        struct lov_io_sub *sub;
        int result;

        sub = lov_page_subio(env, lio, slice);
        if (!IS_ERR(sub)) {
                result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
                                             sub_page, from, to);
                lov_sub_put(sub);
        } else
                result = PTR_ERR(sub);
        return result;
}

static int lov_io_commit_write(const struct lu_env *env,
                               const struct cl_io_slice *ios,
                               const struct cl_page_slice *slice,
                               unsigned from, unsigned to)
{
        struct lov_io     *lio      = cl2lov_io(env, ios);
        struct cl_page    *sub_page = lov_sub_page(slice);
        struct lov_io_sub *sub;
        int result;

        sub = lov_page_subio(env, lio, slice);
        if (!IS_ERR(sub)) {
                result = cl_io_commit_write(sub->sub_env, sub->sub_io,
                                            sub_page, from, to);
                lov_sub_put(sub);
        } else
                result = PTR_ERR(sub);
        return result;
}

static int lov_io_fault_start(const struct lu_env *env,
                              const struct cl_io_slice *ios)
{
        struct cl_fault_io *fio;
        struct lov_io      *lio;
        struct lov_io_sub  *sub;

        fio = &ios->cis_io->u.ci_fault;
        lio = cl2lov_io(env, ios);
        sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
        if (IS_ERR(sub))
                return PTR_ERR(sub);
        sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
        lov_sub_put(sub);
        return lov_io_start(env, ios);
}

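/*
 * Complete fsync on every active sub-io and sum the number of pages each
 * successful sub-io wrote back into the parent's fi_nr_written.
 */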
static void lov_io_fsync_end(const struct lu_env *env,
                             const struct cl_io_slice *ios)
{
        struct lov_io *lio = cl2lov_io(env, ios);
        struct lov_io_sub *sub;
        unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;

        *written = 0;
        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
                struct cl_io *subio = sub->sub_io;

                lov_sub_enter(sub);
                lov_io_end_wrapper(sub->sub_env, subio);
                lov_sub_exit(sub);

                if (subio->ci_result == 0)
                        *written += subio->u.ci_fsync.fi_nr_written;
        }
}

static const struct cl_io_operations lov_io_ops = {
        .op = {
                [CIT_READ] = {
                        .cio_fini      = lov_io_fini,
                        .cio_iter_init = lov_io_rw_iter_init,
                        .cio_iter_fini = lov_io_iter_fini,
                        .cio_lock      = lov_io_lock,
                        .cio_unlock    = lov_io_unlock,
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_end
                },
                [CIT_WRITE] = {
                        .cio_fini      = lov_io_fini,
                        .cio_iter_init = lov_io_rw_iter_init,
                        .cio_iter_fini = lov_io_iter_fini,
                        .cio_lock      = lov_io_lock,
                        .cio_unlock    = lov_io_unlock,
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_end
                },
                [CIT_SETATTR] = {
                        .cio_fini      = lov_io_fini,
                        .cio_iter_init = lov_io_iter_init,
                        .cio_iter_fini = lov_io_iter_fini,
                        .cio_lock      = lov_io_lock,
                        .cio_unlock    = lov_io_unlock,
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_end
                },
                [CIT_FAULT] = {
                        .cio_fini      = lov_io_fini,
                        .cio_iter_init = lov_io_iter_init,
                        .cio_iter_fini = lov_io_iter_fini,
                        .cio_lock      = lov_io_lock,
                        .cio_unlock    = lov_io_unlock,
                        .cio_start     = lov_io_fault_start,
                        .cio_end       = lov_io_end
                },
                [CIT_FSYNC] = {
                        .cio_fini      = lov_io_fini,
                        .cio_iter_init = lov_io_iter_init,
                        .cio_iter_fini = lov_io_iter_fini,
                        .cio_lock      = lov_io_lock,
                        .cio_unlock    = lov_io_unlock,
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_fsync_end
                },
                [CIT_MISC] = {
                        .cio_fini   = lov_io_fini
                }
        },
        .req_op = {
                 [CRT_READ] = {
                         .cio_submit    = lov_io_submit
                 },
                 [CRT_WRITE] = {
                         .cio_submit    = lov_io_submit
                 }
         },
        .cio_prepare_write = lov_io_prepare_write,
        .cio_commit_write  = lov_io_commit_write
};

/*****************************************************************************
 *
 * Empty lov io operations.
 *
 */

static void lov_empty_io_fini(const struct lu_env *env,
                              const struct cl_io_slice *ios)
{
        struct lov_object *lov = cl2lov(ios->cis_obj);

        if (atomic_dec_and_test(&lov->lo_active_ios))
                wake_up_all(&lov->lo_waitq);
}

static void lov_empty_impossible(const struct lu_env *env,
                                 struct cl_io_slice *ios)
{
        LBUG();
}

#define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)

/**
 * An io operation vector for files without stripes.
 */
static const struct cl_io_operations lov_empty_io_ops = {
        .op = {
                [CIT_READ] = {
                        .cio_fini       = lov_empty_io_fini,
#if 0
                        .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
                        .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
                        .cio_start      = LOV_EMPTY_IMPOSSIBLE,
                        .cio_end        = LOV_EMPTY_IMPOSSIBLE
#endif
                },
                [CIT_WRITE] = {
                        .cio_fini      = lov_empty_io_fini,
                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
                },
                [CIT_SETATTR] = {
                        .cio_fini      = lov_empty_io_fini,
                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
                },
                [CIT_FAULT] = {
                        .cio_fini      = lov_empty_io_fini,
                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
                },
                [CIT_FSYNC] = {
                        .cio_fini   = lov_empty_io_fini
                },
                [CIT_MISC] = {
                        .cio_fini   = lov_empty_io_fini
                }
        },
        .req_op = {
                 [CRT_READ] = {
                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
                 },
                 [CRT_WRITE] = {
                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
                 }
         },
        .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
};

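/*
 * Entry point for io against a striped (raid0) object: compute the io
 * extent, allocate the sub-io array and attach lov_io_ops as the LOV slice
 * of the parent io.
 */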
int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
                      struct cl_io *io)
{
        struct lov_io       *lio = lov_env_io(env);
        struct lov_object   *lov = cl2lov(obj);

        INIT_LIST_HEAD(&lio->lis_active);
        lov_io_slice_init(lio, lov, io);
        if (io->ci_result == 0) {
                io->ci_result = lov_io_subio_init(env, lio, io);
                if (io->ci_result == 0) {
                        cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
                        atomic_inc(&lov->lo_active_ios);
                }
        }
        return io->ci_result;
}

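/*
 * Initialize io against a file without stripes: reads and CIT_MISC proceed
 * with the empty io operations, writes fail with -EBADF, page faults with
 * -EFAULT, and fsync/setattr need no work at the LOV layer.
 */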
int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
                      struct cl_io *io)
{
        struct lov_object *lov = cl2lov(obj);
        struct lov_io *lio = lov_env_io(env);
        int result;

        lio->lis_object = lov;
        switch (io->ci_type) {
        default:
                LBUG();
        case CIT_MISC:
        case CIT_READ:
                result = 0;
                break;
        case CIT_FSYNC:
        case CIT_SETATTR:
                result = 1;
                break;
        case CIT_WRITE:
                result = -EBADF;
                break;
        case CIT_FAULT:
                result = -EFAULT;
                CERROR("Page fault on a file without stripes: "DFID"\n",
                       PFID(lu_object_fid(&obj->co_lu)));
                break;
        }
        if (result == 0) {
                cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
                atomic_inc(&lov->lo_active_ios);
        }

        io->ci_result = result < 0 ? result : 0;
        return result != 0;
}

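/*
 * Initialize io against a file whose data has been released to HSM.  Io
 * types that need the file data set ci_restore_needed and fail with
 * -ENODATA so that the data can be restored before the io is retried.
 */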
int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                        struct cl_io *io)
{
        struct lov_object *lov = cl2lov(obj);
        struct lov_io *lio = lov_env_io(env);
        int result;

        LASSERT(lov->lo_lsm != NULL);
        lio->lis_object = lov;

        switch (io->ci_type) {
        default:
                LASSERTF(0, "invalid type %d\n", io->ci_type);
        case CIT_MISC:
        case CIT_FSYNC:
                result = 1;
                break;
        case CIT_SETATTR:
                /* the truncate to 0 is managed by MDT:
                 * - in open, for open O_TRUNC
                 * - in setattr, for truncate
                 */
                /* the truncate is for size > 0 so triggers a restore */
                if (cl_io_is_trunc(io))
                        io->ci_restore_needed = 1;
                result = -ENODATA;
                break;
        case CIT_READ:
        case CIT_WRITE:
        case CIT_FAULT:
                io->ci_restore_needed = 1;
                result = -ENODATA;
                break;
        }
        if (result == 0) {
                cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
                atomic_inc(&lov->lo_active_ios);
        }

        io->ci_result = result < 0 ? result : 0;
        return result != 0;
}

/** @} lov */