1/* SPDX-License-Identifier: BSD-3-Clause 2 * 3 * Copyright (c) 2010-2020 Intel Corporation 4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org 5 * All rights reserved. 6 * Derived from FreeBSD's bufring.h 7 * Used as BSD-3 Licensed with permission from Kip Macy. 8 */ 9 10#ifndef _RTE_RING_RTS_H_ 11#define _RTE_RING_RTS_H_ 12 13/** 14 * @file rte_ring_rts.h 15 * It is not recommended to include this file directly. 16 * Please include <rte_ring.h> instead. 17 * 18 * Contains functions for Relaxed Tail Sync (RTS) ring mode. 19 * The main idea remains the same as for our original MP/MC synchronization 20 * mechanism. 21 * The main difference is that tail value is increased not 22 * by every thread that finished enqueue/dequeue, 23 * but only by the current last one doing enqueue/dequeue. 24 * That allows threads to skip spinning on tail value, 25 * leaving actual tail value change to last thread at a given instance. 26 * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation: 27 * one for head update, second for tail update. 28 * As a gain it allows thread to avoid spinning/waiting on tail value. 29 * In comparison original MP/MC algorithm requires one 32-bit CAS 30 * for head update and waiting/spinning on tail value. 31 * 32 * Brief outline: 33 * - introduce update counter (cnt) for both head and tail. 34 * - increment head.cnt for each head.value update 35 * - write head.value and head.cnt atomically (64-bit CAS) 36 * - move tail.value ahead only when tail.cnt + 1 == head.cnt 37 * (indicating that this is the last thread updating the tail) 38 * - increment tail.cnt when each enqueue/dequeue op finishes 39 * (no matter if tail.value going to change or not) 40 * - write tail.value and tail.cnt atomically (64-bit CAS) 41 * 42 * To avoid producer/consumer starvation: 43 * - limit max allowed distance between head and tail value (HTD_MAX). 44 * I.E. thread is allowed to proceed with changing head.value, 45 * only when: head.value - tail.value <= HTD_MAX 46 * HTD_MAX is an optional parameter. 47 * With HTD_MAX == 0 we'll have fully serialized ring - 48 * i.e. only one thread at a time will be able to enqueue/dequeue 49 * to/from the ring. 50 * With HTD_MAX >= ring.capacity - no limitation. 51 * By default HTD_MAX == ring.capacity / 8. 52 */ 53 54#ifdef __cplusplus 55extern "C" { 56#endif 57 58#include <rte_ring_rts_elem_pvt.h> 59 60/** 61 * Enqueue several objects on the RTS ring (multi-producers safe). 62 * 63 * @param r 64 * A pointer to the ring structure. 65 * @param obj_table 66 * A pointer to a table of objects. 67 * @param esize 68 * The size of ring element, in bytes. It must be a multiple of 4. 69 * This must be the same value used while creating the ring. Otherwise 70 * the results are undefined. 71 * @param n 72 * The number of objects to add in the ring from the obj_table. 73 * @param free_space 74 * if non-NULL, returns the amount of space in the ring after the 75 * enqueue operation has finished. 76 * @return 77 * The number of objects enqueued, either 0 or n 78 */ 79static __rte_always_inline unsigned int 80rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, 81 unsigned int esize, unsigned int n, unsigned int *free_space) 82{ 83 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 84 RTE_RING_QUEUE_FIXED, free_space); 85} 86 87/** 88 * Dequeue several objects from an RTS ring (multi-consumers safe). 89 * 90 * @param r 91 * A pointer to the ring structure. 92 * @param obj_table 93 * A pointer to a table of objects that will be filled. 94 * @param esize 95 * The size of ring element, in bytes. It must be a multiple of 4. 96 * This must be the same value used while creating the ring. Otherwise 97 * the results are undefined. 98 * @param n 99 * The number of objects to dequeue from the ring to the obj_table. 100 * @param available 101 * If non-NULL, returns the number of remaining ring entries after the 102 * dequeue has finished. 103 * @return 104 * The number of objects dequeued, either 0 or n 105 */ 106static __rte_always_inline unsigned int 107rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, 108 unsigned int esize, unsigned int n, unsigned int *available) 109{ 110 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 111 RTE_RING_QUEUE_FIXED, available); 112} 113 114/** 115 * Enqueue several objects on the RTS ring (multi-producers safe). 116 * 117 * @param r 118 * A pointer to the ring structure. 119 * @param obj_table 120 * A pointer to a table of objects. 121 * @param esize 122 * The size of ring element, in bytes. It must be a multiple of 4. 123 * This must be the same value used while creating the ring. Otherwise 124 * the results are undefined. 125 * @param n 126 * The number of objects to add in the ring from the obj_table. 127 * @param free_space 128 * if non-NULL, returns the amount of space in the ring after the 129 * enqueue operation has finished. 130 * @return 131 * - n: Actual number of objects enqueued. 132 */ 133static __rte_always_inline unsigned int 134rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, 135 unsigned int esize, unsigned int n, unsigned int *free_space) 136{ 137 return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, 138 RTE_RING_QUEUE_VARIABLE, free_space); 139} 140 141/** 142 * Dequeue several objects from an RTS ring (multi-consumers safe). 143 * When the requested objects are more than the available objects, 144 * only dequeue the actual number of objects. 145 * 146 * @param r 147 * A pointer to the ring structure. 148 * @param obj_table 149 * A pointer to a table of objects that will be filled. 150 * @param esize 151 * The size of ring element, in bytes. It must be a multiple of 4. 152 * This must be the same value used while creating the ring. Otherwise 153 * the results are undefined. 154 * @param n 155 * The number of objects to dequeue from the ring to the obj_table. 156 * @param available 157 * If non-NULL, returns the number of remaining ring entries after the 158 * dequeue has finished. 159 * @return 160 * - n: Actual number of objects dequeued, 0 if ring is empty 161 */ 162static __rte_always_inline unsigned int 163rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, 164 unsigned int esize, unsigned int n, unsigned int *available) 165{ 166 return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, 167 RTE_RING_QUEUE_VARIABLE, available); 168} 169 170/** 171 * Enqueue several objects on the RTS ring (multi-producers safe). 172 * 173 * @param r 174 * A pointer to the ring structure. 175 * @param obj_table 176 * A pointer to a table of void * pointers (objects). 177 * @param n 178 * The number of objects to add in the ring from the obj_table. 179 * @param free_space 180 * if non-NULL, returns the amount of space in the ring after the 181 * enqueue operation has finished. 182 * @return 183 * The number of objects enqueued, either 0 or n 184 */ 185static __rte_always_inline unsigned int 186rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, 187 unsigned int n, unsigned int *free_space) 188{ 189 return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, 190 sizeof(uintptr_t), n, free_space); 191} 192 193/** 194 * Dequeue several objects from an RTS ring (multi-consumers safe). 195 * 196 * @param r 197 * A pointer to the ring structure. 198 * @param obj_table 199 * A pointer to a table of void * pointers (objects) that will be filled. 200 * @param n 201 * The number of objects to dequeue from the ring to the obj_table. 202 * @param available 203 * If non-NULL, returns the number of remaining ring entries after the 204 * dequeue has finished. 205 * @return 206 * The number of objects dequeued, either 0 or n 207 */ 208static __rte_always_inline unsigned int 209rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, 210 unsigned int n, unsigned int *available) 211{ 212 return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, 213 sizeof(uintptr_t), n, available); 214} 215 216/** 217 * Enqueue several objects on the RTS ring (multi-producers safe). 218 * 219 * @param r 220 * A pointer to the ring structure. 221 * @param obj_table 222 * A pointer to a table of void * pointers (objects). 223 * @param n 224 * The number of objects to add in the ring from the obj_table. 225 * @param free_space 226 * if non-NULL, returns the amount of space in the ring after the 227 * enqueue operation has finished. 228 * @return 229 * - n: Actual number of objects enqueued. 230 */ 231static __rte_always_inline unsigned int 232rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, 233 unsigned int n, unsigned int *free_space) 234{ 235 return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, 236 sizeof(uintptr_t), n, free_space); 237} 238 239/** 240 * Dequeue several objects from an RTS ring (multi-consumers safe). 241 * When the requested objects are more than the available objects, 242 * only dequeue the actual number of objects. 243 * 244 * @param r 245 * A pointer to the ring structure. 246 * @param obj_table 247 * A pointer to a table of void * pointers (objects) that will be filled. 248 * @param n 249 * The number of objects to dequeue from the ring to the obj_table. 250 * @param available 251 * If non-NULL, returns the number of remaining ring entries after the 252 * dequeue has finished. 253 * @return 254 * - n: Actual number of objects dequeued, 0 if ring is empty 255 */ 256static __rte_always_inline unsigned int 257rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, 258 unsigned int n, unsigned int *available) 259{ 260 return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, 261 sizeof(uintptr_t), n, available); 262} 263 264/** 265 * Return producer max Head-Tail-Distance (HTD). 266 * 267 * @param r 268 * A pointer to the ring structure. 269 * @return 270 * Producer HTD value, if producer is set in appropriate sync mode, 271 * or UINT32_MAX otherwise. 272 */ 273static inline uint32_t 274rte_ring_get_prod_htd_max(const struct rte_ring *r) 275{ 276 if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) 277 return r->rts_prod.htd_max; 278 return UINT32_MAX; 279} 280 281/** 282 * Set producer max Head-Tail-Distance (HTD). 283 * Note that producer has to use appropriate sync mode (RTS). 284 * 285 * @param r 286 * A pointer to the ring structure. 287 * @param v 288 * new HTD value to setup. 289 * @return 290 * Zero on success, or negative error code otherwise. 291 */ 292static inline int 293rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) 294{ 295 if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) 296 return -ENOTSUP; 297 298 r->rts_prod.htd_max = v; 299 return 0; 300} 301 302/** 303 * Return consumer max Head-Tail-Distance (HTD). 304 * 305 * @param r 306 * A pointer to the ring structure. 307 * @return 308 * Consumer HTD value, if consumer is set in appropriate sync mode, 309 * or UINT32_MAX otherwise. 310 */ 311static inline uint32_t 312rte_ring_get_cons_htd_max(const struct rte_ring *r) 313{ 314 if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) 315 return r->rts_cons.htd_max; 316 return UINT32_MAX; 317} 318 319/** 320 * Set consumer max Head-Tail-Distance (HTD). 321 * Note that consumer has to use appropriate sync mode (RTS). 322 * 323 * @param r 324 * A pointer to the ring structure. 325 * @param v 326 * new HTD value to setup. 327 * @return 328 * Zero on success, or negative error code otherwise. 329 */ 330static inline int 331rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) 332{ 333 if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) 334 return -ENOTSUP; 335 336 r->rts_cons.htd_max = v; 337 return 0; 338} 339 340#ifdef __cplusplus 341} 342#endif 343 344#endif /* _RTE_RING_RTS_H_ */ 345