dpdk/lib/rcu/rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/**
 * @file
 *
 * RTE Quiescent State Based Reclamation (QSBR).
 *
 * @warning
 * @b EXPERIMENTAL:
 * All functions in this file may be changed or removed without prior notice.
 *
 * Quiescent State (QS) is any point in the thread execution
 * where the thread does not hold a reference to a data structure
 * in shared memory. While using lock-less data structures, the writer
 * can safely free memory once all the reader threads have entered
 * quiescent state.
 *
 * This library provides the ability for the readers to report quiescent
 * state and for the writers to identify when all the readers have
 * entered quiescent state.
 */

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
        rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                "%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
        if (v->qsbr_cnt[thread_id].lock_cnt) \
                rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                        "%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b
 * elements. A given thread ID needs to be converted to an index into
 * the array and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
        RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
                __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
        ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

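/* Worked example (illustrative, not part of the API): for thread_id = 67,
 * the owning bitmap element and bit position are derived as:
 *
 *   i   = 67 >> __RTE_QSBR_THRID_INDEX_SHIFT;   --> i = 1   (67 / 64)
 *   bit = 67 & __RTE_QSBR_THRID_MASK;           --> bit = 3 (67 % 64)
 *
 * i.e. thread 67 is tracked by bit 3 of the second 64b array element.
 */
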
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
        uint64_t cnt;
        /**< Quiescent state counter. Value 0 indicates the thread is offline.
         *   A 64b counter is used to avoid adding more code to address
         *   counter overflow. Changing this to 32b would require additional
         *   changes to various APIs.
         */
        uint32_t lock_cnt;
        /**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
        uint64_t token __rte_cache_aligned;
        /**< Counter to allow for multiple concurrent quiescent state queries */
        uint64_t acked_token;
        /**< Least token acked by all the threads in the last call to
         *   rte_rcu_qsbr_check API.
         */

        uint32_t num_elems __rte_cache_aligned;
        /**< Number of elements in the thread ID array */
        uint32_t num_threads;
        /**< Number of threads currently using this QS variable */
        uint32_t max_threads;
        /**< Maximum number of threads using this QS variable */

        struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
        /**< Quiescent state counter array of 'max_threads' elements */

        /* Registered thread IDs are stored in a bitmap array,
         * after the quiescent state counter array.
         */
} __rte_cache_aligned;

/**
 * Callback function invoked to free the resources.
 *
 * @param p
 *   Pointer provided while creating the defer queue
 * @param e
 *   Pointer to the resource data stored on the defer queue
 * @param n
 *   Number of resources to free. Currently, this is set to 1.
 *
 * @return
 *   None
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

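/* Illustrative sketch of a callback matching rte_rcu_qsbr_free_resource_t.
 * The names 'struct example_table' and 'example_entry_free' are hypothetical
 * and not part of this library. 'e' points at a copy of the data enqueued
 * via rte_rcu_qsbr_dq_enqueue, assumed here to be a uint32_t entry index.
 *
 *   static void
 *   example_free_resource(void *p, void *e, unsigned int n)
 *   {
 *           struct example_table *t = p;
 *           unsigned int i;
 *
 *           for (i = 0; i < n; i++)
 *                   example_entry_free(t, ((uint32_t *)e)[i]);
 *   }
 */
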
#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/**
 * Various flags supported.
 */
/**
 * Enqueue and reclaim operations are multi-thread safe by default.
 * The callback functions registered to free the resources are
 * assumed to be multi-thread safe.
 * Set this flag if multi-thread safety is not required.
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/**
 * Parameters used when creating the defer queue.
 */
struct rte_rcu_qsbr_dq_parameters {
        const char *name;
        /**< Name of the queue. */
        uint32_t flags;
        /**< Flags to control API behaviors */
        uint32_t size;
        /**< Number of entries in the queue. Typically, this will be
         *   the same as the maximum number of entries supported in the
         *   lock free data structure.
         *   Data structures with an unbounded number of entries are not
         *   currently supported.
         */
        uint32_t esize;
        /**< Size (in bytes) of each element in the defer queue.
         *   This has to be a multiple of 4B.
         */
        uint32_t trigger_reclaim_limit;
        /**< Trigger automatic reclamation after the defer queue
         *   has at least this many resources waiting. This auto
         *   reclamation is triggered in the rte_rcu_qsbr_dq_enqueue API
         *   call.
         *   If this is greater than 'size', auto reclamation is
         *   not triggered.
         *   If this is set to 0, auto reclamation is triggered
         *   in every call to the rte_rcu_qsbr_dq_enqueue API.
         */
        uint32_t max_reclaim_size;
        /**< When automatic reclamation is enabled, reclaim at most
         *   this many resources. This should contain a valid value, if
         *   auto reclamation is on. Setting this to 'size' or greater will
         *   reclaim all possible resources currently on the defer queue.
         */
        rte_rcu_qsbr_free_resource_t free_fn;
        /**< Function to call to free the resource. */
        void *p;
        /**< Pointer passed to the free function. Typically, this is the
         *   pointer to the data structure to which the resource to free
         *   belongs. This can be NULL.
         */
        struct rte_rcu_qsbr *v;
        /**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

/**
 * Return the size of the memory occupied by a Quiescent State variable.
 *
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 * @return
 *   On success - size of memory in bytes required for this QS variable.
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0
 */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/**
 * Initialize a Quiescent State (QS) variable.
 *
 * @param v
 *   QS variable
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0 or 'v' is NULL.
 *
 */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

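/* Illustrative allocation sequence (a sketch, not part of this header;
 * assumes <rte_malloc.h> for rte_zmalloc):
 *
 *   size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
 *   struct rte_rcu_qsbr *v;
 *
 *   v = rte_zmalloc("qsbr", sz, RTE_CACHE_LINE_SIZE);
 *   if (v == NULL || rte_rcu_qsbr_init(v, RTE_MAX_LCORE) != 0)
 *           rte_panic("cannot initialize the QS variable\n");
 */
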
/**
 * Register a reader thread to report its quiescent state
 * on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 * Any reader thread that wants to report its quiescent state must
 * call this API. This can be called during initialization or as part
 * of the packet processing loop.
 *
 * Note that rte_rcu_qsbr_thread_online must be called before the
 * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
 *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be called from the reader threads during shutdown.
 * Ongoing quiescent state queries will stop waiting for the status from this
 * unregistered reader thread.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will stop reporting its quiescent
 *   state on the QS variable.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * Any registered reader thread that wants to report its quiescent state must
 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
 * during initialization or as part of the packet processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * The reader thread must call rte_rcu_qsbr_thread_online API, after the
 * blocking function call returns, to ensure that rte_rcu_qsbr_check API
 * waits for the reader thread to update its quiescent state.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* Copy the current value of token.
         * The fence at the end of the function will ensure that
         * the following will not move down after the load of any shared
         * data structure.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

        /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
         * 'cnt' (64b) is accessed atomically.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                t, __ATOMIC_RELAXED);

        /* The subsequent load of the data structure should not
         * move above the store. Hence a store-load barrier
         * is required.
         * If the load of the data structure moves above the store,
         * the writer might not see that the reader is online, even though
         * the reader is referencing the shared data structure.
         */
        rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

/**
 * Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This can be called during initialization or as part of the packet
 * processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   rte_rcu_qsbr_check API will not wait for the reader thread with
 *   this thread ID to report its quiescent state on the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* The reader can go offline only after the load of the
         * data structure is completed, i.e. any load of the
         * data structure cannot move after this store.
         */

        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

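/* Illustrative sketch (not part of this header): bracketing a blocking
 * call so that rte_rcu_qsbr_check does not wait on this reader while it
 * is blocked. 'wait_for_work' is a hypothetical blocking function; the
 * reader must not hold references to the shared data structure between
 * the offline and online calls.
 *
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   wait_for_work();
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 */
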
/**
 * Acquire a lock for accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called before
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, a lock counter is incremented.
 * Similarly, rte_rcu_qsbr_unlock will decrement the counter. The
 * rte_rcu_qsbr_check API will verify that this counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Increment the lock counter */
        __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
                                1, __ATOMIC_ACQUIRE);
#endif
}

/**
 * Release a lock after accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called after
 * accessing a shared data structure.
 *
 * When RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
 * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
 * counter is 0.
 *
 * When RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Decrement the lock counter */
        __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
                                1, __ATOMIC_RELEASE);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
                                "Lock counter %u. Nested locks?\n",
                                v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

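/* Illustrative sketch (not part of this header): bracketing a read-side
 * access so that, with RTE_LIBRTE_RCU_DEBUG enabled, reporting quiescent
 * state from inside the critical section is flagged. 'lookup_entry',
 * 'table' and 'key' are hypothetical.
 *
 *   rte_rcu_qsbr_lock(v, thread_id);
 *   entry = lookup_entry(table, key);
 *   rte_rcu_qsbr_unlock(v, thread_id);
 */
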
/**
 * Ask the reader threads to report the quiescent state
 * status.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from worker threads.
 *
 * @param v
 *   QS variable
 * @return
 *   - This is the token for this call of the API. This should be
 *     passed to rte_rcu_qsbr_check API.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        /* Release the changes to the shared data structure.
         * This store release will ensure that changes to any data
         * structure are visible to the workers before the token
         * update is visible.
         */
        t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

        return t;
}

/**
 * Update quiescent state for a reader thread.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * All the reader threads registered to report their quiescent state
 * on the QS variable must call this API.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Update the quiescent state for the reader with this thread ID.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* Acquire the changes to the shared data structure released
         * by rte_rcu_qsbr_start.
         * Later loads of the shared data structure should not move
         * above this load. Hence, use load-acquire.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

        /* Check if there are updates available from the writer.
         * Inform the writer that updates are visible to this reader.
         * Prior loads of the shared data structure should not move
         * beyond this store. Hence use store-release.
         */
        if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
                __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                                         t, __ATOMIC_RELEASE);

        __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
                __func__, t, thread_id);
}

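/* Illustrative reader lifecycle (a sketch, not part of this header;
 * 'process_packets' and 'done' are hypothetical):
 *
 *   rte_rcu_qsbr_thread_register(v, thread_id);
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   while (!done) {
 *           process_packets();
 *           rte_rcu_qsbr_quiescent(v, thread_id);
 *   }
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   rte_rcu_qsbr_thread_unregister(v, thread_id);
 */
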
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i, j, id;
        uint64_t bmap;
        uint64_t c;
        uint64_t *reg_thread_id;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
                i < v->num_elems;
                i++, reg_thread_id++) {
                /* Load the current registered thread bit map before
                 * loading the reader thread quiescent state counters.
                 */
                bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

                while (bmap) {
                        j = __builtin_ctzl(bmap);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
                                __func__, t, wait, bmap, id + j);
                        c = __atomic_load_n(
                                        &v->qsbr_cnt[id + j].cnt,
                                        __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, id + j);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (unlikely(c !=
                                __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
                                /* This thread is not in quiescent state */
                                if (!wait)
                                        return 0;

                                rte_pause();
                                /* This thread might have unregistered.
                                 * Re-read the bitmap.
                                 */
                                bmap = __atomic_load_n(reg_thread_id,
                                                __ATOMIC_ACQUIRE);

                                continue;
                        }

                        /* This thread is in quiescent state. Use the counter
                         * to find the least acknowledged token among all the
                         * readers.
                         */
                        if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
                                acked_token = c;

                        bmap &= ~(1UL << j);
                }
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i;
        struct rte_rcu_qsbr_cnt *cnt;
        uint64_t c;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
                        __func__, t, wait, i);
                while (1) {
                        c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, i);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
                                break;

                        /* This thread is not in quiescent state */
                        if (!wait)
                                return 0;

                        rte_pause();
                }

                /* This thread is in quiescent state. Use the counter to find
                 * the least acknowledged token among all the readers.
                 */
                if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
                        acked_token = c;
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

/**
 * Checks if all the reader threads have entered the quiescent state
 * referenced by token.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from the worker threads as well.
 *
 * If this API is called with 'wait' set to true, the following
 * factors must be considered:
 *
 * 1) If the calling thread is also reporting the status on the
 * same QS variable, it must update the quiescent state status, before
 * calling this API.
 *
 * 2) In addition, while calling from multiple threads, only
 * one of those threads can be reporting the quiescent state status
 * on a given QS variable.
 *
 * @param v
 *   QS variable
 * @param t
 *   Token returned by rte_rcu_qsbr_start API
 * @param wait
 *   If true, block till all the reader threads have completed entering
 *   the quiescent state referenced by token 't'.
 * @return
 *   - 0 if all reader threads have NOT passed through the specified number
 *     of quiescent states.
 *   - 1 if all reader threads have passed through the specified number
 *     of quiescent states.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        RTE_ASSERT(v != NULL);

        /* Check if all the readers have already acknowledged this token */
        if (likely(t <= v->acked_token)) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d",
                        __func__, t, wait);
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: status: least acked token = %" PRIu64,
                        __func__, v->acked_token);
                return 1;
        }

        if (likely(v->num_threads == v->max_threads))
                return __rte_rcu_qsbr_check_all(v, t, wait);
        else
                return __rte_rcu_qsbr_check_selective(v, t, wait);
}

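/* Illustrative writer flow (a sketch, not part of this header): unlink an
 * entry from the lock-free data structure, wait for all readers to report
 * a quiescent state, then free it. 'remove_entry' and 'free_entry' are
 * hypothetical.
 *
 *   uint64_t token;
 *
 *   remove_entry(table, key);
 *   token = rte_rcu_qsbr_start(v);
 *   rte_rcu_qsbr_check(v, token, true);
 *   free_entry(entry);
 */
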
/**
 * Wait till the reader threads have entered quiescent state.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check APIs.
 *
 * If this API is called from multiple threads, only one of
 * those threads can be reporting the quiescent state status on a
 * given QS variable.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Thread ID of the caller if it is registered to report quiescent state
 *   on this QS variable (i.e. the calling thread is also part of the
 *   read-side critical section). If not, pass RTE_QSBR_THRID_INVALID.
 */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

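/* Illustrative sketch (not part of this header): the same blocking wait
 * using the wrapper, from a writer thread that is not registered as a
 * reader ('remove_entry' and 'free_entry' are hypothetical):
 *
 *   remove_entry(table, key);
 *   rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *   free_entry(entry);
 */
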
/**
 * Dump the details of a single QS variable to a file.
 *
 * It is NOT multi-thread safe.
 *
 * @param f
 *   A pointer to a file for output
 * @param v
 *   QS variable
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 *
 * @param params
 *   Parameters to create a defer queue.
 * @return
 *   On success - Valid pointer to defer queue
 *   On error - NULL
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 *   - ENOMEM - Not enough memory
 */
__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

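/* Illustrative setup (a sketch, not part of this header): a defer queue
 * holding 32b entry indexes, freed via the hypothetical callback sketched
 * after rte_rcu_qsbr_free_resource_t above. 'num_entries' and 'table' are
 * hypothetical.
 *
 *   struct rte_rcu_qsbr_dq_parameters params = {0};
 *   struct rte_rcu_qsbr_dq *dq;
 *
 *   params.name = "example_dq";
 *   params.size = num_entries;
 *   params.esize = sizeof(uint32_t);
 *   params.trigger_reclaim_limit = num_entries / 8;
 *   params.max_reclaim_size = 16;
 *   params.free_fn = example_free_resource;
 *   params.p = table;
 *   params.v = v;
 *   dq = rte_rcu_qsbr_dq_create(&params);
 *   if (dq == NULL)
 *           rte_panic("cannot create the defer queue\n");
 */
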
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Enqueue one resource to the defer queue and start the grace period.
 * The resource will be freed later after at least one grace period
 * is over.
 *
 * If the defer queue is full, it will attempt to reclaim resources.
 * It will also reclaim resources at regular intervals to keep
 * the defer queue from growing too big.
 *
 * Multi-thread safety is determined by the defer queue configuration.
 * When multi-thread safety is requested, it is possible that the
 * resources are not stored in their order of deletion. This results
 * in resources being held in the defer queue longer than they should be.
 *
 * @param dq
 *   Defer queue to allocate an entry from.
 * @param e
 *   Pointer to resource data to copy to the defer queue. The size of
 *   the data to copy is equal to the element size provided when the
 *   defer queue was created.
 * @return
 *   On success - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 *   - ENOSPC - Defer queue is full. This condition cannot happen
 *              if the defer queue size is equal to (or larger than) the
 *              number of elements in the data structure.
 */
__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

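/* Illustrative usage (a sketch, not part of this header): after unlinking
 * an entry from the data structure, defer its reclamation. 'remove_entry',
 * 'table' and 'key' are hypothetical; the enqueued data matches the 4B
 * element size configured above.
 *
 *   uint32_t entry_index = remove_entry(table, key);
 *
 *   if (rte_rcu_qsbr_dq_enqueue(dq, &entry_index) != 0) {
 *           ... the queue is full and nothing could be reclaimed yet;
 *               reclaim explicitly and retry, or roll back the delete ...
 *   }
 */
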
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Free resources from the defer queue.
 *
 * This API is multi-thread safe.
 *
 * @param dq
 *   Defer queue to free an entry from.
 * @param n
 *   Maximum number of resources to free.
 * @param freed
 *   Number of resources that were freed.
 * @param pending
 *   Number of resources pending on the defer queue. This number might not
 *   be accurate if multi-thread safety is configured.
 * @param available
 *   Number of resources that can be added to the defer queue.
 *   This number might not be accurate if multi-thread safety is configured.
 * @return
 *   On successful reclamation of at least 1 resource - 0
 *   On error - 1 with rte_errno set to
 *   - EINVAL - NULL parameters are passed
 */
__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
        unsigned int *freed, unsigned int *pending, unsigned int *available);

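/* Illustrative usage (a sketch, not part of this header): explicitly
 * reclaim up to 16 resources, e.g. from a control-plane thread:
 *
 *   unsigned int freed, pending, available;
 *
 *   if (rte_rcu_qsbr_dq_reclaim(dq, 16, &freed, &pending, &available) == 0)
 *           printf("freed %u, %u still pending\n", freed, pending);
 */
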
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Delete a defer queue.
 *
 * It tries to reclaim all the resources on the defer queue.
 * If any of the resources have not completed the grace period,
 * the reclamation stops and returns immediately. The rest of
 * the resources are not reclaimed and the defer queue is not
 * freed.
 *
 * @param dq
 *   Defer queue to delete.
 * @return
 *   On success - 0
 *   On error - 1
 *   Possible rte_errno codes are:
 *   - EAGAIN - Some of the resources have not completed at least 1 grace
 *              period, try again.
 */
__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */