linux/drivers/staging/lustre/lustre/llite/vvp_io.c
<<
>>
Prefs
   1/*
   2 * GPL HEADER START
   3 *
   4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5 *
   6 * This program is free software; you can redistribute it and/or modify
   7 * it under the terms of the GNU General Public License version 2 only,
   8 * as published by the Free Software Foundation.
   9 *
  10 * This program is distributed in the hope that it will be useful, but
  11 * WITHOUT ANY WARRANTY; without even the implied warranty of
  12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13 * General Public License version 2 for more details (a copy is included
  14 * in the LICENSE file that accompanied this code).
  15 *
  16 * You should have received a copy of the GNU General Public License
  17 * version 2 along with this program; If not, see
  18 * http://www.gnu.org/licenses/gpl-2.0.html
  19 *
  20 * GPL HEADER END
  21 */
  22/*
  23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  24 * Use is subject to license terms.
  25 *
  26 * Copyright (c) 2011, 2015, Intel Corporation.
  27 */
  28/*
  29 * This file is part of Lustre, http://www.lustre.org/
  30 * Lustre is a trademark of Sun Microsystems, Inc.
  31 *
  32 * Implementation of cl_io for VVP layer.
  33 *
  34 *   Author: Nikita Danilov <nikita.danilov@sun.com>
  35 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  36 */
  37
  38#define DEBUG_SUBSYSTEM S_LLITE
  39
  40#include "../include/obd.h"
  41
  42#include "llite_internal.h"
  43#include "vvp_internal.h"
  44
  45static struct vvp_io *cl2vvp_io(const struct lu_env *env,
  46                                const struct cl_io_slice *slice)
  47{
  48        struct vvp_io *vio;
  49
  50        vio = container_of(slice, struct vvp_io, vui_cl);
  51        LASSERT(vio == vvp_env_io(env));
  52
  53        return vio;
  54}
  55
  56/**
  57 * For swapping layout. The file's layout may have changed.
  58 * To avoid populating pages to a wrong stripe, we have to verify the
  59 * correctness of layout. It works because swapping layout processes
  60 * have to acquire group lock.
  61 */
  62static bool can_populate_pages(const struct lu_env *env, struct cl_io *io,
  63                               struct inode *inode)
  64{
  65        struct ll_inode_info    *lli = ll_i2info(inode);
  66        struct vvp_io           *vio = vvp_env_io(env);
  67        bool rc = true;
  68
  69        switch (io->ci_type) {
  70        case CIT_READ:
  71        case CIT_WRITE:
  72                /* don't need lock here to check lli_layout_gen as we have held
  73                 * extent lock and GROUP lock has to hold to swap layout
  74                 */
  75                if (ll_layout_version_get(lli) != vio->vui_layout_gen ||
  76                    OBD_FAIL_CHECK_RESET(OBD_FAIL_LLITE_LOST_LAYOUT, 0)) {
  77                        io->ci_need_restart = 1;
  78                        /* this will cause a short read/write */
  79                        io->ci_continue = 0;
  80                        rc = false;
  81                }
  82        case CIT_FAULT:
  83                /* fault is okay because we've already had a page. */
  84        default:
  85                break;
  86        }
  87
  88        return rc;
  89}
  90
  91static void vvp_object_size_lock(struct cl_object *obj)
  92{
  93        struct inode *inode = vvp_object_inode(obj);
  94
  95        ll_inode_size_lock(inode);
  96        cl_object_attr_lock(obj);
  97}
  98
  99static void vvp_object_size_unlock(struct cl_object *obj)
 100{
 101        struct inode *inode = vvp_object_inode(obj);
 102
 103        cl_object_attr_unlock(obj);
 104        ll_inode_size_unlock(inode);
 105}
 106
 107/**
 108 * Helper function that if necessary adjusts file size (inode->i_size), when
 109 * position at the offset \a pos is accessed. File size can be arbitrary stale
 110 * on a Lustre client, but client at least knows KMS. If accessed area is
 111 * inside [0, KMS], set file size to KMS, otherwise glimpse file size.
 112 *
 113 * Locking: cl_isize_lock is used to serialize changes to inode size and to
 114 * protect consistency between inode size and cl_object
 115 * attributes. cl_object_size_lock() protects consistency between cl_attr's of
 116 * top-object and sub-objects.
 117 */
 118static int vvp_prep_size(const struct lu_env *env, struct cl_object *obj,
 119                         struct cl_io *io, loff_t start, size_t count,
 120                         int *exceed)
 121{
 122        struct cl_attr *attr  = vvp_env_thread_attr(env);
 123        struct inode   *inode = vvp_object_inode(obj);
 124        loff_t    pos   = start + count - 1;
 125        loff_t kms;
 126        int result;
 127
 128        /*
 129         * Consistency guarantees: following possibilities exist for the
 130         * relation between region being accessed and real file size at this
 131         * moment:
 132         *
 133         *  (A): the region is completely inside of the file;
 134         *
 135         *  (B-x): x bytes of region are inside of the file, the rest is
 136         *  outside;
 137         *
 138         *  (C): the region is completely outside of the file.
 139         *
 140         * This classification is stable under DLM lock already acquired by
 141         * the caller, because to change the class, other client has to take
 142         * DLM lock conflicting with our lock. Also, any updates to ->i_size
 143         * by other threads on this client are serialized by
 144         * ll_inode_size_lock(). This guarantees that short reads are handled
 145         * correctly in the face of concurrent writes and truncates.
 146         */
 147        vvp_object_size_lock(obj);
 148        result = cl_object_attr_get(env, obj, attr);
 149        if (result == 0) {
 150                kms = attr->cat_kms;
 151                if (pos > kms) {
 152                        /*
 153                         * A glimpse is necessary to determine whether we
 154                         * return a short read (B) or some zeroes at the end
 155                         * of the buffer (C)
 156                         */
 157                        vvp_object_size_unlock(obj);
 158                        result = cl_glimpse_lock(env, io, inode, obj, 0);
 159                        if (result == 0 && exceed) {
 160                                /* If objective page index exceed end-of-file
 161                                 * page index, return directly. Do not expect
 162                                 * kernel will check such case correctly.
 163                                 * linux-2.6.18-128.1.1 miss to do that.
 164                                 * --bug 17336
 165                                 */
 166                                loff_t size = i_size_read(inode);
 167                                loff_t cur_index = start >> PAGE_SHIFT;
 168                                loff_t size_index = (size - 1) >> PAGE_SHIFT;
 169
 170                                if ((size == 0 && cur_index != 0) ||
 171                                    size_index < cur_index)
 172                                        *exceed = 1;
 173                        }
 174                        return result;
 175                }
 176                /*
 177                 * region is within kms and, hence, within real file
 178                 * size (A). We need to increase i_size to cover the
 179                 * read region so that generic_file_read() will do its
 180                 * job, but that doesn't mean the kms size is
 181                 * _correct_, it is only the _minimum_ size. If
 182                 * someone does a stat they will get the correct size
 183                 * which will always be >= the kms value here.
 184                 * b=11081
 185                 */
 186                if (i_size_read(inode) < kms) {
 187                        i_size_write(inode, kms);
 188                        CDEBUG(D_VFSTRACE, DFID " updating i_size %llu\n",
 189                               PFID(lu_object_fid(&obj->co_lu)),
 190                               (__u64)i_size_read(inode));
 191                }
 192        }
 193
 194        vvp_object_size_unlock(obj);
 195
 196        return result;
 197}
 198
 199/*****************************************************************************
 200 *
 201 * io operations.
 202 *
 203 */
 204
 205static int vvp_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
 206                                 __u32 enqflags, enum cl_lock_mode mode,
 207                          pgoff_t start, pgoff_t end)
 208{
 209        struct vvp_io          *vio   = vvp_env_io(env);
 210        struct cl_lock_descr   *descr = &vio->vui_link.cill_descr;
 211        struct cl_object       *obj   = io->ci_obj;
 212
 213        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 214
 215        CDEBUG(D_VFSTRACE, "lock: %d [%lu, %lu]\n", mode, start, end);
 216
 217        memset(&vio->vui_link, 0, sizeof(vio->vui_link));
 218
 219        if (vio->vui_fd && (vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
 220                descr->cld_mode = CLM_GROUP;
 221                descr->cld_gid  = vio->vui_fd->fd_grouplock.lg_gid;
 222                enqflags |= CEF_LOCK_MATCH;
 223        } else {
 224                descr->cld_mode  = mode;
 225        }
 226        descr->cld_obj   = obj;
 227        descr->cld_start = start;
 228        descr->cld_end   = end;
 229        descr->cld_enq_flags = enqflags;
 230
 231        cl_io_lock_add(env, io, &vio->vui_link);
 232        return 0;
 233}
 234
 235static int vvp_io_one_lock(const struct lu_env *env, struct cl_io *io,
 236                           __u32 enqflags, enum cl_lock_mode mode,
 237                           loff_t start, loff_t end)
 238{
 239        struct cl_object *obj = io->ci_obj;
 240
 241        return vvp_io_one_lock_index(env, io, enqflags, mode,
 242                                     cl_index(obj, start), cl_index(obj, end));
 243}
 244
 245static int vvp_io_write_iter_init(const struct lu_env *env,
 246                                  const struct cl_io_slice *ios)
 247{
 248        struct vvp_io *vio = cl2vvp_io(env, ios);
 249
 250        cl_page_list_init(&vio->u.write.vui_queue);
 251        vio->u.write.vui_written = 0;
 252        vio->u.write.vui_from = 0;
 253        vio->u.write.vui_to = PAGE_SIZE;
 254
 255        return 0;
 256}
 257
 258static void vvp_io_write_iter_fini(const struct lu_env *env,
 259                                   const struct cl_io_slice *ios)
 260{
 261        struct vvp_io *vio = cl2vvp_io(env, ios);
 262
 263        LASSERT(vio->u.write.vui_queue.pl_nr == 0);
 264}
 265
 266static int vvp_io_fault_iter_init(const struct lu_env *env,
 267                                  const struct cl_io_slice *ios)
 268{
 269        struct vvp_io *vio   = cl2vvp_io(env, ios);
 270        struct inode  *inode = vvp_object_inode(ios->cis_obj);
 271
 272        LASSERT(inode == file_inode(vio->vui_fd->fd_file));
 273        vio->u.fault.ft_mtime = inode->i_mtime.tv_sec;
 274        return 0;
 275}
 276
 277static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 278{
 279        struct cl_io     *io  = ios->cis_io;
 280        struct cl_object *obj = io->ci_obj;
 281        struct vvp_io    *vio = cl2vvp_io(env, ios);
 282        struct inode *inode = vvp_object_inode(obj);
 283
 284        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 285
 286        CDEBUG(D_VFSTRACE, DFID
 287               " ignore/verify layout %d/%d, layout version %d restore needed %d\n",
 288               PFID(lu_object_fid(&obj->co_lu)),
 289               io->ci_ignore_layout, io->ci_verify_layout,
 290               vio->vui_layout_gen, io->ci_restore_needed);
 291
 292        if (io->ci_restore_needed) {
 293                int     rc;
 294
 295                /* file was detected release, we need to restore it
 296                 * before finishing the io
 297                 */
 298                rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
 299                /* if restore registration failed, no restart,
 300                 * we will return -ENODATA
 301                 */
 302                /* The layout will change after restore, so we need to
 303                 * block on layout lock hold by the MDT
 304                 * as MDT will not send new layout in lvb (see LU-3124)
 305                 * we have to explicitly fetch it, all this will be done
 306                 * by ll_layout_refresh()
 307                 */
 308                if (rc == 0) {
 309                        io->ci_restore_needed = 0;
 310                        io->ci_need_restart = 1;
 311                        io->ci_verify_layout = 1;
 312                } else {
 313                        io->ci_restore_needed = 1;
 314                        io->ci_need_restart = 0;
 315                        io->ci_verify_layout = 0;
 316                        io->ci_result = rc;
 317                }
 318        }
 319
 320        if (!io->ci_ignore_layout && io->ci_verify_layout) {
 321                __u32 gen = 0;
 322
 323                /* check layout version */
 324                ll_layout_refresh(inode, &gen);
 325                io->ci_need_restart = vio->vui_layout_gen != gen;
 326                if (io->ci_need_restart) {
 327                        CDEBUG(D_VFSTRACE,
 328                               DFID " layout changed from %d to %d.\n",
 329                               PFID(lu_object_fid(&obj->co_lu)),
 330                               vio->vui_layout_gen, gen);
 331                        /* today successful restore is the only possible case */
 332                        /* restore was done, clear restoring state */
 333                        clear_bit(LLIF_FILE_RESTORING,
 334                                  &ll_i2info(inode)->lli_flags);
 335                }
 336        }
 337}
 338
 339static void vvp_io_fault_fini(const struct lu_env *env,
 340                              const struct cl_io_slice *ios)
 341{
 342        struct cl_io   *io   = ios->cis_io;
 343        struct cl_page *page = io->u.ci_fault.ft_page;
 344
 345        CLOBINVRNT(env, io->ci_obj, vvp_object_invariant(io->ci_obj));
 346
 347        if (page) {
 348                lu_ref_del(&page->cp_reference, "fault", io);
 349                cl_page_put(env, page);
 350                io->u.ci_fault.ft_page = NULL;
 351        }
 352        vvp_io_fini(env, ios);
 353}
 354
 355static enum cl_lock_mode vvp_mode_from_vma(struct vm_area_struct *vma)
 356{
 357        /*
 358         * we only want to hold PW locks if the mmap() can generate
 359         * writes back to the file and that only happens in shared
 360         * writable vmas
 361         */
 362        if ((vma->vm_flags & VM_SHARED) && (vma->vm_flags & VM_WRITE))
 363                return CLM_WRITE;
 364        return CLM_READ;
 365}
 366
 367static int vvp_mmap_locks(const struct lu_env *env,
 368                          struct vvp_io *vio, struct cl_io *io)
 369{
 370        struct vvp_thread_info *cti = vvp_env_info(env);
 371        struct mm_struct       *mm = current->mm;
 372        struct vm_area_struct  *vma;
 373        struct cl_lock_descr   *descr = &cti->vti_descr;
 374        union ldlm_policy_data policy;
 375        unsigned long      addr;
 376        ssize_t          count;
 377        int              result = 0;
 378        struct iov_iter i;
 379        struct iovec iov;
 380
 381        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
 382
 383        if (!vio->vui_iter) /* nfs or loop back device write */
 384                return 0;
 385
 386        /* No MM (e.g. NFS)? No vmas too. */
 387        if (!mm)
 388                return 0;
 389
 390        iov_for_each(iov, i, *vio->vui_iter) {
 391                addr = (unsigned long)iov.iov_base;
 392                count = iov.iov_len;
 393                if (count == 0)
 394                        continue;
 395
 396                count += addr & (~PAGE_MASK);
 397                addr &= PAGE_MASK;
 398
 399                down_read(&mm->mmap_sem);
 400                while ((vma = our_vma(mm, addr, count)) != NULL) {
 401                        struct inode *inode = file_inode(vma->vm_file);
 402                        int flags = CEF_MUST;
 403
 404                        if (ll_file_nolock(vma->vm_file)) {
 405                                /*
 406                                 * For no lock case is not allowed for mmap
 407                                 */
 408                                result = -EINVAL;
 409                                break;
 410                        }
 411
 412                        /*
 413                         * XXX: Required lock mode can be weakened: CIT_WRITE
 414                         * io only ever reads user level buffer, and CIT_READ
 415                         * only writes on it.
 416                         */
 417                        policy_from_vma(&policy, vma, addr, count);
 418                        descr->cld_mode = vvp_mode_from_vma(vma);
 419                        descr->cld_obj = ll_i2info(inode)->lli_clob;
 420                        descr->cld_start = cl_index(descr->cld_obj,
 421                                                    policy.l_extent.start);
 422                        descr->cld_end = cl_index(descr->cld_obj,
 423                                                  policy.l_extent.end);
 424                        descr->cld_enq_flags = flags;
 425                        result = cl_io_lock_alloc_add(env, io, descr);
 426
 427                        CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
 428                               descr->cld_mode, descr->cld_start,
 429                               descr->cld_end);
 430
 431                        if (result < 0)
 432                                break;
 433
 434                        if (vma->vm_end - addr >= count)
 435                                break;
 436
 437                        count -= vma->vm_end - addr;
 438                        addr = vma->vm_end;
 439                }
 440                up_read(&mm->mmap_sem);
 441                if (result < 0)
 442                        break;
 443        }
 444        return result;
 445}
 446
 447static void vvp_io_advance(const struct lu_env *env,
 448                           const struct cl_io_slice *ios,
 449                           size_t nob)
 450{
 451        struct cl_object *obj = ios->cis_io->ci_obj;
 452        struct vvp_io    *vio = cl2vvp_io(env, ios);
 453
 454        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 455
 456        vio->vui_tot_count -= nob;
 457        iov_iter_reexpand(vio->vui_iter, vio->vui_tot_count);
 458}
 459
 460static void vvp_io_update_iov(const struct lu_env *env,
 461                              struct vvp_io *vio, struct cl_io *io)
 462{
 463        size_t size = io->u.ci_rw.crw_count;
 464
 465        if (!vio->vui_iter)
 466                return;
 467
 468        iov_iter_truncate(vio->vui_iter, size);
 469}
 470
 471static int vvp_io_rw_lock(const struct lu_env *env, struct cl_io *io,
 472                          enum cl_lock_mode mode, loff_t start, loff_t end)
 473{
 474        struct vvp_io *vio = vvp_env_io(env);
 475        int result;
 476        int ast_flags = 0;
 477
 478        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
 479
 480        vvp_io_update_iov(env, vio, io);
 481
 482        if (io->u.ci_rw.crw_nonblock)
 483                ast_flags |= CEF_NONBLOCK;
 484        result = vvp_mmap_locks(env, vio, io);
 485        if (result == 0)
 486                result = vvp_io_one_lock(env, io, ast_flags, mode, start, end);
 487        return result;
 488}
 489
 490static int vvp_io_read_lock(const struct lu_env *env,
 491                            const struct cl_io_slice *ios)
 492{
 493        struct cl_io     *io = ios->cis_io;
 494        struct cl_io_rw_common *rd = &io->u.ci_rd.rd;
 495        int result;
 496
 497        result = vvp_io_rw_lock(env, io, CLM_READ, rd->crw_pos,
 498                                rd->crw_pos + rd->crw_count - 1);
 499
 500        return result;
 501}
 502
 503static int vvp_io_fault_lock(const struct lu_env *env,
 504                             const struct cl_io_slice *ios)
 505{
 506        struct cl_io *io   = ios->cis_io;
 507        struct vvp_io *vio = cl2vvp_io(env, ios);
 508        /*
 509         * XXX LDLM_FL_CBPENDING
 510         */
 511        return vvp_io_one_lock_index(env,
 512                                     io, 0,
 513                                     vvp_mode_from_vma(vio->u.fault.ft_vma),
 514                                     io->u.ci_fault.ft_index,
 515                                     io->u.ci_fault.ft_index);
 516}
 517
 518static int vvp_io_write_lock(const struct lu_env *env,
 519                             const struct cl_io_slice *ios)
 520{
 521        struct cl_io *io = ios->cis_io;
 522        loff_t start;
 523        loff_t end;
 524
 525        if (io->u.ci_wr.wr_append) {
 526                start = 0;
 527                end   = OBD_OBJECT_EOF;
 528        } else {
 529                start = io->u.ci_wr.wr.crw_pos;
 530                end   = start + io->u.ci_wr.wr.crw_count - 1;
 531        }
 532        return vvp_io_rw_lock(env, io, CLM_WRITE, start, end);
 533}
 534
 535static int vvp_io_setattr_iter_init(const struct lu_env *env,
 536                                    const struct cl_io_slice *ios)
 537{
 538        return 0;
 539}
 540
 541/**
 542 * Implementation of cl_io_operations::vio_lock() method for CIT_SETATTR io.
 543 *
 544 * Handles "lockless io" mode when extent locking is done by server.
 545 */
 546static int vvp_io_setattr_lock(const struct lu_env *env,
 547                               const struct cl_io_slice *ios)
 548{
 549        struct cl_io  *io  = ios->cis_io;
 550        __u64 new_size;
 551        __u32 enqflags = 0;
 552
 553        if (cl_io_is_trunc(io)) {
 554                new_size = io->u.ci_setattr.sa_attr.lvb_size;
 555                if (new_size == 0)
 556                        enqflags = CEF_DISCARD_DATA;
 557        } else {
 558                unsigned int valid = io->u.ci_setattr.sa_valid;
 559
 560                if (!(valid & TIMES_SET_FLAGS))
 561                        return 0;
 562
 563                if ((!(valid & ATTR_MTIME) ||
 564                     io->u.ci_setattr.sa_attr.lvb_mtime >=
 565                     io->u.ci_setattr.sa_attr.lvb_ctime) &&
 566                    (!(valid & ATTR_ATIME) ||
 567                     io->u.ci_setattr.sa_attr.lvb_atime >=
 568                     io->u.ci_setattr.sa_attr.lvb_ctime))
 569                        return 0;
 570                new_size = 0;
 571        }
 572
 573        return vvp_io_one_lock(env, io, enqflags, CLM_WRITE,
 574                               new_size, OBD_OBJECT_EOF);
 575}
 576
 577static int vvp_do_vmtruncate(struct inode *inode, size_t size)
 578{
 579        int     result;
 580        /*
 581         * Only ll_inode_size_lock is taken at this level.
 582         */
 583        ll_inode_size_lock(inode);
 584        result = inode_newsize_ok(inode, size);
 585        if (result < 0) {
 586                ll_inode_size_unlock(inode);
 587                return result;
 588        }
 589        truncate_setsize(inode, size);
 590        ll_inode_size_unlock(inode);
 591        return result;
 592}
 593
 594static int vvp_io_setattr_time(const struct lu_env *env,
 595                               const struct cl_io_slice *ios)
 596{
 597        struct cl_io       *io    = ios->cis_io;
 598        struct cl_object   *obj   = io->ci_obj;
 599        struct cl_attr     *attr  = vvp_env_thread_attr(env);
 600        int result;
 601        unsigned valid = CAT_CTIME;
 602
 603        cl_object_attr_lock(obj);
 604        attr->cat_ctime = io->u.ci_setattr.sa_attr.lvb_ctime;
 605        if (io->u.ci_setattr.sa_valid & ATTR_ATIME_SET) {
 606                attr->cat_atime = io->u.ci_setattr.sa_attr.lvb_atime;
 607                valid |= CAT_ATIME;
 608        }
 609        if (io->u.ci_setattr.sa_valid & ATTR_MTIME_SET) {
 610                attr->cat_mtime = io->u.ci_setattr.sa_attr.lvb_mtime;
 611                valid |= CAT_MTIME;
 612        }
 613        result = cl_object_attr_update(env, obj, attr, valid);
 614        cl_object_attr_unlock(obj);
 615
 616        return result;
 617}
 618
 619static int vvp_io_setattr_start(const struct lu_env *env,
 620                                const struct cl_io_slice *ios)
 621{
 622        struct cl_io    *io    = ios->cis_io;
 623        struct inode    *inode = vvp_object_inode(io->ci_obj);
 624        struct ll_inode_info *lli = ll_i2info(inode);
 625
 626        if (cl_io_is_trunc(io)) {
 627                down_write(&lli->lli_trunc_sem);
 628                inode_lock(inode);
 629                inode_dio_wait(inode);
 630        } else {
 631                inode_lock(inode);
 632        }
 633
 634        if (io->u.ci_setattr.sa_valid & TIMES_SET_FLAGS)
 635                return vvp_io_setattr_time(env, ios);
 636
 637        return 0;
 638}
 639
 640static void vvp_io_setattr_end(const struct lu_env *env,
 641                               const struct cl_io_slice *ios)
 642{
 643        struct cl_io *io    = ios->cis_io;
 644        struct inode *inode = vvp_object_inode(io->ci_obj);
 645        struct ll_inode_info *lli = ll_i2info(inode);
 646
 647        if (cl_io_is_trunc(io)) {
 648                /* Truncate in memory pages - they must be clean pages
 649                 * because osc has already notified to destroy osc_extents.
 650                 */
 651                vvp_do_vmtruncate(inode, io->u.ci_setattr.sa_attr.lvb_size);
 652                inode_unlock(inode);
 653                up_write(&lli->lli_trunc_sem);
 654        } else {
 655                inode_unlock(inode);
 656        }
 657}
 658
 659static void vvp_io_setattr_fini(const struct lu_env *env,
 660                                const struct cl_io_slice *ios)
 661{
 662        bool restore_needed = ios->cis_io->ci_restore_needed;
 663        struct inode *inode = vvp_object_inode(ios->cis_obj);
 664
 665        vvp_io_fini(env, ios);
 666
 667        if (restore_needed && !ios->cis_io->ci_restore_needed) {
 668                /* restore finished, set data modified flag for HSM */
 669                set_bit(LLIF_DATA_MODIFIED, &(ll_i2info(inode))->lli_flags);
 670        }
 671}
 672
 673static int vvp_io_read_start(const struct lu_env *env,
 674                             const struct cl_io_slice *ios)
 675{
 676        struct vvp_io     *vio   = cl2vvp_io(env, ios);
 677        struct cl_io      *io    = ios->cis_io;
 678        struct cl_object  *obj   = io->ci_obj;
 679        struct inode      *inode = vvp_object_inode(obj);
 680        struct ll_inode_info *lli = ll_i2info(inode);
 681        struct file       *file  = vio->vui_fd->fd_file;
 682
 683        int     result;
 684        loff_t  pos = io->u.ci_rd.rd.crw_pos;
 685        long    cnt = io->u.ci_rd.rd.crw_count;
 686        long    tot = vio->vui_tot_count;
 687        int     exceed = 0;
 688
 689        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 690
 691        CDEBUG(D_VFSTRACE, "read: -> [%lli, %lli)\n", pos, pos + cnt);
 692
 693        down_read(&lli->lli_trunc_sem);
 694
 695        if (!can_populate_pages(env, io, inode))
 696                return 0;
 697
 698        result = vvp_prep_size(env, obj, io, pos, tot, &exceed);
 699        if (result != 0)
 700                return result;
 701        else if (exceed != 0)
 702                goto out;
 703
 704        LU_OBJECT_HEADER(D_INODE, env, &obj->co_lu,
 705                         "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
 706                         inode->i_ino, cnt, pos, i_size_read(inode));
 707
 708        /* turn off the kernel's read-ahead */
 709        vio->vui_fd->fd_file->f_ra.ra_pages = 0;
 710
 711        /* initialize read-ahead window once per syscall */
 712        if (!vio->vui_ra_valid) {
 713                vio->vui_ra_valid = true;
 714                vio->vui_ra_start = cl_index(obj, pos);
 715                vio->vui_ra_count = cl_index(obj, tot + PAGE_SIZE - 1);
 716                ll_ras_enter(file);
 717        }
 718
 719        /* BUG: 5972 */
 720        file_accessed(file);
 721        LASSERT(vio->vui_iocb->ki_pos == pos);
 722        result = generic_file_read_iter(vio->vui_iocb, vio->vui_iter);
 723
 724out:
 725        if (result >= 0) {
 726                if (result < cnt)
 727                        io->ci_continue = 0;
 728                io->ci_nob += result;
 729                ll_rw_stats_tally(ll_i2sbi(inode), current->pid,
 730                                  vio->vui_fd, pos, result, READ);
 731                result = 0;
 732        }
 733        return result;
 734}
 735
 736static int vvp_io_commit_sync(const struct lu_env *env, struct cl_io *io,
 737                              struct cl_page_list *plist, int from, int to)
 738{
 739        struct cl_2queue *queue = &io->ci_queue;
 740        struct cl_page *page;
 741        unsigned int bytes = 0;
 742        int rc = 0;
 743
 744        if (plist->pl_nr == 0)
 745                return 0;
 746
 747        if (from > 0 || to != PAGE_SIZE) {
 748                page = cl_page_list_first(plist);
 749                if (plist->pl_nr == 1) {
 750                        cl_page_clip(env, page, from, to);
 751                } else {
 752                        if (from > 0)
 753                                cl_page_clip(env, page, from, PAGE_SIZE);
 754                        if (to != PAGE_SIZE) {
 755                                page = cl_page_list_last(plist);
 756                                cl_page_clip(env, page, 0, to);
 757                        }
 758                }
 759        }
 760
 761        cl_2queue_init(queue);
 762        cl_page_list_splice(plist, &queue->c2_qin);
 763        rc = cl_io_submit_sync(env, io, CRT_WRITE, queue, 0);
 764
 765        /* plist is not sorted any more */
 766        cl_page_list_splice(&queue->c2_qin, plist);
 767        cl_page_list_splice(&queue->c2_qout, plist);
 768        cl_2queue_fini(env, queue);
 769
 770        if (rc == 0) {
 771                /* calculate bytes */
 772                bytes = plist->pl_nr << PAGE_SHIFT;
 773                bytes -= from + PAGE_SIZE - to;
 774
 775                while (plist->pl_nr > 0) {
 776                        page = cl_page_list_first(plist);
 777                        cl_page_list_del(env, plist, page);
 778
 779                        cl_page_clip(env, page, 0, PAGE_SIZE);
 780
 781                        SetPageUptodate(cl_page_vmpage(page));
 782                        cl_page_disown(env, io, page);
 783
 784                        /* held in ll_cl_init() */
 785                        lu_ref_del(&page->cp_reference, "cl_io", io);
 786                        cl_page_put(env, page);
 787                }
 788        }
 789
 790        return bytes > 0 ? bytes : rc;
 791}
 792
 793static void write_commit_callback(const struct lu_env *env, struct cl_io *io,
 794                                  struct cl_page *page)
 795{
 796        struct page *vmpage = page->cp_vmpage;
 797
 798        SetPageUptodate(vmpage);
 799        set_page_dirty(vmpage);
 800
 801        cl_page_disown(env, io, page);
 802
 803        /* held in ll_cl_init() */
 804        lu_ref_del(&page->cp_reference, "cl_io", cl_io_top(io));
 805        cl_page_put(env, page);
 806}
 807
 808/* make sure the page list is contiguous */
 809static bool page_list_sanity_check(struct cl_object *obj,
 810                                   struct cl_page_list *plist)
 811{
 812        struct cl_page *page;
 813        pgoff_t index = CL_PAGE_EOF;
 814
 815        cl_page_list_for_each(page, plist) {
 816                struct vvp_page *vpg = cl_object_page_slice(obj, page);
 817
 818                if (index == CL_PAGE_EOF) {
 819                        index = vvp_index(vpg);
 820                        continue;
 821                }
 822
 823                ++index;
 824                if (index == vvp_index(vpg))
 825                        continue;
 826
 827                return false;
 828        }
 829        return true;
 830}
 831
 832/* Return how many bytes have queued or written */
 833int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io)
 834{
 835        struct cl_object *obj = io->ci_obj;
 836        struct inode *inode = vvp_object_inode(obj);
 837        struct vvp_io *vio = vvp_env_io(env);
 838        struct cl_page_list *queue = &vio->u.write.vui_queue;
 839        struct cl_page *page;
 840        int rc = 0;
 841        int bytes = 0;
 842        unsigned int npages = vio->u.write.vui_queue.pl_nr;
 843
 844        if (npages == 0)
 845                return 0;
 846
 847        CDEBUG(D_VFSTRACE, "commit async pages: %d, from %d, to %d\n",
 848               npages, vio->u.write.vui_from, vio->u.write.vui_to);
 849
 850        LASSERT(page_list_sanity_check(obj, queue));
 851
 852        /* submit IO with async write */
 853        rc = cl_io_commit_async(env, io, queue,
 854                                vio->u.write.vui_from, vio->u.write.vui_to,
 855                                write_commit_callback);
 856        npages -= queue->pl_nr; /* already committed pages */
 857        if (npages > 0) {
 858                /* calculate how many bytes were written */
 859                bytes = npages << PAGE_SHIFT;
 860
 861                /* first page */
 862                bytes -= vio->u.write.vui_from;
 863                if (queue->pl_nr == 0) /* last page */
 864                        bytes -= PAGE_SIZE - vio->u.write.vui_to;
 865                LASSERTF(bytes > 0, "bytes = %d, pages = %d\n", bytes, npages);
 866
 867                vio->u.write.vui_written += bytes;
 868
 869                CDEBUG(D_VFSTRACE, "Committed %d pages %d bytes, tot: %ld\n",
 870                       npages, bytes, vio->u.write.vui_written);
 871
 872                /* the first page must have been written. */
 873                vio->u.write.vui_from = 0;
 874        }
 875        LASSERT(page_list_sanity_check(obj, queue));
 876        LASSERT(ergo(rc == 0, queue->pl_nr == 0));
 877
 878        /* out of quota, try sync write */
 879        if (rc == -EDQUOT && !cl_io_is_mkwrite(io)) {
 880                rc = vvp_io_commit_sync(env, io, queue,
 881                                        vio->u.write.vui_from,
 882                                        vio->u.write.vui_to);
 883                if (rc > 0) {
 884                        vio->u.write.vui_written += rc;
 885                        rc = 0;
 886                }
 887        }
 888
 889        /* update inode size */
 890        ll_merge_attr(env, inode);
 891
 892        /* Now the pages in queue were failed to commit, discard them
 893         * unless they were dirtied before.
 894         */
 895        while (queue->pl_nr > 0) {
 896                page = cl_page_list_first(queue);
 897                cl_page_list_del(env, queue, page);
 898
 899                if (!PageDirty(cl_page_vmpage(page)))
 900                        cl_page_discard(env, io, page);
 901
 902                cl_page_disown(env, io, page);
 903
 904                /* held in ll_cl_init() */
 905                lu_ref_del(&page->cp_reference, "cl_io", io);
 906                cl_page_put(env, page);
 907        }
 908        cl_page_list_fini(env, queue);
 909
 910        return rc;
 911}
 912
 913static int vvp_io_write_start(const struct lu_env *env,
 914                              const struct cl_io_slice *ios)
 915{
 916        struct vvp_io      *vio   = cl2vvp_io(env, ios);
 917        struct cl_io       *io    = ios->cis_io;
 918        struct cl_object   *obj   = io->ci_obj;
 919        struct inode       *inode = vvp_object_inode(obj);
 920        struct ll_inode_info *lli = ll_i2info(inode);
 921        ssize_t result = 0;
 922        loff_t pos = io->u.ci_wr.wr.crw_pos;
 923        size_t cnt = io->u.ci_wr.wr.crw_count;
 924
 925        down_read(&lli->lli_trunc_sem);
 926
 927        if (!can_populate_pages(env, io, inode))
 928                return 0;
 929
 930        if (cl_io_is_append(io)) {
 931                /*
 932                 * PARALLEL IO This has to be changed for parallel IO doing
 933                 * out-of-order writes.
 934                 */
 935                ll_merge_attr(env, inode);
 936                pos = i_size_read(inode);
 937                io->u.ci_wr.wr.crw_pos = pos;
 938                vio->vui_iocb->ki_pos = pos;
 939        } else {
 940                LASSERT(vio->vui_iocb->ki_pos == pos);
 941        }
 942
 943        CDEBUG(D_VFSTRACE, "write: [%lli, %lli)\n", pos, pos + (long long)cnt);
 944
 945        /*
 946         * The maximum Lustre file size is variable, based on the OST maximum
 947         * object size and number of stripes.  This needs another check in
 948         * addition to the VFS checks earlier.
 949         */
 950        if (pos + cnt > ll_file_maxbytes(inode)) {
 951                CDEBUG(D_INODE,
 952                       "%s: file " DFID " offset %llu > maxbytes %llu\n",
 953                       ll_get_fsname(inode->i_sb, NULL, 0),
 954                       PFID(ll_inode2fid(inode)), pos + cnt,
 955                       ll_file_maxbytes(inode));
 956                return -EFBIG;
 957        }
 958
 959        if (!vio->vui_iter) {
 960                /* from a temp io in ll_cl_init(). */
 961                result = 0;
 962        } else {
 963                /*
 964                 * When using the locked AIO function (generic_file_aio_write())
 965                 * testing has shown the inode mutex to be a limiting factor
 966                 * with multi-threaded single shared file performance. To get
 967                 * around this, we now use the lockless version. To maintain
 968                 * consistency, proper locking to protect against writes,
 969                 * trucates, etc. is handled in the higher layers of lustre.
 970                 */
 971                bool lock_node = !IS_NOSEC(inode);
 972
 973                if (lock_node)
 974                        inode_lock(inode);
 975                result = __generic_file_write_iter(vio->vui_iocb,
 976                                                   vio->vui_iter);
 977                if (lock_node)
 978                        inode_unlock(inode);
 979
 980                if (result > 0 || result == -EIOCBQUEUED)
 981                        result = generic_write_sync(vio->vui_iocb, result);
 982        }
 983
 984        if (result > 0) {
 985                result = vvp_io_write_commit(env, io);
 986                if (vio->u.write.vui_written > 0) {
 987                        result = vio->u.write.vui_written;
 988                        io->ci_nob += result;
 989
 990                        CDEBUG(D_VFSTRACE, "write: nob %zd, result: %zd\n",
 991                               io->ci_nob, result);
 992                }
 993        }
 994        if (result > 0) {
 995                set_bit(LLIF_DATA_MODIFIED, &(ll_i2info(inode))->lli_flags);
 996
 997                if (result < cnt)
 998                        io->ci_continue = 0;
 999                ll_rw_stats_tally(ll_i2sbi(inode), current->pid,
1000                                  vio->vui_fd, pos, result, WRITE);
1001                result = 0;
1002        }
1003        return result;
1004}
1005
1006static void vvp_io_rw_end(const struct lu_env *env,
1007                          const struct cl_io_slice *ios)
1008{
1009        struct inode *inode = vvp_object_inode(ios->cis_obj);
1010        struct ll_inode_info *lli = ll_i2info(inode);
1011
1012        up_read(&lli->lli_trunc_sem);
1013}
1014
1015static int vvp_io_kernel_fault(struct vvp_fault_io *cfio)
1016{
1017        struct vm_fault *vmf = cfio->ft_vmf;
1018
1019        cfio->ft_flags = filemap_fault(vmf);
1020        cfio->ft_flags_valid = 1;
1021
1022        if (vmf->page) {
1023                CDEBUG(D_PAGE,
1024                       "page %p map %p index %lu flags %lx count %u priv %0lx: got addr %p type NOPAGE\n",
1025                       vmf->page, vmf->page->mapping, vmf->page->index,
1026                       (long)vmf->page->flags, page_count(vmf->page),
1027                       page_private(vmf->page), (void *)vmf->address);
1028                if (unlikely(!(cfio->ft_flags & VM_FAULT_LOCKED))) {
1029                        lock_page(vmf->page);
1030                        cfio->ft_flags |= VM_FAULT_LOCKED;
1031                }
1032
1033                cfio->ft_vmpage = vmf->page;
1034                return 0;
1035        }
1036
1037        if (cfio->ft_flags & (VM_FAULT_SIGBUS | VM_FAULT_SIGSEGV)) {
1038                CDEBUG(D_PAGE, "got addr %p - SIGBUS\n", (void *)vmf->address);
1039                return -EFAULT;
1040        }
1041
1042        if (cfio->ft_flags & VM_FAULT_OOM) {
1043                CDEBUG(D_PAGE, "got addr %p - OOM\n", (void *)vmf->address);
1044                return -ENOMEM;
1045        }
1046
1047        if (cfio->ft_flags & VM_FAULT_RETRY)
1048                return -EAGAIN;
1049
1050        CERROR("Unknown error in page fault %d!\n", cfio->ft_flags);
1051        return -EINVAL;
1052}
1053
1054static void mkwrite_commit_callback(const struct lu_env *env, struct cl_io *io,
1055                                    struct cl_page *page)
1056{
1057        set_page_dirty(page->cp_vmpage);
1058}
1059
1060static int vvp_io_fault_start(const struct lu_env *env,
1061                              const struct cl_io_slice *ios)
1062{
1063        struct vvp_io       *vio     = cl2vvp_io(env, ios);
1064        struct cl_io    *io      = ios->cis_io;
1065        struct cl_object    *obj     = io->ci_obj;
1066        struct inode        *inode   = vvp_object_inode(obj);
1067        struct ll_inode_info *lli = ll_i2info(inode);
1068        struct cl_fault_io  *fio     = &io->u.ci_fault;
1069        struct vvp_fault_io *cfio    = &vio->u.fault;
1070        loff_t         offset;
1071        int               result  = 0;
1072        struct page       *vmpage  = NULL;
1073        struct cl_page      *page;
1074        loff_t         size;
1075        pgoff_t              last_index;
1076
1077        down_read(&lli->lli_trunc_sem);
1078
1079        /* offset of the last byte on the page */
1080        offset = cl_offset(obj, fio->ft_index + 1) - 1;
1081        LASSERT(cl_index(obj, offset) == fio->ft_index);
1082        result = vvp_prep_size(env, obj, io, 0, offset + 1, NULL);
1083        if (result != 0)
1084                return result;
1085
1086        /* must return locked page */
1087        if (fio->ft_mkwrite) {
1088                LASSERT(cfio->ft_vmpage);
1089                lock_page(cfio->ft_vmpage);
1090        } else {
1091                result = vvp_io_kernel_fault(cfio);
1092                if (result != 0)
1093                        return result;
1094        }
1095
1096        vmpage = cfio->ft_vmpage;
1097        LASSERT(PageLocked(vmpage));
1098
1099        if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_FAULT_TRUNC_RACE))
1100                ll_invalidate_page(vmpage);
1101
1102        size = i_size_read(inode);
1103        /* Though we have already held a cl_lock upon this page, but
1104         * it still can be truncated locally.
1105         */
1106        if (unlikely((vmpage->mapping != inode->i_mapping) ||
1107                     (page_offset(vmpage) > size))) {
1108                CDEBUG(D_PAGE, "llite: fault and truncate race happened!\n");
1109
1110                /* return +1 to stop cl_io_loop() and ll_fault() will catch
1111                 * and retry.
1112                 */
1113                result = 1;
1114                goto out;
1115        }
1116
1117        last_index = cl_index(obj, size - 1);
1118
1119        if (fio->ft_mkwrite) {
1120                /*
1121                 * Capture the size while holding the lli_trunc_sem from above
1122                 * we want to make sure that we complete the mkwrite action
1123                 * while holding this lock. We need to make sure that we are
1124                 * not past the end of the file.
1125                 */
1126                if (last_index < fio->ft_index) {
1127                        CDEBUG(D_PAGE,
1128                               "llite: mkwrite and truncate race happened: %p: 0x%lx 0x%lx\n",
1129                               vmpage->mapping, fio->ft_index, last_index);
1130                        /*
1131                         * We need to return if we are
1132                         * passed the end of the file. This will propagate
1133                         * up the call stack to ll_page_mkwrite where
1134                         * we will return VM_FAULT_NOPAGE. Any non-negative
1135                         * value returned here will be silently
1136                         * converted to 0. If the vmpage->mapping is null
1137                         * the error code would be converted back to ENODATA
1138                         * in ll_page_mkwrite0. Thus we return -ENODATA
1139                         * to handle both cases
1140                         */
1141                        result = -ENODATA;
1142                        goto out;
1143                }
1144        }
1145
1146        page = cl_page_find(env, obj, fio->ft_index, vmpage, CPT_CACHEABLE);
1147        if (IS_ERR(page)) {
1148                result = PTR_ERR(page);
1149                goto out;
1150        }
1151
1152        /* if page is going to be written, we should add this page into cache
1153         * earlier.
1154         */
1155        if (fio->ft_mkwrite) {
1156                wait_on_page_writeback(vmpage);
1157                if (!PageDirty(vmpage)) {
1158                        struct cl_page_list *plist = &io->ci_queue.c2_qin;
1159                        struct vvp_page *vpg = cl_object_page_slice(obj, page);
1160                        int to = PAGE_SIZE;
1161
1162                        /* vvp_page_assume() calls wait_on_page_writeback(). */
1163                        cl_page_assume(env, io, page);
1164
1165                        cl_page_list_init(plist);
1166                        cl_page_list_add(plist, page);
1167
1168                        /* size fixup */
1169                        if (last_index == vvp_index(vpg))
1170                                to = size & ~PAGE_MASK;
1171
1172                        /* Do not set Dirty bit here so that in case IO is
1173                         * started before the page is really made dirty, we
1174                         * still have chance to detect it.
1175                         */
1176                        result = cl_io_commit_async(env, io, plist, 0, to,
1177                                                    mkwrite_commit_callback);
1178                        LASSERT(cl_page_is_owned(page, io));
1179                        cl_page_list_fini(env, plist);
1180
1181                        vmpage = NULL;
1182                        if (result < 0) {
1183                                cl_page_discard(env, io, page);
1184                                cl_page_disown(env, io, page);
1185
1186                                cl_page_put(env, page);
1187
1188                                /* we're in big trouble, what can we do now? */
1189                                if (result == -EDQUOT)
1190                                        result = -ENOSPC;
1191                                goto out;
1192                        } else {
1193                                cl_page_disown(env, io, page);
1194                        }
1195                }
1196        }
1197
1198        /*
1199         * The ft_index is only used in the case of
1200         * a mkwrite action. We need to check
1201         * our assertions are correct, since
1202         * we should have caught this above
1203         */
1204        LASSERT(!fio->ft_mkwrite || fio->ft_index <= last_index);
1205        if (fio->ft_index == last_index)
1206                /*
1207                 * Last page is mapped partially.
1208                 */
1209                fio->ft_nob = size - cl_offset(obj, fio->ft_index);
1210        else
1211                fio->ft_nob = cl_page_size(obj);
1212
1213        lu_ref_add(&page->cp_reference, "fault", io);
1214        fio->ft_page = page;
1215
1216out:
1217        /* return unlocked vmpage to avoid deadlocking */
1218        if (vmpage)
1219                unlock_page(vmpage);
1220
1221        cfio->ft_flags &= ~VM_FAULT_LOCKED;
1222
1223        return result;
1224}
1225
1226static void vvp_io_fault_end(const struct lu_env *env,
1227                             const struct cl_io_slice *ios)
1228{
1229        struct inode *inode = vvp_object_inode(ios->cis_obj);
1230        struct ll_inode_info *lli = ll_i2info(inode);
1231
1232        CLOBINVRNT(env, ios->cis_io->ci_obj,
1233                   vvp_object_invariant(ios->cis_io->ci_obj));
1234        up_read(&lli->lli_trunc_sem);
1235}
1236
/*
 * .cio_start handler for CIT_FSYNC. Intentionally a no-op at this layer
 * that always reports success.
 */
static int vvp_io_fsync_start(const struct lu_env *env,
			      const struct cl_io_slice *ios)
{
	/*
	 * Ideally every dirty page in the radix tree would get its TOWRITE
	 * bit set here so that completion of the write-out could be
	 * verified, but doing so is difficult because of races.
	 */
	return 0;
}
1246
1247static int vvp_io_read_ahead(const struct lu_env *env,
1248                             const struct cl_io_slice *ios,
1249                             pgoff_t start, struct cl_read_ahead *ra)
1250{
1251        int result = 0;
1252
1253        if (ios->cis_io->ci_type == CIT_READ ||
1254            ios->cis_io->ci_type == CIT_FAULT) {
1255                struct vvp_io *vio = cl2vvp_io(env, ios);
1256
1257                if (unlikely(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1258                        ra->cra_end = CL_PAGE_EOF;
1259                        result = 1; /* no need to call down */
1260                }
1261        }
1262
1263        return result;
1264}
1265
1266static const struct cl_io_operations vvp_io_ops = {
1267        .op = {
1268                [CIT_READ] = {
1269                        .cio_fini       = vvp_io_fini,
1270                        .cio_lock      = vvp_io_read_lock,
1271                        .cio_start     = vvp_io_read_start,
1272                        .cio_end        = vvp_io_rw_end,
1273                        .cio_advance    = vvp_io_advance,
1274                },
1275                [CIT_WRITE] = {
1276                        .cio_fini      = vvp_io_fini,
1277                        .cio_iter_init = vvp_io_write_iter_init,
1278                        .cio_iter_fini = vvp_io_write_iter_fini,
1279                        .cio_lock      = vvp_io_write_lock,
1280                        .cio_start     = vvp_io_write_start,
1281                        .cio_end        = vvp_io_rw_end,
1282                        .cio_advance   = vvp_io_advance,
1283                },
1284                [CIT_SETATTR] = {
1285                        .cio_fini       = vvp_io_setattr_fini,
1286                        .cio_iter_init  = vvp_io_setattr_iter_init,
1287                        .cio_lock       = vvp_io_setattr_lock,
1288                        .cio_start      = vvp_io_setattr_start,
1289                        .cio_end        = vvp_io_setattr_end
1290                },
1291                [CIT_FAULT] = {
1292                        .cio_fini      = vvp_io_fault_fini,
1293                        .cio_iter_init = vvp_io_fault_iter_init,
1294                        .cio_lock      = vvp_io_fault_lock,
1295                        .cio_start     = vvp_io_fault_start,
1296                        .cio_end       = vvp_io_fault_end,
1297                },
1298                [CIT_FSYNC] = {
1299                        .cio_start  = vvp_io_fsync_start,
1300                        .cio_fini   = vvp_io_fini
1301                },
1302                [CIT_MISC] = {
1303                        .cio_fini   = vvp_io_fini
1304                }
1305        },
1306        .cio_read_ahead = vvp_io_read_ahead,
1307};
1308
1309int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
1310                struct cl_io *io)
1311{
1312        struct vvp_io      *vio   = vvp_env_io(env);
1313        struct inode       *inode = vvp_object_inode(obj);
1314        int              result;
1315
1316        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
1317
1318        CDEBUG(D_VFSTRACE, DFID
1319               " ignore/verify layout %d/%d, layout version %d restore needed %d\n",
1320               PFID(lu_object_fid(&obj->co_lu)),
1321               io->ci_ignore_layout, io->ci_verify_layout,
1322               vio->vui_layout_gen, io->ci_restore_needed);
1323
1324        CL_IO_SLICE_CLEAN(vio, vui_cl);
1325        cl_io_slice_add(io, &vio->vui_cl, obj, &vvp_io_ops);
1326        vio->vui_ra_valid = false;
1327        result = 0;
1328        if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
1329                size_t count;
1330                struct ll_inode_info *lli = ll_i2info(inode);
1331
1332                count = io->u.ci_rw.crw_count;
1333                /* "If nbyte is 0, read() will return 0 and have no other
1334                 *  results."  -- Single Unix Spec
1335                 */
1336                if (count == 0)
1337                        result = 1;
1338                else
1339                        vio->vui_tot_count = count;
1340
1341                /* for read/write, we store the jobid in the inode, and
1342                 * it'll be fetched by osc when building RPC.
1343                 *
1344                 * it's not accurate if the file is shared by different
1345                 * jobs.
1346                 */
1347                lustre_get_jobid(lli->lli_jobid);
1348        } else if (io->ci_type == CIT_SETATTR) {
1349                if (!cl_io_is_trunc(io))
1350                        io->ci_lockreq = CILR_MANDATORY;
1351        }
1352
1353        /* Enqueue layout lock and get layout version. We need to do this
1354         * even for operations requiring to open file, such as read and write,
1355         * because it might not grant layout lock in IT_OPEN.
1356         */
1357        if (result == 0 && !io->ci_ignore_layout) {
1358                result = ll_layout_refresh(inode, &vio->vui_layout_gen);
1359                if (result == -ENOENT)
1360                        /* If the inode on MDS has been removed, but the objects
1361                         * on OSTs haven't been destroyed (async unlink), layout
1362                         * fetch will return -ENOENT, we'd ignore this error
1363                         * and continue with dirty flush. LU-3230.
1364                         */
1365                        result = 0;
1366                if (result < 0)
1367                        CERROR("%s: refresh file layout " DFID " error %d.\n",
1368                               ll_get_fsname(inode->i_sb, NULL, 0),
1369                               PFID(lu_object_fid(&obj->co_lu)), result);
1370        }
1371
1372        return result;
1373}
1374