linux/drivers/staging/lustre/lustre/lov/lov_io.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  19 *
  20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  21 * CA 95054 USA or visit www.sun.com if you need additional information or
  22 * have any questions.
  23 *
  24 * GPL HEADER END
  25 */
  26/*
  27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  28 * Use is subject to license terms.
  29 *
  30 * Copyright (c) 2011, 2012, Intel Corporation.
  31 */
  32/*
  33 * This file is part of Lustre, http://www.lustre.org/
  34 * Lustre is a trademark of Sun Microsystems, Inc.
  35 *
  36 * Implementation of cl_io for LOV layer.
  37 *
  38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  39 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  40 */
  41
  42#define DEBUG_SUBSYSTEM S_LOV
  43
  44#include "lov_cl_internal.h"
  45
  46/** \addtogroup lov
  47 *  @{
  48 */
  49
  50static inline void lov_sub_enter(struct lov_io_sub *sub)
  51{
  52        sub->sub_reenter++;
  53}
  54static inline void lov_sub_exit(struct lov_io_sub *sub)
  55{
  56        sub->sub_reenter--;
  57}
  58
  59static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
  60                            struct lov_io_sub *sub)
  61{
  62        ENTRY;
  63        if (sub->sub_io != NULL) {
  64                if (sub->sub_io_initialized) {
  65                        lov_sub_enter(sub);
  66                        cl_io_fini(sub->sub_env, sub->sub_io);
  67                        lov_sub_exit(sub);
  68                        sub->sub_io_initialized = 0;
  69                        lio->lis_active_subios--;
  70                }
  71                if (sub->sub_stripe == lio->lis_single_subio_index)
  72                        lio->lis_single_subio_index = -1;
  73                else if (!sub->sub_borrowed)
  74                        OBD_FREE_PTR(sub->sub_io);
  75                sub->sub_io = NULL;
  76        }
  77        if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
  78                if (!sub->sub_borrowed)
  79                        cl_env_put(sub->sub_env, &sub->sub_refcheck);
  80                sub->sub_env = NULL;
  81        }
  82        EXIT;
  83}
  84
  85static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
  86                               int stripe, loff_t start, loff_t end)
  87{
  88        struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
  89        struct cl_io     *parent = lio->lis_cl.cis_io;
  90
  91        switch(io->ci_type) {
  92        case CIT_SETATTR: {
  93                io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
  94                io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
  95                io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
  96                if (cl_io_is_trunc(io)) {
  97                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
  98
  99                        new_size = lov_size_to_stripe(lsm, new_size, stripe);
 100                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
 101                }
 102                break;
 103        }
 104        case CIT_FAULT: {
 105                struct cl_object *obj = parent->ci_obj;
 106                loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
 107
 108                io->u.ci_fault = parent->u.ci_fault;
 109                off = lov_size_to_stripe(lsm, off, stripe);
 110                io->u.ci_fault.ft_index = cl_index(obj, off);
 111                break;
 112        }
 113        case CIT_FSYNC: {
 114                io->u.ci_fsync.fi_start = start;
 115                io->u.ci_fsync.fi_end = end;
 116                io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
 117                io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
 118                io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
 119                break;
 120        }
 121        case CIT_READ:
 122        case CIT_WRITE: {
 123                io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
 124                if (cl_io_is_append(parent)) {
 125                        io->u.ci_wr.wr_append = 1;
 126                } else {
 127                        io->u.ci_rw.crw_pos = start;
 128                        io->u.ci_rw.crw_count = end - start;
 129                }
 130                break;
 131        }
 132        default:
 133                break;
 134        }
 135}
 136
 137static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
 138                           struct lov_io_sub *sub)
 139{
 140        struct lov_object *lov = lio->lis_object;
 141        struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
 142        struct cl_io      *sub_io;
 143        struct cl_object  *sub_obj;
 144        struct cl_io      *io  = lio->lis_cl.cis_io;
 145
 146        int stripe = sub->sub_stripe;
 147        int result;
 148
 149        LASSERT(sub->sub_io == NULL);
 150        LASSERT(sub->sub_env == NULL);
 151        LASSERT(sub->sub_stripe < lio->lis_stripe_count);
 152        ENTRY;
 153
 154        result = 0;
 155        sub->sub_io_initialized = 0;
 156        sub->sub_borrowed = 0;
 157
 158        if (lio->lis_mem_frozen) {
 159                LASSERT(mutex_is_locked(&ld->ld_mutex));
 160                sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
 161                sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
 162                sub->sub_borrowed = 1;
 163        } else {
 164                void *cookie;
 165
 166                /* obtain new environment */
 167                cookie = cl_env_reenter();
 168                sub->sub_env = cl_env_get(&sub->sub_refcheck);
 169                cl_env_reexit(cookie);
 170                if (IS_ERR(sub->sub_env))
 171                        result = PTR_ERR(sub->sub_env);
 172
 173                if (result == 0) {
 174                        /*
 175                         * First sub-io. Use ->lis_single_subio to
 176                         * avoid dynamic allocation.
 177                         */
 178                        if (lio->lis_active_subios == 0) {
 179                                sub->sub_io = &lio->lis_single_subio;
 180                                lio->lis_single_subio_index = stripe;
 181                        } else {
 182                                OBD_ALLOC_PTR(sub->sub_io);
 183                                if (sub->sub_io == NULL)
 184                                        result = -ENOMEM;
 185                        }
 186                }
 187        }
 188
 189        if (result == 0) {
 190                sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
 191                sub_io  = sub->sub_io;
 192
 193                sub_io->ci_obj    = sub_obj;
 194                sub_io->ci_result = 0;
 195
 196                sub_io->ci_parent  = io;
 197                sub_io->ci_lockreq = io->ci_lockreq;
 198                sub_io->ci_type    = io->ci_type;
 199                sub_io->ci_no_srvlock = io->ci_no_srvlock;
 200
 201                lov_sub_enter(sub);
 202                result = cl_io_sub_init(sub->sub_env, sub_io,
 203                                        io->ci_type, sub_obj);
 204                lov_sub_exit(sub);
 205                if (result >= 0) {
 206                        lio->lis_active_subios++;
 207                        sub->sub_io_initialized = 1;
 208                        result = 0;
 209                }
 210        }
 211        if (result != 0)
 212                lov_io_sub_fini(env, lio, sub);
 213        RETURN(result);
 214}
 215
 216struct lov_io_sub *lov_sub_get(const struct lu_env *env,
 217                               struct lov_io *lio, int stripe)
 218{
 219        int rc;
 220        struct lov_io_sub *sub = &lio->lis_subs[stripe];
 221
 222        LASSERT(stripe < lio->lis_stripe_count);
 223        ENTRY;
 224
 225        if (!sub->sub_io_initialized) {
 226                sub->sub_stripe = stripe;
 227                rc = lov_io_sub_init(env, lio, sub);
 228        } else
 229                rc = 0;
 230        if (rc == 0)
 231                lov_sub_enter(sub);
 232        else
 233                sub = ERR_PTR(rc);
 234        RETURN(sub);
 235}
 236
 237void lov_sub_put(struct lov_io_sub *sub)
 238{
 239        lov_sub_exit(sub);
 240}
 241
 242/*****************************************************************************
 243 *
 244 * Lov io operations.
 245 *
 246 */
 247
 248static int lov_page_stripe(const struct cl_page *page)
 249{
 250        struct lovsub_object *subobj;
 251
 252        ENTRY;
 253        subobj = lu2lovsub(
 254                lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
 255                                 &lovsub_device_type));
 256        LASSERT(subobj != NULL);
 257        RETURN(subobj->lso_index);
 258}
 259
 260struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
 261                                  const struct cl_page_slice *slice)
 262{
 263        struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
 264        struct cl_page       *page = slice->cpl_page;
 265        int stripe;
 266
 267        LASSERT(lio->lis_cl.cis_io != NULL);
 268        LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
 269        LASSERT(lsm != NULL);
 270        LASSERT(lio->lis_nr_subios > 0);
 271        ENTRY;
 272
 273        stripe = lov_page_stripe(page);
 274        RETURN(lov_sub_get(env, lio, stripe));
 275}
 276
 277
 278static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
 279                             struct cl_io *io)
 280{
 281        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 282        int result;
 283
 284        LASSERT(lio->lis_object != NULL);
 285        ENTRY;
 286
 287        /*
 288         * Need to be optimized, we can't afford to allocate a piece of memory
 289         * when writing a page. -jay
 290         */
 291        OBD_ALLOC_LARGE(lio->lis_subs,
 292                        lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
 293        if (lio->lis_subs != NULL) {
 294                lio->lis_nr_subios = lio->lis_stripe_count;
 295                lio->lis_single_subio_index = -1;
 296                lio->lis_active_subios = 0;
 297                result = 0;
 298        } else
 299                result = -ENOMEM;
 300        RETURN(result);
 301}
 302
 303static void lov_io_slice_init(struct lov_io *lio,
 304                              struct lov_object *obj, struct cl_io *io)
 305{
 306        ENTRY;
 307
 308        io->ci_result = 0;
 309        lio->lis_object = obj;
 310
 311        LASSERT(obj->lo_lsm != NULL);
 312        lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
 313
 314        switch (io->ci_type) {
 315        case CIT_READ:
 316        case CIT_WRITE:
 317                lio->lis_pos = io->u.ci_rw.crw_pos;
 318                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
 319                lio->lis_io_endpos = lio->lis_endpos;
 320                if (cl_io_is_append(io)) {
 321                        LASSERT(io->ci_type == CIT_WRITE);
 322                        lio->lis_pos = 0;
 323                        lio->lis_endpos = OBD_OBJECT_EOF;
 324                }
 325                break;
 326
 327        case CIT_SETATTR:
 328                if (cl_io_is_trunc(io))
 329                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
 330                else
 331                        lio->lis_pos = 0;
 332                lio->lis_endpos = OBD_OBJECT_EOF;
 333                break;
 334
 335        case CIT_FAULT: {
 336                pgoff_t index = io->u.ci_fault.ft_index;
 337                lio->lis_pos = cl_offset(io->ci_obj, index);
 338                lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
 339                break;
 340        }
 341
 342        case CIT_FSYNC: {
 343                lio->lis_pos = io->u.ci_fsync.fi_start;
 344                lio->lis_endpos = io->u.ci_fsync.fi_end;
 345                break;
 346        }
 347
 348        case CIT_MISC:
 349                lio->lis_pos = 0;
 350                lio->lis_endpos = OBD_OBJECT_EOF;
 351                break;
 352
 353        default:
 354                LBUG();
 355        }
 356
 357        EXIT;
 358}
 359
 360static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 361{
 362        struct lov_io *lio = cl2lov_io(env, ios);
 363        struct lov_object *lov = cl2lov(ios->cis_obj);
 364        int i;
 365
 366        ENTRY;
 367        if (lio->lis_subs != NULL) {
 368                for (i = 0; i < lio->lis_nr_subios; i++)
 369                        lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
 370                OBD_FREE_LARGE(lio->lis_subs,
 371                         lio->lis_nr_subios * sizeof lio->lis_subs[0]);
 372                lio->lis_nr_subios = 0;
 373        }
 374
 375        LASSERT(atomic_read(&lov->lo_active_ios) > 0);
 376        if (atomic_dec_and_test(&lov->lo_active_ios))
 377                wake_up_all(&lov->lo_waitq);
 378        EXIT;
 379}
 380
 381static obd_off lov_offset_mod(obd_off val, int delta)
 382{
 383        if (val != OBD_OBJECT_EOF)
 384                val += delta;
 385        return val;
 386}
 387
 388static int lov_io_iter_init(const struct lu_env *env,
 389                            const struct cl_io_slice *ios)
 390{
 391        struct lov_io   *lio = cl2lov_io(env, ios);
 392        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 393        struct lov_io_sub    *sub;
 394        obd_off endpos;
 395        obd_off start;
 396        obd_off end;
 397        int stripe;
 398        int rc = 0;
 399
 400        ENTRY;
 401        endpos = lov_offset_mod(lio->lis_endpos, -1);
 402        for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
 403                if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
 404                                           endpos, &start, &end))
 405                        continue;
 406
 407                end = lov_offset_mod(end, +1);
 408                sub = lov_sub_get(env, lio, stripe);
 409                if (!IS_ERR(sub)) {
 410                        lov_io_sub_inherit(sub->sub_io, lio, stripe,
 411                                           start, end);
 412                        rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
 413                        lov_sub_put(sub);
 414                        CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
 415                               stripe, start, end);
 416                } else
 417                        rc = PTR_ERR(sub);
 418
 419                if (!rc)
 420                        list_add_tail(&sub->sub_linkage, &lio->lis_active);
 421                else
 422                        break;
 423        }
 424        RETURN(rc);
 425}
 426
 427static int lov_io_rw_iter_init(const struct lu_env *env,
 428                               const struct cl_io_slice *ios)
 429{
 430        struct lov_io   *lio = cl2lov_io(env, ios);
 431        struct cl_io     *io  = ios->cis_io;
 432        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
 433        loff_t start = io->u.ci_rw.crw_pos;
 434        loff_t next;
 435        unsigned long ssize = lsm->lsm_stripe_size;
 436
 437        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
 438        ENTRY;
 439
 440        /* fast path for common case. */
 441        if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
 442
 443                lov_do_div64(start, ssize);
 444                next = (start + 1) * ssize;
 445                if (next <= start * ssize)
 446                        next = ~0ull;
 447
 448                io->ci_continue = next < lio->lis_io_endpos;
 449                io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
 450                                              next) - io->u.ci_rw.crw_pos;
 451                lio->lis_pos    = io->u.ci_rw.crw_pos;
 452                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
 453                CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
 454                       LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
 455                       (__u64)lio->lis_io_endpos);
 456        }
 457        /*
 458         * XXX The following call should be optimized: we know, that
 459         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
 460         */
 461        RETURN(lov_io_iter_init(env, ios));
 462}
 463
 464static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
 465                       int (*iofunc)(const struct lu_env *, struct cl_io *))
 466{
 467        struct cl_io *parent = lio->lis_cl.cis_io;
 468        struct lov_io_sub *sub;
 469        int rc = 0;
 470
 471        ENTRY;
 472        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
 473                lov_sub_enter(sub);
 474                rc = iofunc(sub->sub_env, sub->sub_io);
 475                lov_sub_exit(sub);
 476                if (rc)
 477                        break;
 478
 479                if (parent->ci_result == 0)
 480                        parent->ci_result = sub->sub_io->ci_result;
 481        }
 482        RETURN(rc);
 483}
 484
 485static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
 486{
 487        ENTRY;
 488        RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
 489}
 490
 491static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
 492{
 493        ENTRY;
 494        RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
 495}
 496
 497static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
 498{
 499        ENTRY;
 500        /*
 501         * It's possible that lov_io_start() wasn't called against this
 502         * sub-io, either because previous sub-io failed, or upper layer
 503         * completed IO.
 504         */
 505        if (io->ci_state == CIS_IO_GOING)
 506                cl_io_end(env, io);
 507        else
 508                io->ci_state = CIS_IO_FINISHED;
 509        RETURN(0);
 510}
 511
 512static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
 513{
 514        cl_io_iter_fini(env, io);
 515        RETURN(0);
 516}
 517
 518static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
 519{
 520        cl_io_unlock(env, io);
 521        RETURN(0);
 522}
 523
 524static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
 525{
 526        int rc;
 527
 528        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
 529        LASSERT(rc == 0);
 530}
 531
 532static void lov_io_iter_fini(const struct lu_env *env,
 533                             const struct cl_io_slice *ios)
 534{
 535        struct lov_io *lio = cl2lov_io(env, ios);
 536        int rc;
 537
 538        ENTRY;
 539        rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
 540        LASSERT(rc == 0);
 541        while (!list_empty(&lio->lis_active))
 542                list_del_init(lio->lis_active.next);
 543        EXIT;
 544}
 545
 546static void lov_io_unlock(const struct lu_env *env,
 547                          const struct cl_io_slice *ios)
 548{
 549        int rc;
 550
 551        ENTRY;
 552        rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
 553        LASSERT(rc == 0);
 554        EXIT;
 555}
 556
 557
 558static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
 559                                              struct cl_page_list *qin,
 560                                              int idx, int alloc)
 561{
 562        return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
 563}
 564
 565/**
 566 * lov implementation of cl_operations::cio_submit() method. It takes a list
 567 * of pages in \a queue, splits it into per-stripe sub-lists, invokes
 568 * cl_io_submit() on underlying devices to submit sub-lists, and then splices
 569 * everything back.
 570 *
 571 * Major complication of this function is a need to handle memory cleansing:
 572 * cl_io_submit() is called to write out pages as a part of VM memory
 573 * reclamation, and hence it may not fail due to memory shortages (system
 574 * dead-locks otherwise). To deal with this, some resources (sub-lists,
 575 * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
 576 * not-memory cleansing context), and in case of memory shortage, these
 577 * pre-allocated resources are used by lov_io_submit() under
 578 * lov_device::ld_mutex mutex.
 579 */
 580static int lov_io_submit(const struct lu_env *env,
 581                         const struct cl_io_slice *ios,
 582                         enum cl_req_type crt, struct cl_2queue *queue)
 583{
 584        struct lov_io     *lio = cl2lov_io(env, ios);
 585        struct lov_object      *obj = lio->lis_object;
 586        struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
 587        struct cl_page_list    *qin = &queue->c2_qin;
 588        struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
 589        struct cl_page_list *stripes_qin = NULL;
 590        struct cl_page *page;
 591        struct cl_page *tmp;
 592        int stripe;
 593
 594#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
 595
 596        int rc = 0;
 597        int alloc =
 598                !(current->flags & PF_MEMALLOC);
 599        ENTRY;
 600        if (lio->lis_active_subios == 1) {
 601                int idx = lio->lis_single_subio_index;
 602                struct lov_io_sub *sub;
 603
 604                LASSERT(idx < lio->lis_nr_subios);
 605                sub = lov_sub_get(env, lio, idx);
 606                LASSERT(!IS_ERR(sub));
 607                LASSERT(sub->sub_io == &lio->lis_single_subio);
 608                rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
 609                                     crt, queue);
 610                lov_sub_put(sub);
 611                RETURN(rc);
 612        }
 613
 614        LASSERT(lio->lis_subs != NULL);
 615        if (alloc) {
 616                OBD_ALLOC_LARGE(stripes_qin,
 617                                sizeof(*stripes_qin) * lio->lis_nr_subios);
 618                if (stripes_qin == NULL)
 619                        RETURN(-ENOMEM);
 620
 621                for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
 622                        cl_page_list_init(&stripes_qin[stripe]);
 623        } else {
 624                /*
 625                 * If we get here, it means pageout & swap doesn't help.
 626                 * In order to not make things worse, even don't try to
 627                 * allocate the memory with __GFP_NOWARN. -jay
 628                 */
 629                mutex_lock(&ld->ld_mutex);
 630                lio->lis_mem_frozen = 1;
 631        }
 632
 633        cl_2queue_init(cl2q);
 634        cl_page_list_for_each_safe(page, tmp, qin) {
 635                stripe = lov_page_stripe(page);
 636                cl_page_list_move(QIN(stripe), qin, page);
 637        }
 638
 639        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
 640                struct lov_io_sub   *sub;
 641                struct cl_page_list *sub_qin = QIN(stripe);
 642
 643                if (list_empty(&sub_qin->pl_pages))
 644                        continue;
 645
 646                cl_page_list_splice(sub_qin, &cl2q->c2_qin);
 647                sub = lov_sub_get(env, lio, stripe);
 648                if (!IS_ERR(sub)) {
 649                        rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
 650                                             crt, cl2q);
 651                        lov_sub_put(sub);
 652                } else
 653                        rc = PTR_ERR(sub);
 654                cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
 655                cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
 656                if (rc != 0)
 657                        break;
 658        }
 659
 660        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
 661                struct cl_page_list *sub_qin = QIN(stripe);
 662
 663                if (list_empty(&sub_qin->pl_pages))
 664                        continue;
 665
 666                cl_page_list_splice(sub_qin, qin);
 667        }
 668
 669        if (alloc) {
 670                OBD_FREE_LARGE(stripes_qin,
 671                         sizeof(*stripes_qin) * lio->lis_nr_subios);
 672        } else {
 673                int i;
 674
 675                for (i = 0; i < lio->lis_nr_subios; i++) {
 676                        struct cl_io *cio = lio->lis_subs[i].sub_io;
 677
 678                        if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
 679                                lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
 680                }
 681                lio->lis_mem_frozen = 0;
 682                mutex_unlock(&ld->ld_mutex);
 683        }
 684
 685        RETURN(rc);
 686#undef QIN
 687}
 688
 689static int lov_io_prepare_write(const struct lu_env *env,
 690                                const struct cl_io_slice *ios,
 691                                const struct cl_page_slice *slice,
 692                                unsigned from, unsigned to)
 693{
 694        struct lov_io     *lio      = cl2lov_io(env, ios);
 695        struct cl_page    *sub_page = lov_sub_page(slice);
 696        struct lov_io_sub *sub;
 697        int result;
 698
 699        ENTRY;
 700        sub = lov_page_subio(env, lio, slice);
 701        if (!IS_ERR(sub)) {
 702                result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
 703                                             sub_page, from, to);
 704                lov_sub_put(sub);
 705        } else
 706                result = PTR_ERR(sub);
 707        RETURN(result);
 708}
 709
 710static int lov_io_commit_write(const struct lu_env *env,
 711                               const struct cl_io_slice *ios,
 712                               const struct cl_page_slice *slice,
 713                               unsigned from, unsigned to)
 714{
 715        struct lov_io     *lio      = cl2lov_io(env, ios);
 716        struct cl_page    *sub_page = lov_sub_page(slice);
 717        struct lov_io_sub *sub;
 718        int result;
 719
 720        ENTRY;
 721        sub = lov_page_subio(env, lio, slice);
 722        if (!IS_ERR(sub)) {
 723                result = cl_io_commit_write(sub->sub_env, sub->sub_io,
 724                                            sub_page, from, to);
 725                lov_sub_put(sub);
 726        } else
 727                result = PTR_ERR(sub);
 728        RETURN(result);
 729}
 730
 731static int lov_io_fault_start(const struct lu_env *env,
 732                              const struct cl_io_slice *ios)
 733{
 734        struct cl_fault_io *fio;
 735        struct lov_io      *lio;
 736        struct lov_io_sub  *sub;
 737
 738        ENTRY;
 739        fio = &ios->cis_io->u.ci_fault;
 740        lio = cl2lov_io(env, ios);
 741        sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
 742        sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
 743        lov_sub_put(sub);
 744        RETURN(lov_io_start(env, ios));
 745}
 746
 747static void lov_io_fsync_end(const struct lu_env *env,
 748                             const struct cl_io_slice *ios)
 749{
 750        struct lov_io *lio = cl2lov_io(env, ios);
 751        struct lov_io_sub *sub;
 752        unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
 753        ENTRY;
 754
 755        *written = 0;
 756        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
 757                struct cl_io *subio = sub->sub_io;
 758
 759                lov_sub_enter(sub);
 760                lov_io_end_wrapper(sub->sub_env, subio);
 761                lov_sub_exit(sub);
 762
 763                if (subio->ci_result == 0)
 764                        *written += subio->u.ci_fsync.fi_nr_written;
 765        }
 766        RETURN_EXIT;
 767}
 768
 769static const struct cl_io_operations lov_io_ops = {
 770        .op = {
 771                [CIT_READ] = {
 772                        .cio_fini      = lov_io_fini,
 773                        .cio_iter_init = lov_io_rw_iter_init,
 774                        .cio_iter_fini = lov_io_iter_fini,
 775                        .cio_lock      = lov_io_lock,
 776                        .cio_unlock    = lov_io_unlock,
 777                        .cio_start     = lov_io_start,
 778                        .cio_end       = lov_io_end
 779                },
 780                [CIT_WRITE] = {
 781                        .cio_fini      = lov_io_fini,
 782                        .cio_iter_init = lov_io_rw_iter_init,
 783                        .cio_iter_fini = lov_io_iter_fini,
 784                        .cio_lock      = lov_io_lock,
 785                        .cio_unlock    = lov_io_unlock,
 786                        .cio_start     = lov_io_start,
 787                        .cio_end       = lov_io_end
 788                },
 789                [CIT_SETATTR] = {
 790                        .cio_fini      = lov_io_fini,
 791                        .cio_iter_init = lov_io_iter_init,
 792                        .cio_iter_fini = lov_io_iter_fini,
 793                        .cio_lock      = lov_io_lock,
 794                        .cio_unlock    = lov_io_unlock,
 795                        .cio_start     = lov_io_start,
 796                        .cio_end       = lov_io_end
 797                },
 798                [CIT_FAULT] = {
 799                        .cio_fini      = lov_io_fini,
 800                        .cio_iter_init = lov_io_iter_init,
 801                        .cio_iter_fini = lov_io_iter_fini,
 802                        .cio_lock      = lov_io_lock,
 803                        .cio_unlock    = lov_io_unlock,
 804                        .cio_start     = lov_io_fault_start,
 805                        .cio_end       = lov_io_end
 806                },
 807                [CIT_FSYNC] = {
 808                        .cio_fini      = lov_io_fini,
 809                        .cio_iter_init = lov_io_iter_init,
 810                        .cio_iter_fini = lov_io_iter_fini,
 811                        .cio_lock      = lov_io_lock,
 812                        .cio_unlock    = lov_io_unlock,
 813                        .cio_start     = lov_io_start,
 814                        .cio_end       = lov_io_fsync_end
 815                },
 816                [CIT_MISC] = {
 817                        .cio_fini   = lov_io_fini
 818                }
 819        },
 820        .req_op = {
 821                 [CRT_READ] = {
 822                         .cio_submit    = lov_io_submit
 823                 },
 824                 [CRT_WRITE] = {
 825                         .cio_submit    = lov_io_submit
 826                 }
 827         },
 828        .cio_prepare_write = lov_io_prepare_write,
 829        .cio_commit_write  = lov_io_commit_write
 830};
 831
 832/*****************************************************************************
 833 *
 834 * Empty lov io operations.
 835 *
 836 */
 837
 838static void lov_empty_io_fini(const struct lu_env *env,
 839                              const struct cl_io_slice *ios)
 840{
 841        struct lov_object *lov = cl2lov(ios->cis_obj);
 842        ENTRY;
 843
 844        if (atomic_dec_and_test(&lov->lo_active_ios))
 845                wake_up_all(&lov->lo_waitq);
 846        EXIT;
 847}
 848
 849static void lov_empty_impossible(const struct lu_env *env,
 850                                 struct cl_io_slice *ios)
 851{
 852        LBUG();
 853}
 854
 855#define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
 856
 857/**
 858 * An io operation vector for files without stripes.
 859 */
 860static const struct cl_io_operations lov_empty_io_ops = {
 861        .op = {
 862                [CIT_READ] = {
 863                        .cio_fini       = lov_empty_io_fini,
 864#if 0
 865                        .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
 866                        .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
 867                        .cio_start      = LOV_EMPTY_IMPOSSIBLE,
 868                        .cio_end        = LOV_EMPTY_IMPOSSIBLE
 869#endif
 870                },
 871                [CIT_WRITE] = {
 872                        .cio_fini      = lov_empty_io_fini,
 873                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 874                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 875                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 876                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 877                },
 878                [CIT_SETATTR] = {
 879                        .cio_fini      = lov_empty_io_fini,
 880                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 881                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 882                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 883                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 884                },
 885                [CIT_FAULT] = {
 886                        .cio_fini      = lov_empty_io_fini,
 887                        .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
 888                        .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
 889                        .cio_start     = LOV_EMPTY_IMPOSSIBLE,
 890                        .cio_end       = LOV_EMPTY_IMPOSSIBLE
 891                },
 892                [CIT_FSYNC] = {
 893                        .cio_fini   = lov_empty_io_fini
 894                },
 895                [CIT_MISC] = {
 896                        .cio_fini   = lov_empty_io_fini
 897                }
 898        },
 899        .req_op = {
 900                 [CRT_READ] = {
 901                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
 902                 },
 903                 [CRT_WRITE] = {
 904                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
 905                 }
 906         },
 907        .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
 908};
 909
 910int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
 911                      struct cl_io *io)
 912{
 913        struct lov_io       *lio = lov_env_io(env);
 914        struct lov_object   *lov = cl2lov(obj);
 915
 916        ENTRY;
 917        INIT_LIST_HEAD(&lio->lis_active);
 918        lov_io_slice_init(lio, lov, io);
 919        if (io->ci_result == 0) {
 920                io->ci_result = lov_io_subio_init(env, lio, io);
 921                if (io->ci_result == 0) {
 922                        cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
 923                        atomic_inc(&lov->lo_active_ios);
 924                }
 925        }
 926        RETURN(io->ci_result);
 927}
 928
 929int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
 930                      struct cl_io *io)
 931{
 932        struct lov_object *lov = cl2lov(obj);
 933        struct lov_io *lio = lov_env_io(env);
 934        int result;
 935        ENTRY;
 936
 937        lio->lis_object = lov;
 938        switch (io->ci_type) {
 939        default:
 940                LBUG();
 941        case CIT_MISC:
 942        case CIT_READ:
 943                result = 0;
 944                break;
 945        case CIT_FSYNC:
 946        case CIT_SETATTR:
 947                result = +1;
 948                break;
 949        case CIT_WRITE:
 950                result = -EBADF;
 951                break;
 952        case CIT_FAULT:
 953                result = -EFAULT;
 954                CERROR("Page fault on a file without stripes: "DFID"\n",
 955                       PFID(lu_object_fid(&obj->co_lu)));
 956                break;
 957        }
 958        if (result == 0) {
 959                cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
 960                atomic_inc(&lov->lo_active_ios);
 961        }
 962
 963        io->ci_result = result < 0 ? result : 0;
 964        RETURN(result != 0);
 965}
 966
 967/** @} lov */
 968