linux/drivers/lightnvm/pblk-rb.c
   1/*
   2 * Copyright (C) 2016 CNEX Labs
   3 * Initial release: Javier Gonzalez <javier@cnexlabs.com>
   4 *
   5 * Based upon the circular ringbuffer.
   6 *
   7 * This program is free software; you can redistribute it and/or
   8 * modify it under the terms of the GNU General Public License version
   9 * 2 as published by the Free Software Foundation.
  10 *
  11 * This program is distributed in the hope that it will be useful, but
  12 * WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * General Public License for more details.
  15 *
  16 * pblk-rb.c - pblk's write buffer
  17 */
  18
  19#include <linux/circ_buf.h>
  20
  21#include "pblk.h"
  22
  23static DECLARE_RWSEM(pblk_rb_lock);
  24
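     /*
      * Free the data pages backing the ring buffer entries. Takes the global
      * pblk_rb_lock, so callers must not already hold it.
      */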
  25void pblk_rb_data_free(struct pblk_rb *rb)
  26{
  27        struct pblk_rb_pages *p, *t;
  28
  29        down_write(&pblk_rb_lock);
  30        list_for_each_entry_safe(p, t, &rb->pages, list) {
  31                free_pages((unsigned long)page_address(p->pages), p->order);
  32                list_del(&p->list);
  33                kfree(p);
  34        }
  35        up_write(&pblk_rb_lock);
  36}
  37
  38/*
  39 * Initialize ring buffer. The data and metadata buffers must be previously
  40 * allocated and their size must be a power of two
  41 * (Documentation/circular-buffers.txt)
  42 */
  43int pblk_rb_init(struct pblk_rb *rb, struct pblk_rb_entry *rb_entry_base,
  44                 unsigned int power_size, unsigned int power_seg_sz)
  45{
  46        struct pblk *pblk = container_of(rb, struct pblk, rwb);
  47        unsigned int init_entry = 0;
  48        unsigned int alloc_order = power_size;
  49        unsigned int max_order = MAX_ORDER - 1;
  50        unsigned int order, iter;
  51
  52        down_write(&pblk_rb_lock);
  53        rb->entries = rb_entry_base;
  54        rb->seg_size = (1 << power_seg_sz);
  55        rb->nr_entries = (1 << power_size);
  56        rb->mem = rb->subm = rb->sync = rb->l2p_update = 0;
  57        rb->flush_point = EMPTY_ENTRY;
  58
  59        spin_lock_init(&rb->w_lock);
  60        spin_lock_init(&rb->s_lock);
  61
  62        INIT_LIST_HEAD(&rb->pages);
  63
  64        if (alloc_order >= max_order) {
  65                order = max_order;
  66                iter = (1 << (alloc_order - max_order));
  67        } else {
  68                order = alloc_order;
  69                iter = 1;
  70        }
  71
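             /*
              * The order/iter split above breaks the allocation into page
              * sets no larger than the page allocator can serve. For example,
              * assuming the common MAX_ORDER of 11 (max_order == 10) and a
              * 4 KB segment size matching PAGE_SIZE, power_size == 12
              * (4096 entries) is backed by iter == 4 sets of order 10, while
              * power_size == 7 fits in a single order-7 set.
              */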
  72        do {
  73                struct pblk_rb_entry *entry;
  74                struct pblk_rb_pages *page_set;
  75                void *kaddr;
  76                unsigned long set_size;
  77                int i;
  78
  79                page_set = kmalloc(sizeof(struct pblk_rb_pages), GFP_KERNEL);
  80                if (!page_set) {
  81                        up_write(&pblk_rb_lock);
  82                        return -ENOMEM;
  83                }
  84
  85                page_set->order = order;
  86                page_set->pages = alloc_pages(GFP_KERNEL, order);
  87                if (!page_set->pages) {
   88                        kfree(page_set);
   89                        up_write(&pblk_rb_lock);
   90                        pblk_rb_data_free(rb);
   91                        return -ENOMEM;
  92                }
  93                kaddr = page_address(page_set->pages);
  94
  95                entry = &rb->entries[init_entry];
  96                entry->data = kaddr;
  97                entry->cacheline = pblk_cacheline_to_addr(init_entry++);
  98                entry->w_ctx.flags = PBLK_WRITABLE_ENTRY;
  99
 100                set_size = (1 << order);
 101                for (i = 1; i < set_size; i++) {
 102                        entry = &rb->entries[init_entry];
 103                        entry->cacheline = pblk_cacheline_to_addr(init_entry++);
 104                        entry->data = kaddr + (i * rb->seg_size);
 105                        entry->w_ctx.flags = PBLK_WRITABLE_ENTRY;
 106                        bio_list_init(&entry->w_ctx.bios);
 107                }
 108
 109                list_add_tail(&page_set->list, &rb->pages);
 110                iter--;
 111        } while (iter > 0);
 112        up_write(&pblk_rb_lock);
 113
 114#ifdef CONFIG_NVM_DEBUG
 115        atomic_set(&rb->inflight_flush_point, 0);
 116#endif
 117
 118        /*
 119         * Initialize rate-limiter, which controls access to the write buffer
  120         * by user and GC I/O
 121         */
 122        pblk_rl_init(&pblk->rl, rb->nr_entries);
 123
 124        return 0;
 125}
 126
 127/*
 128 * pblk_rb_calculate_size -- calculate the size of the write buffer
 129 */
 130unsigned int pblk_rb_calculate_size(unsigned int nr_entries)
 131{
 132        /* Alloc a write buffer that can at least fit 128 entries */
 133        return (1 << max(get_count_order(nr_entries), 7));
 134}
 135
 136void *pblk_rb_entries_ref(struct pblk_rb *rb)
 137{
 138        return rb->entries;
 139}
 140
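     /*
      * Reset a write context once its entry has been persisted. Busy-waits
      * until the write thread has marked the entry as submitted, then re-arms
      * it as writable with an empty ppa and lba.
      */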
 141static void clean_wctx(struct pblk_w_ctx *w_ctx)
 142{
 143        int flags;
 144
 145try:
 146        flags = READ_ONCE(w_ctx->flags);
 147        if (!(flags & PBLK_SUBMITTED_ENTRY))
 148                goto try;
 149
 150        /* Release flags on context. Protect from writes and reads */
 151        smp_store_release(&w_ctx->flags, PBLK_WRITABLE_ENTRY);
 152        pblk_ppa_set_empty(&w_ctx->ppa);
 153        w_ctx->lba = ADDR_EMPTY;
 154}
 155
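     /*
      * Thin wrappers around the circular buffer helpers in
      * <linux/circ_buf.h>: with a power-of-two size,
      * CIRC_CNT(head, tail, size) is the number of entries ready to be
      * consumed and CIRC_SPACE() is the room left for the producer, which
      * always keeps one slot free so that full and empty can be told apart.
      * For example, with size == 8, head == 2 and tail == 6, count is 4 and
      * space is 3.
      */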
 156#define pblk_rb_ring_count(head, tail, size) CIRC_CNT(head, tail, size)
 157#define pblk_rb_ring_space(rb, head, tail, size) \
 158                                        (CIRC_SPACE(head, tail, size))
 159
  160/*
  161 * Buffer space is calculated with respect to the back (sync) pointer, which
  162 * marks the entries already synchronized to the media.
  163 */
 164static unsigned int pblk_rb_space(struct pblk_rb *rb)
 165{
 166        unsigned int mem = READ_ONCE(rb->mem);
 167        unsigned int sync = READ_ONCE(rb->sync);
 168
 169        return pblk_rb_ring_space(rb, mem, sync, rb->nr_entries);
 170}
 171
  172/*
  173 * Buffer count is calculated with respect to the submission pointer, which
  174 * signals the entries that are available to be sent to the media.
  175 */
 176unsigned int pblk_rb_read_count(struct pblk_rb *rb)
 177{
 178        unsigned int mem = READ_ONCE(rb->mem);
 179        unsigned int subm = READ_ONCE(rb->subm);
 180
 181        return pblk_rb_ring_count(mem, subm, rb->nr_entries);
 182}
 183
 184unsigned int pblk_rb_sync_count(struct pblk_rb *rb)
 185{
 186        unsigned int mem = READ_ONCE(rb->mem);
 187        unsigned int sync = READ_ONCE(rb->sync);
 188
 189        return pblk_rb_ring_count(mem, sync, rb->nr_entries);
 190}
 191
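     /*
      * Reserve @nr_entries for submission and return the position of the first
      * one. Only the submission pointer is advanced here; the data itself is
      * picked up afterwards by pblk_rb_read_to_bio().
      */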
 192unsigned int pblk_rb_read_commit(struct pblk_rb *rb, unsigned int nr_entries)
 193{
 194        unsigned int subm;
 195
 196        subm = READ_ONCE(rb->subm);
 197        /* Commit read means updating submission pointer */
 198        smp_store_release(&rb->subm,
 199                                (subm + nr_entries) & (rb->nr_entries - 1));
 200
 201        return subm;
 202}
 203
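     /*
      * Advance the l2p_update pointer by @to_update entries: point the L2P
      * table at the device address of each entry, drop the reference held on
      * the backing line, return the consumed budget to the rate-limiter and
      * make the entries writable again.
      */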
 204static int __pblk_rb_update_l2p(struct pblk_rb *rb, unsigned int to_update)
 205{
 206        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 207        struct pblk_line *line;
 208        struct pblk_rb_entry *entry;
 209        struct pblk_w_ctx *w_ctx;
 210        unsigned int user_io = 0, gc_io = 0;
 211        unsigned int i;
 212        int flags;
 213
 214        for (i = 0; i < to_update; i++) {
 215                entry = &rb->entries[rb->l2p_update];
 216                w_ctx = &entry->w_ctx;
 217
 218                flags = READ_ONCE(entry->w_ctx.flags);
 219                if (flags & PBLK_IOTYPE_USER)
 220                        user_io++;
 221                else if (flags & PBLK_IOTYPE_GC)
 222                        gc_io++;
 223                else
 224                        WARN(1, "pblk: unknown IO type\n");
 225
 226                pblk_update_map_dev(pblk, w_ctx->lba, w_ctx->ppa,
 227                                                        entry->cacheline);
 228
 229                line = &pblk->lines[pblk_ppa_to_line(w_ctx->ppa)];
 230                kref_put(&line->ref, pblk_line_put);
 231                clean_wctx(w_ctx);
 232                rb->l2p_update = (rb->l2p_update + 1) & (rb->nr_entries - 1);
 233        }
 234
 235        pblk_rl_out(&pblk->rl, user_io, gc_io);
 236
 237        return 0;
 238}
 239
 240/*
 241 * When we move the l2p_update pointer, we update the l2p table - lookups will
 242 * point to the physical address instead of to the cacheline in the write buffer
 243 * from this moment on.
 244 */
 245static int pblk_rb_update_l2p(struct pblk_rb *rb, unsigned int nr_entries,
 246                              unsigned int mem, unsigned int sync)
 247{
 248        unsigned int space, count;
 249        int ret = 0;
 250
 251        lockdep_assert_held(&rb->w_lock);
 252
 253        /* Update l2p only as buffer entries are being overwritten */
 254        space = pblk_rb_ring_space(rb, mem, rb->l2p_update, rb->nr_entries);
 255        if (space > nr_entries)
 256                goto out;
 257
 258        count = nr_entries - space;
 259        /* l2p_update used exclusively under rb->w_lock */
 260        ret = __pblk_rb_update_l2p(rb, count);
 261
 262out:
 263        return ret;
 264}
 265
 266/*
 267 * Update the l2p entry for all sectors stored on the write buffer. This means
 268 * that all future lookups to the l2p table will point to a device address, not
 269 * to the cacheline in the write buffer.
 270 */
 271void pblk_rb_sync_l2p(struct pblk_rb *rb)
 272{
 273        unsigned int sync;
 274        unsigned int to_update;
 275
 276        spin_lock(&rb->w_lock);
 277
 278        /* Protect from reads and writes */
 279        sync = smp_load_acquire(&rb->sync);
 280
 281        to_update = pblk_rb_ring_count(sync, rb->l2p_update, rb->nr_entries);
 282        __pblk_rb_update_l2p(rb, to_update);
 283
 284        spin_unlock(&rb->w_lock);
 285}
 286
  287/*
  288 * Write @nr_entries to the ring buffer from the @data buffer if there is
  289 * enough space. Typically, 4KB data chunks coming from a bio are copied to
  290 * the ring buffer, so the write fails if not all of the incoming data can
  291 * be copied.
  292 */
 293static void __pblk_rb_write_entry(struct pblk_rb *rb, void *data,
 294                                  struct pblk_w_ctx w_ctx,
 295                                  struct pblk_rb_entry *entry)
 296{
 297        memcpy(entry->data, data, rb->seg_size);
 298
 299        entry->w_ctx.lba = w_ctx.lba;
 300        entry->w_ctx.ppa = w_ctx.ppa;
 301}
 302
 303void pblk_rb_write_entry_user(struct pblk_rb *rb, void *data,
 304                              struct pblk_w_ctx w_ctx, unsigned int ring_pos)
 305{
 306        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 307        struct pblk_rb_entry *entry;
 308        int flags;
 309
 310        entry = &rb->entries[ring_pos];
 311        flags = READ_ONCE(entry->w_ctx.flags);
 312#ifdef CONFIG_NVM_DEBUG
 313        /* Caller must guarantee that the entry is free */
 314        BUG_ON(!(flags & PBLK_WRITABLE_ENTRY));
 315#endif
 316
 317        __pblk_rb_write_entry(rb, data, w_ctx, entry);
 318
 319        pblk_update_map_cache(pblk, w_ctx.lba, entry->cacheline);
 320        flags = w_ctx.flags | PBLK_WRITTEN_DATA;
 321
 322        /* Release flags on write context. Protect from writes */
 323        smp_store_release(&entry->w_ctx.flags, flags);
 324}
 325
 326void pblk_rb_write_entry_gc(struct pblk_rb *rb, void *data,
 327                            struct pblk_w_ctx w_ctx, struct pblk_line *line,
 328                            u64 paddr, unsigned int ring_pos)
 329{
 330        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 331        struct pblk_rb_entry *entry;
 332        int flags;
 333
 334        entry = &rb->entries[ring_pos];
 335        flags = READ_ONCE(entry->w_ctx.flags);
 336#ifdef CONFIG_NVM_DEBUG
 337        /* Caller must guarantee that the entry is free */
 338        BUG_ON(!(flags & PBLK_WRITABLE_ENTRY));
 339#endif
 340
 341        __pblk_rb_write_entry(rb, data, w_ctx, entry);
 342
 343        if (!pblk_update_map_gc(pblk, w_ctx.lba, entry->cacheline, line, paddr))
 344                entry->w_ctx.lba = ADDR_EMPTY;
 345
 346        flags = w_ctx.flags | PBLK_WRITTEN_DATA;
 347
 348        /* Release flags on write context. Protect from writes */
 349        smp_store_release(&entry->w_ctx.flags, flags);
 350}
 351
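     /*
      * Set the flush point to the last entry written before @pos and, if a
      * bio is given, queue it on that entry so it can be completed once the
      * entry reaches the media. Returns 1 if a bio was queued, 0 otherwise
      * (no bio, or nothing left to flush).
      */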
 352static int pblk_rb_flush_point_set(struct pblk_rb *rb, struct bio *bio,
 353                                  unsigned int pos)
 354{
 355        struct pblk_rb_entry *entry;
 356        unsigned int sync, flush_point;
 357
 358        pblk_rb_sync_init(rb, NULL);
 359        sync = READ_ONCE(rb->sync);
 360
 361        if (pos == sync) {
 362                pblk_rb_sync_end(rb, NULL);
 363                return 0;
 364        }
 365
 366#ifdef CONFIG_NVM_DEBUG
 367        atomic_inc(&rb->inflight_flush_point);
 368#endif
 369
 370        flush_point = (pos == 0) ? (rb->nr_entries - 1) : (pos - 1);
 371        entry = &rb->entries[flush_point];
 372
 373        /* Protect flush points */
 374        smp_store_release(&rb->flush_point, flush_point);
 375
 376        if (bio)
 377                bio_list_add(&entry->w_ctx.bios, bio);
 378
 379        pblk_rb_sync_end(rb, NULL);
 380
 381        return bio ? 1 : 0;
 382}
 383
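     /*
      * Check whether @nr_entries fit between the mem and sync pointers,
      * updating the L2P table for entries that are about to be overwritten.
      * On success *pos is set to the current mem position;
      * pblk_rb_may_write() below additionally claims the entries by advancing
      * the mem pointer with a release store.
      */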
 384static int __pblk_rb_may_write(struct pblk_rb *rb, unsigned int nr_entries,
 385                               unsigned int *pos)
 386{
 387        unsigned int mem;
 388        unsigned int sync;
 389
 390        sync = READ_ONCE(rb->sync);
 391        mem = READ_ONCE(rb->mem);
 392
 393        if (pblk_rb_ring_space(rb, mem, sync, rb->nr_entries) < nr_entries)
 394                return 0;
 395
 396        if (pblk_rb_update_l2p(rb, nr_entries, mem, sync))
 397                return 0;
 398
 399        *pos = mem;
 400
 401        return 1;
 402}
 403
 404static int pblk_rb_may_write(struct pblk_rb *rb, unsigned int nr_entries,
 405                             unsigned int *pos)
 406{
 407        if (!__pblk_rb_may_write(rb, nr_entries, pos))
 408                return 0;
 409
 410        /* Protect from read count */
 411        smp_store_release(&rb->mem, (*pos + nr_entries) & (rb->nr_entries - 1));
 412        return 1;
 413}
 414
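     /*
      * Force a flush of everything currently in the write buffer: set a flush
      * point at the present mem position and kick the write thread if needed.
      */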
 415void pblk_rb_flush(struct pblk_rb *rb)
 416{
 417        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 418        unsigned int mem = READ_ONCE(rb->mem);
 419
 420        if (pblk_rb_flush_point_set(rb, NULL, mem))
 421                return;
 422
 423        pblk_write_should_kick(pblk);
 424}
 425
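     /*
      * Like pblk_rb_may_write(), but also honours REQ_PREFLUSH on the incoming
      * bio: for a flush request a flush point is set at the end of the new
      * entries and *io_ret is changed from NVM_IO_DONE to NVM_IO_OK, signalling
      * the caller that the bio will be completed once the data is on the media
      * rather than right away.
      */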
 426static int pblk_rb_may_write_flush(struct pblk_rb *rb, unsigned int nr_entries,
 427                                   unsigned int *pos, struct bio *bio,
 428                                   int *io_ret)
 429{
 430        unsigned int mem;
 431
 432        if (!__pblk_rb_may_write(rb, nr_entries, pos))
 433                return 0;
 434
 435        mem = (*pos + nr_entries) & (rb->nr_entries - 1);
 436        *io_ret = NVM_IO_DONE;
 437
 438        if (bio->bi_opf & REQ_PREFLUSH) {
 439                struct pblk *pblk = container_of(rb, struct pblk, rwb);
 440
 441                atomic64_inc(&pblk->nr_flush);
 442                if (pblk_rb_flush_point_set(&pblk->rwb, bio, mem))
 443                        *io_ret = NVM_IO_OK;
 444        }
 445
 446        /* Protect from read count */
 447        smp_store_release(&rb->mem, mem);
 448
 449        return 1;
 450}
 451
 452/*
 453 * Atomically check that (i) there is space on the write buffer for the
 454 * incoming I/O, and (ii) the current I/O type has enough budget in the write
 455 * buffer (rate-limiter).
 456 */
 457int pblk_rb_may_write_user(struct pblk_rb *rb, struct bio *bio,
 458                           unsigned int nr_entries, unsigned int *pos)
 459{
 460        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 461        int io_ret;
 462
 463        spin_lock(&rb->w_lock);
 464        io_ret = pblk_rl_user_may_insert(&pblk->rl, nr_entries);
 465        if (io_ret) {
 466                spin_unlock(&rb->w_lock);
 467                return io_ret;
 468        }
 469
 470        if (!pblk_rb_may_write_flush(rb, nr_entries, pos, bio, &io_ret)) {
 471                spin_unlock(&rb->w_lock);
 472                return NVM_IO_REQUEUE;
 473        }
 474
 475        pblk_rl_user_in(&pblk->rl, nr_entries);
 476        spin_unlock(&rb->w_lock);
 477
 478        return io_ret;
 479}
 480
  481/*
  482 * See the pblk_rb_may_write_user() comment above.
  483 */
 484int pblk_rb_may_write_gc(struct pblk_rb *rb, unsigned int nr_entries,
 485                         unsigned int *pos)
 486{
 487        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 488
 489        spin_lock(&rb->w_lock);
 490        if (!pblk_rl_gc_may_insert(&pblk->rl, nr_entries)) {
 491                spin_unlock(&rb->w_lock);
 492                return 0;
 493        }
 494
 495        if (!pblk_rb_may_write(rb, nr_entries, pos)) {
 496                spin_unlock(&rb->w_lock);
 497                return 0;
 498        }
 499
 500        pblk_rl_gc_in(&pblk->rl, nr_entries);
 501        spin_unlock(&rb->w_lock);
 502
 503        return 1;
 504}
 505
 506/*
 507 * The caller of this function must ensure that the backpointer will not
 508 * overwrite the entries passed on the list.
 509 */
 510unsigned int pblk_rb_read_to_bio_list(struct pblk_rb *rb, struct bio *bio,
 511                                      struct list_head *list,
 512                                      unsigned int max)
 513{
 514        struct pblk_rb_entry *entry, *tentry;
 515        struct page *page;
 516        unsigned int read = 0;
 517        int ret;
 518
 519        list_for_each_entry_safe(entry, tentry, list, index) {
 520                if (read > max) {
 521                        pr_err("pblk: too many entries on list\n");
 522                        goto out;
 523                }
 524
 525                page = virt_to_page(entry->data);
 526                if (!page) {
 527                        pr_err("pblk: could not allocate write bio page\n");
 528                        goto out;
 529                }
 530
 531                ret = bio_add_page(bio, page, rb->seg_size, 0);
 532                if (ret != rb->seg_size) {
 533                        pr_err("pblk: could not add page to write bio\n");
 534                        goto out;
 535                }
 536
 537                list_del(&entry->index);
 538                read++;
 539        }
 540
 541out:
 542        return read;
 543}
 544
 545/*
  546 * Read available entries on the rb and add them to the given bio. To avoid
  547 * a memory copy, a page reference to the write buffer is added to the bio.
 548 *
 549 * This function is used by the write thread to form the write bio that will
 550 * persist data on the write buffer to the media.
 551 */
 552unsigned int pblk_rb_read_to_bio(struct pblk_rb *rb, struct nvm_rq *rqd,
 553                                 unsigned int pos, unsigned int nr_entries,
 554                                 unsigned int count)
 555{
 556        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 557        struct request_queue *q = pblk->dev->q;
 558        struct pblk_c_ctx *c_ctx = nvm_rq_to_pdu(rqd);
 559        struct bio *bio = rqd->bio;
 560        struct pblk_rb_entry *entry;
 561        struct page *page;
 562        unsigned int pad = 0, to_read = nr_entries;
 563        unsigned int i;
 564        int flags;
 565
 566        if (count < nr_entries) {
 567                pad = nr_entries - count;
 568                to_read = count;
 569        }
 570
 571        c_ctx->sentry = pos;
 572        c_ctx->nr_valid = to_read;
 573        c_ctx->nr_padded = pad;
 574
 575        for (i = 0; i < to_read; i++) {
 576                entry = &rb->entries[pos];
 577
 578                /* A write has been allowed into the buffer, but data is still
 579                 * being copied to it. It is ok to busy wait.
 580                 */
 581try:
 582                flags = READ_ONCE(entry->w_ctx.flags);
 583                if (!(flags & PBLK_WRITTEN_DATA)) {
 584                        io_schedule();
 585                        goto try;
 586                }
 587
 588                page = virt_to_page(entry->data);
 589                if (!page) {
 590                        pr_err("pblk: could not allocate write bio page\n");
 591                        flags &= ~PBLK_WRITTEN_DATA;
 592                        flags |= PBLK_SUBMITTED_ENTRY;
 593                        /* Release flags on context. Protect from writes */
 594                        smp_store_release(&entry->w_ctx.flags, flags);
 595                        return NVM_IO_ERR;
 596                }
 597
 598                if (bio_add_pc_page(q, bio, page, rb->seg_size, 0) !=
 599                                                                rb->seg_size) {
 600                        pr_err("pblk: could not add page to write bio\n");
 601                        flags &= ~PBLK_WRITTEN_DATA;
 602                        flags |= PBLK_SUBMITTED_ENTRY;
 603                        /* Release flags on context. Protect from writes */
 604                        smp_store_release(&entry->w_ctx.flags, flags);
 605                        return NVM_IO_ERR;
 606                }
 607
 608                flags &= ~PBLK_WRITTEN_DATA;
 609                flags |= PBLK_SUBMITTED_ENTRY;
 610
 611                /* Release flags on context. Protect from writes */
 612                smp_store_release(&entry->w_ctx.flags, flags);
 613
 614                pos = (pos + 1) & (rb->nr_entries - 1);
 615        }
 616
 617        if (pad) {
 618                if (pblk_bio_add_pages(pblk, bio, GFP_KERNEL, pad)) {
 619                        pr_err("pblk: could not pad page in write bio\n");
 620                        return NVM_IO_ERR;
 621                }
 622
 623                if (pad < pblk->min_write_pgs)
 624                        atomic64_inc(&pblk->pad_dist[pad - 1]);
 625                else
 626                        pr_warn("pblk: padding more than min. sectors\n");
 627
 628                atomic64_add(pad, &pblk->pad_wa);
 629        }
 630
 631#ifdef CONFIG_NVM_DEBUG
 632        atomic_long_add(pad, &pblk->padded_writes);
 633#endif
 634
 635        return NVM_IO_OK;
 636}
 637
 638/*
 639 * Copy to bio only if the lba matches the one on the given cache entry.
 640 * Otherwise, it means that the entry has been overwritten, and the bio should
 641 * be directed to disk.
 642 */
 643int pblk_rb_copy_to_bio(struct pblk_rb *rb, struct bio *bio, sector_t lba,
 644                        struct ppa_addr ppa, int bio_iter, bool advanced_bio)
 645{
 646        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 647        struct pblk_rb_entry *entry;
 648        struct pblk_w_ctx *w_ctx;
 649        struct ppa_addr l2p_ppa;
 650        u64 pos = pblk_addr_to_cacheline(ppa);
 651        void *data;
 652        int flags;
 653        int ret = 1;
 654
 655
 656#ifdef CONFIG_NVM_DEBUG
 657        /* Caller must ensure that the access will not cause an overflow */
 658        BUG_ON(pos >= rb->nr_entries);
 659#endif
 660        entry = &rb->entries[pos];
 661        w_ctx = &entry->w_ctx;
 662        flags = READ_ONCE(w_ctx->flags);
 663
 664        spin_lock(&rb->w_lock);
 665        spin_lock(&pblk->trans_lock);
 666        l2p_ppa = pblk_trans_map_get(pblk, lba);
 667        spin_unlock(&pblk->trans_lock);
 668
 669        /* Check if the entry has been overwritten or is scheduled to be */
 670        if (!pblk_ppa_comp(l2p_ppa, ppa) || w_ctx->lba != lba ||
 671                                                flags & PBLK_WRITABLE_ENTRY) {
 672                ret = 0;
 673                goto out;
 674        }
 675
 676        /* Only advance the bio if it hasn't been advanced already. If advanced,
 677         * this bio is at least a partial bio (i.e., it has partially been
 678         * filled with data from the cache). If part of the data resides on the
  679         * media, it will be read later on.
 680         */
 681        if (unlikely(!advanced_bio))
 682                bio_advance(bio, bio_iter * PBLK_EXPOSED_PAGE_SIZE);
 683
 684        data = bio_data(bio);
 685        memcpy(data, entry->data, rb->seg_size);
 686
 687out:
 688        spin_unlock(&rb->w_lock);
 689        return ret;
 690}
 691
 692struct pblk_w_ctx *pblk_rb_w_ctx(struct pblk_rb *rb, unsigned int pos)
 693{
 694        unsigned int entry = pos & (rb->nr_entries - 1);
 695
 696        return &rb->entries[entry].w_ctx;
 697}
 698
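     /*
      * pblk_rb_sync_init() and pblk_rb_sync_end() bracket updates done from
      * the write completion path: they take rb->s_lock (IRQ-safe when a flags
      * pointer is passed) and pblk_rb_sync_init() returns the current sync
      * position.
      */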
 699unsigned int pblk_rb_sync_init(struct pblk_rb *rb, unsigned long *flags)
 700        __acquires(&rb->s_lock)
 701{
 702        if (flags)
 703                spin_lock_irqsave(&rb->s_lock, *flags);
 704        else
 705                spin_lock_irq(&rb->s_lock);
 706
 707        return rb->sync;
 708}
 709
 710void pblk_rb_sync_end(struct pblk_rb *rb, unsigned long *flags)
 711        __releases(&rb->s_lock)
 712{
 713        lockdep_assert_held(&rb->s_lock);
 714
 715        if (flags)
 716                spin_unlock_irqrestore(&rb->s_lock, *flags);
 717        else
 718                spin_unlock_irq(&rb->s_lock);
 719}
 720
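     /*
      * Advance the sync pointer by @nr_entries once a write has completed on
      * the media, clearing the flush point if it falls within the range being
      * synced. Called with rb->s_lock held (see pblk_rb_sync_init()).
      */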
 721unsigned int pblk_rb_sync_advance(struct pblk_rb *rb, unsigned int nr_entries)
 722{
 723        unsigned int sync, flush_point;
 724        lockdep_assert_held(&rb->s_lock);
 725
 726        sync = READ_ONCE(rb->sync);
 727        flush_point = READ_ONCE(rb->flush_point);
 728
 729        if (flush_point != EMPTY_ENTRY) {
 730                unsigned int secs_to_flush;
 731
 732                secs_to_flush = pblk_rb_ring_count(flush_point, sync,
 733                                        rb->nr_entries);
 734                if (secs_to_flush < nr_entries) {
 735                        /* Protect flush points */
 736                        smp_store_release(&rb->flush_point, EMPTY_ENTRY);
 737                }
 738        }
 739
 740        sync = (sync + nr_entries) & (rb->nr_entries - 1);
 741
 742        /* Protect from counts */
 743        smp_store_release(&rb->sync, sync);
 744
 745        return sync;
 746}
 747
 748/* Calculate how many sectors to submit up to the current flush point. */
 749unsigned int pblk_rb_flush_point_count(struct pblk_rb *rb)
 750{
 751        unsigned int subm, sync, flush_point;
 752        unsigned int submitted, to_flush;
 753
 754        /* Protect flush points */
 755        flush_point = smp_load_acquire(&rb->flush_point);
 756        if (flush_point == EMPTY_ENTRY)
 757                return 0;
 758
 759        /* Protect syncs */
 760        sync = smp_load_acquire(&rb->sync);
 761
 762        subm = READ_ONCE(rb->subm);
 763        submitted = pblk_rb_ring_count(subm, sync, rb->nr_entries);
 764
  765        /* The flush point itself counts as a sector to flush */
 766        to_flush = pblk_rb_ring_count(flush_point, sync, rb->nr_entries) + 1;
 767
 768        return (submitted < to_flush) ? (to_flush - submitted) : 0;
 769}
 770
 771/*
 772 * Scan from the current position of the sync pointer to find the entry that
 773 * corresponds to the given ppa. This is necessary since write requests can be
 774 * completed out of order. The assumption is that the ppa is close to the sync
 775 * pointer thus the search will not take long.
 776 *
  777 * The caller of this function must guarantee that the sync pointer will not
 778 * reach the entry while it is using the metadata associated with it. With this
 779 * assumption in mind, there is no need to take the sync lock.
 780 */
 781struct pblk_rb_entry *pblk_rb_sync_scan_entry(struct pblk_rb *rb,
 782                                              struct ppa_addr *ppa)
 783{
 784        unsigned int sync, subm, count;
 785        unsigned int i;
 786
 787        sync = READ_ONCE(rb->sync);
 788        subm = READ_ONCE(rb->subm);
 789        count = pblk_rb_ring_count(subm, sync, rb->nr_entries);
 790
 791        for (i = 0; i < count; i++)
 792                sync = (sync + 1) & (rb->nr_entries - 1);
 793
 794        return NULL;
 795}
 796
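     /*
      * Tear-down sanity check: reports a problem (returns 1) when the buffer
      * has not fully drained and either the entry array or one of the backing
      * data pages is missing.
      */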
 797int pblk_rb_tear_down_check(struct pblk_rb *rb)
 798{
 799        struct pblk_rb_entry *entry;
 800        int i;
 801        int ret = 0;
 802
 803        spin_lock(&rb->w_lock);
 804        spin_lock_irq(&rb->s_lock);
 805
 806        if ((rb->mem == rb->subm) && (rb->subm == rb->sync) &&
 807                                (rb->sync == rb->l2p_update) &&
 808                                (rb->flush_point == EMPTY_ENTRY)) {
 809                goto out;
 810        }
 811
 812        if (!rb->entries) {
 813                ret = 1;
 814                goto out;
 815        }
 816
 817        for (i = 0; i < rb->nr_entries; i++) {
 818                entry = &rb->entries[i];
 819
 820                if (!entry->data) {
 821                        ret = 1;
 822                        goto out;
 823                }
 824        }
 825
 826out:
 827        spin_unlock(&rb->w_lock);
 828        spin_unlock_irq(&rb->s_lock);
 829
 830        return ret;
 831}
 832
 833unsigned int pblk_rb_wrap_pos(struct pblk_rb *rb, unsigned int pos)
 834{
 835        return (pos & (rb->nr_entries - 1));
 836}
 837
 838int pblk_rb_pos_oob(struct pblk_rb *rb, u64 pos)
 839{
 840        return (pos >= rb->nr_entries);
 841}
 842
 843ssize_t pblk_rb_sysfs(struct pblk_rb *rb, char *buf)
 844{
 845        struct pblk *pblk = container_of(rb, struct pblk, rwb);
 846        struct pblk_c_ctx *c;
 847        ssize_t offset;
 848        int queued_entries = 0;
 849
 850        spin_lock_irq(&rb->s_lock);
 851        list_for_each_entry(c, &pblk->compl_list, list)
 852                queued_entries++;
 853        spin_unlock_irq(&rb->s_lock);
 854
 855        if (rb->flush_point != EMPTY_ENTRY)
 856                offset = scnprintf(buf, PAGE_SIZE,
 857                        "%u\t%u\t%u\t%u\t%u\t%u\t%u - %u/%u/%u - %d\n",
 858                        rb->nr_entries,
 859                        rb->mem,
 860                        rb->subm,
 861                        rb->sync,
 862                        rb->l2p_update,
 863#ifdef CONFIG_NVM_DEBUG
 864                        atomic_read(&rb->inflight_flush_point),
 865#else
 866                        0,
 867#endif
 868                        rb->flush_point,
 869                        pblk_rb_read_count(rb),
 870                        pblk_rb_space(rb),
 871                        pblk_rb_flush_point_count(rb),
 872                        queued_entries);
 873        else
 874                offset = scnprintf(buf, PAGE_SIZE,
 875                        "%u\t%u\t%u\t%u\t%u\t%u\tNULL - %u/%u/%u - %d\n",
 876                        rb->nr_entries,
 877                        rb->mem,
 878                        rb->subm,
 879                        rb->sync,
 880                        rb->l2p_update,
 881#ifdef CONFIG_NVM_DEBUG
 882                        atomic_read(&rb->inflight_flush_point),
 883#else
 884                        0,
 885#endif
 886                        pblk_rb_read_count(rb),
 887                        pblk_rb_space(rb),
 888                        pblk_rb_flush_point_count(rb),
 889                        queued_entries);
 890
 891        return offset;
 892}
 893