linux/drivers/md/bcache/journal.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * bcache journalling code, for btree insertions
   4 *
   5 * Copyright 2012 Google, Inc.
   6 */
   7
   8#include "bcache.h"
   9#include "btree.h"
  10#include "debug.h"
  11#include "extents.h"
  12
  13#include <trace/events/bcache.h>
  14
  15/*
  16 * Journal replay/recovery:
  17 *
  18 * This code is all driven from run_cache_set(); we first read the journal
  19 * entries, do some other stuff, then we mark all the keys in the journal
  20 * entries (same as garbage collection would), then we replay them - reinserting
  21 * them into the cache in precisely the same order as they appear in the
  22 * journal.
  23 *
  24 * We only journal keys that go in leaf nodes, which simplifies things quite a
  25 * bit.
  26 */
  27
  28static void journal_read_endio(struct bio *bio)
  29{
  30        struct closure *cl = bio->bi_private;
  31        closure_put(cl);
  32}
  33
  34static int journal_read_bucket(struct cache *ca, struct list_head *list,
  35                               unsigned bucket_index)
  36{
  37        struct journal_device *ja = &ca->journal;
  38        struct bio *bio = &ja->bio;
  39
  40        struct journal_replay *i;
  41        struct jset *j, *data = ca->set->journal.w[0].data;
  42        struct closure cl;
  43        unsigned len, left, offset = 0;
  44        int ret = 0;
  45        sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);
  46
  47        closure_init_stack(&cl);
  48
  49        pr_debug("reading %u", bucket_index);
  50
  51        while (offset < ca->sb.bucket_size) {
  52reread:         left = ca->sb.bucket_size - offset;
  53                len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS);
  54
  55                bio_reset(bio);
  56                bio->bi_iter.bi_sector  = bucket + offset;
  57                bio_set_dev(bio, ca->bdev);
  58                bio->bi_iter.bi_size    = len << 9;
  59
  60                bio->bi_end_io  = journal_read_endio;
  61                bio->bi_private = &cl;
  62                bio_set_op_attrs(bio, REQ_OP_READ, 0);
  63                bch_bio_map(bio, data);
  64
  65                closure_bio_submit(ca->set, bio, &cl);
  66                closure_sync(&cl);
  67
  68                /* This function could be simpler now since we no longer write
  69                 * journal entries that overlap bucket boundaries; this means
  70                 * the start of a bucket will always have a valid journal entry
  71                 * if it has any journal entries at all.
  72                 */
  73
  74                j = data;
  75                while (len) {
  76                        struct list_head *where;
  77                        size_t blocks, bytes = set_bytes(j);
  78
  79                        if (j->magic != jset_magic(&ca->sb)) {
  80                                pr_debug("%u: bad magic", bucket_index);
  81                                return ret;
  82                        }
  83
  84                        if (bytes > left << 9 ||
  85                            bytes > PAGE_SIZE << JSET_BITS) {
  86                                pr_info("%u: too big, %zu bytes, offset %u",
  87                                        bucket_index, bytes, offset);
  88                                return ret;
  89                        }
  90
  91                        if (bytes > len << 9)
  92                                goto reread;
  93
  94                        if (j->csum != csum_set(j)) {
  95                                pr_info("%u: bad csum, %zu bytes, offset %u",
  96                                        bucket_index, bytes, offset);
  97                                return ret;
  98                        }
  99
 100                        blocks = set_blocks(j, block_bytes(ca->set));
 101
 102                        while (!list_empty(list)) {
 103                                i = list_first_entry(list,
 104                                        struct journal_replay, list);
 105                                if (i->j.seq >= j->last_seq)
 106                                        break;
 107                                list_del(&i->list);
 108                                kfree(i);
 109                        }
 110
 111                        list_for_each_entry_reverse(i, list, list) {
 112                                if (j->seq == i->j.seq)
 113                                        goto next_set;
 114
 115                                if (j->seq < i->j.last_seq)
 116                                        goto next_set;
 117
 118                                if (j->seq > i->j.seq) {
 119                                        where = &i->list;
 120                                        goto add;
 121                                }
 122                        }
 123
 124                        where = list;
 125add:
 126                        i = kmalloc(offsetof(struct journal_replay, j) +
 127                                    bytes, GFP_KERNEL);
 128                        if (!i)
 129                                return -ENOMEM;
 130                        memcpy(&i->j, j, bytes);
 131                        list_add(&i->list, where);
 132                        ret = 1;
 133
 134                        ja->seq[bucket_index] = j->seq;
 135next_set:
 136                        offset  += blocks * ca->sb.block_size;
 137                        len     -= blocks * ca->sb.block_size;
 138                        j = ((void *) j) + blocks * block_bytes(ca);
 139                }
 140        }
 141
 142        return ret;
 143}
 144
 145int bch_journal_read(struct cache_set *c, struct list_head *list)
 146{
 147#define read_bucket(b)                                                  \
 148        ({                                                              \
 149                int ret = journal_read_bucket(ca, list, b);             \
 150                __set_bit(b, bitmap);                                   \
 151                if (ret < 0)                                            \
 152                        return ret;                                     \
 153                ret;                                                    \
 154        })
 155
 156        struct cache *ca;
 157        unsigned iter;
 158
 159        for_each_cache(ca, c, iter) {
 160                struct journal_device *ja = &ca->journal;
 161                DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
 162                unsigned i, l, r, m;
 163                uint64_t seq;
 164
 165                bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
 166                pr_debug("%u journal buckets", ca->sb.njournal_buckets);
 167
 168                /*
 169                 * Read journal buckets ordered by golden ratio hash to quickly
 170                 * find a sequence of buckets with valid journal entries
 171                 */
 172                for (i = 0; i < ca->sb.njournal_buckets; i++) {
 173                        /*
 174                         * We must try the index l with ZERO first for
 175                         * correctness due to the scenario that the journal
 176                         * bucket is circular buffer which might have wrapped
 177                         */
 178                        l = (i * 2654435769U) % ca->sb.njournal_buckets;
 179
 180                        if (test_bit(l, bitmap))
 181                                break;
 182
 183                        if (read_bucket(l))
 184                                goto bsearch;
 185                }
 186
 187                /*
 188                 * If that fails, check all the buckets we haven't checked
 189                 * already
 190                 */
 191                pr_debug("falling back to linear search");
 192
 193                for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets);
 194                     l < ca->sb.njournal_buckets;
 195                     l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1))
 196                        if (read_bucket(l))
 197                                goto bsearch;
 198
 199                /* no journal entries on this device? */
 200                if (l == ca->sb.njournal_buckets)
 201                        continue;
 202bsearch:
 203                BUG_ON(list_empty(list));
 204
 205                /* Binary search */
 206                m = l;
 207                r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
 208                pr_debug("starting binary search, l %u r %u", l, r);
 209
 210                while (l + 1 < r) {
 211                        seq = list_entry(list->prev, struct journal_replay,
 212                                         list)->j.seq;
 213
 214                        m = (l + r) >> 1;
 215                        read_bucket(m);
 216
 217                        if (seq != list_entry(list->prev, struct journal_replay,
 218                                              list)->j.seq)
 219                                l = m;
 220                        else
 221                                r = m;
 222                }
 223
 224                /*
 225                 * Read buckets in reverse order until we stop finding more
 226                 * journal entries
 227                 */
 228                pr_debug("finishing up: m %u njournal_buckets %u",
 229                         m, ca->sb.njournal_buckets);
 230                l = m;
 231
 232                while (1) {
 233                        if (!l--)
 234                                l = ca->sb.njournal_buckets - 1;
 235
 236                        if (l == m)
 237                                break;
 238
 239                        if (test_bit(l, bitmap))
 240                                continue;
 241
 242                        if (!read_bucket(l))
 243                                break;
 244                }
 245
 246                seq = 0;
 247
 248                for (i = 0; i < ca->sb.njournal_buckets; i++)
 249                        if (ja->seq[i] > seq) {
 250                                seq = ja->seq[i];
 251                                /*
 252                                 * When journal_reclaim() goes to allocate for
 253                                 * the first time, it'll use the bucket after
 254                                 * ja->cur_idx
 255                                 */
 256                                ja->cur_idx = i;
 257                                ja->last_idx = ja->discard_idx = (i + 1) %
 258                                        ca->sb.njournal_buckets;
 259
 260                        }
 261        }
 262
 263        if (!list_empty(list))
 264                c->journal.seq = list_entry(list->prev,
 265                                            struct journal_replay,
 266                                            list)->j.seq;
 267
 268        return 0;
 269#undef read_bucket
 270}
 271
 272void bch_journal_mark(struct cache_set *c, struct list_head *list)
 273{
 274        atomic_t p = { 0 };
 275        struct bkey *k;
 276        struct journal_replay *i;
 277        struct journal *j = &c->journal;
 278        uint64_t last = j->seq;
 279
 280        /*
 281         * journal.pin should never fill up - we never write a journal
 282         * entry when it would fill up. But if for some reason it does, we
 283         * iterate over the list in reverse order so that we can just skip that
 284         * refcount instead of bugging.
 285         */
 286
 287        list_for_each_entry_reverse(i, list, list) {
 288                BUG_ON(last < i->j.seq);
 289                i->pin = NULL;
 290
 291                while (last-- != i->j.seq)
 292                        if (fifo_free(&j->pin) > 1) {
 293                                fifo_push_front(&j->pin, p);
 294                                atomic_set(&fifo_front(&j->pin), 0);
 295                        }
 296
 297                if (fifo_free(&j->pin) > 1) {
 298                        fifo_push_front(&j->pin, p);
 299                        i->pin = &fifo_front(&j->pin);
 300                        atomic_set(i->pin, 1);
 301                }
 302
 303                for (k = i->j.start;
 304                     k < bset_bkey_last(&i->j);
 305                     k = bkey_next(k))
 306                        if (!__bch_extent_invalid(c, k)) {
 307                                unsigned j;
 308
 309                                for (j = 0; j < KEY_PTRS(k); j++)
 310                                        if (ptr_available(c, k, j))
 311                                                atomic_inc(&PTR_BUCKET(c, k, j)->pin);
 312
 313                                bch_initial_mark_key(c, 0, k);
 314                        }
 315        }
 316}
 317
 318int bch_journal_replay(struct cache_set *s, struct list_head *list)
 319{
 320        int ret = 0, keys = 0, entries = 0;
 321        struct bkey *k;
 322        struct journal_replay *i =
 323                list_entry(list->prev, struct journal_replay, list);
 324
 325        uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
 326        struct keylist keylist;
 327
 328        list_for_each_entry(i, list, list) {
 329                BUG_ON(i->pin && atomic_read(i->pin) != 1);
 330
 331                cache_set_err_on(n != i->j.seq, s,
 332"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
 333                                 n, i->j.seq - 1, start, end);
 334
 335                for (k = i->j.start;
 336                     k < bset_bkey_last(&i->j);
 337                     k = bkey_next(k)) {
 338                        trace_bcache_journal_replay_key(k);
 339
 340                        bch_keylist_init_single(&keylist, k);
 341
 342                        ret = bch_btree_insert(s, &keylist, i->pin, NULL);
 343                        if (ret)
 344                                goto err;
 345
 346                        BUG_ON(!bch_keylist_empty(&keylist));
 347                        keys++;
 348
 349                        cond_resched();
 350                }
 351
 352                if (i->pin)
 353                        atomic_dec(i->pin);
 354                n = i->j.seq + 1;
 355                entries++;
 356        }
 357
 358        pr_info("journal replay done, %i keys in %i entries, seq %llu",
 359                keys, entries, end);
 360err:
 361        while (!list_empty(list)) {
 362                i = list_first_entry(list, struct journal_replay, list);
 363                list_del(&i->list);
 364                kfree(i);
 365        }
 366
 367        return ret;
 368}
 369
 370/* Journalling */
 371#define journal_max_cmp(l, r) \
 372        (fifo_idx(&c->journal.pin, btree_current_write(l)->journal) < \
 373         fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal))
 374#define journal_min_cmp(l, r) \
 375        (fifo_idx(&c->journal.pin, btree_current_write(l)->journal) > \
 376         fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal))
 377
 378static void btree_flush_write(struct cache_set *c)
 379{
 380        /*
 381         * Try to find the btree node with that references the oldest journal
 382         * entry, best is our current candidate and is locked if non NULL:
 383         */
 384        struct btree *b;
 385        int i;
 386
 387        atomic_long_inc(&c->flush_write);
 388
 389retry:
 390        spin_lock(&c->journal.lock);
 391        if (heap_empty(&c->flush_btree)) {
 392                for_each_cached_btree(b, c, i)
 393                        if (btree_current_write(b)->journal) {
 394                                if (!heap_full(&c->flush_btree))
 395                                        heap_add(&c->flush_btree, b,
 396                                                 journal_max_cmp);
 397                                else if (journal_max_cmp(b,
 398                                         heap_peek(&c->flush_btree))) {
 399                                        c->flush_btree.data[0] = b;
 400                                        heap_sift(&c->flush_btree, 0,
 401                                                  journal_max_cmp);
 402                                }
 403                        }
 404
 405                for (i = c->flush_btree.used / 2 - 1; i >= 0; --i)
 406                        heap_sift(&c->flush_btree, i, journal_min_cmp);
 407        }
 408
 409        b = NULL;
 410        heap_pop(&c->flush_btree, b, journal_min_cmp);
 411        spin_unlock(&c->journal.lock);
 412
 413        if (b) {
 414                mutex_lock(&b->write_lock);
 415                if (!btree_current_write(b)->journal) {
 416                        mutex_unlock(&b->write_lock);
 417                        /* We raced */
 418                        atomic_long_inc(&c->retry_flush_write);
 419                        goto retry;
 420                }
 421
 422                __bch_btree_node_write(b, NULL);
 423                mutex_unlock(&b->write_lock);
 424        }
 425}
 426
 427#define last_seq(j)     ((j)->seq - fifo_used(&(j)->pin) + 1)
 428
 429static void journal_discard_endio(struct bio *bio)
 430{
 431        struct journal_device *ja =
 432                container_of(bio, struct journal_device, discard_bio);
 433        struct cache *ca = container_of(ja, struct cache, journal);
 434
 435        atomic_set(&ja->discard_in_flight, DISCARD_DONE);
 436
 437        closure_wake_up(&ca->set->journal.wait);
 438        closure_put(&ca->set->cl);
 439}
 440
 441static void journal_discard_work(struct work_struct *work)
 442{
 443        struct journal_device *ja =
 444                container_of(work, struct journal_device, discard_work);
 445
 446        submit_bio(&ja->discard_bio);
 447}
 448
 449static void do_journal_discard(struct cache *ca)
 450{
 451        struct journal_device *ja = &ca->journal;
 452        struct bio *bio = &ja->discard_bio;
 453
 454        if (!ca->discard) {
 455                ja->discard_idx = ja->last_idx;
 456                return;
 457        }
 458
 459        switch (atomic_read(&ja->discard_in_flight)) {
 460        case DISCARD_IN_FLIGHT:
 461                return;
 462
 463        case DISCARD_DONE:
 464                ja->discard_idx = (ja->discard_idx + 1) %
 465                        ca->sb.njournal_buckets;
 466
 467                atomic_set(&ja->discard_in_flight, DISCARD_READY);
 468                /* fallthrough */
 469
 470        case DISCARD_READY:
 471                if (ja->discard_idx == ja->last_idx)
 472                        return;
 473
 474                atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);
 475
 476                bio_init(bio, bio->bi_inline_vecs, 1);
 477                bio_set_op_attrs(bio, REQ_OP_DISCARD, 0);
 478                bio->bi_iter.bi_sector  = bucket_to_sector(ca->set,
 479                                                ca->sb.d[ja->discard_idx]);
 480                bio_set_dev(bio, ca->bdev);
 481                bio->bi_iter.bi_size    = bucket_bytes(ca);
 482                bio->bi_end_io          = journal_discard_endio;
 483
 484                closure_get(&ca->set->cl);
 485                INIT_WORK(&ja->discard_work, journal_discard_work);
 486                schedule_work(&ja->discard_work);
 487        }
 488}
 489
 490static void journal_reclaim(struct cache_set *c)
 491{
 492        struct bkey *k = &c->journal.key;
 493        struct cache *ca;
 494        uint64_t last_seq;
 495        unsigned iter, n = 0;
 496        atomic_t p __maybe_unused;
 497
 498        atomic_long_inc(&c->reclaim);
 499
 500        while (!atomic_read(&fifo_front(&c->journal.pin)))
 501                fifo_pop(&c->journal.pin, p);
 502
 503        last_seq = last_seq(&c->journal);
 504
 505        /* Update last_idx */
 506
 507        for_each_cache(ca, c, iter) {
 508                struct journal_device *ja = &ca->journal;
 509
 510                while (ja->last_idx != ja->cur_idx &&
 511                       ja->seq[ja->last_idx] < last_seq)
 512                        ja->last_idx = (ja->last_idx + 1) %
 513                                ca->sb.njournal_buckets;
 514        }
 515
 516        for_each_cache(ca, c, iter)
 517                do_journal_discard(ca);
 518
 519        if (c->journal.blocks_free)
 520                goto out;
 521
 522        /*
 523         * Allocate:
 524         * XXX: Sort by free journal space
 525         */
 526
 527        for_each_cache(ca, c, iter) {
 528                struct journal_device *ja = &ca->journal;
 529                unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;
 530
 531                /* No space available on this device */
 532                if (next == ja->discard_idx)
 533                        continue;
 534
 535                ja->cur_idx = next;
 536                k->ptr[n++] = MAKE_PTR(0,
 537                                  bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
 538                                  ca->sb.nr_this_dev);
 539        }
 540
 541        bkey_init(k);
 542        SET_KEY_PTRS(k, n);
 543
 544        if (n)
 545                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
 546out:
 547        if (!journal_full(&c->journal))
 548                __closure_wake_up(&c->journal.wait);
 549}
 550
 551void bch_journal_next(struct journal *j)
 552{
 553        atomic_t p = { 1 };
 554
 555        j->cur = (j->cur == j->w)
 556                ? &j->w[1]
 557                : &j->w[0];
 558
 559        /*
 560         * The fifo_push() needs to happen at the same time as j->seq is
 561         * incremented for last_seq() to be calculated correctly
 562         */
 563        BUG_ON(!fifo_push(&j->pin, p));
 564        atomic_set(&fifo_back(&j->pin), 1);
 565
 566        j->cur->data->seq       = ++j->seq;
 567        j->cur->dirty           = false;
 568        j->cur->need_write      = false;
 569        j->cur->data->keys      = 0;
 570
 571        if (fifo_full(&j->pin))
 572                pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
 573}
 574
 575static void journal_write_endio(struct bio *bio)
 576{
 577        struct journal_write *w = bio->bi_private;
 578
 579        cache_set_err_on(bio->bi_status, w->c, "journal io error");
 580        closure_put(&w->c->journal.io);
 581}
 582
 583static void journal_write(struct closure *);
 584
 585static void journal_write_done(struct closure *cl)
 586{
 587        struct journal *j = container_of(cl, struct journal, io);
 588        struct journal_write *w = (j->cur == j->w)
 589                ? &j->w[1]
 590                : &j->w[0];
 591
 592        __closure_wake_up(&w->wait);
 593        continue_at_nobarrier(cl, journal_write, system_wq);
 594}
 595
 596static void journal_write_unlock(struct closure *cl)
 597        __releases(&c->journal.lock)
 598{
 599        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 600
 601        c->journal.io_in_flight = 0;
 602        spin_unlock(&c->journal.lock);
 603}
 604
 605static void journal_write_unlocked(struct closure *cl)
 606        __releases(c->journal.lock)
 607{
 608        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 609        struct cache *ca;
 610        struct journal_write *w = c->journal.cur;
 611        struct bkey *k = &c->journal.key;
 612        unsigned i, sectors = set_blocks(w->data, block_bytes(c)) *
 613                c->sb.block_size;
 614
 615        struct bio *bio;
 616        struct bio_list list;
 617        bio_list_init(&list);
 618
 619        if (!w->need_write) {
 620                closure_return_with_destructor(cl, journal_write_unlock);
 621                return;
 622        } else if (journal_full(&c->journal)) {
 623                journal_reclaim(c);
 624                spin_unlock(&c->journal.lock);
 625
 626                btree_flush_write(c);
 627                continue_at(cl, journal_write, system_wq);
 628                return;
 629        }
 630
 631        c->journal.blocks_free -= set_blocks(w->data, block_bytes(c));
 632
 633        w->data->btree_level = c->root->level;
 634
 635        bkey_copy(&w->data->btree_root, &c->root->key);
 636        bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);
 637
 638        for_each_cache(ca, c, i)
 639                w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
 640
 641        w->data->magic          = jset_magic(&c->sb);
 642        w->data->version        = BCACHE_JSET_VERSION;
 643        w->data->last_seq       = last_seq(&c->journal);
 644        w->data->csum           = csum_set(w->data);
 645
 646        for (i = 0; i < KEY_PTRS(k); i++) {
 647                ca = PTR_CACHE(c, k, i);
 648                bio = &ca->journal.bio;
 649
 650                atomic_long_add(sectors, &ca->meta_sectors_written);
 651
 652                bio_reset(bio);
 653                bio->bi_iter.bi_sector  = PTR_OFFSET(k, i);
 654                bio_set_dev(bio, ca->bdev);
 655                bio->bi_iter.bi_size = sectors << 9;
 656
 657                bio->bi_end_io  = journal_write_endio;
 658                bio->bi_private = w;
 659                bio_set_op_attrs(bio, REQ_OP_WRITE,
 660                                 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
 661                bch_bio_map(bio, w->data);
 662
 663                trace_bcache_journal_write(bio);
 664                bio_list_add(&list, bio);
 665
 666                SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);
 667
 668                ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
 669        }
 670
 671        atomic_dec_bug(&fifo_back(&c->journal.pin));
 672        bch_journal_next(&c->journal);
 673        journal_reclaim(c);
 674
 675        spin_unlock(&c->journal.lock);
 676
 677        while ((bio = bio_list_pop(&list)))
 678                closure_bio_submit(c, bio, cl);
 679
 680        continue_at(cl, journal_write_done, NULL);
 681}
 682
 683static void journal_write(struct closure *cl)
 684{
 685        struct cache_set *c = container_of(cl, struct cache_set, journal.io);
 686
 687        spin_lock(&c->journal.lock);
 688        journal_write_unlocked(cl);
 689}
 690
 691static void journal_try_write(struct cache_set *c)
 692        __releases(c->journal.lock)
 693{
 694        struct closure *cl = &c->journal.io;
 695        struct journal_write *w = c->journal.cur;
 696
 697        w->need_write = true;
 698
 699        if (!c->journal.io_in_flight) {
 700                c->journal.io_in_flight = 1;
 701                closure_call(cl, journal_write_unlocked, NULL, &c->cl);
 702        } else {
 703                spin_unlock(&c->journal.lock);
 704        }
 705}
 706
 707static struct journal_write *journal_wait_for_write(struct cache_set *c,
 708                                                    unsigned nkeys)
 709        __acquires(&c->journal.lock)
 710{
 711        size_t sectors;
 712        struct closure cl;
 713        bool wait = false;
 714
 715        closure_init_stack(&cl);
 716
 717        spin_lock(&c->journal.lock);
 718
 719        while (1) {
 720                struct journal_write *w = c->journal.cur;
 721
 722                sectors = __set_blocks(w->data, w->data->keys + nkeys,
 723                                       block_bytes(c)) * c->sb.block_size;
 724
 725                if (sectors <= min_t(size_t,
 726                                     c->journal.blocks_free * c->sb.block_size,
 727                                     PAGE_SECTORS << JSET_BITS))
 728                        return w;
 729
 730                if (wait)
 731                        closure_wait(&c->journal.wait, &cl);
 732
 733                if (!journal_full(&c->journal)) {
 734                        if (wait)
 735                                trace_bcache_journal_entry_full(c);
 736
 737                        /*
 738                         * XXX: If we were inserting so many keys that they
 739                         * won't fit in an _empty_ journal write, we'll
 740                         * deadlock. For now, handle this in
 741                         * bch_keylist_realloc() - but something to think about.
 742                         */
 743                        BUG_ON(!w->data->keys);
 744
 745                        journal_try_write(c); /* unlocks */
 746                } else {
 747                        if (wait)
 748                                trace_bcache_journal_full(c);
 749
 750                        journal_reclaim(c);
 751                        spin_unlock(&c->journal.lock);
 752
 753                        btree_flush_write(c);
 754                }
 755
 756                closure_sync(&cl);
 757                spin_lock(&c->journal.lock);
 758                wait = true;
 759        }
 760}
 761
 762static void journal_write_work(struct work_struct *work)
 763{
 764        struct cache_set *c = container_of(to_delayed_work(work),
 765                                           struct cache_set,
 766                                           journal.work);
 767        spin_lock(&c->journal.lock);
 768        if (c->journal.cur->dirty)
 769                journal_try_write(c);
 770        else
 771                spin_unlock(&c->journal.lock);
 772}
 773
 774/*
 775 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 776 * pass bch_journal() a list of keys to be journalled, and then
 777 * bch_journal() hands those same keys off to btree_insert_async()
 778 */
 779
 780atomic_t *bch_journal(struct cache_set *c,
 781                      struct keylist *keys,
 782                      struct closure *parent)
 783{
 784        struct journal_write *w;
 785        atomic_t *ret;
 786
 787        if (!CACHE_SYNC(&c->sb))
 788                return NULL;
 789
 790        w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
 791
 792        memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
 793        w->data->keys += bch_keylist_nkeys(keys);
 794
 795        ret = &fifo_back(&c->journal.pin);
 796        atomic_inc(ret);
 797
 798        if (parent) {
 799                closure_wait(&w->wait, parent);
 800                journal_try_write(c);
 801        } else if (!w->dirty) {
 802                w->dirty = true;
 803                schedule_delayed_work(&c->journal.work,
 804                                      msecs_to_jiffies(c->journal_delay_ms));
 805                spin_unlock(&c->journal.lock);
 806        } else {
 807                spin_unlock(&c->journal.lock);
 808        }
 809
 810
 811        return ret;
 812}
 813
 814void bch_journal_meta(struct cache_set *c, struct closure *cl)
 815{
 816        struct keylist keys;
 817        atomic_t *ref;
 818
 819        bch_keylist_init(&keys);
 820
 821        ref = bch_journal(c, &keys, cl);
 822        if (ref)
 823                atomic_dec_bug(ref);
 824}
 825
 826void bch_journal_free(struct cache_set *c)
 827{
 828        free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
 829        free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
 830        free_fifo(&c->journal.pin);
 831}
 832
 833int bch_journal_alloc(struct cache_set *c)
 834{
 835        struct journal *j = &c->journal;
 836
 837        spin_lock_init(&j->lock);
 838        INIT_DELAYED_WORK(&j->work, journal_write_work);
 839
 840        c->journal_delay_ms = 100;
 841
 842        j->w[0].c = c;
 843        j->w[1].c = c;
 844
 845        if (!(init_heap(&c->flush_btree, 128, GFP_KERNEL)) ||
 846            !(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
 847            !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
 848            !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
 849                return -ENOMEM;
 850
 851        return 0;
 852}
 853