linux/fs/ceph/addr.c
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>    /* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
#include "mds_client.h"
#include <linux/ceph/osd_client.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)                            \
        (CONGESTION_ON_THRESH(congestion_kb) -                          \
         (CONGESTION_ON_THRESH(congestion_kb) >> 2))
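
/*
 * Hysteresis for the thresholds above: the bdi is marked congested
 * once in-flight dirty data exceeds CONGESTION_ON_THRESH (the
 * congestion_kb mount option expressed in pages), and is cleared only
 * after it drops 25% below that, preventing rapid on/off flapping.
 */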

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
        if (PagePrivate(page))
                return (void *)page->private;
        return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
        struct address_space *mapping = page->mapping;
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc;
        int ret;

        if (unlikely(!mapping))
                return !TestSetPageDirty(page);

        if (PageDirty(page)) {
                dout("%p set_page_dirty %p idx %lu -- already dirty\n",
                     mapping->host, page, page->index);
                BUG_ON(!PagePrivate(page));
                return 0;
        }

        inode = mapping->host;
        ci = ceph_inode(inode);

        /* dirty the head */
        spin_lock(&ci->i_ceph_lock);
        BUG_ON(ci->i_wr_ref == 0); /* caller should hold Fw reference */
        if (__ceph_have_pending_cap_snap(ci)) {
                struct ceph_cap_snap *capsnap =
                                list_last_entry(&ci->i_cap_snaps,
                                                struct ceph_cap_snap,
                                                ci_item);
                snapc = ceph_get_snap_context(capsnap->context);
                capsnap->dirty_pages++;
        } else {
                BUG_ON(!ci->i_head_snapc);
                snapc = ceph_get_snap_context(ci->i_head_snapc);
                ++ci->i_wrbuffer_ref_head;
        }
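        /* the first dirty page takes an inode reference, dropped again
         * when the last wrbuffer ref is put */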
        if (ci->i_wrbuffer_ref == 0)
                ihold(inode);
        ++ci->i_wrbuffer_ref;
        dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
             "snapc %p seq %lld (%d snaps)\n",
             mapping->host, page, page->index,
             ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
             ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
             snapc, snapc->seq, snapc->num_snaps);
        spin_unlock(&ci->i_ceph_lock);

        /*
         * Reference snap context in page->private.  Also set
         * PagePrivate so that we get invalidatepage callback.
         */
        BUG_ON(PagePrivate(page));
        page->private = (unsigned long)snapc;
        SetPagePrivate(page);

        ret = __set_page_dirty_nobuffers(page);
        WARN_ON(!PageLocked(page));
        WARN_ON(!page->mapping);

        return ret;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned long offset)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc = page_snap_context(page);

        inode = page->mapping->host;
        ci = ceph_inode(inode);

        if (offset != 0) {
                dout("%p invalidatepage %p idx %lu partial dirty page %lu\n",
                     inode, page, page->index, offset);
                return;
        }

        WARN_ON(!PageLocked(page));
        if (!PagePrivate(page))
                return;

        ClearPageChecked(page);

        dout("%p invalidatepage %p idx %lu full dirty page\n",
             inode, page, page->index);

        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);
        page->private = 0;
        ClearPagePrivate(page);
}

/* just a sanity check */
static int ceph_releasepage(struct page *page, gfp_t g)
{
        dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
             page, page->index, PageDirty(page) ? "" : "not ");

        return !PagePrivate(page);
}

/*
 * read a single page, without unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
        int err = 0;
        u64 off = page_offset(page);
        u64 len = PAGE_CACHE_SIZE;

        if (off >= i_size_read(inode)) {
                zero_user_segment(page, 0, PAGE_CACHE_SIZE);
                SetPageUptodate(page);
                return 0;
        }

        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                /*
                 * Uptodate inline data should have been added
                 * into page cache while getting Fcr caps.
                 */
                if (off == 0)
                        return -EINVAL;
                zero_user_segment(page, 0, PAGE_CACHE_SIZE);
                SetPageUptodate(page);
                return 0;
        }

        dout("readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                  off, &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
                                  &page, 1, 0);
        if (err == -ENOENT)
                err = 0;
        if (err < 0) {
                SetPageError(page);
                goto out;
        }

        if (err < PAGE_CACHE_SIZE)
                /* zero fill remainder of page */
                zero_user_segment(page, err, PAGE_CACHE_SIZE);
        else
                flush_dcache_page(page);

        SetPageUptodate(page);
out:
        return err < 0 ? err : 0;
}

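/*
 * Synchronous .readpage: read the page via readpage_nounlock(), then
 * unlock it regardless of the outcome, as the VFS expects.
 */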
static int ceph_readpage(struct file *filp, struct page *page)
{
        int r = readpage_nounlock(filp, page);
        unlock_page(page);
        return r;
}

/*
 * Finish an async read(ahead) op.
 */
static void finish_read(struct ceph_osd_request *req)
{
        struct inode *inode = req->r_inode;
        struct ceph_osd_data *osd_data;
        int rc = req->r_result <= 0 ? req->r_result : 0;
        int bytes = req->r_result >= 0 ? req->r_result : 0;
        int num_pages;
        int i;

        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

        /* unlock all pages, zeroing any data we didn't read */
        osd_data = osd_req_op_extent_osd_data(req, 0);
        BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
        num_pages = calc_pages_for((u64)osd_data->alignment,
                                        (u64)osd_data->length);
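
        /* 'bytes' counts down as we advance through the vector; once it
         * goes negative the remaining pages got no data back from the
         * OSD and are zeroed in full before being marked uptodate */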
        for (i = 0; i < num_pages; i++) {
                struct page *page = osd_data->pages[i];

                if (rc < 0 && rc != -ENOENT)
                        goto unlock;
                if (bytes < (int)PAGE_CACHE_SIZE) {
                        /* zero (remainder of) page */
                        int s = bytes < 0 ? 0 : bytes;
                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
                }
                dout("finish_read %p uptodate %p idx %lu\n", inode, page,
                     page->index);
                flush_dcache_page(page);
                SetPageUptodate(page);
unlock:
                unlock_page(page);
                page_cache_release(page);
                bytes -= PAGE_CACHE_SIZE;
        }
        kfree(osd_data->pages);
}

/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
                      struct list_head *page_list, int max)
{
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct page *page = list_entry(page_list->prev, struct page, lru);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        u64 off;
        u64 len;
        int i;
        struct page **pages;
        pgoff_t next_index;
        int nr_pages = 0;
        int got = 0;
        int ret = 0;

        if (!rw_ctx) {
                /* caller of readpages does not hold buffer and read caps
                 * (fadvise, madvise and readahead cases) */
                int want = CEPH_CAP_FILE_CACHE;
                ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
                if (ret < 0) {
                        dout("start_read %p, error getting cap\n", inode);
                } else if (!(got & want)) {
                        dout("start_read %p, no cache cap\n", inode);
                        ret = 0;
                }
                if (ret <= 0) {
                        if (got)
                                ceph_put_cap_refs(ci, got);
                        while (!list_empty(page_list)) {
                                page = list_entry(page_list->prev,
                                                  struct page, lru);
                                list_del(&page->lru);
                                put_page(page);
                        }
                        return ret;
                }
        }

        off = (u64) page_offset(page);

        /* count pages */
        next_index = page->index;
        list_for_each_entry_reverse(page, page_list, lru) {
                if (page->index != next_index)
                        break;
                nr_pages++;
                next_index++;
                if (max && nr_pages == max)
                        break;
        }
        len = nr_pages << PAGE_CACHE_SHIFT;
        dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
             off, len);
        vino = ceph_vino(inode);
        req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
                                    0, 1, CEPH_OSD_OP_READ,
                                    CEPH_OSD_FLAG_READ, NULL,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    false);
        if (IS_ERR(req)) {
                ret = PTR_ERR(req);
                goto out;
        }

        /* build page vector */
        nr_pages = calc_pages_for(0, len);
        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
        if (!pages) {
                ret = -ENOMEM;
                goto out_put;
        }
        for (i = 0; i < nr_pages; ++i) {
                page = list_entry(page_list->prev, struct page, lru);
                BUG_ON(PageLocked(page));
                list_del(&page->lru);

                dout("start_read %p adding %p idx %lu\n", inode, page,
                     page->index);
                if (add_to_page_cache_lru(page, &inode->i_data, page->index,
                                          GFP_KERNEL)) {
                        page_cache_release(page);
                        dout("start_read %p add_to_page_cache failed %p\n",
                             inode, page);
                        nr_pages = i;
                        if (nr_pages > 0) {
                                len = nr_pages << PAGE_SHIFT;
                                osd_req_op_extent_update(req, 0, len);
                                break;
                        }
                        goto out_pages;
                }
                pages[i] = page;
        }
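
        /* hand the page vector to the request; on success it is freed
         * by finish_read() when the reply arrives */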
        osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
        req->r_callback = finish_read;
        req->r_inode = inode;

        dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto out_pages;
        ceph_osdc_put_request(req);

        /* After adding locked pages to page cache, the inode holds cache cap.
         * So we can drop our cap refs. */
        if (got)
                ceph_put_cap_refs(ci, got);

        return nr_pages;

out_pages:
        for (i = 0; i < nr_pages; ++i)
                unlock_page(pages[i]);
        ceph_put_page_vector(pages, nr_pages, false);
out_put:
        ceph_osdc_put_request(req);
out:
        if (got)
                ceph_put_cap_refs(ci, got);
        return ret;
}

/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
                          struct list_head *page_list, unsigned nr_pages)
{
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_file_info *fi = file->private_data;
        struct ceph_rw_context *rw_ctx;
        int rc = 0;
        int max = 0;

        if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
                return -EINVAL;

        rw_ctx = ceph_find_rw_context(fi);
        max = fsc->mount_options->rsize >> PAGE_SHIFT;
        dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
             inode, file, rw_ctx, nr_pages, max);
        while (!list_empty(page_list)) {
                rc = start_read(inode, rw_ctx, page_list, max);
                if (rc < 0)
                        goto out;
        }
out:
        dout("readpages %p file %p ret %d\n", inode, file, rc);
        return rc;
}

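/*
 * Per-writeback-pass snapshot of the size/truncation state to use,
 * filled in by get_oldest_context() from either a cap snapshot or the
 * live "head" context.
 */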
struct ceph_writeback_ctl
{
        loff_t i_size;
        u64 truncate_size;
        u32 truncate_seq;
        bool size_stable;
        bool head_snapc;
};

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
                   struct ceph_snap_context *page_snapc)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc = NULL;
        struct ceph_cap_snap *capsnap = NULL;

        spin_lock(&ci->i_ceph_lock);
        list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
                dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
                     capsnap->context, capsnap->dirty_pages);
                if (!capsnap->dirty_pages)
                        continue;

                /* get i_size, truncate_{seq,size} for page_snapc? */
                if (snapc && capsnap->context != page_snapc)
                        continue;

                if (ctl) {
                        if (capsnap->writing) {
                                ctl->i_size = i_size_read(inode);
                                ctl->size_stable = false;
                        } else {
                                ctl->i_size = capsnap->size;
                                ctl->size_stable = true;
                        }
                        ctl->truncate_size = capsnap->truncate_size;
                        ctl->truncate_seq = capsnap->truncate_seq;
                        ctl->head_snapc = false;
                }

                if (snapc)
                        break;

                snapc = ceph_get_snap_context(capsnap->context);
                if (!page_snapc ||
                    page_snapc == snapc ||
                    page_snapc->seq > snapc->seq)
                        break;
        }
        if (!snapc && ci->i_wrbuffer_ref_head) {
                snapc = ceph_get_snap_context(ci->i_head_snapc);
                dout(" head snapc %p has %d dirty pages\n",
                     snapc, ci->i_wrbuffer_ref_head);
                if (ctl) {
                        ctl->i_size = i_size_read(inode);
                        ctl->truncate_size = ci->i_truncate_size;
                        ctl->truncate_seq = ci->i_truncate_seq;
                        ctl->size_stable = false;
                        ctl->head_snapc = true;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
        return snapc;
}

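/*
 * Work out how many bytes of @page, starting at file offset @start,
 * should actually be written: clamp to the i_size that applies to the
 * page's snap context (a capsnap's frozen size, if any) and to the
 * end of the page.
 */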
static u64 get_writepages_data_length(struct inode *inode,
                                      struct page *page, u64 start)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc = page_snap_context(page);
        struct ceph_cap_snap *capsnap = NULL;
        u64 end = i_size_read(inode);

        if (snapc != ci->i_head_snapc) {
                bool found = false;
                spin_lock(&ci->i_ceph_lock);
                list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
                        if (capsnap->context == snapc) {
                                if (!capsnap->writing)
                                        end = capsnap->size;
                                found = true;
                                break;
                        }
                }
                spin_unlock(&ci->i_ceph_lock);
                WARN_ON(!found);
        }
        if (end > page_offset(page) + PAGE_SIZE)
                end = page_offset(page) + PAGE_SIZE;
        return end > start ? end - start : 0;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_fs_client *fsc;
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
        int err, len = PAGE_CACHE_SIZE;
        struct ceph_writeback_ctl ceph_wbc;

        dout("writepage %p idx %lu\n", page, page->index);

        inode = page->mapping->host;
        ci = ceph_inode(inode);
        fsc = ceph_inode_to_client(inode);

        /* verify this is a writeable snap context */
        snapc = page_snap_context(page);
        if (!snapc) {
                dout("writepage %p page %p not dirty?\n", inode, page);
                return 0;
        }
        oldest = get_oldest_context(inode, &ceph_wbc, snapc);
        if (snapc->seq > oldest->seq) {
                dout("writepage %p page %p snapc %p not writeable - noop\n",
                     inode, page, snapc);
                /* we should only noop if called by kswapd */
                WARN_ON(!(current->flags & PF_MEMALLOC));
                ceph_put_snap_context(oldest);
                redirty_page_for_writepage(wbc, page);
                return 0;
        }
        ceph_put_snap_context(oldest);

        /* is this a partial page at end of file? */
        if (page_off >= ceph_wbc.i_size) {
                dout("%p page eof %llu\n", page, ceph_wbc.i_size);
                page->mapping->a_ops->invalidatepage(page, 0);
                return 0;
        }

        if (ceph_wbc.i_size < page_off + len)
                len = ceph_wbc.i_size - page_off;

        dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
             inode, page, page->index, page_off, len, snapc, snapc->seq);

        if (atomic_long_inc_return(&fsc->writeback_count) >
            CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
                set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

        set_page_writeback(page);
        err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc, page_off, len,
                                   ceph_wbc.truncate_seq,
                                   ceph_wbc.truncate_size,
                                   &inode->i_mtime, &page, 1);
        if (err < 0) {
                struct writeback_control tmp_wbc;
                if (!wbc)
                        wbc = &tmp_wbc;
                if (err == -ERESTARTSYS) {
                        /* killed by SIGKILL */
                        dout("writepage interrupted page %p\n", page);
                        redirty_page_for_writepage(wbc, page);
                        end_page_writeback(page);
                        return err;
                }
                dout("writepage setting page/mapping error %d %p\n",
                     err, page);
                SetPageError(page);
                mapping_set_error(&inode->i_data, err);
                wbc->pages_skipped++;
        } else {
                dout("writepage cleaned page %p\n", page);
                err = 0;  /* vfs expects us to return 0 */
        }
        page->private = 0;
        ClearPagePrivate(page);
        end_page_writeback(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);  /* page's reference */

        if (atomic_long_dec_return(&fsc->writeback_count) <
            CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
                clear_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

        return err;
}

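/*
 * .writepage entry point: pin the inode across the synchronous write,
 * and squash -ERESTARTSYS (direct reclaimer killed by SIGKILL) so the
 * VM does not record a spurious error.
 */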
static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
        int err;
        struct inode *inode = page->mapping->host;
        BUG_ON(!inode);
        ihold(inode);
        err = writepage_nounlock(page, wbc);
        if (err == -ERESTARTSYS) {
                /* direct memory reclaimer was killed by SIGKILL. return 0
                 * to prevent caller from setting mapping/page error */
                err = 0;
        }
        unlock_page(page);
        iput(inode);
        return err;
}

/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
        struct pagevec pvec;
        int i;

        pagevec_init(&pvec, 0);
        for (i = 0; i < num; i++) {
                if (pagevec_add(&pvec, pages[i]) == 0)
                        pagevec_release(&pvec);
        }
        pagevec_release(&pvec);
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
        struct inode *inode = req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_data *osd_data;
        struct page *page;
        int num_pages, total_pages = 0;
        int i, j;
        int rc = req->r_result;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        bool remove_page;

        dout("writepages_finish %p rc %d\n", inode, rc);
        if (rc < 0) {
                mapping_set_error(mapping, rc);
                ceph_set_error_write(ci);
        } else {
                ceph_clear_error_write(ci);
        }

        /*
         * We lost the cache cap, need to truncate the page before
         * it is unlocked, otherwise we'd truncate it later in the
         * page truncation thread, possibly losing some data that
         * raced its way in
         */
        remove_page = !(ceph_caps_issued(ci) &
                        (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

        /* clean all pages */
        for (i = 0; i < req->r_num_ops; i++) {
                if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
                        break;

                osd_data = osd_req_op_extent_osd_data(req, i);
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
                num_pages = calc_pages_for((u64)osd_data->alignment,
                                           (u64)osd_data->length);
                total_pages += num_pages;
                for (j = 0; j < num_pages; j++) {
                        page = osd_data->pages[j];
                        BUG_ON(!page);
                        WARN_ON(!PageUptodate(page));

                        if (atomic_long_dec_return(&fsc->writeback_count) <
                             CONGESTION_OFF_THRESH(
                                        fsc->mount_options->congestion_kb))
                                clear_bdi_congested(&fsc->backing_dev_info,
                                                    BLK_RW_ASYNC);

                        ceph_put_snap_context(page_snap_context(page));
                        page->private = 0;
                        ClearPagePrivate(page);
                        dout("unlocking %p\n", page);
                        end_page_writeback(page);

                        if (remove_page)
                                generic_error_remove_page(inode->i_mapping,
                                                          page);

                        unlock_page(page);
                }
                dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
                     inode, osd_data->length, rc >= 0 ? num_pages : 0);

                ceph_release_pages(osd_data->pages, num_pages);
        }

        ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

        osd_data = osd_req_op_extent_osd_data(req, 0);
        if (osd_data->pages_from_pool)
                mempool_free(osd_data->pages,
                             ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
        else
                kfree(osd_data->pages);
        ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
                                 struct writeback_control *wbc)
{
        struct inode *inode = mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_vino vino = ceph_vino(inode);
        pgoff_t index, start_index, end = -1;
        struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
        struct pagevec pvec;
        int rc = 0;
        unsigned wsize = 1 << inode->i_blkbits;
        struct ceph_osd_request *req = NULL;
        struct ceph_writeback_ctl ceph_wbc;
        bool should_loop, range_whole = false;
        bool done = false;

        dout("writepages_start %p (mode=%s)\n", inode,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

        if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
                if (ci->i_wrbuffer_ref > 0) {
                        pr_warn_ratelimited(
                                "writepage_start %p %lld forced umount\n",
                                inode, ceph_ino(inode));
                }
                mapping_set_error(mapping, -EIO);
                return -EIO; /* we're in a forced umount, don't write! */
        }
        if (fsc->mount_options->wsize < wsize)
                wsize = fsc->mount_options->wsize;

        pagevec_init(&pvec, 0);

        start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
        index = start_index;

retry:
        /* find oldest snap context with dirty data */
        snapc = get_oldest_context(inode, &ceph_wbc, NULL);
        if (!snapc) {
                /* hmm, why does writepages get called when there
                   is no dirty data? */
                dout(" no snap context with dirty data?\n");
                goto out;
        }
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);

        should_loop = false;
        if (ceph_wbc.head_snapc && snapc != last_snapc) {
                /* where to start/end? */
                if (wbc->range_cyclic) {
                        index = start_index;
                        end = -1;
                        if (index > 0)
                                should_loop = true;
                        dout(" cyclic, start at %lu\n", index);
                } else {
                        index = wbc->range_start >> PAGE_SHIFT;
                        end = wbc->range_end >> PAGE_SHIFT;
                        if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
                                range_whole = true;
                        dout(" not cyclic, %lu to %lu\n", index, end);
                }
        } else if (!ceph_wbc.head_snapc) {
                /* Do not respect wbc->range_{start,end}.  Dirty pages
                 * in that range can be associated with a newer snapc;
                 * they are not writeable until all dirty pages
                 * associated with 'snapc' have been written. */
                if (index > 0)
                        should_loop = true;
                dout(" non-head snapc, range whole\n");
        }

        ceph_put_snap_context(last_snapc);
        last_snapc = snapc;

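        /*
         * Main loop: gather batches of dirty pages belonging to
         * 'snapc', pack contiguous runs into one or more OSD write
         * ops, and submit each request asynchronously;
         * writepages_finish() unlocks and cleans the pages when the
         * OSD replies.
         */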
        while (!done && index <= end) {
                int num_ops = 0, op_idx;
                unsigned i, pvec_pages, max_pages, locked_pages = 0;
                struct page **pages = NULL, **data_pages;
                mempool_t *pool = NULL; /* Becomes non-null if mempool used */
                struct page *page;
                pgoff_t strip_unit_end = 0;
                u64 offset = 0, len = 0;

                max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
                pvec_pages = min_t(unsigned, PAGEVEC_SIZE,
                                   max_pages - locked_pages);
                if (end - index < (u64)(pvec_pages - 1))
                        pvec_pages = (unsigned)(end - index) + 1;

                pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
                                                PAGECACHE_TAG_DIRTY,
                                                pvec_pages);
                dout("pagevec_lookup_tag got %d\n", pvec_pages);
                if (!pvec_pages && !locked_pages)
                        break;
                for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
                        page = pvec.pages[i];
                        dout("? %p idx %lu\n", page, page->index);
                        if (locked_pages == 0)
                                lock_page(page);  /* first page */
                        else if (!trylock_page(page))
                                break;

                        /* only dirty pages, or our accounting breaks */
                        if (unlikely(!PageDirty(page)) ||
                            unlikely(page->mapping != mapping)) {
                                dout("!dirty or !mapping %p\n", page);
                                unlock_page(page);
                                continue;
                        }
                        /* only if matching snap context */
                        pgsnapc = page_snap_context(page);
                        if (pgsnapc != snapc) {
                                dout("page snapc %p %lld != oldest %p %lld\n",
                                     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
                                if (!should_loop &&
                                    !ceph_wbc.head_snapc &&
                                    wbc->sync_mode != WB_SYNC_NONE)
                                        should_loop = true;
                                unlock_page(page);
                                continue;
                        }
                        if (page_offset(page) >= ceph_wbc.i_size) {
                                dout("%p page eof %llu\n",
                                     page, ceph_wbc.i_size);
                                if (ceph_wbc.size_stable ||
                                    page_offset(page) >= i_size_read(inode))
                                        mapping->a_ops->invalidatepage(page, 0);
                                unlock_page(page);
                                continue;
                        }
                        if (strip_unit_end && (page->index > strip_unit_end)) {
                                dout("end of strip unit %p\n", page);
                                unlock_page(page);
                                break;
                        }
                        if (PageWriteback(page)) {
                                if (wbc->sync_mode == WB_SYNC_NONE) {
                                        dout("%p under writeback\n", page);
                                        unlock_page(page);
                                        continue;
                                }
                                dout("waiting on writeback %p\n", page);
                                wait_on_page_writeback(page);
                        }

                        if (!clear_page_dirty_for_io(page)) {
                                dout("%p !clear_page_dirty_for_io\n", page);
                                unlock_page(page);
                                continue;
                        }

                        /*
                         * We have something to write.  If this is
                         * the first locked page this time through,
                         * calculate the max possible write size and
                         * allocate a page array.
                         */
                        if (locked_pages == 0) {
                                u64 objnum;
                                u64 objoff;
                                u32 xlen;

                                /* prepare async write request */
                                offset = (u64)page_offset(page);
                                ceph_calc_file_object_mapping(&ci->i_layout,
                                                              offset, wsize,
                                                              &objnum, &objoff,
                                                              &xlen);
                                len = xlen;

                                num_ops = 1;
                                strip_unit_end = page->index +
                                        ((len - 1) >> PAGE_CACHE_SHIFT);

                                BUG_ON(pages);
                                max_pages = calc_pages_for(0, (u64)len);
                                pages = kmalloc(max_pages * sizeof (*pages),
                                                GFP_NOFS);
                                if (!pages) {
                                        pool = fsc->wb_pagevec_pool;
                                        pages = mempool_alloc(pool, GFP_NOFS);
                                        BUG_ON(!pages);
                                }

                                len = 0;
                        } else if (page->index !=
                                   (offset + len) >> PAGE_CACHE_SHIFT) {
                                if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
                                                       CEPH_OSD_MAX_OPS)) {
                                        redirty_page_for_writepage(wbc, page);
                                        unlock_page(page);
                                        break;
                                }

                                num_ops++;
                                offset = (u64)page_offset(page);
                                len = 0;
                        }

                        /* note position of first page in pvec */
                        dout("%p will write page %p idx %lu\n",
                             inode, page, page->index);

                        if (atomic_long_inc_return(&fsc->writeback_count) >
                            CONGESTION_ON_THRESH(
                                    fsc->mount_options->congestion_kb)) {
                                set_bdi_congested(&fsc->backing_dev_info,
                                                  BLK_RW_ASYNC);
                        }

                        pages[locked_pages++] = page;
                        pvec.pages[i] = NULL;

                        len += PAGE_CACHE_SIZE;
                }

                /* did we get anything? */
                if (!locked_pages)
                        goto release_pvec_pages;
                if (i) {
                        unsigned j, n = 0;
                        /* shift unused page to beginning of pvec */
                        for (j = 0; j < pvec_pages; j++) {
                                if (!pvec.pages[j])
                                        continue;
                                if (n < j)
                                        pvec.pages[n] = pvec.pages[j];
                                n++;
                        }
                        pvec.nr = n;

                        if (pvec_pages && i == pvec_pages &&
                            locked_pages < max_pages) {
                                dout("reached end pvec, trying for more\n");
                                pagevec_release(&pvec);
                                goto get_more_pages;
                        }
                }

new_request:
                offset = page_offset(pages[0]);
                len = wsize;

                req = ceph_osdc_new_request(&fsc->client->osdc,
                                        &ci->i_layout, vino,
                                        offset, &len, 0, num_ops,
                                        CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
                                        snapc, ceph_wbc.truncate_seq,
                                        ceph_wbc.truncate_size, false);
                if (IS_ERR(req)) {
                        req = ceph_osdc_new_request(&fsc->client->osdc,
                                                &ci->i_layout, vino,
                                                offset, &len, 0,
                                                min(num_ops,
                                                    CEPH_OSD_SLAB_OPS),
                                                CEPH_OSD_OP_WRITE,
                                                CEPH_OSD_FLAG_WRITE,
                                                snapc, ceph_wbc.truncate_seq,
                                                ceph_wbc.truncate_size, true);
                        BUG_ON(IS_ERR(req));
                }
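                /* the allocated extent must reach the end of the last
                 * locked page */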
                BUG_ON(len < page_offset(pages[locked_pages - 1]) +
                             PAGE_CACHE_SIZE - offset);

                req->r_callback = writepages_finish;
                req->r_inode = inode;

                /* Format the osd request message and submit the write */
                len = 0;
                data_pages = pages;
                op_idx = 0;
                for (i = 0; i < locked_pages; i++) {
                        u64 cur_offset = page_offset(pages[i]);
                        if (offset + len != cur_offset) {
                                if (op_idx + 1 == req->r_num_ops)
                                        break;
                                osd_req_op_extent_dup_last(req, op_idx,
                                                           cur_offset - offset);
                                dout("writepages got pages at %llu~%llu\n",
                                     offset, len);
                                osd_req_op_extent_osd_data_pages(req, op_idx,
                                                        data_pages, len, 0,
                                                        !!pool, false);
                                osd_req_op_extent_update(req, op_idx, len);

                                len = 0;
                                offset = cur_offset;
                                data_pages = pages + i;
                                op_idx++;
                        }

                        set_page_writeback(pages[i]);
                        len += PAGE_CACHE_SIZE;
                }

                if (ceph_wbc.size_stable) {
                        len = min(len, ceph_wbc.i_size - offset);
                } else if (i == locked_pages) {
                        /* writepages_finish() clears writeback pages
                         * according to the data length, so make sure
                         * data length covers all locked pages */
                        u64 min_len = len + 1 - PAGE_CACHE_SIZE;
                        len = get_writepages_data_length(inode, pages[i - 1],
                                                         offset);
                        len = max(len, min_len);
                }
                dout("writepages got pages at %llu~%llu\n", offset, len);

                osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
                                                 0, !!pool, false);
                osd_req_op_extent_update(req, op_idx, len);

                BUG_ON(op_idx + 1 != req->r_num_ops);

                pool = NULL;
                if (i < locked_pages) {
                        BUG_ON(num_ops <= req->r_num_ops);
                        num_ops -= req->r_num_ops;
                        locked_pages -= i;

                        /* allocate new pages array for next request */
                        data_pages = pages;
                        pages = kmalloc(locked_pages * sizeof (*pages),
                                        GFP_NOFS);
                        if (!pages) {
                                pool = fsc->wb_pagevec_pool;
                                pages = mempool_alloc(pool, GFP_NOFS);
                                BUG_ON(!pages);
                        }
                        memcpy(pages, data_pages + i,
                               locked_pages * sizeof(*pages));
                        memset(data_pages + i, 0,
                               locked_pages * sizeof(*pages));
                } else {
                        BUG_ON(num_ops != req->r_num_ops);
                        index = pages[i - 1]->index + 1;
                        /* request message now owns the pages array */
                        pages = NULL;
                }

                req->r_mtime = inode->i_mtime;
                rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
                BUG_ON(rc);
                req = NULL;

                wbc->nr_to_write -= i;
                if (pages)
                        goto new_request;

                /*
                 * We stop writing back only if we are not doing
                 * integrity sync. In case of integrity sync we have to
                 * keep going until we have written all the pages
                 * we tagged for writeback prior to entering this loop.
                 */
                if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
                        done = true;

release_pvec_pages:
                dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
                     pvec.nr ? pvec.pages[0] : NULL);
                pagevec_release(&pvec);
        }

        if (should_loop && !done) {
                /* more to do; loop back to beginning of file */
                dout("writepages looping back to beginning of file\n");
                end = start_index - 1; /* OK even when start_index == 0 */

                /* to write dirty pages associated with next snapc,
                 * we need to wait until current writes complete */
                if (wbc->sync_mode != WB_SYNC_NONE &&
                    start_index == 0 && /* all dirty pages were checked */
                    !ceph_wbc.head_snapc) {
                        struct page *page;
                        unsigned i, nr;
                        index = 0;
                        while ((index <= end) &&
                               (nr = pagevec_lookup_tag(&pvec, mapping, &index,
                                                        PAGECACHE_TAG_WRITEBACK,
                                                        PAGEVEC_SIZE))) {
                                for (i = 0; i < nr; i++) {
                                        page = pvec.pages[i];
                                        if (page_snap_context(page) != snapc)
                                                continue;
                                        wait_on_page_writeback(page);
                                }
                                pagevec_release(&pvec);
                                cond_resched();
                        }
                }

                start_index = 0;
                index = 0;
                goto retry;
        }

        if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
                mapping->writeback_index = index;

out:
        ceph_osdc_put_request(req);
        ceph_put_snap_context(last_snapc);
        dout("writepages done, rc = %d\n", rc);
        return rc;
}

/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
                                           struct ceph_snap_context *snapc)
{
        struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
        int ret = !oldest || snapc->seq <= oldest->seq;

        ceph_put_snap_context(oldest);
        return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
                            loff_t pos, unsigned len,
                            struct page *page)
{
        struct inode *inode = file_inode(file);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
        loff_t i_size;
        int r;
        struct ceph_snap_context *snapc, *oldest;

        if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
                dout(" page %p forced umount\n", page);
                unlock_page(page);
                return -EIO;
        }

retry_locked:
        /* writepages currently holds the page lock, but if that changes
         * later we must wait for any writeback to finish first */
1219        wait_on_page_writeback(page);
1220
1221        snapc = page_snap_context(page);
1222        if (snapc && snapc != ci->i_head_snapc) {
1223                /*
1224                 * this page is already dirty in another (older) snap
1225                 * context!  is it writeable now?
1226                 */
1227                oldest = get_oldest_context(inode, NULL, NULL);
1228                if (snapc->seq > oldest->seq) {
1229                        ceph_put_snap_context(oldest);
1230                        dout(" page %p snapc %p not current or oldest\n",
1231                             page, snapc);
1232                        /*
1233                         * queue for writeback, and wait for snapc to
1234                         * be writeable or written
1235                         */
1236                        snapc = ceph_get_snap_context(snapc);
1237                        unlock_page(page);
1238                        ceph_queue_writeback(inode);
1239                        r = wait_event_killable(ci->i_cap_wq,
1240                               context_is_writeable_or_written(inode, snapc));
1241                        ceph_put_snap_context(snapc);
1242                        if (r == -ERESTARTSYS)
1243                                return r;
1244                        return -EAGAIN;
1245                }
1246                ceph_put_snap_context(oldest);
1247
1248                /* yay, writeable, do it now (without dropping page lock) */
1249                dout(" page %p snapc %p not current, but oldest\n",
1250                     page, snapc);
1251                if (!clear_page_dirty_for_io(page))
1252                        goto retry_locked;
1253                r = writepage_nounlock(page, NULL);
1254                if (r < 0)
1255                        goto fail_nosnap;
1256                goto retry_locked;
1257        }
1258
1259        if (PageUptodate(page)) {
1260                dout(" page %p already uptodate\n", page);
1261                return 0;
1262        }
1263
1264        /* full page? */
1265        if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
1266                return 0;
1267
1268        /* past end of file? */
1269        i_size = i_size_read(inode);
1270
1271        if (page_off >= i_size ||
1272            (pos_in_page == 0 && (pos+len) >= i_size &&
1273             end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
1274                dout(" zeroing %p 0 - %d and %d - %d\n",
1275                     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
1276                zero_user_segments(page,
1277                                   0, pos_in_page,
1278                                   end_in_page, PAGE_CACHE_SIZE);
1279                return 0;
1280        }
1281
1282        /* we need to read it. */
1283        r = readpage_nounlock(file, page);
1284        if (r < 0)
1285                goto fail_nosnap;
1286        goto retry_locked;
1287fail_nosnap:
1288        unlock_page(page);
1289        return r;
1290}
1291
1292/*
1293 * We are only allowed to write into/dirty the page if the page is
1294 * clean, or already dirty within the same snap context.
1295 */
1296static int ceph_write_begin(struct file *file, struct address_space *mapping,
1297                            loff_t pos, unsigned len, unsigned flags,
1298                            struct page **pagep, void **fsdata)
1299{
1300        struct inode *inode = file_inode(file);
1301        struct page *page;
1302        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
1303        int r;
1304
1305        do {
1306                /* get a page */
1307                page = grab_cache_page_write_begin(mapping, index, 0);
1308                if (!page)
1309                        return -ENOMEM;
1310
1311                dout("write_begin file %p inode %p page %p %d~%d\n", file,
1312                     inode, page, (int)pos, (int)len);
1313
1314                r = ceph_update_writeable_page(file, pos, len, page);
1315                if (r < 0)
1316                        page_cache_release(page);
1317                else
1318                        *pagep = page;
1319        } while (r == -EAGAIN);
1320
1321        return r;
1322}
1323
1324/*
1325 * We don't do anything here that simple_write_end doesn't do,
1326 * except adjust the dirty page accounting.
1327 */
1328static int ceph_write_end(struct file *file, struct address_space *mapping,
1329                          loff_t pos, unsigned len, unsigned copied,
1330                          struct page *page, void *fsdata)
1331{
1332        struct inode *inode = file_inode(file);
1333        bool check_cap = false;
1334
1335        dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1336             inode, page, (int)pos, (int)copied, (int)len);
1337
1338        /*
             * A short copy into a non-uptodate page can't be committed:
             * return 0 copied so the caller faults the data in and retries
             * the write.
             */
1339        if (!PageUptodate(page)) {
1340                if (copied < len) {
1341                        copied = 0;
1342                        goto out;
1343                }
1344                SetPageUptodate(page);
1345        }
1346
1347        /* did file size increase? */
1348        if (pos+copied > i_size_read(inode))
1349                check_cap = ceph_inode_set_size(inode, pos+copied);
1350
1351        set_page_dirty(page);
1352
1353out:
1354        unlock_page(page);
1355        page_cache_release(page);
1356
1357        if (check_cap)
1358                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1359
1360        return copied;
1361}
1362
1363/*
1364 * we set .direct_IO to indicate direct io is supported, but since we
1365 * intercept O_DIRECT reads and writes early, this function should
1366 * never get called.
1367 */
1368static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
1369                              const struct iovec *iov,
1370                              loff_t pos, unsigned long nr_segs)
1371{
1372        WARN_ON(1);
1373        return -EINVAL;
1374}
1375
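    /*
     * Address space operations for CephFS regular files; these are set as
     * inode->i_mapping->a_ops when the inode is filled in (see the inode
     * setup code in fs/ceph/inode.c).
     */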
1376const struct address_space_operations ceph_aops = {
1377        .readpage = ceph_readpage,
1378        .readpages = ceph_readpages,
1379        .writepage = ceph_writepage,
1380        .writepages = ceph_writepages_start,
1381        .write_begin = ceph_write_begin,
1382        .write_end = ceph_write_end,
1383        .set_page_dirty = ceph_set_page_dirty,
1384        .invalidatepage = ceph_invalidatepage,
1385        .releasepage = ceph_releasepage,
1386        .direct_IO = ceph_direct_io,
1387};
1388
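    /*
     * fault/page_mkwrite are not expected to fail with -ERESTARTSYS, but
     * the cap helpers they call are killable; mask everything except
     * SIGKILL so ordinary signals can't interrupt them midway.
     */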
1389static void ceph_block_sigs(sigset_t *oldset)
1390{
1391        sigset_t mask;
1392        siginitsetinv(&mask, sigmask(SIGKILL));
1393        sigprocmask(SIG_BLOCK, &mask, oldset);
1394}
1395
1396static void ceph_restore_sigs(sigset_t *oldset)
1397{
1398        sigprocmask(SIG_SETMASK, oldset, NULL);
1399}
1400
1401/*
1402 * vm ops
1403 */
1404static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1405{
1406        struct inode *inode = file_inode(vma->vm_file);
1407        struct ceph_inode_info *ci = ceph_inode(inode);
1408        struct ceph_file_info *fi = vma->vm_file->private_data;
1409        struct page *pinned_page = NULL;
1410        loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1411        int want, got, ret;
1412        sigset_t oldset;
1413
1414        ceph_block_sigs(&oldset);
1415
1416        dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1417             inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
1418        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1419                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1420        else
1421                want = CEPH_CAP_FILE_CACHE;
1422
1423        got = 0;
1424        ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
1425        if (ret < 0)
1426                goto out_restore;
1427
1428        dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1429             inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1430
1431        if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1432            ci->i_inline_version == CEPH_INLINE_NONE) {
1433                CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
1434                ceph_add_rw_context(fi, &rw_ctx);
1435                ret = filemap_fault(vma, vmf);
1436                ceph_del_rw_context(fi, &rw_ctx);
1437        } else {
1438                ret = -EAGAIN;
            }
1439
1440        dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1441             inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1442        if (pinned_page)
1443                page_cache_release(pinned_page);
1444        ceph_put_cap_refs(ci, got);
1445
1446        if (ret != -EAGAIN)
1447                goto out_restore;
1448
1449        /* read inline data */
1450        if (off >= PAGE_CACHE_SIZE) {
1451                /* does not support inline data > PAGE_CACHE_SIZE */
1452                ret = VM_FAULT_SIGBUS;
1453        } else {
1454                int ret1;
1455                struct address_space *mapping = inode->i_mapping;
1456                struct page *page = find_or_create_page(mapping, 0,
1457                                                mapping_gfp_mask(mapping) &
1458                                                ~__GFP_FS);
1459                if (!page) {
1460                        ret = VM_FAULT_OOM;
1461                        goto out_inline;
1462                }
1463                ret1 = __ceph_do_getattr(inode, page,
1464                                         CEPH_STAT_CAP_INLINE_DATA, true);
1465                if (ret1 < 0 || off >= i_size_read(inode)) {
1466                        unlock_page(page);
1467                        page_cache_release(page);
1468                        if (ret1 < 0)
1469                                ret = ret1;
1470                        else
1471                                ret = VM_FAULT_SIGBUS;
1472                        goto out_inline;
1473                }
1474                if (ret1 < PAGE_CACHE_SIZE)
1475                        zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
1476                else
1477                        flush_dcache_page(page);
1478                SetPageUptodate(page);
1479                vmf->page = page;
1480                ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1481out_inline:
1482                dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1483                     inode, off, (size_t)PAGE_CACHE_SIZE, ret);
1484        }
1485out_restore:
1486        ceph_restore_sigs(&oldset);
1487        if (ret < 0)
1488                ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
1489
1490        return ret;
1491}
1492
1493/*
1494 * For simplicity, reuse the guts of write_begin (ceph_update_writeable_page).
1495 */
1496static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1497{
1498        struct inode *inode = file_inode(vma->vm_file);
1499        struct ceph_inode_info *ci = ceph_inode(inode);
1500        struct ceph_file_info *fi = vma->vm_file->private_data;
1501        struct ceph_cap_flush *prealloc_cf;
1502        struct page *page = vmf->page;
1503        loff_t off = page_offset(page);
1504        loff_t size = i_size_read(inode);
1505        size_t len;
1506        int want, got, ret;
1507        sigset_t oldset;
1508
1509        prealloc_cf = ceph_alloc_cap_flush();
1510        if (!prealloc_cf)
1511                return VM_FAULT_OOM;
1512
1513        ceph_block_sigs(&oldset);
1514
1515        if (ci->i_inline_version != CEPH_INLINE_NONE) {
1516                struct page *locked_page = NULL;
1517                if (off == 0) {
1518                        lock_page(page);
1519                        locked_page = page;
1520                }
1521                ret = ceph_uninline_data(vma->vm_file, locked_page);
1522                if (locked_page)
1523                        unlock_page(locked_page);
1524                if (ret < 0)
1525                        goto out_free;
1526        }
1527
1528        if (off + PAGE_CACHE_SIZE <= size)
1529                len = PAGE_CACHE_SIZE;
1530        else
1531                len = size & ~PAGE_CACHE_MASK;
1532
1533        dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
1534             inode, ceph_vinop(inode), off, len, size);
1535        if (fi->fmode & CEPH_FILE_MODE_LAZY)
1536                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1537        else
1538                want = CEPH_CAP_FILE_BUFFER;
1539
1540        got = 0;
1541        ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1542                            &got, NULL);
1543        if (ret < 0)
1544                goto out_free;
1545
1546        dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
1547             inode, off, len, ceph_cap_string(got));
1548
1549        /* Update time before taking page lock */
1550        file_update_time(vma->vm_file);
1551
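            /*
             * ceph_update_writeable_page() returns -EAGAIN after it has
             * waited for an older snap context to be written out, so loop;
             * also recheck that the page wasn't truncated or invalidated
             * while we were blocked getting caps.
             */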
1552        do {
1553                lock_page(page);
1554
1555                if ((off > size) || (page->mapping != inode->i_mapping)) {
1556                        unlock_page(page);
1557                        ret = VM_FAULT_NOPAGE;
1558                        break;
1559                }
1560
1561                ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1562                if (ret >= 0) {
1563                        /* success.  we'll keep the page locked. */
1564                        set_page_dirty(page);
1565                        ret = VM_FAULT_LOCKED;
1566                }
1567        } while (ret == -EAGAIN);
1568
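            /*
             * Mark the Fw cap dirty so the new (no longer inline) data is
             * flushed to the MDS, and record that the inline data is gone
             * for good.
             */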
1569        if (ret == VM_FAULT_LOCKED ||
1570            ci->i_inline_version != CEPH_INLINE_NONE) {
1571                int dirty;
1572                spin_lock(&ci->i_ceph_lock);
1573                ci->i_inline_version = CEPH_INLINE_NONE;
1574                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
1575                                               &prealloc_cf);
1576                spin_unlock(&ci->i_ceph_lock);
1577                if (dirty)
1578                        __mark_inode_dirty(inode, dirty);
1579        }
1580
1581        dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
1582             inode, off, len, ceph_cap_string(got), ret);
1583        ceph_put_cap_refs(ci, got);
1584out_free:
1585        ceph_restore_sigs(&oldset);
1586        ceph_free_cap_flush(prealloc_cf);
1587        if (ret < 0)
1588                ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
1589        return ret;
1590}
1591
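    /*
     * Copy inline file data (typically supplied by an MDS reply) into
     * page 0 of the mapping, zero-filling the rest of the page.
     */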
1592void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
1593                           char *data, size_t len)
1594{
1595        struct address_space *mapping = inode->i_mapping;
1596        struct page *page;
1597
1598        if (locked_page) {
1599                page = locked_page;
1600        } else {
1601                if (i_size_read(inode) == 0)
1602                        return;
1603                page = find_or_create_page(mapping, 0,
1604                                           mapping_gfp_mask(mapping) & ~__GFP_FS);
1605                if (!page)
1606                        return;
1607                if (PageUptodate(page)) {
1608                        unlock_page(page);
1609                        page_cache_release(page);
1610                        return;
1611                }
1612        }
1613
1614        dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
1615             inode, ceph_vinop(inode), len, locked_page);
1616
1617        if (len > 0) {
1618                void *kaddr = kmap_atomic(page);
1619                memcpy(kaddr, data, len);
1620                kunmap_atomic(kaddr);
1621        }
1622
1623        if (page != locked_page) {
1624                if (len < PAGE_CACHE_SIZE)
1625                        zero_user_segment(page, len, PAGE_CACHE_SIZE);
1626                else
1627                        flush_dcache_page(page);
1628
1629                SetPageUptodate(page);
1630                unlock_page(page);
1631                page_cache_release(page);
1632        }
1633}
1634
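    /*
     * Migrate inline data out to the first data object so the file can be
     * accessed through the normal OSD path.  Two requests are used: a
     * CREATE to instantiate the object, then a WRITE whose cmpxattr guard
     * aborts if the object already records an equal or newer
     * inline_version.
     */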
1635int ceph_uninline_data(struct file *filp, struct page *locked_page)
1636{
1637        struct inode *inode = file_inode(filp);
1638        struct ceph_inode_info *ci = ceph_inode(inode);
1639        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1640        struct ceph_osd_request *req;
1641        struct page *page = NULL;
1642        u64 len, inline_version;
1643        int err = 0;
1644        bool from_pagecache = false;
1645
1646        spin_lock(&ci->i_ceph_lock);
1647        inline_version = ci->i_inline_version;
1648        spin_unlock(&ci->i_ceph_lock);
1649
1650        dout("uninline_data %p %llx.%llx inline_version %llu\n",
1651             inode, ceph_vinop(inode), inline_version);
1652
1653        if (inline_version == 1 || /* initial version, no data */
1654            inline_version == CEPH_INLINE_NONE)
1655                goto out;
1656
1657        if (locked_page) {
1658                page = locked_page;
1659                WARN_ON(!PageUptodate(page));
1660        } else if (ceph_caps_issued(ci) &
1661                   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
1662                page = find_get_page(inode->i_mapping, 0);
1663                if (page) {
1664                        if (PageUptodate(page)) {
1665                                from_pagecache = true;
1666                                lock_page(page);
1667                        } else {
1668                                page_cache_release(page);
1669                                page = NULL;
1670                        }
1671                }
1672        }
1673
1674        if (page) {
1675                len = i_size_read(inode);
1676                if (len > PAGE_CACHE_SIZE)
1677                        len = PAGE_CACHE_SIZE;
1678        } else {
1679                page = __page_cache_alloc(GFP_NOFS);
1680                if (!page) {
1681                        err = -ENOMEM;
1682                        goto out;
1683                }
1684                err = __ceph_do_getattr(inode, page,
1685                                        CEPH_STAT_CAP_INLINE_DATA, true);
1686                if (err < 0) {
1687                        /* no inline data */
1688                        if (err == -ENODATA)
1689                                err = 0;
1690                        goto out;
1691                }
1692                len = err;
1693        }
1694
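            /* first request: make sure the first data object exists */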
1695        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1696                                    ceph_vino(inode), 0, &len, 0, 1,
1697                                    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
1698                                    NULL, 0, 0, false);
1699        if (IS_ERR(req)) {
1700                err = PTR_ERR(req);
1701                goto out;
1702        }
1703
1704        req->r_mtime = inode->i_mtime;
1705        err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1706        if (!err)
1707                err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1708        ceph_osdc_put_request(req);
1709        if (err < 0)
1710                goto out;
1711
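            /*
             * second request: op 0 is a cmpxattr guard on "inline_version",
             * op 1 writes the data, and op 2 records the migrated version
             * in the "inline_version" xattr.
             */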
1712        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1713                                    ceph_vino(inode), 0, &len, 1, 3,
1714                                    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
1715                                    NULL, ci->i_truncate_seq,
1716                                    ci->i_truncate_size, false);
1717        if (IS_ERR(req)) {
1718                err = PTR_ERR(req);
1719                goto out;
1720        }
1721
1722        osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
1723
1724        {
1725                __le64 xattr_buf = cpu_to_le64(inline_version);
1726                err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
1727                                            "inline_version", &xattr_buf,
1728                                            sizeof(xattr_buf),
1729                                            CEPH_OSD_CMPXATTR_OP_GT,
1730                                            CEPH_OSD_CMPXATTR_MODE_U64);
1731                if (err)
1732                        goto out_put;
1733        }
1734
1735        {
1736                char xattr_buf[32];
1737                int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
1738                                         "%llu", inline_version);
1739                err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
1740                                            "inline_version",
1741                                            xattr_buf, xattr_len, 0, 0);
1742                if (err)
1743                        goto out_put;
1744        }
1745
1746        req->r_mtime = inode->i_mtime;
1747        err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1748        if (!err)
1749                err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1750out_put:
1751        ceph_osdc_put_request(req);
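            /*
             * -ECANCELED indicates the cmpxattr guard fired: the object
             * already carries an equal or newer inline_version, so another
             * writer presumably beat us to it; treat that as success.
             */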
1752        if (err == -ECANCELED)
1753                err = 0;
1754out:
1755        if (page && page != locked_page) {
1756                if (from_pagecache) {
1757                        unlock_page(page);
1758                        page_cache_release(page);
1759                } else {
1760                        __free_pages(page, 0);
                }
1761        }
1762
1763        dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
1764             inode, ceph_vinop(inode), inline_version, err);
1765        return err;
1766}
1767
1768static const struct vm_operations_struct ceph_vmops = {
1769        .fault          = ceph_filemap_fault,
1770        .page_mkwrite   = ceph_page_mkwrite,
1771        .remap_pages    = generic_file_remap_pages,
1772};
1773
1774int ceph_mmap(struct file *file, struct vm_area_struct *vma)
1775{
1776        struct address_space *mapping = file->f_mapping;
1777
1778        if (!mapping->a_ops->readpage)
1779                return -ENOEXEC;
1780        file_accessed(file);
1781        vma->vm_ops = &ceph_vmops;
1782        return 0;
1783}
1784
1785enum {
1786        POOL_READ       = 1,
1787        POOL_WRITE      = 2,
1788};
1789
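    /*
     * Probe read/write access to a data pool (and namespace) by issuing a
     * dummy STAT read and an exclusive CREATE write against a per-inode
     * object, then classifying the EPERM/success replies.  Results are
     * cached in mdsc->pool_perm_tree.
     */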
1790static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
1791                                s64 pool, struct ceph_string *pool_ns)
1792{
1793        struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
1794        struct ceph_mds_client *mdsc = fsc->mdsc;
1795        struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
1796        struct rb_node **p, *parent;
1797        struct ceph_pool_perm *perm;
1798        struct page **pages;
1799        size_t pool_ns_len;
1800        int err = 0, err2 = 0, have = 0;
1801
1802        down_read(&mdsc->pool_perm_rwsem);
1803        p = &mdsc->pool_perm_tree.rb_node;
1804        while (*p) {
1805                perm = rb_entry(*p, struct ceph_pool_perm, node);
1806                if (pool < perm->pool)
1807                        p = &(*p)->rb_left;
1808                else if (pool > perm->pool)
1809                        p = &(*p)->rb_right;
1810                else {
1811                        int ret = ceph_compare_string(pool_ns,
1812                                                perm->pool_ns,
1813                                                perm->pool_ns_len);
1814                        if (ret < 0)
1815                                p = &(*p)->rb_left;
1816                        else if (ret > 0)
1817                                p = &(*p)->rb_right;
1818                        else {
1819                                have = perm->perm;
1820                                break;
1821                        }
1822                }
1823        }
1824        up_read(&mdsc->pool_perm_rwsem);
1825        if (*p)
1826                goto out;
1827
1828        if (pool_ns)
1829                dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
1830                     pool, (int)pool_ns->len, pool_ns->str);
1831        else
1832                dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
1833
1834        down_write(&mdsc->pool_perm_rwsem);
1835        p = &mdsc->pool_perm_tree.rb_node;
1836        parent = NULL;
1837        while (*p) {
1838                parent = *p;
1839                perm = rb_entry(parent, struct ceph_pool_perm, node);
1840                if (pool < perm->pool)
1841                        p = &(*p)->rb_left;
1842                else if (pool > perm->pool)
1843                        p = &(*p)->rb_right;
1844                else {
1845                        int ret = ceph_compare_string(pool_ns,
1846                                                perm->pool_ns,
1847                                                perm->pool_ns_len);
1848                        if (ret < 0)
1849                                p = &(*p)->rb_left;
1850                        else if (ret > 0)
1851                                p = &(*p)->rb_right;
1852                        else {
1853                                have = perm->perm;
1854                                break;
1855                        }
1856                }
1857        }
1858        if (*p) {
1859                up_write(&mdsc->pool_perm_rwsem);
1860                goto out;
1861        }
1862
1863        rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1864                                         1, false, GFP_NOFS);
1865        if (!rd_req) {
1866                err = -ENOMEM;
1867                goto out_unlock;
1868        }
1869
1870        rd_req->r_flags = CEPH_OSD_FLAG_READ;
1871        osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
1872        rd_req->r_base_oloc.pool = pool;
1873        if (pool_ns)
1874                rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
1875        ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
1876
1877        err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
1878        if (err)
1879                goto out_unlock;
1880
1881        wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1882                                         1, false, GFP_NOFS);
1883        if (!wr_req) {
1884                err = -ENOMEM;
1885                goto out_unlock;
1886        }
1887
1888        wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
1889        osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
1890        ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
1891        ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
1892
1893        err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
1894        if (err)
1895                goto out_unlock;
1896
1897        /* one page should be large enough for STAT data */
1898        pages = ceph_alloc_page_vector(1, GFP_KERNEL);
1899        if (IS_ERR(pages)) {
1900                err = PTR_ERR(pages);
1901                goto out_unlock;
1902        }
1903
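            /* submit both probes in parallel, then wait for each reply */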
1904        osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
1905                                     0, false, true);
1906        err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
1907
1908        wr_req->r_mtime = ci->vfs_inode.i_mtime;
1909        wr_req->r_abort_on_full = true;
1910        err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
1911
1912        if (!err)
1913                err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
1914        if (!err2)
1915                err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);
1916
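            /*
             * -ENOENT from the read probe still proves read access (the
             * probe object need not exist); likewise -EEXIST from the
             * exclusive create still proves write access.
             */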
1917        if (err >= 0 || err == -ENOENT)
1918                have |= POOL_READ;
1919        else if (err != -EPERM)
1920                goto out_unlock;
1921
1922        if (err2 == 0 || err2 == -EEXIST)
1923                have |= POOL_WRITE;
1924        else if (err2 != -EPERM) {
1925                err = err2;
1926                goto out_unlock;
1927        }
1928
1929        pool_ns_len = pool_ns ? pool_ns->len : 0;
1930        perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
1931        if (!perm) {
1932                err = -ENOMEM;
1933                goto out_unlock;
1934        }
1935
1936        perm->pool = pool;
1937        perm->perm = have;
1938        perm->pool_ns_len = pool_ns_len;
1939        if (pool_ns_len > 0)
1940                memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
1941        perm->pool_ns[pool_ns_len] = 0;
1942
1943        rb_link_node(&perm->node, parent, p);
1944        rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
1945        err = 0;
1946out_unlock:
1947        up_write(&mdsc->pool_perm_rwsem);
1948
1949        ceph_osdc_put_request(rd_req);
1950        ceph_osdc_put_request(wr_req);
1951out:
1952        if (!err)
1953                err = have;
1954        if (pool_ns)
1955                dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
1956                     pool, (int)pool_ns->len, pool_ns->str, err);
1957        else
1958                dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
1959        return err;
1960}
1961
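    /*
     * Check pool permissions for the given access mode, consulting the
     * cached CEPH_I_POOL_* flags on the inode first and falling back to
     * the slow OSD probe only when nothing is cached yet.
     */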
1962int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
1963{
1964        s64 pool;
1965        struct ceph_string *pool_ns;
1966        int ret, flags;
1967
1968        if (ci->i_vino.snap != CEPH_NOSNAP) {
1969                /*
1970         * The pool permission check needs to write to the first object.
1971         * For a snapshot, though, the head of the first object may already
1972         * have been deleted; skip the check to avoid creating an orphan.
1973                 */
1974                return 0;
1975        }
1976
1977        if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
1978                                NOPOOLPERM))
1979                return 0;
1980
1981        spin_lock(&ci->i_ceph_lock);
1982        flags = ci->i_ceph_flags;
1983        pool = ci->i_layout.pool_id;
1984        spin_unlock(&ci->i_ceph_lock);
1985check:
1986        if (flags & CEPH_I_POOL_PERM) {
1987                if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
1988                        dout("ceph_pool_perm_check pool %lld no read perm\n",
1989                             pool);
1990                        return -EPERM;
1991                }
1992                if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
1993                        dout("ceph_pool_perm_check pool %lld no write perm\n",
1994                             pool);
1995                        return -EPERM;
1996                }
1997                return 0;
1998        }
1999
2000        pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
2001        ret = __ceph_pool_perm_get(ci, pool, pool_ns);
2002        ceph_put_string(pool_ns);
2003        if (ret < 0)
2004                return ret;
2005
2006        flags = CEPH_I_POOL_PERM;
2007        if (ret & POOL_READ)
2008                flags |= CEPH_I_POOL_RD;
2009        if (ret & POOL_WRITE)
2010                flags |= CEPH_I_POOL_WR;
2011
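            /*
             * Only cache the result if the file layout is still the one we
             * probed; if it changed underneath us, reload and re-check.
             */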
2012        spin_lock(&ci->i_ceph_lock);
2013        if (pool == ci->i_layout.pool_id &&
2014            pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
2015                ci->i_ceph_flags |= flags;
2016        } else {
2017                pool = ci->i_layout.pool_id;
2018                flags = ci->i_ceph_flags;
2019        }
2020        spin_unlock(&ci->i_ceph_lock);
2021        goto check;
2022}
2023
2024void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
2025{
2026        struct ceph_pool_perm *perm;
2027        struct rb_node *n;
2028
2029        while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
2030                n = rb_first(&mdsc->pool_perm_tree);
2031                perm = rb_entry(n, struct ceph_pool_perm, node);
2032                rb_erase(n, &mdsc->pool_perm_tree);
2033                kfree(perm);
2034        }
2035}
2036