dpdk/lib/ring/rte_ring_rts.h
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 *
   3 * Copyright (c) 2010-2020 Intel Corporation
   4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
   5 * All rights reserved.
   6 * Derived from FreeBSD's bufring.h
   7 * Used as BSD-3 Licensed with permission from Kip Macy.
   8 */
   9
  10#ifndef _RTE_RING_RTS_H_
  11#define _RTE_RING_RTS_H_
  12
  13/**
  14 * @file rte_ring_rts.h
  15 * @b EXPERIMENTAL: this API may change without prior notice
  16 * It is not recommended to include this file directly.
  17 * Please include <rte_ring.h> instead.
  18 *
 * Contains functions for Relaxed Tail Sync (RTS) ring mode.
 * The main idea remains the same as for our original MP/MC synchronization
 * mechanism.
 * The main difference is that the tail value is increased not
 * by every thread that finished enqueue/dequeue,
 * but only by the current last one doing enqueue/dequeue.
 * That allows threads to skip spinning on the tail value,
 * leaving the actual tail value change to the last thread at a given instant.
 * RTS requires two 64-bit CAS operations for each enqueue (or dequeue)
 * operation: one for the head update, a second one for the tail update.
 * As a gain, it allows a thread to avoid spinning/waiting on the tail value.
 * In comparison, the original MP/MC algorithm requires one 32-bit CAS
 * for the head update plus waiting/spinning on the tail value.
  32 *
  33 * Brief outline:
  34 *  - introduce update counter (cnt) for both head and tail.
  35 *  - increment head.cnt for each head.value update
  36 *  - write head.value and head.cnt atomically (64-bit CAS)
  37 *  - move tail.value ahead only when tail.cnt + 1 == head.cnt
  38 *    (indicating that this is the last thread updating the tail)
 *  - increment tail.cnt when each enqueue/dequeue op finishes
 *    (no matter whether tail.value is going to change or not)
  41 *  - write tail.value and tail.cnt atomically (64-bit CAS)
  42 *
 * To avoid producer/consumer starvation:
 *  - limit max allowed distance between head and tail value (HTD_MAX).
 *    I.e. a thread is allowed to proceed with changing head.value
 *    only when: head.value - tail.value <= HTD_MAX
 * HTD_MAX is an optional parameter.
 * With HTD_MAX == 0 we'll have a fully serialized ring -
 * i.e. only one thread at a time will be able to enqueue/dequeue
 * to/from the ring.
 * With HTD_MAX >= ring.capacity there is no limitation.
 * By default HTD_MAX == ring.capacity / 8.
  53 */
  54
  55#ifdef __cplusplus
  56extern "C" {
  57#endif
  58
  59#include <rte_ring_rts_elem_pvt.h>
  60
  61/**
  62 * Enqueue several objects on the RTS ring (multi-producers safe).
  63 *
  64 * @param r
  65 *   A pointer to the ring structure.
  66 * @param obj_table
  67 *   A pointer to a table of objects.
  68 * @param esize
  69 *   The size of ring element, in bytes. It must be a multiple of 4.
  70 *   This must be the same value used while creating the ring. Otherwise
  71 *   the results are undefined.
  72 * @param n
  73 *   The number of objects to add in the ring from the obj_table.
  74 * @param free_space
  75 *   if non-NULL, returns the amount of space in the ring after the
  76 *   enqueue operation has finished.
  77 * @return
  78 *   The number of objects enqueued, either 0 or n
  79 */
  80__rte_experimental
  81static __rte_always_inline unsigned int
  82rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
  83        unsigned int esize, unsigned int n, unsigned int *free_space)
  84{
  85        return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
  86                        RTE_RING_QUEUE_FIXED, free_space);
  87}
  88
  89/**
  90 * Dequeue several objects from an RTS ring (multi-consumers safe).
  91 *
  92 * @param r
  93 *   A pointer to the ring structure.
  94 * @param obj_table
  95 *   A pointer to a table of objects that will be filled.
  96 * @param esize
  97 *   The size of ring element, in bytes. It must be a multiple of 4.
  98 *   This must be the same value used while creating the ring. Otherwise
  99 *   the results are undefined.
 100 * @param n
 101 *   The number of objects to dequeue from the ring to the obj_table.
 102 * @param available
 103 *   If non-NULL, returns the number of remaining ring entries after the
 104 *   dequeue has finished.
 105 * @return
 106 *   The number of objects dequeued, either 0 or n
 107 */
 108__rte_experimental
 109static __rte_always_inline unsigned int
 110rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
 111        unsigned int esize, unsigned int n, unsigned int *available)
 112{
 113        return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
 114                        RTE_RING_QUEUE_FIXED, available);
 115}
 116
 117/**
 118 * Enqueue several objects on the RTS ring (multi-producers safe).
 119 *
 120 * @param r
 121 *   A pointer to the ring structure.
 122 * @param obj_table
 123 *   A pointer to a table of objects.
 124 * @param esize
 125 *   The size of ring element, in bytes. It must be a multiple of 4.
 126 *   This must be the same value used while creating the ring. Otherwise
 127 *   the results are undefined.
 128 * @param n
 129 *   The number of objects to add in the ring from the obj_table.
 130 * @param free_space
 131 *   if non-NULL, returns the amount of space in the ring after the
 132 *   enqueue operation has finished.
 133 * @return
 134 *   - n: Actual number of objects enqueued.
 135 */
 136__rte_experimental
 137static __rte_always_inline unsigned int
 138rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
 139        unsigned int esize, unsigned int n, unsigned int *free_space)
 140{
 141        return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
 142                        RTE_RING_QUEUE_VARIABLE, free_space);
 143}
 144
 145/**
 146 * Dequeue several objects from an RTS  ring (multi-consumers safe).
 147 * When the requested objects are more than the available objects,
 148 * only dequeue the actual number of objects.
 149 *
 150 * @param r
 151 *   A pointer to the ring structure.
 152 * @param obj_table
 153 *   A pointer to a table of objects that will be filled.
 154 * @param esize
 155 *   The size of ring element, in bytes. It must be a multiple of 4.
 156 *   This must be the same value used while creating the ring. Otherwise
 157 *   the results are undefined.
 158 * @param n
 159 *   The number of objects to dequeue from the ring to the obj_table.
 160 * @param available
 161 *   If non-NULL, returns the number of remaining ring entries after the
 162 *   dequeue has finished.
 163 * @return
 164 *   - n: Actual number of objects dequeued, 0 if ring is empty
 165 */
 166__rte_experimental
 167static __rte_always_inline unsigned int
 168rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
 169        unsigned int esize, unsigned int n, unsigned int *available)
 170{
 171        return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
 172                        RTE_RING_QUEUE_VARIABLE, available);
 173}
 174
 175/**
 176 * Enqueue several objects on the RTS ring (multi-producers safe).
 177 *
 178 * @param r
 179 *   A pointer to the ring structure.
 180 * @param obj_table
 181 *   A pointer to a table of void * pointers (objects).
 182 * @param n
 183 *   The number of objects to add in the ring from the obj_table.
 184 * @param free_space
 185 *   if non-NULL, returns the amount of space in the ring after the
 186 *   enqueue operation has finished.
 187 * @return
 188 *   The number of objects enqueued, either 0 or n
 189 */
 190__rte_experimental
 191static __rte_always_inline unsigned int
 192rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
 193                         unsigned int n, unsigned int *free_space)
 194{
 195        return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table,
 196                        sizeof(uintptr_t), n, free_space);
 197}
 198
 199/**
 200 * Dequeue several objects from an RTS ring (multi-consumers safe).
 201 *
 202 * @param r
 203 *   A pointer to the ring structure.
 204 * @param obj_table
 205 *   A pointer to a table of void * pointers (objects) that will be filled.
 206 * @param n
 207 *   The number of objects to dequeue from the ring to the obj_table.
 208 * @param available
 209 *   If non-NULL, returns the number of remaining ring entries after the
 210 *   dequeue has finished.
 211 * @return
 212 *   The number of objects dequeued, either 0 or n
 213 */
 214__rte_experimental
 215static __rte_always_inline unsigned int
 216rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
 217                unsigned int n, unsigned int *available)
 218{
 219        return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table,
 220                        sizeof(uintptr_t), n, available);
 221}
 222
 223/**
 224 * Enqueue several objects on the RTS ring (multi-producers safe).
 225 *
 226 * @param r
 227 *   A pointer to the ring structure.
 228 * @param obj_table
 229 *   A pointer to a table of void * pointers (objects).
 230 * @param n
 231 *   The number of objects to add in the ring from the obj_table.
 232 * @param free_space
 233 *   if non-NULL, returns the amount of space in the ring after the
 234 *   enqueue operation has finished.
 235 * @return
 236 *   - n: Actual number of objects enqueued.
 237 */
 238__rte_experimental
 239static __rte_always_inline unsigned int
 240rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
 241                         unsigned int n, unsigned int *free_space)
 242{
 243        return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table,
 244                        sizeof(uintptr_t), n, free_space);
 245}
 246
 247/**
 248 * Dequeue several objects from an RTS  ring (multi-consumers safe).
 249 * When the requested objects are more than the available objects,
 250 * only dequeue the actual number of objects.
 251 *
 252 * @param r
 253 *   A pointer to the ring structure.
 254 * @param obj_table
 255 *   A pointer to a table of void * pointers (objects) that will be filled.
 256 * @param n
 257 *   The number of objects to dequeue from the ring to the obj_table.
 258 * @param available
 259 *   If non-NULL, returns the number of remaining ring entries after the
 260 *   dequeue has finished.
 261 * @return
 262 *   - n: Actual number of objects dequeued, 0 if ring is empty
 263 */
 264__rte_experimental
 265static __rte_always_inline unsigned int
 266rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
 267                unsigned int n, unsigned int *available)
 268{
 269        return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table,
 270                        sizeof(uintptr_t), n, available);
 271}
 272
 273/**
 274 * Return producer max Head-Tail-Distance (HTD).
 275 *
 276 * @param r
 277 *   A pointer to the ring structure.
 278 * @return
 279 *   Producer HTD value, if producer is set in appropriate sync mode,
 280 *   or UINT32_MAX otherwise.
 281 */
 282__rte_experimental
 283static inline uint32_t
 284rte_ring_get_prod_htd_max(const struct rte_ring *r)
 285{
 286        if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
 287                return r->rts_prod.htd_max;
 288        return UINT32_MAX;
 289}
 290
 291/**
 292 * Set producer max Head-Tail-Distance (HTD).
 293 * Note that producer has to use appropriate sync mode (RTS).
 294 *
 295 * @param r
 296 *   A pointer to the ring structure.
 297 * @param v
 298 *   new HTD value to setup.
 299 * @return
 300 *   Zero on success, or negative error code otherwise.
 301 */
 302__rte_experimental
 303static inline int
 304rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
 305{
 306        if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
 307                return -ENOTSUP;
 308
 309        r->rts_prod.htd_max = v;
 310        return 0;
 311}
 312
 313/**
 314 * Return consumer max Head-Tail-Distance (HTD).
 315 *
 316 * @param r
 317 *   A pointer to the ring structure.
 318 * @return
 319 *   Consumer HTD value, if consumer is set in appropriate sync mode,
 320 *   or UINT32_MAX otherwise.
 321 */
 322__rte_experimental
 323static inline uint32_t
 324rte_ring_get_cons_htd_max(const struct rte_ring *r)
 325{
 326        if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
 327                return r->rts_cons.htd_max;
 328        return UINT32_MAX;
 329}
 330
 331/**
 332 * Set consumer max Head-Tail-Distance (HTD).
 333 * Note that consumer has to use appropriate sync mode (RTS).
 334 *
 335 * @param r
 336 *   A pointer to the ring structure.
 337 * @param v
 338 *   new HTD value to setup.
 339 * @return
 340 *   Zero on success, or negative error code otherwise.
 341 */
 342__rte_experimental
 343static inline int
 344rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
 345{
 346        if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
 347                return -ENOTSUP;
 348
 349        r->rts_cons.htd_max = v;
 350        return 0;
 351}
 352
 353#ifdef __cplusplus
 354}
 355#endif
 356
 357#endif /* _RTE_RING_RTS_H_ */
 358