1/* SPDX-License-Identifier: BSD-3-Clause 2 * 3 * Copyright (c) 2010-2020 Intel Corporation 4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org 5 * All rights reserved. 6 * Derived from FreeBSD's bufring.h 7 * Used as BSD-3 Licensed with permission from Kip Macy. 8 */ 9 10#ifndef _RTE_RING_RTS_H_ 11#define _RTE_RING_RTS_H_ 12 13/** 14 * @file rte_ring_rts.h 15 * @b EXPERIMENTAL: this API may change without prior notice 16 * It is not recommended to include this file directly. 17 * Please include <rte_ring.h> instead. 18 * 19 * Contains functions for Relaxed Tail Sync (RTS) ring mode. 20 * The main idea remains the same as for our original MP/MC synchronization 21 * mechanism. 22 * The main difference is that tail value is increased not 23 * by every thread that finished enqueue/dequeue, 24 * but only by the current last one doing enqueue/dequeue. 25 * That allows threads to skip spinning on tail value, 26 * leaving actual tail value change to last thread at a given instance. 27 * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation: 28 * one for head update, second for tail update. 29 * As a gain it allows thread to avoid spinning/waiting on tail value. 30 * In comparison original MP/MC algorithm requires one 32-bit CAS 31 * for head update and waiting/spinning on tail value. 32 * 33 * Brief outline: 34 * - introduce update counter (cnt) for both head and tail. 35 * - increment head.cnt for each head.value update 36 * - write head.value and head.cnt atomically (64-bit CAS) 37 * - move tail.value ahead only when tail.cnt + 1 == head.cnt 38 * (indicating that this is the last thread updating the tail) 39 * - increment tail.cnt when each enqueue/dequeue op finishes 40 * (no matter if tail.value going to change or not) 41 * - write tail.value and tail.cnt atomically (64-bit CAS) 42 * 43 * To avoid producer/consumer starvation: 44 * - limit max allowed distance between head and tail value (HTD_MAX). 45 * I.E. thread is allowed to proceed with changing head.value, 46 * only when: head.value - tail.value <= HTD_MAX 47 * HTD_MAX is an optional parameter. 48 * With HTD_MAX == 0 we'll have fully serialized ring - 49 * i.e. only one thread at a time will be able to enqueue/dequeue 50 * to/from the ring. 51 * With HTD_MAX >= ring.capacity - no limitation. 52 * By default HTD_MAX == ring.capacity / 8. 53 */ 54 55#ifdef __cplusplus 56extern "C" { 57#endif 58 59#include <rte_ring_rts_elem_pvt.h> 60 61/** 62 * Enqueue several objects on the RTS ring (multi-producers safe). 63 * 64 * @param r 65 * A pointer to the ring structure. 66 * @param obj_table 67 * A pointer to a table of objects. 68 * @param esize 69 * The size of ring element, in bytes. It must be a multiple of 4. 70 * This must be the same value used while creating the ring. Otherwise 71 * the results are undefined. 72 * @param n 73 * The number of objects to add in the ring from the obj_table. 74 * @param free_space 75 * if non-NULL, returns the amount of space in the ring after the 76 * enqueue operation has finished. 77 * @return 78 * The number of objects enqueued, either 0 or n 79 */ 80__rte_experimental 81static __rte_always_inline unsigned int 82rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, 83 unsigned int esize, unsigned int n, unsigned int *free_space) 84{ 85 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 86 RTE_RING_QUEUE_FIXED, free_space); 87} 88 89/** 90 * Dequeue several objects from an RTS ring (multi-consumers safe). 91 * 92 * @param r 93 * A pointer to the ring structure. 94 * @param obj_table 95 * A pointer to a table of objects that will be filled. 96 * @param esize 97 * The size of ring element, in bytes. It must be a multiple of 4. 98 * This must be the same value used while creating the ring. Otherwise 99 * the results are undefined. 100 * @param n 101 * The number of objects to dequeue from the ring to the obj_table. 102 * @param available 103 * If non-NULL, returns the number of remaining ring entries after the 104 * dequeue has finished. 105 * @return 106 * The number of objects dequeued, either 0 or n 107 */ 108__rte_experimental 109static __rte_always_inline unsigned int 110rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, 111 unsigned int esize, unsigned int n, unsigned int *available) 112{ 113 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 114 RTE_RING_QUEUE_FIXED, available); 115} 116 117/** 118 * Enqueue several objects on the RTS ring (multi-producers safe). 119 * 120 * @param r 121 * A pointer to the ring structure. 122 * @param obj_table 123 * A pointer to a table of objects. 124 * @param esize 125 * The size of ring element, in bytes. It must be a multiple of 4. 126 * This must be the same value used while creating the ring. Otherwise 127 * the results are undefined. 128 * @param n 129 * The number of objects to add in the ring from the obj_table. 130 * @param free_space 131 * if non-NULL, returns the amount of space in the ring after the 132 * enqueue operation has finished. 133 * @return 134 * - n: Actual number of objects enqueued. 135 */ 136__rte_experimental 137static __rte_always_inline unsigned int 138rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, 139 unsigned int esize, unsigned int n, unsigned int *free_space) 140{ 141 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 142 RTE_RING_QUEUE_VARIABLE, free_space); 143} 144 145/** 146 * Dequeue several objects from an RTS ring (multi-consumers safe). 147 * When the requested objects are more than the available objects, 148 * only dequeue the actual number of objects. 149 * 150 * @param r 151 * A pointer to the ring structure. 152 * @param obj_table 153 * A pointer to a table of objects that will be filled. 154 * @param esize 155 * The size of ring element, in bytes. It must be a multiple of 4. 156 * This must be the same value used while creating the ring. Otherwise 157 * the results are undefined. 158 * @param n 159 * The number of objects to dequeue from the ring to the obj_table. 160 * @param available 161 * If non-NULL, returns the number of remaining ring entries after the 162 * dequeue has finished. 163 * @return 164 * - n: Actual number of objects dequeued, 0 if ring is empty 165 */ 166__rte_experimental 167static __rte_always_inline unsigned int 168rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, 169 unsigned int esize, unsigned int n, unsigned int *available) 170{ 171 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 172 RTE_RING_QUEUE_VARIABLE, available); 173} 174 175/** 176 * Enqueue several objects on the RTS ring (multi-producers safe). 177 * 178 * @param r 179 * A pointer to the ring structure. 180 * @param obj_table 181 * A pointer to a table of void * pointers (objects). 182 * @param n 183 * The number of objects to add in the ring from the obj_table. 184 * @param free_space 185 * if non-NULL, returns the amount of space in the ring after the 186 * enqueue operation has finished. 187 * @return 188 * The number of objects enqueued, either 0 or n 189 */ 190__rte_experimental 191static __rte_always_inline unsigned int 192rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, 193 unsigned int n, unsigned int *free_space) 194{ 195 return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, 196 sizeof(uintptr_t), n, free_space); 197} 198 199/** 200 * Dequeue several objects from an RTS ring (multi-consumers safe). 201 * 202 * @param r 203 * A pointer to the ring structure. 204 * @param obj_table 205 * A pointer to a table of void * pointers (objects) that will be filled. 206 * @param n 207 * The number of objects to dequeue from the ring to the obj_table. 208 * @param available 209 * If non-NULL, returns the number of remaining ring entries after the 210 * dequeue has finished. 211 * @return 212 * The number of objects dequeued, either 0 or n 213 */ 214__rte_experimental 215static __rte_always_inline unsigned int 216rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, 217 unsigned int n, unsigned int *available) 218{ 219 return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, 220 sizeof(uintptr_t), n, available); 221} 222 223/** 224 * Enqueue several objects on the RTS ring (multi-producers safe). 225 * 226 * @param r 227 * A pointer to the ring structure. 228 * @param obj_table 229 * A pointer to a table of void * pointers (objects). 230 * @param n 231 * The number of objects to add in the ring from the obj_table. 232 * @param free_space 233 * if non-NULL, returns the amount of space in the ring after the 234 * enqueue operation has finished. 235 * @return 236 * - n: Actual number of objects enqueued. 237 */ 238__rte_experimental 239static __rte_always_inline unsigned int 240rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, 241 unsigned int n, unsigned int *free_space) 242{ 243 return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, 244 sizeof(uintptr_t), n, free_space); 245} 246 247/** 248 * Dequeue several objects from an RTS ring (multi-consumers safe). 249 * When the requested objects are more than the available objects, 250 * only dequeue the actual number of objects. 251 * 252 * @param r 253 * A pointer to the ring structure. 254 * @param obj_table 255 * A pointer to a table of void * pointers (objects) that will be filled. 256 * @param n 257 * The number of objects to dequeue from the ring to the obj_table. 258 * @param available 259 * If non-NULL, returns the number of remaining ring entries after the 260 * dequeue has finished. 261 * @return 262 * - n: Actual number of objects dequeued, 0 if ring is empty 263 */ 264__rte_experimental 265static __rte_always_inline unsigned int 266rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, 267 unsigned int n, unsigned int *available) 268{ 269 return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, 270 sizeof(uintptr_t), n, available); 271} 272 273/** 274 * Return producer max Head-Tail-Distance (HTD). 275 * 276 * @param r 277 * A pointer to the ring structure. 278 * @return 279 * Producer HTD value, if producer is set in appropriate sync mode, 280 * or UINT32_MAX otherwise. 281 */ 282__rte_experimental 283static inline uint32_t 284rte_ring_get_prod_htd_max(const struct rte_ring *r) 285{ 286 if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) 287 return r->rts_prod.htd_max; 288 return UINT32_MAX; 289} 290 291/** 292 * Set producer max Head-Tail-Distance (HTD). 293 * Note that producer has to use appropriate sync mode (RTS). 294 * 295 * @param r 296 * A pointer to the ring structure. 297 * @param v 298 * new HTD value to setup. 299 * @return 300 * Zero on success, or negative error code otherwise. 301 */ 302__rte_experimental 303static inline int 304rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) 305{ 306 if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) 307 return -ENOTSUP; 308 309 r->rts_prod.htd_max = v; 310 return 0; 311} 312 313/** 314 * Return consumer max Head-Tail-Distance (HTD). 315 * 316 * @param r 317 * A pointer to the ring structure. 318 * @return 319 * Consumer HTD value, if consumer is set in appropriate sync mode, 320 * or UINT32_MAX otherwise. 321 */ 322__rte_experimental 323static inline uint32_t 324rte_ring_get_cons_htd_max(const struct rte_ring *r) 325{ 326 if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) 327 return r->rts_cons.htd_max; 328 return UINT32_MAX; 329} 330 331/** 332 * Set consumer max Head-Tail-Distance (HTD). 333 * Note that consumer has to use appropriate sync mode (RTS). 334 * 335 * @param r 336 * A pointer to the ring structure. 337 * @param v 338 * new HTD value to setup. 339 * @return 340 * Zero on success, or negative error code otherwise. 341 */ 342__rte_experimental 343static inline int 344rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) 345{ 346 if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) 347 return -ENOTSUP; 348 349 r->rts_cons.htd_max = v; 350 return 0; 351} 352 353#ifdef __cplusplus 354} 355#endif 356 357#endif /* _RTE_RING_RTS_H_ */ 358