linux/include/linux/ptr_ring.h
/*
 *      Definitions for the 'struct ptr_ring' data structure.
 *
 *      Author:
 *              Michael S. Tsirkin <mst@redhat.com>
 *
 *      Copyright (C) 2016 Red Hat, Inc.
 *
 *      This program is free software; you can redistribute it and/or modify it
 *      under the terms of the GNU General Public License as published by the
 *      Free Software Foundation; either version 2 of the License, or (at your
 *      option) any later version.
 *
 *      This is a limited-size FIFO maintaining pointers in FIFO order, with
 *      one CPU producing entries and another consuming them.
 *
 *      This implementation tries to minimize cache-contention when there is a
 *      single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif
struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}
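
/*
 * Illustrative sketch (not part of this header): the compiler-barrier note
 * above matters when the full test is polled in a loop.  With a single
 * producer, one way to wait for space is to poll the locked wrapper below
 * and let cpu_relax() force a fresh read on every iteration; the helper
 * drop_or_requeue() and the retry policy are hypothetical.
 *
 *      while (ptr_ring_full(r))
 *              cpu_relax();
 *      if (ptr_ring_produce(r, obj))
 *              drop_or_requeue(obj);   defensive, hypothetical helper
 */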

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with the READ_ONCE in __ptr_ring_peek, via __ptr_ring_consume. */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}
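
/*
 * Illustrative sketch (not part of this header): because of the smp_wmb()
 * above, a producer only needs to fully initialize the object before
 * queueing it; the consumer will then see the initialized fields through
 * the pointer.  struct item, alloc_item() and free_item() are hypothetical.
 *
 *      struct item *it = alloc_item();
 *
 *      it->len = len;                  written before the pointer is published
 *      it->data = data;
 *      if (__ptr_ring_produce(r, it))  caller holds producer_lock
 *              free_item(it);          -ENOSPC: ring is full
 */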

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}
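
/*
 * Illustrative sketch (not part of this header): the produce variant should
 * match the contexts the ring is used from.  For example, if the consumer
 * runs in BH (e.g. NAPI) context, a process-context producer should use the
 * _bh variant: otherwise a resize holding consumer_lock and spinning on
 * producer_lock could deadlock against a BH consumer that interrupts this
 * producer.  The split below is a hypothetical arrangement.
 *
 *      process context:        err = ptr_ring_produce_bh(r, obj);
 *      BH context:             obj = ptr_ring_consume(r);
 */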

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}
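
/*
 * Illustrative sketch (not part of this header): the lockless test is
 * typically used by the consuming side itself, e.g. to re-check for work
 * after re-enabling notifications, closing the race described above.
 * process(), rearm_notifications() and schedule_another_pass() are
 * hypothetical.
 *
 *      while ((obj = ptr_ring_consume(r)))
 *              process(obj);                   drain until empty
 *      rearm_notifications();
 *      if (!__ptr_ring_empty(r))
 *              schedule_another_pass();        entry slipped in before rearm
 */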

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update the consumer
         * index and zero out the entry so the producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as the producer is
         * writing out new entries in the same cache line.  Defer these
         * updates until a batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
         * to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries, invalidate them in
         * the ring all at once so the producer can reuse their space.
         * We also do this when we reach the end of the ring - not mandatory
         * but it helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way the cache line
                 * that the producer might currently be reading is touched last;
                 * the producer won't make progress and touch other cache lines
                 * besides the first one until we have written out all entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}
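
/*
 * Worked example (illustrative numbers): with 64-byte cache lines and
 * 8-byte pointers, r->batch is 16 (see __ptr_ring_set_size below).  For a
 * ring of size 256 with consumer_tail == 0, the first 15 consumes only
 * advance consumer_head; the 16th sees consumer_head - consumer_tail reach
 * 16, zeroes entries 15..0 in reverse order and sets consumer_tail to 16.
 * The producer therefore sees free slots appear 16 at a time instead of
 * one by one, keeping it off the cache line the consumer is working on.
 */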

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
         * accessing data through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
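
/*
 * Illustrative sketch (not part of this header): a BH-context consumer with
 * a budget can drain several entries under one lock acquisition instead of
 * paying the lock cost per entry.  BUDGET and process() are hypothetical.
 *
 *      void *batch[BUDGET];
 *      int i, n;
 *
 *      n = ptr_ring_consume_batched_bh(r, batch, BUDGET);
 *      for (i = 0; i < n; i++)
 *              process(batch[i]);
 */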

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})
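
/*
 * Illustrative sketch (not part of this header): PTR_RING_PEEK_CALL applies
 * a function to the head entry without consuming it, e.g. to report the
 * size of the next element.  The function is also called with NULL when the
 * ring is empty, so it must handle that.  peek_len() and item_len() are
 * hypothetical.
 *
 *      static int peek_len(void *ptr)
 *      {
 *              return ptr ? item_len(ptr) : 0;
 *      }
 *
 *      int len = PTR_RING_PEEK_CALL(r, peek_len);
 */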

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * the vmalloc documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch to at least 1 to make the logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much relative to a small ring would cause a lot of
         * burstiness. Needs tuning; for now, disable batching for small rings.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}
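
/*
 * Worked example (illustrative numbers): with SMP_CACHE_BYTES == 64 and
 * 8-byte pointers, batch = 2 * 64 / 8 = 16, so the consumer invalidates 16
 * entries at a time.  For a ring of fewer than 32 entries, 16 > size / 2
 * (integer division), so batch falls back to 1 and every consume
 * invalidates its entry immediately.
 */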

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}
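
/*
 * Illustrative sketch (not part of this header): typical lifecycle of a
 * ring used from process context on both sides.  struct item, make_item()
 * and free_item() are hypothetical.
 *
 *      struct ptr_ring ring;
 *      struct item *it;
 *
 *      if (ptr_ring_init(&ring, 256, GFP_KERNEL))
 *              return -ENOMEM;
 *
 *      it = make_item();
 *      if (ptr_ring_produce(&ring, it))
 *              free_item(it);                  -ENOSPC: ring is full
 *
 *      it = ptr_ring_consume(&ring);
 *      if (it)
 *              free_item(it);
 *
 *      ptr_ring_cleanup(&ring, free_item);     destroys anything still queued
 */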

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if an entry is not NULL, assume
         * it is valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over the entries in the batch, moving the head back and copying
         * entries. Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}
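
/*
 * Illustrative sketch (not part of this header): a consumer that pulled a
 * batch but could only handle the first 'done' entries can push the rest
 * back so they are dequeued again later, still in order.  handle() and
 * free_item() are hypothetical; entries that no longer fit are destroyed.
 *
 *      n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *      done = handle(batch, n);
 *      if (done < n)
 *              ptr_ring_unconsume(r, batch + done, n - done, free_item);
 */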

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}
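
/*
 * Illustrative sketch (not part of this header): growing a ring at runtime,
 * e.g. after a queue-length change.  On allocation failure the old ring is
 * left untouched; entries that do not fit into the new ring are passed to
 * the destructor.  free_item() is hypothetical.
 *
 *      err = ptr_ring_resize(r, new_size, GFP_KERNEL, free_item);
 *      if (err)
 *              return err;
 */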

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}
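
/*
 * Illustrative sketch (not part of this header): resizing all per-queue
 * rings of a device in one call; all new queues are allocated up front, so
 * an allocation failure leaves every ring untouched.  dev->queues[] and
 * free_item() are hypothetical.
 *
 *      struct ptr_ring *rings[MAX_QUEUES];
 *      int i;
 *
 *      for (i = 0; i < nqueues; i++)
 *              rings[i] = &dev->queues[i].ring;
 *      err = ptr_ring_resize_multiple(rings, nqueues, new_size,
 *                                     GFP_KERNEL, free_item);
 */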

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */