linux/drivers/lightnvm/pblk-rb.c
/*
 * Copyright (C) 2016 CNEX Labs
 * Initial release: Javier Gonzalez <javier@cnexlabs.com>
 *
 * Based upon the circular ringbuffer.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License version
 * 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * pblk-rb.c - pblk's write buffer
 */

#include <linux/circ_buf.h>

#include "pblk.h"

static DECLARE_RWSEM(pblk_rb_lock);

void pblk_rb_data_free(struct pblk_rb *rb)
{
        struct pblk_rb_pages *p, *t;

        down_write(&pblk_rb_lock);
        list_for_each_entry_safe(p, t, &rb->pages, list) {
                free_pages((unsigned long)page_address(p->pages), p->order);
                list_del(&p->list);
                kfree(p);
        }
        up_write(&pblk_rb_lock);
}

/*
 * Initialize ring buffer. The data and metadata buffers must be previously
 * allocated and their size must be a power of two
 * (Documentation/core-api/circular-buffers.rst)
 */
int pblk_rb_init(struct pblk_rb *rb, struct pblk_rb_entry *rb_entry_base,
                 unsigned int power_size, unsigned int power_seg_sz)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        unsigned int init_entry = 0;
        unsigned int alloc_order = power_size;
        unsigned int max_order = MAX_ORDER - 1;
        unsigned int order, iter;

        down_write(&pblk_rb_lock);
        rb->entries = rb_entry_base;
        rb->seg_size = (1 << power_seg_sz);
        rb->nr_entries = (1 << power_size);
        rb->mem = rb->subm = rb->sync = rb->l2p_update = 0;
        rb->flush_point = EMPTY_ENTRY;

        spin_lock_init(&rb->w_lock);
        spin_lock_init(&rb->s_lock);

        INIT_LIST_HEAD(&rb->pages);

        if (alloc_order >= max_order) {
                order = max_order;
                iter = (1 << (alloc_order - max_order));
        } else {
                order = alloc_order;
                iter = 1;
        }

        do {
                struct pblk_rb_entry *entry;
                struct pblk_rb_pages *page_set;
                void *kaddr;
                unsigned long set_size;
                int i;

                page_set = kmalloc(sizeof(struct pblk_rb_pages), GFP_KERNEL);
                if (!page_set) {
                        up_write(&pblk_rb_lock);
                        return -ENOMEM;
                }

                page_set->order = order;
                page_set->pages = alloc_pages(GFP_KERNEL, order);
                if (!page_set->pages) {
                        kfree(page_set);
                        pblk_rb_data_free(rb);
                        up_write(&pblk_rb_lock);
                        return -ENOMEM;
                }
                kaddr = page_address(page_set->pages);

                entry = &rb->entries[init_entry];
                entry->data = kaddr;
                entry->cacheline = pblk_cacheline_to_addr(init_entry++);
                entry->w_ctx.flags = PBLK_WRITABLE_ENTRY;

                set_size = (1 << order);
                for (i = 1; i < set_size; i++) {
                        entry = &rb->entries[init_entry];
                        entry->cacheline = pblk_cacheline_to_addr(init_entry++);
                        entry->data = kaddr + (i * rb->seg_size);
                        entry->w_ctx.flags = PBLK_WRITABLE_ENTRY;
                        bio_list_init(&entry->w_ctx.bios);
                }

                list_add_tail(&page_set->list, &rb->pages);
                iter--;
        } while (iter > 0);
        up_write(&pblk_rb_lock);

#ifdef CONFIG_NVM_DEBUG
        atomic_set(&rb->inflight_flush_point, 0);
#endif

        /*
         * Initialize rate-limiter, which controls access to the write buffer
         * by user and GC I/O
         */
        pblk_rl_init(&pblk->rl, rb->nr_entries);

        return 0;
}

/*
 * pblk_rb_calculate_size -- calculate the size of the write buffer
 */
unsigned int pblk_rb_calculate_size(unsigned int nr_entries)
{
        /* Alloc a write buffer that can at least fit 128 entries */
        return (1 << max(get_count_order(nr_entries), 7));
}
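/*
 * For illustration: get_count_order() rounds up to the next power of two,
 * so nr_entries = 100 gives order 7 (128 entries) and nr_entries = 200
 * gives order 8 (256 entries); the max() against 7 enforces the 128-entry
 * minimum mentioned above.
 */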

void *pblk_rb_entries_ref(struct pblk_rb *rb)
{
        return rb->entries;
}

static void clean_wctx(struct pblk_w_ctx *w_ctx)
{
        int flags;

        flags = READ_ONCE(w_ctx->flags);
        WARN_ONCE(!(flags & PBLK_SUBMITTED_ENTRY),
                        "pblk: overwriting unsubmitted data\n");

        /* Release flags on context. Protect from writes and reads */
        smp_store_release(&w_ctx->flags, PBLK_WRITABLE_ENTRY);
        pblk_ppa_set_empty(&w_ctx->ppa);
        w_ctx->lba = ADDR_EMPTY;
}

#define pblk_rb_ring_count(head, tail, size) CIRC_CNT(head, tail, size)
#define pblk_rb_ring_space(rb, head, tail, size) \
                                        (CIRC_SPACE(head, tail, size))
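/*
 * Both helpers come from <linux/circ_buf.h> and rely on the buffer size
 * being a power of two. For illustration, with size = 8, head = 2 and
 * tail = 6: CIRC_CNT(2, 6, 8) = (2 - 6) & 7 = 4 occupied entries, and
 * CIRC_SPACE(2, 6, 8) = CIRC_CNT(6, 3, 8) = 3 free entries; one slot is
 * always kept unused so that head == tail unambiguously means "empty".
 */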

/*
 * Buffer space is calculated with respect to the back pointer signaling
 * synchronized entries to the media.
 */
static unsigned int pblk_rb_space(struct pblk_rb *rb)
{
        unsigned int mem = READ_ONCE(rb->mem);
        unsigned int sync = READ_ONCE(rb->sync);

        return pblk_rb_ring_space(rb, mem, sync, rb->nr_entries);
}

/*
 * Buffer count is calculated with respect to the submission pointer, which
 * signals the entries that are available to be sent to the media.
 */
unsigned int pblk_rb_read_count(struct pblk_rb *rb)
{
        unsigned int mem = READ_ONCE(rb->mem);
        unsigned int subm = READ_ONCE(rb->subm);

        return pblk_rb_ring_count(mem, subm, rb->nr_entries);
}

unsigned int pblk_rb_sync_count(struct pblk_rb *rb)
{
        unsigned int mem = READ_ONCE(rb->mem);
        unsigned int sync = READ_ONCE(rb->sync);

        return pblk_rb_ring_count(mem, sync, rb->nr_entries);
}

unsigned int pblk_rb_read_commit(struct pblk_rb *rb, unsigned int nr_entries)
{
        unsigned int subm;

        subm = READ_ONCE(rb->subm);
        /* Commit read means updating submission pointer */
        smp_store_release(&rb->subm,
                                (subm + nr_entries) & (rb->nr_entries - 1));

        return subm;
}

static int __pblk_rb_update_l2p(struct pblk_rb *rb, unsigned int to_update)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct pblk_line *line;
        struct pblk_rb_entry *entry;
        struct pblk_w_ctx *w_ctx;
        unsigned int user_io = 0, gc_io = 0;
        unsigned int i;
        int flags;

        for (i = 0; i < to_update; i++) {
                entry = &rb->entries[rb->l2p_update];
                w_ctx = &entry->w_ctx;

                flags = READ_ONCE(entry->w_ctx.flags);
                if (flags & PBLK_IOTYPE_USER)
                        user_io++;
                else if (flags & PBLK_IOTYPE_GC)
                        gc_io++;
                else
                        WARN(1, "pblk: unknown IO type\n");

                pblk_update_map_dev(pblk, w_ctx->lba, w_ctx->ppa,
                                                        entry->cacheline);

                line = &pblk->lines[pblk_ppa_to_line(w_ctx->ppa)];
                kref_put(&line->ref, pblk_line_put);
                clean_wctx(w_ctx);
                rb->l2p_update = (rb->l2p_update + 1) & (rb->nr_entries - 1);
        }

        pblk_rl_out(&pblk->rl, user_io, gc_io);

        return 0;
}

/*
 * When we move the l2p_update pointer, we update the l2p table - lookups will
 * point to the physical address instead of to the cacheline in the write buffer
 * from this moment on.
 */
static int pblk_rb_update_l2p(struct pblk_rb *rb, unsigned int nr_entries,
                              unsigned int mem, unsigned int sync)
{
        unsigned int space, count;
        int ret = 0;

        lockdep_assert_held(&rb->w_lock);

        /* Update l2p only as buffer entries are being overwritten */
        space = pblk_rb_ring_space(rb, mem, rb->l2p_update, rb->nr_entries);
        if (space > nr_entries)
                goto out;

        count = nr_entries - space;
        /* l2p_update used exclusively under rb->w_lock */
        ret = __pblk_rb_update_l2p(rb, count);

out:
        return ret;
}
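/*
 * For illustration: with an 8-entry buffer (rb->nr_entries = 8), mem = 6 and
 * l2p_update = 0, the space ahead of the write head is
 * pblk_rb_ring_space(rb, 6, 0, 8) = 1 entry, so an incoming write of
 * nr_entries = 3 updates the L2P table for 3 - 1 = 2 entries before any
 * cacheline still referenced by the table can be overwritten.
 */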

/*
 * Update the l2p entry for all sectors stored on the write buffer. This means
 * that all future lookups to the l2p table will point to a device address, not
 * to the cacheline in the write buffer.
 */
void pblk_rb_sync_l2p(struct pblk_rb *rb)
{
        unsigned int sync;
        unsigned int to_update;

        spin_lock(&rb->w_lock);

        /* Protect from reads and writes */
        sync = smp_load_acquire(&rb->sync);

        to_update = pblk_rb_ring_count(sync, rb->l2p_update, rb->nr_entries);
        __pblk_rb_update_l2p(rb, to_update);

        spin_unlock(&rb->w_lock);
}

/*
 * Write @nr_entries to ring buffer from @data buffer if there is enough space.
 * Typically, 4KB data chunks coming from a bio will be copied to the ring
 * buffer, thus the write will fail if not all incoming data can be copied.
 */
static void __pblk_rb_write_entry(struct pblk_rb *rb, void *data,
                                  struct pblk_w_ctx w_ctx,
                                  struct pblk_rb_entry *entry)
{
        memcpy(entry->data, data, rb->seg_size);

        entry->w_ctx.lba = w_ctx.lba;
        entry->w_ctx.ppa = w_ctx.ppa;
}

void pblk_rb_write_entry_user(struct pblk_rb *rb, void *data,
                              struct pblk_w_ctx w_ctx, unsigned int ring_pos)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct pblk_rb_entry *entry;
        int flags;

        entry = &rb->entries[ring_pos];
        flags = READ_ONCE(entry->w_ctx.flags);
#ifdef CONFIG_NVM_DEBUG
        /* Caller must guarantee that the entry is free */
        BUG_ON(!(flags & PBLK_WRITABLE_ENTRY));
#endif

        __pblk_rb_write_entry(rb, data, w_ctx, entry);

        pblk_update_map_cache(pblk, w_ctx.lba, entry->cacheline);
        flags = w_ctx.flags | PBLK_WRITTEN_DATA;

        /* Release flags on write context. Protect from writes */
        smp_store_release(&entry->w_ctx.flags, flags);
}

void pblk_rb_write_entry_gc(struct pblk_rb *rb, void *data,
                            struct pblk_w_ctx w_ctx, struct pblk_line *line,
                            u64 paddr, unsigned int ring_pos)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct pblk_rb_entry *entry;
        int flags;

        entry = &rb->entries[ring_pos];
        flags = READ_ONCE(entry->w_ctx.flags);
#ifdef CONFIG_NVM_DEBUG
        /* Caller must guarantee that the entry is free */
        BUG_ON(!(flags & PBLK_WRITABLE_ENTRY));
#endif

        __pblk_rb_write_entry(rb, data, w_ctx, entry);

        if (!pblk_update_map_gc(pblk, w_ctx.lba, entry->cacheline, line, paddr))
                entry->w_ctx.lba = ADDR_EMPTY;

        flags = w_ctx.flags | PBLK_WRITTEN_DATA;

        /* Release flags on write context. Protect from writes */
        smp_store_release(&entry->w_ctx.flags, flags);
}

static int pblk_rb_flush_point_set(struct pblk_rb *rb, struct bio *bio,
                                   unsigned int pos)
{
        struct pblk_rb_entry *entry;
        unsigned int sync, flush_point;

        pblk_rb_sync_init(rb, NULL);
        sync = READ_ONCE(rb->sync);

        if (pos == sync) {
                pblk_rb_sync_end(rb, NULL);
                return 0;
        }

#ifdef CONFIG_NVM_DEBUG
        atomic_inc(&rb->inflight_flush_point);
#endif

        flush_point = (pos == 0) ? (rb->nr_entries - 1) : (pos - 1);
        entry = &rb->entries[flush_point];

        /* Protect flush points */
        smp_store_release(&rb->flush_point, flush_point);

        if (bio)
                bio_list_add(&entry->w_ctx.bios, bio);

        pblk_rb_sync_end(rb, NULL);

        return bio ? 1 : 0;
}

static int __pblk_rb_may_write(struct pblk_rb *rb, unsigned int nr_entries,
                               unsigned int *pos)
{
        unsigned int mem;
        unsigned int sync;

        sync = READ_ONCE(rb->sync);
        mem = READ_ONCE(rb->mem);

        if (pblk_rb_ring_space(rb, mem, sync, rb->nr_entries) < nr_entries)
                return 0;

        if (pblk_rb_update_l2p(rb, nr_entries, mem, sync))
                return 0;

        *pos = mem;

        return 1;
}

static int pblk_rb_may_write(struct pblk_rb *rb, unsigned int nr_entries,
                             unsigned int *pos)
{
        if (!__pblk_rb_may_write(rb, nr_entries, pos))
                return 0;

        /* Protect from read count */
        smp_store_release(&rb->mem, (*pos + nr_entries) & (rb->nr_entries - 1));
        return 1;
}

void pblk_rb_flush(struct pblk_rb *rb)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        unsigned int mem = READ_ONCE(rb->mem);

        if (pblk_rb_flush_point_set(rb, NULL, mem))
                return;

        pblk_write_kick(pblk);
}

static int pblk_rb_may_write_flush(struct pblk_rb *rb, unsigned int nr_entries,
                                   unsigned int *pos, struct bio *bio,
                                   int *io_ret)
{
        unsigned int mem;

        if (!__pblk_rb_may_write(rb, nr_entries, pos))
                return 0;

        mem = (*pos + nr_entries) & (rb->nr_entries - 1);
        *io_ret = NVM_IO_DONE;

        if (bio->bi_opf & REQ_PREFLUSH) {
                struct pblk *pblk = container_of(rb, struct pblk, rwb);

                atomic64_inc(&pblk->nr_flush);
                if (pblk_rb_flush_point_set(&pblk->rwb, bio, mem))
                        *io_ret = NVM_IO_OK;
        }

        /* Protect from read count */
        smp_store_release(&rb->mem, mem);

        return 1;
}

/*
 * Atomically check that (i) there is space on the write buffer for the
 * incoming I/O, and (ii) the current I/O type has enough budget in the write
 * buffer (rate-limiter).
 */
int pblk_rb_may_write_user(struct pblk_rb *rb, struct bio *bio,
                           unsigned int nr_entries, unsigned int *pos)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        int io_ret;

        spin_lock(&rb->w_lock);
        io_ret = pblk_rl_user_may_insert(&pblk->rl, nr_entries);
        if (io_ret) {
                spin_unlock(&rb->w_lock);
                return io_ret;
        }

        if (!pblk_rb_may_write_flush(rb, nr_entries, pos, bio, &io_ret)) {
                spin_unlock(&rb->w_lock);
                return NVM_IO_REQUEUE;
        }

        pblk_rl_user_in(&pblk->rl, nr_entries);
        spin_unlock(&rb->w_lock);

        return io_ret;
}

/*
 * See the comment on pblk_rb_may_write_user().
 */
int pblk_rb_may_write_gc(struct pblk_rb *rb, unsigned int nr_entries,
                         unsigned int *pos)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);

        spin_lock(&rb->w_lock);
        if (!pblk_rl_gc_may_insert(&pblk->rl, nr_entries)) {
                spin_unlock(&rb->w_lock);
                return 0;
        }

        if (!pblk_rb_may_write(rb, nr_entries, pos)) {
                spin_unlock(&rb->w_lock);
                return 0;
        }

        pblk_rl_gc_in(&pblk->rl, nr_entries);
        spin_unlock(&rb->w_lock);

        return 1;
}

/*
 * Read available entries on rb and add them to the given bio. To avoid a memory
 * copy, a page reference to the write buffer is added to the bio.
 *
 * This function is used by the write thread to form the write bio that will
 * persist data on the write buffer to the media.
 */
unsigned int pblk_rb_read_to_bio(struct pblk_rb *rb, struct nvm_rq *rqd,
                                 unsigned int pos, unsigned int nr_entries,
                                 unsigned int count)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct request_queue *q = pblk->dev->q;
        struct pblk_c_ctx *c_ctx = nvm_rq_to_pdu(rqd);
        struct bio *bio = rqd->bio;
        struct pblk_rb_entry *entry;
        struct page *page;
        unsigned int pad = 0, to_read = nr_entries;
        unsigned int i;
        int flags;

        if (count < nr_entries) {
                pad = nr_entries - count;
                to_read = count;
        }

        c_ctx->sentry = pos;
        c_ctx->nr_valid = to_read;
        c_ctx->nr_padded = pad;

        for (i = 0; i < to_read; i++) {
                entry = &rb->entries[pos];

                /* A write has been allowed into the buffer, but data is still
                 * being copied to it. It is ok to busy wait.
                 */
try:
                flags = READ_ONCE(entry->w_ctx.flags);
                if (!(flags & PBLK_WRITTEN_DATA)) {
                        io_schedule();
                        goto try;
                }

                page = virt_to_page(entry->data);
                if (!page) {
                        pr_err("pblk: could not allocate write bio page\n");
                        flags &= ~PBLK_WRITTEN_DATA;
                        flags |= PBLK_SUBMITTED_ENTRY;
                        /* Release flags on context. Protect from writes */
                        smp_store_release(&entry->w_ctx.flags, flags);
                        return NVM_IO_ERR;
                }

                if (bio_add_pc_page(q, bio, page, rb->seg_size, 0) !=
                                                                rb->seg_size) {
                        pr_err("pblk: could not add page to write bio\n");
                        flags &= ~PBLK_WRITTEN_DATA;
                        flags |= PBLK_SUBMITTED_ENTRY;
                        /* Release flags on context. Protect from writes */
                        smp_store_release(&entry->w_ctx.flags, flags);
                        return NVM_IO_ERR;
                }

                flags &= ~PBLK_WRITTEN_DATA;
                flags |= PBLK_SUBMITTED_ENTRY;

                /* Release flags on context. Protect from writes */
                smp_store_release(&entry->w_ctx.flags, flags);

                pos = (pos + 1) & (rb->nr_entries - 1);
        }

        if (pad) {
                if (pblk_bio_add_pages(pblk, bio, GFP_KERNEL, pad)) {
                        pr_err("pblk: could not pad page in write bio\n");
                        return NVM_IO_ERR;
                }

                if (pad < pblk->min_write_pgs)
                        atomic64_inc(&pblk->pad_dist[pad - 1]);
                else
                        pr_warn("pblk: padding more than min. sectors\n");

                atomic64_add(pad, &pblk->pad_wa);
        }

#ifdef CONFIG_NVM_DEBUG
        atomic_long_add(pad, &pblk->padded_writes);
#endif

        return NVM_IO_OK;
}

/*
 * Copy to bio only if the lba matches the one on the given cache entry.
 * Otherwise, it means that the entry has been overwritten, and the bio should
 * be directed to disk.
 */
int pblk_rb_copy_to_bio(struct pblk_rb *rb, struct bio *bio, sector_t lba,
                        struct ppa_addr ppa, int bio_iter, bool advanced_bio)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct pblk_rb_entry *entry;
        struct pblk_w_ctx *w_ctx;
        struct ppa_addr l2p_ppa;
        u64 pos = pblk_addr_to_cacheline(ppa);
        void *data;
        int flags;
        int ret = 1;

#ifdef CONFIG_NVM_DEBUG
        /* Caller must ensure that the access will not cause an overflow */
        BUG_ON(pos >= rb->nr_entries);
#endif
        entry = &rb->entries[pos];
        w_ctx = &entry->w_ctx;
        flags = READ_ONCE(w_ctx->flags);

        spin_lock(&rb->w_lock);
        spin_lock(&pblk->trans_lock);
        l2p_ppa = pblk_trans_map_get(pblk, lba);
        spin_unlock(&pblk->trans_lock);

        /* Check if the entry has been overwritten or is scheduled to be */
        if (!pblk_ppa_comp(l2p_ppa, ppa) || w_ctx->lba != lba ||
                                                flags & PBLK_WRITABLE_ENTRY) {
                ret = 0;
                goto out;
        }

        /* Only advance the bio if it hasn't been advanced already. If advanced,
         * this bio is at least a partial bio (i.e., it has partially been
         * filled with data from the cache). If part of the data resides on the
         * media, we will read it later on.
         */
        if (unlikely(!advanced_bio))
                bio_advance(bio, bio_iter * PBLK_EXPOSED_PAGE_SIZE);

        data = bio_data(bio);
        memcpy(data, entry->data, rb->seg_size);

out:
        spin_unlock(&rb->w_lock);
        return ret;
}

struct pblk_w_ctx *pblk_rb_w_ctx(struct pblk_rb *rb, unsigned int pos)
{
        unsigned int entry = pos & (rb->nr_entries - 1);

        return &rb->entries[entry].w_ctx;
}

unsigned int pblk_rb_sync_init(struct pblk_rb *rb, unsigned long *flags)
        __acquires(&rb->s_lock)
{
        if (flags)
                spin_lock_irqsave(&rb->s_lock, *flags);
        else
                spin_lock_irq(&rb->s_lock);

        return rb->sync;
}

void pblk_rb_sync_end(struct pblk_rb *rb, unsigned long *flags)
        __releases(&rb->s_lock)
{
        lockdep_assert_held(&rb->s_lock);

        if (flags)
                spin_unlock_irqrestore(&rb->s_lock, *flags);
        else
                spin_unlock_irq(&rb->s_lock);
}

unsigned int pblk_rb_sync_advance(struct pblk_rb *rb, unsigned int nr_entries)
{
        unsigned int sync, flush_point;

        lockdep_assert_held(&rb->s_lock);

        sync = READ_ONCE(rb->sync);
        flush_point = READ_ONCE(rb->flush_point);

        if (flush_point != EMPTY_ENTRY) {
                unsigned int secs_to_flush;

                secs_to_flush = pblk_rb_ring_count(flush_point, sync,
                                        rb->nr_entries);
                if (secs_to_flush < nr_entries) {
                        /* Protect flush points */
                        smp_store_release(&rb->flush_point, EMPTY_ENTRY);
                }
        }

        sync = (sync + nr_entries) & (rb->nr_entries - 1);

        /* Protect from counts */
        smp_store_release(&rb->sync, sync);

        return sync;
}

/* Calculate how many sectors to submit up to the current flush point. */
unsigned int pblk_rb_flush_point_count(struct pblk_rb *rb)
{
        unsigned int subm, sync, flush_point;
        unsigned int submitted, to_flush;

        /* Protect flush points */
        flush_point = smp_load_acquire(&rb->flush_point);
        if (flush_point == EMPTY_ENTRY)
                return 0;

        /* Protect syncs */
        sync = smp_load_acquire(&rb->sync);

        subm = READ_ONCE(rb->subm);
        submitted = pblk_rb_ring_count(subm, sync, rb->nr_entries);

        /* The flush point itself counts as a sector to sync */
        to_flush = pblk_rb_ring_count(flush_point, sync, rb->nr_entries) + 1;

        return (submitted < to_flush) ? (to_flush - submitted) : 0;
}
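/*
 * For illustration: with rb->nr_entries = 8, sync = 2, subm = 4 and
 * flush_point = 5, submitted = (4 - 2) & 7 = 2 and
 * to_flush = ((5 - 2) & 7) + 1 = 4, so two more sectors (positions 4 and 5)
 * still need to be submitted to reach the flush point.
 */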

/*
 * Scan from the current position of the sync pointer to find the entry that
 * corresponds to the given ppa. This is necessary since write requests can be
 * completed out of order. The assumption is that the ppa is close to the sync
 * pointer thus the search will not take long.
 *
 * The caller of this function must guarantee that the sync pointer will not
 * reach the entry while it is using the metadata associated with it. With this
 * assumption in mind, there is no need to take the sync lock.
 */
struct pblk_rb_entry *pblk_rb_sync_scan_entry(struct pblk_rb *rb,
                                              struct ppa_addr *ppa)
{
        unsigned int sync, subm, count;
        unsigned int i;

        sync = READ_ONCE(rb->sync);
        subm = READ_ONCE(rb->subm);
        count = pblk_rb_ring_count(subm, sync, rb->nr_entries);

        for (i = 0; i < count; i++)
                sync = (sync + 1) & (rb->nr_entries - 1);

        return NULL;
}

int pblk_rb_tear_down_check(struct pblk_rb *rb)
{
        struct pblk_rb_entry *entry;
        int i;
        int ret = 0;

        spin_lock(&rb->w_lock);
        spin_lock_irq(&rb->s_lock);

        if ((rb->mem == rb->subm) && (rb->subm == rb->sync) &&
                                (rb->sync == rb->l2p_update) &&
                                (rb->flush_point == EMPTY_ENTRY)) {
                goto out;
        }

        if (!rb->entries) {
                ret = 1;
                goto out;
        }

        for (i = 0; i < rb->nr_entries; i++) {
                entry = &rb->entries[i];

                if (!entry->data) {
                        ret = 1;
                        goto out;
                }
        }

out:
        spin_unlock(&rb->w_lock);
        spin_unlock_irq(&rb->s_lock);

        return ret;
}

unsigned int pblk_rb_wrap_pos(struct pblk_rb *rb, unsigned int pos)
{
        return (pos & (rb->nr_entries - 1));
}

int pblk_rb_pos_oob(struct pblk_rb *rb, u64 pos)
{
        return (pos >= rb->nr_entries);
}

ssize_t pblk_rb_sysfs(struct pblk_rb *rb, char *buf)
{
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
        struct pblk_c_ctx *c;
        ssize_t offset;
        int queued_entries = 0;

        spin_lock_irq(&rb->s_lock);
        list_for_each_entry(c, &pblk->compl_list, list)
                queued_entries++;
        spin_unlock_irq(&rb->s_lock);

        if (rb->flush_point != EMPTY_ENTRY)
                offset = scnprintf(buf, PAGE_SIZE,
                        "%u\t%u\t%u\t%u\t%u\t%u\t%u - %u/%u/%u - %d\n",
                        rb->nr_entries,
                        rb->mem,
                        rb->subm,
                        rb->sync,
                        rb->l2p_update,
#ifdef CONFIG_NVM_DEBUG
                        atomic_read(&rb->inflight_flush_point),
#else
                        0,
#endif
                        rb->flush_point,
                        pblk_rb_read_count(rb),
                        pblk_rb_space(rb),
                        pblk_rb_flush_point_count(rb),
                        queued_entries);
        else
                offset = scnprintf(buf, PAGE_SIZE,
                        "%u\t%u\t%u\t%u\t%u\t%u\tNULL - %u/%u/%u - %d\n",
                        rb->nr_entries,
                        rb->mem,
                        rb->subm,
                        rb->sync,
                        rb->l2p_update,
#ifdef CONFIG_NVM_DEBUG
                        atomic_read(&rb->inflight_flush_point),
#else
                        0,
#endif
                        pblk_rb_read_count(rb),
                        pblk_rb_space(rb),
                        pblk_rb_flush_point_count(rb),
                        queued_entries);

        return offset;
}