linux/include/linux/ptr_ring.h
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *      Definitions for the 'struct ptr_ring' data structure.
 *
 *      Author:
 *              Michael S. Tsirkin <mst@redhat.com>
 *
 *      Copyright (C) 2016 Red Hat, Inc.
 *
 *      This is a limited-size FIFO maintaining pointers in FIFO order, with
 *      one CPU producing entries and another CPU consuming them.
 *
 *      This implementation tries to minimize cache contention when there is a
 *      single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};
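
/* Illustrative usage sketch (not part of the API proper): one producer
 * context and one consumer context sharing a single ring. The 128-entry
 * size and the process()/free_entry() helpers are placeholders for
 * whatever the caller actually stores and does with entries.
 *
 *      if (ptr_ring_init(&ring, 128, GFP_KERNEL))
 *              return -ENOMEM;
 *
 * Producer side; a non-zero return means the ring was full (-ENOSPC):
 *
 *      if (ptr_ring_produce(&ring, entry))
 *              free_entry(entry);
 *
 * Consumer side, draining until empty:
 *
 *      while ((entry = ptr_ring_consume(&ring)))
 *              process(entry);
 *
 * Teardown, destroying anything still queued:
 *
 *      ptr_ring_cleanup(&ring, free_entry);
 */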

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}
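
/* Illustrative sketch of the note above: re-testing __ptr_ring_full in a
 * loop needs a compiler barrier so the read is not hoisted out of the loop;
 * cpu_relax() provides one. Real producers would normally back off or drop
 * rather than busy-wait with producer_lock held; this only shows the
 * barrier requirement.
 *
 *      spin_lock(&r->producer_lock);
 *      while (__ptr_ring_full(r))
 *              cpu_relax();
 *      __ptr_ring_produce(r, ptr);
 *      spin_unlock(&r->producer_lock);
 */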

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with the dependency ordering in __ptr_ring_consume. */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}
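
/* Illustrative sketch: if the consumer runs in BH context (say, from a
 * receive softirq), the producer must use the _bh variant so that the lock
 * nesting stays consistent with the resize note above. free_entry() is a
 * hypothetical destructor for entries that could not be queued.
 *
 *      if (ptr_ring_produce_bh(r, entry) == -ENOSPC)
 *              free_entry(entry);
 */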

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case, to avoid incorrectly detecting the ring
 * as empty, the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
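
/* Illustrative sketch: a lockless readiness test, e.g. in a poll handler
 * running on a CPU other than the consumer. As described above
 * __ptr_ring_empty, a stale result is tolerable only because the consumer
 * either drains the ring completely or re-checks after synchronizing.
 *
 *      if (!__ptr_ring_empty(r))
 *              mask |= EPOLLIN | EPOLLRDNORM;
 */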

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update consumer
         * index and zero out the entry so producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as producer is writing
         * out new entries in the same cache line.  Defer these updates until a
         * batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
         * to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries invalidate them in
         * the ring all at once so producer can reuse their space in the ring.
         * We also do this when we reach the end of the ring - not mandatory
         * but it helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way we touch the
                 * cache line that the producer might currently be reading last;
                 * the producer won't make progress and touch other cache lines
                 * besides the first one until we write out all entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}
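
/* Worked example of the deferred invalidation above, assuming size == 8 and
 * batch == 4: consuming entries 0..2 only advances consumer_head; on the
 * fourth consume, consumer_head - consumer_tail reaches 4, entries 3,2,1,0
 * are NULLed in reverse order and consumer_tail becomes 4, so the producer
 * sees the freed slots once per batch rather than once per consume.
 */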

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
         * accessing data through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Cast to structure type and call a function without discarding from the FIFO.
 * The function must return a value.
 * Callers of __PTR_RING_PEEK_CALL must hold consumer_lock; the variants
 * below take the lock themselves.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})
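
/* Illustrative sketch: peek at the head entry and extract a field from it
 * without consuming it. entry_len() is a hypothetical accessor taking the
 * stored pointer; since __ptr_ring_peek returns NULL on an empty ring, the
 * function passed in must accept and handle a NULL argument.
 *
 *      unsigned int len = PTR_RING_PEEK_CALL(r, entry_len);
 */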

/* Not all gfp_t flags (besides GFP_KERNEL) are legal with the kvmalloc-based
 * allocation below; see the vmalloc documentation for which flags are allowed.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch to at least 1 to make the logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much (because the ring is small) would cause a lot of
         * burstiness. This needs tuning; for now, disable batching on small
         * rings.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}
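
/* Worked example of the batch sizing above, assuming SMP_CACHE_BYTES == 64
 * and 8-byte pointers: batch = 2 * 64 / 8 = 16, so the consumer invalidates
 * entries 16 at a time. For rings smaller than 32 entries the check above
 * trips (batch > size / 2) and batching falls back to 1.
 */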

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if not NULL, assume they are
         * valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over the entries in batch, moving the head back and copying
         * entries. Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}
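
/* Illustrative sketch: pull a burst of entries with consume_batched and, if
 * the caller cannot process them after all, hand them back in order with
 * ptr_ring_unconsume. free_entry() is a hypothetical destructor for entries
 * that no longer fit (e.g. because the ring was refilled meanwhile).
 *
 *      void *batch[16];
 *      int n = ptr_ring_consume_batched(r, batch, 16);
 *
 *      if (cannot_process)
 *              ptr_ring_unconsume(r, batch, n, free_entry);
 */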

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        if (producer >= size)
                producer = 0;
        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}
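
/* Illustrative sketch: grow a ring under GFP_KERNEL; entries that do not fit
 * into the new queue are passed to the destroy callback (here a hypothetical
 * free_entry()), and surviving entries keep their FIFO order.
 *
 *      err = ptr_ring_resize(r, new_size, GFP_KERNEL, free_entry);
 */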

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */