1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21#ifndef _LINUX_PTR_RING_H
22#define _LINUX_PTR_RING_H 1
23
24#ifdef __KERNEL__
25#include <linux/spinlock.h>
26#include <linux/cache.h>
27#include <linux/types.h>
28#include <linux/compiler.h>
29#include <linux/cache.h>
30#include <linux/slab.h>
31#include <asm/errno.h>
32#endif
33
/*
 * Ring of void * entries with a locked producer side and a locked
 * consumer side.  Producer and consumer state live on separate cache
 * lines (____cacheline_aligned_in_smp) so the two sides do not bounce
 * a line between CPUs; the size/batch/queue fields are read by both.
 */
struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;	/* next slot to publish into */
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next entry to consume */
	int consumer_tail; /* first consumed-but-not-yet-NULLed slot */
	spinlock_t consumer_lock;
	/* Shared by producer and consumer: mostly read-only after init. */
	int size ____cacheline_aligned_in_smp; /* number of slots in queue */
	int batch; /* consumer invalidates slots in batches of this many */
	void **queue; /* the slot array; NULL slot == empty slot */
};
46
47
48
49
50
51
52
/*
 * True when the slot the producer would write next is still occupied
 * (the consumer NULLs slots as it invalidates them).  Call with the
 * producer lock held for a stable answer; lockless callers may get a
 * stale result.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}
57
58static inline bool ptr_ring_full(struct ptr_ring *r)
59{
60 bool ret;
61
62 spin_lock(&r->producer_lock);
63 ret = __ptr_ring_full(r);
64 spin_unlock(&r->producer_lock);
65
66 return ret;
67}
68
69static inline bool ptr_ring_full_irq(struct ptr_ring *r)
70{
71 bool ret;
72
73 spin_lock_irq(&r->producer_lock);
74 ret = __ptr_ring_full(r);
75 spin_unlock_irq(&r->producer_lock);
76
77 return ret;
78}
79
80static inline bool ptr_ring_full_any(struct ptr_ring *r)
81{
82 unsigned long flags;
83 bool ret;
84
85 spin_lock_irqsave(&r->producer_lock, flags);
86 ret = __ptr_ring_full(r);
87 spin_unlock_irqrestore(&r->producer_lock, flags);
88
89 return ret;
90}
91
92static inline bool ptr_ring_full_bh(struct ptr_ring *r)
93{
94 bool ret;
95
96 spin_lock_bh(&r->producer_lock);
97 ret = __ptr_ring_full(r);
98 spin_unlock_bh(&r->producer_lock);
99
100 return ret;
101}
102
103
104
105
106
107
/*
 * Publish one entry into the ring.  Returns 0 on success, -ENOSPC when
 * the ring is full (or has zero size).  Must be called with the
 * producer lock held.
 *
 * NOTE(review): resize/unconsume below take the producer lock inside the
 * consumer lock, so producing from IRQ/BH context requires the irq/bh
 * lock variants to avoid deadlock against them.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/*
	 * Make sure the data pointed to by ptr is fully written before the
	 * pointer itself becomes visible; pairs with the READ_ONCE (and
	 * its dependency ordering) on the consumer side in __ptr_ring_peek.
	 */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0; /* wrap to the start of the array */
	return 0;
}
122
123
124
125
126
127
128static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
129{
130 int ret;
131
132 spin_lock(&r->producer_lock);
133 ret = __ptr_ring_produce(r, ptr);
134 spin_unlock(&r->producer_lock);
135
136 return ret;
137}
138
139static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
140{
141 int ret;
142
143 spin_lock_irq(&r->producer_lock);
144 ret = __ptr_ring_produce(r, ptr);
145 spin_unlock_irq(&r->producer_lock);
146
147 return ret;
148}
149
150static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
151{
152 unsigned long flags;
153 int ret;
154
155 spin_lock_irqsave(&r->producer_lock, flags);
156 ret = __ptr_ring_produce(r, ptr);
157 spin_unlock_irqrestore(&r->producer_lock, flags);
158
159 return ret;
160}
161
162static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
163{
164 int ret;
165
166 spin_lock_bh(&r->producer_lock);
167 ret = __ptr_ring_produce(r, ptr);
168 spin_unlock_bh(&r->producer_lock);
169
170 return ret;
171}
172
/*
 * Return the entry at consumer_head without consuming it; NULL when the
 * ring is empty or zero-sized.  Call with the consumer lock held for a
 * stable result.  The READ_ONCE pairs with the producer's WRITE_ONCE
 * publish in __ptr_ring_produce.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
 * Lockless emptiness test: the ring is empty iff the slot at
 * consumer_head is NULL.  May be called without the consumer lock —
 * hence the READ_ONCE on consumer_head, pairing with the consumer's
 * WRITE_ONCE updates — but the answer can be stale by the time the
 * caller acts on it.  A zero-sized ring is always empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
204
205static inline bool ptr_ring_empty(struct ptr_ring *r)
206{
207 bool ret;
208
209 spin_lock(&r->consumer_lock);
210 ret = __ptr_ring_empty(r);
211 spin_unlock(&r->consumer_lock);
212
213 return ret;
214}
215
216static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
217{
218 bool ret;
219
220 spin_lock_irq(&r->consumer_lock);
221 ret = __ptr_ring_empty(r);
222 spin_unlock_irq(&r->consumer_lock);
223
224 return ret;
225}
226
227static inline bool ptr_ring_empty_any(struct ptr_ring *r)
228{
229 unsigned long flags;
230 bool ret;
231
232 spin_lock_irqsave(&r->consumer_lock, flags);
233 ret = __ptr_ring_empty(r);
234 spin_unlock_irqrestore(&r->consumer_lock, flags);
235
236 return ret;
237}
238
239static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
240{
241 bool ret;
242
243 spin_lock_bh(&r->consumer_lock);
244 ret = __ptr_ring_empty(r);
245 spin_unlock_bh(&r->consumer_lock);
246
247 return ret;
248}
249
250
/* Must only be called after __ptr_ring_peek returned non-NULL. */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/*
	 * Batched invalidation: instead of NULLing each consumed slot
	 * immediately (which would ping-pong the cache line with the
	 * producer on every entry), only advance consumer_head here and
	 * defer zeroing until a batch of r->batch slots has accumulated.
	 * consumer_tail marks the first consumed-but-not-yet-NULLed slot.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/*
	 * Invalidate everything consumed so far once a full batch has
	 * been processed, or when we reach the end of the array (the
	 * wrap point must not leave stale entries behind).
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/*
		 * Zero out in reverse order: the lowest slot — the one the
		 * producer will reuse first — is cleared last, so the
		 * producer does not start refilling the region before it
		 * has been fully invalidated.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		/* Wrap both head and tail back to slot 0. */
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* Pairs with the READ_ONCE in __ptr_ring_empty's lockless test. */
	WRITE_ONCE(r->consumer_head, consumer_head);
}
294
/*
 * Remove and return the oldest entry, or NULL if the ring is empty.
 * Must be called with the consumer lock held.  The READ_ONCE inside
 * __ptr_ring_peek (paired with the producer's smp_wmb/WRITE_ONCE)
 * ensures the entry's data is visible before the slot is discarded.
 */
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}
309
/*
 * Consume up to n entries into array, stopping early when the ring
 * drains.  Returns the number of entries actually stored.  Must be
 * called with the consumer lock held.
 */
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	int taken = 0;

	while (taken < n) {
		void *entry = __ptr_ring_consume(r);

		if (!entry)
			break;
		array[taken++] = entry;
	}

	return taken;
}
325
326
327
328
329
330
331static inline void *ptr_ring_consume(struct ptr_ring *r)
332{
333 void *ptr;
334
335 spin_lock(&r->consumer_lock);
336 ptr = __ptr_ring_consume(r);
337 spin_unlock(&r->consumer_lock);
338
339 return ptr;
340}
341
342static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
343{
344 void *ptr;
345
346 spin_lock_irq(&r->consumer_lock);
347 ptr = __ptr_ring_consume(r);
348 spin_unlock_irq(&r->consumer_lock);
349
350 return ptr;
351}
352
353static inline void *ptr_ring_consume_any(struct ptr_ring *r)
354{
355 unsigned long flags;
356 void *ptr;
357
358 spin_lock_irqsave(&r->consumer_lock, flags);
359 ptr = __ptr_ring_consume(r);
360 spin_unlock_irqrestore(&r->consumer_lock, flags);
361
362 return ptr;
363}
364
365static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
366{
367 void *ptr;
368
369 spin_lock_bh(&r->consumer_lock);
370 ptr = __ptr_ring_consume(r);
371 spin_unlock_bh(&r->consumer_lock);
372
373 return ptr;
374}
375
376static inline int ptr_ring_consume_batched(struct ptr_ring *r,
377 void **array, int n)
378{
379 int ret;
380
381 spin_lock(&r->consumer_lock);
382 ret = __ptr_ring_consume_batched(r, array, n);
383 spin_unlock(&r->consumer_lock);
384
385 return ret;
386}
387
388static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
389 void **array, int n)
390{
391 int ret;
392
393 spin_lock_irq(&r->consumer_lock);
394 ret = __ptr_ring_consume_batched(r, array, n);
395 spin_unlock_irq(&r->consumer_lock);
396
397 return ret;
398}
399
400static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
401 void **array, int n)
402{
403 unsigned long flags;
404 int ret;
405
406 spin_lock_irqsave(&r->consumer_lock, flags);
407 ret = __ptr_ring_consume_batched(r, array, n);
408 spin_unlock_irqrestore(&r->consumer_lock, flags);
409
410 return ret;
411}
412
413static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
414 void **array, int n)
415{
416 int ret;
417
418 spin_lock_bh(&r->consumer_lock);
419 ret = __ptr_ring_consume_batched(r, array, n);
420 spin_unlock_bh(&r->consumer_lock);
421
422 return ret;
423}
424
425
426
427
428
/*
 * Invoke f on the entry at the head of the ring without consuming it.
 * f must accept a void * argument; f(NULL) is used only inside typeof()
 * to declare the result variable (it is never actually called with
 * NULL by the macro machinery itself).  The locked variants hold the
 * consumer lock around the peek + call, mirroring the *_irq / *_bh /
 * *_any locking conventions used elsewhere in this file.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f;\
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
467
468
469
470
471static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
472{
473 if (size > KMALLOC_MAX_SIZE / sizeof(void *))
474 return NULL;
475 return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
476}
477
/*
 * Record the ring size and derive the consumer invalidation batch:
 * roughly two cache lines worth of slots, so a batch of NULL writes
 * touches only a couple of lines.
 */
static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/*
	 * Cap the batch at half the ring so deferred invalidation cannot
	 * hold back too much space from the producer, and guard against a
	 * zero batch (possible if a slot were larger than two cache
	 * lines) — fall back to invalidating every entry immediately.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
490
491static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
492{
493 r->queue = __ptr_ring_init_queue_alloc(size, gfp);
494 if (!r->queue)
495 return -ENOMEM;
496
497 __ptr_ring_set_size(r, size);
498 r->producer = r->consumer_head = r->consumer_tail = 0;
499 spin_lock_init(&r->producer_lock);
500 spin_lock_init(&r->consumer_lock);
501
502 return 0;
503}
504
505
506
507
508
509
510
511
512
513
514
/*
 * Return n previously-consumed pointers (batch[0..n-1]) to the ring so
 * they will be consumed again in their original order.  Pointers that
 * no longer fit are released through destroy().  Takes both locks
 * (producer nested inside consumer), so both sides are stalled for the
 * duration.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done; /* zero-sized ring: just destroy the batch */

	/*
	 * First flush deferred invalidation: NULL every slot between
	 * consumer_tail and consumer_head (walking backwards, as in
	 * __ptr_ring_discard_one) so the region in front of
	 * consumer_head is genuinely free.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Refill by stepping consumer_head backwards (wrapping at slot 0)
	 * and inserting from the end of batch, so batch[0] lands at the
	 * new head and is consumed first.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* Ring is full: no room for the remaining entries. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* Pairs with the READ_ONCE in __ptr_ring_empty. */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy whatever could not be returned to the ring. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
561
/*
 * Drain the ring into the freshly-allocated slot array `queue` (of
 * `size` slots), install it, and return the old array for the caller
 * to free.  Entries beyond the new capacity go to destroy() when
 * provided, otherwise they are dropped.  Must be called with both the
 * consumer and producer locks held (see ptr_ring_resize*).
 * NOTE(review): gfp is unused here; allocation happens in the callers.
 */
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	/* Repack surviving entries densely from slot 0 of the new array. */
	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}
585
586
587
588
589
590
591
592static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
593 void (*destroy)(void *))
594{
595 unsigned long flags;
596 void **queue = __ptr_ring_init_queue_alloc(size, gfp);
597 void **old;
598
599 if (!queue)
600 return -ENOMEM;
601
602 spin_lock_irqsave(&(r)->consumer_lock, flags);
603 spin_lock(&(r)->producer_lock);
604
605 old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
606
607 spin_unlock(&(r)->producer_lock);
608 spin_unlock_irqrestore(&(r)->consumer_lock, flags);
609
610 kvfree(old);
611
612 return 0;
613}
614
615
616
617
618
619
620
/*
 * Resize nrings rings to the same new size in one call.  All new slot
 * arrays are allocated up front, so the operation either succeeds for
 * every ring or fails with -ENOMEM without modifying any of them.
 * Locking per ring matches ptr_ring_resize (producer lock nested
 * inside consumer lock, interrupts saved/restored).
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	/* Pre-allocate every new slot array before touching any ring. */
	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	/* Swap each ring; queues[i] now holds the old array to free. */
	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	/* Free the arrays allocated before the failure. */
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
665
666static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
667{
668 void *ptr;
669
670 if (destroy)
671 while ((ptr = ptr_ring_consume(r)))
672 destroy(ptr);
673 kvfree(r->queue);
674}
675
676#endif
677