linux/fs/ceph/dir.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/spinlock.h>
   4#include <linux/fs_struct.h>
   5#include <linux/namei.h>
   6#include <linux/slab.h>
   7#include <linux/sched.h>
   8
   9#include "super.h"
  10#include "mds_client.h"
  11
  12/*
  13 * Directory operations: readdir, lookup, create, link, unlink,
  14 * rename, etc.
  15 */
  16
  17/*
  18 * Ceph MDS operations are specified in terms of a base ino and
  19 * relative path.  Thus, the client can specify an operation on a
  20 * specific inode (e.g., a getattr due to fstat(2)), or as a path
  21 * relative to, say, the root directory.
  22 *
  23 * Normally, we limit ourselves to strict inode ops (no path component)
  24 * or dentry operations (a single path component relative to an ino).  The
  25 * exception to this is open_root_dentry(), which will open the mount
  26 * point by name.
  27 */
  28
  29const struct dentry_operations ceph_dentry_ops;
  30
  31/*
  32 * Initialize ceph dentry state.
  33 */
  34int ceph_init_dentry(struct dentry *dentry)
  35{
  36        struct ceph_dentry_info *di;
  37
  38        if (dentry->d_fsdata)
  39                return 0;
  40
  41        di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
  42        if (!di)
  43                return -ENOMEM;          /* oh well */
  44
  45        spin_lock(&dentry->d_lock);
  46        if (dentry->d_fsdata) {
  47                /* lost a race */
  48                kmem_cache_free(ceph_dentry_cachep, di);
  49                goto out_unlock;
  50        }
  51
  52        if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP)
  53                d_set_d_op(dentry, &ceph_dentry_ops);
  54        else if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
  55                d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
  56        else
  57                d_set_d_op(dentry, &ceph_snap_dentry_ops);
  58
  59        di->dentry = dentry;
  60        di->lease_session = NULL;
  61        dentry->d_time = jiffies;
  62        /* avoid reordering d_fsdata setup so that the check above is safe */
  63        smp_mb();
  64        dentry->d_fsdata = di;
  65        ceph_dentry_lru_add(dentry);
  66out_unlock:
  67        spin_unlock(&dentry->d_lock);
  68        return 0;
  69}
  70
  71/*
  72 * for readdir, we encode the directory frag and offset within that
  73 * frag into f_pos.
  74 */
  75static unsigned fpos_frag(loff_t p)
  76{
  77        return p >> 32;
  78}
  79static unsigned fpos_off(loff_t p)
  80{
  81        return p & 0xffffffff;
  82}
  83
  84static int fpos_cmp(loff_t l, loff_t r)
  85{
  86        int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
  87        if (v)
  88                return v;
  89        return (int)(fpos_off(l) - fpos_off(r));
  90}
  91
  92/*
  93 * make note of the last dentry we read, so we can
  94 * continue at the same lexicographical point,
  95 * regardless of what dir changes take place on the
  96 * server.
  97 */
  98static int note_last_dentry(struct ceph_file_info *fi, const char *name,
  99                            int len, unsigned next_offset)
 100{
 101        char *buf = kmalloc(len+1, GFP_KERNEL);
 102        if (!buf)
 103                return -ENOMEM;
 104        kfree(fi->last_name);
 105        fi->last_name = buf;
 106        memcpy(fi->last_name, name, len);
 107        fi->last_name[len] = 0;
 108        fi->next_offset = next_offset;
 109        dout("note_last_dentry '%s'\n", fi->last_name);
 110        return 0;
 111}
 112
 113/*
 114 * When possible, we try to satisfy a readdir by peeking at the
 115 * dcache.  We make this work by carefully ordering dentries on
 116 * d_child when we initially get results back from the MDS, and
 117 * falling back to a "normal" sync readdir if any dentries in the dir
 118 * are dropped.
 119 *
 120 * Complete dir indicates that we have all dentries in the dir.  It is
 121 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
 122 * the MDS if/when the directory is modified).
 123 */
 124static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
 125                            u32 shared_gen)
 126{
 127        struct ceph_file_info *fi = file->private_data;
 128        struct dentry *parent = file->f_path.dentry;
 129        struct inode *dir = d_inode(parent);
 130        struct dentry *dentry, *last = NULL;
 131        struct ceph_dentry_info *di;
 132        unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
 133        int err = 0;
 134        loff_t ptr_pos = 0;
 135        struct ceph_readdir_cache_control cache_ctl = {};
 136
 137        dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 138
 139        /* we can calculate cache index for the first dirfrag */
 140        if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
 141                cache_ctl.index = fpos_off(ctx->pos) - 2;
 142                BUG_ON(cache_ctl.index < 0);
 143                ptr_pos = cache_ctl.index * sizeof(struct dentry *);
 144        }
 145
 146        while (true) {
 147                pgoff_t pgoff;
 148                bool emit_dentry;
 149
 150                if (ptr_pos >= i_size_read(dir)) {
 151                        fi->flags |= CEPH_F_ATEND;
 152                        err = 0;
 153                        break;
 154                }
 155
 156                err = -EAGAIN;
 157                pgoff = ptr_pos >> PAGE_SHIFT;
 158                if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
 159                        ceph_readdir_cache_release(&cache_ctl);
 160                        cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
 161                        if (!cache_ctl.page) {
 162                                dout(" page %lu not found\n", pgoff);
 163                                break;
 164                        }
 165                        /* reading/filling the cache are serialized by
 166                         * i_mutex, no need to use page lock */
 167                        unlock_page(cache_ctl.page);
 168                        cache_ctl.dentries = kmap(cache_ctl.page);
 169                }
 170
 171                rcu_read_lock();
 172                spin_lock(&parent->d_lock);
 173                /* check i_size again here, because empty directory can be
 174                 * marked as complete while not holding the i_mutex. */
 175                if (ceph_dir_is_complete_ordered(dir) &&
 176                    ptr_pos < i_size_read(dir))
 177                        dentry = cache_ctl.dentries[cache_ctl.index % nsize];
 178                else
 179                        dentry = NULL;
 180                spin_unlock(&parent->d_lock);
 181                if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
 182                        dentry = NULL;
 183                rcu_read_unlock();
 184                if (!dentry)
 185                        break;
 186
 187                emit_dentry = false;
 188                di = ceph_dentry(dentry);
 189                spin_lock(&dentry->d_lock);
 190                if (di->lease_shared_gen == shared_gen &&
 191                    d_really_is_positive(dentry) &&
 192                    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
 193                    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
 194                    fpos_cmp(ctx->pos, di->offset) <= 0) {
 195                        emit_dentry = true;
 196                }
 197                spin_unlock(&dentry->d_lock);
 198
 199                if (emit_dentry) {
 200                        dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
 201                             dentry, dentry, d_inode(dentry));
 202                        ctx->pos = di->offset;
 203                        if (!dir_emit(ctx, dentry->d_name.name,
 204                                      dentry->d_name.len,
 205                                      ceph_translate_ino(dentry->d_sb,
 206                                                         d_inode(dentry)->i_ino),
 207                                      d_inode(dentry)->i_mode >> 12)) {
 208                                dput(dentry);
 209                                err = 0;
 210                                break;
 211                        }
 212                        ctx->pos++;
 213
 214                        if (last)
 215                                dput(last);
 216                        last = dentry;
 217                } else {
 218                        dput(dentry);
 219                }
 220
 221                cache_ctl.index++;
 222                ptr_pos += sizeof(struct dentry *);
 223        }
 224        ceph_readdir_cache_release(&cache_ctl);
 225        if (last) {
 226                int ret;
 227                di = ceph_dentry(last);
 228                ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
 229                                       fpos_off(di->offset) + 1);
 230                if (ret < 0)
 231                        err = ret;
 232                dput(last);
 233        }
 234        return err;
 235}
 236
 237static int ceph_readdir(struct file *file, struct dir_context *ctx)
 238{
 239        struct ceph_file_info *fi = file->private_data;
 240        struct inode *inode = file_inode(file);
 241        struct ceph_inode_info *ci = ceph_inode(inode);
 242        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 243        struct ceph_mds_client *mdsc = fsc->mdsc;
 244        unsigned frag = fpos_frag(ctx->pos);
 245        int off = fpos_off(ctx->pos);
 246        int err;
 247        u32 ftype;
 248        struct ceph_mds_reply_info_parsed *rinfo;
 249
 250        dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
 251        if (fi->flags & CEPH_F_ATEND)
 252                return 0;
 253
 254        /* always start with . and .. */
 255        if (ctx->pos == 0) {
 256                dout("readdir off 0 -> '.'\n");
 257                if (!dir_emit(ctx, ".", 1, 
 258                            ceph_translate_ino(inode->i_sb, inode->i_ino),
 259                            inode->i_mode >> 12))
 260                        return 0;
 261                ctx->pos = 1;
 262                off = 1;
 263        }
 264        if (ctx->pos == 1) {
 265                ino_t ino = parent_ino(file->f_path.dentry);
 266                dout("readdir off 1 -> '..'\n");
 267                if (!dir_emit(ctx, "..", 2,
 268                            ceph_translate_ino(inode->i_sb, ino),
 269                            inode->i_mode >> 12))
 270                        return 0;
 271                ctx->pos = 2;
 272                off = 2;
 273        }
 274
 275        /* can we use the dcache? */
 276        spin_lock(&ci->i_ceph_lock);
 277        if (ceph_test_mount_opt(fsc, DCACHE) &&
 278            !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 279            ceph_snap(inode) != CEPH_SNAPDIR &&
 280            __ceph_dir_is_complete_ordered(ci) &&
 281            __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
 282                u32 shared_gen = ci->i_shared_gen;
 283                spin_unlock(&ci->i_ceph_lock);
 284                err = __dcache_readdir(file, ctx, shared_gen);
 285                if (err != -EAGAIN)
 286                        return err;
 287                frag = fpos_frag(ctx->pos);
 288                off = fpos_off(ctx->pos);
 289        } else {
 290                spin_unlock(&ci->i_ceph_lock);
 291        }
 292
 293        /* proceed with a normal readdir */
 294more:
 295        /* do we have the correct frag content buffered? */
 296        if (fi->frag != frag || fi->last_readdir == NULL) {
 297                struct ceph_mds_request *req;
 298                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
 299                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 300
 301                /* discard old result, if any */
 302                if (fi->last_readdir) {
 303                        ceph_mdsc_put_request(fi->last_readdir);
 304                        fi->last_readdir = NULL;
 305                }
 306
 307                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
 308                     ceph_vinop(inode), frag, fi->last_name);
 309                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 310                if (IS_ERR(req))
 311                        return PTR_ERR(req);
 312                err = ceph_alloc_readdir_reply_buffer(req, inode);
 313                if (err) {
 314                        ceph_mdsc_put_request(req);
 315                        return err;
 316                }
 317                /* hints to request -> mds selection code */
 318                req->r_direct_mode = USE_AUTH_MDS;
 319                req->r_direct_hash = ceph_frag_value(frag);
 320                req->r_direct_is_hash = true;
 321                if (fi->last_name) {
 322                        req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
 323                        if (!req->r_path2) {
 324                                ceph_mdsc_put_request(req);
 325                                return -ENOMEM;
 326                        }
 327                }
 328                req->r_dir_release_cnt = fi->dir_release_count;
 329                req->r_dir_ordered_cnt = fi->dir_ordered_count;
 330                req->r_readdir_cache_idx = fi->readdir_cache_idx;
 331                req->r_readdir_offset = fi->next_offset;
 332                req->r_args.readdir.frag = cpu_to_le32(frag);
 333
 334                req->r_inode = inode;
 335                ihold(inode);
 336                req->r_dentry = dget(file->f_path.dentry);
 337                err = ceph_mdsc_do_request(mdsc, NULL, req);
 338                if (err < 0) {
 339                        ceph_mdsc_put_request(req);
 340                        return err;
 341                }
 342                dout("readdir got and parsed readdir result=%d"
 343                     " on frag %x, end=%d, complete=%d\n", err, frag,
 344                     (int)req->r_reply_info.dir_end,
 345                     (int)req->r_reply_info.dir_complete);
 346
 347
 348                /* note next offset and last dentry name */
 349                rinfo = &req->r_reply_info;
 350                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
 351                        frag = le32_to_cpu(rinfo->dir_dir->frag);
 352                        off = req->r_readdir_offset;
 353                        fi->next_offset = off;
 354                }
 355
 356                fi->frag = frag;
 357                fi->offset = fi->next_offset;
 358                fi->last_readdir = req;
 359
 360                if (req->r_did_prepopulate) {
 361                        fi->readdir_cache_idx = req->r_readdir_cache_idx;
 362                        if (fi->readdir_cache_idx < 0) {
 363                                /* preclude from marking dir ordered */
 364                                fi->dir_ordered_count = 0;
 365                        } else if (ceph_frag_is_leftmost(frag) && off == 2) {
 366                                /* note dir version at start of readdir so
 367                                 * we can tell if any dentries get dropped */
 368                                fi->dir_release_count = req->r_dir_release_cnt;
 369                                fi->dir_ordered_count = req->r_dir_ordered_cnt;
 370                        }
 371                } else {
 372                        dout("readdir !did_prepopulate");
 373                        /* disable readdir cache */
 374                        fi->readdir_cache_idx = -1;
 375                        /* preclude from marking dir complete */
 376                        fi->dir_release_count = 0;
 377                }
 378
 379                if (req->r_reply_info.dir_end) {
 380                        kfree(fi->last_name);
 381                        fi->last_name = NULL;
 382                        if (ceph_frag_is_rightmost(frag))
 383                                fi->next_offset = 2;
 384                        else
 385                                fi->next_offset = 0;
 386                } else {
 387                        err = note_last_dentry(fi,
 388                                       rinfo->dir_dname[rinfo->dir_nr-1],
 389                                       rinfo->dir_dname_len[rinfo->dir_nr-1],
 390                                       fi->next_offset + rinfo->dir_nr);
 391                        if (err)
 392                                return err;
 393                }
 394        }
 395
 396        rinfo = &fi->last_readdir->r_reply_info;
 397        dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
 398             rinfo->dir_nr, off, fi->offset);
 399
 400        ctx->pos = ceph_make_fpos(frag, off);
 401        while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
 402                struct ceph_mds_reply_inode *in =
 403                        rinfo->dir_in[off - fi->offset].in;
 404                struct ceph_vino vino;
 405                ino_t ino;
 406
 407                dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
 408                     off, off - fi->offset, rinfo->dir_nr, ctx->pos,
 409                     rinfo->dir_dname_len[off - fi->offset],
 410                     rinfo->dir_dname[off - fi->offset], in);
 411                BUG_ON(!in);
 412                ftype = le32_to_cpu(in->mode) >> 12;
 413                vino.ino = le64_to_cpu(in->ino);
 414                vino.snap = le64_to_cpu(in->snapid);
 415                ino = ceph_vino_to_ino(vino);
 416                if (!dir_emit(ctx,
 417                            rinfo->dir_dname[off - fi->offset],
 418                            rinfo->dir_dname_len[off - fi->offset],
 419                            ceph_translate_ino(inode->i_sb, ino), ftype)) {
 420                        dout("filldir stopping us...\n");
 421                        return 0;
 422                }
 423                off++;
 424                ctx->pos++;
 425        }
 426
 427        if (fi->last_name) {
 428                ceph_mdsc_put_request(fi->last_readdir);
 429                fi->last_readdir = NULL;
 430                goto more;
 431        }
 432
 433        /* more frags? */
 434        if (!ceph_frag_is_rightmost(frag)) {
 435                frag = ceph_frag_next(frag);
 436                off = 0;
 437                ctx->pos = ceph_make_fpos(frag, off);
 438                dout("readdir next frag is %x\n", frag);
 439                goto more;
 440        }
 441        fi->flags |= CEPH_F_ATEND;
 442
 443        /*
 444         * if dir_release_count still matches the dir, no dentries
 445         * were released during the whole readdir, and we should have
 446         * the complete dir contents in our cache.
 447         */
 448        if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
 449                spin_lock(&ci->i_ceph_lock);
 450                if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
 451                        dout(" marking %p complete and ordered\n", inode);
 452                        /* use i_size to track number of entries in
 453                         * readdir cache */
 454                        BUG_ON(fi->readdir_cache_idx < 0);
 455                        i_size_write(inode, fi->readdir_cache_idx *
 456                                     sizeof(struct dentry*));
 457                } else {
 458                        dout(" marking %p complete\n", inode);
 459                }
 460                __ceph_dir_set_complete(ci, fi->dir_release_count,
 461                                        fi->dir_ordered_count);
 462                spin_unlock(&ci->i_ceph_lock);
 463        }
 464
 465        dout("readdir %p file %p done.\n", inode, file);
 466        return 0;
 467}
 468
 469static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
 470{
 471        if (fi->last_readdir) {
 472                ceph_mdsc_put_request(fi->last_readdir);
 473                fi->last_readdir = NULL;
 474        }
 475        kfree(fi->last_name);
 476        fi->last_name = NULL;
 477        fi->dir_release_count = 0;
 478        fi->readdir_cache_idx = -1;
 479        if (ceph_frag_is_leftmost(frag))
 480                fi->next_offset = 2;  /* compensate for . and .. */
 481        else
 482                fi->next_offset = 0;
 483        fi->flags &= ~CEPH_F_ATEND;
 484}
 485
 486static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 487{
 488        struct ceph_file_info *fi = file->private_data;
 489        struct inode *inode = file->f_mapping->host;
 490        loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
 491        loff_t retval;
 492
 493        inode_lock(inode);
 494        retval = -EINVAL;
 495        switch (whence) {
 496        case SEEK_CUR:
 497                offset += file->f_pos;
 498        case SEEK_SET:
 499                break;
 500        case SEEK_END:
 501                retval = -EOPNOTSUPP;
 502        default:
 503                goto out;
 504        }
 505
 506        if (offset >= 0) {
 507                if (offset != file->f_pos) {
 508                        file->f_pos = offset;
 509                        file->f_version = 0;
 510                        fi->flags &= ~CEPH_F_ATEND;
 511                }
 512                retval = offset;
 513
 514                if (offset == 0 ||
 515                    fpos_frag(offset) != fi->frag ||
 516                    fpos_off(offset) < fi->offset) {
 517                        /* discard buffered readdir content on seekdir(0), or
 518                         * seek to new frag, or seek prior to current chunk */
 519                        dout("dir_llseek dropping %p content\n", file);
 520                        reset_readdir(fi, fpos_frag(offset));
 521                } else if (fpos_cmp(offset, old_offset) > 0) {
 522                        /* reset dir_release_count if we did a forward seek */
 523                        fi->dir_release_count = 0;
 524                        fi->readdir_cache_idx = -1;
 525                }
 526        }
 527out:
 528        inode_unlock(inode);
 529        return retval;
 530}
 531
 532/*
 533 * Handle lookups for the hidden .snap directory.
 534 */
 535int ceph_handle_snapdir(struct ceph_mds_request *req,
 536                        struct dentry *dentry, int err)
 537{
 538        struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 539        struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */
 540
 541        /* .snap dir? */
 542        if (err == -ENOENT &&
 543            ceph_snap(parent) == CEPH_NOSNAP &&
 544            strcmp(dentry->d_name.name,
 545                   fsc->mount_options->snapdir_name) == 0) {
 546                struct inode *inode = ceph_get_snapdir(parent);
 547                dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n",
 548                     dentry, dentry, inode);
 549                BUG_ON(!d_unhashed(dentry));
 550                d_add(dentry, inode);
 551                err = 0;
 552        }
 553        return err;
 554}
 555
 556/*
 557 * Figure out final result of a lookup/open request.
 558 *
 559 * Mainly, make sure we return the final req->r_dentry (if it already
 560 * existed) in place of the original VFS-provided dentry when they
 561 * differ.
 562 *
 563 * Gracefully handle the case where the MDS replies with -ENOENT and
 564 * no trace (which it may do, at its discretion, e.g., if it doesn't
 565 * care to issue a lease on the negative dentry).
 566 */
 567struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 568                                  struct dentry *dentry, int err)
 569{
 570        if (err == -ENOENT) {
 571                /* no trace? */
 572                err = 0;
 573                if (!req->r_reply_info.head->is_dentry) {
 574                        dout("ENOENT and no trace, dentry %p inode %p\n",
 575                             dentry, d_inode(dentry));
 576                        if (d_really_is_positive(dentry)) {
 577                                d_drop(dentry);
 578                                err = -ENOENT;
 579                        } else {
 580                                d_add(dentry, NULL);
 581                        }
 582                }
 583        }
 584        if (err)
 585                dentry = ERR_PTR(err);
 586        else if (dentry != req->r_dentry)
 587                dentry = dget(req->r_dentry);   /* we got spliced */
 588        else
 589                dentry = NULL;
 590        return dentry;
 591}
 592
 593static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
 594{
 595        return ceph_ino(inode) == CEPH_INO_ROOT &&
 596                strncmp(dentry->d_name.name, ".ceph", 5) == 0;
 597}
 598
 599/*
 600 * Look up a single dir entry.  If there is a lookup intent, inform
 601 * the MDS so that it gets our 'caps wanted' value in a single op.
 602 */
 603static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 604                                  unsigned int flags)
 605{
 606        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 607        struct ceph_mds_client *mdsc = fsc->mdsc;
 608        struct ceph_mds_request *req;
 609        int op;
 610        int mask;
 611        int err;
 612
 613        dout("lookup %p dentry %p '%pd'\n",
 614             dir, dentry, dentry);
 615
 616        if (dentry->d_name.len > NAME_MAX)
 617                return ERR_PTR(-ENAMETOOLONG);
 618
 619        err = ceph_init_dentry(dentry);
 620        if (err < 0)
 621                return ERR_PTR(err);
 622
 623        /* can we conclude ENOENT locally? */
 624        if (d_really_is_negative(dentry)) {
 625                struct ceph_inode_info *ci = ceph_inode(dir);
 626                struct ceph_dentry_info *di = ceph_dentry(dentry);
 627
 628                spin_lock(&ci->i_ceph_lock);
 629                dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
 630                if (strncmp(dentry->d_name.name,
 631                            fsc->mount_options->snapdir_name,
 632                            dentry->d_name.len) &&
 633                    !is_root_ceph_dentry(dir, dentry) &&
 634                    ceph_test_mount_opt(fsc, DCACHE) &&
 635                    __ceph_dir_is_complete(ci) &&
 636                    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
 637                        spin_unlock(&ci->i_ceph_lock);
 638                        dout(" dir %p complete, -ENOENT\n", dir);
 639                        d_add(dentry, NULL);
 640                        di->lease_shared_gen = ci->i_shared_gen;
 641                        return NULL;
 642                }
 643                spin_unlock(&ci->i_ceph_lock);
 644        }
 645
 646        op = ceph_snap(dir) == CEPH_SNAPDIR ?
 647                CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
 648        req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
 649        if (IS_ERR(req))
 650                return ERR_CAST(req);
 651        req->r_dentry = dget(dentry);
 652        req->r_num_caps = 2;
 653
 654        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
 655        if (ceph_security_xattr_wanted(dir))
 656                mask |= CEPH_CAP_XATTR_SHARED;
 657        req->r_args.getattr.mask = cpu_to_le32(mask);
 658
 659        req->r_locked_dir = dir;
 660        err = ceph_mdsc_do_request(mdsc, NULL, req);
 661        err = ceph_handle_snapdir(req, dentry, err);
 662        dentry = ceph_finish_lookup(req, dentry, err);
 663        ceph_mdsc_put_request(req);  /* will dput(dentry) */
 664        dout("lookup result=%p\n", dentry);
 665        return dentry;
 666}
 667
 668/*
 669 * If we do a create but get no trace back from the MDS, follow up with
 670 * a lookup (the VFS expects us to link up the provided dentry).
 671 */
 672int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
 673{
 674        struct dentry *result = ceph_lookup(dir, dentry, 0);
 675
 676        if (result && !IS_ERR(result)) {
 677                /*
 678                 * We created the item, then did a lookup, and found
 679                 * it was already linked to another inode we already
 680                 * had in our cache (and thus got spliced). To not
 681                 * confuse VFS (especially when inode is a directory),
 682                 * we don't link our dentry to that inode, return an
 683                 * error instead.
 684                 *
 685                 * This event should be rare and it happens only when
 686                 * we talk to old MDS. Recent MDS does not send traceless
 687                 * reply for request that creates new inode.
 688                 */
 689                d_drop(result);
 690                return -ESTALE;
 691        }
 692        return PTR_ERR(result);
 693}
 694
 695static int ceph_mknod(struct inode *dir, struct dentry *dentry,
 696                      umode_t mode, dev_t rdev)
 697{
 698        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 699        struct ceph_mds_client *mdsc = fsc->mdsc;
 700        struct ceph_mds_request *req;
 701        struct ceph_acls_info acls = {};
 702        int err;
 703
 704        if (ceph_snap(dir) != CEPH_NOSNAP)
 705                return -EROFS;
 706
 707        err = ceph_pre_init_acls(dir, &mode, &acls);
 708        if (err < 0)
 709                return err;
 710
 711        dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
 712             dir, dentry, mode, rdev);
 713        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
 714        if (IS_ERR(req)) {
 715                err = PTR_ERR(req);
 716                goto out;
 717        }
 718        req->r_dentry = dget(dentry);
 719        req->r_num_caps = 2;
 720        req->r_locked_dir = dir;
 721        req->r_args.mknod.mode = cpu_to_le32(mode);
 722        req->r_args.mknod.rdev = cpu_to_le32(rdev);
 723        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 724        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 725        if (acls.pagelist) {
 726                req->r_pagelist = acls.pagelist;
 727                acls.pagelist = NULL;
 728        }
 729        err = ceph_mdsc_do_request(mdsc, dir, req);
 730        if (!err && !req->r_reply_info.head->is_dentry)
 731                err = ceph_handle_notrace_create(dir, dentry);
 732        ceph_mdsc_put_request(req);
 733out:
 734        if (!err)
 735                ceph_init_inode_acls(d_inode(dentry), &acls);
 736        else
 737                d_drop(dentry);
 738        ceph_release_acls_info(&acls);
 739        return err;
 740}
 741
 742static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode,
 743                       bool excl)
 744{
 745        return ceph_mknod(dir, dentry, mode, 0);
 746}
 747
 748static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 749                            const char *dest)
 750{
 751        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 752        struct ceph_mds_client *mdsc = fsc->mdsc;
 753        struct ceph_mds_request *req;
 754        int err;
 755
 756        if (ceph_snap(dir) != CEPH_NOSNAP)
 757                return -EROFS;
 758
 759        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
 760        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
 761        if (IS_ERR(req)) {
 762                err = PTR_ERR(req);
 763                goto out;
 764        }
 765        req->r_path2 = kstrdup(dest, GFP_KERNEL);
 766        if (!req->r_path2) {
 767                err = -ENOMEM;
 768                ceph_mdsc_put_request(req);
 769                goto out;
 770        }
 771        req->r_locked_dir = dir;
 772        req->r_dentry = dget(dentry);
 773        req->r_num_caps = 2;
 774        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 775        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 776        err = ceph_mdsc_do_request(mdsc, dir, req);
 777        if (!err && !req->r_reply_info.head->is_dentry)
 778                err = ceph_handle_notrace_create(dir, dentry);
 779        ceph_mdsc_put_request(req);
 780out:
 781        if (err)
 782                d_drop(dentry);
 783        return err;
 784}
 785
 786static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 787{
 788        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 789        struct ceph_mds_client *mdsc = fsc->mdsc;
 790        struct ceph_mds_request *req;
 791        struct ceph_acls_info acls = {};
 792        int err = -EROFS;
 793        int op;
 794
 795        if (ceph_snap(dir) == CEPH_SNAPDIR) {
 796                /* mkdir .snap/foo is a MKSNAP */
 797                op = CEPH_MDS_OP_MKSNAP;
 798                dout("mksnap dir %p snap '%pd' dn %p\n", dir,
 799                     dentry, dentry);
 800        } else if (ceph_snap(dir) == CEPH_NOSNAP) {
 801                dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
 802                op = CEPH_MDS_OP_MKDIR;
 803        } else {
 804                goto out;
 805        }
 806
 807        mode |= S_IFDIR;
 808        err = ceph_pre_init_acls(dir, &mode, &acls);
 809        if (err < 0)
 810                goto out;
 811
 812        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 813        if (IS_ERR(req)) {
 814                err = PTR_ERR(req);
 815                goto out;
 816        }
 817
 818        req->r_dentry = dget(dentry);
 819        req->r_num_caps = 2;
 820        req->r_locked_dir = dir;
 821        req->r_args.mkdir.mode = cpu_to_le32(mode);
 822        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 823        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 824        if (acls.pagelist) {
 825                req->r_pagelist = acls.pagelist;
 826                acls.pagelist = NULL;
 827        }
 828        err = ceph_mdsc_do_request(mdsc, dir, req);
 829        if (!err &&
 830            !req->r_reply_info.head->is_target &&
 831            !req->r_reply_info.head->is_dentry)
 832                err = ceph_handle_notrace_create(dir, dentry);
 833        ceph_mdsc_put_request(req);
 834out:
 835        if (!err)
 836                ceph_init_inode_acls(d_inode(dentry), &acls);
 837        else
 838                d_drop(dentry);
 839        ceph_release_acls_info(&acls);
 840        return err;
 841}
 842
 843static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 844                     struct dentry *dentry)
 845{
 846        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 847        struct ceph_mds_client *mdsc = fsc->mdsc;
 848        struct ceph_mds_request *req;
 849        int err;
 850
 851        if (ceph_snap(dir) != CEPH_NOSNAP)
 852                return -EROFS;
 853
 854        dout("link in dir %p old_dentry %p dentry %p\n", dir,
 855             old_dentry, dentry);
 856        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
 857        if (IS_ERR(req)) {
 858                d_drop(dentry);
 859                return PTR_ERR(req);
 860        }
 861        req->r_dentry = dget(dentry);
 862        req->r_num_caps = 2;
 863        req->r_old_dentry = dget(old_dentry);
 864        req->r_locked_dir = dir;
 865        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 866        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 867        /* release LINK_SHARED on source inode (mds will lock it) */
 868        req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
 869        err = ceph_mdsc_do_request(mdsc, dir, req);
 870        if (err) {
 871                d_drop(dentry);
 872        } else if (!req->r_reply_info.head->is_dentry) {
 873                ihold(d_inode(old_dentry));
 874                d_instantiate(dentry, d_inode(old_dentry));
 875        }
 876        ceph_mdsc_put_request(req);
 877        return err;
 878}
 879
 880/*
 881 * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps.  If it
 882 * looks like the link count will hit 0, drop any other caps (other
 883 * than PIN) we don't specifically want (due to the file still being
 884 * open).
 885 */
 886static int drop_caps_for_unlink(struct inode *inode)
 887{
 888        struct ceph_inode_info *ci = ceph_inode(inode);
 889        int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
 890
 891        spin_lock(&ci->i_ceph_lock);
 892        if (inode->i_nlink == 1) {
 893                drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
 894                ci->i_ceph_flags |= CEPH_I_NODELAY;
 895        }
 896        spin_unlock(&ci->i_ceph_lock);
 897        return drop;
 898}
 899
 900/*
 901 * rmdir and unlink are differ only by the metadata op code
 902 */
 903static int ceph_unlink(struct inode *dir, struct dentry *dentry)
 904{
 905        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 906        struct ceph_mds_client *mdsc = fsc->mdsc;
 907        struct inode *inode = d_inode(dentry);
 908        struct ceph_mds_request *req;
 909        int err = -EROFS;
 910        int op;
 911
 912        if (ceph_snap(dir) == CEPH_SNAPDIR) {
 913                /* rmdir .snap/foo is RMSNAP */
 914                dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry);
 915                op = CEPH_MDS_OP_RMSNAP;
 916        } else if (ceph_snap(dir) == CEPH_NOSNAP) {
 917                dout("unlink/rmdir dir %p dn %p inode %p\n",
 918                     dir, dentry, inode);
 919                op = d_is_dir(dentry) ?
 920                        CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
 921        } else
 922                goto out;
 923        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 924        if (IS_ERR(req)) {
 925                err = PTR_ERR(req);
 926                goto out;
 927        }
 928        req->r_dentry = dget(dentry);
 929        req->r_num_caps = 2;
 930        req->r_locked_dir = dir;
 931        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 932        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 933        req->r_inode_drop = drop_caps_for_unlink(inode);
 934        err = ceph_mdsc_do_request(mdsc, dir, req);
 935        if (!err && !req->r_reply_info.head->is_dentry)
 936                d_delete(dentry);
 937        ceph_mdsc_put_request(req);
 938out:
 939        return err;
 940}
 941
 942static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 943                       struct inode *new_dir, struct dentry *new_dentry)
 944{
 945        struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
 946        struct ceph_mds_client *mdsc = fsc->mdsc;
 947        struct ceph_mds_request *req;
 948        int op = CEPH_MDS_OP_RENAME;
 949        int err;
 950
 951        if (ceph_snap(old_dir) != ceph_snap(new_dir))
 952                return -EXDEV;
 953        if (ceph_snap(old_dir) != CEPH_NOSNAP) {
 954                if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
 955                        op = CEPH_MDS_OP_RENAMESNAP;
 956                else
 957                        return -EROFS;
 958        }
 959        dout("rename dir %p dentry %p to dir %p dentry %p\n",
 960             old_dir, old_dentry, new_dir, new_dentry);
 961        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 962        if (IS_ERR(req))
 963                return PTR_ERR(req);
 964        ihold(old_dir);
 965        req->r_dentry = dget(new_dentry);
 966        req->r_num_caps = 2;
 967        req->r_old_dentry = dget(old_dentry);
 968        req->r_old_dentry_dir = old_dir;
 969        req->r_locked_dir = new_dir;
 970        req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
 971        req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
 972        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 973        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 974        /* release LINK_RDCACHE on source inode (mds will lock it) */
 975        req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
 976        if (d_really_is_positive(new_dentry))
 977                req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
 978        err = ceph_mdsc_do_request(mdsc, old_dir, req);
 979        if (!err && !req->r_reply_info.head->is_dentry) {
 980                /*
 981                 * Normally d_move() is done by fill_trace (called by
 982                 * do_request, above).  If there is no trace, we need
 983                 * to do it here.
 984                 */
 985
 986                /* d_move screws up sibling dentries' offsets */
 987                ceph_dir_clear_complete(old_dir);
 988                ceph_dir_clear_complete(new_dir);
 989
 990                d_move(old_dentry, new_dentry);
 991
 992                /* ensure target dentry is invalidated, despite
 993                   rehashing bug in vfs_rename_dir */
 994                ceph_invalidate_dentry_lease(new_dentry);
 995        }
 996        ceph_mdsc_put_request(req);
 997        return err;
 998}
 999
1000/*
1001 * Ensure a dentry lease will no longer revalidate.
1002 */
1003void ceph_invalidate_dentry_lease(struct dentry *dentry)
1004{
1005        spin_lock(&dentry->d_lock);
1006        dentry->d_time = jiffies;
1007        ceph_dentry(dentry)->lease_shared_gen = 0;
1008        spin_unlock(&dentry->d_lock);
1009}
1010
1011/*
1012 * Check if dentry lease is valid.  If not, delete the lease.  Try to
1013 * renew if the least is more than half up.
1014 */
1015static int dentry_lease_is_valid(struct dentry *dentry)
1016{
1017        struct ceph_dentry_info *di;
1018        struct ceph_mds_session *s;
1019        int valid = 0;
1020        u32 gen;
1021        unsigned long ttl;
1022        struct ceph_mds_session *session = NULL;
1023        struct inode *dir = NULL;
1024        u32 seq = 0;
1025
1026        spin_lock(&dentry->d_lock);
1027        di = ceph_dentry(dentry);
1028        if (di->lease_session) {
1029                s = di->lease_session;
1030                spin_lock(&s->s_gen_ttl_lock);
1031                gen = s->s_cap_gen;
1032                ttl = s->s_cap_ttl;
1033                spin_unlock(&s->s_gen_ttl_lock);
1034
1035                if (di->lease_gen == gen &&
1036                    time_before(jiffies, dentry->d_time) &&
1037                    time_before(jiffies, ttl)) {
1038                        valid = 1;
1039                        if (di->lease_renew_after &&
1040                            time_after(jiffies, di->lease_renew_after)) {
1041                                /* we should renew */
1042                                dir = d_inode(dentry->d_parent);
1043                                session = ceph_get_mds_session(s);
1044                                seq = di->lease_seq;
1045                                di->lease_renew_after = 0;
1046                                di->lease_renew_from = jiffies;
1047                        }
1048                }
1049        }
1050        spin_unlock(&dentry->d_lock);
1051
1052        if (session) {
1053                ceph_mdsc_lease_send_msg(session, dir, dentry,
1054                                         CEPH_MDS_LEASE_RENEW, seq);
1055                ceph_put_mds_session(session);
1056        }
1057        dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
1058        return valid;
1059}
1060
1061/*
1062 * Check if directory-wide content lease/cap is valid.
1063 */
1064static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
1065{
1066        struct ceph_inode_info *ci = ceph_inode(dir);
1067        struct ceph_dentry_info *di = ceph_dentry(dentry);
1068        int valid = 0;
1069
1070        spin_lock(&ci->i_ceph_lock);
1071        if (ci->i_shared_gen == di->lease_shared_gen)
1072                valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
1073        spin_unlock(&ci->i_ceph_lock);
1074        dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
1075             dir, (unsigned)ci->i_shared_gen, dentry,
1076             (unsigned)di->lease_shared_gen, valid);
1077        return valid;
1078}
1079
1080/*
1081 * Check if cached dentry can be trusted.
1082 */
1083static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1084{
1085        int valid = 0;
1086        struct dentry *parent;
1087        struct inode *dir;
1088
1089        if (flags & LOOKUP_RCU)
1090                return -ECHILD;
1091
1092        dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
1093             dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
1094
1095        parent = dget_parent(dentry);
1096        dir = d_inode(parent);
1097
1098        /* always trust cached snapped dentries, snapdir dentry */
1099        if (ceph_snap(dir) != CEPH_NOSNAP) {
1100                dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
1101                     dentry, d_inode(dentry));
1102                valid = 1;
1103        } else if (d_really_is_positive(dentry) &&
1104                   ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
1105                valid = 1;
1106        } else if (dentry_lease_is_valid(dentry) ||
1107                   dir_lease_is_valid(dir, dentry)) {
1108                if (d_really_is_positive(dentry))
1109                        valid = ceph_is_any_caps(d_inode(dentry));
1110                else
1111                        valid = 1;
1112        }
1113
1114        if (!valid) {
1115                struct ceph_mds_client *mdsc =
1116                        ceph_sb_to_client(dir->i_sb)->mdsc;
1117                struct ceph_mds_request *req;
1118                int op, mask, err;
1119
1120                op = ceph_snap(dir) == CEPH_SNAPDIR ?
1121                        CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1122                req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
1123                if (!IS_ERR(req)) {
1124                        req->r_dentry = dget(dentry);
1125                        req->r_num_caps = 2;
1126
1127                        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
1128                        if (ceph_security_xattr_wanted(dir))
1129                                mask |= CEPH_CAP_XATTR_SHARED;
1130                        req->r_args.getattr.mask = mask;
1131
1132                        req->r_locked_dir = dir;
1133                        err = ceph_mdsc_do_request(mdsc, NULL, req);
1134                        if (err == 0 || err == -ENOENT) {
1135                                if (dentry == req->r_dentry) {
1136                                        valid = !d_unhashed(dentry);
1137                                } else {
1138                                        d_invalidate(req->r_dentry);
1139                                        err = -EAGAIN;
1140                                }
1141                        }
1142                        ceph_mdsc_put_request(req);
1143                        dout("d_revalidate %p lookup result=%d\n",
1144                             dentry, err);
1145                }
1146        }
1147
1148        dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1149        if (valid) {
1150                ceph_dentry_lru_touch(dentry);
1151        } else {
1152                ceph_dir_clear_complete(dir);
1153        }
1154
1155        dput(parent);
1156        return valid;
1157}
1158
1159/*
1160 * Release our ceph_dentry_info.
1161 */
1162static void ceph_d_release(struct dentry *dentry)
1163{
1164        struct ceph_dentry_info *di = ceph_dentry(dentry);
1165
1166        dout("d_release %p\n", dentry);
1167        ceph_dentry_lru_del(dentry);
1168        if (di->lease_session)
1169                ceph_put_mds_session(di->lease_session);
1170        kmem_cache_free(ceph_dentry_cachep, di);
1171        dentry->d_fsdata = NULL;
1172}
1173
/*
 * d_revalidate hook installed through ceph_snapdir_dentry_ops.  It
 * unconditionally reports the cached dentry as valid; neither argument
 * is examined.
 */
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
                                          unsigned int flags)
{
        /*
         * Snapped metadata is not revalidated for now; eventually we
         * will probably want to do so.
         */
        return 1;
}
1183
1184/*
1185 * When the VFS prunes a dentry from the cache, we need to clear the
1186 * complete flag on the parent directory.
1187 *
1188 * Called under dentry->d_lock.
1189 */
1190static void ceph_d_prune(struct dentry *dentry)
1191{
1192        dout("ceph_d_prune %p\n", dentry);
1193
1194        /* do we have a valid parent? */
1195        if (IS_ROOT(dentry))
1196                return;
1197
1198        /* if we are not hashed, we don't affect dir's completeness */
1199        if (d_unhashed(dentry))
1200                return;
1201
1202        /*
1203         * we hold d_lock, so d_parent is stable, and d_fsdata is never
1204         * cleared until d_release
1205         */
1206        ceph_dir_clear_complete(d_inode(dentry->d_parent));
1207}
1208
1209/*
1210 * read() on a dir.  This weird interface hack only works if mounted
1211 * with '-o dirstat'.
1212 */
1213static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
1214                             loff_t *ppos)
1215{
1216        struct ceph_file_info *cf = file->private_data;
1217        struct inode *inode = file_inode(file);
1218        struct ceph_inode_info *ci = ceph_inode(inode);
1219        int left;
1220        const int bufsize = 1024;
1221
1222        if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
1223                return -EISDIR;
1224
1225        if (!cf->dir_info) {
1226                cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
1227                if (!cf->dir_info)
1228                        return -ENOMEM;
1229                cf->dir_info_len =
1230                        snprintf(cf->dir_info, bufsize,
1231                                "entries:   %20lld\n"
1232                                " files:    %20lld\n"
1233                                " subdirs:  %20lld\n"
1234                                "rentries:  %20lld\n"
1235                                " rfiles:   %20lld\n"
1236                                " rsubdirs: %20lld\n"
1237                                "rbytes:    %20lld\n"
1238                                "rctime:    %10ld.%09ld\n",
1239                                ci->i_files + ci->i_subdirs,
1240                                ci->i_files,
1241                                ci->i_subdirs,
1242                                ci->i_rfiles + ci->i_rsubdirs,
1243                                ci->i_rfiles,
1244                                ci->i_rsubdirs,
1245                                ci->i_rbytes,
1246                                (long)ci->i_rctime.tv_sec,
1247                                (long)ci->i_rctime.tv_nsec);
1248        }
1249
1250        if (*ppos >= cf->dir_info_len)
1251                return 0;
1252        size = min_t(unsigned, size, cf->dir_info_len-*ppos);
1253        left = copy_to_user(buf, cf->dir_info + *ppos, size);
1254        if (left == size)
1255                return -EFAULT;
1256        *ppos += (size - left);
1257        return size - left;
1258}
1259
1260/*
1261 * We maintain a private dentry LRU.
1262 *
1263 * FIXME: this needs to be changed to a per-mds lru to be useful.
1264 */
1265void ceph_dentry_lru_add(struct dentry *dn)
1266{
1267        struct ceph_dentry_info *di = ceph_dentry(dn);
1268        struct ceph_mds_client *mdsc;
1269
1270        dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn);
1271        mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1272        spin_lock(&mdsc->dentry_lru_lock);
1273        list_add_tail(&di->lru, &mdsc->dentry_lru);
1274        mdsc->num_dentry++;
1275        spin_unlock(&mdsc->dentry_lru_lock);
1276}
1277
1278void ceph_dentry_lru_touch(struct dentry *dn)
1279{
1280        struct ceph_dentry_info *di = ceph_dentry(dn);
1281        struct ceph_mds_client *mdsc;
1282
1283        dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn,
1284             di->offset);
1285        mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1286        spin_lock(&mdsc->dentry_lru_lock);
1287        list_move_tail(&di->lru, &mdsc->dentry_lru);
1288        spin_unlock(&mdsc->dentry_lru_lock);
1289}
1290
1291void ceph_dentry_lru_del(struct dentry *dn)
1292{
1293        struct ceph_dentry_info *di = ceph_dentry(dn);
1294        struct ceph_mds_client *mdsc;
1295
1296        dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn);
1297        mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1298        spin_lock(&mdsc->dentry_lru_lock);
1299        list_del_init(&di->lru);
1300        mdsc->num_dentry--;
1301        spin_unlock(&mdsc->dentry_lru_lock);
1302}
1303
1304/*
1305 * Return name hash for a given dentry.  This is dependent on
1306 * the parent directory's hash function.
1307 */
1308unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
1309{
1310        struct ceph_inode_info *dci = ceph_inode(dir);
1311
1312        switch (dci->i_dir_layout.dl_dir_hash) {
1313        case 0: /* for backward compat */
1314        case CEPH_STR_HASH_LINUX:
1315                return dn->d_name.hash;
1316
1317        default:
1318                return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
1319                                     dn->d_name.name, dn->d_name.len);
1320        }
1321}
1322
1323const struct file_operations ceph_dir_fops = {
1324        .read = ceph_read_dir,
1325        .iterate = ceph_readdir,
1326        .llseek = ceph_dir_llseek,
1327        .open = ceph_open,
1328        .release = ceph_release,
1329        .unlocked_ioctl = ceph_ioctl,
1330        .fsync = ceph_fsync,
1331};
1332
1333const struct file_operations ceph_snapdir_fops = {
1334        .iterate = ceph_readdir,
1335        .llseek = ceph_dir_llseek,
1336        .open = ceph_open,
1337        .release = ceph_release,
1338};
1339
1340const struct inode_operations ceph_dir_iops = {
1341        .lookup = ceph_lookup,
1342        .permission = ceph_permission,
1343        .getattr = ceph_getattr,
1344        .setattr = ceph_setattr,
1345        .setxattr = ceph_setxattr,
1346        .getxattr = ceph_getxattr,
1347        .listxattr = ceph_listxattr,
1348        .removexattr = ceph_removexattr,
1349        .get_acl = ceph_get_acl,
1350        .set_acl = ceph_set_acl,
1351        .mknod = ceph_mknod,
1352        .symlink = ceph_symlink,
1353        .mkdir = ceph_mkdir,
1354        .link = ceph_link,
1355        .unlink = ceph_unlink,
1356        .rmdir = ceph_unlink,
1357        .rename = ceph_rename,
1358        .create = ceph_create,
1359        .atomic_open = ceph_atomic_open,
1360};
1361
1362const struct inode_operations ceph_snapdir_iops = {
1363        .lookup = ceph_lookup,
1364        .permission = ceph_permission,
1365        .getattr = ceph_getattr,
1366        .mkdir = ceph_mkdir,
1367        .rmdir = ceph_unlink,
1368        .rename = ceph_rename,
1369};
1370
1371const struct dentry_operations ceph_dentry_ops = {
1372        .d_revalidate = ceph_d_revalidate,
1373        .d_release = ceph_d_release,
1374        .d_prune = ceph_d_prune,
1375};
1376
1377const struct dentry_operations ceph_snapdir_dentry_ops = {
1378        .d_revalidate = ceph_snapdir_d_revalidate,
1379        .d_release = ceph_d_release,
1380};
1381
1382const struct dentry_operations ceph_snap_dentry_ops = {
1383        .d_release = ceph_d_release,
1384        .d_prune = ceph_d_prune,
1385};
1386