linux/fs/ceph/file.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"

static __le32 ceph_flags_sys2wire(u32 flags)
{
        u32 wire_flags = 0;

        switch (flags & O_ACCMODE) {
        case O_RDONLY:
                wire_flags |= CEPH_O_RDONLY;
                break;
        case O_WRONLY:
                wire_flags |= CEPH_O_WRONLY;
                break;
        case O_RDWR:
                wire_flags |= CEPH_O_RDWR;
                break;
        }

#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
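
/*
 * For illustration, ceph_sys2wire(O_CREAT) below expands to:
 *
 *        if (flags & O_CREAT) {
 *                wire_flags |= CEPH_O_CREAT;
 *                flags &= ~O_CREAT;
 *        }
 *
 * Each translated bit is cleared from 'flags', so whatever is left
 * afterwards is a flag we never mapped to the wire format, which the
 * dout() below reports.
 */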

        ceph_sys2wire(O_CREAT);
        ceph_sys2wire(O_EXCL);
        ceph_sys2wire(O_TRUNC);
        ceph_sys2wire(O_DIRECTORY);
        ceph_sys2wire(O_NOFOLLOW);

#undef ceph_sys2wire

        if (flags)
                dout("unused open flags: %x", flags);

        return cpu_to_le32(wire_flags);
}

/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 *  - buffered uses the generic_file_aio_{read,write} helpers
 *
 *  - synchronous is used when there is multi-client read/write
 *    sharing, avoids the page cache, and synchronously waits for an
 *    ack from the OSD.
 *
 *  - direct io takes the variant of the sync path that references
 *    user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */
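
/*
 * For orientation (based on the rest of this file): the buffered path
 * ends up in generic_file_read_iter()/generic_perform_write(), the
 * synchronous path in ceph_sync_read()/ceph_sync_write(), and the
 * direct io path in ceph_direct_read_write().
 */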

/*
 * Calculate the length sum of direct io vectors that can
 * be combined into one page vector.
 */
static size_t dio_get_pagev_size(const struct iov_iter *it)
{
        const struct iovec *iov = it->iov;
        const struct iovec *iovend = iov + it->nr_segs;
        size_t size;

        size = iov->iov_len - it->iov_offset;
        /*
         * An iov can be page vectored when both the current tail
         * and the next base are page aligned.
         */
        while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
               (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
                size += iov->iov_len;
        }
        dout("dio_get_pagev_size len = %zu\n", size);
        return size;
}
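
/*
 * Example (assuming 4K pages): for iovecs A = { base page-aligned,
 * len = 8192 } followed by B = { base page-aligned, len = 4096 },
 * A's tail and B's base are both page aligned, so the loop above
 * coalesces them and returns 12288 -- all three pages can be handed
 * to the OSD as one page vector.
 */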

/*
 * Allocate a page vector based on (@it, @nbytes).
 * The return value is the tuple describing a page vector,
 * that is (@pages, @page_align, @num_pages).
 */
static struct page **
dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
                    size_t *page_align, int *num_pages)
{
        struct iov_iter tmp_it = *it;
        size_t align;
        struct page **pages;
        int ret = 0, idx, npages;

        align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
                (PAGE_SIZE - 1);
        npages = calc_pages_for(align, nbytes);
        pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL);
        if (!pages)
                return ERR_PTR(-ENOMEM);

        for (idx = 0; idx < npages; ) {
                size_t start;
                ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
                                         npages - idx, &start);
                if (ret < 0)
                        goto fail;

                iov_iter_advance(&tmp_it, ret);
                nbytes -= ret;
                idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
        }

        BUG_ON(nbytes != 0);
        *num_pages = npages;
        *page_align = align;
        dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
        return pages;
fail:
        ceph_put_page_vector(pages, idx, false);
        return ERR_PTR(ret);
}
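
/*
 * Note on the arithmetic above: calc_pages_for(off, len) counts the
 * pages spanned by [off, off + len).  E.g. a 4096-byte io starting 512
 * bytes into a page straddles two pages, so calc_pages_for(512, 4096)
 * is 2 even though the length is exactly one page.
 */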

/*
 * Prepare an open request.  Preallocate ceph_cap to avoid an
 * inopportune ENOMEM later.
 */
static struct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int want_auth = USE_ANY_MDS;
        int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;

        if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
                want_auth = USE_AUTH_MDS;

        req = ceph_mdsc_create_request(mdsc, op, want_auth);
        if (IS_ERR(req))
                goto out;
        req->r_fmode = ceph_flags_to_mode(flags);
        req->r_args.open.flags = ceph_flags_sys2wire(flags);
        req->r_args.open.mode = cpu_to_le32(create_mode);
out:
        return req;
}
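
/*
 * A note on USE_AUTH_MDS above: requests that may mutate the inode
 * (write access, create, truncate) need to be handled by the auth MDS
 * for that inode, while a plain read-only open can go to any MDS.
 */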

/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
        struct ceph_file_info *cf;
        int ret = 0;

        switch (inode->i_mode & S_IFMT) {
        case S_IFREG:
                ceph_fscache_register_inode_cookie(inode);
                ceph_fscache_file_set_cookie(inode, file);
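                /* fall through: dirs get a ceph_file_info too */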
        case S_IFDIR:
                dout("init_file %p %p 0%o (regular)\n", inode, file,
                     inode->i_mode);
                cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
                if (cf == NULL) {
                        ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                        return -ENOMEM;
                }
                cf->fmode = fmode;
                cf->next_offset = 2;
                cf->readdir_cache_idx = -1;
                file->private_data = cf;
                BUG_ON(inode->i_fop->release != ceph_release);
                break;

        case S_IFLNK:
                dout("init_file %p %p 0%o (symlink)\n", inode, file,
                     inode->i_mode);
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                break;

        default:
                dout("init_file %p %p 0%o (special)\n", inode, file,
                     inode->i_mode);
                /*
                 * we need to drop the open ref now, since we don't
                 * have .release set to ceph_release.
                 */
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                BUG_ON(inode->i_fop->release == ceph_release);

                /* call the proper open fop */
                ret = inode->i_fop->open(inode, file);
        }
        return ret;
}

/*
 * try to renew caps after the session gets killed.
 */
int ceph_renew_caps(struct inode *inode)
{
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_request *req;
        int err, flags, wanted;

        spin_lock(&ci->i_ceph_lock);
        wanted = __ceph_caps_file_wanted(ci);
        if (__ceph_is_any_real_caps(ci) &&
            (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
                int issued = __ceph_caps_issued(ci, NULL);
                spin_unlock(&ci->i_ceph_lock);
                dout("renew caps %p want %s issued %s updating mds_wanted\n",
                     inode, ceph_cap_string(wanted), ceph_cap_string(issued));
                ceph_check_caps(ci, 0, NULL);
                return 0;
        }
        spin_unlock(&ci->i_ceph_lock);

        flags = 0;
        if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
                flags = O_RDWR;
        else if (wanted & CEPH_CAP_FILE_RD)
                flags = O_RDONLY;
        else if (wanted & CEPH_CAP_FILE_WR)
                flags = O_WRONLY;
#ifdef O_LAZY
        if (wanted & CEPH_CAP_FILE_LAZYIO)
                flags |= O_LAZY;
#endif

        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }

        req->r_inode = inode;
        ihold(inode);
        req->r_num_caps = 1;
        req->r_fmode = -1;

        err = ceph_mdsc_do_request(mdsc, NULL, req);
        ceph_mdsc_put_request(req);
out:
        dout("renew caps %p open result=%d\n", inode, err);
        return err < 0 ? err : 0;
}

/*
 * If we already have the requisite capabilities, we can satisfy
 * the open request locally (no need to request new caps from the
 * MDS).  We do, however, need to inform the MDS (asynchronously)
 * if our wanted caps set expands.
 */
int ceph_open(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_file_info *cf = file->private_data;
        int err;
        int flags, fmode, wanted;

        if (cf) {
                dout("open file %p is already opened\n", file);
                return 0;
        }

        /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
        flags = file->f_flags & ~(O_CREAT|O_EXCL);
        if (S_ISDIR(inode->i_mode))
                flags = O_DIRECTORY;  /* mds likes to know */

        dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
             ceph_vinop(inode), file, flags, file->f_flags);
        fmode = ceph_flags_to_mode(flags);
        wanted = ceph_caps_for_mode(fmode);

        /* snapped files are read-only */
        if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
                return -EROFS;

        /* trivially open snapdir */
        if (ceph_snap(inode) == CEPH_SNAPDIR) {
                spin_lock(&ci->i_ceph_lock);
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }

        /*
         * No need to block if we have caps on the auth MDS (for
         * write) or any MDS (for read).  Update wanted set
         * asynchronously.
         */
        spin_lock(&ci->i_ceph_lock);
        if (__ceph_is_any_real_caps(ci) &&
            (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
                int mds_wanted = __ceph_caps_mds_wanted(ci, true);
                int issued = __ceph_caps_issued(ci, NULL);

                dout("open %p fmode %d want %s issued %s using existing\n",
                     inode, fmode, ceph_cap_string(wanted),
                     ceph_cap_string(issued));
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);

                /* adjust wanted? */
                if ((issued & wanted) != wanted &&
                    (mds_wanted & wanted) != wanted &&
                    ceph_snap(inode) != CEPH_SNAPDIR)
                        ceph_check_caps(ci, 0, NULL);

                return ceph_init_file(inode, file, fmode);
        } else if (ceph_snap(inode) != CEPH_NOSNAP &&
                   (ci->i_snap_caps & wanted) == wanted) {
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }

        spin_unlock(&ci->i_ceph_lock);

        dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        req->r_inode = inode;
        ihold(inode);

        req->r_num_caps = 1;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        if (!err)
                err = ceph_init_file(inode, file, req->r_fmode);
        ceph_mdsc_put_request(req);
        dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
out:
        return err;
}


/*
 * Do a lookup + open with a single request.  If we get a non-existent
 * file or symlink, return 1 so the VFS can retry.
 */
int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                     struct file *file, unsigned flags, umode_t mode,
                     int *opened)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct dentry *dn;
        struct ceph_acls_info acls = {};
        int mask;
        int err;

        dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
             dir, dentry, dentry,
             d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);

        if (dentry->d_name.len > NAME_MAX)
                return -ENAMETOOLONG;

        if (flags & O_CREAT) {
                err = ceph_pre_init_acls(dir, &mode, &acls);
                if (err < 0)
                        return err;
        }

        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out_acl;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
                if (acls.pagelist) {
                        req->r_pagelist = acls.pagelist;
                        acls.pagelist = NULL;
                }
        }

        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
        if (ceph_security_xattr_wanted(dir))
                mask |= CEPH_CAP_XATTR_SHARED;
        req->r_args.open.mask = cpu_to_le32(mask);

        req->r_parent = dir;
        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
        err = ceph_handle_snapdir(req, dentry, err);
        if (err)
                goto out_req;

        if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);

        if (d_in_lookup(dentry)) {
                dn = ceph_finish_lookup(req, dentry, err);
                if (IS_ERR(dn))
                        err = PTR_ERR(dn);
        } else {
                /* we were given a hashed negative dentry */
                dn = NULL;
        }
        if (err)
                goto out_req;
        if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) {
                /* make vfs retry on splice, ENOENT, or symlink */
                dout("atomic_open finish_no_open on dn %p\n", dn);
                err = finish_no_open(file, dn);
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
                if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
                        ceph_init_inode_acls(d_inode(dentry), &acls);
                        *opened |= FILE_CREATED;
                }
                err = finish_open(file, dentry, ceph_open, opened);
        }
out_req:
        if (!req->r_err && req->r_target_inode)
                ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
        ceph_mdsc_put_request(req);
out_acl:
        ceph_release_acls_info(&acls);
        dout("atomic_open result=%d\n", err);
        return err;
}

int ceph_release(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *cf = file->private_data;

        dout("release inode %p file %p\n", inode, file);
        ceph_put_fmode(ci, cf->fmode);
        if (cf->last_readdir)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
        kmem_cache_free(ceph_file_cachep, cf);

        /* wake up anyone waiting for caps on this inode */
        wake_up_all(&ci->i_cap_wq);
        return 0;
}

enum {
        HAVE_RETRIED = 1,
        CHECK_EOF =    2,
        READ_INLINE =  3,
};
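
/*
 * retry_op states used by ceph_read_iter() below: CHECK_EOF means a
 * short read must be re-checked against i_size (we may have hit a hole
 * or EOF), READ_INLINE means the data lives inline in the MDS and must
 * be fetched via getattr, and HAVE_RETRIED marks a read that has
 * already been retried once.
 */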

/*
 * Read a range of bytes striped over one or more objects.  Iterate over
 * objects we stripe over.  (That's not atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
static int striped_read(struct inode *inode,
                        u64 pos, u64 len,
                        struct page **pages, int num_pages,
                        int page_align, int *checkeof)
{
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 this_len;
        loff_t i_size;
        int page_idx;
        int ret, read = 0;
        bool hit_stripe, was_short;

        /*
         * we may need to do multiple reads.  not atomic, unfortunately.
         */
more:
        this_len = len;
        page_idx = (page_align + read) >> PAGE_SHIFT;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
                                  pages + page_idx, num_pages - page_idx,
                                  ((page_align + read) & ~PAGE_MASK));
        if (ret == -ENOENT)
                ret = 0;
        hit_stripe = this_len < len;
        was_short = ret >= 0 && ret < this_len;
        dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

        i_size = i_size_read(inode);
        if (ret >= 0) {
                if (was_short && (pos + ret < i_size)) {
                        int zlen = min(this_len - ret, i_size - pos - ret);
                        int zoff = page_align + read + ret;
                        dout(" zero gap %llu to %llu\n",
                             pos + ret, pos + ret + zlen);
                        ceph_zero_page_vector_range(zoff, zlen, pages);
                        ret += zlen;
                }

                read += ret;
                pos += ret;
                len -= ret;

                /* hit a stripe boundary and need to continue */
                if (len && hit_stripe && pos < i_size)
                        goto more;
        }

        if (read > 0) {
                ret = read;
                /* did we bounce off eof? */
                if (pos + len > i_size)
                        *checkeof = CHECK_EOF;
        }

        dout("striped_read returns %d\n", ret);
        return ret;
}
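
/*
 * Worked example (assuming the default layout with 4 MB objects): a
 * read of 8 KB starting 4 KB before an object boundary gets truncated
 * by the osd client to this_len = 4 KB for the first object, so
 * hit_stripe is true and we loop back to 'more' to read the remaining
 * 4 KB from the next object.
 */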

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
                              int *checkeof)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct page **pages;
        u64 off = iocb->ki_pos;
        int num_pages;
        ssize_t ret;
        size_t len = iov_iter_count(to);

        dout("sync_read on file %p %llu~%u %s\n", file, off,
             (unsigned)len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

        if (!len)
                return 0;
        /*
         * flush any page cache pages in this range.  this
         * will make concurrent normal and sync io slow,
         * but it will at least behave sensibly when they are
         * in sequence.
         */
        ret = filemap_write_and_wait_range(inode->i_mapping, off,
                                                off + len - 1);
        if (ret < 0)
                return ret;

        if (unlikely(to->type & ITER_PIPE)) {
                size_t page_off;
                ret = iov_iter_get_pages_alloc(to, &pages, len,
                                               &page_off);
                if (ret <= 0)
                        return -ENOMEM;
                num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);

                ret = striped_read(inode, off, ret, pages, num_pages,
                                   page_off, checkeof);
                if (ret > 0) {
                        iov_iter_advance(to, ret);
                        off += ret;
                } else {
                        iov_iter_advance(to, 0);
                }
                ceph_put_page_vector(pages, num_pages, false);
        } else {
                num_pages = calc_pages_for(off, len);
                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages))
                        return PTR_ERR(pages);

                ret = striped_read(inode, off, len, pages, num_pages,
                                   (off & ~PAGE_MASK), checkeof);
                if (ret > 0) {
                        int l, k = 0;
                        size_t left = ret;

                        while (left) {
                                size_t page_off = off & ~PAGE_MASK;
                                size_t copy = min_t(size_t, left,
                                                    PAGE_SIZE - page_off);
                                l = copy_page_to_iter(pages[k++], page_off,
                                                      copy, to);
                                off += l;
                                left -= l;
                                if (l < copy)
                                        break;
                        }
                }
                ceph_release_page_vector(pages, num_pages);
        }

        if (off > iocb->ki_pos) {
                ret = off - iocb->ki_pos;
                iocb->ki_pos = off;
        }

        dout("sync_read result %zd\n", ret);
        return ret;
}

struct ceph_aio_request {
        struct kiocb *iocb;
        size_t total_len;
        int write;
        int error;
        struct list_head osd_reqs;
        unsigned num_reqs;
        atomic_t pending_reqs;
        struct timespec mtime;
        struct ceph_cap_flush *prealloc_cf;
};
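
/*
 * Lifecycle note: pending_reqs counts the OSD requests still in flight
 * for this aio; each completion calls ceph_aio_complete(), and only
 * the last one (atomic_dec_and_test) finishes the iocb.
 */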

struct ceph_aio_work {
        struct work_struct work;
        struct ceph_osd_request *req;
};

static void ceph_aio_retry_work(struct work_struct *work);

static void ceph_aio_complete(struct inode *inode,
                              struct ceph_aio_request *aio_req)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        int ret;

        if (!atomic_dec_and_test(&aio_req->pending_reqs))
                return;

        ret = aio_req->error;
        if (!ret)
                ret = aio_req->total_len;

        dout("ceph_aio_complete %p rc %d\n", inode, ret);

        if (ret >= 0 && aio_req->write) {
                int dirty;

                loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
                if (endoff > i_size_read(inode)) {
                        if (ceph_inode_set_size(inode, endoff))
                                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
                }

                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &aio_req->prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);

        }

        ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
                                                CEPH_CAP_FILE_RD));

        aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);

        ceph_free_cap_flush(aio_req->prealloc_cf);
        kfree(aio_req);
}

static void ceph_aio_complete_req(struct ceph_osd_request *req)
{
        int rc = req->r_result;
        struct inode *inode = req->r_inode;
        struct ceph_aio_request *aio_req = req->r_priv;
        struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
        int num_pages = calc_pages_for((u64)osd_data->alignment,
                                       osd_data->length);

        dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
             inode, rc, osd_data->length);

        if (rc == -EOLDSNAPC) {
                struct ceph_aio_work *aio_work;
                BUG_ON(!aio_req->write);

                aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
                if (aio_work) {
                        INIT_WORK(&aio_work->work, ceph_aio_retry_work);
                        aio_work->req = req;
                        queue_work(ceph_inode_to_client(inode)->wb_wq,
                                   &aio_work->work);
                        return;
                }
                rc = -ENOMEM;
        } else if (!aio_req->write) {
                if (rc == -ENOENT)
                        rc = 0;
                if (rc >= 0 && osd_data->length > rc) {
                        int zoff = osd_data->alignment + rc;
                        int zlen = osd_data->length - rc;
                        /*
                         * If the read is satisfied by a single OSD
                         * request, it may extend past EOF.  Otherwise
                         * the read is within i_size.
                         */
                        if (aio_req->num_reqs == 1) {
                                loff_t i_size = i_size_read(inode);
                                loff_t endoff = aio_req->iocb->ki_pos + rc;
                                if (endoff < i_size)
                                        zlen = min_t(size_t, zlen,
                                                     i_size - endoff);
                                aio_req->total_len = rc + zlen;
                        }

                        if (zlen > 0)
                                ceph_zero_page_vector_range(zoff, zlen,
                                                            osd_data->pages);
                }
        }

        ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write);
        ceph_osdc_put_request(req);

        if (rc < 0)
                cmpxchg(&aio_req->error, 0, rc);

        ceph_aio_complete(inode, aio_req);
        return;
}
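
/*
 * -EOLDSNAPC above means the write raced with a snapshot: the OSD
 * rejected the request because its snap context is out of date.  The
 * retry must pick up the newer snap context and resend, which needs
 * process context, hence the hop through the ceph_aio_retry_work
 * workqueue below.
 */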

static void ceph_aio_retry_work(struct work_struct *work)
{
        struct ceph_aio_work *aio_work =
                container_of(work, struct ceph_aio_work, work);
        struct ceph_osd_request *orig_req = aio_work->req;
        struct ceph_aio_request *aio_req = orig_req->r_priv;
        struct inode *inode = orig_req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc;
        struct ceph_osd_request *req;
        int ret;

        spin_lock(&ci->i_ceph_lock);
        if (__ceph_have_pending_cap_snap(ci)) {
                struct ceph_cap_snap *capsnap =
                        list_last_entry(&ci->i_cap_snaps,
                                        struct ceph_cap_snap,
                                        ci_item);
                snapc = ceph_get_snap_context(capsnap->context);
        } else {
                BUG_ON(!ci->i_head_snapc);
                snapc = ceph_get_snap_context(ci->i_head_snapc);
        }
        spin_unlock(&ci->i_ceph_lock);

        req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
                        false, GFP_NOFS);
        if (!req) {
                ret = -ENOMEM;
                req = orig_req;
                goto out;
        }

        req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);

        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (ret) {
                ceph_osdc_put_request(req);
                req = orig_req;
                goto out;
        }

        req->r_ops[0] = orig_req->r_ops[0];
        osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);

        req->r_mtime = aio_req->mtime;
        req->r_data_offset = req->r_ops[0].extent.offset;

        ceph_osdc_put_request(orig_req);

        req->r_callback = ceph_aio_complete_req;
        req->r_inode = inode;
        req->r_priv = aio_req;
        req->r_abort_on_full = true;

        ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
        if (ret < 0) {
                req->r_result = ret;
                ceph_aio_complete_req(req);
        }

        ceph_put_snap_context(snapc);
        kfree(aio_work);
}

static ssize_t
ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
                       struct ceph_snap_context *snapc,
                       struct ceph_cap_flush **pcf)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
        struct ceph_aio_request *aio_req = NULL;
        int num_pages = 0;
        int flags;
        int ret;
        struct timespec mtime = current_time(inode);
        size_t count = iov_iter_count(iter);
        loff_t pos = iocb->ki_pos;
        bool write = iov_iter_rw(iter) == WRITE;

        if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;

        dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
             (write ? "write" : "read"), file, pos, (unsigned)count);

        ret = filemap_write_and_wait_range(inode->i_mapping, pos,
                                           pos + count - 1);
        if (ret < 0)
                return ret;

        if (write) {
                int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
                                        pos >> PAGE_SHIFT,
                                        (pos + count - 1) >> PAGE_SHIFT);
                if (ret2 < 0)
                        dout("invalidate_inode_pages2_range returned %d\n", ret2);

                flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
        } else {
                flags = CEPH_OSD_FLAG_READ;
        }

        while (iov_iter_count(iter) > 0) {
                u64 size = dio_get_pagev_size(iter);
                size_t start = 0;
                ssize_t len;

                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &size, 0,
                                            /*include a 'startsync' command*/
                                            write ? 2 : 1,
                                            write ? CEPH_OSD_OP_WRITE :
                                                    CEPH_OSD_OP_READ,
                                            flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
                if (IS_ERR(req)) {
                        ret = PTR_ERR(req);
                        break;
                }

                len = size;
                pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
                if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
                        ret = PTR_ERR(pages);
                        break;
                }

                /*
                 * To simplify error handling, allow AIO only when the
                 * IO is within i_size or can be satisfied by a single
                 * OSD request.
                 */
                if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
                    (len == count || pos + count <= i_size_read(inode))) {
                        aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
                        if (aio_req) {
                                aio_req->iocb = iocb;
                                aio_req->write = write;
                                INIT_LIST_HEAD(&aio_req->osd_reqs);
                                if (write) {
                                        aio_req->mtime = mtime;
                                        swap(aio_req->prealloc_cf, *pcf);
                                }
                        }
                        /* ignore error */
                }

                if (write) {
                        /*
                         * throw out any page cache pages in this range. this
                         * may block.
                         */
                        truncate_inode_pages_range(inode->i_mapping, pos,
                                        (pos+len) | (PAGE_SIZE - 1));

                        osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
                        req->r_mtime = mtime;
                }

                osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
                                                 false, false);

                if (aio_req) {
                        aio_req->total_len += len;
                        aio_req->num_reqs++;
                        atomic_inc(&aio_req->pending_reqs);

                        req->r_callback = ceph_aio_complete_req;
                        req->r_inode = inode;
                        req->r_priv = aio_req;
                        list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);

                        pos += len;
                        iov_iter_advance(iter, len);
                        continue;
                }

                ret = ceph_osdc_start_request(req->r_osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

                size = i_size_read(inode);
                if (!write) {
                        if (ret == -ENOENT)
                                ret = 0;
                        if (ret >= 0 && ret < len && pos + ret < size) {
                                int zlen = min_t(size_t, len - ret,
                                                 size - pos - ret);
                                ceph_zero_page_vector_range(start + ret, zlen,
                                                            pages);
                                ret += zlen;
                        }
                        if (ret >= 0)
                                len = ret;
                }

                ceph_put_page_vector(pages, num_pages, !write);

                ceph_osdc_put_request(req);
                if (ret < 0)
                        break;

                pos += len;
                iov_iter_advance(iter, len);

                if (!write && pos >= size)
                        break;

                if (write && pos > size) {
                        if (ceph_inode_set_size(inode, pos))
                                ceph_check_caps(ceph_inode(inode),
                                                CHECK_CAPS_AUTHONLY,
                                                NULL);
                }
        }

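        /*
         * For AIO, the OSD requests were only built and queued on
         * aio_req->osd_reqs above.  Take the cap reference once for
         * the whole batch, then submit everything here; a submit
         * failure is fed back through ceph_aio_complete_req() so the
         * normal completion path does the unwinding.
         */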
        if (aio_req) {
                LIST_HEAD(osd_reqs);

                if (aio_req->num_reqs == 0) {
                        kfree(aio_req);
                        return ret;
                }

                ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
                                              CEPH_CAP_FILE_RD);

                list_splice(&aio_req->osd_reqs, &osd_reqs);
                while (!list_empty(&osd_reqs)) {
                        req = list_first_entry(&osd_reqs,
                                               struct ceph_osd_request,
                                               r_unsafe_item);
                        list_del_init(&req->r_unsafe_item);
                        if (ret >= 0)
                                ret = ceph_osdc_start_request(req->r_osdc,
                                                              req, false);
                        if (ret < 0) {
                                req->r_result = ret;
                                ceph_aio_complete_req(req);
                        }
                }
                return -EIOCBQUEUED;
        }

        if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
                ret = pos - iocb->ki_pos;
                iocb->ki_pos = pos;
        }
        return ret;
}

/*
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                struct ceph_snap_context *snapc)
{
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
        u64 len;
        int num_pages;
        int written = 0;
        int flags;
        int ret;
        bool check_caps = false;
        struct timespec mtime = current_time(inode);
        size_t count = iov_iter_count(from);

        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;

        dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);

        ret = filemap_write_and_wait_range(inode->i_mapping, pos,
                                           pos + count - 1);
        if (ret < 0)
                return ret;

        ret = invalidate_inode_pages2_range(inode->i_mapping,
                                            pos >> PAGE_SHIFT,
                                            (pos + count - 1) >> PAGE_SHIFT);
        if (ret < 0)
                dout("invalidate_inode_pages2_range returned %d\n", ret);

        flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;

        while ((len = iov_iter_count(from)) > 0) {
                size_t left;
                int n;

                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0, 1,
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
                if (IS_ERR(req)) {
                        ret = PTR_ERR(req);
                        break;
                }

                /*
                 * write from beginning of first page,
                 * regardless of io alignment
                 */
                num_pages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;

                pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
                }

                left = len;
                for (n = 0; n < num_pages; n++) {
                        size_t plen = min_t(size_t, left, PAGE_SIZE);
                        ret = copy_page_from_iter(pages[n], 0, plen, from);
                        if (ret != plen) {
                                ret = -EFAULT;
                                break;
                        }
                        left -= ret;
                }

                if (ret < 0) {
                        ceph_release_page_vector(pages, num_pages);
                        goto out;
                }

                req->r_inode = inode;

                osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
                                                false, true);

                req->r_mtime = mtime;
                ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

out:
                ceph_osdc_put_request(req);
                if (ret != 0) {
                        ceph_set_error_write(ci);
                        break;
                }

                ceph_clear_error_write(ci);
                pos += len;
                written += len;
                if (pos > i_size_read(inode)) {
                        check_caps = ceph_inode_set_size(inode, pos);
                        if (check_caps)
                                ceph_check_caps(ceph_inode(inode),
                                                CHECK_CAPS_AUTHONLY,
                                                NULL);
                }

        }

        if (ret != -EOLDSNAPC && written > 0) {
                ret = written;
                iocb->ki_pos = pos;
        }
        return ret;
}

/*
 * Wrap generic_file_aio_read with checks for cap bits on the inode.
 * Atomically grab references, so that those bits are not released
 * back to the MDS mid-read.
 *
 * Hmm, the sync read case isn't actually async... should it be?
 */
static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
        struct file *filp = iocb->ki_filp;
        struct ceph_file_info *fi = filp->private_data;
        size_t len = iov_iter_count(to);
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct page *pinned_page = NULL;
        ssize_t ret;
        int want, got = 0;
        int retry_op = 0, read = 0;

again:
        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
             inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);

        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
        ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
        if (ret < 0)
                return ret;

        if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_flags & IOCB_DIRECT) ||
            (fi->flags & CEPH_F_SYNC)) {

                dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));

                if (ci->i_inline_version == CEPH_INLINE_NONE) {
                        if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
                                ret = ceph_direct_read_write(iocb, to,
                                                             NULL, NULL);
                                if (ret >= 0 && ret < len)
                                        retry_op = CHECK_EOF;
                        } else {
                                ret = ceph_sync_read(iocb, to, &retry_op);
                        }
                } else {
                        retry_op = READ_INLINE;
                }
        } else {
                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));
                current->journal_info = filp;
                ret = generic_file_read_iter(iocb, to);
                current->journal_info = NULL;
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
        if (pinned_page) {
                put_page(pinned_page);
                pinned_page = NULL;
        }
        ceph_put_cap_refs(ci, got);
        if (retry_op > HAVE_RETRIED && ret >= 0) {
                int statret;
                struct page *page = NULL;
                loff_t i_size;
                if (retry_op == READ_INLINE) {
                        page = __page_cache_alloc(GFP_KERNEL);
                        if (!page)
                                return -ENOMEM;
                }

                statret = __ceph_do_getattr(inode, page,
                                            CEPH_STAT_CAP_INLINE_DATA, !!page);
                if (statret < 0) {
                        if (page)
                                __free_page(page);
                        if (statret == -ENODATA) {
                                BUG_ON(retry_op != READ_INLINE);
                                goto again;
                        }
                        return statret;
                }

                i_size = i_size_read(inode);
                if (retry_op == READ_INLINE) {
                        BUG_ON(ret > 0 || read > 0);
                        if (iocb->ki_pos < i_size &&
                            iocb->ki_pos < PAGE_SIZE) {
                                loff_t end = min_t(loff_t, i_size,
                                                   iocb->ki_pos + len);
                                end = min_t(loff_t, end, PAGE_SIZE);
                                if (statret < end)
                                        zero_user_segment(page, statret, end);
                                ret = copy_page_to_iter(page,
                                                iocb->ki_pos & ~PAGE_MASK,
                                                end - iocb->ki_pos, to);
                                iocb->ki_pos += ret;
                                read += ret;
                        }
                        if (iocb->ki_pos < i_size && read < len) {
                                size_t zlen = min_t(size_t, len - read,
                                                    i_size - iocb->ki_pos);
                                ret = iov_iter_zero(zlen, to);
                                iocb->ki_pos += ret;
                                read += ret;
                        }
                        __free_pages(page, 0);
                        return read;
                }

                /* hit EOF or hole? */
                if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
                    ret < len) {
                        dout("sync_read hit hole, ppos %lld < size %lld"
                             ", reading more\n", iocb->ki_pos, i_size);

                        read += ret;
                        len -= ret;
                        retry_op = HAVE_RETRIED;
                        goto again;
                }
        }

        if (ret >= 0)
                ret += read;

        return ret;
}
1279
1280/*
1281 * Take cap references to avoid releasing caps to MDS mid-write.
1282 *
1283 * If we are synchronous, and write with an old snap context, the OSD
1284 * may return EOLDSNAPC.  In that case, retry the write.. _after_
1285 * dropping our cap refs and allowing the pending snap to logically
1286 * complete _before_ this write occurs.
1287 *
1288 * If we are near ENOSPC, write synchronously.
1289 */
1290static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
1291{
1292        struct file *file = iocb->ki_filp;
1293        struct ceph_file_info *fi = file->private_data;
1294        struct inode *inode = file_inode(file);
1295        struct ceph_inode_info *ci = ceph_inode(inode);
1296        struct ceph_osd_client *osdc =
1297                &ceph_sb_to_client(inode->i_sb)->client->osdc;
1298        struct ceph_cap_flush *prealloc_cf;
1299        ssize_t count, written = 0;
1300        int err, want, got;
1301        loff_t pos;
1302
1303        if (ceph_snap(inode) != CEPH_NOSNAP)
1304                return -EROFS;
1305
1306        prealloc_cf = ceph_alloc_cap_flush();
1307        if (!prealloc_cf)
1308                return -ENOMEM;
1309
1310        inode_lock(inode);
1311
1312        /* We can write back this queue in page reclaim */
1313        current->backing_dev_info = inode_to_bdi(inode);
1314
1315        if (iocb->ki_flags & IOCB_APPEND) {
1316                err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
1317                if (err < 0)
1318                        goto out;
1319        }
1320
1321        err = generic_write_checks(iocb, from);
1322        if (err <= 0)
1323                goto out;
1324
1325        pos = iocb->ki_pos;
1326        count = iov_iter_count(from);
1327        err = file_remove_privs(file);
1328        if (err)
1329                goto out;
1330
1331        err = file_update_time(file);
1332        if (err)
1333                goto out;
1334
1335        if (ci->i_inline_version != CEPH_INLINE_NONE) {
1336                err = ceph_uninline_data(file, NULL);
1337                if (err < 0)
1338                        goto out;
1339        }
1340
1341retry_snap:
1342        /* FIXME: not complete since it doesn't account for being at quota */
1343        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
1344                err = -ENOSPC;
1345                goto out;
1346        }
1347
1348        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
1349             inode, ceph_vinop(inode), pos, count, i_size_read(inode));
1350        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1351                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1352        else
1353                want = CEPH_CAP_FILE_BUFFER;
1354        got = 0;
1355        err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
1356                            &got, NULL);
1357        if (err < 0)
1358                goto out;
1359
1360        dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
1361             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
1362
1363        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
1364            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
1365            (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
1366                struct ceph_snap_context *snapc;
1367                struct iov_iter data;
1368                inode_unlock(inode);
1369
1370                spin_lock(&ci->i_ceph_lock);
1371                if (__ceph_have_pending_cap_snap(ci)) {
1372                        struct ceph_cap_snap *capsnap =
1373                                        list_last_entry(&ci->i_cap_snaps,
1374                                                        struct ceph_cap_snap,
1375                                                        ci_item);
1376                        snapc = ceph_get_snap_context(capsnap->context);
1377                } else {
1378                        BUG_ON(!ci->i_head_snapc);
1379                        snapc = ceph_get_snap_context(ci->i_head_snapc);
1380                }
1381                spin_unlock(&ci->i_ceph_lock);
1382
1383                /* we might need to revert back to that point */
1384                data = *from;
1385                if (iocb->ki_flags & IOCB_DIRECT)
1386                        written = ceph_direct_read_write(iocb, &data, snapc,
1387                                                         &prealloc_cf);
1388                else
1389                        written = ceph_sync_write(iocb, &data, pos, snapc);
1390                if (written == -EOLDSNAPC) {
1391                        dout("aio_write %p %llx.%llx %llu~%u"
1392                                "got EOLDSNAPC, retrying\n",
1393                                inode, ceph_vinop(inode),
1394                                pos, (unsigned)count);
1395                        inode_lock(inode);
1396                        goto retry_snap;
1397                }
1398                if (written > 0)
1399                        iov_iter_advance(from, written);
1400                ceph_put_snap_context(snapc);
        } else {
                /*
                 * No need to acquire the i_truncate_mutex: the MDS
                 * revokes Fwb caps before sending us a truncate
                 * message, so we can never hold Fwb while a
                 * vmtruncate is pending.  Write and vmtruncate
                 * therefore cannot run at the same time.
                 */
                written = generic_perform_write(file, from, pos);
                if (likely(written >= 0))
                        iocb->ki_pos = pos + written;
                inode_unlock(inode);
        }

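        /*
         * On success, mark Fw dirty so the size/mtime changes are flushed
         * back to the MDS along with our dirty caps.
         */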
        if (written >= 0) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
        }

        dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
             inode, ceph_vinop(inode), pos, (unsigned)count,
             ceph_cap_string(got));
        ceph_put_cap_refs(ci, got);

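        /*
         * If the cluster is nearly full, force the write out synchronously
         * so that any resulting ENOSPC is reported to the caller now rather
         * than surfacing later at writeback time.
         */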
        if (written >= 0) {
                if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_NEARFULL))
                        iocb->ki_flags |= IOCB_DSYNC;

                written = generic_write_sync(iocb, written);
        }

        goto out_unlocked;

out:
        inode_unlock(inode);
out_unlocked:
        ceph_free_cap_flush(prealloc_cf);
        current->backing_dev_info = NULL;
        return written ? written : err;
}

/*
 * llseek.  be sure to verify file size on SEEK_END.
 */
static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{
        struct inode *inode = file->f_mapping->host;
        loff_t i_size;
        loff_t ret;

        inode_lock(inode);

        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
                if (ret < 0)
                        goto out;
        }

        i_size = i_size_read(inode);
        switch (whence) {
        case SEEK_END:
                offset += i_size;
                break;
        case SEEK_CUR:
                /*
                 * Here we special-case the lseek(fd, 0, SEEK_CUR)
                 * position-querying operation.  Avoid rewriting the "same"
                 * f_pos value back to the file because a concurrent read(),
                 * write() or lseek() might have altered it.
                 */
                if (offset == 0) {
                        ret = file->f_pos;
                        goto out;
                }
                offset += file->f_pos;
                break;
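        /*
         * We do not track holes, so every byte below i_size is treated
         * as data: SEEK_DATA succeeds at any in-file offset, and
         * SEEK_HOLE always lands at i_size.
         */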
        case SEEK_DATA:
                if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
                break;
        case SEEK_HOLE:
                if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
                offset = i_size;
                break;
        }

        ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);

out:
        inode_unlock(inode);
        return ret;
}

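/*
 * Zero the sub-page range [offset, offset+size) if the page is resident
 * in the page cache; pages not in the cache are left alone, since the
 * OSD-level zeroing covers the on-disk data.
 */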
static inline void ceph_zero_partial_page(
        struct inode *inode, loff_t offset, unsigned size)
{
        struct page *page;
        pgoff_t index = offset >> PAGE_SHIFT;

        page = find_lock_page(inode->i_mapping, index);
        if (page) {
                wait_on_page_writeback(page);
                zero_user(page, offset & (PAGE_SIZE - 1), size);
                unlock_page(page);
                put_page(page);
        }
}

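/*
 * Zero the page-cache view of [offset, offset+length): zero a partial
 * head page in place, drop the whole pages in the middle, then zero any
 * partial tail page.
 */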
static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
                                      loff_t length)
{
        loff_t nearly = round_up(offset, PAGE_SIZE);
        if (offset < nearly) {
                loff_t size = nearly - offset;
                if (length < size)
                        size = length;
                ceph_zero_partial_page(inode, offset, size);
                offset += size;
                length -= size;
        }
        if (length >= PAGE_SIZE) {
                loff_t size = round_down(length, PAGE_SIZE);
                truncate_pagecache_range(inode, offset, offset + size - 1);
                offset += size;
                length -= size;
        }
        if (length)
                ceph_zero_partial_page(inode, offset, length);
}

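/*
 * Zero a byte range of a single RADOS object.  With a NULL @length the
 * whole object is discarded: the file's first object is truncated to
 * zero length rather than deleted, while any other object is deleted
 * outright.  Otherwise an OSD ZERO op covers [offset, offset+*length).
 * -ENOENT is ignored, since an absent object already reads back as
 * zeros.
 */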
static int ceph_zero_partial_object(struct inode *inode,
                                    loff_t offset, loff_t *length)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
        int ret = 0;
        loff_t zero = 0;
        int op;

        if (!length) {
                op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
                length = &zero;
        } else {
                op = CEPH_OSD_OP_ZERO;
        }

        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                        ceph_vino(inode),
                                        offset, length,
                                        0, 1, op,
                                        CEPH_OSD_FLAG_WRITE,
                                        NULL, 0, 0, false);
        if (IS_ERR(req)) {
                ret = PTR_ERR(req);
                goto out;
        }

        req->r_mtime = inode->i_mtime;
        ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
        if (!ret) {
                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
                if (ret == -ENOENT)
                        ret = 0;
        }
        ceph_osdc_put_request(req);

out:
        return ret;
}

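/*
 * Zero [offset, offset+length) of the file at object granularity.  A
 * period (object set) is stripe_count objects of object_size bytes
 * each; e.g. with a 4 MB object_size and a stripe_count of 3, the
 * period is 12 MB.  Partial-period head and tail ranges are zeroed
 * with ZERO ops, while whole periods in the middle are handled by
 * deleting (or, for the first object, truncating) each of the
 * period's objects.
 */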
static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
{
        int ret = 0;
        struct ceph_inode_info *ci = ceph_inode(inode);
        s32 stripe_unit = ci->i_layout.stripe_unit;
        s32 stripe_count = ci->i_layout.stripe_count;
        s32 object_size = ci->i_layout.object_size;
        /* widen before multiplying so the product cannot overflow 32 bits */
        u64 object_set_size = (u64)object_size * stripe_count;
        u64 nearly, t;

        /* round offset up to next period boundary */
        nearly = offset + object_set_size - 1;
        t = nearly;
        nearly -= do_div(t, object_set_size);

        while (length && offset < nearly) {
                loff_t size = length;
                ret = ceph_zero_partial_object(inode, offset, &size);
                if (ret < 0)
                        return ret;
                offset += size;
                length -= size;
        }
        while (length >= object_set_size) {
                int i;
                loff_t pos = offset;
                for (i = 0; i < stripe_count; ++i) {
                        ret = ceph_zero_partial_object(inode, pos, NULL);
                        if (ret < 0)
                                return ret;
                        pos += stripe_unit;
                }
                offset += object_set_size;
                length -= object_set_size;
        }
        while (length) {
                loff_t size = length;
                ret = ceph_zero_partial_object(inode, offset, &size);
                if (ret < 0)
                        return ret;
                offset += size;
                length -= size;
        }
        return ret;
}

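/*
 * fallocate: no space is actually reserved (objects on the OSDs are
 * sparse).  FALLOC_FL_PUNCH_HOLE zeroes the byte range; a plain
 * fallocate beyond EOF simply extends i_size.
 */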
static long ceph_fallocate(struct file *file, int mode,
                                loff_t offset, loff_t length)
{
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
        struct ceph_cap_flush *prealloc_cf;
        int want, got = 0;
        int dirty;
        int ret = 0;
        loff_t endoff = 0;
        loff_t size;

        if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
                return -EOPNOTSUPP;

        if (!S_ISREG(inode->i_mode))
                return -EOPNOTSUPP;

        prealloc_cf = ceph_alloc_cap_flush();
        if (!prealloc_cf)
                return -ENOMEM;

        inode_lock(inode);

        if (ceph_snap(inode) != CEPH_NOSNAP) {
                ret = -EROFS;
                goto unlock;
        }

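        /* punching a hole can only free space, so allow it even when full */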
        if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) &&
            !(mode & FALLOC_FL_PUNCH_HOLE)) {
                ret = -ENOSPC;
                goto unlock;
        }

        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                ret = ceph_uninline_data(file, NULL);
                if (ret < 0)
                        goto unlock;
        }

        size = i_size_read(inode);
        if (!(mode & FALLOC_FL_KEEP_SIZE)) {
                endoff = offset + length;
                ret = inode_newsize_ok(inode, endoff);
                if (ret)
                        goto unlock;
        }

        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_BUFFER;

        ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
        if (ret < 0)
                goto unlock;

        if (mode & FALLOC_FL_PUNCH_HOLE) {
                if (offset < size)
                        ceph_zero_pagecache_range(inode, offset, length);
                ret = ceph_zero_objects(inode, offset, length);
        } else if (endoff > size) {
                truncate_pagecache_range(inode, size, -1);
                if (ceph_inode_set_size(inode, endoff))
                        ceph_check_caps(ceph_inode(inode),
                                CHECK_CAPS_AUTHONLY, NULL);
        }

        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
        }

        ceph_put_cap_refs(ci, got);
unlock:
        inode_unlock(inode);
        ceph_free_cap_flush(prealloc_cf);
        return ret;
}

const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
        .llseek = ceph_llseek,
        .read_iter = ceph_read_iter,
        .write_iter = ceph_write_iter,
        .mmap = ceph_mmap,
        .fsync = ceph_fsync,
        .lock = ceph_lock,
        .flock = ceph_flock,
        .splice_read = generic_file_splice_read,
        .splice_write = iter_file_splice_write,
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,
        .fallocate      = ceph_fallocate,
};