linux/fs/ceph/file.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"

/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 *  - buffered uses the generic_file_aio_{read,write} helpers
 *
 *  - synchronous is used when there is multi-client read/write
 *    sharing, avoids the page cache, and synchronously waits for an
 *    ack from the OSD.
 *
 *  - direct io takes the variant of the sync path that references
 *    user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */

/*
 * Calculate the length sum of direct io vectors that can
 * be combined into one page vector.
 */
static size_t dio_get_pagev_size(const struct iov_iter *it)
{
        const struct iovec *iov = it->iov;
        const struct iovec *iovend = iov + it->nr_segs;
        size_t size;

        size = iov->iov_len - it->iov_offset;
        /*
         * An iov can be page vectored when both the current tail
         * and the next base are page aligned.
         */
        while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
               (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
                size += iov->iov_len;
        }
        dout("dio_get_pagev_size len = %zu\n", size);
        return size;
}
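
/*
 * Worked example (editor's illustration, not part of the original
 * source): with 4K pages and a page-aligned buffer "buf", the iovecs
 *
 *	{ .iov_base = buf,          .iov_len = 0x2000 }
 *	{ .iov_base = buf + 0x2000, .iov_len = 0x1000 }
 *
 * coalesce into a single 0x3000-byte page vector, because the first
 * segment's tail and the second segment's base are both page aligned.
 * If the second segment pointed at an unaligned address instead, the
 * loop above would stop after the first segment and return 0x2000.
 */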

/*
 * Allocate a page vector based on (@it, @nbytes).
 * The return value is the tuple describing a page vector,
 * that is (@pages, @page_align, @num_pages).
 */
static struct page **
dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
                    size_t *page_align, int *num_pages)
{
        struct iov_iter tmp_it = *it;
        size_t align;
        struct page **pages;
        int ret = 0, idx, npages;

        align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
                (PAGE_SIZE - 1);
        npages = calc_pages_for(align, nbytes);
        pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
        if (!pages) {
                pages = vmalloc(sizeof(*pages) * npages);
                if (!pages)
                        return ERR_PTR(-ENOMEM);
        }

        for (idx = 0; idx < npages; ) {
                size_t start;
                ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
                                         npages - idx, &start);
                if (ret < 0)
                        goto fail;

                iov_iter_advance(&tmp_it, ret);
                nbytes -= ret;
                idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
        }

        BUG_ON(nbytes != 0);
        *num_pages = npages;
        *page_align = align;
        dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
        return pages;
fail:
        ceph_put_page_vector(pages, idx, false);
        return ERR_PTR(ret);
}
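
/*
 * Worked example (editor's illustration): for a user buffer at
 * offset 0xff0 within its page and nbytes = 0x3000 on 4K pages,
 * align = 0xff0 and calc_pages_for(0xff0, 0x3000) = 4, since the byte
 * range [0xff0, 0x3ff0) spans four pages; the caller then uses the
 * first page starting at page_align = 0xff0.
 */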

/*
 * Prepare an open request.  Preallocate ceph_cap to avoid an
 * inopportune ENOMEM later.
 */
static struct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int want_auth = USE_ANY_MDS;
        int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;

        if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
                want_auth = USE_AUTH_MDS;

        req = ceph_mdsc_create_request(mdsc, op, want_auth);
        if (IS_ERR(req))
                goto out;
        req->r_fmode = ceph_flags_to_mode(flags);
        req->r_args.open.flags = cpu_to_le32(flags);
        req->r_args.open.mode = cpu_to_le32(create_mode);
out:
        return req;
}

/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
        struct ceph_file_info *cf;
        int ret = 0;

        switch (inode->i_mode & S_IFMT) {
        case S_IFREG:
                ceph_fscache_register_inode_cookie(inode);
                ceph_fscache_file_set_cookie(inode, file);
                /* fall through */
        case S_IFDIR:
                dout("init_file %p %p 0%o (regular)\n", inode, file,
                     inode->i_mode);
                cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
                if (cf == NULL) {
                        ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                        return -ENOMEM;
                }
                cf->fmode = fmode;
                cf->next_offset = 2;
                cf->readdir_cache_idx = -1;
                file->private_data = cf;
                BUG_ON(inode->i_fop->release != ceph_release);
                break;

        case S_IFLNK:
                dout("init_file %p %p 0%o (symlink)\n", inode, file,
                     inode->i_mode);
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                break;

        default:
                dout("init_file %p %p 0%o (special)\n", inode, file,
                     inode->i_mode);
                /*
                 * we need to drop the open ref now, since we don't
                 * have .release set to ceph_release.
                 */
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                BUG_ON(inode->i_fop->release == ceph_release);

                /* call the proper open fop */
                ret = inode->i_fop->open(inode, file);
        }
        return ret;
}

/*
 * try renew caps after session gets killed.
 */
int ceph_renew_caps(struct inode *inode)
{
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_request *req;
        int err, flags, wanted;

        spin_lock(&ci->i_ceph_lock);
        wanted = __ceph_caps_file_wanted(ci);
        if (__ceph_is_any_real_caps(ci) &&
            (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
                int issued = __ceph_caps_issued(ci, NULL);
                spin_unlock(&ci->i_ceph_lock);
                dout("renew caps %p want %s issued %s updating mds_wanted\n",
                     inode, ceph_cap_string(wanted), ceph_cap_string(issued));
                ceph_check_caps(ci, 0, NULL);
                return 0;
        }
        spin_unlock(&ci->i_ceph_lock);

        flags = 0;
        if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
                flags = O_RDWR;
        else if (wanted & CEPH_CAP_FILE_RD)
                flags = O_RDONLY;
        else if (wanted & CEPH_CAP_FILE_WR)
                flags = O_WRONLY;
#ifdef O_LAZY
        if (wanted & CEPH_CAP_FILE_LAZYIO)
                flags |= O_LAZY;
#endif

        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }

        req->r_inode = inode;
        ihold(inode);
        req->r_num_caps = 1;
        req->r_fmode = -1;

        err = ceph_mdsc_do_request(mdsc, NULL, req);
        ceph_mdsc_put_request(req);
out:
        dout("renew caps %p open result=%d\n", inode, err);
        return err < 0 ? err : 0;
}

/*
 * If we already have the requisite capabilities, we can satisfy
 * the open request locally (no need to request new caps from the
 * MDS).  We do, however, need to inform the MDS (asynchronously)
 * if our wanted caps set expands.
 */
int ceph_open(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_file_info *cf = file->private_data;
        int err;
        int flags, fmode, wanted;

        if (cf) {
                dout("open file %p is already opened\n", file);
                return 0;
        }

        /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
        flags = file->f_flags & ~(O_CREAT|O_EXCL);
        if (S_ISDIR(inode->i_mode))
                flags = O_DIRECTORY;  /* mds likes to know */

        dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
             ceph_vinop(inode), file, flags, file->f_flags);
        fmode = ceph_flags_to_mode(flags);
        wanted = ceph_caps_for_mode(fmode);

        /* snapped files are read-only */
        if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
                return -EROFS;

        /* trivially open snapdir */
        if (ceph_snap(inode) == CEPH_SNAPDIR) {
                spin_lock(&ci->i_ceph_lock);
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }

        /*
         * No need to block if we have caps on the auth MDS (for
         * write) or any MDS (for read).  Update wanted set
         * asynchronously.
         */
        spin_lock(&ci->i_ceph_lock);
        if (__ceph_is_any_real_caps(ci) &&
            (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
                int mds_wanted = __ceph_caps_mds_wanted(ci);
                int issued = __ceph_caps_issued(ci, NULL);

                dout("open %p fmode %d want %s issued %s using existing\n",
                     inode, fmode, ceph_cap_string(wanted),
                     ceph_cap_string(issued));
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);

                /* adjust wanted? */
                if ((issued & wanted) != wanted &&
                    (mds_wanted & wanted) != wanted &&
                    ceph_snap(inode) != CEPH_SNAPDIR)
                        ceph_check_caps(ci, 0, NULL);

                return ceph_init_file(inode, file, fmode);
        } else if (ceph_snap(inode) != CEPH_NOSNAP &&
                   (ci->i_snap_caps & wanted) == wanted) {
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }

        spin_unlock(&ci->i_ceph_lock);

        dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        req->r_inode = inode;
        ihold(inode);

        req->r_num_caps = 1;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        if (!err)
                err = ceph_init_file(inode, file, req->r_fmode);
        ceph_mdsc_put_request(req);
        dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
out:
        return err;
}
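
/*
 * Cap-check example (editor's sketch, not authoritative): for an
 * O_RDONLY open, ceph_flags_to_mode() yields CEPH_FILE_MODE_RD and
 * wanted is roughly CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
 * CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE.  If an earlier open already
 * left those bits in both issued and mds_wanted, the two "!= wanted"
 * tests above fail and the open completes locally, with no MDS round
 * trip at all.
 */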

/*
 * Do a lookup + open with a single request.  If we get a non-existent
 * file or symlink, return 1 so the VFS can retry.
 */
int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                     struct file *file, unsigned flags, umode_t mode,
                     int *opened)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct dentry *dn;
        struct ceph_acls_info acls = {};
        int mask;
        int err;

        dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
             dir, dentry, dentry,
             d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);

        if (dentry->d_name.len > NAME_MAX)
                return -ENAMETOOLONG;

        err = ceph_init_dentry(dentry);
        if (err < 0)
                return err;

        if (flags & O_CREAT) {
                err = ceph_pre_init_acls(dir, &mode, &acls);
                if (err < 0)
                        return err;
        }

        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out_acl;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
                if (acls.pagelist) {
                        req->r_pagelist = acls.pagelist;
                        acls.pagelist = NULL;
                }
        }

        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
        if (ceph_security_xattr_wanted(dir))
                mask |= CEPH_CAP_XATTR_SHARED;
        req->r_args.open.mask = cpu_to_le32(mask);

        req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
        err = ceph_handle_snapdir(req, dentry, err);
        if (err)
                goto out_req;

        if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);

        if (d_in_lookup(dentry)) {
                dn = ceph_finish_lookup(req, dentry, err);
                if (IS_ERR(dn))
                        err = PTR_ERR(dn);
        } else {
                /* we were given a hashed negative dentry */
                dn = NULL;
        }
        if (err)
                goto out_req;
        if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) {
                /* make vfs retry on splice, ENOENT, or symlink */
                dout("atomic_open finish_no_open on dn %p\n", dn);
                err = finish_no_open(file, dn);
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
                if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
                        ceph_init_inode_acls(d_inode(dentry), &acls);
                        *opened |= FILE_CREATED;
                }
                err = finish_open(file, dentry, ceph_open, opened);
        }
out_req:
        if (!req->r_err && req->r_target_inode)
                ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
        ceph_mdsc_put_request(req);
out_acl:
        ceph_release_acls_info(&acls);
        dout("atomic_open result=%d\n", err);
        return err;
}

int ceph_release(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *cf = file->private_data;

        dout("release inode %p file %p\n", inode, file);
        ceph_put_fmode(ci, cf->fmode);
        if (cf->last_readdir)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
        kmem_cache_free(ceph_file_cachep, cf);

        /* wake up anyone waiting for caps on this inode */
        wake_up_all(&ci->i_cap_wq);
        return 0;
}

enum {
        HAVE_RETRIED = 1,
        CHECK_EOF =    2,
        READ_INLINE =  3,
};

/*
 * Read a range of bytes striped over one or more objects.  Iterate over
 * objects we stripe over.  (That's not atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we should
 * only return a short read to the caller if we hit EOF.
 */
static int striped_read(struct inode *inode,
                        u64 off, u64 len,
                        struct page **pages, int num_pages,
                        int *checkeof)
{
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 pos, this_len, left;
        loff_t i_size;
        int page_align, pages_left;
        int read, ret;
        struct page **page_pos;
        bool hit_stripe, was_short;

        /*
         * we may need to do multiple reads.  not atomic, unfortunately.
         */
        pos = off;
        left = len;
        page_pos = pages;
        pages_left = num_pages;
        read = 0;

more:
        page_align = pos & ~PAGE_MASK;
        this_len = left;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
                                  ci->i_truncate_seq,
                                  ci->i_truncate_size,
                                  page_pos, pages_left, page_align);
        if (ret == -ENOENT)
                ret = 0;
        hit_stripe = this_len < left;
        was_short = ret >= 0 && ret < this_len;
        dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

        i_size = i_size_read(inode);
        if (ret >= 0) {
                int didpages;
                if (was_short && (pos + ret < i_size)) {
                        int zlen = min(this_len - ret, i_size - pos - ret);
                        int zoff = (off & ~PAGE_MASK) + read + ret;
                        dout(" zero gap %llu to %llu\n",
                                pos + ret, pos + ret + zlen);
                        ceph_zero_page_vector_range(zoff, zlen, pages);
                        ret += zlen;
                }

                didpages = (page_align + ret) >> PAGE_SHIFT;
                pos += ret;
                read = pos - off;
                left -= ret;
                page_pos += didpages;
                pages_left -= didpages;

                /* hit stripe and need to continue */
                if (left && hit_stripe && pos < i_size)
                        goto more;
        }

        if (read > 0) {
                ret = read;
                /* did we bounce off eof? */
                if (pos + left > i_size)
                        *checkeof = CHECK_EOF;
        }

        dout("striped_read returns %d\n", ret);
        return ret;
}
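
/*
 * Striping example (editor's illustration): with the default 4MB
 * object size, striped_read(off=2M, len=6M) first issues a read that
 * ceph_osdc_readpages() clamps to this_len=2M (the tail of the first
 * object), sets hit_stripe, and loops; the second pass reads 4M from
 * the next object.  A short OSD reply that still lies inside i_size
 * is zero-filled via ceph_zero_page_vector_range(), so holes in
 * sparse objects read back as zeros.
 */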

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
                                int *checkeof)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct page **pages;
        u64 off = iocb->ki_pos;
        int num_pages, ret;
        size_t len = iov_iter_count(i);

        dout("sync_read on file %p %llu~%u %s\n", file, off,
             (unsigned)len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

        if (!len)
                return 0;
        /*
         * flush any page cache pages in this range.  this
         * will make concurrent normal and sync io slow,
         * but it will at least behave sensibly when they are
         * in sequence.
         */
        ret = filemap_write_and_wait_range(inode->i_mapping, off,
                                                off + len);
        if (ret < 0)
                return ret;

        num_pages = calc_pages_for(off, len);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
        ret = striped_read(inode, off, len, pages,
                                num_pages, checkeof);
        if (ret > 0) {
                int l, k = 0;
                size_t left = ret;

                while (left) {
                        size_t page_off = off & ~PAGE_MASK;
                        size_t copy = min_t(size_t, left,
                                            PAGE_SIZE - page_off);
                        l = copy_page_to_iter(pages[k++], page_off, copy, i);
                        off += l;
                        left -= l;
                        if (l < copy)
                                break;
                }
        }
        ceph_release_page_vector(pages, num_pages);

        if (off > iocb->ki_pos) {
                ret = off - iocb->ki_pos;
                iocb->ki_pos = off;
        }

        dout("sync_read result %d\n", ret);
        return ret;
}
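
/*
 * Copy-out example (editor's illustration): for off=0x1200 and a
 * striped_read() result of 0x2000 bytes, the loop above copies
 * 0xe00 bytes from pages[0] at page_off=0x200, then 0x1000 from
 * pages[1], then the final 0x200 from pages[2].  A short
 * copy_page_to_iter() (e.g. a fault on the user buffer) breaks out
 * early, and the return value is derived from how far off advanced.
 */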

struct ceph_aio_request {
        struct kiocb *iocb;
        size_t total_len;
        int write;
        int error;
        struct list_head osd_reqs;
        unsigned num_reqs;
        atomic_t pending_reqs;
        struct timespec mtime;
        struct ceph_cap_flush *prealloc_cf;
};

struct ceph_aio_work {
        struct work_struct work;
        struct ceph_osd_request *req;
};

static void ceph_aio_retry_work(struct work_struct *work);

static void ceph_aio_complete(struct inode *inode,
                              struct ceph_aio_request *aio_req)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        int ret;

        if (!atomic_dec_and_test(&aio_req->pending_reqs))
                return;

        ret = aio_req->error;
        if (!ret)
                ret = aio_req->total_len;

        dout("ceph_aio_complete %p rc %d\n", inode, ret);

        if (ret >= 0 && aio_req->write) {
                int dirty;

                loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
                if (endoff > i_size_read(inode)) {
                        if (ceph_inode_set_size(inode, endoff))
                                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
                }

                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &aio_req->prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
        }

        ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
                                                CEPH_CAP_FILE_RD));

        aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);

        ceph_free_cap_flush(aio_req->prealloc_cf);
        kfree(aio_req);
}

static void ceph_aio_complete_req(struct ceph_osd_request *req)
{
        int rc = req->r_result;
        struct inode *inode = req->r_inode;
        struct ceph_aio_request *aio_req = req->r_priv;
        struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
        int num_pages = calc_pages_for((u64)osd_data->alignment,
                                       osd_data->length);

        dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
             inode, rc, osd_data->length);

        if (rc == -EOLDSNAPC) {
                struct ceph_aio_work *aio_work;
                BUG_ON(!aio_req->write);

                aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
                if (aio_work) {
                        INIT_WORK(&aio_work->work, ceph_aio_retry_work);
                        aio_work->req = req;
                        queue_work(ceph_inode_to_client(inode)->wb_wq,
                                   &aio_work->work);
                        return;
                }
                rc = -ENOMEM;
        } else if (!aio_req->write) {
                if (rc == -ENOENT)
                        rc = 0;
                if (rc >= 0 && osd_data->length > rc) {
                        int zoff = osd_data->alignment + rc;
                        int zlen = osd_data->length - rc;
                        /*
                         * If read is satisfied by single OSD request,
                         * it can pass EOF. Otherwise read is within
                         * i_size.
                         */
                        if (aio_req->num_reqs == 1) {
                                loff_t i_size = i_size_read(inode);
                                loff_t endoff = aio_req->iocb->ki_pos + rc;
                                if (endoff < i_size)
                                        zlen = min_t(size_t, zlen,
                                                     i_size - endoff);
                                aio_req->total_len = rc + zlen;
                        }

                        if (zlen > 0)
                                ceph_zero_page_vector_range(zoff, zlen,
                                                            osd_data->pages);
                }
        }

        ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write);
        ceph_osdc_put_request(req);

        if (rc < 0)
                cmpxchg(&aio_req->error, 0, rc);

        ceph_aio_complete(inode, aio_req);
}

static void ceph_aio_retry_work(struct work_struct *work)
{
        struct ceph_aio_work *aio_work =
                container_of(work, struct ceph_aio_work, work);
        struct ceph_osd_request *orig_req = aio_work->req;
        struct ceph_aio_request *aio_req = orig_req->r_priv;
        struct inode *inode = orig_req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc;
        struct ceph_osd_request *req;
        int ret;

        spin_lock(&ci->i_ceph_lock);
        if (__ceph_have_pending_cap_snap(ci)) {
                struct ceph_cap_snap *capsnap =
                        list_last_entry(&ci->i_cap_snaps,
                                        struct ceph_cap_snap,
                                        ci_item);
                snapc = ceph_get_snap_context(capsnap->context);
        } else {
                BUG_ON(!ci->i_head_snapc);
                snapc = ceph_get_snap_context(ci->i_head_snapc);
        }
        spin_unlock(&ci->i_ceph_lock);

        req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
                        false, GFP_NOFS);
        if (!req) {
                ret = -ENOMEM;
                req = orig_req;
                goto out;
        }

        req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
                       CEPH_OSD_FLAG_ONDISK |
                       CEPH_OSD_FLAG_WRITE;
        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);

        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (ret) {
                ceph_osdc_put_request(req);
                req = orig_req;
                goto out;
        }

        req->r_ops[0] = orig_req->r_ops[0];
        osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);

        req->r_mtime = aio_req->mtime;
        req->r_data_offset = req->r_ops[0].extent.offset;

        ceph_osdc_put_request(orig_req);

        req->r_callback = ceph_aio_complete_req;
        req->r_inode = inode;
        req->r_priv = aio_req;

        ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
        if (ret < 0) {
                req->r_result = ret;
                ceph_aio_complete_req(req);
        }

        ceph_put_snap_context(snapc);
        kfree(aio_work);
}

/*
 * Write commit request unsafe callback, called to tell us when a
 * request is unsafe (that is, in flight--has been handed to the
 * messenger to send to its target osd).  It is called again when
 * we've received a response message indicating the request is
 * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
 * is completed early (and unsuccessfully) due to a timeout or
 * interrupt.
 *
 * This is used if we requested both an ACK and ONDISK commit reply
 * from the OSD.
 */
static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
{
        struct ceph_inode_info *ci = ceph_inode(req->r_inode);

        dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
                unsafe ? "un" : "");
        if (unsafe) {
                ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
                spin_lock(&ci->i_unsafe_lock);
                list_add_tail(&req->r_unsafe_item,
                              &ci->i_unsafe_writes);
                spin_unlock(&ci->i_unsafe_lock);

                complete_all(&req->r_completion);
        } else {
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_item);
                spin_unlock(&ci->i_unsafe_lock);
                ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
        }
}
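
/*
 * Lifecycle note (editor's sketch): for a single sync write this
 * callback typically fires twice -- once with unsafe=true when the
 * OSD acks the write, parking the request on i_unsafe_writes and
 * taking an Fw cap reference so the cap cannot be released
 * mid-flush, and once with unsafe=false when the ONDISK commit
 * arrives, undoing both.  ceph_sync_write_wait() below drains
 * whatever is still parked.
 */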

/*
 * Wait on any unsafe replies for the given inode.  First wait on the
 * newest request, and make that the upper bound.  Then, if there are
 * more requests, keep waiting on the oldest as long as it is still older
 * than the original request.
 */
void ceph_sync_write_wait(struct inode *inode)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct list_head *head = &ci->i_unsafe_writes;
        struct ceph_osd_request *req;
        u64 last_tid;

        if (!S_ISREG(inode->i_mode))
                return;

        spin_lock(&ci->i_unsafe_lock);
        if (list_empty(head))
                goto out;

        /* set upper bound as _last_ entry in chain */

        req = list_last_entry(head, struct ceph_osd_request,
                              r_unsafe_item);
        last_tid = req->r_tid;

        do {
                ceph_osdc_get_request(req);
                spin_unlock(&ci->i_unsafe_lock);

                dout("sync_write_wait on tid %llu (until %llu)\n",
                     req->r_tid, last_tid);
                wait_for_completion(&req->r_safe_completion);
                ceph_osdc_put_request(req);

                spin_lock(&ci->i_unsafe_lock);
                /*
                 * from here on look at first entry in chain, since we
                 * only want to wait for anything older than last_tid
                 */
                if (list_empty(head))
                        break;
                req = list_first_entry(head, struct ceph_osd_request,
                                       r_unsafe_item);
        } while (req->r_tid < last_tid);
out:
        spin_unlock(&ci->i_unsafe_lock);
}
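
/*
 * Ordering example (editor's illustration): with unsafe writes for
 * tids {5, 9, 12} on the list, the function above waits on tid 12
 * first and records last_tid = 12.  It then repeatedly waits on the
 * list head, so 5 and 9 are reaped in order; a tid-14 write that goes
 * unsafe while we sleep is ignored, because the loop stops as soon as
 * the head's tid is >= last_tid.  i_unsafe_lock is dropped around
 * each wait so the "safe" callback can unlink entries.
 */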

static ssize_t
ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                       struct ceph_snap_context *snapc,
                       struct ceph_cap_flush **pcf)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
        struct ceph_aio_request *aio_req = NULL;
        int num_pages = 0;
        int flags;
        int ret;
        struct timespec mtime = current_time(inode);
        size_t count = iov_iter_count(iter);
        loff_t pos = iocb->ki_pos;
        bool write = iov_iter_rw(iter) == WRITE;

        if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;

        dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
             (write ? "write" : "read"), file, pos, (unsigned)count);

        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
        if (ret < 0)
                return ret;

        if (write) {
                int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
                                        pos >> PAGE_SHIFT,
                                        (pos + count) >> PAGE_SHIFT);
                if (ret2 < 0)
                        dout("invalidate_inode_pages2_range returned %d\n",
                             ret2);

                flags = CEPH_OSD_FLAG_ORDERSNAP |
                        CEPH_OSD_FLAG_ONDISK |
                        CEPH_OSD_FLAG_WRITE;
        } else {
                flags = CEPH_OSD_FLAG_READ;
        }

        while (iov_iter_count(iter) > 0) {
                u64 size = dio_get_pagev_size(iter);
                size_t start = 0;
                ssize_t len;

                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &size, 0,
                                            /*include a 'startsync' command*/
                                            write ? 2 : 1,
                                            write ? CEPH_OSD_OP_WRITE :
                                                    CEPH_OSD_OP_READ,
                                            flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
                if (IS_ERR(req)) {
                        ret = PTR_ERR(req);
                        break;
                }

                len = size;
                pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
                if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
                        ret = PTR_ERR(pages);
                        break;
                }

                /*
                 * To simplify error handling, allow AIO when IO within i_size
                 * or IO can be satisfied by single OSD request.
                 */
                if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
                    (len == count || pos + count <= i_size_read(inode))) {
                        aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
                        if (aio_req) {
                                aio_req->iocb = iocb;
                                aio_req->write = write;
                                INIT_LIST_HEAD(&aio_req->osd_reqs);
                                if (write) {
                                        aio_req->mtime = mtime;
                                        swap(aio_req->prealloc_cf, *pcf);
                                }
                        }
                        /* ignore error */
                }

                if (write) {
                        /*
                         * throw out any page cache pages in this range. this
                         * may block.
                         */
                        truncate_inode_pages_range(inode->i_mapping, pos,
                                        (pos+len) | (PAGE_SIZE - 1));

                        osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
                        req->r_mtime = mtime;
                }

                osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
                                                 false, false);

                if (aio_req) {
                        aio_req->total_len += len;
                        aio_req->num_reqs++;
                        atomic_inc(&aio_req->pending_reqs);

                        req->r_callback = ceph_aio_complete_req;
                        req->r_inode = inode;
                        req->r_priv = aio_req;
                        list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);

                        pos += len;
                        iov_iter_advance(iter, len);
                        continue;
                }

                ret = ceph_osdc_start_request(req->r_osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

                size = i_size_read(inode);
                if (!write) {
                        if (ret == -ENOENT)
                                ret = 0;
                        if (ret >= 0 && ret < len && pos + ret < size) {
                                int zlen = min_t(size_t, len - ret,
                                                 size - pos - ret);
                                ceph_zero_page_vector_range(start + ret, zlen,
                                                            pages);
                                ret += zlen;
                        }
                        if (ret >= 0)
                                len = ret;
                }

                ceph_put_page_vector(pages, num_pages, !write);

                ceph_osdc_put_request(req);
                if (ret < 0)
                        break;

                pos += len;
                iov_iter_advance(iter, len);

                if (!write && pos >= size)
                        break;

                if (write && pos > size) {
                        if (ceph_inode_set_size(inode, pos))
                                ceph_check_caps(ceph_inode(inode),
                                                CHECK_CAPS_AUTHONLY,
                                                NULL);
                }
        }

        if (aio_req) {
                LIST_HEAD(osd_reqs);

                if (aio_req->num_reqs == 0) {
                        kfree(aio_req);
                        return ret;
                }

                ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
                                              CEPH_CAP_FILE_RD);

                list_splice(&aio_req->osd_reqs, &osd_reqs);
                while (!list_empty(&osd_reqs)) {
                        req = list_first_entry(&osd_reqs,
                                               struct ceph_osd_request,
                                               r_unsafe_item);
                        list_del_init(&req->r_unsafe_item);
                        if (ret >= 0)
                                ret = ceph_osdc_start_request(req->r_osdc,
                                                              req, false);
                        if (ret < 0) {
                                req->r_result = ret;
                                ceph_aio_complete_req(req);
                        }
                }
                return -EIOCBQUEUED;
        }

        if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
                ret = pos - iocb->ki_pos;
                iocb->ki_pos = pos;
        }
        return ret;
}

/*
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                struct ceph_snap_context *snapc)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
        u64 len;
        int num_pages;
        int written = 0;
        int flags;
        int check_caps = 0;
        int ret;
        struct timespec mtime = current_time(inode);
        size_t count = iov_iter_count(from);

        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;

        dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);

        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
        if (ret < 0)
                return ret;

        ret = invalidate_inode_pages2_range(inode->i_mapping,
                                            pos >> PAGE_SHIFT,
                                            (pos + count) >> PAGE_SHIFT);
        if (ret < 0)
                dout("invalidate_inode_pages2_range returned %d\n", ret);

        flags = CEPH_OSD_FLAG_ORDERSNAP |
                CEPH_OSD_FLAG_ONDISK |
                CEPH_OSD_FLAG_WRITE |
                CEPH_OSD_FLAG_ACK;

        while ((len = iov_iter_count(from)) > 0) {
                size_t left;
                int n;

                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0, 1,
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
                if (IS_ERR(req)) {
                        ret = PTR_ERR(req);
                        break;
                }

                /*
                 * write from beginning of first page,
                 * regardless of io alignment
                 */
                num_pages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;

                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
                }

                left = len;
                for (n = 0; n < num_pages; n++) {
                        size_t plen = min_t(size_t, left, PAGE_SIZE);
                        ret = copy_page_from_iter(pages[n], 0, plen, from);
                        if (ret != plen) {
                                ret = -EFAULT;
                                break;
                        }
                        left -= ret;
                }

                if (ret < 0) {
                        ceph_release_page_vector(pages, num_pages);
                        goto out;
                }

                /* get a second commit callback */
                req->r_unsafe_callback = ceph_sync_write_unsafe;
                req->r_inode = inode;

                osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
                                                false, true);

                req->r_mtime = mtime;
                ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

out:
                ceph_osdc_put_request(req);
                if (ret == 0) {
                        pos += len;
                        written += len;

                        if (pos > i_size_read(inode)) {
                                check_caps = ceph_inode_set_size(inode, pos);
                                if (check_caps)
                                        ceph_check_caps(ceph_inode(inode),
                                                        CHECK_CAPS_AUTHONLY,
                                                        NULL);
                        }
                } else
                        break;
        }

        if (ret != -EOLDSNAPC && written > 0) {
                ret = written;
                iocb->ki_pos = pos;
        }
        return ret;
}

/*
 * Wrap generic_file_aio_read with checks for cap bits on the inode.
 * Atomically grab references, so that those bits are not released
 * back to the MDS mid-read.
 *
 * Hmm, the sync read case isn't actually async... should it be?
 */
static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
        struct file *filp = iocb->ki_filp;
        struct ceph_file_info *fi = filp->private_data;
        size_t len = iov_iter_count(to);
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct page *pinned_page = NULL;
        ssize_t ret;
        int want, got = 0;
        int retry_op = 0, read = 0;

again:
        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
             inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);

        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
        ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
        if (ret < 0)
                return ret;

        if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_flags & IOCB_DIRECT) ||
            (fi->flags & CEPH_F_SYNC)) {

                dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));

                if (ci->i_inline_version == CEPH_INLINE_NONE) {
                        if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
                                ret = ceph_direct_read_write(iocb, to,
                                                             NULL, NULL);
                                if (ret >= 0 && ret < len)
                                        retry_op = CHECK_EOF;
                        } else {
                                ret = ceph_sync_read(iocb, to, &retry_op);
                        }
                } else {
                        retry_op = READ_INLINE;
                }
        } else {
                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));

                ret = generic_file_read_iter(iocb, to);
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
        if (pinned_page) {
                put_page(pinned_page);
                pinned_page = NULL;
        }
        ceph_put_cap_refs(ci, got);
        if (retry_op > HAVE_RETRIED && ret >= 0) {
                int statret;
                struct page *page = NULL;
                loff_t i_size;
                if (retry_op == READ_INLINE) {
                        page = __page_cache_alloc(GFP_KERNEL);
                        if (!page)
                                return -ENOMEM;
                }

                statret = __ceph_do_getattr(inode, page,
                                            CEPH_STAT_CAP_INLINE_DATA, !!page);
                if (statret < 0) {
                        if (page)
                                __free_page(page);
                        if (statret == -ENODATA) {
                                BUG_ON(retry_op != READ_INLINE);
                                goto again;
                        }
                        return statret;
                }

                i_size = i_size_read(inode);
                if (retry_op == READ_INLINE) {
                        BUG_ON(ret > 0 || read > 0);
                        if (iocb->ki_pos < i_size &&
                            iocb->ki_pos < PAGE_SIZE) {
                                loff_t end = min_t(loff_t, i_size,
                                                   iocb->ki_pos + len);
                                end = min_t(loff_t, end, PAGE_SIZE);
                                if (statret < end)
                                        zero_user_segment(page, statret, end);
                                ret = copy_page_to_iter(page,
                                                iocb->ki_pos & ~PAGE_MASK,
                                                end - iocb->ki_pos, to);
                                iocb->ki_pos += ret;
                                read += ret;
                        }
                        if (iocb->ki_pos < i_size && read < len) {
                                size_t zlen = min_t(size_t, len - read,
                                                    i_size - iocb->ki_pos);
                                ret = iov_iter_zero(zlen, to);
                                iocb->ki_pos += ret;
                                read += ret;
                        }
                        __free_pages(page, 0);
                        return read;
                }

                /* hit EOF or hole? */
                if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
                    ret < len) {
                        dout("sync_read hit hole, ppos %lld < size %lld"
                             ", reading more\n", iocb->ki_pos, i_size);

                        read += ret;
                        len -= ret;
                        retry_op = HAVE_RETRIED;
                        goto again;
                }
        }

        if (ret >= 0)
                ret += read;

        return ret;
}
1329
1330/*
1331 * Take cap references to avoid releasing caps to MDS mid-write.
1332 *
1333 * If we are synchronous, and write with an old snap context, the OSD
1334 * may return EOLDSNAPC.  In that case, retry the write.. _after_
1335 * dropping our cap refs and allowing the pending snap to logically
1336 * complete _before_ this write occurs.
1337 *
1338 * If we are near ENOSPC, write synchronously.
1339 */
1340static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
1341{
1342        struct file *file = iocb->ki_filp;
1343        struct ceph_file_info *fi = file->private_data;
1344        struct inode *inode = file_inode(file);
1345        struct ceph_inode_info *ci = ceph_inode(inode);
1346        struct ceph_osd_client *osdc =
1347                &ceph_sb_to_client(inode->i_sb)->client->osdc;
1348        struct ceph_cap_flush *prealloc_cf;
1349        ssize_t count, written = 0;
1350        int err, want, got;
1351        loff_t pos;
1352
1353        if (ceph_snap(inode) != CEPH_NOSNAP)
1354                return -EROFS;
1355
1356        prealloc_cf = ceph_alloc_cap_flush();
1357        if (!prealloc_cf)
1358                return -ENOMEM;
1359
1360        inode_lock(inode);
1361
1362        /* We can write back this queue in page reclaim */
1363        current->backing_dev_info = inode_to_bdi(inode);
1364
1365        if (iocb->ki_flags & IOCB_APPEND) {
1366                err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
1367                if (err < 0)
1368                        goto out;
1369        }
1370
1371        err = generic_write_checks(iocb, from);
1372        if (err <= 0)
1373                goto out;
1374
1375        pos = iocb->ki_pos;
1376        count = iov_iter_count(from);
1377        err = file_remove_privs(file);
1378        if (err)
1379                goto out;
1380
1381        err = file_update_time(file);
1382        if (err)
1383                goto out;
1384
1385        if (ci->i_inline_version != CEPH_INLINE_NONE) {
1386                err = ceph_uninline_data(file, NULL);
1387                if (err < 0)
1388                        goto out;
1389        }
1390
1391retry_snap:
1392        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
1393                err = -ENOSPC;
1394                goto out;
1395        }
1396
1397        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
1398             inode, ceph_vinop(inode), pos, count, i_size_read(inode));
1399        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1400                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1401        else
1402                want = CEPH_CAP_FILE_BUFFER;
1403        got = 0;
1404        err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
1405                            &got, NULL);
1406        if (err < 0)
1407                goto out;
1408
1409        dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
1410             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
1411
1412        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
1413            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
1414                struct ceph_snap_context *snapc;
1415                struct iov_iter data;
1416                inode_unlock(inode);
1417
1418                spin_lock(&ci->i_ceph_lock);
1419                if (__ceph_have_pending_cap_snap(ci)) {
1420                        struct ceph_cap_snap *capsnap =
1421                                        list_last_entry(&ci->i_cap_snaps,
1422                                                        struct ceph_cap_snap,
1423                                                        ci_item);
1424                        snapc = ceph_get_snap_context(capsnap->context);
1425                } else {
1426                        BUG_ON(!ci->i_head_snapc);
1427                        snapc = ceph_get_snap_context(ci->i_head_snapc);
1428                }
1429                spin_unlock(&ci->i_ceph_lock);
1430
1431                /* keep a private copy of the iter; we may need to revert and retry */
1432                data = *from;
1433                if (iocb->ki_flags & IOCB_DIRECT)
1434                        written = ceph_direct_read_write(iocb, &data, snapc,
1435                                                         &prealloc_cf);
1436                else
1437                        written = ceph_sync_write(iocb, &data, pos, snapc);
1438                if (written == -EOLDSNAPC) {
1439                        dout("aio_write %p %llx.%llx %llu~%u"
1440                                " got EOLDSNAPC, retrying\n",
1441                                inode, ceph_vinop(inode),
1442                                pos, (unsigned)count);
                            /*
                             * As promised in the comment above, drop our
                             * refs so the pending snap can logically
                             * complete before we retry the write.
                             */
                            ceph_put_snap_context(snapc);
                            ceph_put_cap_refs(ci, got);
                            written = 0; /* don't leak EOLDSNAPC to userspace */
1443                        inode_lock(inode);
1444                        goto retry_snap;
1445                }
1446                if (written > 0)
1447                        iov_iter_advance(from, written);
1448                ceph_put_snap_context(snapc);
1449        } else {
1450                /*
1451                 * No need to take the i_truncate_mutex: the MDS
1452                 * revokes Fwb caps before sending us a truncate
1453                 * message, and we cannot be granted Fwb while a
1454                 * vmtruncate is pending.  So write and vmtruncate
1455                 * cannot run at the same time.
1456                 */
1457                written = generic_perform_write(file, from, pos);
1458                if (likely(written >= 0))
1459                        iocb->ki_pos = pos + written;
1460                inode_unlock(inode);
1461        }
1462
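            /*
             * The write dirtied data under our Fw cap: any inline data
             * is now stale, and the dirty cap state must eventually be
             * flushed back to the MDS.
             */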
1463        if (written >= 0) {
1464                int dirty;
1465                spin_lock(&ci->i_ceph_lock);
1466                ci->i_inline_version = CEPH_INLINE_NONE;
1467                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
1468                                               &prealloc_cf);
1469                spin_unlock(&ci->i_ceph_lock);
1470                if (dirty)
1471                        __mark_inode_dirty(inode, dirty);
1472        }
1473
1474        dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
1475             inode, ceph_vinop(inode), pos, (unsigned)count,
1476             ceph_cap_string(got));
1477        ceph_put_cap_refs(ci, got);
1478
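            /*
             * The "near ENOSPC, write synchronously" case from the
             * comment above: flag the iocb O_DSYNC so that
             * generic_write_sync() pushes the data out immediately
             * rather than leaving it dirty in the page cache.
             */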
1479        if (written >= 0) {
1480                if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_NEARFULL))
1481                        iocb->ki_flags |= IOCB_DSYNC;
1482
1483                written = generic_write_sync(iocb, written);
1484        }
1485
1486        goto out_unlocked;
1487
1488out:
1489        inode_unlock(inode);
1490out_unlocked:
1491        ceph_free_cap_flush(prealloc_cf);
1492        current->backing_dev_info = NULL;
1493        return written ? written : err;
1494}
1495
1496/*
1497 * llseek.  Be sure to verify the file size on SEEK_END, SEEK_DATA
     * and SEEK_HOLE, since another client may have changed it.
1498 */
1499static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1500{
1501        struct inode *inode = file->f_mapping->host;
1502        loff_t i_size;
1503        loff_t ret;
1504
1505        inode_lock(inode);
1506
1507        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
1508                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
1509                if (ret < 0)
1510                        goto out;
1511        }
1512
1513        i_size = i_size_read(inode);
1514        switch (whence) {
1515        case SEEK_END:
1516                offset += i_size;
1517                break;
1518        case SEEK_CUR:
1519                /*
1520                 * Here we special-case the lseek(fd, 0, SEEK_CUR)
1521                 * position-querying operation.  Avoid rewriting the "same"
1522                 * f_pos value back to the file because a concurrent read(),
1523                 * write() or lseek() might have altered it.
1524                 */
1525                if (offset == 0) {
1526                        ret = file->f_pos;
1527                        goto out;
1528                }
1529                offset += file->f_pos;
1530                break;
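            /*
             * We keep no map of holes, so treat the entire file as
             * data: SEEK_DATA succeeds anywhere inside i_size, and
             * SEEK_HOLE always points at end of file.
             */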
1531        case SEEK_DATA:
1532                if (offset >= i_size) {
1533                        ret = -ENXIO;
1534                        goto out;
1535                }
1536                break;
1537        case SEEK_HOLE:
1538                if (offset >= i_size) {
1539                        ret = -ENXIO;
1540                        goto out;
1541                }
1542                offset = i_size;
1543                break;
1544        }
1545
1546        ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
1547
1548out:
1549        inode_unlock(inode);
1550        return ret;
1551}
1552
1553static inline void ceph_zero_partial_page(
1554        struct inode *inode, loff_t offset, unsigned size)
1555{
1556        struct page *page;
1557        pgoff_t index = offset >> PAGE_SHIFT;
1558
1559        page = find_lock_page(inode->i_mapping, index);
1560        if (page) {
1561                wait_on_page_writeback(page);
1562                zero_user(page, offset & (PAGE_SIZE - 1), size);
1563                unlock_page(page);
1564                put_page(page);
1565        }
1566}
1567
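    /*
     * Zero a range of the page cache in three steps: a partial page at
     * the head, whole pages in the middle (simply dropped from the
     * cache), and a partial page at the tail.  E.g. with 4K pages,
     * offset=1000 and length=10000: zero bytes 1000-4095 in place, drop
     * the 4096-8191 page, then zero bytes 8192-10999.
     */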
1568static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
1569                                      loff_t length)
1570{
1571        loff_t nearly = round_up(offset, PAGE_SIZE);
1572        if (offset < nearly) {
1573                loff_t size = nearly - offset;
1574                if (length < size)
1575                        size = length;
1576                ceph_zero_partial_page(inode, offset, size);
1577                offset += size;
1578                length -= size;
1579        }
1580        if (length >= PAGE_SIZE) {
1581                loff_t size = round_down(length, PAGE_SIZE);
1582                truncate_pagecache_range(inode, offset, offset + size - 1);
1583                offset += size;
1584                length -= size;
1585        }
1586        if (length)
1587                ceph_zero_partial_page(inode, offset, length);
1588}
1589
1590static int ceph_zero_partial_object(struct inode *inode,
1591                                    loff_t offset, loff_t *length)
1592{
1593        struct ceph_inode_info *ci = ceph_inode(inode);
1594        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1595        struct ceph_osd_request *req;
1596        int ret = 0;
1597        loff_t zero = 0;
1598        int op;
1599
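            /*
             * @length == NULL means "wipe the whole object": the object
             * backing offset 0 of the file is truncated to zero size,
             * any other object is deleted outright.  A non-NULL @length
             * is also an output: ceph_osdc_new_request() trims it so the
             * request stays within a single object, and the caller
             * advances by the trimmed amount.
             */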
1600        if (!length) {
1601                op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
1602                length = &zero;
1603        } else {
1604                op = CEPH_OSD_OP_ZERO;
1605        }
1606
1607        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1608                                        ceph_vino(inode),
1609                                        offset, length,
1610                                        0, 1, op,
1611                                        CEPH_OSD_FLAG_WRITE |
1612                                        CEPH_OSD_FLAG_ONDISK,
1613                                        NULL, 0, 0, false);
1614        if (IS_ERR(req)) {
1615                ret = PTR_ERR(req);
1616                goto out;
1617        }
1618
1619        req->r_mtime = inode->i_mtime;
1620        ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1621        if (!ret) {
1622                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1623                if (ret == -ENOENT)
1624                        ret = 0;
1625        }
1626        ceph_osdc_put_request(req);
1627
1628out:
1629        return ret;
1630}
1631
1632static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
1633{
1634        int ret = 0;
1635        struct ceph_inode_info *ci = ceph_inode(inode);
1636        s32 stripe_unit = ci->i_layout.stripe_unit;
1637        s32 stripe_count = ci->i_layout.stripe_count;
1638        s32 object_size = ci->i_layout.object_size;
1639        u64 object_set_size = (u64)object_size * stripe_count; /* avoid 32-bit overflow */
1640        u64 nearly, t;
1641
1642        /* round offset up to next period boundary */
1643        nearly = offset + object_set_size - 1;
1644        t = nearly;
1645        nearly -= do_div(t, object_set_size);
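            /*
             * e.g. stripe_unit=1M, stripe_count=4, object_size=4M gives
             * a 16M object set; offset=5M rounds up to nearly=16M, and
             * an offset already on a period boundary is left unchanged.
             */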
1646
1647        while (length && offset < nearly) {
1648                loff_t size = length;
1649                ret = ceph_zero_partial_object(inode, offset, &size);
1650                if (ret < 0)
1651                        return ret;
1652                offset += size;
1653                length -= size;
1654        }
1655        while (length >= object_set_size) {
1656                int i;
1657                loff_t pos = offset;
1658                for (i = 0; i < stripe_count; ++i) {
1659                        ret = ceph_zero_partial_object(inode, pos, NULL);
1660                        if (ret < 0)
1661                                return ret;
1662                        pos += stripe_unit;
1663                }
1664                offset += object_set_size;
1665                length -= object_set_size;
1666        }
1667        while (length) {
1668                loff_t size = length;
1669                ret = ceph_zero_partial_object(inode, offset, &size);
1670                if (ret < 0)
1671                        return ret;
1672                offset += size;
1673                length -= size;
1674        }
1675        return ret;
1676}
1677
1678static long ceph_fallocate(struct file *file, int mode,
1679                                loff_t offset, loff_t length)
1680{
1681        struct ceph_file_info *fi = file->private_data;
1682        struct inode *inode = file_inode(file);
1683        struct ceph_inode_info *ci = ceph_inode(inode);
1684        struct ceph_osd_client *osdc =
1685                &ceph_inode_to_client(inode)->client->osdc;
1686        struct ceph_cap_flush *prealloc_cf;
1687        int want, got = 0;
1688        int dirty;
1689        int ret = 0;
1690        loff_t endoff = 0;
1691        loff_t size;
1692
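            /*
             * We only handle hole punching and plain (pre)allocation,
             * each optionally with KEEP_SIZE.  The VFS has already
             * rejected PUNCH_HOLE without KEEP_SIZE on our behalf.
             */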
1693        if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
1694                return -EOPNOTSUPP;
1695
1696        if (!S_ISREG(inode->i_mode))
1697                return -EOPNOTSUPP;
1698
1699        prealloc_cf = ceph_alloc_cap_flush();
1700        if (!prealloc_cf)
1701                return -ENOMEM;
1702
1703        inode_lock(inode);
1704
1705        if (ceph_snap(inode) != CEPH_NOSNAP) {
1706                ret = -EROFS;
1707                goto unlock;
1708        }
1709
1710        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) &&
1711            !(mode & FALLOC_FL_PUNCH_HOLE)) {
1712                ret = -ENOSPC;
1713                goto unlock;
1714        }
1715
1716        if (ci->i_inline_version != CEPH_INLINE_NONE) {
1717                ret = ceph_uninline_data(file, NULL);
1718                if (ret < 0)
1719                        goto unlock;
1720        }
1721
1722        size = i_size_read(inode);
1723        if (!(mode & FALLOC_FL_KEEP_SIZE))
1724                endoff = offset + length;
1725
1726        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1727                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1728        else
1729                want = CEPH_CAP_FILE_BUFFER;
1730
1731        ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
1732        if (ret < 0)
1733                goto unlock;
1734
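            /*
             * Punching a hole zeroes both the page cache and the
             * backing RADOS objects.  Allocating past EOF just grows
             * the file: objects are sparse, so nothing needs to be
             * preallocated on the OSDs.
             */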
1735        if (mode & FALLOC_FL_PUNCH_HOLE) {
1736                if (offset < size)
1737                        ceph_zero_pagecache_range(inode, offset, length);
1738                ret = ceph_zero_objects(inode, offset, length);
1739        } else if (endoff > size) {
1740                truncate_pagecache_range(inode, size, -1);
1741                if (ceph_inode_set_size(inode, endoff))
1742                        ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1744        }
1745
1746        if (!ret) {
1747                spin_lock(&ci->i_ceph_lock);
1748                ci->i_inline_version = CEPH_INLINE_NONE;
1749                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
1750                                               &prealloc_cf);
1751                spin_unlock(&ci->i_ceph_lock);
1752                if (dirty)
1753                        __mark_inode_dirty(inode, dirty);
1754        }
1755
1756        ceph_put_cap_refs(ci, got);
1757unlock:
1758        inode_unlock(inode);
1759        ceph_free_cap_flush(prealloc_cf);
1760        return ret;
1761}
1762
1763const struct file_operations ceph_file_fops = {
1764        .open = ceph_open,
1765        .release = ceph_release,
1766        .llseek = ceph_llseek,
1767        .read_iter = ceph_read_iter,
1768        .write_iter = ceph_write_iter,
1769        .mmap = ceph_mmap,
1770        .fsync = ceph_fsync,
1771        .lock = ceph_lock,
1772        .flock = ceph_flock,
1773        .splice_write = iter_file_splice_write,
1774        .unlocked_ioctl = ceph_ioctl,
1775        .compat_ioctl   = ceph_ioctl,
1776        .fallocate      = ceph_fallocate,
1777};
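
    /*
     * fill_inode() in fs/ceph/inode.c installs these for regular files,
     * roughly:  inode->i_fop = &ceph_file_fops;
     */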
1778
1779