linux/include/linux/ptr_ring.h
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *      Definitions for the 'struct ptr_ring' data structure.
 *
 *      Author:
 *              Michael S. Tsirkin <mst@redhat.com>
 *
 *      Copyright (C) 2016 Red Hat, Inc.
 *
 *      This is a limited-size FIFO maintaining pointers in FIFO order, with
 *      one CPU producing entries and another CPU consuming them.
 *
 *      This implementation tries to minimize cache contention when there is a
 *      single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

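/* Illustrative sketch (not part of the ptr_ring API): bounded polling for a
 * free slot using the locked full check above.  The function name and the
 * retry budget are hypothetical; note the result is only a hint unless the
 * caller holds producer_lock across the subsequent produce.
 */
static inline bool ptr_ring_example_wait_not_full(struct ptr_ring *r,
                                                  int retries)
{
        /* cpu_relax() acts as the compiler barrier that the comment above
         * __ptr_ring_full asks for, so the ring state is re-read on every
         * iteration.
         */
        while (ptr_ring_full(r)) {
                if (retries-- <= 0)
                        return false;
                cpu_relax();
        }
        return true;
}
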
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with the READ_ONCE in __ptr_ring_peek (via __ptr_ring_consume). */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

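/* Illustrative sketch (not part of the ptr_ring API): a producer running in
 * process context while the consumer runs in BH (e.g. NAPI) context.  Because
 * resize nests producer_lock inside consumer_lock, such a producer must use
 * the _bh variant, as the note above explains.  The function name, the
 * "drop on overflow" policy and the assumption that obj was kmalloc'ed are
 * hypothetical.
 */
static inline int ptr_ring_example_produce_or_drop(struct ptr_ring *r,
                                                   void *obj)
{
        int err;

        /* -ENOSPC means the slot at the producer index is still in use. */
        err = ptr_ring_produce_bh(r, obj);
        if (err)
                kfree(obj); /* drop: this sketch assumes a kmalloc'ed payload */
        return err;
}
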
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}

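/* Illustrative sketch (not part of the ptr_ring API): use the lockless empty
 * test as a cheap hint before taking consumer_lock to peek at the head entry.
 * This assumes a single consumer in process context and a ring that is never
 * resized, per the comment above; the function name is hypothetical.
 */
static inline void *ptr_ring_example_peek_if_pending(struct ptr_ring *r)
{
        void *ptr;

        /* Lockless hint only: the producer may fill the ring right after
         * this check, so a NULL result here just means "nothing yet".
         */
        if (__ptr_ring_empty(r))
                return NULL;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_peek(r); /* the entry stays in the ring */
        spin_unlock(&r->consumer_lock);

        return ptr;
}
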
static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update the consumer
         * index and zero out the entry so the producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as the producer is
         * writing out new entries in the same cache line.  Defer these
         * updates until a batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for
         * __ptr_ring_empty to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries, invalidate them in
         * the ring all at once so the producer can reuse their space.
         * We also do this when we reach the end of the ring - not mandatory
         * but it helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way we touch the
                 * cache line that the producer might currently be reading
                 * last; the producer won't make progress and touch other
                 * cache lines besides the first one until we write out all
                 * entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}

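/* Worked example of the batching above (illustrative numbers, assuming
 * SMP_CACHE_BYTES == 64 and 8-byte pointers, i.e. r->batch == 16 on a
 * 256-entry ring): after 15 consumes, consumer_head == 15 while
 * consumer_tail is still 0 and queue[0..14] still hold stale pointers,
 * so the producer cannot yet reuse those slots.  The 16th consume makes
 * consumer_head - consumer_tail reach r->batch, which NULLs queue[15]
 * down to queue[0] in one pass and advances consumer_tail to 16,
 * handing all 16 slots back to the producer at once.
 */
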
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
         * accessing data through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

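/* Illustrative sketch (not part of the ptr_ring API): drain up to a fixed
 * budget of entries in one locked section using the unlocked batched helper
 * above, then process them outside the lock.  The function name, the budget
 * of 16 and the assumption that both sides run in process context (plain
 * spin_lock) are hypothetical.
 */
static inline void ptr_ring_example_drain_budget(struct ptr_ring *r,
                                                 void (*process)(void *))
{
        void *batch[16];
        int n, i;

        spin_lock(&r->consumer_lock);
        n = __ptr_ring_consume_batched(r, batch, 16);
        spin_unlock(&r->consumer_lock);

        /* The entries are already detached from the ring; no lock needed. */
        for (i = 0; i < n; i++)
                process(batch[i]);
}
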
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})

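/* Illustrative sketch (not part of the ptr_ring API): PTR_RING_PEEK_CALL
 * applied to a hypothetical accessor.  The accessor must cope with a NULL
 * argument, both because the peeked entry may be NULL (empty ring) and
 * because the macros above use typeof((f)(NULL)).  struct
 * ptr_ring_example_item and the function names are made up for this sketch.
 */
struct ptr_ring_example_item {
        int len;
};

static inline int ptr_ring_example_item_len(void *ptr)
{
        struct ptr_ring_example_item *item = ptr;

        return item ? item->len : 0;
}

static inline int ptr_ring_example_peek_len(struct ptr_ring *r)
{
        /* Takes and releases consumer_lock; the entry is left in the ring. */
        return PTR_RING_PEEK_CALL(r, ptr_ring_example_item_len);
}
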
/* Not all gfp_t flags (besides GFP_KERNEL) are allowed here. See the
 * vmalloc documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch to at least 1 to make the logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much (because the ring is small) would cause a lot of
         * burstiness. Needs tuning; for now fall back to a batch of 1.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}

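/* Worked example of the batch sizing above (illustrative numbers): with
 * SMP_CACHE_BYTES == 64 and 8-byte pointers, batch starts out as
 * 64 * 2 / 8 == 16.  A 256-entry ring keeps batch == 16 (16 <= 256 / 2),
 * while a 16-entry ring gets batch == 1, since 16 > 16 / 2 and invalidating
 * the whole ring in one burst would be too bursty for such a small ring.
 */
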
static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if not NULL, assume they are
         * valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over the entries in the batch, moving the head back and copying
         * entries in. Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}

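/* Illustrative sketch (not part of the ptr_ring API): consume a batch, stop
 * early (e.g. because a processing budget ran out), and give the unprocessed
 * tail back to the ring; FIFO order is preserved and anything that no longer
 * fits is destroyed.  The function names, the kfree-based destructor, the
 * fixed batch size and the process() convention (returns 0 and takes
 * ownership on success) are hypothetical.
 */
static inline void ptr_ring_example_free_entry(void *ptr)
{
        kfree(ptr);
}

static inline void ptr_ring_example_consume_budget(struct ptr_ring *r,
                                                   int budget,
                                                   int (*process)(void *))
{
        void *batch[16];
        int n, done = 0;

        n = ptr_ring_consume_batched(r, batch, 16);

        /* An entry that process() rejects is returned to the ring too. */
        while (done < n && done < budget && !process(batch[done]))
                done++;

        /* Put back everything we did not get to: batch[done..n-1]. */
        ptr_ring_unconsume(r, batch + done, n - done,
                           ptr_ring_example_free_entry);
}
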
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        if (producer >= size)
                producer = 0;
        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}

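/* Illustrative sketch (not part of the ptr_ring API): grow a ring from
 * process context once it starts running full.  The function name and the
 * doubling policy are hypothetical.  Passing a NULL destroy callback is only
 * reasonable here because growing the ring means every existing entry fits
 * into the new queue; a shrinking resize should pass a real destructor.
 */
static inline int ptr_ring_example_grow(struct ptr_ring *r)
{
        /* Safe against concurrent producers and consumers: resize takes
         * both locks (consumer_lock outer, producer_lock inner).
         */
        return ptr_ring_resize(r, r->size * 2, GFP_KERNEL, NULL);
}
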
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}

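/* Illustrative sketch (not part of the ptr_ring API): a complete lifecycle
 * for a ring used from process context on both sides.  The function name,
 * the ring size of 16 and the kmalloc'ed payload are hypothetical.
 */
static inline int ptr_ring_example_lifecycle(void)
{
        struct ptr_ring ring;
        void *obj, *item;
        int err;

        /* All slots start out NULL, i.e. the ring starts empty. */
        err = ptr_ring_init(&ring, 16, GFP_KERNEL);
        if (err)
                return err;

        obj = kmalloc(64, GFP_KERNEL);
        if (obj && ptr_ring_produce(&ring, obj)) {
                /* -ENOSPC: the slot at the producer index is still in use. */
                kfree(obj);
        }

        while ((item = ptr_ring_consume(&ring)))
                kfree(item);

        /* The ring was drained above, so no destroy callback is needed. */
        ptr_ring_cleanup(&ring, NULL);
        return 0;
}
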
#endif /* _LINUX_PTR_RING_H  */