/* dpdk/lib/ring/rte_ring_generic_pvt.h */
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 *
   3 * Copyright (c) 2010-2017 Intel Corporation
   4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
   5 * All rights reserved.
   6 * Derived from FreeBSD's bufring.h
   7 * Used as BSD-3 Licensed with permission from Kip Macy.
   8 */
   9
  10#ifndef _RTE_RING_GENERIC_PVT_H_
  11#define _RTE_RING_GENERIC_PVT_H_
  12
/**
 * @internal Publish completed enqueue/dequeue operations by advancing the
 * producer or consumer tail, making the moved slots visible to the opposing
 * side of the ring.
 *
 * @param ht
 *   Pointer to the producer or consumer head/tail pair to update
 * @param old_val
 *   Head value observed before this thread moved it; in multi-thread mode
 *   the tail must reach this value before we may advance it (preserves
 *   completion order across concurrent operations)
 * @param new_val
 *   The new tail value to publish
 * @param single
 *   Non-zero in single-producer/single-consumer mode: skip the wait for
 *   preceding concurrent operations
 * @param enqueue
 *   Non-zero on the enqueue path (store-store barrier), zero on the
 *   dequeue path (load-load barrier)
 */
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
		uint32_t new_val, uint32_t single, uint32_t enqueue)
{
	/*
	 * Enqueue: order the stores of ring entries before the tail store
	 * that publishes them. Dequeue: order the loads of ring entries
	 * before the tail store that releases their slots. Both are no-ops
	 * on strongly-ordered architectures (e.g. x86).
	 */
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();
	/*
	 * If there are other enqueues/dequeues in progress that preceded us,
	 * we need to wait for them to complete: spin until the tail catches
	 * up to the head value we started from.
	 */
	if (!single)
		rte_wait_until_equal_32(&ht->tail, old_val, __ATOMIC_RELAXED);

	ht->tail = new_val;
}
  30
/**
 * @internal This function updates the producer head for enqueue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sp
 *   Indicates whether multi-producer path is needed or not
 * @param n
 *   The number of elements we will want to enqueue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to a ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where enqueue starts
 * @param new_head
 *   Returns the current/new head value i.e. where enqueue finishes
 * @param free_entries
 *   Returns the amount of free space in the ring BEFORE head was moved
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	/* CAS retry loop: re-read head and re-check free space until the
	 * head is moved atomically (or unconditionally in SP mode).
	 */
	do {
		/* Reset n to the initial burst count */
		n = max;

		*old_head = r->prod.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86: the prod.head load must
		 * happen before the cons.tail load below.
		 */
		rte_smp_rmb();

		/*
		 *  The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between 0
		 * and capacity (which is < size).
		 */
		*free_entries = (capacity + r->cons.tail - *old_head);

		/* check that we have enough room in ring */
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		/* SP: plain store, no contention possible. MP: CAS, retry
		 * on failure (another producer moved the head first).
		 */
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
			success = rte_atomic32_cmpset(&r->prod.head,
					*old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}
 100
/**
 * @internal This function updates the consumer head for dequeue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sc
 *   Indicates whether multi-consumer path is needed or not
 * @param n
 *   The number of elements we will want to dequeue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where dequeue starts
 * @param new_head
 *   Returns the current/new head value i.e. where dequeue finishes
 * @param entries
 *   Returns the number of entries in the ring BEFORE head was moved
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *entries)
{
	unsigned int max = n;
	int success;

	/* move cons.head atomically */
	do {
		/* Restore n as it may change every loop */
		n = max;

		*old_head = r->cons.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86: the cons.head load must
		 * happen before the prod.tail load below.
		 */
		rte_smp_rmb();

		/* The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * cons_head > prod_tail). So 'entries' is always between 0
		 * and size(ring)-1.
		 */
		*entries = (r->prod.tail - *old_head);

		/* Set the actual entries for dequeue */
		if (n > *entries)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

		if (unlikely(n == 0))
			return 0;

		*new_head = *old_head + n;
		if (is_sc) {
			r->cons.head = *new_head;
			/* barrier between the head update and the subsequent
			 * loads of ring entries by the caller
			 */
			rte_smp_rmb();
			success = 1;
		} else {
			/* MC: CAS the head; retry the whole loop if another
			 * consumer moved it first
			 */
			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
					*new_head);
		}
	} while (unlikely(success == 0));
	return n;
}
 171
 172#endif /* _RTE_RING_GENERIC_PVT_H_ */
 173