linux/include/linux/ptr_ring.h
/*
 *      Definitions for the 'struct ptr_ring' data structure.
 *
 *      Author:
 *              Michael S. Tsirkin <mst@redhat.com>
 *
 *      Copyright (C) 2016 Red Hat, Inc.
 *
 *      This program is free software; you can redistribute it and/or modify it
 *      under the terms of the GNU General Public License as published by the
 *      Free Software Foundation; either version 2 of the License, or (at your
 *      option) any later version.
 *
 *      This is a limited-size FIFO maintaining pointers in FIFO order, with
 *      one CPU producing entries and another CPU consuming them.
 *
 *      This implementation tries to minimize cache contention when there is a
 *      single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};
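
/* Example (an illustrative sketch, not part of the kernel API): a minimal
 * single-producer/single-consumer use of the functions declared below.
 * The ring size (512), the GFP flags and the obj/drop()/free_entry()
 * names are assumptions made up for this example.
 *
 *       struct ptr_ring ring;
 *       void *obj, *entry;
 *
 *       if (ptr_ring_init(&ring, 512, GFP_KERNEL))
 *               return -ENOMEM;
 *
 *       // Producer CPU: returns -ENOSPC if the ring is full.
 *       if (ptr_ring_produce(&ring, obj))
 *               drop(obj);
 *
 *       // Consumer CPU: returns NULL if the ring is empty.
 *       entry = ptr_ring_consume(&ring);
 *
 *       // Teardown: free anything still queued, then the queue array.
 *       ptr_ring_cleanup(&ring, free_entry);
 */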

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().  If ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        r->queue[r->producer++] = ptr;
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}
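
/* Example (sketch only): as noted above __ptr_ring_produce, callers that
 * retry in a loop need a compiler barrier such as cpu_relax() between
 * attempts.  A busy-waiting producer could look like this (the ring and
 * obj are assumed to exist):
 *
 *       while (ptr_ring_produce(&ring, obj) == -ENOSPC)
 *               cpu_relax();
 */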

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return r->queue[r->consumer_head];
        return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update the consumer
         * index and zero out the entry so the producer can reuse it.
         * Doing that naively at each consume would be as simple as:
         *       r->queue[r->consumer++] = NULL;
         *       if (unlikely(r->consumer >= r->size))
         *               r->consumer = 0;
         * but that is suboptimal when the ring is full as the producer is
         * writing out new entries in the same cache line.  Defer these
         * updates until a batch of entries has been consumed.
         */
        int head = r->consumer_head++;

        /* Once we have processed enough entries invalidate them in
         * the ring all at once so the producer can reuse their space in
         * the ring.  We also do this when we reach the end of the ring -
         * not mandatory, but it helps keep the implementation simple.
         */
        if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
                     r->consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way the cache
                 * line that the producer might currently be reading is
                 * touched last; the producer won't make progress and touch
                 * other cache lines besides the first one until we write
                 * out all entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = r->consumer_head;
        }
        if (unlikely(r->consumer_head >= r->size)) {
                r->consumer_head = 0;
                r->consumer_tail = 0;
        }
}
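
/* Worked example (the numbers are illustrative only): with size = 512 and
 * batch = 16, consuming an entry normally just advances consumer_head.
 * Once consumer_head - consumer_tail reaches 16, the 16 consumed slots are
 * NULLed in one pass, from head down to tail, and consumer_tail catches up
 * to consumer_head.  The producer therefore sees freed slots in batches
 * instead of having a cache line dirtied under it on every consume.
 */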

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
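
/* Example (sketch only): drain up to 16 entries at a time into a local
 * array.  The array size and the process() helper are assumptions made up
 * for this example.
 *
 *       void *batch[16];
 *       int i, n;
 *
 *       n = ptr_ring_consume_batched(&ring, batch, ARRAY_SIZE(batch));
 *       for (i = 0; i < n; i++)
 *               process(batch[i]);
 */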

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})
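
/* Example (sketch only): call a helper on the head entry without
 * dequeueing it.  entry_len() is a hypothetical function; note that it
 * must tolerate a NULL argument, since __ptr_ring_peek returns NULL when
 * the ring is empty.
 *
 *       int len = PTR_RING_PEEK_CALL(&ring, entry_len);
 */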

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        return kcalloc(size, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch at least to 1 to make logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much (because the ring is small) would cause a lot
         * of burstiness.  Needs tuning; for now, disable batching when the
         * ring is small.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}
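
/* Worked example (assuming 64-byte cache lines and 8-byte pointers):
 * batch = 64 * 2 / 8 = 16, so a ring of size 512 invalidates consumed
 * entries 16 at a time.  For a small ring, say size 16, batch (16) exceeds
 * size / 2 (8), so batch falls back to 1 and batching is effectively
 * disabled.
 */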

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the
         * following code can test entries for NULL and, if not NULL,
         * assume they are valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over entries in batch, start moving head back and copy entries.
         * Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = r->consumer_head = head;
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}
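
/* Example (sketch only): a consumer pulled a batch but could deliver only
 * part of it, so the unprocessed tail is pushed back to be consumed again
 * later.  deliver() and free_entry() are hypothetical helpers.
 *
 *       void *batch[16];
 *       int n, done;
 *
 *       n = ptr_ring_consume_batched(&ring, batch, ARRAY_SIZE(batch));
 *       done = deliver(batch, n);       // may be less than n
 *       ptr_ring_unconsume(&ring, batch + done, n - done, free_entry);
 */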

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kfree(old);

        return 0;
}
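
/* Example (sketch only): grow a ring to 1024 entries.  free_entry() is a
 * hypothetical destructor, only invoked for entries that do not fit in the
 * new queue (relevant when shrinking).  On -ENOMEM the old ring is left
 * untouched.
 *
 *       int err;
 *
 *       err = ptr_ring_resize(&ring, 1024, GFP_KERNEL, free_entry);
 *       if (err)
 *               return err;
 */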

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */