linux/drivers/md/bcache/journal.c
/*
 * bcache journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcache.h"
#include "btree.h"
#include "debug.h"
#include "extents.h"

#include <trace/events/bcache.h>

/*
 * Journal replay/recovery:
 *
 * This code is all driven from run_cache_set(); we first read the journal
 * entries, do some other stuff, then we mark all the keys in the journal
 * entries (same as garbage collection would), then we replay them - reinserting
 * them into the cache in precisely the same order as they appear in the
 * journal.
 *
 * We only journal keys that go in leaf nodes, which simplifies things quite a
 * bit.
 */

static void journal_read_endio(struct bio *bio)
{
        struct closure *cl = bio->bi_private;
        closure_put(cl);
}

static int journal_read_bucket(struct cache *ca, struct list_head *list,
                               unsigned bucket_index)
{
        struct journal_device *ja = &ca->journal;
        struct bio *bio = &ja->bio;

        struct journal_replay *i;
        struct jset *j, *data = ca->set->journal.w[0].data;
        struct closure cl;
        unsigned len, left, offset = 0;
        int ret = 0;
        sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);

        closure_init_stack(&cl);

        pr_debug("reading %u", bucket_index);

        while (offset < ca->sb.bucket_size) {
reread:         left = ca->sb.bucket_size - offset;
                len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS);

                bio_reset(bio);
                bio->bi_iter.bi_sector  = bucket + offset;
                bio->bi_bdev    = ca->bdev;
                bio->bi_rw      = READ;
                bio->bi_iter.bi_size    = len << 9;

                bio->bi_end_io  = journal_read_endio;
                bio->bi_private = &cl;
                bch_bio_map(bio, data);

                closure_bio_submit(bio, &cl);
                closure_sync(&cl);

                /* This function could be simpler now since we no longer write
                 * journal entries that overlap bucket boundaries; this means
                 * the start of a bucket will always have a valid journal entry
                 * if it has any journal entries at all.
                 */

                j = data;
                while (len) {
                        struct list_head *where;
                        size_t blocks, bytes = set_bytes(j);

                        if (j->magic != jset_magic(&ca->sb)) {
                                pr_debug("%u: bad magic", bucket_index);
                                return ret;
                        }

                        if (bytes > left << 9 ||
                            bytes > PAGE_SIZE << JSET_BITS) {
                                pr_info("%u: too big, %zu bytes, offset %u",
                                        bucket_index, bytes, offset);
                                return ret;
                        }

                        if (bytes > len << 9)
                                goto reread;

                        if (j->csum != csum_set(j)) {
                                pr_info("%u: bad csum, %zu bytes, offset %u",
                                        bucket_index, bytes, offset);
                                return ret;
                        }

                        blocks = set_blocks(j, block_bytes(ca->set));

                        while (!list_empty(list)) {
                                i = list_first_entry(list,
                                        struct journal_replay, list);
                                if (i->j.seq >= j->last_seq)
                                        break;
                                list_del(&i->list);
                                kfree(i);
                        }

                        list_for_each_entry_reverse(i, list, list) {
                                if (j->seq == i->j.seq)
                                        goto next_set;

                                if (j->seq < i->j.last_seq)
                                        goto next_set;

                                if (j->seq > i->j.seq) {
                                        where = &i->list;
                                        goto add;
                                }
                        }

                        where = list;
add:
                        i = kmalloc(offsetof(struct journal_replay, j) +
                                    bytes, GFP_KERNEL);
                        if (!i)
                                return -ENOMEM;
                        memcpy(&i->j, j, bytes);
                        list_add(&i->list, where);
                        ret = 1;

                        ja->seq[bucket_index] = j->seq;
next_set:
                        offset  += blocks * ca->sb.block_size;
                        len     -= blocks * ca->sb.block_size;
                        j = ((void *) j) + blocks * block_bytes(ca);
                }
        }

        return ret;
}

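/*
 * (Descriptive comment, not in the original source.) Read every live journal
 * entry on every cache device into @list, kept sorted by sequence number,
 * oldest first. Buckets are probed in golden-ratio order first, then by
 * linear, binary and reverse search around the hits, so the full range of
 * live entries is found without blindly reading every bucket.
 */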
int bch_journal_read(struct cache_set *c, struct list_head *list)
{
#define read_bucket(b)                                                  \
        ({                                                              \
                int ret = journal_read_bucket(ca, list, b);             \
                __set_bit(b, bitmap);                                   \
                if (ret < 0)                                            \
                        return ret;                                     \
                ret;                                                    \
        })

        struct cache *ca;
        unsigned iter;

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;
                DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
                unsigned i, l, r, m;
                uint64_t seq;

                bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
                pr_debug("%u journal buckets", ca->sb.njournal_buckets);

                /*
                 * Read journal buckets ordered by golden ratio hash to quickly
                 * find a sequence of buckets with valid journal entries
                 */
                for (i = 0; i < ca->sb.njournal_buckets; i++) {
                        l = (i * 2654435769U) % ca->sb.njournal_buckets;

                        if (test_bit(l, bitmap))
                                break;

                        if (read_bucket(l))
                                goto bsearch;
                }

                /*
                 * If that fails, check all the buckets we haven't checked
                 * already
                 */
                pr_debug("falling back to linear search");

                for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets);
                     l < ca->sb.njournal_buckets;
                     l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1))
                        if (read_bucket(l))
                                goto bsearch;

                /* no journal entries on this device? */
                if (l == ca->sb.njournal_buckets)
                        continue;
bsearch:
                BUG_ON(list_empty(list));

                /* Binary search */
                m = l;
                r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
                pr_debug("starting binary search, l %u r %u", l, r);

                while (l + 1 < r) {
                        seq = list_entry(list->prev, struct journal_replay,
                                         list)->j.seq;

                        m = (l + r) >> 1;
                        read_bucket(m);

                        if (seq != list_entry(list->prev, struct journal_replay,
                                              list)->j.seq)
                                l = m;
                        else
                                r = m;
                }

                /*
                 * Read buckets in reverse order until we stop finding more
                 * journal entries
                 */
                pr_debug("finishing up: m %u njournal_buckets %u",
                         m, ca->sb.njournal_buckets);
                l = m;

                while (1) {
                        if (!l--)
                                l = ca->sb.njournal_buckets - 1;

                        if (l == m)
                                break;

                        if (test_bit(l, bitmap))
                                continue;

                        if (!read_bucket(l))
                                break;
                }

                seq = 0;

                for (i = 0; i < ca->sb.njournal_buckets; i++)
                        if (ja->seq[i] > seq) {
                                seq = ja->seq[i];
                                /*
                                 * When journal_reclaim() goes to allocate for
                                 * the first time, it'll use the bucket after
                                 * ja->cur_idx
                                 */
                                ja->cur_idx = i;
                                ja->last_idx = ja->discard_idx = (i + 1) %
                                        ca->sb.njournal_buckets;
                        }
        }

        if (!list_empty(list))
                c->journal.seq = list_entry(list->prev,
                                            struct journal_replay,
                                            list)->j.seq;

        return 0;
#undef read_bucket
}

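/*
 * (Descriptive comment, not in the original source.) Before replay, roughly:
 * pre-populate journal.pin with a refcount for each sequence number being
 * replayed, take a reference on each entry via i->pin, and for every valid
 * extent key bump the bucket pin and mark the key, so the allocator and GC
 * know about the pointers before the keys are back in the btree.
 */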
void bch_journal_mark(struct cache_set *c, struct list_head *list)
{
        atomic_t p = { 0 };
        struct bkey *k;
        struct journal_replay *i;
        struct journal *j = &c->journal;
        uint64_t last = j->seq;

        /*
         * journal.pin should never fill up - we never write a journal
         * entry when it would fill up. But if for some reason it does, we
         * iterate over the list in reverse order so that we can just skip that
         * refcount instead of bugging.
         */

        list_for_each_entry_reverse(i, list, list) {
                BUG_ON(last < i->j.seq);
                i->pin = NULL;

                while (last-- != i->j.seq)
                        if (fifo_free(&j->pin) > 1) {
                                fifo_push_front(&j->pin, p);
                                atomic_set(&fifo_front(&j->pin), 0);
                        }

                if (fifo_free(&j->pin) > 1) {
                        fifo_push_front(&j->pin, p);
                        i->pin = &fifo_front(&j->pin);
                        atomic_set(i->pin, 1);
                }

                for (k = i->j.start;
                     k < bset_bkey_last(&i->j);
                     k = bkey_next(k))
                        if (!__bch_extent_invalid(c, k)) {
                                unsigned j;

                                for (j = 0; j < KEY_PTRS(k); j++)
                                        if (ptr_available(c, k, j))
                                                atomic_inc(&PTR_BUCKET(c, k, j)->pin);

                                bch_initial_mark_key(c, 0, k);
                        }
        }
}

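/*
 * (Descriptive comment, not in the original source.) Reinsert the journalled
 * keys, in order, via bch_btree_insert(); each entry's journal pin is dropped
 * once its keys are inserted, and the replay list is freed on the way out.
 */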
int bch_journal_replay(struct cache_set *s, struct list_head *list)
{
        int ret = 0, keys = 0, entries = 0;
        struct bkey *k;
        struct journal_replay *i =
                list_entry(list->prev, struct journal_replay, list);

        uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
        struct keylist keylist;

        list_for_each_entry(i, list, list) {
                BUG_ON(i->pin && atomic_read(i->pin) != 1);

                cache_set_err_on(n != i->j.seq, s,
"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
                                 n, i->j.seq - 1, start, end);

                for (k = i->j.start;
                     k < bset_bkey_last(&i->j);
                     k = bkey_next(k)) {
                        trace_bcache_journal_replay_key(k);

                        bch_keylist_init_single(&keylist, k);

                        ret = bch_btree_insert(s, &keylist, i->pin, NULL);
                        if (ret)
                                goto err;

                        BUG_ON(!bch_keylist_empty(&keylist));
                        keys++;

                        cond_resched();
                }

                if (i->pin)
                        atomic_dec(i->pin);
                n = i->j.seq + 1;
                entries++;
        }

        pr_info("journal replay done, %i keys in %i entries, seq %llu",
                keys, entries, end);
err:
        while (!list_empty(list)) {
                i = list_first_entry(list, struct journal_replay, list);
                list_del(&i->list);
                kfree(i);
        }

        return ret;
}

/* Journalling */

static void btree_flush_write(struct cache_set *c)
{
        /*
         * Try to find the btree node that references the oldest journal
         * entry; best is our current candidate. We take its write_lock and
         * recheck below, since we may race with a write that clears ->journal.
         */
        struct btree *b, *best;
        unsigned i;
retry:
        best = NULL;

        for_each_cached_btree(b, c, i)
                if (btree_current_write(b)->journal) {
                        if (!best)
                                best = b;
                        else if (journal_pin_cmp(c,
                                        btree_current_write(best)->journal,
                                        btree_current_write(b)->journal)) {
                                best = b;
                        }
                }

        b = best;
        if (b) {
                mutex_lock(&b->write_lock);
                if (!btree_current_write(b)->journal) {
                        mutex_unlock(&b->write_lock);
                        /* We raced */
                        goto retry;
                }

                __bch_btree_node_write(b, NULL);
                mutex_unlock(&b->write_lock);
        }
}

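/*
 * (Descriptive comment, not in the original source.) j->seq is the newest
 * journal entry and journal.pin holds one refcount per open entry, so the
 * oldest entry still pinned is:
 */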
#define last_seq(j)     ((j)->seq - fifo_used(&(j)->pin) + 1)

static void journal_discard_endio(struct bio *bio)
{
        struct journal_device *ja =
                container_of(bio, struct journal_device, discard_bio);
        struct cache *ca = container_of(ja, struct cache, journal);

        atomic_set(&ja->discard_in_flight, DISCARD_DONE);

        closure_wake_up(&ca->set->journal.wait);
        closure_put(&ca->set->cl);
}

static void journal_discard_work(struct work_struct *work)
{
        struct journal_device *ja =
                container_of(work, struct journal_device, discard_work);

        submit_bio(0, &ja->discard_bio);
}

static void do_journal_discard(struct cache *ca)
{
        struct journal_device *ja = &ca->journal;
        struct bio *bio = &ja->discard_bio;

        if (!ca->discard) {
                ja->discard_idx = ja->last_idx;
                return;
        }

        switch (atomic_read(&ja->discard_in_flight)) {
        case DISCARD_IN_FLIGHT:
                return;

        case DISCARD_DONE:
                ja->discard_idx = (ja->discard_idx + 1) %
                        ca->sb.njournal_buckets;

                atomic_set(&ja->discard_in_flight, DISCARD_READY);
                /* fallthrough */

        case DISCARD_READY:
                if (ja->discard_idx == ja->last_idx)
                        return;

                atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);

                bio_init(bio);
                bio->bi_iter.bi_sector  = bucket_to_sector(ca->set,
                                                ca->sb.d[ja->discard_idx]);
                bio->bi_bdev            = ca->bdev;
                bio->bi_rw              = REQ_WRITE|REQ_DISCARD;
                bio->bi_max_vecs        = 1;
                bio->bi_io_vec          = bio->bi_inline_vecs;
                bio->bi_iter.bi_size    = bucket_bytes(ca);
                bio->bi_end_io          = journal_discard_endio;

                closure_get(&ca->set->cl);
                INIT_WORK(&ja->discard_work, journal_discard_work);
                schedule_work(&ja->discard_work);
        }
}

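/*
 * (Descriptive comment, not in the original source.) Free up journal space:
 * drop pins whose refcount has hit zero, advance each device's last_idx past
 * buckets whose newest entry is older than last_seq, issue discards, and when
 * the current bucket is exhausted pick the next one on each device and reset
 * blocks_free. Called with journal.lock held.
 */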
static void journal_reclaim(struct cache_set *c)
{
        struct bkey *k = &c->journal.key;
        struct cache *ca;
        uint64_t last_seq;
        unsigned iter, n = 0;
        atomic_t p;

        while (!atomic_read(&fifo_front(&c->journal.pin)))
                fifo_pop(&c->journal.pin, p);

        last_seq = last_seq(&c->journal);

        /* Update last_idx */

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;

                while (ja->last_idx != ja->cur_idx &&
                       ja->seq[ja->last_idx] < last_seq)
                        ja->last_idx = (ja->last_idx + 1) %
                                ca->sb.njournal_buckets;
        }

        for_each_cache(ca, c, iter)
                do_journal_discard(ca);

        if (c->journal.blocks_free)
                goto out;

        /*
         * Allocate:
         * XXX: Sort by free journal space
         */

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;
                unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;

                /* No space available on this device */
                if (next == ja->discard_idx)
                        continue;

                ja->cur_idx = next;
                k->ptr[n++] = PTR(0,
                                  bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
                                  ca->sb.nr_this_dev);
        }

        bkey_init(k);
        SET_KEY_PTRS(k, n);

        if (n)
                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
out:
        if (!journal_full(&c->journal))
                __closure_wake_up(&c->journal.wait);
}

void bch_journal_next(struct journal *j)
{
        atomic_t p = { 1 };

        j->cur = (j->cur == j->w)
                ? &j->w[1]
                : &j->w[0];

        /*
         * The fifo_push() needs to happen at the same time as j->seq is
         * incremented for last_seq() to be calculated correctly
         */
        BUG_ON(!fifo_push(&j->pin, p));
        atomic_set(&fifo_back(&j->pin), 1);

        j->cur->data->seq       = ++j->seq;
        j->cur->dirty           = false;
        j->cur->need_write      = false;
        j->cur->data->keys      = 0;

        if (fifo_full(&j->pin))
                pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
}

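/*
 * (Descriptive comment, not in the original source.) Journal writes are
 * double buffered: j->cur points at the jset currently being filled while the
 * other of j->w[0]/j->w[1] may still be in flight; bch_journal_next() flips
 * between them, and journal_write_done() wakes waiters on the one that just
 * finished.
 */
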
static void journal_write_endio(struct bio *bio)
{
        struct journal_write *w = bio->bi_private;

        cache_set_err_on(bio->bi_error, w->c, "journal io error");
        closure_put(&w->c->journal.io);
}

static void journal_write(struct closure *);

static void journal_write_done(struct closure *cl)
{
        struct journal *j = container_of(cl, struct journal, io);
        struct journal_write *w = (j->cur == j->w)
                ? &j->w[1]
                : &j->w[0];

        __closure_wake_up(&w->wait);
        continue_at_nobarrier(cl, journal_write, system_wq);
}

static void journal_write_unlock(struct closure *cl)
{
        struct cache_set *c = container_of(cl, struct cache_set, journal.io);

        c->journal.io_in_flight = 0;
        spin_unlock(&c->journal.lock);
}

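/*
 * (Descriptive comment, not in the original source.) Called with journal.lock
 * held and releases it: stamps the current jset with the btree root, uuid and
 * prio bucket pointers, sequence numbers and checksum, then submits one copy
 * of it to each journal bucket pointed at by journal.key.
 */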
static void journal_write_unlocked(struct closure *cl)
        __releases(c->journal.lock)
{
        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
        struct cache *ca;
        struct journal_write *w = c->journal.cur;
        struct bkey *k = &c->journal.key;
        unsigned i, sectors = set_blocks(w->data, block_bytes(c)) *
                c->sb.block_size;

        struct bio *bio;
        struct bio_list list;
        bio_list_init(&list);

        if (!w->need_write) {
                closure_return_with_destructor(cl, journal_write_unlock);
                return;
        } else if (journal_full(&c->journal)) {
                journal_reclaim(c);
                spin_unlock(&c->journal.lock);

                btree_flush_write(c);
                continue_at(cl, journal_write, system_wq);
                return;
        }

        c->journal.blocks_free -= set_blocks(w->data, block_bytes(c));

        w->data->btree_level = c->root->level;

        bkey_copy(&w->data->btree_root, &c->root->key);
        bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);

        for_each_cache(ca, c, i)
                w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];

        w->data->magic          = jset_magic(&c->sb);
        w->data->version        = BCACHE_JSET_VERSION;
        w->data->last_seq       = last_seq(&c->journal);
        w->data->csum           = csum_set(w->data);

        for (i = 0; i < KEY_PTRS(k); i++) {
                ca = PTR_CACHE(c, k, i);
                bio = &ca->journal.bio;

                atomic_long_add(sectors, &ca->meta_sectors_written);

                bio_reset(bio);
                bio->bi_iter.bi_sector  = PTR_OFFSET(k, i);
                bio->bi_bdev    = ca->bdev;
                bio->bi_rw      = REQ_WRITE|REQ_SYNC|REQ_META|REQ_FLUSH|REQ_FUA;
                bio->bi_iter.bi_size = sectors << 9;

                bio->bi_end_io  = journal_write_endio;
                bio->bi_private = w;
                bch_bio_map(bio, w->data);

                trace_bcache_journal_write(bio);
                bio_list_add(&list, bio);

                SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);

                ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
        }

        atomic_dec_bug(&fifo_back(&c->journal.pin));
        bch_journal_next(&c->journal);
        journal_reclaim(c);

        spin_unlock(&c->journal.lock);

        while ((bio = bio_list_pop(&list)))
                closure_bio_submit(bio, cl);

        continue_at(cl, journal_write_done, NULL);
}

static void journal_write(struct closure *cl)
{
        struct cache_set *c = container_of(cl, struct cache_set, journal.io);

        spin_lock(&c->journal.lock);
        journal_write_unlocked(cl);
}

static void journal_try_write(struct cache_set *c)
        __releases(c->journal.lock)
{
        struct closure *cl = &c->journal.io;
        struct journal_write *w = c->journal.cur;

        w->need_write = true;

        if (!c->journal.io_in_flight) {
                c->journal.io_in_flight = 1;
                closure_call(cl, journal_write_unlocked, NULL, &c->cl);
        } else {
                spin_unlock(&c->journal.lock);
        }
}

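/*
 * (Descriptive comment, not in the original source.) Wait until the current
 * journal write has room for another @nkeys worth of keys, kicking off
 * journal writes, journal_reclaim() and btree flushes as needed. Returns with
 * journal.lock held so the caller can copy its keys in before unlocking (or
 * letting journal_try_write() unlock).
 */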
static struct journal_write *journal_wait_for_write(struct cache_set *c,
                                                    unsigned nkeys)
{
        size_t sectors;
        struct closure cl;
        bool wait = false;

        closure_init_stack(&cl);

        spin_lock(&c->journal.lock);

        while (1) {
                struct journal_write *w = c->journal.cur;

                sectors = __set_blocks(w->data, w->data->keys + nkeys,
                                       block_bytes(c)) * c->sb.block_size;

                if (sectors <= min_t(size_t,
                                     c->journal.blocks_free * c->sb.block_size,
                                     PAGE_SECTORS << JSET_BITS))
                        return w;

                if (wait)
                        closure_wait(&c->journal.wait, &cl);

                if (!journal_full(&c->journal)) {
                        if (wait)
                                trace_bcache_journal_entry_full(c);

                        /*
                         * XXX: If we were inserting so many keys that they
                         * won't fit in an _empty_ journal write, we'll
                         * deadlock. For now, handle this in
                         * bch_keylist_realloc() - but something to think about.
                         */
                        BUG_ON(!w->data->keys);

                        journal_try_write(c); /* unlocks */
                } else {
                        if (wait)
                                trace_bcache_journal_full(c);

                        journal_reclaim(c);
                        spin_unlock(&c->journal.lock);

                        btree_flush_write(c);
                }

                closure_sync(&cl);
                spin_lock(&c->journal.lock);
                wait = true;
        }
}

static void journal_write_work(struct work_struct *work)
{
        struct cache_set *c = container_of(to_delayed_work(work),
                                           struct cache_set,
                                           journal.work);
        spin_lock(&c->journal.lock);
        if (c->journal.cur->dirty)
                journal_try_write(c);
        else
                spin_unlock(&c->journal.lock);
}

/*
 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 * pass bch_journal() a list of keys to be journalled, and then
 * bch_journal() hands those same keys off to btree_insert_async()
 */

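/*
 * Illustrative caller sketch (assumed, not in the original source): the
 * expected pattern is to journal the keys, hand the returned journal pin to
 * the btree insert, and drop it once the insert completes:
 *
 *	atomic_t *journal_ref = bch_journal(c, keys, cl);
 *
 *	bch_btree_insert(c, keys, journal_ref, NULL);
 *	if (journal_ref)
 *		atomic_dec_bug(journal_ref);
 */
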
atomic_t *bch_journal(struct cache_set *c,
                      struct keylist *keys,
                      struct closure *parent)
{
        struct journal_write *w;
        atomic_t *ret;

        if (!CACHE_SYNC(&c->sb))
                return NULL;

        w = journal_wait_for_write(c, bch_keylist_nkeys(keys));

        memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
        w->data->keys += bch_keylist_nkeys(keys);

        ret = &fifo_back(&c->journal.pin);
        atomic_inc(ret);

        if (parent) {
                closure_wait(&w->wait, parent);
                journal_try_write(c);
        } else if (!w->dirty) {
                w->dirty = true;
                schedule_delayed_work(&c->journal.work,
                                      msecs_to_jiffies(c->journal_delay_ms));
                spin_unlock(&c->journal.lock);
        } else {
                spin_unlock(&c->journal.lock);
        }

        return ret;
}

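/*
 * (Descriptive comment, not in the original source.) Journalling an empty
 * keylist forces out a journal entry that carries only the metadata - current
 * btree root, uuid bucket and prio bucket pointers - that
 * journal_write_unlocked() stamps into every jset.
 */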
void bch_journal_meta(struct cache_set *c, struct closure *cl)
{
        struct keylist keys;
        atomic_t *ref;

        bch_keylist_init(&keys);

        ref = bch_journal(c, &keys, cl);
        if (ref)
                atomic_dec_bug(ref);
}

void bch_journal_free(struct cache_set *c)
{
        free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
        free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
        free_fifo(&c->journal.pin);
}

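/*
 * (Descriptive comment, not in the original source.) Set up the journal lock,
 * the delayed-work flush timer, the pin fifo and the two write buffers; each
 * buffer is an order-JSET_BITS page allocation, matching the size limits used
 * by the read and write paths above.
 */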
int bch_journal_alloc(struct cache_set *c)
{
        struct journal *j = &c->journal;

        spin_lock_init(&j->lock);
        INIT_DELAYED_WORK(&j->work, journal_write_work);

        c->journal_delay_ms = 100;

        j->w[0].c = c;
        j->w[1].c = c;

        if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
            !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
            !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
                return -ENOMEM;

        return 0;
}