linux/drivers/md/bcache/journal.c
/*
 * bcache journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcache.h"
#include "btree.h"
#include "debug.h"
#include "request.h"

/*
 * Journal replay/recovery:
 *
 * This code is all driven from run_cache_set(); we first read the journal
 * entries, do some other stuff, then we mark all the keys in the journal
 * entries (same as garbage collection would), then we replay them - reinserting
 * them into the cache in precisely the same order as they appear in the
 * journal.
 *
 * We only journal keys that go in leaf nodes, which simplifies things quite a
 * bit.
 */
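
/*
 * In this file those phases correspond to bch_journal_read(),
 * bch_journal_mark() and bch_journal_replay(), in that order.
 */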

static void journal_read_endio(struct bio *bio, int error)
{
        struct closure *cl = bio->bi_private;
        closure_put(cl);
}

static int journal_read_bucket(struct cache *ca, struct list_head *list,
                               struct btree_op *op, unsigned bucket_index)
{
        struct journal_device *ja = &ca->journal;
        struct bio *bio = &ja->bio;

        struct journal_replay *i;
        struct jset *j, *data = ca->set->journal.w[0].data;
        unsigned len, left, offset = 0;
        int ret = 0;
        sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);

        pr_debug("reading %llu", (uint64_t) bucket);

        while (offset < ca->sb.bucket_size) {
reread:         left = ca->sb.bucket_size - offset;
                len = min_t(unsigned, left, PAGE_SECTORS * 8);

                bio_reset(bio);
                bio->bi_sector  = bucket + offset;
                bio->bi_bdev    = ca->bdev;
                bio->bi_rw      = READ;
                bio->bi_size    = len << 9;

                bio->bi_end_io  = journal_read_endio;
                bio->bi_private = &op->cl;
                bch_bio_map(bio, data);

                closure_bio_submit(bio, &op->cl, ca);
                closure_sync(&op->cl);

                /* This function could be simpler now since we no longer write
                 * journal entries that overlap bucket boundaries; this means
                 * the start of a bucket will always have a valid journal entry
                 * if it has any journal entries at all.
                 */

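                /*
                 * Walk the jsets in the chunk just read: bail out of this
                 * bucket on a bad magic or checksum, re-read from the current
                 * offset if an entry extends past the end of what we've read
                 * so far, and otherwise insert the entry into the replay list
                 * in sequence-number order (dropping entries that are too
                 * old).
                 */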
                j = data;
                while (len) {
                        struct list_head *where;
                        size_t blocks, bytes = set_bytes(j);

                        if (j->magic != jset_magic(ca->set))
                                return ret;

                        if (bytes > left << 9)
                                return ret;

                        if (bytes > len << 9)
                                goto reread;

                        if (j->csum != csum_set(j))
                                return ret;

                        blocks = set_blocks(j, ca->set);

                        while (!list_empty(list)) {
                                i = list_first_entry(list,
                                        struct journal_replay, list);
                                if (i->j.seq >= j->last_seq)
                                        break;
                                list_del(&i->list);
                                kfree(i);
                        }

                        list_for_each_entry_reverse(i, list, list) {
                                if (j->seq == i->j.seq)
                                        goto next_set;

                                if (j->seq < i->j.last_seq)
                                        goto next_set;

                                if (j->seq > i->j.seq) {
                                        where = &i->list;
                                        goto add;
                                }
                        }

                        where = list;
add:
                        i = kmalloc(offsetof(struct journal_replay, j) +
                                    bytes, GFP_KERNEL);
                        if (!i)
                                return -ENOMEM;
                        memcpy(&i->j, j, bytes);
                        list_add(&i->list, where);
                        ret = 1;

                        ja->seq[bucket_index] = j->seq;
next_set:
                        offset  += blocks * ca->sb.block_size;
                        len     -= blocks * ca->sb.block_size;
                        j = ((void *) j) + blocks * block_bytes(ca);
                }
        }

        return ret;
}

int bch_journal_read(struct cache_set *c, struct list_head *list,
                        struct btree_op *op)
{
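/*
 * read_bucket() marks bucket @b as examined in @bitmap and evaluates to
 * nonzero if that bucket contained journal entries; if journal_read_bucket()
 * fails, it returns from bch_journal_read() directly.
 */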
#define read_bucket(b)                                                  \
        ({                                                              \
                int ret = journal_read_bucket(ca, list, op, b);         \
                __set_bit(b, bitmap);                                   \
                if (ret < 0)                                            \
                        return ret;                                     \
                ret;                                                    \
        })

        struct cache *ca;
        unsigned iter;

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;
                unsigned long bitmap[SB_JOURNAL_BUCKETS / BITS_PER_LONG];
                unsigned i, l, r, m;
                uint64_t seq;

                bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
                pr_debug("%u journal buckets", ca->sb.njournal_buckets);

                /* Read journal buckets ordered by golden ratio hash to quickly
                 * find a sequence of buckets with valid journal entries
                 */
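                /*
                 * 2654435769 is 2^32 / golden ratio: multiplying by it is
                 * Fibonacci hashing, so successive values of l are spread
                 * roughly evenly across the journal buckets.
                 */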
                for (i = 0; i < ca->sb.njournal_buckets; i++) {
                        l = (i * 2654435769U) % ca->sb.njournal_buckets;

                        if (test_bit(l, bitmap))
                                break;

                        if (read_bucket(l))
                                goto bsearch;
                }

                /* If that fails, check all the buckets we haven't checked
                 * already
                 */
                pr_debug("falling back to linear search");

                for (l = 0; l < ca->sb.njournal_buckets; l++) {
                        if (test_bit(l, bitmap))
                                continue;

                        if (read_bucket(l))
                                goto bsearch;
                }
bsearch:
                /* Binary search */
                m = r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
                pr_debug("starting binary search, l %u r %u", l, r);

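                /*
                 * Invariant (when we got here via a successful read_bucket()):
                 * bucket l is known to contain journal entries, and bucket r
                 * has already been examined and does not, or is off the end;
                 * narrow in on the last bucket after l that still has entries.
                 */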
                while (l + 1 < r) {
                        m = (l + r) >> 1;

                        if (read_bucket(m))
                                l = m;
                        else
                                r = m;
                }

                /* Read buckets in reverse order until we stop finding more
                 * journal entries
                 */
                pr_debug("finishing up");
                l = m;

                while (1) {
                        if (!l--)
                                l = ca->sb.njournal_buckets - 1;

                        if (l == m)
                                break;

                        if (test_bit(l, bitmap))
                                continue;

                        if (!read_bucket(l))
                                break;
                }

                seq = 0;

                for (i = 0; i < ca->sb.njournal_buckets; i++)
                        if (ja->seq[i] > seq) {
                                seq = ja->seq[i];
                                ja->cur_idx = ja->discard_idx =
                                        ja->last_idx = i;
                        }
        }

        if (!list_empty(list))
                c->journal.seq = list_entry(list->prev,
                                            struct journal_replay,
                                            list)->j.seq;

        return 0;
#undef read_bucket
}

void bch_journal_mark(struct cache_set *c, struct list_head *list)
{
        atomic_t p = { 0 };
        struct bkey *k;
        struct journal_replay *i;
        struct journal *j = &c->journal;
        uint64_t last = j->seq;

        /*
         * journal.pin should never fill up - we never write a journal
         * entry when it would fill up. But if for some reason it does, we
         * iterate over the list in reverse order so that we can just skip that
         * refcount instead of bugging.
         */

        list_for_each_entry_reverse(i, list, list) {
                BUG_ON(last < i->j.seq);
                i->pin = NULL;

                while (last-- != i->j.seq)
                        if (fifo_free(&j->pin) > 1) {
                                fifo_push_front(&j->pin, p);
                                atomic_set(&fifo_front(&j->pin), 0);
                        }

                if (fifo_free(&j->pin) > 1) {
                        fifo_push_front(&j->pin, p);
                        i->pin = &fifo_front(&j->pin);
                        atomic_set(i->pin, 1);
                }

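                /*
                 * Pin the buckets these keys point into (a pinned bucket
                 * can't be invalidated and reused) and mark the keys the same
                 * way garbage collection would.
                 */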
                for (k = i->j.start;
                     k < end(&i->j);
                     k = bkey_next(k)) {
                        unsigned j;

                        for (j = 0; j < KEY_PTRS(k); j++) {
                                struct bucket *g = PTR_BUCKET(c, k, j);
                                atomic_inc(&g->pin);

                                if (g->prio == BTREE_PRIO &&
                                    !ptr_stale(c, k, j))
                                        g->prio = INITIAL_PRIO;
                        }

                        __bch_btree_mark_key(c, 0, k);
                }
        }
}

int bch_journal_replay(struct cache_set *s, struct list_head *list,
                          struct btree_op *op)
{
        int ret = 0, keys = 0, entries = 0;
        struct bkey *k;
        struct journal_replay *i =
                list_entry(list->prev, struct journal_replay, list);

        uint64_t start = i->j.last_seq, end = i->j.seq, n = start;

        list_for_each_entry(i, list, list) {
                BUG_ON(i->pin && atomic_read(i->pin) != 1);

                if (n != i->j.seq)
                        pr_err(
                "journal entries %llu-%llu missing! (replaying %llu-%llu)\n",
                n, i->j.seq - 1, start, end);

                for (k = i->j.start;
                     k < end(&i->j);
                     k = bkey_next(k)) {
                        pr_debug("%s", pkey(k));
                        bkey_copy(op->keys.top, k);
                        bch_keylist_push(&op->keys);

                        op->journal = i->pin;
                        atomic_inc(op->journal);

                        ret = bch_btree_insert(op, s);
                        if (ret)
                                goto err;

                        BUG_ON(!bch_keylist_empty(&op->keys));
                        keys++;

                        cond_resched();
                }

                if (i->pin)
                        atomic_dec(i->pin);
                n = i->j.seq + 1;
                entries++;
        }

        pr_info("journal replay done, %i keys in %i entries, seq %llu",
                keys, entries, end);

        while (!list_empty(list)) {
                i = list_first_entry(list, struct journal_replay, list);
                list_del(&i->list);
                kfree(i);
        }
err:
        closure_sync(&op->cl);
        return ret;
}

/* Journalling */

static void btree_flush_write(struct cache_set *c)
{
        /*
         * Try to find the btree node that references the oldest journal
         * entry; best is our current candidate, and is locked if non-NULL:
         */
        struct btree *b, *best = NULL;
        unsigned iter;

        for_each_cached_btree(b, c, iter) {
                if (!down_write_trylock(&b->lock))
                        continue;

                if (!btree_node_dirty(b) ||
                    !btree_current_write(b)->journal) {
                        rw_unlock(true, b);
                        continue;
                }

                if (!best)
                        best = b;
                else if (journal_pin_cmp(c,
                                         btree_current_write(best),
                                         btree_current_write(b))) {
                        rw_unlock(true, best);
                        best = b;
                } else
                        rw_unlock(true, b);
        }

        if (best)
                goto out;

        /* We can't find the best btree node, just pick the first */
        list_for_each_entry(b, &c->btree_cache, list)
                if (!b->level && btree_node_dirty(b)) {
                        best = b;
                        rw_lock(true, best, best->level);
                        goto found;
                }

out:
        if (!best)
                return;
found:
        if (btree_node_dirty(best))
                bch_btree_write(best, true, NULL);
        rw_unlock(true, best);
}

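/*
 * last_seq() is the sequence number of the oldest journal entry still in the
 * pin fifo: j->seq is the newest entry, and j->pin holds one refcount per
 * journal entry that hasn't been reclaimed yet.
 */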
#define last_seq(j)     ((j)->seq - fifo_used(&(j)->pin) + 1)

static void journal_discard_endio(struct bio *bio, int error)
{
        struct journal_device *ja =
                container_of(bio, struct journal_device, discard_bio);
        struct cache *ca = container_of(ja, struct cache, journal);

        atomic_set(&ja->discard_in_flight, DISCARD_DONE);

        closure_wake_up(&ca->set->journal.wait);
        closure_put(&ca->set->cl);
}

static void journal_discard_work(struct work_struct *work)
{
        struct journal_device *ja =
                container_of(work, struct journal_device, discard_work);

        submit_bio(0, &ja->discard_bio);
}

static void do_journal_discard(struct cache *ca)
{
        struct journal_device *ja = &ca->journal;
        struct bio *bio = &ja->discard_bio;

        if (!ca->discard) {
                ja->discard_idx = ja->last_idx;
                return;
        }

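        /*
         * Discard state machine: DISCARD_READY means no discard is
         * outstanding, so if there are reclaimed buckets left to discard we
         * issue one and move to DISCARD_IN_FLIGHT; journal_discard_endio()
         * moves us to DISCARD_DONE, at which point we advance discard_idx and
         * fall back to DISCARD_READY to start the next one.
         */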
        switch (atomic_read(&ja->discard_in_flight)) {
        case DISCARD_IN_FLIGHT:
                return;

        case DISCARD_DONE:
                ja->discard_idx = (ja->discard_idx + 1) %
                        ca->sb.njournal_buckets;

                atomic_set(&ja->discard_in_flight, DISCARD_READY);
                /* fallthrough */

        case DISCARD_READY:
                if (ja->discard_idx == ja->last_idx)
                        return;

                atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);

                bio_init(bio);
                bio->bi_sector          = bucket_to_sector(ca->set,
                                                ca->sb.d[ja->discard_idx]);
                bio->bi_bdev            = ca->bdev;
                bio->bi_rw              = REQ_WRITE|REQ_DISCARD;
                bio->bi_max_vecs        = 1;
                bio->bi_io_vec          = bio->bi_inline_vecs;
                bio->bi_size            = bucket_bytes(ca);
                bio->bi_end_io          = journal_discard_endio;

                closure_get(&ca->set->cl);
                INIT_WORK(&ja->discard_work, journal_discard_work);
                schedule_work(&ja->discard_work);
        }
}

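/*
 * journal_reclaim() advances the journal tail: it drops pins whose refcount
 * has hit zero, lets each device's last_idx catch up to the new last_seq,
 * kicks off discards of freed journal buckets, and, if the journal has run
 * out of space, tries to move each device on to its next journal bucket.
 */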
static void journal_reclaim(struct cache_set *c)
{
        struct bkey *k = &c->journal.key;
        struct cache *ca;
        uint64_t last_seq;
        unsigned iter, n = 0;
        atomic_t p;

        while (!atomic_read(&fifo_front(&c->journal.pin)))
                fifo_pop(&c->journal.pin, p);

        last_seq = last_seq(&c->journal);

        /* Update last_idx */

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;

                while (ja->last_idx != ja->cur_idx &&
                       ja->seq[ja->last_idx] < last_seq)
                        ja->last_idx = (ja->last_idx + 1) %
                                ca->sb.njournal_buckets;
        }

        for_each_cache(ca, c, iter)
                do_journal_discard(ca);

        if (c->journal.blocks_free)
                return;

        /*
         * Allocate:
         * XXX: Sort by free journal space
         */

        for_each_cache(ca, c, iter) {
                struct journal_device *ja = &ca->journal;
                unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;

                /* No space available on this device */
                if (next == ja->discard_idx)
                        continue;

                ja->cur_idx = next;
                k->ptr[n++] = PTR(0,
                                  bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
                                  ca->sb.nr_this_dev);
        }

        bkey_init(k);
        SET_KEY_PTRS(k, n);

        if (n)
                c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;

        if (!journal_full(&c->journal))
                __closure_wake_up(&c->journal.wait);
}

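/*
 * bch_journal_next() closes the current journal entry and opens a new one:
 * it flips j->cur between the two preallocated write buffers, pushes a new
 * refcount onto the pin fifo and bumps j->seq.
 */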
void bch_journal_next(struct journal *j)
{
        atomic_t p = { 1 };

        j->cur = (j->cur == j->w)
                ? &j->w[1]
                : &j->w[0];

        /*
         * The fifo_push() needs to happen at the same time as j->seq is
         * incremented for last_seq() to be calculated correctly
         */
        BUG_ON(!fifo_push(&j->pin, p));
        atomic_set(&fifo_back(&j->pin), 1);

        j->cur->data->seq       = ++j->seq;
        j->cur->need_write      = false;
        j->cur->data->keys      = 0;

        if (fifo_full(&j->pin))
                pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
}

static void journal_write_endio(struct bio *bio, int error)
{
        struct journal_write *w = bio->bi_private;

        cache_set_err_on(error, w->c, "journal io error");
        closure_put(&w->c->journal.io.cl);
}

static void journal_write(struct closure *);

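/*
 * Called when the bios for a journal write have completed: w is the buffer
 * that was just written (the one j->cur is not currently pointing at), so
 * wake up anyone waiting on it and schedule the next write.
 */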
static void journal_write_done(struct closure *cl)
{
        struct journal *j = container_of(cl, struct journal, io.cl);
        struct cache_set *c = container_of(j, struct cache_set, journal);

        struct journal_write *w = (j->cur == j->w)
                ? &j->w[1]
                : &j->w[0];

        __closure_wake_up(&w->wait);

        if (c->journal_delay_ms)
                closure_delay(&j->io, msecs_to_jiffies(c->journal_delay_ms));

        continue_at(cl, journal_write, system_wq);
}

static void journal_write_unlocked(struct closure *cl)
        __releases(c->journal.lock)
{
        struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);
        struct cache *ca;
        struct journal_write *w = c->journal.cur;
        struct bkey *k = &c->journal.key;
        unsigned i, sectors = set_blocks(w->data, c) * c->sb.block_size;

        struct bio *bio;
        struct bio_list list;
        bio_list_init(&list);

        if (!w->need_write) {
                /*
                 * XXX: have to unlock closure before we unlock journal lock,
                 * else we race with bch_journal(). But this way we race
                 * against cache set unregister. Doh.
                 */
                set_closure_fn(cl, NULL, NULL);
                closure_sub(cl, CLOSURE_RUNNING + 1);
                spin_unlock(&c->journal.lock);
                return;
        } else if (journal_full(&c->journal)) {
                journal_reclaim(c);
                spin_unlock(&c->journal.lock);

                btree_flush_write(c);
                continue_at(cl, journal_write, system_wq);
        }

        c->journal.blocks_free -= set_blocks(w->data, c);

        w->data->btree_level = c->root->level;

        bkey_copy(&w->data->btree_root, &c->root->key);
        bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);

        for_each_cache(ca, c, i)
                w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];

        w->data->magic          = jset_magic(c);
        w->data->version        = BCACHE_JSET_VERSION;
        w->data->last_seq       = last_seq(&c->journal);
        w->data->csum           = csum_set(w->data);

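        /*
         * c->journal.key (set up by journal_reclaim()) has one pointer per
         * cache device, aimed at that device's current journal bucket; build
         * and submit one bio per pointer, then bump each pointer past the
         * sectors we just wrote.
         */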
        for (i = 0; i < KEY_PTRS(k); i++) {
                ca = PTR_CACHE(c, k, i);
                bio = &ca->journal.bio;

                atomic_long_add(sectors, &ca->meta_sectors_written);

                bio_reset(bio);
                bio->bi_sector  = PTR_OFFSET(k, i);
                bio->bi_bdev    = ca->bdev;
                bio->bi_rw      = REQ_WRITE|REQ_SYNC|REQ_META|REQ_FLUSH;
                bio->bi_size    = sectors << 9;

                bio->bi_end_io  = journal_write_endio;
                bio->bi_private = w;
                bch_bio_map(bio, w->data);

                trace_bcache_journal_write(bio);
                bio_list_add(&list, bio);

                SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);

                ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
        }

        atomic_dec_bug(&fifo_back(&c->journal.pin));
        bch_journal_next(&c->journal);
        journal_reclaim(c);

        spin_unlock(&c->journal.lock);

        while ((bio = bio_list_pop(&list)))
                closure_bio_submit(bio, cl, c->cache[0]);

        continue_at(cl, journal_write_done, NULL);
}

static void journal_write(struct closure *cl)
{
        struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl);

        spin_lock(&c->journal.lock);
        journal_write_unlocked(cl);
}

static void __journal_try_write(struct cache_set *c, bool noflush)
        __releases(c->journal.lock)
{
        struct closure *cl = &c->journal.io.cl;

        if (!closure_trylock(cl, &c->cl))
                spin_unlock(&c->journal.lock);
        else if (noflush && journal_full(&c->journal)) {
                spin_unlock(&c->journal.lock);
                continue_at(cl, journal_write, system_wq);
        } else
                journal_write_unlocked(cl);
}

#define journal_try_write(c)    __journal_try_write(c, false)

void bch_journal_meta(struct cache_set *c, struct closure *cl)
{
        struct journal_write *w;

        if (CACHE_SYNC(&c->sb)) {
                spin_lock(&c->journal.lock);

                w = c->journal.cur;
                w->need_write = true;

                if (cl)
                        BUG_ON(!closure_wait(&w->wait, cl));

                __journal_try_write(c, true);
        }
}

/*
 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 * pass bch_journal() a list of keys to be journalled, and then
 * bch_journal() hands those same keys off to btree_insert_async()
 */

void bch_journal(struct closure *cl)
{
        struct btree_op *op = container_of(cl, struct btree_op, cl);
        struct cache_set *c = op->c;
        struct journal_write *w;
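        /* n is the number of u64s already in op's keylist, i.e. the keys
         * we're being asked to journal */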
        size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list;

        if (op->type != BTREE_INSERT ||
            !CACHE_SYNC(&c->sb))
                goto out;

        /*
         * If we're looping because we errored, might already be waiting on
         * another journal write:
         */
        while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
                closure_sync(cl->parent);

        spin_lock(&c->journal.lock);

        if (journal_full(&c->journal)) {
                /* XXX: tracepoint */
                closure_wait(&c->journal.wait, cl);

                journal_reclaim(c);
                spin_unlock(&c->journal.lock);

                btree_flush_write(c);
                continue_at(cl, bch_journal, bcache_wq);
        }

        w = c->journal.cur;
        w->need_write = true;
        b = __set_blocks(w->data, w->data->keys + n, c);

        if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS ||
            b > c->journal.blocks_free) {
                /* XXX: If we're inserting so many keys that they won't fit in
                 * an _empty_ journal write, we'll deadlock. For now, handle
                 * this in bch_keylist_realloc() - but something to think about.
                 */
                BUG_ON(!w->data->keys);

                /* XXX: tracepoint */
                BUG_ON(!closure_wait(&w->wait, cl));

                closure_flush(&c->journal.io);

                journal_try_write(c);
                continue_at(cl, bch_journal, bcache_wq);
        }

        memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t));
        w->data->keys += n;

        op->journal = &fifo_back(&c->journal.pin);
        atomic_inc(op->journal);

        if (op->flush_journal) {
                closure_flush(&c->journal.io);
                closure_wait(&w->wait, cl->parent);
        }

        journal_try_write(c);
out:
        bch_btree_insert_async(cl);
}

void bch_journal_free(struct cache_set *c)
{
        free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
        free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
        free_fifo(&c->journal.pin);
}

int bch_journal_alloc(struct cache_set *c)
{
        struct journal *j = &c->journal;

        closure_init_unlocked(&j->io);
        spin_lock_init(&j->lock);

        c->journal_delay_ms = 100;

        j->w[0].c = c;
        j->w[1].c = c;

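        /*
         * Each of the two in-memory journal write buffers is an order
         * JSET_BITS page allocation, i.e. PAGE_SIZE << JSET_BITS bytes -
         * the same limit bch_journal() checks against when deciding whether
         * a set of keys fits in the current entry.
         */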
        if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
            !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
            !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
                return -ENOMEM;

        return 0;
}
