linux/fs/ceph/file.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3#include <linux/ceph/striper.h>
   4
   5#include <linux/module.h>
   6#include <linux/sched.h>
   7#include <linux/slab.h>
   8#include <linux/file.h>
   9#include <linux/mount.h>
  10#include <linux/namei.h>
  11#include <linux/writeback.h>
  12#include <linux/falloc.h>
  13#include <linux/iversion.h>
  14#include <linux/ktime.h>
  15
  16#include "super.h"
  17#include "mds_client.h"
  18#include "cache.h"
  19#include "io.h"
  20#include "metric.h"
  21
  22static __le32 ceph_flags_sys2wire(u32 flags)
  23{
  24        u32 wire_flags = 0;
  25
  26        switch (flags & O_ACCMODE) {
  27        case O_RDONLY:
  28                wire_flags |= CEPH_O_RDONLY;
  29                break;
  30        case O_WRONLY:
  31                wire_flags |= CEPH_O_WRONLY;
  32                break;
  33        case O_RDWR:
  34                wire_flags |= CEPH_O_RDWR;
  35                break;
  36        }
  37
  38        flags &= ~O_ACCMODE;
  39
  40#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
  41
  42        ceph_sys2wire(O_CREAT);
  43        ceph_sys2wire(O_EXCL);
  44        ceph_sys2wire(O_TRUNC);
  45        ceph_sys2wire(O_DIRECTORY);
  46        ceph_sys2wire(O_NOFOLLOW);
  47
  48#undef ceph_sys2wire
  49
  50        if (flags)
  51                dout("unused open flags: %x\n", flags);
  52
  53        return cpu_to_le32(wire_flags);
  54}
  55
  56/*
  57 * Ceph file operations
  58 *
  59 * Implement basic open/close functionality, and implement
  60 * read/write.
  61 *
  62 * We implement three modes of file I/O:
  63 *  - buffered uses the generic_file_aio_{read,write} helpers
  64 *
  65 *  - synchronous is used when there is multi-client read/write
  66 *    sharing, avoids the page cache, and synchronously waits for an
  67 *    ack from the OSD.
  68 *
  69 *  - direct io takes the variant of the sync path that references
  70 *    user pages directly.
  71 *
  72 * fsync() flushes and waits on dirty pages, but just queues metadata
  73 * for writeback: since the MDS can recover size and mtime there is no
  74 * need to wait for MDS acknowledgement.
  75 */
  76
  77/*
  78 * How many pages to get in one call to iov_iter_get_pages().  This
  79 * determines the size of the on-stack array used as a buffer.
  80 */
  81#define ITER_GET_BVECS_PAGES    64
  82
  83static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
  84                                struct bio_vec *bvecs)
  85{
  86        size_t size = 0;
  87        int bvec_idx = 0;
  88
  89        if (maxsize > iov_iter_count(iter))
  90                maxsize = iov_iter_count(iter);
  91
  92        while (size < maxsize) {
  93                struct page *pages[ITER_GET_BVECS_PAGES];
  94                ssize_t bytes;
  95                size_t start;
  96                int idx = 0;
  97
  98                bytes = iov_iter_get_pages(iter, pages, maxsize - size,
  99                                           ITER_GET_BVECS_PAGES, &start);
 100                if (bytes < 0)
 101                        return size ?: bytes;
 102
 103                iov_iter_advance(iter, bytes);
 104                size += bytes;
 105
 106                for ( ; bytes; idx++, bvec_idx++) {
 107                        struct bio_vec bv = {
 108                                .bv_page = pages[idx],
 109                                .bv_len = min_t(int, bytes, PAGE_SIZE - start),
 110                                .bv_offset = start,
 111                        };
 112
 113                        bvecs[bvec_idx] = bv;
 114                        bytes -= bv.bv_len;
 115                        start = 0;
 116                }
 117        }
 118
 119        return size;
 120}
 121
 122/*
 123 * iov_iter_get_pages() only considers one iov_iter segment, no matter
 124 * what maxsize or maxpages are given.  For ITER_BVEC that is a single
 125 * page.
 126 *
 127 * Attempt to get up to @maxsize bytes worth of pages from @iter.
 128 * Return the number of bytes in the created bio_vec array, or an error.
 129 */
 130static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
 131                                    struct bio_vec **bvecs, int *num_bvecs)
 132{
 133        struct bio_vec *bv;
 134        size_t orig_count = iov_iter_count(iter);
 135        ssize_t bytes;
 136        int npages;
 137
 138        iov_iter_truncate(iter, maxsize);
 139        npages = iov_iter_npages(iter, INT_MAX);
 140        iov_iter_reexpand(iter, orig_count);
 141
 142        /*
 143         * __iter_get_bvecs() may populate only part of the array -- zero it
 144         * out.
 145         */
 146        bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
 147        if (!bv)
 148                return -ENOMEM;
 149
 150        bytes = __iter_get_bvecs(iter, maxsize, bv);
 151        if (bytes < 0) {
 152                /*
 153                 * No pages were pinned -- just free the array.
 154                 */
 155                kvfree(bv);
 156                return bytes;
 157        }
 158
 159        *bvecs = bv;
 160        *num_bvecs = npages;
 161        return bytes;
 162}
 163
 164static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
 165{
 166        int i;
 167
 168        for (i = 0; i < num_bvecs; i++) {
 169                if (bvecs[i].bv_page) {
 170                        if (should_dirty)
 171                                set_page_dirty_lock(bvecs[i].bv_page);
 172                        put_page(bvecs[i].bv_page);
 173                }
 174        }
 175        kvfree(bvecs);
 176}
 177
 178/*
 179 * Prepare an open request.  Preallocate ceph_cap to avoid an
 180 * inopportune ENOMEM later.
 181 */
 182static struct ceph_mds_request *
 183prepare_open_request(struct super_block *sb, int flags, int create_mode)
 184{
 185        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb);
 186        struct ceph_mds_request *req;
 187        int want_auth = USE_ANY_MDS;
 188        int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;
 189
 190        if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
 191                want_auth = USE_AUTH_MDS;
 192
 193        req = ceph_mdsc_create_request(mdsc, op, want_auth);
 194        if (IS_ERR(req))
 195                goto out;
 196        req->r_fmode = ceph_flags_to_mode(flags);
 197        req->r_args.open.flags = ceph_flags_sys2wire(flags);
 198        req->r_args.open.mode = cpu_to_le32(create_mode);
 199out:
 200        return req;
 201}
 202
 203static int ceph_init_file_info(struct inode *inode, struct file *file,
 204                                        int fmode, bool isdir)
 205{
 206        struct ceph_inode_info *ci = ceph_inode(inode);
 207        struct ceph_file_info *fi;
 208
 209        dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
 210                        inode->i_mode, isdir ? "dir" : "regular");
 211        BUG_ON(inode->i_fop->release != ceph_release);
 212
 213        if (isdir) {
 214                struct ceph_dir_file_info *dfi =
 215                        kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
 216                if (!dfi)
 217                        return -ENOMEM;
 218
 219                file->private_data = dfi;
 220                fi = &dfi->file_info;
 221                dfi->next_offset = 2;
 222                dfi->readdir_cache_idx = -1;
 223        } else {
 224                fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
 225                if (!fi)
 226                        return -ENOMEM;
 227
 228                file->private_data = fi;
 229        }
 230
 231        ceph_get_fmode(ci, fmode, 1);
 232        fi->fmode = fmode;
 233
 234        spin_lock_init(&fi->rw_contexts_lock);
 235        INIT_LIST_HEAD(&fi->rw_contexts);
 236        fi->meta_err = errseq_sample(&ci->i_meta_err);
 237        fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
 238
 239        return 0;
 240}
 241
 242/*
 243 * initialize private struct file data.
 244 * if we fail, clean up by dropping fmode reference on the ceph_inode
 245 */
 246static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 247{
 248        int ret = 0;
 249
 250        switch (inode->i_mode & S_IFMT) {
 251        case S_IFREG:
 252                ceph_fscache_register_inode_cookie(inode);
 253                ceph_fscache_file_set_cookie(inode, file);
 254                fallthrough;
 255        case S_IFDIR:
 256                ret = ceph_init_file_info(inode, file, fmode,
 257                                                S_ISDIR(inode->i_mode));
 258                break;
 259
 260        case S_IFLNK:
 261                dout("init_file %p %p 0%o (symlink)\n", inode, file,
 262                     inode->i_mode);
 263                break;
 264
 265        default:
 266                dout("init_file %p %p 0%o (special)\n", inode, file,
 267                     inode->i_mode);
 268                /*
 269                 * we need to drop the open ref now, since we don't
 270                 * have .release set to ceph_release.
 271                 */
 272                BUG_ON(inode->i_fop->release == ceph_release);
 273
 274                /* call the proper open fop */
 275                ret = inode->i_fop->open(inode, file);
 276        }
 277        return ret;
 278}
 279
 280/*
 281 * try renew caps after session gets killed.
 282 */
 283int ceph_renew_caps(struct inode *inode, int fmode)
 284{
 285        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 286        struct ceph_inode_info *ci = ceph_inode(inode);
 287        struct ceph_mds_request *req;
 288        int err, flags, wanted;
 289
 290        spin_lock(&ci->i_ceph_lock);
 291        __ceph_touch_fmode(ci, mdsc, fmode);
 292        wanted = __ceph_caps_file_wanted(ci);
 293        if (__ceph_is_any_real_caps(ci) &&
 294            (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
 295                int issued = __ceph_caps_issued(ci, NULL);
 296                spin_unlock(&ci->i_ceph_lock);
 297                dout("renew caps %p want %s issued %s updating mds_wanted\n",
 298                     inode, ceph_cap_string(wanted), ceph_cap_string(issued));
 299                ceph_check_caps(ci, 0, NULL);
 300                return 0;
 301        }
 302        spin_unlock(&ci->i_ceph_lock);
 303
 304        flags = 0;
 305        if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
 306                flags = O_RDWR;
 307        else if (wanted & CEPH_CAP_FILE_RD)
 308                flags = O_RDONLY;
 309        else if (wanted & CEPH_CAP_FILE_WR)
 310                flags = O_WRONLY;
 311#ifdef O_LAZY
 312        if (wanted & CEPH_CAP_FILE_LAZYIO)
 313                flags |= O_LAZY;
 314#endif
 315
 316        req = prepare_open_request(inode->i_sb, flags, 0);
 317        if (IS_ERR(req)) {
 318                err = PTR_ERR(req);
 319                goto out;
 320        }
 321
 322        req->r_inode = inode;
 323        ihold(inode);
 324        req->r_num_caps = 1;
 325
 326        err = ceph_mdsc_do_request(mdsc, NULL, req);
 327        ceph_mdsc_put_request(req);
 328out:
 329        dout("renew caps %p open result=%d\n", inode, err);
 330        return err < 0 ? err : 0;
 331}
 332
 333/*
 334 * If we already have the requisite capabilities, we can satisfy
 335 * the open request locally (no need to request new caps from the
 336 * MDS).  We do, however, need to inform the MDS (asynchronously)
 337 * if our wanted caps set expands.
 338 */
 339int ceph_open(struct inode *inode, struct file *file)
 340{
 341        struct ceph_inode_info *ci = ceph_inode(inode);
 342        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
 343        struct ceph_mds_client *mdsc = fsc->mdsc;
 344        struct ceph_mds_request *req;
 345        struct ceph_file_info *fi = file->private_data;
 346        int err;
 347        int flags, fmode, wanted;
 348
 349        if (fi) {
 350                dout("open file %p is already opened\n", file);
 351                return 0;
 352        }
 353
 354        /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
 355        flags = file->f_flags & ~(O_CREAT|O_EXCL);
 356        if (S_ISDIR(inode->i_mode))
 357                flags = O_DIRECTORY;  /* mds likes to know */
 358
 359        dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
 360             ceph_vinop(inode), file, flags, file->f_flags);
 361        fmode = ceph_flags_to_mode(flags);
 362        wanted = ceph_caps_for_mode(fmode);
 363
 364        /* snapped files are read-only */
 365        if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
 366                return -EROFS;
 367
 368        /* trivially open snapdir */
 369        if (ceph_snap(inode) == CEPH_SNAPDIR) {
 370                return ceph_init_file(inode, file, fmode);
 371        }
 372
 373        /*
 374         * No need to block if we have caps on the auth MDS (for
 375         * write) or any MDS (for read).  Update wanted set
 376         * asynchronously.
 377         */
 378        spin_lock(&ci->i_ceph_lock);
 379        if (__ceph_is_any_real_caps(ci) &&
 380            (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
 381                int mds_wanted = __ceph_caps_mds_wanted(ci, true);
 382                int issued = __ceph_caps_issued(ci, NULL);
 383
 384                dout("open %p fmode %d want %s issued %s using existing\n",
 385                     inode, fmode, ceph_cap_string(wanted),
 386                     ceph_cap_string(issued));
 387                __ceph_touch_fmode(ci, mdsc, fmode);
 388                spin_unlock(&ci->i_ceph_lock);
 389
 390                /* adjust wanted? */
 391                if ((issued & wanted) != wanted &&
 392                    (mds_wanted & wanted) != wanted &&
 393                    ceph_snap(inode) != CEPH_SNAPDIR)
 394                        ceph_check_caps(ci, 0, NULL);
 395
 396                return ceph_init_file(inode, file, fmode);
 397        } else if (ceph_snap(inode) != CEPH_NOSNAP &&
 398                   (ci->i_snap_caps & wanted) == wanted) {
 399                __ceph_touch_fmode(ci, mdsc, fmode);
 400                spin_unlock(&ci->i_ceph_lock);
 401                return ceph_init_file(inode, file, fmode);
 402        }
 403
 404        spin_unlock(&ci->i_ceph_lock);
 405
 406        dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
 407        req = prepare_open_request(inode->i_sb, flags, 0);
 408        if (IS_ERR(req)) {
 409                err = PTR_ERR(req);
 410                goto out;
 411        }
 412        req->r_inode = inode;
 413        ihold(inode);
 414
 415        req->r_num_caps = 1;
 416        err = ceph_mdsc_do_request(mdsc, NULL, req);
 417        if (!err)
 418                err = ceph_init_file(inode, file, req->r_fmode);
 419        ceph_mdsc_put_request(req);
 420        dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
 421out:
 422        return err;
 423}
 424
 425/* Clone the layout from a synchronous create, if the dir now has Dc caps */
 426static void
 427cache_file_layout(struct inode *dst, struct inode *src)
 428{
 429        struct ceph_inode_info *cdst = ceph_inode(dst);
 430        struct ceph_inode_info *csrc = ceph_inode(src);
 431
 432        spin_lock(&cdst->i_ceph_lock);
 433        if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) &&
 434            !ceph_file_layout_is_valid(&cdst->i_cached_layout)) {
 435                memcpy(&cdst->i_cached_layout, &csrc->i_layout,
 436                        sizeof(cdst->i_cached_layout));
 437                rcu_assign_pointer(cdst->i_cached_layout.pool_ns,
 438                                   ceph_try_get_string(csrc->i_layout.pool_ns));
 439        }
 440        spin_unlock(&cdst->i_ceph_lock);
 441}
 442
 443/*
 444 * Try to set up an async create. We need caps, a file layout, and inode number,
 445 * and either a lease on the dentry or complete dir info. If any of those
 446 * criteria are not satisfied, then return false and the caller can go
 447 * synchronous.
 448 */
 449static int try_prep_async_create(struct inode *dir, struct dentry *dentry,
 450                                 struct ceph_file_layout *lo, u64 *pino)
 451{
 452        struct ceph_inode_info *ci = ceph_inode(dir);
 453        struct ceph_dentry_info *di = ceph_dentry(dentry);
 454        int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE;
 455        u64 ino;
 456
 457        spin_lock(&ci->i_ceph_lock);
 458        /* No auth cap means no chance for Dc caps */
 459        if (!ci->i_auth_cap)
 460                goto no_async;
 461
 462        /* Any delegated inos? */
 463        if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos))
 464                goto no_async;
 465
 466        if (!ceph_file_layout_is_valid(&ci->i_cached_layout))
 467                goto no_async;
 468
 469        if ((__ceph_caps_issued(ci, NULL) & want) != want)
 470                goto no_async;
 471
 472        if (d_in_lookup(dentry)) {
 473                if (!__ceph_dir_is_complete(ci))
 474                        goto no_async;
 475                spin_lock(&dentry->d_lock);
 476                di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
 477                spin_unlock(&dentry->d_lock);
 478        } else if (atomic_read(&ci->i_shared_gen) !=
 479                   READ_ONCE(di->lease_shared_gen)) {
 480                goto no_async;
 481        }
 482
 483        ino = ceph_get_deleg_ino(ci->i_auth_cap->session);
 484        if (!ino)
 485                goto no_async;
 486
 487        *pino = ino;
 488        ceph_take_cap_refs(ci, want, false);
 489        memcpy(lo, &ci->i_cached_layout, sizeof(*lo));
 490        rcu_assign_pointer(lo->pool_ns,
 491                           ceph_try_get_string(ci->i_cached_layout.pool_ns));
 492        got = want;
 493no_async:
 494        spin_unlock(&ci->i_ceph_lock);
 495        return got;
 496}
 497
 498static void restore_deleg_ino(struct inode *dir, u64 ino)
 499{
 500        struct ceph_inode_info *ci = ceph_inode(dir);
 501        struct ceph_mds_session *s = NULL;
 502
 503        spin_lock(&ci->i_ceph_lock);
 504        if (ci->i_auth_cap)
 505                s = ceph_get_mds_session(ci->i_auth_cap->session);
 506        spin_unlock(&ci->i_ceph_lock);
 507        if (s) {
 508                int err = ceph_restore_deleg_ino(s, ino);
 509                if (err)
 510                        pr_warn("ceph: unable to restore delegated ino 0x%llx to session: %d\n",
 511                                ino, err);
 512                ceph_put_mds_session(s);
 513        }
 514}
 515
 516static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
 517                                 struct ceph_mds_request *req)
 518{
 519        int result = req->r_err ? req->r_err :
 520                        le32_to_cpu(req->r_reply_info.head->result);
 521
 522        if (result == -EJUKEBOX)
 523                goto out;
 524
 525        mapping_set_error(req->r_parent->i_mapping, result);
 526
 527        if (result) {
 528                struct dentry *dentry = req->r_dentry;
 529                int pathlen = 0;
 530                u64 base = 0;
 531                char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
 532                                                  &base, 0);
 533
 534                ceph_dir_clear_complete(req->r_parent);
 535                if (!d_unhashed(dentry))
 536                        d_drop(dentry);
 537
 538                /* FIXME: start returning I/O errors on all accesses? */
 539                pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
 540                        base, IS_ERR(path) ? "<<bad>>" : path, result);
 541                ceph_mdsc_free_path(path, pathlen);
 542        }
 543
 544        if (req->r_target_inode) {
 545                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
 546                u64 ino = ceph_vino(req->r_target_inode).ino;
 547
 548                if (req->r_deleg_ino != ino)
 549                        pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
 550                                __func__, req->r_err, req->r_deleg_ino, ino);
 551                mapping_set_error(req->r_target_inode->i_mapping, result);
 552
 553                spin_lock(&ci->i_ceph_lock);
 554                if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
 555                        ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
 556                        wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
 557                }
 558                ceph_kick_flushing_inode_caps(req->r_session, ci);
 559                spin_unlock(&ci->i_ceph_lock);
 560        } else {
 561                pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
 562                        req->r_deleg_ino);
 563        }
 564out:
 565        ceph_mdsc_release_dir_caps(req);
 566}
 567
 568static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
 569                                    struct file *file, umode_t mode,
 570                                    struct ceph_mds_request *req,
 571                                    struct ceph_acl_sec_ctx *as_ctx,
 572                                    struct ceph_file_layout *lo)
 573{
 574        int ret;
 575        char xattr_buf[4];
 576        struct ceph_mds_reply_inode in = { };
 577        struct ceph_mds_reply_info_in iinfo = { .in = &in };
 578        struct ceph_inode_info *ci = ceph_inode(dir);
 579        struct inode *inode;
 580        struct timespec64 now;
 581        struct ceph_vino vino = { .ino = req->r_deleg_ino,
 582                                  .snap = CEPH_NOSNAP };
 583
 584        ktime_get_real_ts64(&now);
 585
 586        inode = ceph_get_inode(dentry->d_sb, vino);
 587        if (IS_ERR(inode))
 588                return PTR_ERR(inode);
 589
 590        iinfo.inline_version = CEPH_INLINE_NONE;
 591        iinfo.change_attr = 1;
 592        ceph_encode_timespec64(&iinfo.btime, &now);
 593
 594        iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
 595        iinfo.xattr_data = xattr_buf;
 596        memset(iinfo.xattr_data, 0, iinfo.xattr_len);
 597
 598        in.ino = cpu_to_le64(vino.ino);
 599        in.snapid = cpu_to_le64(CEPH_NOSNAP);
 600        in.version = cpu_to_le64(1);    // ???
 601        in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE);
 602        in.cap.cap_id = cpu_to_le64(1);
 603        in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino);
 604        in.cap.flags = CEPH_CAP_FLAG_AUTH;
 605        in.ctime = in.mtime = in.atime = iinfo.btime;
 606        in.mode = cpu_to_le32((u32)mode);
 607        in.truncate_seq = cpu_to_le32(1);
 608        in.truncate_size = cpu_to_le64(-1ULL);
 609        in.xattr_version = cpu_to_le64(1);
 610        in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid()));
 611        in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_mode & S_ISGID ?
 612                                dir->i_gid : current_fsgid()));
 613        in.nlink = cpu_to_le32(1);
 614        in.max_size = cpu_to_le64(lo->stripe_unit);
 615
 616        ceph_file_layout_to_legacy(lo, &in.layout);
 617
 618        ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
 619                              req->r_fmode, NULL);
 620        if (ret) {
 621                dout("%s failed to fill inode: %d\n", __func__, ret);
 622                ceph_dir_clear_complete(dir);
 623                if (!d_unhashed(dentry))
 624                        d_drop(dentry);
 625                if (inode->i_state & I_NEW)
 626                        discard_new_inode(inode);
 627        } else {
 628                struct dentry *dn;
 629
 630                dout("%s d_adding new inode 0x%llx to 0x%llx/%s\n", __func__,
 631                        vino.ino, ceph_ino(dir), dentry->d_name.name);
 632                ceph_dir_clear_ordered(dir);
 633                ceph_init_inode_acls(inode, as_ctx);
 634                if (inode->i_state & I_NEW) {
 635                        /*
 636                         * If it's not I_NEW, then someone created this before
 637                         * we got here. Assume the server is aware of it at
 638                         * that point and don't worry about setting
 639                         * CEPH_I_ASYNC_CREATE.
 640                         */
 641                        ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE;
 642                        unlock_new_inode(inode);
 643                }
 644                if (d_in_lookup(dentry) || d_really_is_negative(dentry)) {
 645                        if (!d_unhashed(dentry))
 646                                d_drop(dentry);
 647                        dn = d_splice_alias(inode, dentry);
 648                        WARN_ON_ONCE(dn && dn != dentry);
 649                }
 650                file->f_mode |= FMODE_CREATED;
 651                ret = finish_open(file, dentry, ceph_open);
 652        }
 653        return ret;
 654}
 655
 656/*
 657 * Do a lookup + open with a single request.  If we get a non-existent
 658 * file or symlink, return 1 so the VFS can retry.
 659 */
 660int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 661                     struct file *file, unsigned flags, umode_t mode)
 662{
 663        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 664        struct ceph_mds_client *mdsc = fsc->mdsc;
 665        struct ceph_mds_request *req;
 666        struct dentry *dn;
 667        struct ceph_acl_sec_ctx as_ctx = {};
 668        bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
 669        int mask;
 670        int err;
 671
 672        dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
 673             dir, dentry, dentry,
 674             d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
 675
 676        if (dentry->d_name.len > NAME_MAX)
 677                return -ENAMETOOLONG;
 678
 679        if (flags & O_CREAT) {
 680                if (ceph_quota_is_max_files_exceeded(dir))
 681                        return -EDQUOT;
 682                err = ceph_pre_init_acls(dir, &mode, &as_ctx);
 683                if (err < 0)
 684                        return err;
 685                err = ceph_security_init_secctx(dentry, mode, &as_ctx);
 686                if (err < 0)
 687                        goto out_ctx;
 688        } else if (!d_in_lookup(dentry)) {
 689                /* If it's not being looked up, it's negative */
 690                return -ENOENT;
 691        }
 692retry:
 693        /* do the open */
 694        req = prepare_open_request(dir->i_sb, flags, mode);
 695        if (IS_ERR(req)) {
 696                err = PTR_ERR(req);
 697                goto out_ctx;
 698        }
 699        req->r_dentry = dget(dentry);
 700        req->r_num_caps = 2;
 701        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
 702        if (ceph_security_xattr_wanted(dir))
 703                mask |= CEPH_CAP_XATTR_SHARED;
 704        req->r_args.open.mask = cpu_to_le32(mask);
 705        req->r_parent = dir;
 706
 707        if (flags & O_CREAT) {
 708                struct ceph_file_layout lo;
 709
 710                req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 711                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 712                if (as_ctx.pagelist) {
 713                        req->r_pagelist = as_ctx.pagelist;
 714                        as_ctx.pagelist = NULL;
 715                }
 716                if (try_async &&
 717                    (req->r_dir_caps =
 718                      try_prep_async_create(dir, dentry, &lo,
 719                                            &req->r_deleg_ino))) {
 720                        set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
 721                        req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
 722                        req->r_callback = ceph_async_create_cb;
 723                        err = ceph_mdsc_submit_request(mdsc, dir, req);
 724                        if (!err) {
 725                                err = ceph_finish_async_create(dir, dentry,
 726                                                        file, mode, req,
 727                                                        &as_ctx, &lo);
 728                        } else if (err == -EJUKEBOX) {
 729                                restore_deleg_ino(dir, req->r_deleg_ino);
 730                                ceph_mdsc_put_request(req);
 731                                try_async = false;
 732                                goto retry;
 733                        }
 734                        goto out_req;
 735                }
 736        }
 737
 738        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 739        err = ceph_mdsc_do_request(mdsc,
 740                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 741                                   req);
 742        err = ceph_handle_snapdir(req, dentry, err);
 743        if (err)
 744                goto out_req;
 745
 746        if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 747                err = ceph_handle_notrace_create(dir, dentry);
 748
 749        if (d_in_lookup(dentry)) {
 750                dn = ceph_finish_lookup(req, dentry, err);
 751                if (IS_ERR(dn))
 752                        err = PTR_ERR(dn);
 753        } else {
 754                /* we were given a hashed negative dentry */
 755                dn = NULL;
 756        }
 757        if (err)
 758                goto out_req;
 759        if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) {
 760                /* make vfs retry on splice, ENOENT, or symlink */
 761                dout("atomic_open finish_no_open on dn %p\n", dn);
 762                err = finish_no_open(file, dn);
 763        } else {
 764                dout("atomic_open finish_open on dn %p\n", dn);
 765                if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
 766                        struct inode *newino = d_inode(dentry);
 767
 768                        cache_file_layout(dir, newino);
 769                        ceph_init_inode_acls(newino, &as_ctx);
 770                        file->f_mode |= FMODE_CREATED;
 771                }
 772                err = finish_open(file, dentry, ceph_open);
 773        }
 774out_req:
 775        ceph_mdsc_put_request(req);
 776out_ctx:
 777        ceph_release_acl_sec_ctx(&as_ctx);
 778        dout("atomic_open result=%d\n", err);
 779        return err;
 780}
 781
 782int ceph_release(struct inode *inode, struct file *file)
 783{
 784        struct ceph_inode_info *ci = ceph_inode(inode);
 785
 786        if (S_ISDIR(inode->i_mode)) {
 787                struct ceph_dir_file_info *dfi = file->private_data;
 788                dout("release inode %p dir file %p\n", inode, file);
 789                WARN_ON(!list_empty(&dfi->file_info.rw_contexts));
 790
 791                ceph_put_fmode(ci, dfi->file_info.fmode, 1);
 792
 793                if (dfi->last_readdir)
 794                        ceph_mdsc_put_request(dfi->last_readdir);
 795                kfree(dfi->last_name);
 796                kfree(dfi->dir_info);
 797                kmem_cache_free(ceph_dir_file_cachep, dfi);
 798        } else {
 799                struct ceph_file_info *fi = file->private_data;
 800                dout("release inode %p regular file %p\n", inode, file);
 801                WARN_ON(!list_empty(&fi->rw_contexts));
 802
 803                ceph_put_fmode(ci, fi->fmode, 1);
 804
 805                kmem_cache_free(ceph_file_cachep, fi);
 806        }
 807
 808        /* wake up anyone waiting for caps on this inode */
 809        wake_up_all(&ci->i_cap_wq);
 810        return 0;
 811}
 812
 813enum {
 814        HAVE_RETRIED = 1,
 815        CHECK_EOF =    2,
 816        READ_INLINE =  3,
 817};
 818
 819/*
 820 * Completely synchronous read and write methods.  Direct from __user
 821 * buffer to osd, or directly to user pages (if O_DIRECT).
 822 *
 823 * If the read spans object boundary, just do multiple reads.  (That's not
 824 * atomic, but good enough for now.)
 825 *
 826 * If we get a short result from the OSD, check against i_size; we need to
 827 * only return a short read to the caller if we hit EOF.
 828 */
 829static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 830                              int *retry_op)
 831{
 832        struct file *file = iocb->ki_filp;
 833        struct inode *inode = file_inode(file);
 834        struct ceph_inode_info *ci = ceph_inode(inode);
 835        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 836        struct ceph_osd_client *osdc = &fsc->client->osdc;
 837        ssize_t ret;
 838        u64 off = iocb->ki_pos;
 839        u64 len = iov_iter_count(to);
 840
 841        dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 842             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
 843
 844        if (!len)
 845                return 0;
 846        /*
 847         * flush any page cache pages in this range.  this
 848         * will make concurrent normal and sync io slow,
 849         * but it will at least behave sensibly when they are
 850         * in sequence.
 851         */
 852        ret = filemap_write_and_wait_range(inode->i_mapping,
 853                                           off, off + len - 1);
 854        if (ret < 0)
 855                return ret;
 856
 857        ret = 0;
 858        while ((len = iov_iter_count(to)) > 0) {
 859                struct ceph_osd_request *req;
 860                struct page **pages;
 861                int num_pages;
 862                size_t page_off;
 863                u64 i_size;
 864                bool more;
 865                int idx;
 866                size_t left;
 867
 868                req = ceph_osdc_new_request(osdc, &ci->i_layout,
 869                                        ci->i_vino, off, &len, 0, 1,
 870                                        CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 871                                        NULL, ci->i_truncate_seq,
 872                                        ci->i_truncate_size, false);
 873                if (IS_ERR(req)) {
 874                        ret = PTR_ERR(req);
 875                        break;
 876                }
 877
 878                more = len < iov_iter_count(to);
 879
 880                num_pages = calc_pages_for(off, len);
 881                page_off = off & ~PAGE_MASK;
 882                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 883                if (IS_ERR(pages)) {
 884                        ceph_osdc_put_request(req);
 885                        ret = PTR_ERR(pages);
 886                        break;
 887                }
 888
 889                osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
 890                                                 false, false);
 891                ret = ceph_osdc_start_request(osdc, req, false);
 892                if (!ret)
 893                        ret = ceph_osdc_wait_request(osdc, req);
 894
 895                ceph_update_read_latency(&fsc->mdsc->metric,
 896                                         req->r_start_latency,
 897                                         req->r_end_latency,
 898                                         ret);
 899
 900                ceph_osdc_put_request(req);
 901
 902                i_size = i_size_read(inode);
 903                dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
 904                     off, len, ret, i_size, (more ? " MORE" : ""));
 905
 906                if (ret == -ENOENT)
 907                        ret = 0;
 908                if (ret >= 0 && ret < len && (off + ret < i_size)) {
 909                        int zlen = min(len - ret, i_size - off - ret);
 910                        int zoff = page_off + ret;
 911                        dout("sync_read zero gap %llu~%llu\n",
 912                             off + ret, off + ret + zlen);
 913                        ceph_zero_page_vector_range(zoff, zlen, pages);
 914                        ret += zlen;
 915                }
 916
 917                idx = 0;
 918                left = ret > 0 ? ret : 0;
 919                while (left > 0) {
 920                        size_t len, copied;
 921                        page_off = off & ~PAGE_MASK;
 922                        len = min_t(size_t, left, PAGE_SIZE - page_off);
 923                        SetPageUptodate(pages[idx]);
 924                        copied = copy_page_to_iter(pages[idx++],
 925                                                   page_off, len, to);
 926                        off += copied;
 927                        left -= copied;
 928                        if (copied < len) {
 929                                ret = -EFAULT;
 930                                break;
 931                        }
 932                }
 933                ceph_release_page_vector(pages, num_pages);
 934
 935                if (ret < 0) {
 936                        if (ret == -EBLOCKLISTED)
 937                                fsc->blocklisted = true;
 938                        break;
 939                }
 940
 941                if (off >= i_size || !more)
 942                        break;
 943        }
 944
 945        if (off > iocb->ki_pos) {
 946                if (ret >= 0 &&
 947                    iov_iter_count(to) > 0 && off >= i_size_read(inode))
 948                        *retry_op = CHECK_EOF;
 949                ret = off - iocb->ki_pos;
 950                iocb->ki_pos = off;
 951        }
 952
 953        dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
 954        return ret;
 955}
 956
 957struct ceph_aio_request {
 958        struct kiocb *iocb;
 959        size_t total_len;
 960        bool write;
 961        bool should_dirty;
 962        int error;
 963        struct list_head osd_reqs;
 964        unsigned num_reqs;
 965        atomic_t pending_reqs;
 966        struct timespec64 mtime;
 967        struct ceph_cap_flush *prealloc_cf;
 968};
 969
 970struct ceph_aio_work {
 971        struct work_struct work;
 972        struct ceph_osd_request *req;
 973};
 974
 975static void ceph_aio_retry_work(struct work_struct *work);
 976
 977static void ceph_aio_complete(struct inode *inode,
 978                              struct ceph_aio_request *aio_req)
 979{
 980        struct ceph_inode_info *ci = ceph_inode(inode);
 981        int ret;
 982
 983        if (!atomic_dec_and_test(&aio_req->pending_reqs))
 984                return;
 985
 986        if (aio_req->iocb->ki_flags & IOCB_DIRECT)
 987                inode_dio_end(inode);
 988
 989        ret = aio_req->error;
 990        if (!ret)
 991                ret = aio_req->total_len;
 992
 993        dout("ceph_aio_complete %p rc %d\n", inode, ret);
 994
 995        if (ret >= 0 && aio_req->write) {
 996                int dirty;
 997
 998                loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
 999                if (endoff > i_size_read(inode)) {
1000                        if (ceph_inode_set_size(inode, endoff))
1001                                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1002                }
1003
1004                spin_lock(&ci->i_ceph_lock);
1005                ci->i_inline_version = CEPH_INLINE_NONE;
1006                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
1007                                               &aio_req->prealloc_cf);
1008                spin_unlock(&ci->i_ceph_lock);
1009                if (dirty)
1010                        __mark_inode_dirty(inode, dirty);
1011
1012        }
1013
1014        ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
1015                                                CEPH_CAP_FILE_RD));
1016
1017        aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
1018
1019        ceph_free_cap_flush(aio_req->prealloc_cf);
1020        kfree(aio_req);
1021}
1022
1023static void ceph_aio_complete_req(struct ceph_osd_request *req)
1024{
1025        int rc = req->r_result;
1026        struct inode *inode = req->r_inode;
1027        struct ceph_aio_request *aio_req = req->r_priv;
1028        struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
1029        struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
1030
1031        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
1032        BUG_ON(!osd_data->num_bvecs);
1033
1034        dout("ceph_aio_complete_req %p rc %d bytes %u\n",
1035             inode, rc, osd_data->bvec_pos.iter.bi_size);
1036
1037        /* r_start_latency == 0 means the request was not submitted */
1038        if (req->r_start_latency) {
1039                if (aio_req->write)
1040                        ceph_update_write_latency(metric, req->r_start_latency,
1041                                                  req->r_end_latency, rc);
1042                else
1043                        ceph_update_read_latency(metric, req->r_start_latency,
1044                                                 req->r_end_latency, rc);
1045        }
1046
1047        if (rc == -EOLDSNAPC) {
1048                struct ceph_aio_work *aio_work;
1049                BUG_ON(!aio_req->write);
1050
1051                aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
1052                if (aio_work) {
1053                        INIT_WORK(&aio_work->work, ceph_aio_retry_work);
1054                        aio_work->req = req;
1055                        queue_work(ceph_inode_to_client(inode)->inode_wq,
1056                                   &aio_work->work);
1057                        return;
1058                }
1059                rc = -ENOMEM;
1060        } else if (!aio_req->write) {
1061                if (rc == -ENOENT)
1062                        rc = 0;
1063                if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) {
1064                        struct iov_iter i;
1065                        int zlen = osd_data->bvec_pos.iter.bi_size - rc;
1066
1067                        /*
1068                         * If read is satisfied by single OSD request,
1069                         * it can pass EOF. Otherwise read is within
1070                         * i_size.
1071                         */
1072                        if (aio_req->num_reqs == 1) {
1073                                loff_t i_size = i_size_read(inode);
1074                                loff_t endoff = aio_req->iocb->ki_pos + rc;
1075                                if (endoff < i_size)
1076                                        zlen = min_t(size_t, zlen,
1077                                                     i_size - endoff);
1078                                aio_req->total_len = rc + zlen;
1079                        }
1080
1081                        iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
1082                                      osd_data->num_bvecs,
1083                                      osd_data->bvec_pos.iter.bi_size);
1084                        iov_iter_advance(&i, rc);
1085                        iov_iter_zero(zlen, &i);
1086                }
1087        }
1088
1089        put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
1090                  aio_req->should_dirty);
1091        ceph_osdc_put_request(req);
1092
1093        if (rc < 0)
1094                cmpxchg(&aio_req->error, 0, rc);
1095
1096        ceph_aio_complete(inode, aio_req);
1097        return;
1098}
1099
1100static void ceph_aio_retry_work(struct work_struct *work)
1101{
1102        struct ceph_aio_work *aio_work =
1103                container_of(work, struct ceph_aio_work, work);
1104        struct ceph_osd_request *orig_req = aio_work->req;
1105        struct ceph_aio_request *aio_req = orig_req->r_priv;
1106        struct inode *inode = orig_req->r_inode;
1107        struct ceph_inode_info *ci = ceph_inode(inode);
1108        struct ceph_snap_context *snapc;
1109        struct ceph_osd_request *req;
1110        int ret;
1111
1112        spin_lock(&ci->i_ceph_lock);
1113        if (__ceph_have_pending_cap_snap(ci)) {
1114                struct ceph_cap_snap *capsnap =
1115                        list_last_entry(&ci->i_cap_snaps,
1116                                        struct ceph_cap_snap,
1117                                        ci_item);
1118                snapc = ceph_get_snap_context(capsnap->context);
1119        } else {
1120                BUG_ON(!ci->i_head_snapc);
1121                snapc = ceph_get_snap_context(ci->i_head_snapc);
1122        }
1123        spin_unlock(&ci->i_ceph_lock);
1124
1125        req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
1126                        false, GFP_NOFS);
1127        if (!req) {
1128                ret = -ENOMEM;
1129                req = orig_req;
1130                goto out;
1131        }
1132
1133        req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
1134        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
1135        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
1136
1137        req->r_ops[0] = orig_req->r_ops[0];
1138
1139        req->r_mtime = aio_req->mtime;
1140        req->r_data_offset = req->r_ops[0].extent.offset;
1141
1142        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
1143        if (ret) {
1144                ceph_osdc_put_request(req);
1145                req = orig_req;
1146                goto out;
1147        }
1148
1149        ceph_osdc_put_request(orig_req);
1150
1151        req->r_callback = ceph_aio_complete_req;
1152        req->r_inode = inode;
1153        req->r_priv = aio_req;
1154
1155        ret = ceph_osdc_start_request(req->r_osdc, req, false);
1156out:
1157        if (ret < 0) {
1158                req->r_result = ret;
1159                ceph_aio_complete_req(req);
1160        }
1161
1162        ceph_put_snap_context(snapc);
1163        kfree(aio_work);
1164}
1165
1166static ssize_t
1167ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
1168                       struct ceph_snap_context *snapc,
1169                       struct ceph_cap_flush **pcf)
1170{
1171        struct file *file = iocb->ki_filp;
1172        struct inode *inode = file_inode(file);
1173        struct ceph_inode_info *ci = ceph_inode(inode);
1174        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1175        struct ceph_client_metric *metric = &fsc->mdsc->metric;
1176        struct ceph_vino vino;
1177        struct ceph_osd_request *req;
1178        struct bio_vec *bvecs;
1179        struct ceph_aio_request *aio_req = NULL;
1180        int num_pages = 0;
1181        int flags;
1182        int ret = 0;
1183        struct timespec64 mtime = current_time(inode);
1184        size_t count = iov_iter_count(iter);
1185        loff_t pos = iocb->ki_pos;
1186        bool write = iov_iter_rw(iter) == WRITE;
1187        bool should_dirty = !write && iter_is_iovec(iter);
1188
1189        if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
1190                return -EROFS;
1191
1192        dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
1193             (write ? "write" : "read"), file, pos, (unsigned)count,
1194             snapc, snapc ? snapc->seq : 0);
1195
1196        if (write) {
1197                int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
1198                                        pos >> PAGE_SHIFT,
1199                                        (pos + count - 1) >> PAGE_SHIFT);
1200                if (ret2 < 0)
1201                        dout("invalidate_inode_pages2_range returned %d\n", ret2);
1202
1203                flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
1204        } else {
1205                flags = CEPH_OSD_FLAG_READ;
1206        }
1207
1208        while (iov_iter_count(iter) > 0) {
1209                u64 size = iov_iter_count(iter);
1210                ssize_t len;
1211
1212                if (write)
1213                        size = min_t(u64, size, fsc->mount_options->wsize);
1214                else
1215                        size = min_t(u64, size, fsc->mount_options->rsize);
1216
1217                vino = ceph_vino(inode);
1218                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1219                                            vino, pos, &size, 0,
1220                                            1,
1221                                            write ? CEPH_OSD_OP_WRITE :
1222                                                    CEPH_OSD_OP_READ,
1223                                            flags, snapc,
1224                                            ci->i_truncate_seq,
1225                                            ci->i_truncate_size,
1226                                            false);
1227                if (IS_ERR(req)) {
1228                        ret = PTR_ERR(req);
1229                        break;
1230                }
1231
1232                len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
1233                if (len < 0) {
1234                        ceph_osdc_put_request(req);
1235                        ret = len;
1236                        break;
1237                }
1238                if (len != size)
1239                        osd_req_op_extent_update(req, 0, len);
1240
1241                /*
1242                 * To simplify error handling, allow AIO when IO within i_size
1243                 * or IO can be satisfied by single OSD request.
1244                 */
1245                if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
1246                    (len == count || pos + count <= i_size_read(inode))) {
1247                        aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
1248                        if (aio_req) {
1249                                aio_req->iocb = iocb;
1250                                aio_req->write = write;
1251                                aio_req->should_dirty = should_dirty;
1252                                INIT_LIST_HEAD(&aio_req->osd_reqs);
1253                                if (write) {
1254                                        aio_req->mtime = mtime;
1255                                        swap(aio_req->prealloc_cf, *pcf);
1256                                }
1257                        }
1258                        /* ignore error */
1259                }
1260
1261                if (write) {
1262                        /*
1263                         * throw out any page cache pages in this range. this
1264                         * may block.
1265                         */
1266                        truncate_inode_pages_range(inode->i_mapping, pos,
1267                                                   PAGE_ALIGN(pos + len) - 1);
1268
1269                        req->r_mtime = mtime;
1270                }
1271
1272                osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
1273
1274                if (aio_req) {
1275                        aio_req->total_len += len;
1276                        aio_req->num_reqs++;
1277                        atomic_inc(&aio_req->pending_reqs);
1278
1279                        req->r_callback = ceph_aio_complete_req;
1280                        req->r_inode = inode;
1281                        req->r_priv = aio_req;
1282                        list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
1283
1284                        pos += len;
1285                        continue;
1286                }
1287
1288                ret = ceph_osdc_start_request(req->r_osdc, req, false);
1289                if (!ret)
1290                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1291
1292                if (write)
1293                        ceph_update_write_latency(metric, req->r_start_latency,
1294                                                  req->r_end_latency, ret);
1295                else
1296                        ceph_update_read_latency(metric, req->r_start_latency,
1297                                                 req->r_end_latency, ret);
1298
1299                size = i_size_read(inode);
1300                if (!write) {
1301                        if (ret == -ENOENT)
1302                                ret = 0;
1303                        if (ret >= 0 && ret < len && pos + ret < size) {
1304                                struct iov_iter i;
1305                                int zlen = min_t(size_t, len - ret,
1306                                                 size - pos - ret);
1307
1308                                iov_iter_bvec(&i, READ, bvecs, num_pages, len);
1309                                iov_iter_advance(&i, ret);
1310                                iov_iter_zero(zlen, &i);
1311                                ret += zlen;
1312                        }
1313                        if (ret >= 0)
1314                                len = ret;
1315                }
1316
1317                put_bvecs(bvecs, num_pages, should_dirty);
1318                ceph_osdc_put_request(req);
1319                if (ret < 0)
1320                        break;
1321
1322                pos += len;
1323                if (!write && pos >= size)
1324                        break;
1325
1326                if (write && pos > size) {
1327                        if (ceph_inode_set_size(inode, pos))
1328                                ceph_check_caps(ceph_inode(inode),
1329                                                CHECK_CAPS_AUTHONLY,
1330                                                NULL);
1331                }
1332        }
1333
1334        if (aio_req) {
1335                LIST_HEAD(osd_reqs);
1336
1337                if (aio_req->num_reqs == 0) {
1338                        kfree(aio_req);
1339                        return ret;
1340                }
1341
1342                ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
1343                                              CEPH_CAP_FILE_RD);
1344
1345                list_splice(&aio_req->osd_reqs, &osd_reqs);
1346                inode_dio_begin(inode);
1347                while (!list_empty(&osd_reqs)) {
1348                        req = list_first_entry(&osd_reqs,
1349                                               struct ceph_osd_request,
1350                                               r_private_item);
1351                        list_del_init(&req->r_private_item);
1352                        if (ret >= 0)
1353                                ret = ceph_osdc_start_request(req->r_osdc,
1354                                                              req, false);
1355                        if (ret < 0) {
1356                                req->r_result = ret;
1357                                ceph_aio_complete_req(req);
1358                        }
1359                }
1360                return -EIOCBQUEUED;
1361        }
1362
1363        if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
1364                ret = pos - iocb->ki_pos;
1365                iocb->ki_pos = pos;
1366        }
1367        return ret;
1368}
1369
1370/*
1371 * Synchronous write, straight from __user pointer or user pages.
1372 *
1373 * If write spans object boundary, just do multiple writes.  (For a
1374 * correct atomic write, we should e.g. take write locks on all
1375 * objects, rollback on failure, etc.)
1376 */
1377static ssize_t
1378ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1379                struct ceph_snap_context *snapc)
1380{
1381        struct file *file = iocb->ki_filp;
1382        struct inode *inode = file_inode(file);
1383        struct ceph_inode_info *ci = ceph_inode(inode);
1384        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1385        struct ceph_vino vino;
1386        struct ceph_osd_request *req;
1387        struct page **pages;
1388        u64 len;
1389        int num_pages;
1390        int written = 0;
1391        int flags;
1392        int ret;
1393        bool check_caps = false;
1394        struct timespec64 mtime = current_time(inode);
1395        size_t count = iov_iter_count(from);
1396
1397        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
1398                return -EROFS;
1399
1400        dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
1401             file, pos, (unsigned)count, snapc, snapc->seq);
1402
1403        ret = filemap_write_and_wait_range(inode->i_mapping,
1404                                           pos, pos + count - 1);
1405        if (ret < 0)
1406                return ret;
1407
1408        ret = invalidate_inode_pages2_range(inode->i_mapping,
1409                                            pos >> PAGE_SHIFT,
1410                                            (pos + count - 1) >> PAGE_SHIFT);
1411        if (ret < 0)
1412                dout("invalidate_inode_pages2_range returned %d\n", ret);
1413
1414        flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
1415
1416        while ((len = iov_iter_count(from)) > 0) {
1417                size_t left;
1418                int n;
1419
1420                vino = ceph_vino(inode);
1421                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1422                                            vino, pos, &len, 0, 1,
1423                                            CEPH_OSD_OP_WRITE, flags, snapc,
1424                                            ci->i_truncate_seq,
1425                                            ci->i_truncate_size,
1426                                            false);
1427                if (IS_ERR(req)) {
1428                        ret = PTR_ERR(req);
1429                        break;
1430                }
1431
1432                /*
1433                 * write from beginning of first page,
1434                 * regardless of io alignment
1435                 */
1436                num_pages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
1437
1438                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1439                if (IS_ERR(pages)) {
1440                        ret = PTR_ERR(pages);
1441                        goto out;
1442                }
1443
1444                left = len;
1445                for (n = 0; n < num_pages; n++) {
1446                        size_t plen = min_t(size_t, left, PAGE_SIZE);
1447                        ret = copy_page_from_iter(pages[n], 0, plen, from);
1448                        if (ret != plen) {
1449                                ret = -EFAULT;
1450                                break;
1451                        }
1452                        left -= ret;
1453                }
1454
1455                if (ret < 0) {
1456                        ceph_release_page_vector(pages, num_pages);
1457                        goto out;
1458                }
1459
1460                req->r_inode = inode;
1461
1462                osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
1463                                                false, true);
1464
1465                req->r_mtime = mtime;
1466                ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1467                if (!ret)
1468                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1469
1470                ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
1471                                          req->r_end_latency, ret);
1472out:
1473                ceph_osdc_put_request(req);
1474                if (ret != 0) {
1475                        ceph_set_error_write(ci);
1476                        break;
1477                }
1478
1479                ceph_clear_error_write(ci);
1480                pos += len;
1481                written += len;
1482                if (pos > i_size_read(inode)) {
1483                        check_caps = ceph_inode_set_size(inode, pos);
1484                        if (check_caps)
1485                                ceph_check_caps(ceph_inode(inode),
1486                                                CHECK_CAPS_AUTHONLY,
1487                                                NULL);
1488                }
1489
1490        }
1491
1492        if (ret != -EOLDSNAPC && written > 0) {
1493                ret = written;
1494                iocb->ki_pos = pos;
1495        }
1496        return ret;
1497}
1498
1499/*
1500 * Wrap generic_file_aio_read with checks for cap bits on the inode.
1501 * Atomically grab references, so that those bits are not released
1502 * back to the MDS mid-read.
1503 *
1504 * Hmm, the sync read case isn't actually async... should it be?
1505 */
1506static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
1507{
1508        struct file *filp = iocb->ki_filp;
1509        struct ceph_file_info *fi = filp->private_data;
1510        size_t len = iov_iter_count(to);
1511        struct inode *inode = file_inode(filp);
1512        struct ceph_inode_info *ci = ceph_inode(inode);
1513        struct page *pinned_page = NULL;
1514        bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
1515        ssize_t ret;
1516        int want, got = 0;
1517        int retry_op = 0, read = 0;
1518
1519again:
1520        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
1521             inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
1522
1523        if (direct_lock)
1524                ceph_start_io_direct(inode);
1525        else
1526                ceph_start_io_read(inode);
1527
1528        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1529                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1530        else
1531                want = CEPH_CAP_FILE_CACHE;
1532        ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
1533                            &got, &pinned_page);
1534        if (ret < 0) {
1535                if (iocb->ki_flags & IOCB_DIRECT)
1536                        ceph_end_io_direct(inode);
1537                else
1538                        ceph_end_io_read(inode);
1539                return ret;
1540        }
1541
1542        if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
1543            (iocb->ki_flags & IOCB_DIRECT) ||
1544            (fi->flags & CEPH_F_SYNC)) {
1545
1546                dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
1547                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
1548                     ceph_cap_string(got));
1549
1550                if (ci->i_inline_version == CEPH_INLINE_NONE) {
1551                        if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
1552                                ret = ceph_direct_read_write(iocb, to,
1553                                                             NULL, NULL);
1554                                if (ret >= 0 && ret < len)
1555                                        retry_op = CHECK_EOF;
1556                        } else {
1557                                ret = ceph_sync_read(iocb, to, &retry_op);
1558                        }
1559                } else {
1560                        retry_op = READ_INLINE;
1561                }
1562        } else {
1563                CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
1564                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
1565                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
1566                     ceph_cap_string(got));
1567                ceph_add_rw_context(fi, &rw_ctx);
1568                ret = generic_file_read_iter(iocb, to);
1569                ceph_del_rw_context(fi, &rw_ctx);
1570        }
1571
1572        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
1573             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
1574        if (pinned_page) {
1575                put_page(pinned_page);
1576                pinned_page = NULL;
1577        }
1578        ceph_put_cap_refs(ci, got);
1579
1580        if (direct_lock)
1581                ceph_end_io_direct(inode);
1582        else
1583                ceph_end_io_read(inode);
1584
1585        if (retry_op > HAVE_RETRIED && ret >= 0) {
1586                int statret;
1587                struct page *page = NULL;
1588                loff_t i_size;
1589                if (retry_op == READ_INLINE) {
1590                        page = __page_cache_alloc(GFP_KERNEL);
1591                        if (!page)
1592                                return -ENOMEM;
1593                }
1594
1595                statret = __ceph_do_getattr(inode, page,
1596                                            CEPH_STAT_CAP_INLINE_DATA, !!page);
1597                if (statret < 0) {
1598                        if (page)
1599                                __free_page(page);
1600                        if (statret == -ENODATA) {
1601                                BUG_ON(retry_op != READ_INLINE);
1602                                goto again;
1603                        }
1604                        return statret;
1605                }
1606
1607                i_size = i_size_read(inode);
1608                if (retry_op == READ_INLINE) {
1609                        BUG_ON(ret > 0 || read > 0);
1610                        if (iocb->ki_pos < i_size &&
1611                            iocb->ki_pos < PAGE_SIZE) {
1612                                loff_t end = min_t(loff_t, i_size,
1613                                                   iocb->ki_pos + len);
1614                                end = min_t(loff_t, end, PAGE_SIZE);
1615                                if (statret < end)
1616                                        zero_user_segment(page, statret, end);
1617                                ret = copy_page_to_iter(page,
1618                                                iocb->ki_pos & ~PAGE_MASK,
1619                                                end - iocb->ki_pos, to);
1620                                iocb->ki_pos += ret;
1621                                read += ret;
1622                        }
1623                        if (iocb->ki_pos < i_size && read < len) {
1624                                size_t zlen = min_t(size_t, len - read,
1625                                                    i_size - iocb->ki_pos);
1626                                ret = iov_iter_zero(zlen, to);
1627                                iocb->ki_pos += ret;
1628                                read += ret;
1629                        }
1630                        __free_pages(page, 0);
1631                        return read;
1632                }
1633
1634                /* hit EOF or hole? */
1635                if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
1636                    ret < len) {
1637                        dout("sync_read hit hole, ppos %lld < size %lld"
1638                             ", reading more\n", iocb->ki_pos, i_size);
1639
1640                        read += ret;
1641                        len -= ret;
1642                        retry_op = HAVE_RETRIED;
1643                        goto again;
1644                }
1645        }
1646
1647        if (ret >= 0)
1648                ret += read;
1649
1650        return ret;
1651}
1652
1653/*
1654 * Take cap references to avoid releasing caps to MDS mid-write.
1655 *
1656 * If we are synchronous, and write with an old snap context, the OSD
1657 * may return EOLDSNAPC.  In that case, retry the write.. _after_
1658 * dropping our cap refs and allowing the pending snap to logically
1659 * complete _before_ this write occurs.
1660 *
1661 * If we are near ENOSPC, write synchronously.
1662 */
1663static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
1664{
1665        struct file *file = iocb->ki_filp;
1666        struct ceph_file_info *fi = file->private_data;
1667        struct inode *inode = file_inode(file);
1668        struct ceph_inode_info *ci = ceph_inode(inode);
1669        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1670        struct ceph_osd_client *osdc = &fsc->client->osdc;
1671        struct ceph_cap_flush *prealloc_cf;
1672        ssize_t count, written = 0;
1673        int err, want, got;
1674        bool direct_lock = false;
1675        u32 map_flags;
1676        u64 pool_flags;
1677        loff_t pos;
1678        loff_t limit = max(i_size_read(inode), fsc->max_file_size);
1679
1680        if (ceph_snap(inode) != CEPH_NOSNAP)
1681                return -EROFS;
1682
1683        prealloc_cf = ceph_alloc_cap_flush();
1684        if (!prealloc_cf)
1685                return -ENOMEM;
1686
1687        if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT)
1688                direct_lock = true;
1689
1690retry_snap:
1691        if (direct_lock)
1692                ceph_start_io_direct(inode);
1693        else
1694                ceph_start_io_write(inode);
1695
1696        /* We can write back this queue in page reclaim */
1697        current->backing_dev_info = inode_to_bdi(inode);
1698
1699        if (iocb->ki_flags & IOCB_APPEND) {
1700                err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
1701                if (err < 0)
1702                        goto out;
1703        }
1704
1705        err = generic_write_checks(iocb, from);
1706        if (err <= 0)
1707                goto out;
1708
1709        pos = iocb->ki_pos;
1710        if (unlikely(pos >= limit)) {
1711                err = -EFBIG;
1712                goto out;
1713        } else {
1714                iov_iter_truncate(from, limit - pos);
1715        }
1716
1717        count = iov_iter_count(from);
1718        if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) {
1719                err = -EDQUOT;
1720                goto out;
1721        }
1722
1723        err = file_remove_privs(file);
1724        if (err)
1725                goto out;
1726
1727        err = file_update_time(file);
1728        if (err)
1729                goto out;
1730
1731        inode_inc_iversion_raw(inode);
1732
1733        if (ci->i_inline_version != CEPH_INLINE_NONE) {
1734                err = ceph_uninline_data(file, NULL);
1735                if (err < 0)
1736                        goto out;
1737        }
1738
1739        down_read(&osdc->lock);
1740        map_flags = osdc->osdmap->flags;
1741        pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
1742        up_read(&osdc->lock);
1743        if ((map_flags & CEPH_OSDMAP_FULL) ||
1744            (pool_flags & CEPH_POOL_FLAG_FULL)) {
1745                err = -ENOSPC;
1746                goto out;
1747        }
1748
1749        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
1750             inode, ceph_vinop(inode), pos, count, i_size_read(inode));
1751        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1752                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1753        else
1754                want = CEPH_CAP_FILE_BUFFER;
1755        got = 0;
1756        err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
1757                            &got, NULL);
1758        if (err < 0)
1759                goto out;
1760
1761        dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
1762             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
1763
1764        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
1765            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
1766            (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
1767                struct ceph_snap_context *snapc;
1768                struct iov_iter data;
1769
1770                spin_lock(&ci->i_ceph_lock);
1771                if (__ceph_have_pending_cap_snap(ci)) {
1772                        struct ceph_cap_snap *capsnap =
1773                                        list_last_entry(&ci->i_cap_snaps,
1774                                                        struct ceph_cap_snap,
1775                                                        ci_item);
1776                        snapc = ceph_get_snap_context(capsnap->context);
1777                } else {
1778                        BUG_ON(!ci->i_head_snapc);
1779                        snapc = ceph_get_snap_context(ci->i_head_snapc);
1780                }
1781                spin_unlock(&ci->i_ceph_lock);
1782
1783                /* we might need to revert back to that point */
1784                data = *from;
1785                if (iocb->ki_flags & IOCB_DIRECT)
1786                        written = ceph_direct_read_write(iocb, &data, snapc,
1787                                                         &prealloc_cf);
1788                else
1789                        written = ceph_sync_write(iocb, &data, pos, snapc);
1790                if (direct_lock)
1791                        ceph_end_io_direct(inode);
1792                else
1793                        ceph_end_io_write(inode);
1794                if (written > 0)
1795                        iov_iter_advance(from, written);
1796                ceph_put_snap_context(snapc);
1797        } else {
1798                /*
1799                 * No need to acquire the i_truncate_mutex. Because
1800                 * the MDS revokes Fwb caps before sending truncate
1801                 * message to us. We can't get Fwb cap while there
1802                 * are pending vmtruncate. So write and vmtruncate
1803                 * can not run at the same time
1804                 */
1805                written = generic_perform_write(file, from, pos);
1806                if (likely(written >= 0))
1807                        iocb->ki_pos = pos + written;
1808                ceph_end_io_write(inode);
1809        }
1810
1811        if (written >= 0) {
1812                int dirty;
1813
1814                spin_lock(&ci->i_ceph_lock);
1815                ci->i_inline_version = CEPH_INLINE_NONE;
1816                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
1817                                               &prealloc_cf);
1818                spin_unlock(&ci->i_ceph_lock);
1819                if (dirty)
1820                        __mark_inode_dirty(inode, dirty);
1821                if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
1822                        ceph_check_caps(ci, 0, NULL);
1823        }
1824
1825        dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
1826             inode, ceph_vinop(inode), pos, (unsigned)count,
1827             ceph_cap_string(got));
1828        ceph_put_cap_refs(ci, got);
1829
1830        if (written == -EOLDSNAPC) {
1831                dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n",
1832                     inode, ceph_vinop(inode), pos, (unsigned)count);
1833                goto retry_snap;
1834        }
1835
1836        if (written >= 0) {
1837                if ((map_flags & CEPH_OSDMAP_NEARFULL) ||
1838                    (pool_flags & CEPH_POOL_FLAG_NEARFULL))
1839                        iocb->ki_flags |= IOCB_DSYNC;
1840                written = generic_write_sync(iocb, written);
1841        }
1842
1843        goto out_unlocked;
1844out:
1845        if (direct_lock)
1846                ceph_end_io_direct(inode);
1847        else
1848                ceph_end_io_write(inode);
1849out_unlocked:
1850        ceph_free_cap_flush(prealloc_cf);
1851        current->backing_dev_info = NULL;
1852        return written ? written : err;
1853}
1854
1855/*
1856 * llseek.  be sure to verify file size on SEEK_END.
1857 */
1858static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1859{
1860        struct inode *inode = file->f_mapping->host;
1861        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1862        loff_t i_size;
1863        loff_t ret;
1864
1865        inode_lock(inode);
1866
1867        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
1868                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
1869                if (ret < 0)
1870                        goto out;
1871        }
1872
1873        i_size = i_size_read(inode);
1874        switch (whence) {
1875        case SEEK_END:
1876                offset += i_size;
1877                break;
1878        case SEEK_CUR:
1879                /*
1880                 * Here we special-case the lseek(fd, 0, SEEK_CUR)
1881                 * position-querying operation.  Avoid rewriting the "same"
1882                 * f_pos value back to the file because a concurrent read(),
1883                 * write() or lseek() might have altered it
1884                 */
1885                if (offset == 0) {
1886                        ret = file->f_pos;
1887                        goto out;
1888                }
1889                offset += file->f_pos;
1890                break;
1891        case SEEK_DATA:
1892                if (offset < 0 || offset >= i_size) {
1893                        ret = -ENXIO;
1894                        goto out;
1895                }
1896                break;
1897        case SEEK_HOLE:
1898                if (offset < 0 || offset >= i_size) {
1899                        ret = -ENXIO;
1900                        goto out;
1901                }
1902                offset = i_size;
1903                break;
1904        }
1905
1906        ret = vfs_setpos(file, offset, max(i_size, fsc->max_file_size));
1907
1908out:
1909        inode_unlock(inode);
1910        return ret;
1911}
1912
1913static inline void ceph_zero_partial_page(
1914        struct inode *inode, loff_t offset, unsigned size)
1915{
1916        struct page *page;
1917        pgoff_t index = offset >> PAGE_SHIFT;
1918
1919        page = find_lock_page(inode->i_mapping, index);
1920        if (page) {
1921                wait_on_page_writeback(page);
1922                zero_user(page, offset & (PAGE_SIZE - 1), size);
1923                unlock_page(page);
1924                put_page(page);
1925        }
1926}
1927
1928static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
1929                                      loff_t length)
1930{
1931        loff_t nearly = round_up(offset, PAGE_SIZE);
1932        if (offset < nearly) {
1933                loff_t size = nearly - offset;
1934                if (length < size)
1935                        size = length;
1936                ceph_zero_partial_page(inode, offset, size);
1937                offset += size;
1938                length -= size;
1939        }
1940        if (length >= PAGE_SIZE) {
1941                loff_t size = round_down(length, PAGE_SIZE);
1942                truncate_pagecache_range(inode, offset, offset + size - 1);
1943                offset += size;
1944                length -= size;
1945        }
1946        if (length)
1947                ceph_zero_partial_page(inode, offset, length);
1948}
1949
1950static int ceph_zero_partial_object(struct inode *inode,
1951                                    loff_t offset, loff_t *length)
1952{
1953        struct ceph_inode_info *ci = ceph_inode(inode);
1954        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1955        struct ceph_osd_request *req;
1956        int ret = 0;
1957        loff_t zero = 0;
1958        int op;
1959
1960        if (!length) {
1961                op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
1962                length = &zero;
1963        } else {
1964                op = CEPH_OSD_OP_ZERO;
1965        }
1966
1967        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1968                                        ceph_vino(inode),
1969                                        offset, length,
1970                                        0, 1, op,
1971                                        CEPH_OSD_FLAG_WRITE,
1972                                        NULL, 0, 0, false);
1973        if (IS_ERR(req)) {
1974                ret = PTR_ERR(req);
1975                goto out;
1976        }
1977
1978        req->r_mtime = inode->i_mtime;
1979        ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1980        if (!ret) {
1981                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1982                if (ret == -ENOENT)
1983                        ret = 0;
1984        }
1985        ceph_osdc_put_request(req);
1986
1987out:
1988        return ret;
1989}
1990
1991static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
1992{
1993        int ret = 0;
1994        struct ceph_inode_info *ci = ceph_inode(inode);
1995        s32 stripe_unit = ci->i_layout.stripe_unit;
1996        s32 stripe_count = ci->i_layout.stripe_count;
1997        s32 object_size = ci->i_layout.object_size;
1998        u64 object_set_size = object_size * stripe_count;
1999        u64 nearly, t;
2000
2001        /* round offset up to next period boundary */
2002        nearly = offset + object_set_size - 1;
2003        t = nearly;
2004        nearly -= do_div(t, object_set_size);
2005
2006        while (length && offset < nearly) {
2007                loff_t size = length;
2008                ret = ceph_zero_partial_object(inode, offset, &size);
2009                if (ret < 0)
2010                        return ret;
2011                offset += size;
2012                length -= size;
2013        }
2014        while (length >= object_set_size) {
2015                int i;
2016                loff_t pos = offset;
2017                for (i = 0; i < stripe_count; ++i) {
2018                        ret = ceph_zero_partial_object(inode, pos, NULL);
2019                        if (ret < 0)
2020                                return ret;
2021                        pos += stripe_unit;
2022                }
2023                offset += object_set_size;
2024                length -= object_set_size;
2025        }
2026        while (length) {
2027                loff_t size = length;
2028                ret = ceph_zero_partial_object(inode, offset, &size);
2029                if (ret < 0)
2030                        return ret;
2031                offset += size;
2032                length -= size;
2033        }
2034        return ret;
2035}
2036
2037static long ceph_fallocate(struct file *file, int mode,
2038                                loff_t offset, loff_t length)
2039{
2040        struct ceph_file_info *fi = file->private_data;
2041        struct inode *inode = file_inode(file);
2042        struct ceph_inode_info *ci = ceph_inode(inode);
2043        struct ceph_cap_flush *prealloc_cf;
2044        int want, got = 0;
2045        int dirty;
2046        int ret = 0;
2047        loff_t endoff = 0;
2048        loff_t size;
2049
2050        if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
2051                return -EOPNOTSUPP;
2052
2053        if (!S_ISREG(inode->i_mode))
2054                return -EOPNOTSUPP;
2055
2056        prealloc_cf = ceph_alloc_cap_flush();
2057        if (!prealloc_cf)
2058                return -ENOMEM;
2059
2060        inode_lock(inode);
2061
2062        if (ceph_snap(inode) != CEPH_NOSNAP) {
2063                ret = -EROFS;
2064                goto unlock;
2065        }
2066
2067        if (ci->i_inline_version != CEPH_INLINE_NONE) {
2068                ret = ceph_uninline_data(file, NULL);
2069                if (ret < 0)
2070                        goto unlock;
2071        }
2072
2073        size = i_size_read(inode);
2074
2075        /* Are we punching a hole beyond EOF? */
2076        if (offset >= size)
2077                goto unlock;
2078        if ((offset + length) > size)
2079                length = size - offset;
2080
2081        if (fi->fmode & CEPH_FILE_MODE_LAZY)
2082                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
2083        else
2084                want = CEPH_CAP_FILE_BUFFER;
2085
2086        ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
2087        if (ret < 0)
2088                goto unlock;
2089
2090        ceph_zero_pagecache_range(inode, offset, length);
2091        ret = ceph_zero_objects(inode, offset, length);
2092
2093        if (!ret) {
2094                spin_lock(&ci->i_ceph_lock);
2095                ci->i_inline_version = CEPH_INLINE_NONE;
2096                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
2097                                               &prealloc_cf);
2098                spin_unlock(&ci->i_ceph_lock);
2099                if (dirty)
2100                        __mark_inode_dirty(inode, dirty);
2101        }
2102
2103        ceph_put_cap_refs(ci, got);
2104unlock:
2105        inode_unlock(inode);
2106        ceph_free_cap_flush(prealloc_cf);
2107        return ret;
2108}
2109
2110/*
2111 * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
2112 * src_ci.  Two attempts are made to obtain both caps, and an error is return if
2113 * this fails; zero is returned on success.
2114 */
2115static int get_rd_wr_caps(struct file *src_filp, int *src_got,
2116                          struct file *dst_filp,
2117                          loff_t dst_endoff, int *dst_got)
2118{
2119        int ret = 0;
2120        bool retrying = false;
2121
2122retry_caps:
2123        ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
2124                            dst_endoff, dst_got, NULL);
2125        if (ret < 0)
2126                return ret;
2127
2128        /*
2129         * Since we're already holding the FILE_WR capability for the dst file,
2130         * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
2131         * retry dance instead to try to get both capabilities.
2132         */
2133        ret = ceph_try_get_caps(file_inode(src_filp),
2134                                CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
2135                                false, src_got);
2136        if (ret <= 0) {
2137                /* Start by dropping dst_ci caps and getting src_ci caps */
2138                ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
2139                if (retrying) {
2140                        if (!ret)
2141                                /* ceph_try_get_caps masks EAGAIN */
2142                                ret = -EAGAIN;
2143                        return ret;
2144                }
2145                ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
2146                                    CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
2147                if (ret < 0)
2148                        return ret;
2149                /*... drop src_ci caps too, and retry */
2150                ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
2151                retrying = true;
2152                goto retry_caps;
2153        }
2154        return ret;
2155}
2156
2157static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
2158                           struct ceph_inode_info *dst_ci, int dst_got)
2159{
2160        ceph_put_cap_refs(src_ci, src_got);
2161        ceph_put_cap_refs(dst_ci, dst_got);
2162}
2163
2164/*
2165 * This function does several size-related checks, returning an error if:
2166 *  - source file is smaller than off+len
2167 *  - destination file size is not OK (inode_newsize_ok())
2168 *  - max bytes quotas is exceeded
2169 */
2170static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
2171                           loff_t src_off, loff_t dst_off, size_t len)
2172{
2173        loff_t size, endoff;
2174
2175        size = i_size_read(src_inode);
2176        /*
2177         * Don't copy beyond source file EOF.  Instead of simply setting length
2178         * to (size - src_off), just drop to VFS default implementation, as the
2179         * local i_size may be stale due to other clients writing to the source
2180         * inode.
2181         */
2182        if (src_off + len > size) {
2183                dout("Copy beyond EOF (%llu + %zu > %llu)\n",
2184                     src_off, len, size);
2185                return -EOPNOTSUPP;
2186        }
2187        size = i_size_read(dst_inode);
2188
2189        endoff = dst_off + len;
2190        if (inode_newsize_ok(dst_inode, endoff))
2191                return -EOPNOTSUPP;
2192
2193        if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
2194                return -EDQUOT;
2195
2196        return 0;
2197}
2198
2199static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off,
2200                                    struct ceph_inode_info *dst_ci, u64 *dst_off,
2201                                    struct ceph_fs_client *fsc,
2202                                    size_t len, unsigned int flags)
2203{
2204        struct ceph_object_locator src_oloc, dst_oloc;
2205        struct ceph_object_id src_oid, dst_oid;
2206        size_t bytes = 0;
2207        u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
2208        u32 src_objlen, dst_objlen;
2209        u32 object_size = src_ci->i_layout.object_size;
2210        int ret;
2211
2212        src_oloc.pool = src_ci->i_layout.pool_id;
2213        src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
2214        dst_oloc.pool = dst_ci->i_layout.pool_id;
2215        dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
2216
2217        while (len >= object_size) {
2218                ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off,
2219                                              object_size, &src_objnum,
2220                                              &src_objoff, &src_objlen);
2221                ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off,
2222                                              object_size, &dst_objnum,
2223                                              &dst_objoff, &dst_objlen);
2224                ceph_oid_init(&src_oid);
2225                ceph_oid_printf(&src_oid, "%llx.%08llx",
2226                                src_ci->i_vino.ino, src_objnum);
2227                ceph_oid_init(&dst_oid);
2228                ceph_oid_printf(&dst_oid, "%llx.%08llx",
2229                                dst_ci->i_vino.ino, dst_objnum);
2230                /* Do an object remote copy */
2231                ret = ceph_osdc_copy_from(&fsc->client->osdc,
2232                                          src_ci->i_vino.snap, 0,
2233                                          &src_oid, &src_oloc,
2234                                          CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2235                                          CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
2236                                          &dst_oid, &dst_oloc,
2237                                          CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2238                                          CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
2239                                          dst_ci->i_truncate_seq,
2240                                          dst_ci->i_truncate_size,
2241                                          CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
2242                if (ret) {
2243                        if (ret == -EOPNOTSUPP) {
2244                                fsc->have_copy_from2 = false;
2245                                pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
2246                        }
2247                        dout("ceph_osdc_copy_from returned %d\n", ret);
2248                        if (!bytes)
2249                                bytes = ret;
2250                        goto out;
2251                }
2252                len -= object_size;
2253                bytes += object_size;
2254                *src_off += object_size;
2255                *dst_off += object_size;
2256        }
2257
2258out:
2259        ceph_oloc_destroy(&src_oloc);
2260        ceph_oloc_destroy(&dst_oloc);
2261        return bytes;
2262}
2263
2264static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
2265                                      struct file *dst_file, loff_t dst_off,
2266                                      size_t len, unsigned int flags)
2267{
2268        struct inode *src_inode = file_inode(src_file);
2269        struct inode *dst_inode = file_inode(dst_file);
2270        struct ceph_inode_info *src_ci = ceph_inode(src_inode);
2271        struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
2272        struct ceph_cap_flush *prealloc_cf;
2273        struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
2274        loff_t size;
2275        ssize_t ret = -EIO, bytes;
2276        u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
2277        u32 src_objlen, dst_objlen;
2278        int src_got = 0, dst_got = 0, err, dirty;
2279
2280        if (src_inode->i_sb != dst_inode->i_sb) {
2281                struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
2282
2283                if (ceph_fsid_compare(&src_fsc->client->fsid,
2284                                      &dst_fsc->client->fsid)) {
2285                        dout("Copying files across clusters: src: %pU dst: %pU\n",
2286                             &src_fsc->client->fsid, &dst_fsc->client->fsid);
2287                        return -EXDEV;
2288                }
2289        }
2290        if (ceph_snap(dst_inode) != CEPH_NOSNAP)
2291                return -EROFS;
2292
2293        /*
2294         * Some of the checks below will return -EOPNOTSUPP, which will force a
2295         * fallback to the default VFS copy_file_range implementation.  This is
2296         * desirable in several cases (for ex, the 'len' is smaller than the
2297         * size of the objects, or in cases where that would be more
2298         * efficient).
2299         */
2300
2301        if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
2302                return -EOPNOTSUPP;
2303
2304        if (!src_fsc->have_copy_from2)
2305                return -EOPNOTSUPP;
2306
2307        /*
2308         * Striped file layouts require that we copy partial objects, but the
2309         * OSD copy-from operation only supports full-object copies.  Limit
2310         * this to non-striped file layouts for now.
2311         */
2312        if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
2313            (src_ci->i_layout.stripe_count != 1) ||
2314            (dst_ci->i_layout.stripe_count != 1) ||
2315            (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) {
2316                dout("Invalid src/dst files layout\n");
2317                return -EOPNOTSUPP;
2318        }
2319
2320        if (len < src_ci->i_layout.object_size)
2321                return -EOPNOTSUPP; /* no remote copy will be done */
2322
2323        prealloc_cf = ceph_alloc_cap_flush();
2324        if (!prealloc_cf)
2325                return -ENOMEM;
2326
2327        /* Start by sync'ing the source and destination files */
2328        ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
2329        if (ret < 0) {
2330                dout("failed to write src file (%zd)\n", ret);
2331                goto out;
2332        }
2333        ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len));
2334        if (ret < 0) {
2335                dout("failed to write dst file (%zd)\n", ret);
2336                goto out;
2337        }
2338
2339        /*
2340         * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
2341         * clients may have dirty data in their caches.  And OSDs know nothing
2342         * about caps, so they can't safely do the remote object copies.
2343         */
2344        err = get_rd_wr_caps(src_file, &src_got,
2345                             dst_file, (dst_off + len), &dst_got);
2346        if (err < 0) {
2347                dout("get_rd_wr_caps returned %d\n", err);
2348                ret = -EOPNOTSUPP;
2349                goto out;
2350        }
2351
2352        ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
2353        if (ret < 0)
2354                goto out_caps;
2355
2356        /* Drop dst file cached pages */
2357        ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
2358                                            dst_off >> PAGE_SHIFT,
2359                                            (dst_off + len) >> PAGE_SHIFT);
2360        if (ret < 0) {
2361                dout("Failed to invalidate inode pages (%zd)\n", ret);
2362                ret = 0; /* XXX */
2363        }
2364        ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
2365                                      src_ci->i_layout.object_size,
2366                                      &src_objnum, &src_objoff, &src_objlen);
2367        ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
2368                                      dst_ci->i_layout.object_size,
2369                                      &dst_objnum, &dst_objoff, &dst_objlen);
2370        /* object-level offsets need to the same */
2371        if (src_objoff != dst_objoff) {
2372                ret = -EOPNOTSUPP;
2373                goto out_caps;
2374        }
2375
2376        /*
2377         * Do a manual copy if the object offset isn't object aligned.
2378         * 'src_objlen' contains the bytes left until the end of the object,
2379         * starting at the src_off
2380         */
2381        if (src_objoff) {
2382                dout("Initial partial copy of %u bytes\n", src_objlen);
2383
2384                /*
2385                 * we need to temporarily drop all caps as we'll be calling
2386                 * {read,write}_iter, which will get caps again.
2387                 */
2388                put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
2389                ret = do_splice_direct(src_file, &src_off, dst_file,
2390                                       &dst_off, src_objlen, flags);
2391                /* Abort on short copies or on error */
2392                if (ret < src_objlen) {
2393                        dout("Failed partial copy (%zd)\n", ret);
2394                        goto out;
2395                }
2396                len -= ret;
2397                err = get_rd_wr_caps(src_file, &src_got,
2398                                     dst_file, (dst_off + len), &dst_got);
2399                if (err < 0)
2400                        goto out;
2401                err = is_file_size_ok(src_inode, dst_inode,
2402                                      src_off, dst_off, len);
2403                if (err < 0)
2404                        goto out_caps;
2405        }
2406
2407        size = i_size_read(dst_inode);
2408        bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off,
2409                                     src_fsc, len, flags);
2410        if (bytes <= 0) {
2411                if (!ret)
2412                        ret = bytes;
2413                goto out_caps;
2414        }
2415        dout("Copied %zu bytes out of %zu\n", bytes, len);
2416        len -= bytes;
2417        ret += bytes;
2418
2419        file_update_time(dst_file);
2420        inode_inc_iversion_raw(dst_inode);
2421
2422        if (dst_off > size) {
2423                /* Let the MDS know about dst file size change */
2424                if (ceph_inode_set_size(dst_inode, dst_off) ||
2425                    ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
2426                        ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL);
2427        }
2428        /* Mark Fw dirty */
2429        spin_lock(&dst_ci->i_ceph_lock);
2430        dst_ci->i_inline_version = CEPH_INLINE_NONE;
2431        dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
2432        spin_unlock(&dst_ci->i_ceph_lock);
2433        if (dirty)
2434                __mark_inode_dirty(dst_inode, dirty);
2435
2436out_caps:
2437        put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
2438
2439        /*
2440         * Do the final manual copy if we still have some bytes left, unless
2441         * there were errors in remote object copies (len >= object_size).
2442         */
2443        if (len && (len < src_ci->i_layout.object_size)) {
2444                dout("Final partial copy of %zu bytes\n", len);
2445                bytes = do_splice_direct(src_file, &src_off, dst_file,
2446                                         &dst_off, len, flags);
2447                if (bytes > 0)
2448                        ret += bytes;
2449                else
2450                        dout("Failed partial copy (%zd)\n", bytes);
2451        }
2452
2453out:
2454        ceph_free_cap_flush(prealloc_cf);
2455
2456        return ret;
2457}
2458
2459static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
2460                                    struct file *dst_file, loff_t dst_off,
2461                                    size_t len, unsigned int flags)
2462{
2463        ssize_t ret;
2464
2465        ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off,
2466                                     len, flags);
2467
2468        if (ret == -EOPNOTSUPP || ret == -EXDEV)
2469                ret = generic_copy_file_range(src_file, src_off, dst_file,
2470                                              dst_off, len, flags);
2471        return ret;
2472}
2473
2474const struct file_operations ceph_file_fops = {
2475        .open = ceph_open,
2476        .release = ceph_release,
2477        .llseek = ceph_llseek,
2478        .read_iter = ceph_read_iter,
2479        .write_iter = ceph_write_iter,
2480        .mmap = ceph_mmap,
2481        .fsync = ceph_fsync,
2482        .lock = ceph_lock,
2483        .setlease = simple_nosetlease,
2484        .flock = ceph_flock,
2485        .splice_read = generic_file_splice_read,
2486        .splice_write = iter_file_splice_write,
2487        .unlocked_ioctl = ceph_ioctl,
2488        .compat_ioctl = compat_ptr_ioctl,
2489        .fallocate      = ceph_fallocate,
2490        .copy_file_range = ceph_copy_file_range,
2491};
2492