linux/fs/ceph/addr.c
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>    /* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
#include "mds_client.h"
#include <linux/ceph/osd_client.h>
/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped), at which point we are writing the most
 * recently dirtied pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */
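
/*
 * A worked example of the accounting above (illustrative only): an
 * inode with three dirty pages and no snaps has i_wrbuffer_ref ==
 * i_wrbuffer_ref_head == 3.  When a snapshot arrives, the head count
 * moves to the new capsnap (capsnap->dirty = 3) and i_wrbuffer_ref_head
 * resets to 0, while i_wrbuffer_ref stays at 3 until those pages are
 * written back.  Dirtying one more page afterwards then gives
 * i_wrbuffer_ref == 4, i_wrbuffer_ref_head == 1.
 */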

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)                            \
        (CONGESTION_ON_THRESH(congestion_kb) -                          \
         (CONGESTION_ON_THRESH(congestion_kb) >> 2))
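
/*
 * Shifting by (PAGE_SHIFT - 10) converts KB to pages.  For example,
 * with 4 KB pages (PAGE_SHIFT == 12) and congestion_kb == 8192, the
 * bdi is marked congested once 2048 dirty pages are in flight and is
 * cleared again at 2048 - 2048/4 == 1536 pages (25% hysteresis).
 */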

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
        if (PagePrivate(page))
                return (void *)page->private;
        return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
        struct address_space *mapping = page->mapping;
        struct inode *inode;
        struct ceph_inode_info *ci;
        int undo = 0;
        struct ceph_snap_context *snapc;

        if (unlikely(!mapping))
                return !TestSetPageDirty(page);

        if (TestSetPageDirty(page)) {
                dout("%p set_page_dirty %p idx %lu -- already dirty\n",
                     mapping->host, page, page->index);
                return 0;
        }

        inode = mapping->host;
        ci = ceph_inode(inode);

        /*
         * Note that we're grabbing a snapc ref here without holding
         * any locks!
         */
        snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);

        /* dirty the head */
        spin_lock(&ci->i_ceph_lock);
        if (ci->i_head_snapc == NULL)
                ci->i_head_snapc = ceph_get_snap_context(snapc);
        ++ci->i_wrbuffer_ref_head;
        if (ci->i_wrbuffer_ref == 0)
                ihold(inode);
        ++ci->i_wrbuffer_ref;
        dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
             "snapc %p seq %lld (%d snaps)\n",
             mapping->host, page, page->index,
             ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
             ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
             snapc, snapc->seq, snapc->num_snaps);
        spin_unlock(&ci->i_ceph_lock);

        /* now adjust page */
        spin_lock_irq(&mapping->tree_lock);
        if (page->mapping) {    /* Race with truncate? */
                WARN_ON_ONCE(!PageUptodate(page));
                account_page_dirtied(page, page->mapping);
                radix_tree_tag_set(&mapping->page_tree,
                                page_index(page), PAGECACHE_TAG_DIRTY);

                /*
                 * Reference snap context in page->private.  Also set
                 * PagePrivate so that we get invalidatepage callback.
                 */
                page->private = (unsigned long)snapc;
                SetPagePrivate(page);
        } else {
                dout("ANON set_page_dirty %p (raced truncate?)\n", page);
                undo = 1;
        }

        spin_unlock_irq(&mapping->tree_lock);

        if (undo)
                /* whoops, we failed to dirty the page */
                ceph_put_wrbuffer_cap_refs(ci, 1, snapc);

        __mark_inode_dirty(mapping->host, I_DIRTY_PAGES);

        BUG_ON(!PageDirty(page));
        return 1;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
                                unsigned int length)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc = page_snap_context(page);

        BUG_ON(!PageLocked(page));
        BUG_ON(!PagePrivate(page));
        BUG_ON(!page->mapping);

        inode = page->mapping->host;

        /*
         * We can get non-dirty pages here due to races between
         * set_page_dirty and truncate_complete_page; just spit out a
         * warning, in case we end up with accounting problems later.
         */
        if (!PageDirty(page))
                pr_err("%p invalidatepage %p page not dirty\n", inode, page);

        ci = ceph_inode(inode);
        if (offset == 0 && length == PAGE_CACHE_SIZE) {
                dout("%p invalidatepage %p idx %lu full dirty page\n",
                     inode, page, page->index);
                ClearPageChecked(page);
                ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
                ceph_put_snap_context(snapc);
                page->private = 0;
                ClearPagePrivate(page);
        } else {
                dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
                     inode, page, page->index, offset, length);
        }
}

/* just a sanity check */
static int ceph_releasepage(struct page *page, gfp_t g)
{
        struct inode *inode = page->mapping ? page->mapping->host : NULL;
        dout("%p releasepage %p idx %lu\n", inode, page, page->index);
        WARN_ON(PageDirty(page));
        WARN_ON(PagePrivate(page));
        return 0;
}

/*
 * read a single page, without unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
        int err = 0;
        u64 len = PAGE_CACHE_SIZE;

        dout("readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                  (u64) page_offset(page), &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
                                  &page, 1, 0);
        if (err == -ENOENT)
                err = 0;
        if (err < 0) {
                SetPageError(page);
                goto out;
        } else if (err < PAGE_CACHE_SIZE) {
                /* zero fill remainder of page */
                zero_user_segment(page, err, PAGE_CACHE_SIZE);
        }
        SetPageUptodate(page);

out:
        return err < 0 ? err : 0;
}

static int ceph_readpage(struct file *filp, struct page *page)
{
        int r = readpage_nounlock(filp, page);
        unlock_page(page);
        return r;
}

/*
 * Finish an async read(ahead) op.
 */
static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct inode *inode = req->r_inode;
        struct ceph_osd_data *osd_data;
        int rc = req->r_result;
        int bytes = le32_to_cpu(msg->hdr.data_len);
        int num_pages;
        int i;

        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

        /* unlock all pages, zeroing any data we didn't read */
        osd_data = osd_req_op_extent_osd_data(req, 0);
        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
        num_pages = calc_pages_for((u64)osd_data->alignment,
                                        (u64)osd_data->length);
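        /*
         * bytes counts down as we walk the page array; once it drops
         * below a full page, the tail of that page (or, if it has gone
         * negative, the whole page) was not filled by the reply and
         * must be zeroed before the page is marked uptodate.
         */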
        for (i = 0; i < num_pages; i++) {
                struct page *page = osd_data->pages[i];

                if (bytes < (int)PAGE_CACHE_SIZE) {
                        /* zero (remainder of) page */
                        int s = bytes < 0 ? 0 : bytes;
                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
                }
                dout("finish_read %p uptodate %p idx %lu\n", inode, page,
                     page->index);
                flush_dcache_page(page);
                SetPageUptodate(page);
                unlock_page(page);
                page_cache_release(page);
                bytes -= PAGE_CACHE_SIZE;
        }
        kfree(osd_data->pages);
}

static void ceph_unlock_page_vector(struct page **pages, int num_pages)
{
        int i;

        for (i = 0; i < num_pages; i++)
                unlock_page(pages[i]);
}

/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct page *page = list_entry(page_list->prev, struct page, lru);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        u64 off;
        u64 len;
        int i;
        struct page **pages;
        pgoff_t next_index;
        int nr_pages = 0;
        int ret;

        off = (u64) page_offset(page);

        /* count pages */
        next_index = page->index;
        list_for_each_entry_reverse(page, page_list, lru) {
                if (page->index != next_index)
                        break;
                nr_pages++;
                next_index++;
                if (max && nr_pages == max)
                        break;
        }
        len = nr_pages << PAGE_CACHE_SHIFT;
        dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
             off, len);
        vino = ceph_vino(inode);
        req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
                                    1, CEPH_OSD_OP_READ,
                                    CEPH_OSD_FLAG_READ, NULL,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    false);
        if (IS_ERR(req))
                return PTR_ERR(req);

        /* build page vector */
        nr_pages = calc_pages_for(0, len);
        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
        ret = -ENOMEM;
        if (!pages)
                goto out;
        for (i = 0; i < nr_pages; ++i) {
                page = list_entry(page_list->prev, struct page, lru);
                BUG_ON(PageLocked(page));
                list_del(&page->lru);

                dout("start_read %p adding %p idx %lu\n", inode, page,
                     page->index);
                if (add_to_page_cache_lru(page, &inode->i_data, page->index,
                                          GFP_NOFS)) {
                        page_cache_release(page);
                        dout("start_read %p add_to_page_cache failed %p\n",
                             inode, page);
                        nr_pages = i;
                        goto out_pages;
                }
                pages[i] = page;
        }
        osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
        req->r_callback = finish_read;
        req->r_inode = inode;

        ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);

        dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto out_pages;
        ceph_osdc_put_request(req);
        return nr_pages;

out_pages:
        ceph_unlock_page_vector(pages, nr_pages);
        ceph_release_page_vector(pages, nr_pages);
out:
        ceph_osdc_put_request(req);
        return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
                          struct list_head *page_list, unsigned nr_pages)
{
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        int rc = 0;
        int max = 0;

        if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
                max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
                        >> PAGE_SHIFT;

        dout("readpages %p file %p nr_pages %d max %d\n",
             inode, file, nr_pages, max);
        while (!list_empty(page_list)) {
                rc = start_read(inode, page_list, max);
                if (rc < 0)
                        goto out;
                BUG_ON(rc == 0);
        }
out:
        dout("readpages %p file %p ret %d\n", inode, file, rc);
        return rc;
}
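
/*
 * Example of the batching above (illustrative): with rsize == 65536
 * and 4 KB pages, max is 16, so each start_read() call submits at most
 * 16 consecutive pages per OSD read; a non-consecutive page in
 * page_list simply starts the next batch on the following iteration.
 */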

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
                                                    u64 *snap_size)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc = NULL;
        struct ceph_cap_snap *capsnap = NULL;

        spin_lock(&ci->i_ceph_lock);
        list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
                dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
                     capsnap->context, capsnap->dirty_pages);
                if (capsnap->dirty_pages) {
                        snapc = ceph_get_snap_context(capsnap->context);
                        if (snap_size)
                                *snap_size = capsnap->size;
                        break;
                }
        }
        if (!snapc && ci->i_wrbuffer_ref_head) {
                snapc = ceph_get_snap_context(ci->i_head_snapc);
                dout(" head snapc %p has %d dirty pages\n",
                     snapc, ci->i_wrbuffer_ref_head);
        }
        spin_unlock(&ci->i_ceph_lock);
        return snapc;
}
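
/*
 * For instance, if i_cap_snaps holds capsnaps whose snap contexts have
 * seqs 5 and 9 and both still have dirty pages, the seq-5 context is
 * returned; only once its pages are written back does seq 9, and
 * finally the "head" context, become eligible for writeback.
 */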

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_fs_client *fsc;
        struct ceph_osd_client *osdc;
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
        long writeback_stat;
        u64 truncate_size, snap_size = 0;
        u32 truncate_seq;
        int err = 0, len = PAGE_CACHE_SIZE;

        dout("writepage %p idx %lu\n", page, page->index);

        if (!page->mapping || !page->mapping->host) {
                dout("writepage %p - no mapping\n", page);
                return -EFAULT;
        }
        inode = page->mapping->host;
        ci = ceph_inode(inode);
        fsc = ceph_inode_to_client(inode);
        osdc = &fsc->client->osdc;

        /* verify this is a writeable snap context */
        snapc = page_snap_context(page);
        if (snapc == NULL) {
                dout("writepage %p page %p not dirty?\n", inode, page);
                goto out;
        }
        oldest = get_oldest_context(inode, &snap_size);
        if (snapc->seq > oldest->seq) {
                dout("writepage %p page %p snapc %p not writeable - noop\n",
                     inode, page, snapc);
                /* we should only noop if called by kswapd */
                WARN_ON((current->flags & PF_MEMALLOC) == 0);
                ceph_put_snap_context(oldest);
                goto out;
        }
        ceph_put_snap_context(oldest);

        spin_lock(&ci->i_ceph_lock);
        truncate_seq = ci->i_truncate_seq;
        truncate_size = ci->i_truncate_size;
        if (!snap_size)
                snap_size = i_size_read(inode);
        spin_unlock(&ci->i_ceph_lock);

        /* is this a partial page at end of file? */
        if (page_off >= snap_size) {
                dout("%p page eof %llu\n", page, snap_size);
                goto out;
        }
        if (snap_size < page_off + len)
                len = snap_size - page_off;

        dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
             inode, page, page->index, page_off, len, snapc);

        writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
        if (writeback_stat >
            CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
                set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc,
                                   page_off, len,
                                   truncate_seq, truncate_size,
                                   &inode->i_mtime, &page, 1);
        if (err < 0) {
                dout("writepage setting page/mapping error %d %p\n", err, page);
                SetPageError(page);
                mapping_set_error(&inode->i_data, err);
                if (wbc)
                        wbc->pages_skipped++;
        } else {
                dout("writepage cleaned page %p\n", page);
                err = 0;  /* vfs expects us to return 0 */
        }
        page->private = 0;
        ClearPagePrivate(page);
        end_page_writeback(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);  /* page's reference */
out:
        return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
        int err;
        struct inode *inode = page->mapping->host;
        BUG_ON(!inode);
        ihold(inode);
        err = writepage_nounlock(page, wbc);
        unlock_page(page);
        iput(inode);
        return err;
}


/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
        struct pagevec pvec;
        int i;

        pagevec_init(&pvec, 0);
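        /*
         * pagevec_add() returns the space remaining in the pvec after
         * the page is added; 0 means it is now full, so flush it with
         * pagevec_release() before continuing.
         */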
        for (i = 0; i < num; i++) {
                if (pagevec_add(&pvec, pages[i]) == 0)
                        pagevec_release(&pvec);
        }
        pagevec_release(&pvec);
}


/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req,
                              struct ceph_msg *msg)
{
        struct inode *inode = req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_data *osd_data;
        unsigned wrote;
        struct page *page;
        int num_pages;
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
        int rc = req->r_result;
        u64 bytes = req->r_ops[0].extent.length;
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        long writeback_stat;
        unsigned issued = ceph_caps_issued(ci);

        osd_data = osd_req_op_extent_osd_data(req, 0);
        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
        num_pages = calc_pages_for((u64)osd_data->alignment,
                                        (u64)osd_data->length);
        if (rc >= 0) {
                /*
                 * Assume we wrote the pages we originally sent.  The
                 * osd might reply with fewer pages if our writeback
                 * raced with a truncation and was adjusted at the osd,
                 * so don't believe the reply.
                 */
                wrote = num_pages;
        } else {
                wrote = 0;
                mapping_set_error(mapping, rc);
        }
        dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
             inode, rc, bytes, wrote);

        /* clean all pages */
        for (i = 0; i < num_pages; i++) {
                page = osd_data->pages[i];
                BUG_ON(!page);
                WARN_ON(!PageUptodate(page));

                writeback_stat =
                        atomic_long_dec_return(&fsc->writeback_count);
                if (writeback_stat <
                    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
                        clear_bdi_congested(&fsc->backing_dev_info,
                                            BLK_RW_ASYNC);

                ceph_put_snap_context(page_snap_context(page));
                page->private = 0;
                ClearPagePrivate(page);
                dout("unlocking %d %p\n", i, page);
                end_page_writeback(page);

                /*
                 * We lost the cache cap, need to truncate the page before
                 * it is unlocked, otherwise we'd truncate it later in the
                 * page truncation thread, possibly losing some data that
                 * raced its way in
                 */
                if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
                        generic_error_remove_page(inode->i_mapping, page);

                unlock_page(page);
        }
        dout("%p wrote+cleaned %d pages\n", inode, wrote);
        ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);

        ceph_release_pages(osd_data->pages, num_pages);
        if (osd_data->pages_from_pool)
                mempool_free(osd_data->pages,
                             ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
        else
                kfree(osd_data->pages);
        ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
                                 struct writeback_control *wbc)
{
        struct inode *inode = mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino = ceph_vino(inode);
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
        pgoff_t max_pages = 0, max_pages_ever = 0;
        struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
        struct pagevec pvec;
        int done = 0;
        int rc = 0;
        unsigned wsize = 1 << inode->i_blkbits;
        struct ceph_osd_request *req = NULL;
        int do_sync = 0;
        u64 truncate_size, snap_size;
        u32 truncate_seq;

        /*
         * Include a 'sync' in the OSD request if this is a data
         * integrity write (e.g., O_SYNC write or fsync()), or if our
         * cap is being revoked.
         */
        if ((wbc->sync_mode == WB_SYNC_ALL) ||
            ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
                do_sync = 1;
        dout("writepages_start %p dosync=%d (mode=%s)\n",
             inode, do_sync,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

        if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
                pr_warning("writepages_start %p on forced umount\n", inode);
                return -EIO; /* we're in a forced umount, don't write! */
        }
        if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
                wsize = fsc->mount_options->wsize;
        if (wsize < PAGE_CACHE_SIZE)
                wsize = PAGE_CACHE_SIZE;
        max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
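        /*
         * e.g., with the default 4 MB ceph object size and 4 KB pages,
         * wsize is 4 MB and max_pages_ever is 1024 pages per OSD write.
         */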

        pagevec_init(&pvec, 0);

        /* where to start/end? */
        if (wbc->range_cyclic) {
                start = mapping->writeback_index; /* Start from prev offset */
                end = -1;
                dout(" cyclic, start at %lu\n", start);
        } else {
                start = wbc->range_start >> PAGE_CACHE_SHIFT;
                end = wbc->range_end >> PAGE_CACHE_SHIFT;
                if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
                        range_whole = 1;
                should_loop = 0;
                dout(" not cyclic, %lu to %lu\n", start, end);
        }
        index = start;

retry:
        /* find oldest snap context with dirty data */
        ceph_put_snap_context(snapc);
        snap_size = 0;
        snapc = get_oldest_context(inode, &snap_size);
        if (!snapc) {
                /* hmm, why does writepages get called when there
                   is no dirty data? */
                dout(" no snap context with dirty data?\n");
                goto out;
        }
        if (snap_size == 0)
                snap_size = i_size_read(inode);
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);

        spin_lock(&ci->i_ceph_lock);
        truncate_seq = ci->i_truncate_seq;
        truncate_size = ci->i_truncate_size;
        if (!snap_size)
                snap_size = i_size_read(inode);
        spin_unlock(&ci->i_ceph_lock);

        if (last_snapc && snapc != last_snapc) {
                /* if we switched to a newer snapc, restart our scan at the
                 * start of the original file range. */
                dout("  snapc differs from last pass, restarting at %lu\n",
                     index);
                index = start;
        }
        last_snapc = snapc;

        while (!done && index <= end) {
                int num_ops = do_sync ? 2 : 1;
                unsigned i;
                int first;
                pgoff_t next;
                int pvec_pages, locked_pages;
                struct page **pages = NULL;
                mempool_t *pool = NULL; /* Becomes non-null if mempool used */
                struct page *page;
                int want;
                u64 offset, len;
                long writeback_stat;

                next = 0;
                locked_pages = 0;
                max_pages = max_pages_ever;

get_more_pages:
                first = -1;
                want = min(end - index,
                           min((pgoff_t)PAGEVEC_SIZE,
                               max_pages - (pgoff_t)locked_pages) - 1)
                        + 1;
                pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
                                                PAGECACHE_TAG_DIRTY,
                                                want);
                dout("pagevec_lookup_tag got %d\n", pvec_pages);
                if (!pvec_pages && !locked_pages)
                        break;
                for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
                        page = pvec.pages[i];
                        dout("? %p idx %lu\n", page, page->index);
                        if (locked_pages == 0)
                                lock_page(page);  /* first page */
                        else if (!trylock_page(page))
                                break;

                        /* only dirty pages, or our accounting breaks */
                        if (unlikely(!PageDirty(page)) ||
                            unlikely(page->mapping != mapping)) {
                                dout("!dirty or !mapping %p\n", page);
                                unlock_page(page);
                                break;
                        }
                        if (!wbc->range_cyclic && page->index > end) {
                                dout("end of range %p\n", page);
                                done = 1;
                                unlock_page(page);
                                break;
                        }
                        if (next && (page->index != next)) {
                                dout("not consecutive %p\n", page);
                                unlock_page(page);
                                break;
                        }
                        if (wbc->sync_mode != WB_SYNC_NONE) {
                                dout("waiting on writeback %p\n", page);
                                wait_on_page_writeback(page);
                        }
                        if (page_offset(page) >= snap_size) {
                                dout("%p page eof %llu\n", page, snap_size);
                                done = 1;
                                unlock_page(page);
                                break;
                        }
                        if (PageWriteback(page)) {
                                dout("%p under writeback\n", page);
                                unlock_page(page);
                                break;
                        }

                        /* only if matching snap context */
                        pgsnapc = page_snap_context(page);
                        if (pgsnapc->seq > snapc->seq) {
                                dout("page snapc %p %lld > oldest %p %lld\n",
                                     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
                                unlock_page(page);
                                if (!locked_pages)
                                        continue; /* keep looking for snap */
                                break;
                        }

                        if (!clear_page_dirty_for_io(page)) {
                                dout("%p !clear_page_dirty_for_io\n", page);
                                unlock_page(page);
                                break;
                        }

                        /*
                         * We have something to write.  If this is
                         * the first locked page this time through,
                         * allocate an osd request and a page array
                         * that it will use.
                         */
                        if (locked_pages == 0) {
                                BUG_ON(pages);
                                /* prepare async write request */
                                offset = (u64)page_offset(page);
                                len = wsize;
                                req = ceph_osdc_new_request(&fsc->client->osdc,
                                                        &ci->i_layout, vino,
                                                        offset, &len, num_ops,
                                                        CEPH_OSD_OP_WRITE,
                                                        CEPH_OSD_FLAG_WRITE |
                                                        CEPH_OSD_FLAG_ONDISK,
                                                        snapc, truncate_seq,
                                                        truncate_size, true);
                                if (IS_ERR(req)) {
                                        rc = PTR_ERR(req);
                                        unlock_page(page);
                                        break;
                                }

                                req->r_callback = writepages_finish;
                                req->r_inode = inode;

                                max_pages = calc_pages_for(0, (u64)len);
                                pages = kmalloc(max_pages * sizeof(*pages),
                                                GFP_NOFS);
                                if (!pages) {
                                        pool = fsc->wb_pagevec_pool;
                                        pages = mempool_alloc(pool, GFP_NOFS);
                                        BUG_ON(!pages);
                                }
                        }

                        /* note position of first page in pvec */
                        if (first < 0)
                                first = i;
                        dout("%p will write page %p idx %lu\n",
                             inode, page, page->index);

                        writeback_stat =
                               atomic_long_inc_return(&fsc->writeback_count);
                        if (writeback_stat > CONGESTION_ON_THRESH(
                                    fsc->mount_options->congestion_kb)) {
                                set_bdi_congested(&fsc->backing_dev_info,
                                                  BLK_RW_ASYNC);
                        }

                        set_page_writeback(page);
                        pages[locked_pages] = page;
                        locked_pages++;
                        next = page->index + 1;
                }

                /* did we get anything? */
                if (!locked_pages)
                        goto release_pvec_pages;
                if (i) {
                        int j;
                        BUG_ON(!locked_pages || first < 0);

                        if (pvec_pages && i == pvec_pages &&
                            locked_pages < max_pages) {
                                dout("reached end pvec, trying for more\n");
                                pagevec_reinit(&pvec);
                                goto get_more_pages;
                        }

                        /* shift unused pages over in the pvec...  we
                         * will need to release them below. */
                        for (j = i; j < pvec_pages; j++) {
                                dout(" pvec leftover page %p\n",
                                     pvec.pages[j]);
                                pvec.pages[j-i+first] = pvec.pages[j];
                        }
                        pvec.nr -= i-first;
                }

                /* Format the osd request message and submit the write */

                offset = page_offset(pages[0]);
                len = min(snap_size - offset,
                          (u64)locked_pages << PAGE_CACHE_SHIFT);
                dout("writepages got %d pages at %llu~%llu\n",
                     locked_pages, offset, len);

                osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
                                                        !!pool, false);

                pages = NULL;   /* request message now owns the pages array */
                pool = NULL;

                /* Update the write op length in case we changed it */

                osd_req_op_extent_update(req, 0, len);

                vino = ceph_vino(inode);
                ceph_osdc_build_request(req, offset, snapc, vino.snap,
                                        &inode->i_mtime);

                rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
                BUG_ON(rc);
                req = NULL;

                /* continue? */
                index = next;
                wbc->nr_to_write -= locked_pages;
                if (wbc->nr_to_write <= 0)
                        done = 1;

release_pvec_pages:
                dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
                     pvec.nr ? pvec.pages[0] : NULL);
                pagevec_release(&pvec);

                if (locked_pages && !done)
                        goto retry;
        }

        if (should_loop && !done) {
                /* more to do; loop back to beginning of file */
                dout("writepages looping back to beginning of file\n");
                should_loop = 0;
                index = 0;
                goto retry;
        }

        if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
                mapping->writeback_index = index;

out:
        if (req)
                ceph_osdc_put_request(req);
        ceph_put_snap_context(snapc);
        dout("writepages done, rc = %d\n", rc);
        return rc;
}


/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
                                           struct ceph_snap_context *snapc)
{
        struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
        int ret = !oldest || snapc->seq <= oldest->seq;

        ceph_put_snap_context(oldest);
        return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
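 *
 * On success we also return with mdsc->snap_rwsem held for read; the
 * caller must drop it (ceph_write_end and ceph_page_mkwrite call
 * up_read after dirtying the page).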
 */
static int ceph_update_writeable_page(struct file *file,
                            loff_t pos, unsigned len,
                            struct page *page)
{
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
        loff_t i_size;
        int r;
        struct ceph_snap_context *snapc, *oldest;

retry_locked:
        /* wait for any writeback to complete (writepages currently
         * holds the page lock while pages are in flight, but that may
         * change later) */
        wait_on_page_writeback(page);

        /* check snap context */
        BUG_ON(!ci->i_snap_realm);
        down_read(&mdsc->snap_rwsem);
        BUG_ON(!ci->i_snap_realm->cached_context);
        snapc = page_snap_context(page);
        if (snapc && snapc != ci->i_head_snapc) {
                /*
                 * this page is already dirty in another (older) snap
                 * context!  is it writeable now?
                 */
                oldest = get_oldest_context(inode, NULL);
                up_read(&mdsc->snap_rwsem);

                if (snapc->seq > oldest->seq) {
                        ceph_put_snap_context(oldest);
                        dout(" page %p snapc %p not current or oldest\n",
                             page, snapc);
                        /*
                         * queue for writeback, and wait for snapc to
                         * be writeable or written
                         */
                        snapc = ceph_get_snap_context(snapc);
                        unlock_page(page);
                        ceph_queue_writeback(inode);
                        r = wait_event_interruptible(ci->i_cap_wq,
                               context_is_writeable_or_written(inode, snapc));
                        ceph_put_snap_context(snapc);
                        if (r == -ERESTARTSYS)
                                return r;
                        return -EAGAIN;
                }
                ceph_put_snap_context(oldest);

                /* yay, writeable, do it now (without dropping page lock) */
                dout(" page %p snapc %p not current, but oldest\n",
                     page, snapc);
                if (!clear_page_dirty_for_io(page))
                        goto retry_locked;
                r = writepage_nounlock(page, NULL);
                if (r < 0)
                        goto fail_nosnap;
                goto retry_locked;
        }

        if (PageUptodate(page)) {
                dout(" page %p already uptodate\n", page);
                return 0;
        }

        /* full page? */
        if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
                return 0;

        /* past end of file? */
        i_size = inode->i_size;   /* caller holds i_mutex */

        if (i_size + len > inode->i_sb->s_maxbytes) {
                /* file is too big */
                r = -EINVAL;
                goto fail;
        }

        if (page_off >= i_size ||
            (pos_in_page == 0 && (pos+len) >= i_size &&
             end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
                dout(" zeroing %p 0 - %d and %d - %d\n",
                     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
                zero_user_segments(page,
                                   0, pos_in_page,
                                   end_in_page, PAGE_CACHE_SIZE);
                return 0;
        }

        /* we need to read it. */
        up_read(&mdsc->snap_rwsem);
        r = readpage_nounlock(file, page);
        if (r < 0)
                goto fail_nosnap;
        goto retry_locked;

fail:
        up_read(&mdsc->snap_rwsem);
fail_nosnap:
        unlock_page(page);
        return r;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
                            loff_t pos, unsigned len, unsigned flags,
                            struct page **pagep, void **fsdata)
{
        struct inode *inode = file_inode(file);
        struct page *page;
        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
        int r;

        do {
                /* get a page */
                page = grab_cache_page_write_begin(mapping, index, 0);
                if (!page)
                        return -ENOMEM;
                *pagep = page;

                dout("write_begin file %p inode %p page %p %d~%d\n", file,
                     inode, page, (int)pos, (int)len);

                r = ceph_update_writeable_page(file, pos, len, page);
        } while (r == -EAGAIN);

        return r;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting and drop read lock on
 * mdsc->snap_rwsem.
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
                          loff_t pos, unsigned len, unsigned copied,
                          struct page *page, void *fsdata)
{
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;

        dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
             inode, page, (int)pos, (int)copied, (int)len);

        /* zero the stale part of the page if we did a short copy */
        if (copied < len)
                zero_user_segment(page, from+copied, from+len);

        /* did file size increase? */
        /* (no need for i_size_read(); the caller holds i_mutex) */
        if (pos+copied > inode->i_size)
                check_cap = ceph_inode_set_size(inode, pos+copied);

        if (!PageUptodate(page))
                SetPageUptodate(page);

        set_page_dirty(page);

        unlock_page(page);
        up_read(&mdsc->snap_rwsem);
        page_cache_release(page);

        if (check_cap)
                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

        return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
                              const struct iovec *iov,
                              loff_t pos, unsigned long nr_segs)
{
        WARN_ON(1);
        return -EINVAL;
}

const struct address_space_operations ceph_aops = {
        .readpage = ceph_readpage,
        .readpages = ceph_readpages,
        .writepage = ceph_writepage,
        .writepages = ceph_writepages_start,
        .write_begin = ceph_write_begin,
        .write_end = ceph_write_end,
        .set_page_dirty = ceph_set_page_dirty,
        .invalidatepage = ceph_invalidatepage,
        .releasepage = ceph_releasepage,
        .direct_IO = ceph_direct_io,
};
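
/*
 * These address_space ops are attached to each regular ceph inode's
 * i_mapping->a_ops during inode setup (see fill_inode() in
 * fs/ceph/inode.c).
 */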


/*
 * vm ops
 */

/*
 * Reuse write_begin here for simplicity.
 */
static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
{
        struct inode *inode = file_inode(vma->vm_file);
        struct page *page = vmf->page;
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t off = page_offset(page);
        loff_t size, len;
        int ret;

        /* Update time before taking page lock */
        file_update_time(vma->vm_file);

        size = i_size_read(inode);
        if (off + PAGE_CACHE_SIZE <= size)
                len = PAGE_CACHE_SIZE;
        else
                len = size & ~PAGE_CACHE_MASK;

        dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
             off, len, page, page->index);

        lock_page(page);

        ret = VM_FAULT_NOPAGE;
        if ((off > size) ||
            (page->mapping != inode->i_mapping))
                goto out;

        ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
        if (ret == 0) {
                /* success.  we'll keep the page locked. */
                set_page_dirty(page);
                up_read(&mdsc->snap_rwsem);
                ret = VM_FAULT_LOCKED;
        } else {
                if (ret == -ENOMEM)
                        ret = VM_FAULT_OOM;
                else
                        ret = VM_FAULT_SIGBUS;
        }
out:
        dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
        if (ret != VM_FAULT_LOCKED)
                unlock_page(page);
        return ret;
}

static struct vm_operations_struct ceph_vmops = {
        .fault          = filemap_fault,
        .page_mkwrite   = ceph_page_mkwrite,
        .remap_pages    = generic_file_remap_pages,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
        struct address_space *mapping = file->f_mapping;

        if (!mapping->a_ops->readpage)
                return -ENOEXEC;
        file_accessed(file);
        vma->vm_ops = &ceph_vmops;
        return 0;
}
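
/*
 * Note: ceph_mmap() is wired up as the .mmap method in ceph_file_fops
 * (fs/ceph/file.c); mapped reads are then served by filemap_fault and
 * the first write to a mapped page goes through ceph_page_mkwrite above.
 */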