linux/fs/ceph/snap.c
<<
>>
Prefs
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/sort.h>
   4#include <linux/slab.h>
   5
   6#include "super.h"
   7#include "mds_client.h"
   8
   9#include <linux/ceph/decode.h>
  10
  11/*
  12 * Snapshots in ceph are driven in large part by cooperation from the
  13 * client.  In contrast to local file systems or file servers that
  14 * implement snapshots at a single point in the system, ceph's
  15 * distributed access to storage requires clients to help decide
  16 * whether a write logically occurs before or after a recently created
  17 * snapshot.
  18 *
 * This provides a perfect instantaneous client-wide snapshot.  Between
  20 * clients, however, snapshots may appear to be applied at slightly
  21 * different points in time, depending on delays in delivering the
  22 * snapshot notification.
  23 *
  24 * Snapshots are _not_ file system-wide.  Instead, each snapshot
  25 * applies to the subdirectory nested beneath some directory.  This
  26 * effectively divides the hierarchy into multiple "realms," where all
  27 * of the files contained by each realm share the same set of
  28 * snapshots.  An individual realm's snap set contains snapshots
  29 * explicitly created on that realm, as well as any snaps in its
 * parent's snap set _after_ the point at which the parent became its
  31 * parent (due to, say, a rename).  Similarly, snaps from prior parents
  32 * during the time intervals during which they were the parent are included.
  33 *
 * The client is spared most of this detail, fortunately... it must only
 * maintain a hierarchy of realms reflecting the current parent/child
  36 * realm relationship, and for each realm has an explicit list of snaps
  37 * inherited from prior parents.
  38 *
  39 * A snap_realm struct is maintained for realms containing every inode
  40 * with an open cap in the system.  (The needed snap realm information is
  41 * provided by the MDS whenever a cap is issued, i.e., on open.)  A 'seq'
  42 * version number is used to ensure that as realm parameters change (new
  43 * snapshot, new parent, etc.) the client's realm hierarchy is updated.
  44 *
  45 * The realm hierarchy drives the generation of a 'snap context' for each
  46 * realm, which simply lists the resulting set of snaps for the realm.  This
  47 * is attached to any writes sent to OSDs.
  48 */
  49/*
  50 * Unfortunately error handling is a bit mixed here.  If we get a snap
  51 * update, but don't have enough memory to update our realm hierarchy,
  52 * it's not clear what we can do about it (besides complaining to the
  53 * console).
  54 */
  55
  56
  57/*
  58 * increase ref count for the realm
  59 *
  60 * caller must hold snap_rwsem for write.
  61 */
  62void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
  63                         struct ceph_snap_realm *realm)
  64{
  65        dout("get_realm %p %d -> %d\n", realm,
  66             atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
  67        /*
  68         * since we _only_ increment realm refs or empty the empty
  69         * list with snap_rwsem held, adjusting the empty list here is
  70         * safe.  we do need to protect against concurrent empty list
  71         * additions, however.
  72         */
  73        if (atomic_inc_return(&realm->nref) == 1) {
  74                spin_lock(&mdsc->snap_empty_lock);
  75                list_del_init(&realm->empty_item);
  76                spin_unlock(&mdsc->snap_empty_lock);
  77        }
  78}
  79
  80static void __insert_snap_realm(struct rb_root *root,
  81                                struct ceph_snap_realm *new)
  82{
  83        struct rb_node **p = &root->rb_node;
  84        struct rb_node *parent = NULL;
  85        struct ceph_snap_realm *r = NULL;
  86
  87        while (*p) {
  88                parent = *p;
  89                r = rb_entry(parent, struct ceph_snap_realm, node);
  90                if (new->ino < r->ino)
  91                        p = &(*p)->rb_left;
  92                else if (new->ino > r->ino)
  93                        p = &(*p)->rb_right;
  94                else
  95                        BUG();
  96        }
  97
  98        rb_link_node(&new->node, parent, p);
  99        rb_insert_color(&new->node, root);
 100}
 101
 102/*
 103 * create and get the realm rooted at @ino and bump its ref count.
 104 *
 105 * caller must hold snap_rwsem for write.
 106 */
 107static struct ceph_snap_realm *ceph_create_snap_realm(
 108        struct ceph_mds_client *mdsc,
 109        u64 ino)
 110{
 111        struct ceph_snap_realm *realm;
 112
 113        realm = kzalloc(sizeof(*realm), GFP_NOFS);
 114        if (!realm)
 115                return ERR_PTR(-ENOMEM);
 116
 117        atomic_set(&realm->nref, 1);    /* for caller */
 118        realm->ino = ino;
 119        INIT_LIST_HEAD(&realm->children);
 120        INIT_LIST_HEAD(&realm->child_item);
 121        INIT_LIST_HEAD(&realm->empty_item);
 122        INIT_LIST_HEAD(&realm->dirty_item);
 123        INIT_LIST_HEAD(&realm->inodes_with_caps);
 124        spin_lock_init(&realm->inodes_with_caps_lock);
 125        __insert_snap_realm(&mdsc->snap_realms, realm);
 126        dout("create_snap_realm %llx %p\n", realm->ino, realm);
 127        return realm;
 128}
 129
 130/*
 131 * lookup the realm rooted at @ino.
 132 *
 133 * caller must hold snap_rwsem for write.
 134 */
 135static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
 136                                                   u64 ino)
 137{
 138        struct rb_node *n = mdsc->snap_realms.rb_node;
 139        struct ceph_snap_realm *r;
 140
 141        while (n) {
 142                r = rb_entry(n, struct ceph_snap_realm, node);
 143                if (ino < r->ino)
 144                        n = n->rb_left;
 145                else if (ino > r->ino)
 146                        n = n->rb_right;
 147                else {
 148                        dout("lookup_snap_realm %llx %p\n", r->ino, r);
 149                        return r;
 150                }
 151        }
 152        return NULL;
 153}
 154
 155struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 156                                               u64 ino)
 157{
 158        struct ceph_snap_realm *r;
 159        r = __lookup_snap_realm(mdsc, ino);
 160        if (r)
 161                ceph_get_snap_realm(mdsc, r);
 162        return r;
 163}
 164
 165static void __put_snap_realm(struct ceph_mds_client *mdsc,
 166                             struct ceph_snap_realm *realm);
 167
 168/*
 169 * called with snap_rwsem (write)
 170 */
 171static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 172                                 struct ceph_snap_realm *realm)
 173{
 174        dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 175
 176        rb_erase(&realm->node, &mdsc->snap_realms);
 177
 178        if (realm->parent) {
 179                list_del_init(&realm->child_item);
 180                __put_snap_realm(mdsc, realm->parent);
 181        }
 182
 183        kfree(realm->prior_parent_snaps);
 184        kfree(realm->snaps);
 185        ceph_put_snap_context(realm->cached_context);
 186        kfree(realm);
 187}
 188
 189/*
 190 * caller holds snap_rwsem (write)
 191 */
 192static void __put_snap_realm(struct ceph_mds_client *mdsc,
 193                             struct ceph_snap_realm *realm)
 194{
 195        dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
 196             atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
 197        if (atomic_dec_and_test(&realm->nref))
 198                __destroy_snap_realm(mdsc, realm);
 199}
 200
 201/*
 202 * caller needn't hold any locks
 203 */
 204void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
 205                         struct ceph_snap_realm *realm)
 206{
 207        dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
 208             atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
 209        if (!atomic_dec_and_test(&realm->nref))
 210                return;
 211
 212        if (down_write_trylock(&mdsc->snap_rwsem)) {
 213                __destroy_snap_realm(mdsc, realm);
 214                up_write(&mdsc->snap_rwsem);
 215        } else {
 216                spin_lock(&mdsc->snap_empty_lock);
 217                list_add(&realm->empty_item, &mdsc->snap_empty);
 218                spin_unlock(&mdsc->snap_empty_lock);
 219        }
 220}
 221
 222/*
 223 * Clean up any realms whose ref counts have dropped to zero.  Note
 224 * that this does not include realms who were created but not yet
 225 * used.
 226 *
 227 * Called under snap_rwsem (write)
 228 */
 229static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
 230{
 231        struct ceph_snap_realm *realm;
 232
 233        spin_lock(&mdsc->snap_empty_lock);
 234        while (!list_empty(&mdsc->snap_empty)) {
 235                realm = list_first_entry(&mdsc->snap_empty,
 236                                   struct ceph_snap_realm, empty_item);
 237                list_del(&realm->empty_item);
 238                spin_unlock(&mdsc->snap_empty_lock);
 239                __destroy_snap_realm(mdsc, realm);
 240                spin_lock(&mdsc->snap_empty_lock);
 241        }
 242        spin_unlock(&mdsc->snap_empty_lock);
 243}
 244
 245void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
 246{
 247        down_write(&mdsc->snap_rwsem);
 248        __cleanup_empty_realms(mdsc);
 249        up_write(&mdsc->snap_rwsem);
 250}
 251
 252/*
 253 * adjust the parent realm of a given @realm.  adjust child list, and parent
 254 * pointers, and ref counts appropriately.
 255 *
 256 * return true if parent was changed, 0 if unchanged, <0 on error.
 257 *
 258 * caller must hold snap_rwsem for write.
 259 */
 260static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
 261                                    struct ceph_snap_realm *realm,
 262                                    u64 parentino)
 263{
 264        struct ceph_snap_realm *parent;
 265
 266        if (realm->parent_ino == parentino)
 267                return 0;
 268
 269        parent = ceph_lookup_snap_realm(mdsc, parentino);
 270        if (!parent) {
 271                parent = ceph_create_snap_realm(mdsc, parentino);
 272                if (IS_ERR(parent))
 273                        return PTR_ERR(parent);
 274        }
 275        dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n",
 276             realm->ino, realm, realm->parent_ino, realm->parent,
 277             parentino, parent);
 278        if (realm->parent) {
 279                list_del_init(&realm->child_item);
 280                ceph_put_snap_realm(mdsc, realm->parent);
 281        }
 282        realm->parent_ino = parentino;
 283        realm->parent = parent;
 284        list_add(&realm->child_item, &parent->children);
 285        return 1;
 286}
 287
 288
 289static int cmpu64_rev(const void *a, const void *b)
 290{
 291        if (*(u64 *)a < *(u64 *)b)
 292                return 1;
 293        if (*(u64 *)a > *(u64 *)b)
 294                return -1;
 295        return 0;
 296}
 297
 298
/*
 * build the snap context for a given realm.
 *
 * The resulting context holds, sorted in descending order, the realm's
 * own snaps, its prior_parent_snaps, and those snaps of the parent's
 * cached context that are >= realm->parent_since.  Its seq is the
 * larger of the realm's seq and the parent context's seq.  The parent's
 * context is built first (recursively) if it has none.
 *
 * On success the new context replaces realm->cached_context (the old
 * one is put) and 0 is returned; if the existing cached context is
 * still current it is kept and 0 is returned as well.  On failure
 * realm->cached_context is dropped and set to NULL and a negative errno
 * is returned.
 */
static int build_snap_context(struct ceph_snap_realm *realm)
{
        struct ceph_snap_realm *parent = realm->parent;
        struct ceph_snap_context *snapc;
        int err = 0;
        /* upper bound on the number of snaps; grown by the parent's below */
        u32 num = realm->num_prior_parent_snaps + realm->num_snaps;

        /*
         * build parent context, if it hasn't been built.
         * conservatively estimate that all parent snaps might be
         * included by us.
         */
        if (parent) {
                if (!parent->cached_context) {
                        err = build_snap_context(parent);
                        if (err)
                                goto fail;
                }
                num += parent->cached_context->num_snaps;
        }

        /* do i actually need to update?  not if my context seq
           matches realm seq, and my parent's does too.  (this works
           because rebuild_snap_realms() works _downward_ in the
           hierarchy after each update.) */
        if (realm->cached_context &&
            realm->cached_context->seq == realm->seq &&
            (!parent ||
             realm->cached_context->seq >= parent->cached_context->seq)) {
                dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
                     " (unchanged)\n",
                     realm->ino, realm, realm->cached_context,
                     realm->cached_context->seq,
                     (unsigned int) realm->cached_context->num_snaps);
                return 0;
        }

        /* alloc new snap context */
        err = -ENOMEM;
        /* refuse counts whose byte size would not fit in a size_t */
        if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
                goto fail;
        snapc = ceph_create_snap_context(num, GFP_NOFS);
        if (!snapc)
                goto fail;

        /* build (reverse sorted) snap vector */
        num = 0;        /* from here on: number of slots actually filled */
        snapc->seq = realm->seq;
        if (parent) {
                u32 i;

                /* include any of parent's snaps occurring _after_ my
                   parent became my parent */
                for (i = 0; i < parent->cached_context->num_snaps; i++)
                        if (parent->cached_context->snaps[i] >=
                            realm->parent_since)
                                snapc->snaps[num++] =
                                        parent->cached_context->snaps[i];
                /* the context seq is the newer of ours and the parent's */
                if (parent->cached_context->seq > snapc->seq)
                        snapc->seq = parent->cached_context->seq;
        }
        /* append our own snaps, then the ones inherited from prior parents */
        memcpy(snapc->snaps + num, realm->snaps,
               sizeof(u64)*realm->num_snaps);
        num += realm->num_snaps;
        memcpy(snapc->snaps + num, realm->prior_parent_snaps,
               sizeof(u64)*realm->num_prior_parent_snaps);
        num += realm->num_prior_parent_snaps;

        /* cmpu64_rev orders larger values first */
        sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
        snapc->num_snaps = num;
        dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
             realm->ino, realm, snapc, snapc->seq,
             (unsigned int) snapc->num_snaps);

        /* swap in the new context, dropping our ref on the old one */
        ceph_put_snap_context(realm->cached_context);
        realm->cached_context = snapc;
        return 0;

fail:
        /*
         * if we fail, clear old (incorrect) cached_context... hopefully
         * we'll have better luck building it later
         */
        if (realm->cached_context) {
                ceph_put_snap_context(realm->cached_context);
                realm->cached_context = NULL;
        }
        pr_err("build_snap_context %llx %p fail %d\n", realm->ino,
               realm, err);
        return err;
}
 393
 394/*
 395 * rebuild snap context for the given realm and all of its children.
 396 */
 397static void rebuild_snap_realms(struct ceph_snap_realm *realm)
 398{
 399        struct ceph_snap_realm *child;
 400
 401        dout("rebuild_snap_realms %llx %p\n", realm->ino, realm);
 402        build_snap_context(realm);
 403
 404        list_for_each_entry(child, &realm->children, child_item)
 405                rebuild_snap_realms(child);
 406}
 407
 408
 409/*
 410 * helper to allocate and decode an array of snapids.  free prior
 411 * instance, if any.
 412 */
 413static int dup_array(u64 **dst, __le64 *src, u32 num)
 414{
 415        u32 i;
 416
 417        kfree(*dst);
 418        if (num) {
 419                *dst = kcalloc(num, sizeof(u64), GFP_NOFS);
 420                if (!*dst)
 421                        return -ENOMEM;
 422                for (i = 0; i < num; i++)
 423                        (*dst)[i] = get_unaligned_le64(src + i);
 424        } else {
 425                *dst = NULL;
 426        }
 427        return 0;
 428}
 429
 430static bool has_new_snaps(struct ceph_snap_context *o,
 431                          struct ceph_snap_context *n)
 432{
 433        if (n->num_snaps == 0)
 434                return false;
 435        /* snaps are in descending order */
 436        return n->snaps[0] > o->seq;
 437}
 438
 439/*
 440 * When a snapshot is applied, the size/mtime inode metadata is queued
 441 * in a ceph_cap_snap (one for each snapshot) until writeback
 442 * completes and the metadata can be flushed back to the MDS.
 443 *
 444 * However, if a (sync) write is currently in-progress when we apply
 445 * the snapshot, we have to wait until the write succeeds or fails
 446 * (and a final size/mtime is known).  In this case the
 447 * cap_snap->writing = 1, and is said to be "pending."  When the write
 448 * finishes, we __ceph_finish_cap_snap().
 449 *
 450 * Caller must hold snap_rwsem for read (i.e., the realm topology won't
 451 * change).
 452 */
 453void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 454{
 455        struct inode *inode = &ci->vfs_inode;
 456        struct ceph_cap_snap *capsnap;
 457        struct ceph_snap_context *old_snapc, *new_snapc;
 458        int used, dirty;
 459
 460        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
 461        if (!capsnap) {
 462                pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
 463                return;
 464        }
 465
 466        spin_lock(&ci->i_ceph_lock);
 467        used = __ceph_caps_used(ci);
 468        dirty = __ceph_caps_dirty(ci);
 469
 470        old_snapc = ci->i_head_snapc;
 471        new_snapc = ci->i_snap_realm->cached_context;
 472
 473        /*
 474         * If there is a write in progress, treat that as a dirty Fw,
 475         * even though it hasn't completed yet; by the time we finish
 476         * up this capsnap it will be.
 477         */
 478        if (used & CEPH_CAP_FILE_WR)
 479                dirty |= CEPH_CAP_FILE_WR;
 480
 481        if (__ceph_have_pending_cap_snap(ci)) {
 482                /* there is no point in queuing multiple "pending" cap_snaps,
 483                   as no new writes are allowed to start when pending, so any
 484                   writes in progress now were started before the previous
 485                   cap_snap.  lucky us. */
 486                dout("queue_cap_snap %p already pending\n", inode);
 487                goto update_snapc;
 488        }
 489        if (ci->i_wrbuffer_ref_head == 0 &&
 490            !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
 491                dout("queue_cap_snap %p nothing dirty|writing\n", inode);
 492                goto update_snapc;
 493        }
 494
 495        BUG_ON(!old_snapc);
 496
 497        /*
 498         * There is no need to send FLUSHSNAP message to MDS if there is
 499         * no new snapshot. But when there is dirty pages or on-going
 500         * writes, we still need to create cap_snap. cap_snap is needed
 501         * by the write path and page writeback path.
 502         *
 503         * also see ceph_try_drop_cap_snap()
 504         */
 505        if (has_new_snaps(old_snapc, new_snapc)) {
 506                if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
 507                        capsnap->need_flush = true;
 508        } else {
 509                if (!(used & CEPH_CAP_FILE_WR) &&
 510                    ci->i_wrbuffer_ref_head == 0) {
 511                        dout("queue_cap_snap %p "
 512                             "no new_snap|dirty_page|writing\n", inode);
 513                        goto update_snapc;
 514                }
 515        }
 516
 517        dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
 518             inode, capsnap, old_snapc, ceph_cap_string(dirty),
 519             capsnap->need_flush ? "" : "no_flush");
 520        ihold(inode);
 521
 522        atomic_set(&capsnap->nref, 1);
 523        capsnap->ci = ci;
 524        INIT_LIST_HEAD(&capsnap->ci_item);
 525        INIT_LIST_HEAD(&capsnap->flushing_item);
 526
 527        capsnap->follows = old_snapc->seq;
 528        capsnap->issued = __ceph_caps_issued(ci, NULL);
 529        capsnap->dirty = dirty;
 530
 531        capsnap->mode = inode->i_mode;
 532        capsnap->uid = inode->i_uid;
 533        capsnap->gid = inode->i_gid;
 534
 535        if (dirty & CEPH_CAP_XATTR_EXCL) {
 536                __ceph_build_xattrs_blob(ci);
 537                capsnap->xattr_blob =
 538                        ceph_buffer_get(ci->i_xattrs.blob);
 539                capsnap->xattr_version = ci->i_xattrs.version;
 540        } else {
 541                capsnap->xattr_blob = NULL;
 542                capsnap->xattr_version = 0;
 543        }
 544
 545        capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
 546
 547        /* dirty page count moved from _head to this cap_snap;
 548           all subsequent writes page dirties occur _after_ this
 549           snapshot. */
 550        capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
 551        ci->i_wrbuffer_ref_head = 0;
 552        capsnap->context = old_snapc;
 553        list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
 554        old_snapc = NULL;
 555
 556        if (used & CEPH_CAP_FILE_WR) {
 557                dout("queue_cap_snap %p cap_snap %p snapc %p"
 558                     " seq %llu used WR, now pending\n", inode,
 559                     capsnap, old_snapc, old_snapc->seq);
 560                capsnap->writing = 1;
 561        } else {
 562                /* note mtime, size NOW. */
 563                __ceph_finish_cap_snap(ci, capsnap);
 564        }
 565        capsnap = NULL;
 566
 567update_snapc:
 568        if (ci->i_head_snapc) {
 569                ci->i_head_snapc = ceph_get_snap_context(new_snapc);
 570                dout(" new snapc is %p\n", new_snapc);
 571        }
 572        spin_unlock(&ci->i_ceph_lock);
 573
 574        kfree(capsnap);
 575        ceph_put_snap_context(old_snapc);
 576}
 577
 578/*
 579 * Finalize the size, mtime for a cap_snap.. that is, settle on final values
 580 * to be used for the snapshot, to be flushed back to the mds.
 581 *
 582 * If capsnap can now be flushed, add to snap_flush list, and return 1.
 583 *
 584 * Caller must hold i_ceph_lock.
 585 */
 586int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 587                            struct ceph_cap_snap *capsnap)
 588{
 589        struct inode *inode = &ci->vfs_inode;
 590        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 591
 592        BUG_ON(capsnap->writing);
 593        capsnap->size = inode->i_size;
 594        capsnap->mtime = inode->i_mtime;
 595        capsnap->atime = inode->i_atime;
 596        capsnap->ctime = inode->i_ctime;
 597        capsnap->time_warp_seq = ci->i_time_warp_seq;
 598        if (capsnap->dirty_pages) {
 599                dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
 600                     "still has %d dirty pages\n", inode, capsnap,
 601                     capsnap->context, capsnap->context->seq,
 602                     ceph_cap_string(capsnap->dirty), capsnap->size,
 603                     capsnap->dirty_pages);
 604                return 0;
 605        }
 606        dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
 607             inode, capsnap, capsnap->context,
 608             capsnap->context->seq, ceph_cap_string(capsnap->dirty),
 609             capsnap->size);
 610
 611        spin_lock(&mdsc->snap_flush_lock);
 612        list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
 613        spin_unlock(&mdsc->snap_flush_lock);
 614        return 1;  /* caller may want to ceph_flush_snaps */
 615}
 616
/*
 * Queue cap_snaps for snap writeback for this realm and its children.
 * Called under snap_rwsem, so realm topology won't change.
 *
 * Every inode on realm->inodes_with_caps that can still be grabbed gets
 * ceph_queue_cap_snap() called on it.  The children are not processed
 * here: each child is moved onto the dirty list right behind @realm,
 * and @realm is then removed from that list, so a caller that keeps
 * draining the dirty list reaches the children next.
 */
static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
{
        struct ceph_inode_info *ci;
        struct inode *lastinode = NULL;
        struct ceph_snap_realm *child;

        dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);

        spin_lock(&realm->inodes_with_caps_lock);
        list_for_each_entry(ci, &realm->inodes_with_caps,
                            i_snap_realm_item) {
                /* skip inodes we cannot get a reference on */
                struct inode *inode = igrab(&ci->vfs_inode);
                if (!inode)
                        continue;
                /*
                 * Drop the list lock around the real work.  The previous
                 * inode's reference is only released now, with the lock
                 * dropped, and the current one is kept until the next
                 * iteration (or the end of the loop) -- presumably so
                 * the list cursor stays valid while the lock is not held.
                 */
                spin_unlock(&realm->inodes_with_caps_lock);
                iput(lastinode);
                lastinode = inode;
                ceph_queue_cap_snap(ci);
                spin_lock(&realm->inodes_with_caps_lock);
        }
        spin_unlock(&realm->inodes_with_caps_lock);
        iput(lastinode);

        list_for_each_entry(child, &realm->children, child_item) {
                dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
                     realm, realm->ino, child, child->ino);
                /* (re)queue the child directly after us on the dirty list */
                list_del_init(&child->dirty_item);
                list_add(&child->dirty_item, &realm->dirty_item);
        }

        /* we are done; take ourselves off the dirty list */
        list_del_init(&realm->dirty_item);
        dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
}
 654
 655/*
 656 * Parse and apply a snapblob "snap trace" from the MDS.  This specifies
 657 * the snap realm parameters from a given realm and all of its ancestors,
 658 * up to the root.
 659 *
 660 * Caller must hold snap_rwsem for write.
 661 */
 662int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
 663                           void *p, void *e, bool deletion,
 664                           struct ceph_snap_realm **realm_ret)
 665{
 666        struct ceph_mds_snap_realm *ri;    /* encoded */
 667        __le64 *snaps;                     /* encoded */
 668        __le64 *prior_parent_snaps;        /* encoded */
 669        struct ceph_snap_realm *realm = NULL;
 670        struct ceph_snap_realm *first_realm = NULL;
 671        int invalidate = 0;
 672        int err = -ENOMEM;
 673        LIST_HEAD(dirty_realms);
 674
 675        dout("update_snap_trace deletion=%d\n", deletion);
 676more:
 677        ceph_decode_need(&p, e, sizeof(*ri), bad);
 678        ri = p;
 679        p += sizeof(*ri);
 680        ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) +
 681                            le32_to_cpu(ri->num_prior_parent_snaps)), bad);
 682        snaps = p;
 683        p += sizeof(u64) * le32_to_cpu(ri->num_snaps);
 684        prior_parent_snaps = p;
 685        p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);
 686
 687        realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
 688        if (!realm) {
 689                realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
 690                if (IS_ERR(realm)) {
 691                        err = PTR_ERR(realm);
 692                        goto fail;
 693                }
 694        }
 695
 696        /* ensure the parent is correct */
 697        err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
 698        if (err < 0)
 699                goto fail;
 700        invalidate += err;
 701
 702        if (le64_to_cpu(ri->seq) > realm->seq) {
 703                dout("update_snap_trace updating %llx %p %lld -> %lld\n",
 704                     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
 705                /* update realm parameters, snap lists */
 706                realm->seq = le64_to_cpu(ri->seq);
 707                realm->created = le64_to_cpu(ri->created);
 708                realm->parent_since = le64_to_cpu(ri->parent_since);
 709
 710                realm->num_snaps = le32_to_cpu(ri->num_snaps);
 711                err = dup_array(&realm->snaps, snaps, realm->num_snaps);
 712                if (err < 0)
 713                        goto fail;
 714
 715                realm->num_prior_parent_snaps =
 716                        le32_to_cpu(ri->num_prior_parent_snaps);
 717                err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps,
 718                                realm->num_prior_parent_snaps);
 719                if (err < 0)
 720                        goto fail;
 721
 722                /* queue realm for cap_snap creation */
 723                list_add(&realm->dirty_item, &dirty_realms);
 724                if (realm->seq > mdsc->last_snap_seq)
 725                        mdsc->last_snap_seq = realm->seq;
 726
 727                invalidate = 1;
 728        } else if (!realm->cached_context) {
 729                dout("update_snap_trace %llx %p seq %lld new\n",
 730                     realm->ino, realm, realm->seq);
 731                invalidate = 1;
 732        } else {
 733                dout("update_snap_trace %llx %p seq %lld unchanged\n",
 734                     realm->ino, realm, realm->seq);
 735        }
 736
 737        dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
 738             realm, invalidate, p, e);
 739
 740        /* invalidate when we reach the _end_ (root) of the trace */
 741        if (invalidate && p >= e)
 742                rebuild_snap_realms(realm);
 743
 744        if (!first_realm)
 745                first_realm = realm;
 746        else
 747                ceph_put_snap_realm(mdsc, realm);
 748
 749        if (p < e)
 750                goto more;
 751
 752        /*
 753         * queue cap snaps _after_ we've built the new snap contexts,
 754         * so that i_head_snapc can be set appropriately.
 755         */
 756        while (!list_empty(&dirty_realms)) {
 757                realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
 758                                         dirty_item);
 759                queue_realm_cap_snaps(realm);
 760        }
 761
 762        if (realm_ret)
 763                *realm_ret = first_realm;
 764        else
 765                ceph_put_snap_realm(mdsc, first_realm);
 766
 767        __cleanup_empty_realms(mdsc);
 768        return 0;
 769
 770bad:
 771        err = -EINVAL;
 772fail:
 773        if (realm && !IS_ERR(realm))
 774                ceph_put_snap_realm(mdsc, realm);
 775        if (first_realm)
 776                ceph_put_snap_realm(mdsc, first_realm);
 777        pr_err("update_snap_trace error %d\n", err);
 778        return err;
 779}
 780
 781
 782/*
 783 * Send any cap_snaps that are queued for flush.  Try to carry
 784 * s_mutex across multiple snap flushes to avoid locking overhead.
 785 *
 786 * Caller holds no locks.
 787 */
 788static void flush_snaps(struct ceph_mds_client *mdsc)
 789{
 790        struct ceph_inode_info *ci;
 791        struct inode *inode;
 792        struct ceph_mds_session *session = NULL;
 793
 794        dout("flush_snaps\n");
 795        spin_lock(&mdsc->snap_flush_lock);
 796        while (!list_empty(&mdsc->snap_flush_list)) {
 797                ci = list_first_entry(&mdsc->snap_flush_list,
 798                                struct ceph_inode_info, i_snap_flush_item);
 799                inode = &ci->vfs_inode;
 800                ihold(inode);
 801                spin_unlock(&mdsc->snap_flush_lock);
 802                spin_lock(&ci->i_ceph_lock);
 803                __ceph_flush_snaps(ci, &session, 0);
 804                spin_unlock(&ci->i_ceph_lock);
 805                iput(inode);
 806                spin_lock(&mdsc->snap_flush_lock);
 807        }
 808        spin_unlock(&mdsc->snap_flush_lock);
 809
 810        if (session) {
 811                mutex_unlock(&session->s_mutex);
 812                ceph_put_mds_session(session);
 813        }
 814        dout("flush_snaps done\n");
 815}
 816
 817
 818/*
 819 * Handle a snap notification from the MDS.
 820 *
 821 * This can take two basic forms: the simplest is just a snap creation
 822 * or deletion notification on an existing realm.  This should update the
 823 * realm and its children.
 824 *
 825 * The more difficult case is realm creation, due to snap creation at a
 826 * new point in the file hierarchy, or due to a rename that moves a file or
 827 * directory into another realm.
 828 */
 829void ceph_handle_snap(struct ceph_mds_client *mdsc,
 830                      struct ceph_mds_session *session,
 831                      struct ceph_msg *msg)
 832{
 833        struct super_block *sb = mdsc->fsc->sb;
 834        int mds = session->s_mds;
 835        u64 split;
 836        int op;
 837        int trace_len;
 838        struct ceph_snap_realm *realm = NULL;
 839        void *p = msg->front.iov_base;
 840        void *e = p + msg->front.iov_len;
 841        struct ceph_mds_snap_head *h;
 842        int num_split_inos, num_split_realms;
 843        __le64 *split_inos = NULL, *split_realms = NULL;
 844        int i;
 845        int locked_rwsem = 0;
 846
 847        /* decode */
 848        if (msg->front.iov_len < sizeof(*h))
 849                goto bad;
 850        h = p;
 851        op = le32_to_cpu(h->op);
 852        split = le64_to_cpu(h->split);   /* non-zero if we are splitting an
 853                                          * existing realm */
 854        num_split_inos = le32_to_cpu(h->num_split_inos);
 855        num_split_realms = le32_to_cpu(h->num_split_realms);
 856        trace_len = le32_to_cpu(h->trace_len);
 857        p += sizeof(*h);
 858
 859        dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
 860             ceph_snap_op_name(op), split, trace_len);
 861
 862        mutex_lock(&session->s_mutex);
 863        session->s_seq++;
 864        mutex_unlock(&session->s_mutex);
 865
 866        down_write(&mdsc->snap_rwsem);
 867        locked_rwsem = 1;
 868
 869        if (op == CEPH_SNAP_OP_SPLIT) {
 870                struct ceph_mds_snap_realm *ri;
 871
 872                /*
 873                 * A "split" breaks part of an existing realm off into
 874                 * a new realm.  The MDS provides a list of inodes
 875                 * (with caps) and child realms that belong to the new
 876                 * child.
 877                 */
 878                split_inos = p;
 879                p += sizeof(u64) * num_split_inos;
 880                split_realms = p;
 881                p += sizeof(u64) * num_split_realms;
 882                ceph_decode_need(&p, e, sizeof(*ri), bad);
 883                /* we will peek at realm info here, but will _not_
 884                 * advance p, as the realm update will occur below in
 885                 * ceph_update_snap_trace. */
 886                ri = p;
 887
 888                realm = ceph_lookup_snap_realm(mdsc, split);
 889                if (!realm) {
 890                        realm = ceph_create_snap_realm(mdsc, split);
 891                        if (IS_ERR(realm))
 892                                goto out;
 893                }
 894
 895                dout("splitting snap_realm %llx %p\n", realm->ino, realm);
 896                for (i = 0; i < num_split_inos; i++) {
 897                        struct ceph_vino vino = {
 898                                .ino = le64_to_cpu(split_inos[i]),
 899                                .snap = CEPH_NOSNAP,
 900                        };
 901                        struct inode *inode = ceph_find_inode(sb, vino);
 902                        struct ceph_inode_info *ci;
 903                        struct ceph_snap_realm *oldrealm;
 904
 905                        if (!inode)
 906                                continue;
 907                        ci = ceph_inode(inode);
 908
 909                        spin_lock(&ci->i_ceph_lock);
 910                        if (!ci->i_snap_realm)
 911                                goto skip_inode;
 912                        /*
 913                         * If this inode belongs to a realm that was
 914                         * created after our new realm, we experienced
 915                         * a race (due to another split notifications
 916                         * arriving from a different MDS).  So skip
 917                         * this inode.
 918                         */
 919                        if (ci->i_snap_realm->created >
 920                            le64_to_cpu(ri->created)) {
 921                                dout(" leaving %p in newer realm %llx %p\n",
 922                                     inode, ci->i_snap_realm->ino,
 923                                     ci->i_snap_realm);
 924                                goto skip_inode;
 925                        }
 926                        dout(" will move %p to split realm %llx %p\n",
 927                             inode, realm->ino, realm);
 928                        /*
 929                         * Move the inode to the new realm
 930                         */
 931                        spin_lock(&realm->inodes_with_caps_lock);
 932                        list_del_init(&ci->i_snap_realm_item);
 933                        list_add(&ci->i_snap_realm_item,
 934                                 &realm->inodes_with_caps);
 935                        oldrealm = ci->i_snap_realm;
 936                        ci->i_snap_realm = realm;
 937                        spin_unlock(&realm->inodes_with_caps_lock);
 938                        spin_unlock(&ci->i_ceph_lock);
 939
 940                        ceph_get_snap_realm(mdsc, realm);
 941                        ceph_put_snap_realm(mdsc, oldrealm);
 942
 943                        iput(inode);
 944                        continue;
 945
 946skip_inode:
 947                        spin_unlock(&ci->i_ceph_lock);
 948                        iput(inode);
 949                }
 950
 951                /* we may have taken some of the old realm's children. */
 952                for (i = 0; i < num_split_realms; i++) {
 953                        struct ceph_snap_realm *child =
 954                                __lookup_snap_realm(mdsc,
 955                                           le64_to_cpu(split_realms[i]));
 956                        if (!child)
 957                                continue;
 958                        adjust_snap_realm_parent(mdsc, child, realm->ino);
 959                }
 960        }
 961
 962        /*
 963         * update using the provided snap trace. if we are deleting a
 964         * snap, we can avoid queueing cap_snaps.
 965         */
 966        ceph_update_snap_trace(mdsc, p, e,
 967                               op == CEPH_SNAP_OP_DESTROY, NULL);
 968
 969        if (op == CEPH_SNAP_OP_SPLIT)
 970                /* we took a reference when we created the realm, above */
 971                ceph_put_snap_realm(mdsc, realm);
 972
 973        __cleanup_empty_realms(mdsc);
 974
 975        up_write(&mdsc->snap_rwsem);
 976
 977        flush_snaps(mdsc);
 978        return;
 979
 980bad:
 981        pr_err("corrupt snap message from mds%d\n", mds);
 982        ceph_msg_dump(msg);
 983out:
 984        if (locked_rwsem)
 985                up_write(&mdsc->snap_rwsem);
 986        return;
 987}
 988