linux/fs/ceph/inode.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/fs.h>
   5#include <linux/slab.h>
   6#include <linux/string.h>
   7#include <linux/uaccess.h>
   8#include <linux/kernel.h>
   9#include <linux/namei.h>
  10#include <linux/writeback.h>
  11#include <linux/vmalloc.h>
  12#include <linux/random.h>
  13#include <linux/sort.h>
  14
  15#include "super.h"
  16#include "mds_client.h"
  17#include <linux/ceph/decode.h>
  18
  19/*
  20 * Ceph inode operations
  21 *
  22 * Implement basic inode helpers (get, alloc) and inode ops (getattr,
  23 * setattr, etc.), xattr helpers, and helpers for assimilating
  24 * metadata returned by the MDS into our cache.
  25 *
  26 * Also define helpers for doing asynchronous writeback, invalidation,
  27 * and truncation for the benefit of those who can't afford to block
  28 * (typically because they are in the message handler path).
  29 */
  30
  31static const struct inode_operations ceph_symlink_iops;
  32
  33static void ceph_invalidate_work(struct work_struct *work);
  34static void ceph_writeback_work(struct work_struct *work);
  35static void ceph_vmtruncate_work(struct work_struct *work);
  36
  37/*
  38 * find or create an inode, given the ceph ino number
  39 */
  40static int ceph_set_ino_cb(struct inode *inode, void *data)
  41{
  42        ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
  43        inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
  44        return 0;
  45}
  46
  47struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
  48{
  49        struct inode *inode;
  50        ino_t t = ceph_vino_to_ino(vino);
  51
  52        inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
  53        if (!inode)
  54                return ERR_PTR(-ENOMEM);
  55        if (inode->i_state & I_NEW) {
  56                dout("get_inode created new inode %p %llx.%llx ino %llx\n",
  57                     inode, ceph_vinop(inode), (u64)inode->i_ino);
  58                unlock_new_inode(inode);
  59        }
  60
  61        dout("get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
  62             vino.snap, inode);
  63        return inode;
  64}
  65
  66/*
  67 * get/constuct snapdir inode for a given directory
  68 */
  69struct inode *ceph_get_snapdir(struct inode *parent)
  70{
  71        struct ceph_vino vino = {
  72                .ino = ceph_ino(parent),
  73                .snap = CEPH_SNAPDIR,
  74        };
  75        struct inode *inode = ceph_get_inode(parent->i_sb, vino);
  76        struct ceph_inode_info *ci = ceph_inode(inode);
  77
  78        BUG_ON(!S_ISDIR(parent->i_mode));
  79        if (IS_ERR(inode))
  80                return inode;
  81        inode->i_mode = parent->i_mode;
  82        inode->i_uid = parent->i_uid;
  83        inode->i_gid = parent->i_gid;
  84        inode->i_op = &ceph_snapdir_iops;
  85        inode->i_fop = &ceph_snapdir_fops;
  86        ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
  87        ci->i_rbytes = 0;
  88        return inode;
  89}
  90
  91const struct inode_operations ceph_file_iops = {
  92        .permission = ceph_permission,
  93        .setattr = ceph_setattr,
  94        .getattr = ceph_getattr,
  95        .setxattr = ceph_setxattr,
  96        .getxattr = ceph_getxattr,
  97        .listxattr = ceph_listxattr,
  98        .removexattr = ceph_removexattr,
  99        .get_acl = ceph_get_acl,
 100};
 101
 102
 103/*
 104 * We use a 'frag tree' to keep track of the MDS's directory fragments
 105 * for a given inode (usually there is just a single fragment).  We
 106 * need to know when a child frag is delegated to a new MDS, or when
 107 * it is flagged as replicated, so we can direct our requests
 108 * accordingly.
 109 */
 110
 111/*
 112 * find/create a frag in the tree
 113 */
 114static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
 115                                                    u32 f)
 116{
 117        struct rb_node **p;
 118        struct rb_node *parent = NULL;
 119        struct ceph_inode_frag *frag;
 120        int c;
 121
 122        p = &ci->i_fragtree.rb_node;
 123        while (*p) {
 124                parent = *p;
 125                frag = rb_entry(parent, struct ceph_inode_frag, node);
 126                c = ceph_frag_compare(f, frag->frag);
 127                if (c < 0)
 128                        p = &(*p)->rb_left;
 129                else if (c > 0)
 130                        p = &(*p)->rb_right;
 131                else
 132                        return frag;
 133        }
 134
 135        frag = kmalloc(sizeof(*frag), GFP_NOFS);
 136        if (!frag)
 137                return ERR_PTR(-ENOMEM);
 138
 139        frag->frag = f;
 140        frag->split_by = 0;
 141        frag->mds = -1;
 142        frag->ndist = 0;
 143
 144        rb_link_node(&frag->node, parent, p);
 145        rb_insert_color(&frag->node, &ci->i_fragtree);
 146
 147        dout("get_or_create_frag added %llx.%llx frag %x\n",
 148             ceph_vinop(&ci->vfs_inode), f);
 149        return frag;
 150}
 151
 152/*
 153 * find a specific frag @f
 154 */
 155struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
 156{
 157        struct rb_node *n = ci->i_fragtree.rb_node;
 158
 159        while (n) {
 160                struct ceph_inode_frag *frag =
 161                        rb_entry(n, struct ceph_inode_frag, node);
 162                int c = ceph_frag_compare(f, frag->frag);
 163                if (c < 0)
 164                        n = n->rb_left;
 165                else if (c > 0)
 166                        n = n->rb_right;
 167                else
 168                        return frag;
 169        }
 170        return NULL;
 171}
 172
 173/*
 174 * Choose frag containing the given value @v.  If @pfrag is
 175 * specified, copy the frag delegation info to the caller if
 176 * it is present.
 177 */
 178static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
 179                              struct ceph_inode_frag *pfrag, int *found)
 180{
 181        u32 t = ceph_frag_make(0, 0);
 182        struct ceph_inode_frag *frag;
 183        unsigned nway, i;
 184        u32 n;
 185
 186        if (found)
 187                *found = 0;
 188
 189        while (1) {
 190                WARN_ON(!ceph_frag_contains_value(t, v));
 191                frag = __ceph_find_frag(ci, t);
 192                if (!frag)
 193                        break; /* t is a leaf */
 194                if (frag->split_by == 0) {
 195                        if (pfrag)
 196                                memcpy(pfrag, frag, sizeof(*pfrag));
 197                        if (found)
 198                                *found = 1;
 199                        break;
 200                }
 201
 202                /* choose child */
 203                nway = 1 << frag->split_by;
 204                dout("choose_frag(%x) %x splits by %d (%d ways)\n", v, t,
 205                     frag->split_by, nway);
 206                for (i = 0; i < nway; i++) {
 207                        n = ceph_frag_make_child(t, frag->split_by, i);
 208                        if (ceph_frag_contains_value(n, v)) {
 209                                t = n;
 210                                break;
 211                        }
 212                }
 213                BUG_ON(i == nway);
 214        }
 215        dout("choose_frag(%x) = %x\n", v, t);
 216
 217        return t;
 218}
 219
 220u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
 221                     struct ceph_inode_frag *pfrag, int *found)
 222{
 223        u32 ret;
 224        mutex_lock(&ci->i_fragtree_mutex);
 225        ret = __ceph_choose_frag(ci, v, pfrag, found);
 226        mutex_unlock(&ci->i_fragtree_mutex);
 227        return ret;
 228}
 229
 230/*
 231 * Process dirfrag (delegation) info from the mds.  Include leaf
 232 * fragment in tree ONLY if ndist > 0.  Otherwise, only
 233 * branches/splits are included in i_fragtree)
 234 */
 235static int ceph_fill_dirfrag(struct inode *inode,
 236                             struct ceph_mds_reply_dirfrag *dirinfo)
 237{
 238        struct ceph_inode_info *ci = ceph_inode(inode);
 239        struct ceph_inode_frag *frag;
 240        u32 id = le32_to_cpu(dirinfo->frag);
 241        int mds = le32_to_cpu(dirinfo->auth);
 242        int ndist = le32_to_cpu(dirinfo->ndist);
 243        int diri_auth = -1;
 244        int i;
 245        int err = 0;
 246
 247        spin_lock(&ci->i_ceph_lock);
 248        if (ci->i_auth_cap)
 249                diri_auth = ci->i_auth_cap->mds;
 250        spin_unlock(&ci->i_ceph_lock);
 251
 252        if (mds == -1) /* CDIR_AUTH_PARENT */
 253                mds = diri_auth;
 254
 255        mutex_lock(&ci->i_fragtree_mutex);
 256        if (ndist == 0 && mds == diri_auth) {
 257                /* no delegation info needed. */
 258                frag = __ceph_find_frag(ci, id);
 259                if (!frag)
 260                        goto out;
 261                if (frag->split_by == 0) {
 262                        /* tree leaf, remove */
 263                        dout("fill_dirfrag removed %llx.%llx frag %x"
 264                             " (no ref)\n", ceph_vinop(inode), id);
 265                        rb_erase(&frag->node, &ci->i_fragtree);
 266                        kfree(frag);
 267                } else {
 268                        /* tree branch, keep and clear */
 269                        dout("fill_dirfrag cleared %llx.%llx frag %x"
 270                             " referral\n", ceph_vinop(inode), id);
 271                        frag->mds = -1;
 272                        frag->ndist = 0;
 273                }
 274                goto out;
 275        }
 276
 277
 278        /* find/add this frag to store mds delegation info */
 279        frag = __get_or_create_frag(ci, id);
 280        if (IS_ERR(frag)) {
 281                /* this is not the end of the world; we can continue
 282                   with bad/inaccurate delegation info */
 283                pr_err("fill_dirfrag ENOMEM on mds ref %llx.%llx fg %x\n",
 284                       ceph_vinop(inode), le32_to_cpu(dirinfo->frag));
 285                err = -ENOMEM;
 286                goto out;
 287        }
 288
 289        frag->mds = mds;
 290        frag->ndist = min_t(u32, ndist, CEPH_MAX_DIRFRAG_REP);
 291        for (i = 0; i < frag->ndist; i++)
 292                frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
 293        dout("fill_dirfrag %llx.%llx frag %x ndist=%d\n",
 294             ceph_vinop(inode), frag->frag, frag->ndist);
 295
 296out:
 297        mutex_unlock(&ci->i_fragtree_mutex);
 298        return err;
 299}
 300
 301static int frag_tree_split_cmp(const void *l, const void *r)
 302{
 303        struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
 304        struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
 305        return ceph_frag_compare(le32_to_cpu(ls->frag),
 306                                 le32_to_cpu(rs->frag));
 307}
 308
 309static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
 310{
 311        if (!frag)
 312                return f == ceph_frag_make(0, 0);
 313        if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
 314                return false;
 315        return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
 316}
 317
 318static int ceph_fill_fragtree(struct inode *inode,
 319                              struct ceph_frag_tree_head *fragtree,
 320                              struct ceph_mds_reply_dirfrag *dirinfo)
 321{
 322        struct ceph_inode_info *ci = ceph_inode(inode);
 323        struct ceph_inode_frag *frag, *prev_frag = NULL;
 324        struct rb_node *rb_node;
 325        unsigned i, split_by, nsplits;
 326        u32 id;
 327        bool update = false;
 328
 329        mutex_lock(&ci->i_fragtree_mutex);
 330        nsplits = le32_to_cpu(fragtree->nsplits);
 331        if (nsplits != ci->i_fragtree_nsplits) {
 332                update = true;
 333        } else if (nsplits) {
 334                i = prandom_u32() % nsplits;
 335                id = le32_to_cpu(fragtree->splits[i].frag);
 336                if (!__ceph_find_frag(ci, id))
 337                        update = true;
 338        } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
 339                rb_node = rb_first(&ci->i_fragtree);
 340                frag = rb_entry(rb_node, struct ceph_inode_frag, node);
 341                if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
 342                        update = true;
 343        }
 344        if (!update && dirinfo) {
 345                id = le32_to_cpu(dirinfo->frag);
 346                if (id != __ceph_choose_frag(ci, id, NULL, NULL))
 347                        update = true;
 348        }
 349        if (!update)
 350                goto out_unlock;
 351
 352        if (nsplits > 1) {
 353                sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
 354                     frag_tree_split_cmp, NULL);
 355        }
 356
 357        dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
 358        rb_node = rb_first(&ci->i_fragtree);
 359        for (i = 0; i < nsplits; i++) {
 360                id = le32_to_cpu(fragtree->splits[i].frag);
 361                split_by = le32_to_cpu(fragtree->splits[i].by);
 362                if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
 363                        pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
 364                               "frag %x split by %d\n", ceph_vinop(inode),
 365                               i, nsplits, id, split_by);
 366                        continue;
 367                }
 368                frag = NULL;
 369                while (rb_node) {
 370                        frag = rb_entry(rb_node, struct ceph_inode_frag, node);
 371                        if (ceph_frag_compare(frag->frag, id) >= 0) {
 372                                if (frag->frag != id)
 373                                        frag = NULL;
 374                                else
 375                                        rb_node = rb_next(rb_node);
 376                                break;
 377                        }
 378                        rb_node = rb_next(rb_node);
 379                        /* delete stale split/leaf node */
 380                        if (frag->split_by > 0 ||
 381                            !is_frag_child(frag->frag, prev_frag)) {
 382                                rb_erase(&frag->node, &ci->i_fragtree);
 383                                if (frag->split_by > 0)
 384                                        ci->i_fragtree_nsplits--;
 385                                kfree(frag);
 386                        }
 387                        frag = NULL;
 388                }
 389                if (!frag) {
 390                        frag = __get_or_create_frag(ci, id);
 391                        if (IS_ERR(frag))
 392                                continue;
 393                }
 394                if (frag->split_by == 0)
 395                        ci->i_fragtree_nsplits++;
 396                frag->split_by = split_by;
 397                dout(" frag %x split by %d\n", frag->frag, frag->split_by);
 398                prev_frag = frag;
 399        }
 400        while (rb_node) {
 401                frag = rb_entry(rb_node, struct ceph_inode_frag, node);
 402                rb_node = rb_next(rb_node);
 403                /* delete stale split/leaf node */
 404                if (frag->split_by > 0 ||
 405                    !is_frag_child(frag->frag, prev_frag)) {
 406                        rb_erase(&frag->node, &ci->i_fragtree);
 407                        if (frag->split_by > 0)
 408                                ci->i_fragtree_nsplits--;
 409                        kfree(frag);
 410                }
 411        }
 412out_unlock:
 413        mutex_unlock(&ci->i_fragtree_mutex);
 414        return 0;
 415}
 416
 417/*
 418 * initialize a newly allocated inode.
 419 */
 420struct inode *ceph_alloc_inode(struct super_block *sb)
 421{
 422        struct ceph_inode_info *ci;
 423        int i;
 424
 425        ci = kmem_cache_alloc(ceph_inode_cachep, GFP_NOFS);
 426        if (!ci)
 427                return NULL;
 428
 429        dout("alloc_inode %p\n", &ci->vfs_inode);
 430
 431        spin_lock_init(&ci->i_ceph_lock);
 432
 433        ci->i_version = 0;
 434        ci->i_inline_version = 0;
 435        ci->i_time_warp_seq = 0;
 436        ci->i_ceph_flags = 0;
 437        atomic64_set(&ci->i_ordered_count, 1);
 438        atomic64_set(&ci->i_release_count, 1);
 439        atomic64_set(&ci->i_complete_seq[0], 0);
 440        atomic64_set(&ci->i_complete_seq[1], 0);
 441        ci->i_symlink = NULL;
 442
 443        memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
 444        ci->i_pool_ns_len = 0;
 445
 446        ci->i_fragtree = RB_ROOT;
 447        mutex_init(&ci->i_fragtree_mutex);
 448
 449        ci->i_xattrs.blob = NULL;
 450        ci->i_xattrs.prealloc_blob = NULL;
 451        ci->i_xattrs.dirty = false;
 452        ci->i_xattrs.index = RB_ROOT;
 453        ci->i_xattrs.count = 0;
 454        ci->i_xattrs.names_size = 0;
 455        ci->i_xattrs.vals_size = 0;
 456        ci->i_xattrs.version = 0;
 457        ci->i_xattrs.index_version = 0;
 458
 459        ci->i_caps = RB_ROOT;
 460        ci->i_auth_cap = NULL;
 461        ci->i_dirty_caps = 0;
 462        ci->i_flushing_caps = 0;
 463        INIT_LIST_HEAD(&ci->i_dirty_item);
 464        INIT_LIST_HEAD(&ci->i_flushing_item);
 465        ci->i_prealloc_cap_flush = NULL;
 466        INIT_LIST_HEAD(&ci->i_cap_flush_list);
 467        init_waitqueue_head(&ci->i_cap_wq);
 468        ci->i_hold_caps_min = 0;
 469        ci->i_hold_caps_max = 0;
 470        INIT_LIST_HEAD(&ci->i_cap_delay_list);
 471        INIT_LIST_HEAD(&ci->i_cap_snaps);
 472        ci->i_head_snapc = NULL;
 473        ci->i_snap_caps = 0;
 474
 475        for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
 476                ci->i_nr_by_mode[i] = 0;
 477
 478        mutex_init(&ci->i_truncate_mutex);
 479        ci->i_truncate_seq = 0;
 480        ci->i_truncate_size = 0;
 481        ci->i_truncate_pending = 0;
 482
 483        ci->i_max_size = 0;
 484        ci->i_reported_size = 0;
 485        ci->i_wanted_max_size = 0;
 486        ci->i_requested_max_size = 0;
 487
 488        ci->i_pin_ref = 0;
 489        ci->i_rd_ref = 0;
 490        ci->i_rdcache_ref = 0;
 491        ci->i_wr_ref = 0;
 492        ci->i_wb_ref = 0;
 493        ci->i_wrbuffer_ref = 0;
 494        ci->i_wrbuffer_ref_head = 0;
 495        ci->i_shared_gen = 0;
 496        ci->i_rdcache_gen = 0;
 497        ci->i_rdcache_revoking = 0;
 498
 499        INIT_LIST_HEAD(&ci->i_unsafe_dirops);
 500        INIT_LIST_HEAD(&ci->i_unsafe_iops);
 501        spin_lock_init(&ci->i_unsafe_lock);
 502
 503        ci->i_snap_realm = NULL;
 504        INIT_LIST_HEAD(&ci->i_snap_realm_item);
 505        INIT_LIST_HEAD(&ci->i_snap_flush_item);
 506
 507        INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
 508        INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
 509
 510        INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 511
 512        return &ci->vfs_inode;
 513}
 514
 515static void ceph_i_callback(struct rcu_head *head)
 516{
 517        struct inode *inode = container_of(head, struct inode, i_rcu);
 518        struct ceph_inode_info *ci = ceph_inode(inode);
 519
 520        kmem_cache_free(ceph_inode_cachep, ci);
 521}
 522
 523void ceph_destroy_inode(struct inode *inode)
 524{
 525        struct ceph_inode_info *ci = ceph_inode(inode);
 526        struct ceph_inode_frag *frag;
 527        struct rb_node *n;
 528
 529        dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
 530
 531        ceph_queue_caps_release(inode);
 532
 533        /*
 534         * we may still have a snap_realm reference if there are stray
 535         * caps in i_snap_caps.
 536         */
 537        if (ci->i_snap_realm) {
 538                struct ceph_mds_client *mdsc =
 539                        ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
 540                struct ceph_snap_realm *realm = ci->i_snap_realm;
 541
 542                dout(" dropping residual ref to snap realm %p\n", realm);
 543                spin_lock(&realm->inodes_with_caps_lock);
 544                list_del_init(&ci->i_snap_realm_item);
 545                spin_unlock(&realm->inodes_with_caps_lock);
 546                ceph_put_snap_realm(mdsc, realm);
 547        }
 548
 549        kfree(ci->i_symlink);
 550        while ((n = rb_first(&ci->i_fragtree)) != NULL) {
 551                frag = rb_entry(n, struct ceph_inode_frag, node);
 552                rb_erase(n, &ci->i_fragtree);
 553                kfree(frag);
 554        }
 555        ci->i_fragtree_nsplits = 0;
 556
 557        __ceph_destroy_xattrs(ci);
 558        if (ci->i_xattrs.blob)
 559                ceph_buffer_put(ci->i_xattrs.blob);
 560        if (ci->i_xattrs.prealloc_blob)
 561                ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 562
 563        call_rcu(&inode->i_rcu, ceph_i_callback);
 564}
 565
 566int ceph_drop_inode(struct inode *inode)
 567{
 568        /*
 569         * Positve dentry and corresponding inode are always accompanied
 570         * in MDS reply. So no need to keep inode in the cache after
 571         * dropping all its aliases.
 572         */
 573        return 1;
 574}
 575
 576static inline blkcnt_t calc_inode_blocks(u64 size)
 577{
 578        return (size + (1<<9) - 1) >> 9;
 579}
 580
 581/*
 582 * Helpers to fill in size, ctime, mtime, and atime.  We have to be
 583 * careful because either the client or MDS may have more up to date
 584 * info, depending on which capabilities are held, and whether
 585 * time_warp_seq or truncate_seq have increased.  (Ordinarily, mtime
 586 * and size are monotonically increasing, except when utimes() or
 587 * truncate() increments the corresponding _seq values.)
 588 */
 589int ceph_fill_file_size(struct inode *inode, int issued,
 590                        u32 truncate_seq, u64 truncate_size, u64 size)
 591{
 592        struct ceph_inode_info *ci = ceph_inode(inode);
 593        int queue_trunc = 0;
 594
 595        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
 596            (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
 597                dout("size %lld -> %llu\n", inode->i_size, size);
 598                if (size > 0 && S_ISDIR(inode->i_mode)) {
 599                        pr_err("fill_file_size non-zero size for directory\n");
 600                        size = 0;
 601                }
 602                i_size_write(inode, size);
 603                inode->i_blocks = calc_inode_blocks(size);
 604                ci->i_reported_size = size;
 605                if (truncate_seq != ci->i_truncate_seq) {
 606                        dout("truncate_seq %u -> %u\n",
 607                             ci->i_truncate_seq, truncate_seq);
 608                        ci->i_truncate_seq = truncate_seq;
 609
 610                        /* the MDS should have revoked these caps */
 611                        WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL |
 612                                               CEPH_CAP_FILE_RD |
 613                                               CEPH_CAP_FILE_WR |
 614                                               CEPH_CAP_FILE_LAZYIO));
 615                        /*
 616                         * If we hold relevant caps, or in the case where we're
 617                         * not the only client referencing this file and we
 618                         * don't hold those caps, then we need to check whether
 619                         * the file is either opened or mmaped
 620                         */
 621                        if ((issued & (CEPH_CAP_FILE_CACHE|
 622                                       CEPH_CAP_FILE_BUFFER)) ||
 623                            mapping_mapped(inode->i_mapping) ||
 624                            __ceph_caps_file_wanted(ci)) {
 625                                ci->i_truncate_pending++;
 626                                queue_trunc = 1;
 627                        }
 628                }
 629        }
 630        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
 631            ci->i_truncate_size != truncate_size) {
 632                dout("truncate_size %lld -> %llu\n", ci->i_truncate_size,
 633                     truncate_size);
 634                ci->i_truncate_size = truncate_size;
 635        }
 636        return queue_trunc;
 637}
 638
 639void ceph_fill_file_time(struct inode *inode, int issued,
 640                         u64 time_warp_seq, struct timespec *ctime,
 641                         struct timespec *mtime, struct timespec *atime)
 642{
 643        struct ceph_inode_info *ci = ceph_inode(inode);
 644        int warn = 0;
 645
 646        if (issued & (CEPH_CAP_FILE_EXCL|
 647                      CEPH_CAP_FILE_WR|
 648                      CEPH_CAP_FILE_BUFFER|
 649                      CEPH_CAP_AUTH_EXCL|
 650                      CEPH_CAP_XATTR_EXCL)) {
 651                if (timespec_compare(ctime, &inode->i_ctime) > 0) {
 652                        dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
 653                             inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
 654                             ctime->tv_sec, ctime->tv_nsec);
 655                        inode->i_ctime = *ctime;
 656                }
 657                if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) > 0) {
 658                        /* the MDS did a utimes() */
 659                        dout("mtime %ld.%09ld -> %ld.%09ld "
 660                             "tw %d -> %d\n",
 661                             inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
 662                             mtime->tv_sec, mtime->tv_nsec,
 663                             ci->i_time_warp_seq, (int)time_warp_seq);
 664
 665                        inode->i_mtime = *mtime;
 666                        inode->i_atime = *atime;
 667                        ci->i_time_warp_seq = time_warp_seq;
 668                } else if (time_warp_seq == ci->i_time_warp_seq) {
 669                        /* nobody did utimes(); take the max */
 670                        if (timespec_compare(mtime, &inode->i_mtime) > 0) {
 671                                dout("mtime %ld.%09ld -> %ld.%09ld inc\n",
 672                                     inode->i_mtime.tv_sec,
 673                                     inode->i_mtime.tv_nsec,
 674                                     mtime->tv_sec, mtime->tv_nsec);
 675                                inode->i_mtime = *mtime;
 676                        }
 677                        if (timespec_compare(atime, &inode->i_atime) > 0) {
 678                                dout("atime %ld.%09ld -> %ld.%09ld inc\n",
 679                                     inode->i_atime.tv_sec,
 680                                     inode->i_atime.tv_nsec,
 681                                     atime->tv_sec, atime->tv_nsec);
 682                                inode->i_atime = *atime;
 683                        }
 684                } else if (issued & CEPH_CAP_FILE_EXCL) {
 685                        /* we did a utimes(); ignore mds values */
 686                } else {
 687                        warn = 1;
 688                }
 689        } else {
 690                /* we have no write|excl caps; whatever the MDS says is true */
 691                if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
 692                        inode->i_ctime = *ctime;
 693                        inode->i_mtime = *mtime;
 694                        inode->i_atime = *atime;
 695                        ci->i_time_warp_seq = time_warp_seq;
 696                } else {
 697                        warn = 1;
 698                }
 699        }
 700        if (warn) /* time_warp_seq shouldn't go backwards */
 701                dout("%p mds time_warp_seq %llu < %u\n",
 702                     inode, time_warp_seq, ci->i_time_warp_seq);
 703}
 704
 705/*
 706 * Populate an inode based on info from mds.  May be called on new or
 707 * existing inodes.
 708 */
 709static int fill_inode(struct inode *inode, struct page *locked_page,
 710                      struct ceph_mds_reply_info_in *iinfo,
 711                      struct ceph_mds_reply_dirfrag *dirinfo,
 712                      struct ceph_mds_session *session,
 713                      unsigned long ttl_from, int cap_fmode,
 714                      struct ceph_cap_reservation *caps_reservation)
 715{
 716        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 717        struct ceph_mds_reply_inode *info = iinfo->in;
 718        struct ceph_inode_info *ci = ceph_inode(inode);
 719        int issued = 0, implemented, new_issued;
 720        struct timespec mtime, atime, ctime;
 721        struct ceph_buffer *xattr_blob = NULL;
 722        struct ceph_cap *new_cap = NULL;
 723        int err = 0;
 724        bool wake = false;
 725        bool queue_trunc = false;
 726        bool new_version = false;
 727        bool fill_inline = false;
 728
 729        dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
 730             inode, ceph_vinop(inode), le64_to_cpu(info->version),
 731             ci->i_version);
 732
 733        /* prealloc new cap struct */
 734        if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
 735                new_cap = ceph_get_cap(mdsc, caps_reservation);
 736
 737        /*
 738         * prealloc xattr data, if it looks like we'll need it.  only
 739         * if len > 4 (meaning there are actually xattrs; the first 4
 740         * bytes are the xattr count).
 741         */
 742        if (iinfo->xattr_len > 4) {
 743                xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
 744                if (!xattr_blob)
 745                        pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
 746                               iinfo->xattr_len);
 747        }
 748
 749        spin_lock(&ci->i_ceph_lock);
 750
 751        /*
 752         * provided version will be odd if inode value is projected,
 753         * even if stable.  skip the update if we have newer stable
 754         * info (ours>=theirs, e.g. due to racing mds replies), unless
 755         * we are getting projected (unstable) info (in which case the
 756         * version is odd, and we want ours>theirs).
 757         *   us   them
 758         *   2    2     skip
 759         *   3    2     skip
 760         *   3    3     update
 761         */
 762        if (ci->i_version == 0 ||
 763            ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
 764             le64_to_cpu(info->version) > (ci->i_version & ~1)))
 765                new_version = true;
 766
 767        issued = __ceph_caps_issued(ci, &implemented);
 768        issued |= implemented | __ceph_caps_dirty(ci);
 769        new_issued = ~issued & le32_to_cpu(info->cap.caps);
 770
 771        /* update inode */
 772        ci->i_version = le64_to_cpu(info->version);
 773        inode->i_version++;
 774        inode->i_rdev = le32_to_cpu(info->rdev);
 775        inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 776
 777        if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
 778            (issued & CEPH_CAP_AUTH_EXCL) == 0) {
 779                inode->i_mode = le32_to_cpu(info->mode);
 780                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
 781                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
 782                dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
 783                     from_kuid(&init_user_ns, inode->i_uid),
 784                     from_kgid(&init_user_ns, inode->i_gid));
 785        }
 786
 787        if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
 788            (issued & CEPH_CAP_LINK_EXCL) == 0)
 789                set_nlink(inode, le32_to_cpu(info->nlink));
 790
 791        if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
 792                /* be careful with mtime, atime, size */
 793                ceph_decode_timespec(&atime, &info->atime);
 794                ceph_decode_timespec(&mtime, &info->mtime);
 795                ceph_decode_timespec(&ctime, &info->ctime);
 796                ceph_fill_file_time(inode, issued,
 797                                le32_to_cpu(info->time_warp_seq),
 798                                &ctime, &mtime, &atime);
 799        }
 800
 801        if (new_version ||
 802            (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
 803                if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
 804                        ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 805                ci->i_layout = info->layout;
 806                ci->i_pool_ns_len = iinfo->pool_ns_len;
 807
 808                queue_trunc = ceph_fill_file_size(inode, issued,
 809                                        le32_to_cpu(info->truncate_seq),
 810                                        le64_to_cpu(info->truncate_size),
 811                                        le64_to_cpu(info->size));
 812                /* only update max_size on auth cap */
 813                if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
 814                    ci->i_max_size != le64_to_cpu(info->max_size)) {
 815                        dout("max_size %lld -> %llu\n", ci->i_max_size,
 816                                        le64_to_cpu(info->max_size));
 817                        ci->i_max_size = le64_to_cpu(info->max_size);
 818                }
 819        }
 820
 821        /* xattrs */
 822        /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
 823        if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
 824            le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
 825                if (ci->i_xattrs.blob)
 826                        ceph_buffer_put(ci->i_xattrs.blob);
 827                ci->i_xattrs.blob = xattr_blob;
 828                if (xattr_blob)
 829                        memcpy(ci->i_xattrs.blob->vec.iov_base,
 830                               iinfo->xattr_data, iinfo->xattr_len);
 831                ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
 832                ceph_forget_all_cached_acls(inode);
 833                xattr_blob = NULL;
 834        }
 835
 836        inode->i_mapping->a_ops = &ceph_aops;
 837        inode->i_mapping->backing_dev_info =
 838                &ceph_sb_to_client(inode->i_sb)->backing_dev_info;
 839
 840        switch (inode->i_mode & S_IFMT) {
 841        case S_IFIFO:
 842        case S_IFBLK:
 843        case S_IFCHR:
 844        case S_IFSOCK:
 845                init_special_inode(inode, inode->i_mode, inode->i_rdev);
 846                inode->i_op = &ceph_file_iops;
 847                break;
 848        case S_IFREG:
 849                inode->i_op = &ceph_file_iops;
 850                inode->i_fop = &ceph_file_fops;
 851                break;
 852        case S_IFLNK:
 853                inode->i_op = &ceph_symlink_iops;
 854                if (!ci->i_symlink) {
 855                        u32 symlen = iinfo->symlink_len;
 856                        char *sym;
 857
 858                        spin_unlock(&ci->i_ceph_lock);
 859
 860                        if (symlen != i_size_read(inode)) {
 861                                pr_err("fill_inode %llx.%llx BAD symlink "
 862                                        "size %lld\n", ceph_vinop(inode),
 863                                        i_size_read(inode));
 864                                i_size_write(inode, symlen);
 865                                inode->i_blocks = calc_inode_blocks(symlen);
 866                        }
 867
 868                        err = -ENOMEM;
 869                        sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
 870                        if (!sym)
 871                                goto out;
 872
 873                        spin_lock(&ci->i_ceph_lock);
 874                        if (!ci->i_symlink)
 875                                ci->i_symlink = sym;
 876                        else
 877                                kfree(sym); /* lost a race */
 878                }
 879                break;
 880        case S_IFDIR:
 881                inode->i_op = &ceph_dir_iops;
 882                inode->i_fop = &ceph_dir_fops;
 883
 884                ci->i_dir_layout = iinfo->dir_layout;
 885
 886                ci->i_files = le64_to_cpu(info->files);
 887                ci->i_subdirs = le64_to_cpu(info->subdirs);
 888                ci->i_rbytes = le64_to_cpu(info->rbytes);
 889                ci->i_rfiles = le64_to_cpu(info->rfiles);
 890                ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
 891                ceph_decode_timespec(&ci->i_rctime, &info->rctime);
 892                break;
 893        default:
 894                pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
 895                       ceph_vinop(inode), inode->i_mode);
 896        }
 897
 898        /* were we issued a capability? */
 899        if (info->cap.caps) {
 900                if (ceph_snap(inode) == CEPH_NOSNAP) {
 901                        unsigned caps = le32_to_cpu(info->cap.caps);
 902                        ceph_add_cap(inode, session,
 903                                     le64_to_cpu(info->cap.cap_id),
 904                                     cap_fmode, caps,
 905                                     le32_to_cpu(info->cap.wanted),
 906                                     le32_to_cpu(info->cap.seq),
 907                                     le32_to_cpu(info->cap.mseq),
 908                                     le64_to_cpu(info->cap.realm),
 909                                     info->cap.flags, &new_cap);
 910
 911                        /* set dir completion flag? */
 912                        if (S_ISDIR(inode->i_mode) &&
 913                            ci->i_files == 0 && ci->i_subdirs == 0 &&
 914                            (caps & CEPH_CAP_FILE_SHARED) &&
 915                            (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 916                            !__ceph_dir_is_complete(ci)) {
 917                                dout(" marking %p complete (empty)\n", inode);
 918                                i_size_write(inode, 0);
 919                                __ceph_dir_set_complete(ci,
 920                                        atomic64_read(&ci->i_release_count),
 921                                        atomic64_read(&ci->i_ordered_count));
 922                        }
 923
 924                        wake = true;
 925                } else {
 926                        dout(" %p got snap_caps %s\n", inode,
 927                             ceph_cap_string(le32_to_cpu(info->cap.caps)));
 928                        ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
 929                        if (cap_fmode >= 0)
 930                                __ceph_get_fmode(ci, cap_fmode);
 931                }
 932        } else if (cap_fmode >= 0) {
 933                pr_warn("mds issued no caps on %llx.%llx\n",
 934                           ceph_vinop(inode));
 935                __ceph_get_fmode(ci, cap_fmode);
 936        }
 937
 938        if (iinfo->inline_version > 0 &&
 939            iinfo->inline_version >= ci->i_inline_version) {
 940                int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 941                ci->i_inline_version = iinfo->inline_version;
 942                if (ci->i_inline_version != CEPH_INLINE_NONE &&
 943                    (locked_page ||
 944                     (le32_to_cpu(info->cap.caps) & cache_caps)))
 945                        fill_inline = true;
 946        }
 947
 948        spin_unlock(&ci->i_ceph_lock);
 949
 950        if (fill_inline)
 951                ceph_fill_inline_data(inode, locked_page,
 952                                      iinfo->inline_data, iinfo->inline_len);
 953
 954        if (wake)
 955                wake_up_all(&ci->i_cap_wq);
 956
 957        /* queue truncate if we saw i_size decrease */
 958        if (queue_trunc)
 959                ceph_queue_vmtruncate(inode);
 960
 961        /* populate frag tree */
 962        if (S_ISDIR(inode->i_mode))
 963                ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
 964
 965        /* update delegation info? */
 966        if (dirinfo)
 967                ceph_fill_dirfrag(inode, dirinfo);
 968
 969        err = 0;
 970out:
 971        if (new_cap)
 972                ceph_put_cap(mdsc, new_cap);
 973        if (xattr_blob)
 974                ceph_buffer_put(xattr_blob);
 975        return err;
 976}
 977
 978/*
 979 * caller should hold session s_mutex.
 980 */
 981static void update_dentry_lease(struct dentry *dentry,
 982                                struct ceph_mds_reply_lease *lease,
 983                                struct ceph_mds_session *session,
 984                                unsigned long from_time,
 985                                struct ceph_vino *tgt_vino,
 986                                struct ceph_vino *dir_vino)
 987{
 988        struct ceph_dentry_info *di = ceph_dentry(dentry);
 989        long unsigned duration = le32_to_cpu(lease->duration_ms);
 990        long unsigned ttl = from_time + (duration * HZ) / 1000;
 991        long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
 992        struct inode *dir;
 993        struct ceph_mds_session *old_lease_session = NULL;
 994
 995        /*
 996         * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
 997         * we expect a negative dentry.
 998         */
 999        if (!tgt_vino && d_really_is_positive(dentry))
1000                return;
1001
1002        if (tgt_vino && (d_really_is_negative(dentry) ||
1003                        !ceph_ino_compare(d_inode(dentry), tgt_vino)))
1004                return;
1005
1006        spin_lock(&dentry->d_lock);
1007        dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
1008             dentry, duration, ttl);
1009
1010        dir = dentry->d_parent->d_inode;
1011
1012        /* make sure parent matches dir_vino */
1013        if (!ceph_ino_compare(dir, dir_vino))
1014                goto out_unlock;
1015
1016        /* only track leases on regular dentries */
1017        if (ceph_snap(dir) != CEPH_NOSNAP)
1018                goto out_unlock;
1019
1020        di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
1021
1022        if (duration == 0)
1023                goto out_unlock;
1024
1025        if (di->lease_gen == session->s_cap_gen &&
1026            time_before(ttl, dentry->d_time))
1027                goto out_unlock;  /* we already have a newer lease. */
1028
1029        if (di->lease_session && di->lease_session != session) {
1030                old_lease_session = di->lease_session;
1031                di->lease_session = NULL;
1032        }
1033
1034        ceph_dentry_lru_touch(dentry);
1035
1036        if (!di->lease_session)
1037                di->lease_session = ceph_get_mds_session(session);
1038        di->lease_gen = session->s_cap_gen;
1039        di->lease_seq = le32_to_cpu(lease->seq);
1040        di->lease_renew_after = half_ttl;
1041        di->lease_renew_from = 0;
1042        dentry->d_time = ttl;
1043out_unlock:
1044        spin_unlock(&dentry->d_lock);
1045        if (old_lease_session)
1046                ceph_put_mds_session(old_lease_session);
1047}
1048
1049/*
1050 * splice a dentry to an inode.
1051 * caller must hold directory i_mutex for this to be safe.
1052 *
1053 * we will only rehash the resulting dentry if @prehash is
1054 * true; @prehash will be set to false (for the benefit of
1055 * the caller) if we fail.
1056 */
1057static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
1058                                    bool *prehash)
1059{
1060        struct dentry *realdn;
1061
1062        BUG_ON(dn->d_inode);
1063
1064        /* dn must be unhashed */
1065        if (!d_unhashed(dn))
1066                d_drop(dn);
1067        realdn = d_materialise_unique(dn, in);
1068        if (IS_ERR(realdn)) {
1069                pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
1070                       PTR_ERR(realdn), dn, in, ceph_vinop(in));
1071                if (prehash)
1072                        *prehash = false; /* don't rehash on error */
1073                dn = realdn; /* note realdn contains the error */
1074                goto out;
1075        } else if (realdn) {
1076                dout("dn %p (%d) spliced with %p (%d) "
1077                     "inode %p ino %llx.%llx\n",
1078                     dn, d_count(dn),
1079                     realdn, d_count(realdn),
1080                     realdn->d_inode, ceph_vinop(realdn->d_inode));
1081                dput(dn);
1082                dn = realdn;
1083        } else {
1084                BUG_ON(!ceph_dentry(dn));
1085                dout("dn %p attached to %p ino %llx.%llx\n",
1086                     dn, dn->d_inode, ceph_vinop(dn->d_inode));
1087        }
1088        if ((!prehash || *prehash) && d_unhashed(dn))
1089                d_rehash(dn);
1090out:
1091        return dn;
1092}
1093
1094/*
1095 * Incorporate results into the local cache.  This is either just
1096 * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
1097 * after a lookup).
1098 *
1099 * A reply may contain
1100 *         a directory inode along with a dentry.
1101 *  and/or a target inode
1102 *
1103 * Called with snap_rwsem (read).
     *
     * When the reply carries a target, the inode obtained from
     * ceph_get_inode() is stored in req->r_target_inode.  req->r_dentry
     * may be set (LOOKUPNAME) or replaced (after a splice) along the way.
     *
     * Returns 0 on success (an empty reply also counts as success), or a
     * negative error propagated from fill_inode(), ceph_get_inode(),
     * ceph_init_dentry() or splice_dentry(), or -ENOMEM when d_alloc()
     * fails.
1104 */
1105int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
1106{
1107        struct ceph_mds_session *session = req->r_session;
1108        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1109        struct inode *in = NULL;
1110        struct ceph_vino tvino, dvino;
1111        struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1112        int err = 0;
1113
1114        dout("fill_trace %p is_dentry %d is_target %d\n", req,
1115             rinfo->head->is_dentry, rinfo->head->is_target);
1116
            /*
             * Neither a dentry nor a target in the reply: nothing to
             * assimilate.  If the MDS reported success and we know the
             * parent, ceph_invalidate_dir_request() is called instead.
             */
1117        if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
1118                dout("fill_trace reply is empty!\n");
1119                if (rinfo->head->result == 0 && req->r_parent)
1120                        ceph_invalidate_dir_request(req);
1121                return 0;
1122        }
1123
1124        if (rinfo->head->is_dentry) {
1125                struct inode *dir = req->r_parent;
1126
                    /*
                     * refresh the parent directory inode from the reply;
                     * a missing parent only triggers a one-time warning
                     */
1127                if (dir) {
1128                        err = fill_inode(dir, NULL,
1129                                         &rinfo->diri, rinfo->dirfrag,
1130                                         session, req->r_request_started, -1,
1131                                         &req->r_caps_reservation);
1132                        if (err < 0)
1133                                goto done;
1134                } else {
1135                        WARN_ON_ONCE(1);
1136                }
1137
                    /*
                     * LOOKUPNAME arrives without a dentry (see the BUG_ON
                     * below): find or allocate the dentry named
                     * rinfo->dname under some alias of dir and install it
                     * as req->r_dentry.
                     */
1138                if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
1139                        struct qstr dname;
1140                        struct dentry *dn, *parent;
1141
1142                        BUG_ON(!rinfo->head->is_target);
1143                        BUG_ON(req->r_dentry);
1144
                            /* parent is dput on every path out of this block */
1145                        parent = d_find_any_alias(dir);
1146                        BUG_ON(!parent);
1147
1148                        dname.name = rinfo->dname;
1149                        dname.len = rinfo->dname_len;
1150                        dname.hash = full_name_hash(dname.name, dname.len);
1151                        tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1152                        tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1153retry_lookup:
1154                        dn = d_lookup(parent, &dname);
1155                        dout("d_lookup on parent=%p name=%.*s got %p\n",
1156                             parent, dname.len, dname.name, dn);
1157
1158                        if (!dn) {
1159                                dn = d_alloc(parent, &dname);
1160                                dout("d_alloc %p '%.*s' = %p\n", parent,
1161                                     dname.len, dname.name, dn);
1162                                if (!dn) {
1163                                        dput(parent);
1164                                        err = -ENOMEM;
1165                                        goto done;
1166                                }
1167                                err = ceph_init_dentry(dn);
1168                                if (err < 0) {
1169                                        dput(dn);
1170                                        dput(parent);
1171                                        goto done;
1172                                }
1173                        } else if (dn->d_inode &&
1174                                   (ceph_ino(dn->d_inode) != tvino.ino ||
1175                                    ceph_snap(dn->d_inode) != tvino.snap)) {
                                    /*
                                     * cached dentry is bound to a different
                                     * inode than the reply's target: delete
                                     * it and repeat the lookup
                                     */
1176                                dout(" dn %p points to wrong inode %p\n",
1177                                     dn, dn->d_inode);
1178                                d_delete(dn);
1179                                dput(dn);
1180                                goto retry_lookup;
1181                        }
1182
1183                        req->r_dentry = dn;
1184                        dput(parent);
1185                }
1186        }
1187
            /* find or create the target inode and fill it from the reply */
1188        if (rinfo->head->is_target) {
1189                tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1190                tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1191
1192                in = ceph_get_inode(sb, tvino);
1193                if (IS_ERR(in)) {
1194                        err = PTR_ERR(in);
1195                        goto done;
1196                }
1197                req->r_target_inode = in;
1198
                    /*
                     * req->r_fmode is only handed to fill_inode() when the
                     * request was not aborted and the MDS reported success;
                     * otherwise -1 is passed.
                     */
1199                err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1200                                session, req->r_request_started,
1201                                (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1202                                rinfo->head->result == 0) ?  req->r_fmode : -1,
1203                                &req->r_caps_reservation);
1204                if (err < 0) {
1205                        pr_err("fill_inode badness %p %llx.%llx\n",
1206                                in, ceph_vinop(in));
1207                        goto done;
1208                }
1209        }
1210
1211        /*
1212         * ignore null lease/binding on snapdir ENOENT, or else we
1213         * will have trouble splicing in the virtual snapdir later
1214         */
            /*
             * Full dentry handling needs: a dentry in the reply, a request
             * that was not aborted, the parent locked, and either a target
             * or a dentry name for which strncmp() against snapdir_name
             * (over d_name.len bytes) is non-zero.
             */
1215        if (rinfo->head->is_dentry &&
1216            !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1217            test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
1218            (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
1219                                               fsc->mount_options->snapdir_name,
1220                                               req->r_dentry->d_name.len))) {
1221                /*
1222                 * lookup link rename   : null -> possibly existing inode
1223                 * mknod symlink mkdir  : null -> new inode
1224                 * unlink               : linked -> null
1225                 */
1226                struct inode *dir = req->r_parent;
1227                struct dentry *dn = req->r_dentry;
1228                bool have_dir_cap, have_lease;
1229
1230                BUG_ON(!dn);
1231                BUG_ON(!dir);
1232                BUG_ON(dn->d_parent->d_inode != dir);
1233
1234                dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1235                dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1236
                    /* the reply's directory inode must be our parent */
1237                BUG_ON(ceph_ino(dir) != dvino.ino);
1238                BUG_ON(ceph_snap(dir) != dvino.snap);
1239
1240                /* do we have a lease on the whole dir? */
1241                have_dir_cap =
1242                        (le32_to_cpu(rinfo->diri.in->cap.caps) &
1243                         CEPH_CAP_FILE_SHARED);
1244
1245                /* do we have a dn lease? */
1246                have_lease = have_dir_cap ||
1247                        le32_to_cpu(rinfo->dlease->duration_ms);
1248                if (!have_lease)
1249                        dout("fill_trace  no dentry lease or dir cap\n");
1250
1251                /* rename? */
1252                if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) {
1253                        struct inode *olddir = req->r_old_dentry_dir;
1254                        BUG_ON(!olddir);
1255
1256                        dout(" src %p '%pd' dst %p '%pd'\n",
1257                             req->r_old_dentry,
1258                             req->r_old_dentry,
1259                             dn, dn);
1260                        dout("fill_trace doing d_move %p -> %p\n",
1261                             req->r_old_dentry, dn);
1262
1263                        /* d_move screws up sibling dentries' offsets */
1264                        ceph_dir_clear_ordered(dir);
1265                        ceph_dir_clear_ordered(olddir);
1266
1267                        d_move(req->r_old_dentry, dn);
1268                        dout(" src %p '%pd' dst %p '%pd'\n",
1269                             req->r_old_dentry,
1270                             req->r_old_dentry,
1271                             dn, dn);
1272
1273                        /* ensure target dentry is invalidated, despite
1274                           rehashing bug in vfs_rename_dir */
1275                        ceph_invalidate_dentry_lease(dn);
1276
1277                        dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1278                             ceph_dentry(req->r_old_dentry)->offset);
1279
1280                        dn = req->r_old_dentry;  /* use old_dentry */
1281                }
1282
1283                /* null dentry? */
1284                if (!rinfo->head->is_target) {
1285                        dout("fill_trace null dentry\n");
1286                        if (dn->d_inode) {
                                    /* name no longer exists: drop the binding */
1287                                ceph_dir_clear_ordered(dir);
1288                                dout("d_delete %p\n", dn);
1289                                d_delete(dn);
1290                        } else if (have_lease) {
                                    /*
                                     * hash the dentry as negative and record
                                     * the lease (NULL target vino: a negative
                                     * dentry is expected)
                                     */
1291                                if (d_unhashed(dn))
1292                                        d_add(dn, NULL);
1293                                update_dentry_lease(dn, rinfo->dlease,
1294                                                    session,
1295                                                    req->r_request_started,
1296                                                    NULL, &dvino);
1297                        }
1298                        goto done;
1299                }
1300
1301                /* attach proper inode */
1302                if (!dn->d_inode) {
1303                        ceph_dir_clear_ordered(dir);
                            /*
                             * extra inode reference for the splice --
                             * presumably consumed by the dentry that
                             * splice_dentry() attaches; TODO confirm.
                             * have_lease doubles as splice_dentry()'s
                             * prehash flag: the dentry is only rehashed
                             * when it is true.
                             */
1304                        ihold(in);
1305                        dn = splice_dentry(dn, in, &have_lease);
1306                        if (IS_ERR(dn)) {
1307                                err = PTR_ERR(dn);
1308                                goto done;
1309                        }
1310                        req->r_dentry = dn;  /* may have spliced */
1311                } else if (dn->d_inode && dn->d_inode != in) {
                            /*
                             * dentry is bound to a different inode than the
                             * reply's target: invalidate it and skip the
                             * lease update below
                             */
1312                        dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
1313                             dn, dn->d_inode, ceph_vinop(dn->d_inode),
1314                             ceph_vinop(in));
1315                        d_invalidate(dn);
1316                        have_lease = false;
1317                }
1318
1319                if (have_lease) {
1320                        tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1321                        tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1322                        update_dentry_lease(dn, rinfo->dlease, session,
1323                                            req->r_request_started,
1324                                            &tvino, &dvino);
1325                }
1326                dout(" final dn %p\n", dn);
1327        } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
1328                    req->r_op == CEPH_MDS_OP_MKSNAP) &&
1329                   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
1330                struct dentry *dn = req->r_dentry;
1331                struct inode *dir = req->r_parent;
1332
1333                /* fill out a snapdir LOOKUPSNAP dentry */
1334                BUG_ON(!dn);
1335                BUG_ON(!dir);
1336                BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1337                dout(" linking snapped dir %p to dn %p\n", in, dn);
1338                ceph_dir_clear_ordered(dir);
                    /* NULL prehash: splice_dentry() rehashes unconditionally */
1339                ihold(in);
1340                dn = splice_dentry(dn, in, NULL);
1341                if (IS_ERR(dn)) {
1342                        err = PTR_ERR(dn);
1343                        goto done;
1344                }
1345                req->r_dentry = dn;  /* may have spliced */
1346        } else if (rinfo->head->is_dentry) {
                    /*
                     * The reply has a dentry but neither branch above
                     * applied: leave the dentry/inode linkage alone and
                     * only refresh the lease, provided the reply grants a
                     * dir FILE_SHARED cap or a non-zero dentry lease.
                     */
1347                struct ceph_vino *ptvino = NULL;
1348
1349                if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) ||
1350                    le32_to_cpu(rinfo->dlease->duration_ms)) {
1351                        dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1352                        dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1353
                            /* ptvino stays NULL when no target was returned */
1354                        if (rinfo->head->is_target) {
1355                                tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1356                                tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1357                                ptvino = &tvino;
1358                        }
1359
1360                        update_dentry_lease(req->r_dentry, rinfo->dlease,
1361                                session, req->r_request_started, ptvino,
1362                                &dvino);
1363                } else {
1364                        dout("%s: no dentry lease or dir cap\n", __func__);
1365                }
1366        }
1367done:
1368        dout("fill_trace done err=%d\n", err);
1369        return err;
1370}
1371
1372void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
1373{
1374        if (ctl->page) {
1375                kunmap(ctl->page);
1376                page_cache_release(ctl->page);
1377                ctl->page = NULL;
1378        }
1379}
1380
1381static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
1382                              struct ceph_readdir_cache_control *ctl,
1383                              struct ceph_mds_request *req)
1384{
1385        struct ceph_inode_info *ci = ceph_inode(dir);
1386        unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*);
1387        unsigned idx = ctl->index % nsize;
1388        pgoff_t pgoff = ctl->index / nsize;
1389
1390        if (!ctl->page || pgoff != page_index(ctl->page)) {
1391                ceph_readdir_cache_release(ctl);
1392                if (idx == 0)
1393                        ctl->page = grab_cache_page(&dir->i_data, pgoff);
1394                else
1395                        ctl->page = find_lock_page(&dir->i_data, pgoff);
1396                if (!ctl->page) {
1397                        ctl->index = -1;
1398                        return idx == 0 ? -ENOMEM : 0;
1399                }
1400                /* reading/filling the cache are serialized by
1401                 * i_mutex, no need to use page lock */
1402                unlock_page(ctl->page);
1403                ctl->dentries = kmap(ctl->page);
1404                if (idx == 0)
1405                        memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
1406        }
1407
1408        if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
1409            req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
1410                dout("readdir cache dn %p idx %d\n", dn, ctl->index);
1411                ctl->dentries[idx] = dn;
1412                ctl->index++;
1413        } else {
1414                dout("disable readdir cache\n");
1415                ctl->index = -1;
1416        }
1417        return 0;
1418}
1419
1420/*
1421 * Prepopulate our cache with readdir results, leases, etc.
1422 */
1423static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1424                                           struct ceph_mds_session *session)
1425{
1426        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1427        int i, err = 0;
1428
1429        for (i = 0; i < rinfo->dir_nr; i++) {
1430                struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1431                struct ceph_vino vino;
1432                struct inode *in;
1433                int rc;
1434
1435                vino.ino = le64_to_cpu(rde->inode.in->ino);
1436                vino.snap = le64_to_cpu(rde->inode.in->snapid);
1437
1438                in = ceph_get_inode(req->r_dentry->d_sb, vino);
1439                if (IS_ERR(in)) {
1440                        err = PTR_ERR(in);
1441                        dout("new_inode badness got %d\n", err);
1442                        continue;
1443                }
1444                rc = fill_inode(in, NULL, &rde->inode, NULL, session,
1445                                req->r_request_started, -1,
1446                                &req->r_caps_reservation);
1447                if (rc < 0) {
1448                        pr_err("fill_inode badness on %p got %d\n", in, rc);
1449                        err = rc;
1450                }
1451                iput(in);
1452        }
1453
1454        return err;
1455}
1456
1457int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1458                             struct ceph_mds_session *session)
1459{
1460        struct dentry *parent = req->r_dentry;
1461        struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
1462        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1463        struct qstr dname;
1464        struct dentry *dn;
1465        struct inode *in;
1466        int err = 0, skipped = 0, ret, i;
1467        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1468        u32 frag = le32_to_cpu(rhead->args.readdir.frag);
1469        u32 last_hash = 0;
1470        u32 fpos_offset;
1471        struct ceph_readdir_cache_control cache_ctl = {};
1472
1473        if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
1474                return readdir_prepopulate_inodes_only(req, session);
1475
1476        if (rinfo->hash_order) {
1477                if (req->r_path2) {
1478                        last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1479                                                  req->r_path2,
1480                                                  strlen(req->r_path2));
1481                        last_hash = ceph_frag_value(last_hash);
1482                } else if (rinfo->offset_hash) {
1483                        /* mds understands offset_hash */
1484                        WARN_ON_ONCE(req->r_readdir_offset != 2);
1485                        last_hash = le32_to_cpu(rhead->args.readdir.offset_hash);
1486                }
1487        }
1488
1489        if (rinfo->dir_dir &&
1490            le32_to_cpu(rinfo->dir_dir->frag) != frag) {
1491                dout("readdir_prepopulate got new frag %x -> %x\n",
1492                     frag, le32_to_cpu(rinfo->dir_dir->frag));
1493                frag = le32_to_cpu(rinfo->dir_dir->frag);
1494                if (!rinfo->hash_order)
1495                        req->r_readdir_offset = 2;
1496        }
1497
1498        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
1499                dout("readdir_prepopulate %d items under SNAPDIR dn %p\n",
1500                     rinfo->dir_nr, parent);
1501        } else {
1502                dout("readdir_prepopulate %d items under dn %p\n",
1503                     rinfo->dir_nr, parent);
1504                if (rinfo->dir_dir)
1505                        ceph_fill_dirfrag(parent->d_inode, rinfo->dir_dir);
1506        }
1507
1508        if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
1509            !(rinfo->hash_order && last_hash)) {
1510                /* note dir version at start of readdir so we can tell
1511                 * if any dentries get dropped */
1512                req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
1513                req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
1514                req->r_readdir_cache_idx = 0;
1515        }
1516
1517        cache_ctl.index = req->r_readdir_cache_idx;
1518        fpos_offset = req->r_readdir_offset;
1519
1520        /* FIXME: release caps/leases if error occurs */
1521        for (i = 0; i < rinfo->dir_nr; i++) {
1522                struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1523                struct ceph_vino tvino, dvino;
1524
1525                dname.name = rde->name;
1526                dname.len = rde->name_len;
1527                dname.hash = full_name_hash(dname.name, dname.len);
1528
1529                tvino.ino = le64_to_cpu(rde->inode.in->ino);
1530                tvino.snap = le64_to_cpu(rde->inode.in->snapid);
1531
1532                if (rinfo->hash_order) {
1533                        u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1534                                                 rde->name, rde->name_len);
1535                        hash = ceph_frag_value(hash);
1536                        if (hash != last_hash)
1537                                fpos_offset = 2;
1538                        last_hash = hash;
1539                        rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
1540                } else {
1541                        rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
1542                }
1543
1544retry_lookup:
1545                dn = d_lookup(parent, &dname);
1546                dout("d_lookup on parent=%p name=%.*s got %p\n",
1547                     parent, dname.len, dname.name, dn);
1548
1549                if (!dn) {
1550                        dn = d_alloc(parent, &dname);
1551                        dout("d_alloc %p '%.*s' = %p\n", parent,
1552                             dname.len, dname.name, dn);
1553                        if (!dn) {
1554                                dout("d_alloc badness\n");
1555                                err = -ENOMEM;
1556                                goto out;
1557                        }
1558                        ret = ceph_init_dentry(dn);
1559                        if (ret < 0) {
1560                                dput(dn);
1561                                err = ret;
1562                                goto out;
1563                        }
1564                } else if (dn->d_inode &&
1565                           (ceph_ino(dn->d_inode) != tvino.ino ||
1566                            ceph_snap(dn->d_inode) != tvino.snap)) {
1567                        dout(" dn %p points to wrong inode %p\n",
1568                             dn, dn->d_inode);
1569                        d_delete(dn);
1570                        dput(dn);
1571                        goto retry_lookup;
1572                }
1573
1574                /* inode */
1575                if (dn->d_inode) {
1576                        in = dn->d_inode;
1577                } else {
1578                        in = ceph_get_inode(parent->d_sb, tvino);
1579                        if (IS_ERR(in)) {
1580                                dout("new_inode badness\n");
1581                                d_drop(dn);
1582                                dput(dn);
1583                                err = PTR_ERR(in);
1584                                goto out;
1585                        }
1586                }
1587
1588                ret = fill_inode(in, NULL, &rde->inode, NULL, session,
1589                                 req->r_request_started, -1,
1590                                 &req->r_caps_reservation);
1591                if (ret < 0) {
1592                        pr_err("fill_inode badness on %p\n", in);
1593                        if (!dn->d_inode)
1594                                iput(in);
1595                        d_drop(dn);
1596                        err = ret;
1597                        goto next_item;
1598                }
1599
1600                if (!dn->d_inode) {
1601                        struct dentry *realdn;
1602
1603                        if (ceph_security_xattr_deadlock(in)) {
1604                                dout(" skip splicing dn %p to inode %p"
1605                                     " (security xattr deadlock)\n", dn, in);
1606                                iput(in);
1607                                skipped++;
1608                                goto next_item;
1609                        }
1610
1611                        realdn = splice_dentry(dn, in, NULL);
1612                        if (IS_ERR(realdn)) {
1613                                err = PTR_ERR(realdn);
1614                                d_drop(dn);
1615                                dn = NULL;
1616                                goto next_item;
1617                        }
1618                        dn = realdn;
1619                }
1620
1621                ceph_dentry(dn)->offset = rde->offset;
1622
1623                dvino = ceph_vino(d_inode(parent));
1624                update_dentry_lease(dn, rde->lease, req->r_session,
1625                                    req->r_request_started, &tvino, &dvino);
1626
1627                if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
1628                        ret = fill_readdir_cache(parent->d_inode, dn,
1629                                                 &cache_ctl, req);
1630                        if (ret < 0)
1631                                err = ret;
1632                }
1633next_item:
1634                if (dn)
1635                        dput(dn);
1636        }
1637out:
1638        if (err == 0 && skipped == 0) {
1639                set_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags);
1640                req->r_readdir_cache_idx = cache_ctl.index;
1641        }
1642        ceph_readdir_cache_release(&cache_ctl);
1643        dout("readdir_prepopulate done\n");
1644        return err;
1645}
1646
1647bool ceph_inode_set_size(struct inode *inode, loff_t size)
1648{
1649        struct ceph_inode_info *ci = ceph_inode(inode);
1650        bool ret;
1651
1652        spin_lock(&ci->i_ceph_lock);
1653        dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1654        i_size_write(inode, size);
1655        inode->i_blocks = calc_inode_blocks(size);
1656
1657        ret = __ceph_should_report_size(ci);
1658
1659        spin_unlock(&ci->i_ceph_lock);
1660        return ret;
1661}
1662
1663/*
1664 * Write back inode data in a worker thread.  (This can't be done
1665 * in the message handler context.)
1666 */
1667void ceph_queue_writeback(struct inode *inode)
1668{
1669        ihold(inode);
1670        if (queue_work(ceph_inode_to_client(inode)->wb_wq,
1671                       &ceph_inode(inode)->i_wb_work)) {
1672                dout("ceph_queue_writeback %p\n", inode);
1673        } else {
1674                dout("ceph_queue_writeback %p failed\n", inode);
1675                iput(inode);
1676        }
1677}
1678
1679static void ceph_writeback_work(struct work_struct *work)
1680{
1681        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1682                                                  i_wb_work);
1683        struct inode *inode = &ci->vfs_inode;
1684
1685        dout("writeback %p\n", inode);
1686        filemap_fdatawrite(&inode->i_data);
1687        iput(inode);
1688}
1689
1690/*
1691 * queue an async invalidation
1692 */
1693void ceph_queue_invalidate(struct inode *inode)
1694{
1695        ihold(inode);
1696        if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
1697                       &ceph_inode(inode)->i_pg_inv_work)) {
1698                dout("ceph_queue_invalidate %p\n", inode);
1699        } else {
1700                dout("ceph_queue_invalidate %p failed\n", inode);
1701                iput(inode);
1702        }
1703}
1704
1705/*
1706 * Invalidate inode pages in a worker thread.  (This can't be done
1707 * in the message handler context.)
1708 */
1709static void ceph_invalidate_work(struct work_struct *work)
1710{
1711        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1712                                                  i_pg_inv_work);
1713        struct inode *inode = &ci->vfs_inode;
1714        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1715        u32 orig_gen;
1716        int check = 0;
1717
1718        mutex_lock(&ci->i_truncate_mutex);
1719
1720        if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1721                pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
1722                                    inode, ceph_ino(inode));
1723                mapping_set_error(inode->i_mapping, -EIO);
1724                truncate_pagecache(inode, 0);
1725                mutex_unlock(&ci->i_truncate_mutex);
1726                goto out;
1727        }
1728
1729        spin_lock(&ci->i_ceph_lock);
1730        dout("invalidate_pages %p gen %d revoking %d\n", inode,
1731             ci->i_rdcache_gen, ci->i_rdcache_revoking);
1732        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1733                if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1734                        check = 1;
1735                spin_unlock(&ci->i_ceph_lock);
1736                mutex_unlock(&ci->i_truncate_mutex);
1737                goto out;
1738        }
1739        orig_gen = ci->i_rdcache_gen;
1740        spin_unlock(&ci->i_ceph_lock);
1741
1742        if (invalidate_inode_pages2(inode->i_mapping) < 0) {
1743                pr_err("invalidate_pages %p fails\n", inode);
1744        }
1745
1746        spin_lock(&ci->i_ceph_lock);
1747        if (orig_gen == ci->i_rdcache_gen &&
1748            orig_gen == ci->i_rdcache_revoking) {
1749                dout("invalidate_pages %p gen %d successful\n", inode,
1750                     ci->i_rdcache_gen);
1751                ci->i_rdcache_revoking--;
1752                check = 1;
1753        } else {
1754                dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
1755                     inode, orig_gen, ci->i_rdcache_gen,
1756                     ci->i_rdcache_revoking);
1757                if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1758                        check = 1;
1759        }
1760        spin_unlock(&ci->i_ceph_lock);
1761        mutex_unlock(&ci->i_truncate_mutex);
1762out:
1763        if (check)
1764                ceph_check_caps(ci, 0, NULL);
1765        iput(inode);
1766}
1767
1768
1769/*
1770 * called by trunc_wq;
1771 *
1772 * We also truncate in a separate thread as well.
1773 */
1774static void ceph_vmtruncate_work(struct work_struct *work)
1775{
1776        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1777                                                  i_vmtruncate_work);
1778        struct inode *inode = &ci->vfs_inode;
1779
1780        dout("vmtruncate_work %p\n", inode);
1781        __ceph_do_pending_vmtruncate(inode);
1782        iput(inode);
1783}
1784
1785/*
1786 * Queue an async vmtruncate.  If we fail to queue work, we will handle
1787 * the truncation the next time we call __ceph_do_pending_vmtruncate.
1788 */
1789void ceph_queue_vmtruncate(struct inode *inode)
1790{
1791        struct ceph_inode_info *ci = ceph_inode(inode);
1792
1793        ihold(inode);
1794        if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
1795                       &ci->i_vmtruncate_work)) {
1796                dout("ceph_queue_vmtruncate %p\n", inode);
1797        } else {
1798                dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
1799                     inode, ci->i_truncate_pending);
1800                iput(inode);
1801        }
1802}
1803
1804/*
1805 * Make sure any pending truncation is applied before doing anything
1806 * that may depend on it.
1807 */
1808void __ceph_do_pending_vmtruncate(struct inode *inode)
1809{
1810        struct ceph_inode_info *ci = ceph_inode(inode);
1811        u64 to;
1812        int wrbuffer_refs, finish = 0;
1813
1814        mutex_lock(&ci->i_truncate_mutex);
1815retry:
1816        spin_lock(&ci->i_ceph_lock);
1817        if (ci->i_truncate_pending == 0) {
1818                dout("__do_pending_vmtruncate %p none pending\n", inode);
1819                spin_unlock(&ci->i_ceph_lock);
1820                mutex_unlock(&ci->i_truncate_mutex);
1821                return;
1822        }
1823
1824        /*
1825         * make sure any dirty snapped pages are flushed before we
1826         * possibly truncate them.. so write AND block!
1827         */
1828        if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
1829                dout("__do_pending_vmtruncate %p flushing snaps first\n",
1830                     inode);
1831                spin_unlock(&ci->i_ceph_lock);
1832                filemap_write_and_wait_range(&inode->i_data, 0,
1833                                             inode->i_sb->s_maxbytes);
1834                goto retry;
1835        }
1836
1837        /* there should be no reader or writer */
1838        WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);
1839
1840        to = ci->i_truncate_size;
1841        wrbuffer_refs = ci->i_wrbuffer_ref;
1842        dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
1843             ci->i_truncate_pending, to);
1844        spin_unlock(&ci->i_ceph_lock);
1845
1846        truncate_pagecache(inode, to);
1847
1848        spin_lock(&ci->i_ceph_lock);
1849        if (to == ci->i_truncate_size) {
1850                ci->i_truncate_pending = 0;
1851                finish = 1;
1852        }
1853        spin_unlock(&ci->i_ceph_lock);
1854        if (!finish)
1855                goto retry;
1856
1857        mutex_unlock(&ci->i_truncate_mutex);
1858
1859        if (wrbuffer_refs == 0)
1860                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1861
1862        wake_up_all(&ci->i_cap_wq);
1863}
1864
1865
1866/*
1867 * symlinks
1868 */
1869static void *ceph_sym_follow_link(struct dentry *dentry, struct nameidata *nd)
1870{
1871        struct ceph_inode_info *ci = ceph_inode(dentry->d_inode);
1872        nd_set_link(nd, ci->i_symlink);
1873        return NULL;
1874}
1875
1876static const struct inode_operations ceph_symlink_iops = {
1877        .readlink = generic_readlink,
1878        .follow_link = ceph_sym_follow_link,
1879        .setattr = ceph_setattr,
1880        .getattr = ceph_getattr,
1881        .setxattr = ceph_setxattr,
1882        .getxattr = ceph_getxattr,
1883        .listxattr = ceph_listxattr,
1884        .removexattr = ceph_removexattr,
1885};
1886
1887int __ceph_setattr(struct dentry *dentry, struct iattr *attr)
1888{
1889        struct inode *inode = dentry->d_inode;
1890        struct ceph_inode_info *ci = ceph_inode(inode);
1891        const unsigned int ia_valid = attr->ia_valid;
1892        struct ceph_mds_request *req;
1893        struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
1894        struct ceph_cap_flush *prealloc_cf;
1895        int issued;
1896        int release = 0, dirtied = 0;
1897        int mask = 0;
1898        int err = 0;
1899        int inode_dirty_flags = 0;
1900        bool lock_snap_rwsem = false;
1901
1902        if (ceph_snap(inode) != CEPH_NOSNAP)
1903                return -EROFS;
1904
1905        err = inode_change_ok(inode, attr);
1906        if (err != 0)
1907                return err;
1908
1909        prealloc_cf = ceph_alloc_cap_flush();
1910        if (!prealloc_cf)
1911                return -ENOMEM;
1912
1913        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
1914                                       USE_AUTH_MDS);
1915        if (IS_ERR(req)) {
1916                ceph_free_cap_flush(prealloc_cf);
1917                return PTR_ERR(req);
1918        }
1919
1920        spin_lock(&ci->i_ceph_lock);
1921        issued = __ceph_caps_issued(ci, NULL);
1922
1923        if (!ci->i_head_snapc &&
1924            (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
1925                lock_snap_rwsem = true;
1926                if (!down_read_trylock(&mdsc->snap_rwsem)) {
1927                        spin_unlock(&ci->i_ceph_lock);
1928                        down_read(&mdsc->snap_rwsem);
1929                        spin_lock(&ci->i_ceph_lock);
1930                        issued = __ceph_caps_issued(ci, NULL);
1931                }
1932        }
1933
1934        dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
1935
1936        if (ia_valid & ATTR_UID) {
1937                dout("setattr %p uid %d -> %d\n", inode,
1938                     from_kuid(&init_user_ns, inode->i_uid),
1939                     from_kuid(&init_user_ns, attr->ia_uid));
1940                if (issued & CEPH_CAP_AUTH_EXCL) {
1941                        inode->i_uid = attr->ia_uid;
1942                        dirtied |= CEPH_CAP_AUTH_EXCL;
1943                } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1944                           !uid_eq(attr->ia_uid, inode->i_uid)) {
1945                        req->r_args.setattr.uid = cpu_to_le32(
1946                                from_kuid(&init_user_ns, attr->ia_uid));
1947                        mask |= CEPH_SETATTR_UID;
1948                        release |= CEPH_CAP_AUTH_SHARED;
1949                }
1950        }
1951        if (ia_valid & ATTR_GID) {
1952                dout("setattr %p gid %d -> %d\n", inode,
1953                     from_kgid(&init_user_ns, inode->i_gid),
1954                     from_kgid(&init_user_ns, attr->ia_gid));
1955                if (issued & CEPH_CAP_AUTH_EXCL) {
1956                        inode->i_gid = attr->ia_gid;
1957                        dirtied |= CEPH_CAP_AUTH_EXCL;
1958                } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1959                           !gid_eq(attr->ia_gid, inode->i_gid)) {
1960                        req->r_args.setattr.gid = cpu_to_le32(
1961                                from_kgid(&init_user_ns, attr->ia_gid));
1962                        mask |= CEPH_SETATTR_GID;
1963                        release |= CEPH_CAP_AUTH_SHARED;
1964                }
1965        }
1966        if (ia_valid & ATTR_MODE) {
1967                dout("setattr %p mode 0%o -> 0%o\n", inode, inode->i_mode,
1968                     attr->ia_mode);
1969                if (issued & CEPH_CAP_AUTH_EXCL) {
1970                        inode->i_mode = attr->ia_mode;
1971                        dirtied |= CEPH_CAP_AUTH_EXCL;
1972                } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1973                           attr->ia_mode != inode->i_mode) {
1974                        inode->i_mode = attr->ia_mode;
1975                        req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
1976                        mask |= CEPH_SETATTR_MODE;
1977                        release |= CEPH_CAP_AUTH_SHARED;
1978                }
1979        }
1980
1981        if (ia_valid & ATTR_ATIME) {
1982                dout("setattr %p atime %ld.%ld -> %ld.%ld\n", inode,
1983                     inode->i_atime.tv_sec, inode->i_atime.tv_nsec,
1984                     attr->ia_atime.tv_sec, attr->ia_atime.tv_nsec);
1985                if (issued & CEPH_CAP_FILE_EXCL) {
1986                        ci->i_time_warp_seq++;
1987                        inode->i_atime = attr->ia_atime;
1988                        dirtied |= CEPH_CAP_FILE_EXCL;
1989                } else if ((issued & CEPH_CAP_FILE_WR) &&
1990                           timespec_compare(&inode->i_atime,
1991                                            &attr->ia_atime) < 0) {
1992                        inode->i_atime = attr->ia_atime;
1993                        dirtied |= CEPH_CAP_FILE_WR;
1994                } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
1995                           !timespec_equal(&inode->i_atime, &attr->ia_atime)) {
1996                        ceph_encode_timespec(&req->r_args.setattr.atime,
1997                                             &attr->ia_atime);
1998                        mask |= CEPH_SETATTR_ATIME;
1999                        release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD |
2000                                CEPH_CAP_FILE_WR;
2001                }
2002        }
2003        if (ia_valid & ATTR_MTIME) {
2004                dout("setattr %p mtime %ld.%ld -> %ld.%ld\n", inode,
2005                     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
2006                     attr->ia_mtime.tv_sec, attr->ia_mtime.tv_nsec);
2007                if (issued & CEPH_CAP_FILE_EXCL) {
2008                        ci->i_time_warp_seq++;
2009                        inode->i_mtime = attr->ia_mtime;
2010                        dirtied |= CEPH_CAP_FILE_EXCL;
2011                } else if ((issued & CEPH_CAP_FILE_WR) &&
2012                           timespec_compare(&inode->i_mtime,
2013                                            &attr->ia_mtime) < 0) {
2014                        inode->i_mtime = attr->ia_mtime;
2015                        dirtied |= CEPH_CAP_FILE_WR;
2016                } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
2017                           !timespec_equal(&inode->i_mtime, &attr->ia_mtime)) {
2018                        ceph_encode_timespec(&req->r_args.setattr.mtime,
2019                                             &attr->ia_mtime);
2020                        mask |= CEPH_SETATTR_MTIME;
2021                        release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
2022                                CEPH_CAP_FILE_WR;
2023                }
2024        }
2025        if (ia_valid & ATTR_SIZE) {
2026                dout("setattr %p size %lld -> %lld\n", inode,
2027                     inode->i_size, attr->ia_size);
2028                if ((issued & CEPH_CAP_FILE_EXCL) &&
2029                    attr->ia_size > inode->i_size) {
2030                        i_size_write(inode, attr->ia_size);
2031                        inode->i_blocks = calc_inode_blocks(attr->ia_size);
2032                        ci->i_reported_size = attr->ia_size;
2033                        dirtied |= CEPH_CAP_FILE_EXCL;
2034                } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
2035                           attr->ia_size != inode->i_size) {
2036                        req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
2037                        req->r_args.setattr.old_size =
2038                                cpu_to_le64(inode->i_size);
2039                        mask |= CEPH_SETATTR_SIZE;
2040                        release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
2041                                CEPH_CAP_FILE_WR;
2042                }
2043        }
2044
2045        /* these do nothing */
2046        if (ia_valid & ATTR_CTIME) {
2047                bool only = (ia_valid & (ATTR_SIZE|ATTR_MTIME|ATTR_ATIME|
2048                                         ATTR_MODE|ATTR_UID|ATTR_GID)) == 0;
2049                dout("setattr %p ctime %ld.%ld -> %ld.%ld (%s)\n", inode,
2050                     inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
2051                     attr->ia_ctime.tv_sec, attr->ia_ctime.tv_nsec,
2052                     only ? "ctime only" : "ignored");
2053                if (only) {
2054                        /*
2055                         * if kernel wants to dirty ctime but nothing else,
2056                         * we need to choose a cap to dirty under, or do
2057                         * a almost-no-op setattr
2058                         */
2059                        if (issued & CEPH_CAP_AUTH_EXCL)
2060                                dirtied |= CEPH_CAP_AUTH_EXCL;
2061                        else if (issued & CEPH_CAP_FILE_EXCL)
2062                                dirtied |= CEPH_CAP_FILE_EXCL;
2063                        else if (issued & CEPH_CAP_XATTR_EXCL)
2064                                dirtied |= CEPH_CAP_XATTR_EXCL;
2065                        else
2066                                mask |= CEPH_SETATTR_CTIME;
2067                }
2068        }
2069        if (ia_valid & ATTR_FILE)
2070                dout("setattr %p ATTR_FILE ... hrm!\n", inode);
2071
2072        if (dirtied) {
2073                inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
2074                                                           &prealloc_cf);
2075                inode->i_ctime = attr->ia_ctime;
2076        }
2077
2078        release &= issued;
2079        spin_unlock(&ci->i_ceph_lock);
2080        if (lock_snap_rwsem)
2081                up_read(&mdsc->snap_rwsem);
2082
2083        if (inode_dirty_flags)
2084                __mark_inode_dirty(inode, inode_dirty_flags);
2085
2086        if (mask) {
2087                req->r_inode = inode;
2088                ihold(inode);
2089                req->r_inode_drop = release;
2090                req->r_args.setattr.mask = cpu_to_le32(mask);
2091                req->r_num_caps = 1;
2092                req->r_stamp = attr->ia_ctime;
2093                err = ceph_mdsc_do_request(mdsc, NULL, req);
2094        }
2095        dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
2096             ceph_cap_string(dirtied), mask);
2097
2098        ceph_mdsc_put_request(req);
2099        ceph_free_cap_flush(prealloc_cf);
2100
2101        if (err >= 0 && (mask & CEPH_SETATTR_SIZE))
2102                __ceph_do_pending_vmtruncate(inode);
2103
2104        return err;
2105}
2106
2107/*
2108 * setattr
2109 */
2110int ceph_setattr(struct dentry *dentry, struct iattr *attr)
2111{
2112        int err;
2113
2114        err = __ceph_setattr(dentry, attr);
2115
2116        if (err >= 0 && (attr->ia_valid & ATTR_MODE))
2117                err = ceph_acl_chmod(dentry, dentry->d_inode);
2118
2119        return err;
2120}
2121
2122/*
2123 * Verify that we have a lease on the given mask.  If not,
2124 * do a getattr against an mds.
2125 */
2126int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
2127                      int mask, bool force)
2128{
2129        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
2130        struct ceph_mds_client *mdsc = fsc->mdsc;
2131        struct ceph_mds_request *req;
2132        int err;
2133
2134        if (ceph_snap(inode) == CEPH_SNAPDIR) {
2135                dout("do_getattr inode %p SNAPDIR\n", inode);
2136                return 0;
2137        }
2138
2139        dout("do_getattr inode %p mask %s mode 0%o\n",
2140             inode, ceph_cap_string(mask), inode->i_mode);
2141        if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
2142                return 0;
2143
2144        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
2145        if (IS_ERR(req))
2146                return PTR_ERR(req);
2147        req->r_inode = inode;
2148        ihold(inode);
2149        req->r_num_caps = 1;
2150        req->r_args.getattr.mask = cpu_to_le32(mask);
2151        req->r_locked_page = locked_page;
2152        err = ceph_mdsc_do_request(mdsc, NULL, req);
2153        if (locked_page && err == 0) {
2154                u64 inline_version = req->r_reply_info.targeti.inline_version;
2155                if (inline_version == 0) {
2156                        /* the reply is supposed to contain inline data */
2157                        err = -EINVAL;
2158                } else if (inline_version == CEPH_INLINE_NONE) {
2159                        err = -ENODATA;
2160                } else {
2161                        err = req->r_reply_info.targeti.inline_len;
2162                }
2163        }
2164        ceph_mdsc_put_request(req);
2165        dout("do_getattr result=%d\n", err);
2166        return err;
2167}
2168
2169
2170/*
2171 * Check inode permissions.  We verify we have a valid value for
2172 * the AUTH cap, then call the generic handler.
2173 */
2174int ceph_permission(struct inode *inode, int mask)
2175{
2176        int err;
2177
2178        if (mask & MAY_NOT_BLOCK)
2179                return -ECHILD;
2180
2181        err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);
2182
2183        if (!err)
2184                err = generic_permission(inode, mask);
2185        return err;
2186}
2187
2188/*
2189 * Get all attributes.  Hopefully somedata we'll have a statlite()
2190 * and can limit the fields we require to be accurate.
2191 */
2192int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
2193                 struct kstat *stat)
2194{
2195        struct inode *inode = dentry->d_inode;
2196        struct ceph_inode_info *ci = ceph_inode(inode);
2197        int err;
2198
2199        err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
2200        if (!err) {
2201                generic_fillattr(inode, stat);
2202                stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
2203                if (ceph_snap(inode) != CEPH_NOSNAP)
2204                        stat->dev = ceph_snap(inode);
2205                else
2206                        stat->dev = 0;
2207                if (S_ISDIR(inode->i_mode)) {
2208                        if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
2209                                                RBYTES))
2210                                stat->size = ci->i_rbytes;
2211                        else
2212                                stat->size = ci->i_files + ci->i_subdirs;
2213                        stat->blocks = 0;
2214                        stat->blksize = 65536;
2215                }
2216        }
2217        return err;
2218}
2219