dpdk/lib/ring/rte_ring_hts_elem_pvt.h
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 *
   3 * Copyright (c) 2010-2020 Intel Corporation
   4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
   5 * All rights reserved.
   6 * Derived from FreeBSD's bufring.h
   7 * Used as BSD-3 Licensed with permission from Kip Macy.
   8 */
   9
  10#ifndef _RTE_RING_HTS_ELEM_PVT_H_
  11#define _RTE_RING_HTS_ELEM_PVT_H_
  12
  13/**
  14 * @file rte_ring_hts_elem_pvt.h
  15 * It is not recommended to include this file directly,
  16 * include <rte_ring.h> instead.
  17 * Contains internal helper functions for head/tail sync (HTS) ring mode.
  18 * For more information please refer to <rte_ring_hts.h>.
  19 */
  20
  21/**
  22 * @internal update tail with new value.
  23 */
  24static __rte_always_inline void
  25__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
  26        uint32_t num, uint32_t enqueue)
  27{
  28        uint32_t tail;
  29
  30        RTE_SET_USED(enqueue);
  31
  32        tail = old_tail + num;
  33        __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE);
  34}
  35
  36/**
  37 * @internal waits till tail will become equal to head.
  38 * Means no writer/reader is active for that ring.
  39 * Suppose to work as serialization point.
  40 */
  41static __rte_always_inline void
  42__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
  43                union __rte_ring_hts_pos *p)
  44{
  45        while (p->pos.head != p->pos.tail) {
  46                rte_pause();
  47                p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE);
  48        }
  49}
  50
  51/**
  52 * @internal This function updates the producer head for enqueue
  53 */
  54static __rte_always_inline unsigned int
  55__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
  56        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
  57        uint32_t *free_entries)
  58{
  59        uint32_t n;
  60        union __rte_ring_hts_pos np, op;
  61
  62        const uint32_t capacity = r->capacity;
  63
  64        op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE);
  65
  66        do {
  67                /* Reset n to the initial burst count */
  68                n = num;
  69
  70                /*
  71                 * wait for tail to be equal to head,
  72                 * make sure that we read prod head/tail *before*
  73                 * reading cons tail.
  74                 */
  75                __rte_ring_hts_head_wait(&r->hts_prod, &op);
  76
  77                /*
  78                 *  The subtraction is done between two unsigned 32bits value
  79                 * (the result is always modulo 32 bits even if we have
  80                 * *old_head > cons_tail). So 'free_entries' is always between 0
  81                 * and capacity (which is < size).
  82                 */
  83                *free_entries = capacity + r->cons.tail - op.pos.head;
  84
  85                /* check that we have enough room in ring */
  86                if (unlikely(n > *free_entries))
  87                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
  88                                        0 : *free_entries;
  89
  90                if (n == 0)
  91                        break;
  92
  93                np.pos.tail = op.pos.tail;
  94                np.pos.head = op.pos.head + n;
  95
  96        /*
  97         * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
  98         *  - OOO reads of cons tail value
  99         *  - OOO copy of elems from the ring
 100         */
 101        } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw,
 102                        &op.raw, np.raw,
 103                        0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
 104
 105        *old_head = op.pos.head;
 106        return n;
 107}
 108
 109/**
 110 * @internal This function updates the consumer head for dequeue
 111 */
 112static __rte_always_inline unsigned int
 113__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
 114        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 115        uint32_t *entries)
 116{
 117        uint32_t n;
 118        union __rte_ring_hts_pos np, op;
 119
 120        op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE);
 121
 122        /* move cons.head atomically */
 123        do {
 124                /* Restore n as it may change every loop */
 125                n = num;
 126
 127                /*
 128                 * wait for tail to be equal to head,
 129                 * make sure that we read cons head/tail *before*
 130                 * reading prod tail.
 131                 */
 132                __rte_ring_hts_head_wait(&r->hts_cons, &op);
 133
 134                /* The subtraction is done between two unsigned 32bits value
 135                 * (the result is always modulo 32 bits even if we have
 136                 * cons_head > prod_tail). So 'entries' is always between 0
 137                 * and size(ring)-1.
 138                 */
 139                *entries = r->prod.tail - op.pos.head;
 140
 141                /* Set the actual entries for dequeue */
 142                if (n > *entries)
 143                        n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
 144
 145                if (unlikely(n == 0))
 146                        break;
 147
 148                np.pos.tail = op.pos.tail;
 149                np.pos.head = op.pos.head + n;
 150
 151        /*
 152         * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
 153         *  - OOO reads of prod tail value
 154         *  - OOO copy of elems from the ring
 155         */
 156        } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw,
 157                        &op.raw, np.raw,
 158                        0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0);
 159
 160        *old_head = op.pos.head;
 161        return n;
 162}
 163
 164/**
 165 * @internal Enqueue several objects on the HTS ring.
 166 *
 167 * @param r
 168 *   A pointer to the ring structure.
 169 * @param obj_table
 170 *   A pointer to a table of objects.
 171 * @param esize
 172 *   The size of ring element, in bytes. It must be a multiple of 4.
 173 *   This must be the same value used while creating the ring. Otherwise
 174 *   the results are undefined.
 175 * @param n
 176 *   The number of objects to add in the ring from the obj_table.
 177 * @param behavior
 178 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
 179 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
 180 * @param free_space
 181 *   returns the amount of space after the enqueue operation has finished
 182 * @return
 183 *   Actual number of objects enqueued.
 184 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 185 */
 186static __rte_always_inline unsigned int
 187__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table,
 188        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
 189        uint32_t *free_space)
 190{
 191        uint32_t free, head;
 192
 193        n =  __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
 194
 195        if (n != 0) {
 196                __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
 197                __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1);
 198        }
 199
 200        if (free_space != NULL)
 201                *free_space = free - n;
 202        return n;
 203}
 204
 205/**
 206 * @internal Dequeue several objects from the HTS ring.
 207 *
 208 * @param r
 209 *   A pointer to the ring structure.
 210 * @param obj_table
 211 *   A pointer to a table of objects.
 212 * @param esize
 213 *   The size of ring element, in bytes. It must be a multiple of 4.
 214 *   This must be the same value used while creating the ring. Otherwise
 215 *   the results are undefined.
 216 * @param n
 217 *   The number of objects to pull from the ring.
 218 * @param behavior
 219 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 220 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
 221 * @param available
 222 *   returns the number of remaining ring entries after the dequeue has finished
 223 * @return
 224 *   - Actual number of objects dequeued.
 225 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 226 */
 227static __rte_always_inline unsigned int
 228__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table,
 229        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
 230        uint32_t *available)
 231{
 232        uint32_t entries, head;
 233
 234        n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);
 235
 236        if (n != 0) {
 237                __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
 238                __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0);
 239        }
 240
 241        if (available != NULL)
 242                *available = entries - n;
 243        return n;
 244}
 245
 246#endif /* _RTE_RING_HTS_ELEM_PVT_H_ */
 247