linux/drivers/md/bcache/journal.c
<<
>>
Prefs
   1/*
   2 * bcache journalling code, for btree insertions
   3 *
   4 * Copyright 2012 Google, Inc.
   5 */
   6
   7#include "bcache.h"
   8#include "btree.h"
   9#include "debug.h"
  10#include "extents.h"
  11
  12#include <trace/events/bcache.h>
  13
  14/*
  15 * Journal replay/recovery:
  16 *
  17 * This code is all driven from run_cache_set(); we first read the journal
  18 * entries, do some other stuff, then we mark all the keys in the journal
  19 * entries (same as garbage collection would), then we replay them - reinserting
  20 * them into the cache in precisely the same order as they appear in the
  21 * journal.
  22 *
  23 * We only journal keys that go in leaf nodes, which simplifies things quite a
  24 * bit.
  25 */
  26
  27static void journal_read_endio(struct bio *bio)
  28{
  29        struct closure *cl = bio->bi_private;
  30        closure_put(cl);
  31}
  32
  33static int journal_read_bucket(struct cache *ca, struct list_head *list,
  34                               unsigned bucket_index)
  35{
  36        struct journal_device *ja = &ca->journal;
  37        struct bio *bio = &ja->bio;
  38
  39        struct journal_replay *i;
  40        struct jset *j, *data = ca->set->journal.w[0].data;
  41        struct closure cl;
  42        unsigned len, left, offset = 0;
  43        int ret = 0;
  44        sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
  45
  46        closure_init_stack(&cl);
  47
  48        pr_debug("reading %u", bucket_index);
  49
  50        while (offset < ca->sb.bucket_size) {
  51reread:         left = ca->sb.bucket_size - offset;
  52                len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS);
  53
  54                bio_reset(bio);
  55                bio->bi_iter.bi_sector  = bucket + offset;
  56                bio->bi_bdev    = ca->bdev;
  57                bio->bi_iter.bi_size    = len << 9;
  58
  59                bio->bi_end_io  = journal_read_endio;
  60                bio->bi_private = &cl;
  61                bio_set_op_attrs(bio, REQ_OP_READ, 0);
  62                bch_bio_map(bio, data);
  63
  64                closure_bio_submit(bio, &cl);
  65                closure_sync(&cl);
  66
  67                /* This function could be simpler now since we no longer write
  68                 * journal entries that overlap bucket boundaries; this means
  69                 * the start of a bucket will always have a valid journal entry
  70                 * if it has any journal entries at all.
  71                 */
  72
  73                j = data;
  74                while (len) {
  75                        struct list_head *where;
  76                        size_t blocks, bytes = set_bytes(j);
  77
  78                        if (j->magic != jset_magic(&ca->sb)) {
  79                                pr_debug("%u: bad magic", bucket_index);
  80                                return ret;
  81                        }
  82
  83                        if (bytes > left << 9 ||
  84                            bytes > PAGE_SIZE << JSET_BITS) {
  85                                pr_info("%u: too big, %zu bytes, offset %u",
  86                                        bucket_index, bytes, offset);
  87                                return ret;
  88                        }
  89
  90                        if (bytes > len << 9)
  91                                goto reread;
  92
  93                        if (j->csum != csum_set(j)) {
  94                                pr_info("%u: bad csum, %zu bytes, offset %u",
  95                                        bucket_index, bytes, offset);
  96                                return ret;
  97                        }
  98
  99                        blocks = set_blocks(j, block_bytes(ca->set));
 100
 101                        while (!list_empty(list)) {
 102                                i = list_first_entry(list,
 103                                        struct journal_replay, list);
 104                                if (i->j.seq >= j->last_seq)
 105                                        break;
 106                                list_del(&i->list);
 107                                kfree(i);
 108                        }
 109
 110                        list_for_each_entry_reverse(i, list, list) {
 111                                if (j->seq == i->j.seq)
 112                                        goto next_set;
 113
 114                                if (j->seq < i->j.last_seq)
 115                                        goto next_set;
 116
 117                                if (j->seq > i->j.seq) {
 118                                        where = &i->list;
 119                                        goto add;
 120                                }
 121                        }
 122
 123                        where = list;
 124add:
 125                        i = kmalloc(offsetof(struct journal_replay, j) +
 126                                    bytes, GFP_KERNEL);
 127                        if (!i)
 128                                return -ENOMEM;
 129                        memcpy(&i->j, j, bytes);
 130                        list_add(&i->list, where);
 131                        ret = 1;
 132
 133                        ja->seq[bucket_index] = j->seq;
 134next_set:
 135                        offset  += blocks * ca->sb.block_size;
 136                        len     -= blocks * ca->sb.block_size;
 137                        j = ((void *) j) + blocks * block_bytes(ca);
 138                }
 139        }
 140
 141        return ret;
 142}
 143
 144int bch_journal_read(struct cache_set *c, struct list_head *list)
 145{
 146#define read_bucket(b)                                                  \
 147        ({                                                              \
 148                int ret = journal_read_bucket(ca, list, b);             \
 149                __set_bit(b, bitmap);                                   \
 150                if (ret < 0)                                            \
 151                        return ret;                                     \
 152                ret;                                                    \
 153        })
 154
 155        struct cache *ca;
 156        unsigned iter;
 157
 158        for_each_cache(ca, c, iter) {
 159                struct journal_device *ja = &ca->journal;
 160                DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
 161                unsigned i, l, r, m;
 162                uint64_t seq;
 163
 164                bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
 165                pr_debug("%u journal buckets", ca->sb.njournal_buckets);
 166
 167                /*
 168                 * Read journal buckets ordered by golden ratio hash to quickly
 169                 * find a sequence of buckets with valid journal entries
 170                 */
 171                for (i = 0; i < ca->sb.njournal_buckets; i++) {
 172                        l = (i * 2654435769U) % ca->sb.njournal_buckets;
 173
 174                        if (test_bit(l, bitmap))
 175                                break;
 176
 177                        if (read_bucket(l))
 178                                goto bsearch;
 179                }
 180
 181                /*
 182                 * If that fails, check all the buckets we haven't checked
 183                 * already
 184                 */
 185                pr_debug("falling back to linear search");
 186
 187                for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets);
 188                     l < ca->sb.njournal_buckets;
 189                     l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1))
 190                        if (read_bucket(l))
 191                                goto bsearch;
 192
 193                /* no journal entries on this device? */
 194                if (l == ca->sb.njournal_buckets)
 195                        continue;
 196bsearch:
 197                BUG_ON(list_empty(list));
 198
 199                /* Binary search */
 200                m = l;
 201                r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
 202                pr_debug("starting binary search, l %u r %u", l, r);
 203
 204                while (l + 1 < r) {
 205                        seq = list_entry(list->prev, struct journal_replay,
 206                                         list)->j.seq;
 207
 208                        m = (l + r) >> 1;
 209                        read_bucket(m);
 210
 211                        if (seq != list_entry(list->prev, struct journal_replay,
 212                                              list)->j.seq)
 213                                l = m;
 214                        else
 215                                r = m;
 216                }
 217
 218                /*
 219                 * Read buckets in reverse order until we stop finding more
 220                 * journal entries
 221                 */
 222                pr_debug("finishing up: m %u njournal_buckets %u",
 223                         m, ca->sb.njournal_buckets);
 224                l = m;
 225
 226                while (1) {
 227                        if (!l--)
 228                                l = ca->sb.njournal_buckets - 1;
 229
 230                        if (l == m)
 231                                break;
 232
 233                        if (test_bit(l, bitmap))
 234                                continue;
 235
 236                        if (!read_bucket(l))
 237                                break;
 238                }
 239
 240                seq = 0;
 241
 242                for (i = 0; i < ca->sb.njournal_buckets; i++)
 243                        if (ja->seq[i] > seq) {
 244                                seq = ja->seq[i];
 245                                /*
 246                                 * When journal_reclaim() goes to allocate for
 247                                 * the first time, it'll use the bucket after
 248                                 * ja->cur_idx
 249                                 */
 250                                ja->cur_idx = i;
 251                                ja->last_idx = ja->discard_idx = (i + 1) %
 252                                        ca->sb.njournal_buckets;
 253
 254                        }
 255        }
 256
 257        if (!list_empty(list))
 258                c->journal.seq = list_entry(list->prev,
 259                                            struct journal_replay,
 260                                            list)->j.seq;
 261
 262        return 0;
 263#undef read_bucket
 264}
 265
 266void bch_journal_mark(struct cache_set *c, struct list_head *list)
 267{
 268        atomic_t p = { 0 };
 269        struct bkey *k;
 270        struct journal_replay *i;
 271        struct journal *j = &c->journal;
 272        uint64_t last = j->seq;
 273
 274        /*
 275         * journal.pin should never fill up - we never write a journal
 276         * entry when it would fill up. But if for some reason it does, we
 277         * iterate over the list in reverse order so that we can just skip that
 278         * refcount instead of bugging.
 279         */
 280
 281        list_for_each_entry_reverse(i, list, list) {
 282                BUG_ON(last < i->j.seq);
 283                i->pin = NULL;
 284
 285                while (last-- != i->j.seq)
 286                        if (fifo_free(&j->pin) > 1) {
 287                                fifo_push_front(&j->pin, p);
 288                                atomic_set(&fifo_front(&j->pin), 0);
 289                        }
 290
 291                if (fifo_free(&j->pin) > 1) {
 292                        fifo_push_front(&j->pin, p);
 293                        i->pin = &fifo_front(&j->pin);
 294                        atomic_set(i->pin, 1);
 295                }
 296
 297                for (k = i->j.start;
 298                     k < bset_bkey_last(&i->j);
 299                     k = bkey_next(k))
 300                        if (!__bch_extent_invalid(c, k)) {
 301                                unsigned j;
 302
 303                                for (j = 0; j < KEY_PTRS(k); j++)
 304                                        if (ptr_available(c, k, j))
 305                                                atomic_inc(&PTR_BUCKET(c, k, j)->pin);
 306
 307                                bch_initial_mark_key(c, 0, k);
 308                        }
 309        }
 310}
 311
 312int bch_journal_replay(struct cache_set *s, struct list_head *list)
 313{
 314        int ret = 0, keys = 0, entries = 0;
 315        struct bkey *k;
 316        struct journal_replay *i =
 317                list_entry(list->prev, struct journal_replay, list);
 318
 319        uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
 320        struct keylist keylist;
 321
 322        list_for_each_entry(i, list, list) {
 323                BUG_ON(i->pin && atomic_read(i->pin) != 1);
 324
 325                cache_set_err_on(n != i->j.seq, s,
 326"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
 327                                 n, i->j.seq - 1, start, end);
 328
 329                for (k = i->j.start;
 330                     k < bset_bkey_last(&i->j);
 331                     k = bkey_next(k)) {
 332                        trace_bcache_journal_replay_key(k);
 333
 334                        bch_keylist_init_single(&keylist, k);
 335
 336                        ret = bch_btree_insert(s, &keylist, i->pin, NULL);
 337                        if (ret)
 338                                goto err;
 339
 340                        BUG_ON(!bch_keylist_empty(&keylist));
 341                        keys++;
 342
 343                        cond_resched();
 344                }
 345
 346                if (i->pin)
 347                        atomic_dec(i->pin);
 348                n = i->j.seq + 1;
 349                entries++;
 350        }
 351
 352        pr_info("journal replay done, %i keys in %i entries, seq %llu",
 353                keys, entries, end);
 354err:
 355        while (!list_empty(list)) {
 356                i = list_first_entry(list, struct journal_replay, list);
 357                list_del(&i->list);
 358                kfree(i);
 359        }
 360
 361        return ret;
 362}
 363
 364/* Journalling */
 365
 366static void btree_flush_write(struct cache_set *c)
 367{
 368        /*
 369         * Try to find the btree node with that references the oldest journal
 370         * entry, best is our current candidate and is locked if non NULL:
 371         */
 372        struct btree *b, *best;
 373        unsigned i;
 374retry:
 375        best = NULL;
 376
 377        for_each_cached_btree(b, c, i)
 378                if (btree_current_write(b)->journal) {
 379                        if (!best)
 380                                best = b;
 381                        else if (journal_pin_cmp(c,
 382                                        btree_current_write(best)->journal,
 383                                        btree_current_write(b)->journal)) {
 384                                best = b;
 385                        }
 386                }
 387
 388        b = best;
 389        if (b) {
 390                mutex_lock(&b->write_lock);
 391                if (!btree_current_write(b)->journal) {
 392                        mutex_unlock(&b->write_lock);
 393                        /* We raced */
 394                        goto retry;
 395                }
 396
 397                __bch_btree_node_write(b, NULL);
 398                mutex_unlock(&b->write_lock);
 399        }
 400}
 401
 402#define last_seq(j)     ((j)->seq - fifo_used(&(j)->pin) + 1)
 403
 404static void journal_discard_endio(struct bio *bio)
 405{
 406        struct journal_device *ja =
 407                container_of(bio, struct journal_device, discard_bio);
 408        struct cache *ca = container_of(ja, struct cache, journal);
 409
 410        atomic_set(&ja->discard_in_flight, DISCARD_DONE);
 411
 412        closure_wake_up(&ca->set->journal.wait);
 413        closure_put(&ca->set->cl);
 414}
 415
 416static void journal_discard_work(struct work_struct *work)
 417{
 418        struct journal_device *ja =
 419                container_of(work, struct journal_device, discard_work);
 420
 421        submit_bio(&ja->discard_bio);
 422}
 423
 424static void do_journal_discard(struct cache *ca)
 425{
 426        struct journal_device *ja = &ca->journal;
 427        struct bio *bio = &ja->discard_bio;
 428
 429        if (!ca->discard) {
 430                ja->discard_idx = ja->last_idx;
 431                return;
 432        }
 433
 434        switch (atomic_read(&ja->discard_in_flight)) {
 435        case DISCARD_IN_FLIGHT:
 436                return;
 437
 438        case DISCARD_DONE:
 439                ja->discard_idx = (ja->discard_idx + 1) %
 440                        ca->sb.njournal_buckets;
 441
 442                atomic_set(&ja->discard_in_flight, DISCARD_READY);
 443                /* fallthrough */
 444
 445        case DISCARD_READY:
 446                if (ja->discard_idx == ja->last_idx)
 447                        return;
 448
 449                atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);
 450
 451                bio_init(bio);
 452                bio_set_op_attrs(bio, REQ_OP_DISCARD, 0);
 453                bio->bi_iter.bi_sector  = bucket_to_sector(ca->set,
 454                                                ca->sb.d[ja->discard_idx]);
 455                bio->bi_bdev            = ca->bdev;
 456                bio->bi_max_vecs        = 1;
 457                bio->bi_io_vec          = bio->bi_inline_vecs;
 458                bio->bi_iter.bi_size    = bucket_bytes(ca);
 459                bio->bi_end_io          = journal_discard_endio;
 460
 461                closure_get(&ca->set->cl);
 462                INIT_WORK(&ja->discard_work, journal_discard_work);
 463                schedule_work(&ja->discard_work);
 464        }
 465}
 466
 467static void journal_reclaim(struct cache_set *c)
 468{
 469        struct bkey *k = &c->journal.key;
 470        struct cache *ca;
 471        uint64_t last_seq;
 472        unsigned iter, n = 0;
 473        atomic_t p;
 474
 475        while (!atomic_read(&fifo_front(&c->journal.pin)))
 476                fifo_pop(&c->journal.pin, p);
 477
 478        last_seq = last_seq(&c->journal);
 479
 480        /* Update last_idx */
 481
 482        for_each_cache(ca, c, iter) {
 483                struct journal_device *ja = &ca->journal;
 484
 485                while (ja->last_idx != ja->cur_idx &&
 486                       ja->seq[ja->last_idx] < last_seq)
 487                        ja->last_idx = (ja->last_idx + 1) %
 488                                ca->sb.njournal_buckets;
 489        }
 490
 491        for_each_cache(ca, c, iter)
 492                do_journal_discard(ca);
 493
 494        if (c->journal.blocks_free)
 495                goto out;
 496
 497        /*
 498         * Allocate:
 499         * XXX: Sort by free journal space
 500         */
 501
 502        for_each_cache(ca, c, iter) {
 503                struct journal_device *ja = &ca->journal;
 504                unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;
 505
 506                /* No space available on this device */
 507                if (next == ja->discard_idx)
 508                        continue;
 509
 510                ja->cur_idx = next;
 511                k->ptr[n++] = PTR(0,
 512                                  bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
 513                                  ca->sb.nr_this_dev);
 514        }
 515
 516        bkey_init(k);
 517        SET_KEY_PTRS(k, n);
 518
 519        if (n)
 520                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
 521out:
 522        if (!journal_full(&c->journal))
 523                __closure_wake_up(&c->journal.wait);
 524}
 525
 526void bch_journal_next(struct journal *j)
 527{
 528        atomic_t p = { 1 };
 529
 530        j->cur = (j->cur == j->w)
 531                ? &j->w[1]
 532                : &j->w[0];
 533
 534        /*
 535         * The fifo_push() needs to happen at the same time as j->seq is
 536         * incremented for last_seq() to be calculated correctly
 537         */
 538        BUG_ON(!fifo_push(&j->pin, p));
 539        atomic_set(&fifo_back(&j->pin), 1);
 540
 541        j->cur->data->seq       = ++j->seq;
 542        j->cur->dirty           = false;
 543        j->cur->need_write      = false;
 544        j->cur->data->keys      = 0;
 545
 546        if (fifo_full(&j->pin))
 547                pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
 548}
 549
 550static void journal_write_endio(struct bio *bio)
 551{
 552        struct journal_write *w = bio->bi_private;
 553
 554        cache_set_err_on(bio->bi_error, w->c, "journal io error");
 555        closure_put(&w->c->journal.io);
 556}
 557
 558static void journal_write(struct closure *);
 559
 560static void journal_write_done(struct closure *cl)
 561{
 562        struct journal *j = container_of(cl, struct journal, io);
 563        struct journal_write *w = (j->cur == j->w)
 564                ? &j->w[1]
 565                : &j->w[0];
 566
 567        __closure_wake_up(&w->wait);
 568        continue_at_nobarrier(cl, journal_write, system_wq);
 569}
 570
 571static void journal_write_unlock(struct closure *cl)
 572{
 573        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 574
 575        c->journal.io_in_flight = 0;
 576        spin_unlock(&c->journal.lock);
 577}
 578
 579static void journal_write_unlocked(struct closure *cl)
 580        __releases(c->journal.lock)
 581{
 582        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 583        struct cache *ca;
 584        struct journal_write *w = c->journal.cur;
 585        struct bkey *k = &c->journal.key;
 586        unsigned i, sectors = set_blocks(w->data, block_bytes(c)) *
 587                c->sb.block_size;
 588
 589        struct bio *bio;
 590        struct bio_list list;
 591        bio_list_init(&list);
 592
 593        if (!w->need_write) {
 594                closure_return_with_destructor(cl, journal_write_unlock);
 595                return;
 596        } else if (journal_full(&c->journal)) {
 597                journal_reclaim(c);
 598                spin_unlock(&c->journal.lock);
 599
 600                btree_flush_write(c);
 601                continue_at(cl, journal_write, system_wq);
 602                return;
 603        }
 604
 605        c->journal.blocks_free -= set_blocks(w->data, block_bytes(c));
 606
 607        w->data->btree_level = c->root->level;
 608
 609        bkey_copy(&w->data->btree_root, &c->root->key);
 610        bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);
 611
 612        for_each_cache(ca, c, i)
 613                w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
 614
 615        w->data->magic          = jset_magic(&c->sb);
 616        w->data->version        = BCACHE_JSET_VERSION;
 617        w->data->last_seq       = last_seq(&c->journal);
 618        w->data->csum           = csum_set(w->data);
 619
 620        for (i = 0; i < KEY_PTRS(k); i++) {
 621                ca = PTR_CACHE(c, k, i);
 622                bio = &ca->journal.bio;
 623
 624                atomic_long_add(sectors, &ca->meta_sectors_written);
 625
 626                bio_reset(bio);
 627                bio->bi_iter.bi_sector  = PTR_OFFSET(k, i);
 628                bio->bi_bdev    = ca->bdev;
 629                bio->bi_iter.bi_size = sectors << 9;
 630
 631                bio->bi_end_io  = journal_write_endio;
 632                bio->bi_private = w;
 633                bio_set_op_attrs(bio, REQ_OP_WRITE,
 634                                 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
 635                bch_bio_map(bio, w->data);
 636
 637                trace_bcache_journal_write(bio);
 638                bio_list_add(&list, bio);
 639
 640                SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);
 641
 642                ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
 643        }
 644
 645        atomic_dec_bug(&fifo_back(&c->journal.pin));
 646        bch_journal_next(&c->journal);
 647        journal_reclaim(c);
 648
 649        spin_unlock(&c->journal.lock);
 650
 651        while ((bio = bio_list_pop(&list)))
 652                closure_bio_submit(bio, cl);
 653
 654        continue_at(cl, journal_write_done, NULL);
 655}
 656
 657static void journal_write(struct closure *cl)
 658{
 659        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 660
 661        spin_lock(&c->journal.lock);
 662        journal_write_unlocked(cl);
 663}
 664
 665static void journal_try_write(struct cache_set *c)
 666        __releases(c->journal.lock)
 667{
 668        struct closure *cl = &c->journal.io;
 669        struct journal_write *w = c->journal.cur;
 670
 671        w->need_write = true;
 672
 673        if (!c->journal.io_in_flight) {
 674                c->journal.io_in_flight = 1;
 675                closure_call(cl, journal_write_unlocked, NULL, &c->cl);
 676        } else {
 677                spin_unlock(&c->journal.lock);
 678        }
 679}
 680
 681static struct journal_write *journal_wait_for_write(struct cache_set *c,
 682                                                    unsigned nkeys)
 683{
 684        size_t sectors;
 685        struct closure cl;
 686        bool wait = false;
 687
 688        closure_init_stack(&cl);
 689
 690        spin_lock(&c->journal.lock);
 691
 692        while (1) {
 693                struct journal_write *w = c->journal.cur;
 694
 695                sectors = __set_blocks(w->data, w->data->keys + nkeys,
 696                                       block_bytes(c)) * c->sb.block_size;
 697
 698                if (sectors <= min_t(size_t,
 699                                     c->journal.blocks_free * c->sb.block_size,
 700                                     PAGE_SECTORS << JSET_BITS))
 701                        return w;
 702
 703                if (wait)
 704                        closure_wait(&c->journal.wait, &cl);
 705
 706                if (!journal_full(&c->journal)) {
 707                        if (wait)
 708                                trace_bcache_journal_entry_full(c);
 709
 710                        /*
 711                         * XXX: If we were inserting so many keys that they
 712                         * won't fit in an _empty_ journal write, we'll
 713                         * deadlock. For now, handle this in
 714                         * bch_keylist_realloc() - but something to think about.
 715                         */
 716                        BUG_ON(!w->data->keys);
 717
 718                        journal_try_write(c); /* unlocks */
 719                } else {
 720                        if (wait)
 721                                trace_bcache_journal_full(c);
 722
 723                        journal_reclaim(c);
 724                        spin_unlock(&c->journal.lock);
 725
 726                        btree_flush_write(c);
 727                }
 728
 729                closure_sync(&cl);
 730                spin_lock(&c->journal.lock);
 731                wait = true;
 732        }
 733}
 734
 735static void journal_write_work(struct work_struct *work)
 736{
 737        struct cache_set *c = container_of(to_delayed_work(work),
 738                                           struct cache_set,
 739                                           journal.work);
 740        spin_lock(&c->journal.lock);
 741        if (c->journal.cur->dirty)
 742                journal_try_write(c);
 743        else
 744                spin_unlock(&c->journal.lock);
 745}
 746
 747/*
 748 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 749 * pass bch_journal() a list of keys to be journalled, and then
 750 * bch_journal() hands those same keys off to btree_insert_async()
 751 */
 752
 753atomic_t *bch_journal(struct cache_set *c,
 754                      struct keylist *keys,
 755                      struct closure *parent)
 756{
 757        struct journal_write *w;
 758        atomic_t *ret;
 759
 760        if (!CACHE_SYNC(&c->sb))
 761                return NULL;
 762
 763        w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 764
 765        memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
 766        w->data->keys += bch_keylist_nkeys(keys);
 767
 768        ret = &fifo_back(&c->journal.pin);
 769        atomic_inc(ret);
 770
 771        if (parent) {
 772                closure_wait(&w->wait, parent);
 773                journal_try_write(c);
 774        } else if (!w->dirty) {
 775                w->dirty = true;
 776                schedule_delayed_work(&c->journal.work,
 777                                      msecs_to_jiffies(c->journal_delay_ms));
 778                spin_unlock(&c->journal.lock);
 779        } else {
 780                spin_unlock(&c->journal.lock);
 781        }
 782
 783
 784        return ret;
 785}
 786
 787void bch_journal_meta(struct cache_set *c, struct closure *cl)
 788{
 789        struct keylist keys;
 790        atomic_t *ref;
 791
 792        bch_keylist_init(&keys);
 793
 794        ref = bch_journal(c, &keys, cl);
 795        if (ref)
 796                atomic_dec_bug(ref);
 797}
 798
 799void bch_journal_free(struct cache_set *c)
 800{
 801        free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
 802        free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
 803        free_fifo(&c->journal.pin);
 804}
 805
 806int bch_journal_alloc(struct cache_set *c)
 807{
 808        struct journal *j = &c->journal;
 809
 810        spin_lock_init(&j->lock);
 811        INIT_DELAYED_WORK(&j->work, journal_write_work);
 812
 813        c->journal_delay_ms = 100;
 814
 815        j->w[0].c = c;
 816        j->w[1].c = c;
 817
 818        if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
 819            !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
 820            !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
 821                return -ENOMEM;
 822
 823        return 0;
 824}
 825