dpdk/examples/performance-thread/common/lthread_pool.h
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of the queue nodes used by the queue
 * implemented in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first
 * attempt to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of
 * nodes preallocated in this way is a parameter of _qnode_pool_create.
 * Freeing an object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables threads that have been affinitized
 * to a different scheduler to free nodes safely.
 */
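
/*
 * Illustrative usage sketch (not part of this header). It assumes the
 * code runs in an lthread scheduler context, so that THIS_SCHED and its
 * qnode_pool are valid; the pool name and pre-allocation count below are
 * arbitrary examples.
 *
 *	The scheduler creates its private pool once:
 *		(THIS_SCHED)->qnode_pool = _qnode_pool_create("sched", 64);
 *
 *	The queue code then allocates nodes locally and frees them back to
 *	the pool they came from, possibly from another scheduler:
 *		struct qnode *n = _qnode_alloc();
 *		n->data = item;
 *		...
 *		_qnode_free(n);
 *
 *	_qnode_pool_destroy() releases the pool once it has been drained.
 */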

struct qnode;
struct qnode_cache;

/*
 * The intrusive queue node: links to the next node, carries a data
 * pointer, and records the pool it was allocated from
 */
struct qnode {
        struct qnode *next;
        void *data;
        struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * The pool structure. The head (producer end, exchanged by any thread
 * freeing a node) and the tail (consumer end, only touched by the owning
 * scheduler) are kept on separate cache lines to limit false sharing.
 */
struct qnode_pool {
        struct qnode *head;
        struct qnode *stub;
        struct qnode *fast_alloc;
        struct qnode *tail __rte_cache_aligned;
        int pre_alloc;
        char name[LT_MAX_NAME_SIZE];

        DIAG_COUNT_DEFINE(rd);
        DIAG_COUNT_DEFINE(wr);
        DIAG_COUNT_DEFINE(available);
        DIAG_COUNT_DEFINE(prealloc);
        DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */

static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size) {

        struct qnode_pool *p = rte_malloc_socket(NULL,
                                        sizeof(struct qnode_pool),
                                        RTE_CACHE_LINE_SIZE,
                                        rte_socket_id());

        RTE_ASSERT(p);

        p->stub = rte_malloc_socket(NULL,
                                sizeof(struct qnode),
                                RTE_CACHE_LINE_SIZE,
                                rte_socket_id());

        RTE_ASSERT(p->stub);

        if (name != NULL)
                strncpy(p->name, name, LT_MAX_NAME_SIZE);
        p->name[sizeof(p->name)-1] = 0;

        p->stub->pool = p;
        p->stub->next = NULL;
        p->tail = p->stub;
        p->head = p->stub;
        p->pre_alloc = prealloc_size;

        DIAG_COUNT_INIT(p, rd);
        DIAG_COUNT_INIT(p, wr);
        DIAG_COUNT_INIT(p, available);
        DIAG_COUNT_INIT(p, prealloc);
        DIAG_COUNT_INIT(p, capacity);

        return p;
}


/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
        n->next = NULL;
        struct qnode *prev = n;
        /* We insert at the head */
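        /*
         * __sync_lock_test_and_set() is used here as an atomic exchange:
         * each producer publishes its node as the new head and gets back
         * the previous head, so concurrent producers each obtain a
         * distinct predecessor to link from.
         */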
        prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
                                                (uint64_t) prev);
        /* there is a window of inconsistency until prev->next is set */
        /* which is why remove must retry */
        prev->next = n;
}

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
        struct qnode *head;
        struct qnode *tail = p->tail;
        struct qnode *next = tail->next;

        /* we remove from the tail */
        if (tail == p->stub) {
                if (next == NULL)
                        return NULL;
                /* advance the tail */
                p->tail = next;
                tail = next;
                next = next->next;
        }
        if (likely(next != NULL)) {
                p->tail = next;
                return tail;
        }

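        /*
         * At this point tail->next is NULL: either the pool has no more
         * removable nodes (tail has caught up with head), or an insert is
         * still in flight. In the latter case pushing the stub back in and
         * re-reading tail->next lets the last linked node be handed out;
         * if it is still unset, report empty and let the caller retry.
         */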
        head = p->head;
        if (tail == head)
                return NULL;

        /* swing stub node */
        _qnode_pool_insert(p, p->stub);

        next = tail->next;
        if (next) {
                p->tail = next;
                return tail;
        }
        return NULL;
}


/*
 * This adds a retry to the _pool_remove function defined above.
 * _pool_remove() can transiently report an empty pool while a concurrent
 * insert is in flight, so retry for as long as the pool still appears
 * to be non-empty.
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
        struct qnode *n;

        do {
                n = _pool_remove(p);
                if (likely(n != NULL))
                        return n;

                rte_compiler_barrier();
        } while ((p->head != p->tail) &&
                        (p->tail != p->stub));
        return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
        struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
        int prealloc_size = p->pre_alloc;
        struct qnode *n;
        int i;

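        /*
         * fast_alloc is a single-entry cache filled by _qnode_free(): the
         * most recently freed local node is handed straight back without
         * touching the queue.
         */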
        if (likely(p->fast_alloc != NULL)) {
                n = p->fast_alloc;
                p->fast_alloc = NULL;
                return n;
        }

        n = _qnode_pool_remove(p);

        if (unlikely(n == NULL)) {
                DIAG_COUNT_INC(p, prealloc);
                for (i = 0; i < prealloc_size; i++) {
                        n = rte_malloc_socket(NULL,
                                        sizeof(struct qnode),
                                        RTE_CACHE_LINE_SIZE,
                                        rte_socket_id());
                        if (n == NULL)
                                return NULL;

                        DIAG_COUNT_INC(p, available);
                        DIAG_COUNT_INC(p, capacity);

                        n->pool = p;
                        _qnode_pool_insert(p, n);
                }
                n = _qnode_pool_remove(p);
        }
        n->pool = p;
        DIAG_COUNT_INC(p, rd);
        DIAG_COUNT_DEC(p, available);
        return n;
}


/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
        struct qnode_pool *p = n->pool;

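        /*
         * If the fast slot is already occupied, or the node belongs to a
         * pool owned by a different scheduler, push it onto the owning
         * pool's queue (safe because insertion is multi-producer).
         * Otherwise park it in fast_alloc for immediate reuse by the next
         * _qnode_alloc() on this scheduler.
         */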
        if (unlikely(p->fast_alloc != NULL) ||
                        unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
                DIAG_COUNT_INC(p, wr);
                DIAG_COUNT_INC(p, available);
                _qnode_pool_insert(p, n);
                return;
        }
        p->fast_alloc = n;
}

/*
 * Destroy a qnode pool
 * The pool must be empty when this is called, since any nodes still
 * queued in it are not freed here
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
        rte_free(p->stub);
        rte_free(p);
        return 0;
}

#ifdef __cplusplus
}
#endif

#endif                          /* LTHREAD_POOL_H_ */