dpdk/lib/rcu/rte_rcu_qsbr.c
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2018-2020 Arm Limited
 */

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>

#include <rte_common.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_eal.h>
#include <rte_atomic.h>
#include <rte_per_lcore.h>
#include <rte_lcore.h>
#include <rte_errno.h>
#include <rte_ring_elem.h>

#include "rte_rcu_qsbr.h"
#include "rcu_qsbr_pvt.h"

/* Get the memory size of QSBR variable */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)
{
        size_t sz;

        if (max_threads == 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid max_threads %u\n",
                        __func__, max_threads);
                rte_errno = EINVAL;

                return 1;
        }

        sz = sizeof(struct rte_rcu_qsbr);

        /* Add the size of quiescent state counter array */
        sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

        /* Add the size of the registered thread ID bitmap array */
        sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

        return sz;
}

/* Initialize a quiescent state variable */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
{
        size_t sz;

        if (v == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        sz = rte_rcu_qsbr_get_memsize(max_threads);
        if (sz == 1)
                return 1;

        /* Set all the threads to offline */
        memset(v, 0, sz);
        v->max_threads = max_threads;
        v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
        v->token = __RTE_QSBR_CNT_INIT;
        v->acked_token = __RTE_QSBR_CNT_INIT - 1;

        return 0;
}
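
/* Editorial usage sketch (illustrative only, not part of the upstream file):
 * the application allocates the QSBR variable itself using the size reported
 * by rte_rcu_qsbr_get_memsize() and initializes it before any reader
 * registers. A minimal sketch, assuming a cache-line aligned allocation:
 *
 *        uint32_t max_threads = 8;
 *        size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *        struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *        if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *                rte_panic("QSBR variable init failed\n");
 */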

/* Register a reader thread to report its quiescent state
 * on a QS variable.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        unsigned int i, id, success;
        uint64_t old_bmap, new_bmap;

        if (v == NULL || thread_id >= v->max_threads) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        id = thread_id & __RTE_QSBR_THRID_MASK;
        i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;

        /* Make sure that the counter for registered threads does not
         * go out of sync. Hence, additional checks are required.
         */
        /* Check if the thread is already registered */
        old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_RELAXED);
        if (old_bmap & 1UL << id)
                return 0;

        do {
                new_bmap = old_bmap | (1UL << id);
                success = __atomic_compare_exchange(
                                        __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        &old_bmap, &new_bmap, 0,
                                        __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                if (success)
                        __atomic_fetch_add(&v->num_threads,
                                                1, __ATOMIC_RELAXED);
                else if (old_bmap & (1UL << id))
                        /* Someone else registered this thread.
                         * Counter should not be incremented.
                         */
                        return 0;
        } while (success == 0);

        return 0;
}
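
/* Editorial usage sketch (illustrative only, not part of the upstream file):
 * a reader thread typically registers once, reports a quiescent state from
 * its poll loop, and unregisters on exit. A minimal sketch using the public
 * inline API from rte_rcu_qsbr.h; 'v', 'tid' and 'running' are assumed to be
 * provided by the application:
 *
 *        rte_rcu_qsbr_thread_register(v, tid);
 *        rte_rcu_qsbr_thread_online(v, tid);
 *        while (running) {
 *                // read the shared data structure
 *                rte_rcu_qsbr_quiescent(v, tid);
 *        }
 *        rte_rcu_qsbr_thread_offline(v, tid);
 *        rte_rcu_qsbr_thread_unregister(v, tid);
 */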

/* Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        unsigned int i, id, success;
        uint64_t old_bmap, new_bmap;

        if (v == NULL || thread_id >= v->max_threads) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        id = thread_id & __RTE_QSBR_THRID_MASK;
        i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;

        /* Make sure that the counter for registered threads does not
         * go out of sync. Hence, additional checks are required.
         */
        /* Check if the thread is already unregistered */
        old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_RELAXED);
        if (!(old_bmap & (1UL << id)))
                return 0;

        do {
                new_bmap = old_bmap & ~(1UL << id);
                /* Make sure any loads of the shared data structure are
                 * completed before removal of the thread from the list of
                 * reporting threads.
                 */
                success = __atomic_compare_exchange(
                                        __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        &old_bmap, &new_bmap, 0,
                                        __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                if (success)
                        __atomic_fetch_sub(&v->num_threads,
                                                1, __ATOMIC_RELAXED);
                else if (!(old_bmap & (1UL << id)))
                        /* Someone else unregistered this thread.
                         * Counter should not be decremented.
                         */
                        return 0;
        } while (success == 0);

        return 0;
}

/* Wait till the reader threads have entered quiescent state. */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        t = rte_rcu_qsbr_start(v);

        /* If the current thread has a read-side critical section,
         * update its quiescent state status.
         */
        if (thread_id != RTE_QSBR_THRID_INVALID)
                rte_rcu_qsbr_quiescent(v, thread_id);

        /* Wait for other readers to enter quiescent state */
        rte_rcu_qsbr_check(v, t, true);
}
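
/* Editorial usage sketch (illustrative only, not part of the upstream file):
 * a writer that cannot afford to block may use the token API directly
 * instead of rte_rcu_qsbr_synchronize(). A minimal sketch, where 'entry' was
 * just unlinked from the shared structure and 'free_entry' is a hypothetical
 * application routine:
 *
 *        uint64_t token = rte_rcu_qsbr_start(v);
 *        // ... do other work while readers report quiescent states ...
 *        if (rte_rcu_qsbr_check(v, token, false) == 1)
 *                free_entry(entry);
 */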

/* Dump the details of a single quiescent state variable to a file. */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
{
        uint64_t bmap;
        uint32_t i, t, id;

        if (v == NULL || f == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        fprintf(f, "\nQuiescent State Variable @%p\n", v);

        fprintf(f, "  QS variable memory size = %zu\n",
                                rte_rcu_qsbr_get_memsize(v->max_threads));
        fprintf(f, "  Given # max threads = %u\n", v->max_threads);
        fprintf(f, "  Current # threads = %u\n", v->num_threads);

        fprintf(f, "  Registered thread IDs = ");
        for (i = 0; i < v->num_elems; i++) {
                bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
                while (bmap) {
                        t = __builtin_ctzl(bmap);
                        fprintf(f, "%u ", id + t);

                        bmap &= ~(1UL << t);
                }
        }

        fprintf(f, "\n");

        fprintf(f, "  Token = %" PRIu64 "\n",
                        __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

        fprintf(f, "  Least Acknowledged Token = %" PRIu64 "\n",
                        __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));

        fprintf(f, "Quiescent State Counts for readers:\n");
        for (i = 0; i < v->num_elems; i++) {
                bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
                while (bmap) {
                        t = __builtin_ctzl(bmap);
                        fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
                                id + t,
                                __atomic_load_n(
                                        &v->qsbr_cnt[id + t].cnt,
                                        __ATOMIC_RELAXED),
                                __atomic_load_n(
                                        &v->qsbr_cnt[id + t].lock_cnt,
                                        __ATOMIC_RELAXED));
                        bmap &= ~(1UL << t);
                }
        }

        return 0;
}

/* Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
{
        struct rte_rcu_qsbr_dq *dq;
        uint32_t qs_fifo_size;
        unsigned int flags;

        if (params == NULL || params->free_fn == NULL ||
                params->v == NULL || params->name == NULL ||
                params->size == 0 || params->esize == 0 ||
                (params->esize % 4 != 0)) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return NULL;
        }
        /* If auto reclamation is configured (the trigger limit falls
         * within the queue size), max_reclaim_size must be non-zero.
         */
        if ((params->trigger_reclaim_limit <= params->size) &&
            (params->max_reclaim_size == 0)) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
                        __func__, params->size, params->trigger_reclaim_limit,
                        params->max_reclaim_size);
                rte_errno = EINVAL;

                return NULL;
        }

        dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
                         RTE_CACHE_LINE_SIZE);
        if (dq == NULL) {
                rte_errno = ENOMEM;

                return NULL;
        }

        /* Decide the flags for the ring.
         * If MT safety is requested, use RTS for ring enqueue as most
         * use cases involve dq-enqueue happening on the control plane.
         * Ring dequeue is always HTS due to the possibility of revert.
         */
        flags = RING_F_MP_RTS_ENQ;
        if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
                flags = RING_F_SP_ENQ;
        flags |= RING_F_MC_HTS_DEQ;
        /* Round up qs_fifo_size to the next power of two that is not less
         * than (size + 1); the ring keeps one slot unused.
         */
        qs_fifo_size = rte_align32pow2(params->size + 1);
        /* Add token size to ring element size */
        dq->r = rte_ring_create_elem(params->name,
                        __RTE_QSBR_TOKEN_SIZE + params->esize,
                        qs_fifo_size, SOCKET_ID_ANY, flags);
        if (dq->r == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): defer queue create failed\n", __func__);
                rte_free(dq);
                return NULL;
        }

        dq->v = params->v;
        dq->size = params->size;
        dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
        dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
        dq->max_reclaim_size = params->max_reclaim_size;
        dq->free_fn = params->free_fn;
        dq->p = params->p;

        return dq;
}
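
/* Editorial usage sketch (illustrative only, not part of the upstream file):
 * a minimal sketch of creating a defer queue, assuming 'v' is an initialized
 * QSBR variable and 'app_free' / 'app_ctx' are application-provided; the
 * numeric values are arbitrary examples:
 *
 *        struct rte_rcu_qsbr_dq_parameters params = {0};
 *        params.name = "pkt_tbl_dq";
 *        params.v = v;
 *        params.size = 1024;                     // max outstanding elements
 *        params.esize = 8;                       // element size, multiple of 4
 *        params.trigger_reclaim_limit = 256;     // auto-reclaim threshold
 *        params.max_reclaim_size = 32;           // elements reclaimed per trigger
 *        params.free_fn = app_free;              // rte_rcu_qsbr_free_resource_t
 *        params.p = app_ctx;
 *        struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 */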

/* Enqueue one resource to the defer queue to free after the grace
 * period is over.
 */
int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
{
        __rte_rcu_qsbr_dq_elem_t *dq_elem;
        uint32_t cur_size;

        if (dq == NULL || e == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        char data[dq->esize];
        dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
        /* Start the grace period */
        dq_elem->token = rte_rcu_qsbr_start(dq->v);

        /* Reclaim resources if the queue size has hit the reclaim
         * limit. This keeps the queue from growing too large and
         * gives reader threads time to report their quiescent state.
         */
        cur_size = rte_ring_count(dq->r);
        if (cur_size > dq->trigger_reclaim_limit) {
                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Triggering reclamation\n", __func__);
                rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
                                                NULL, NULL, NULL);
        }

        /* Enqueue the token and resource. Generating the token and
         * enqueuing (token + resource) on the queue is not an
         * atomic operation. When the defer queue is shared by multiple
         * writers, this might result in tokens enqueued out of order
         * on the queue. So, some tokens might wait longer than
         * necessary before they are reclaimed.
         */
        memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
        /* Check the return status, as the enqueue might fail if other
         * threads have used up the freed space.
         * The enqueue uses the flags configured when the DQ was created.
         */
        if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Enqueue failed\n", __func__);
                /* Note that the token generated above is not used.
                 * Other than wasting tokens, it should not cause any
                 * other issues.
                 */
                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Skipped enqueuing token = %" PRIu64 "\n",
                        __func__, dq_elem->token);

                rte_errno = ENOSPC;
                return 1;
        }

        rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                "%s(): Enqueued token = %" PRIu64 "\n",
                __func__, dq_elem->token);

        return 0;
}
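
/* Editorial usage sketch (illustrative only, not part of the upstream file):
 * the writer hands the defer queue a pointer to esize bytes of user data,
 * which is copied into the ring together with a freshly started token. A
 * minimal sketch of deferring a free and later forcing a reclamation, where
 * 'elem' is application data of params.esize bytes and 'handle_full_queue'
 * is a hypothetical fallback:
 *
 *        if (rte_rcu_qsbr_dq_enqueue(dq, &elem) != 0)
 *                handle_full_queue();
 *        // ... later, e.g. from a control-plane thread ...
 *        unsigned int freed, pending;
 *        rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, NULL);
 */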

/* Reclaim resources from the defer queue. */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
                        unsigned int *freed, unsigned int *pending,
                        unsigned int *available)
{
        uint32_t cnt;
        __rte_rcu_qsbr_dq_elem_t *dq_elem;

        if (dq == NULL || n == 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        cnt = 0;

        char data[dq->esize];
        /* Check the reader threads' quiescent state and reclaim resources */
        while (cnt < n &&
                rte_ring_dequeue_bulk_elem_start(dq->r, &data,
                                        dq->esize, 1, available) != 0) {
                dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;

                /* Reclaim the resource only if the grace period for its
                 * token has expired; otherwise revert the dequeue and stop.
                 */
                if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
                        rte_ring_dequeue_elem_finish(dq->r, 0);
                        break;
                }
                rte_ring_dequeue_elem_finish(dq->r, 1);

                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Reclaimed token = %" PRIu64 "\n",
                        __func__, dq_elem->token);

                dq->free_fn(dq->p, dq_elem->elem, 1);

                cnt++;
        }

        rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                "%s(): Reclaimed %u resources\n", __func__, cnt);

        if (freed != NULL)
                *freed = cnt;
        if (pending != NULL)
                *pending = rte_ring_count(dq->r);

        return 0;
}

/* Delete a defer queue. */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
{
        unsigned int pending;

        if (dq == NULL) {
                rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);

                return 0;
        }

        /* Reclaim all the resources */
        rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
        if (pending != 0) {
                rte_errno = EAGAIN;

                return 1;
        }

        rte_ring_free(dq->r);
        rte_free(dq);

        return 0;
}

RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);