dpdk/examples/performance-thread/common/lthread_sched.c
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2012 Hasan Alayli <halayli@gmail.com>
 */

#define RTE_MEM 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <inttypes.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sched.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_atomic.h>
#include <rte_atomic_64.h>
#include <rte_log.h>
#include <rte_common.h>
#include <rte_branch_prediction.h>

#include "lthread_api.h"
#include "lthread_int.h"
#include "lthread_sched.h"
#include "lthread_objcache.h"
#include "lthread_timer.h"
#include "lthread_mutex.h"
#include "lthread_cond.h"
#include "lthread_tls.h"
#include "lthread_diag.h"

/*
 * This file implements the lthread scheduler.
 * The scheduler is the function lthread_run().
 * This must be run as the main loop of an EAL thread.
 *
 * Currently, once a scheduler is created it cannot be destroyed.
 * When a scheduler shuts down, it is assumed that the application is
 * terminating.
 */
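
/*
 * Usage sketch (illustrative only; the names sched_main() and
 * initial_lthread() are hypothetical, not part of this file): the
 * application launches a function like sched_main() on every
 * participating lcore, typically with rte_eal_remote_launch(); each
 * lcore spawns an initial lthread and then enters lthread_run() as its
 * main loop.
 */
static __rte_unused void
initial_lthread(void *arg __rte_unused)
{
        /* first lthread on this scheduler: create worker lthreads, then return */
}

static __rte_unused int
sched_main(void *arg __rte_unused)
{
        struct lthread *lt;

        /* spawn the first lthread on this lcore's scheduler */
        lthread_create(&lt, rte_lcore_id(), initial_lthread, NULL);

        /* run the scheduler; returns only after a shutdown is requested */
        lthread_run();
        return 0;
}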

static rte_atomic16_t num_schedulers;
static rte_atomic16_t active_schedulers;

/* one scheduler per lcore */
RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;

struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];

diag_callback diag_cb;

uint64_t diag_mask;


/* constructor */
RTE_INIT(lthread_sched_ctor)
{
        memset(schedcore, 0, sizeof(schedcore));
        rte_atomic16_init(&num_schedulers);
        rte_atomic16_set(&num_schedulers, 1);
        rte_atomic16_init(&active_schedulers);
        rte_atomic16_set(&active_schedulers, 0);
        diag_cb = NULL;
}


enum sched_alloc_phase {
        SCHED_ALLOC_OK,
        SCHED_ALLOC_QNODE_POOL,
        SCHED_ALLOC_READY_QUEUE,
        SCHED_ALLOC_PREADY_QUEUE,
        SCHED_ALLOC_LTHREAD_CACHE,
        SCHED_ALLOC_STACK_CACHE,
        SCHED_ALLOC_PERLT_CACHE,
        SCHED_ALLOC_TLS_CACHE,
        SCHED_ALLOC_COND_CACHE,
        SCHED_ALLOC_MUTEX_CACHE,
};

static int
_lthread_sched_alloc_resources(struct lthread_sched *new_sched)
{
        int alloc_status;

        do {
                /* Initialize per scheduler queue node pool */
                alloc_status = SCHED_ALLOC_QNODE_POOL;
                new_sched->qnode_pool =
                        _qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
                if (new_sched->qnode_pool == NULL)
                        break;

                /* Initialize per scheduler local ready queue */
                alloc_status = SCHED_ALLOC_READY_QUEUE;
                new_sched->ready = _lthread_queue_create("ready queue");
                if (new_sched->ready == NULL)
                        break;

                /* Initialize per scheduler local peer ready queue */
                alloc_status = SCHED_ALLOC_PREADY_QUEUE;
                new_sched->pready = _lthread_queue_create("pready queue");
                if (new_sched->pready == NULL)
                        break;

                /* Initialize per scheduler local free lthread cache */
                alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
                new_sched->lthread_cache =
                        _lthread_objcache_create("lthread cache",
                                                sizeof(struct lthread),
                                                LTHREAD_PREALLOC);
                if (new_sched->lthread_cache == NULL)
                        break;

                /* Initialize per scheduler local free stack cache */
                alloc_status = SCHED_ALLOC_STACK_CACHE;
                new_sched->stack_cache =
                        _lthread_objcache_create("stack_cache",
                                                sizeof(struct lthread_stack),
                                                LTHREAD_PREALLOC);
                if (new_sched->stack_cache == NULL)
                        break;

                /* Initialize per scheduler local free per lthread data cache */
                alloc_status = SCHED_ALLOC_PERLT_CACHE;
                new_sched->per_lthread_cache =
                        _lthread_objcache_create("per_lt cache",
                                                RTE_PER_LTHREAD_SECTION_SIZE,
                                                LTHREAD_PREALLOC);
                if (new_sched->per_lthread_cache == NULL)
                        break;

                /* Initialize per scheduler local free tls cache */
                alloc_status = SCHED_ALLOC_TLS_CACHE;
                new_sched->tls_cache =
                        _lthread_objcache_create("TLS cache",
                                                sizeof(struct lthread_tls),
                                                LTHREAD_PREALLOC);
                if (new_sched->tls_cache == NULL)
                        break;

                /* Initialize per scheduler local free cond var cache */
                alloc_status = SCHED_ALLOC_COND_CACHE;
                new_sched->cond_cache =
                        _lthread_objcache_create("cond cache",
                                                sizeof(struct lthread_cond),
                                                LTHREAD_PREALLOC);
                if (new_sched->cond_cache == NULL)
                        break;

                /* Initialize per scheduler local free mutex cache */
                alloc_status = SCHED_ALLOC_MUTEX_CACHE;
                new_sched->mutex_cache =
                        _lthread_objcache_create("mutex cache",
                                                sizeof(struct lthread_mutex),
                                                LTHREAD_PREALLOC);
                if (new_sched->mutex_cache == NULL)
                        break;

                alloc_status = SCHED_ALLOC_OK;
        } while (0);

        /* roll back on any failure */
        switch (alloc_status) {
        case SCHED_ALLOC_MUTEX_CACHE:
                _lthread_objcache_destroy(new_sched->cond_cache);
                /* fall through */
        case SCHED_ALLOC_COND_CACHE:
                _lthread_objcache_destroy(new_sched->tls_cache);
                /* fall through */
        case SCHED_ALLOC_TLS_CACHE:
                _lthread_objcache_destroy(new_sched->per_lthread_cache);
                /* fall through */
        case SCHED_ALLOC_PERLT_CACHE:
                _lthread_objcache_destroy(new_sched->stack_cache);
                /* fall through */
        case SCHED_ALLOC_STACK_CACHE:
                _lthread_objcache_destroy(new_sched->lthread_cache);
                /* fall through */
        case SCHED_ALLOC_LTHREAD_CACHE:
                _lthread_queue_destroy(new_sched->pready);
                /* fall through */
        case SCHED_ALLOC_PREADY_QUEUE:
                _lthread_queue_destroy(new_sched->ready);
                /* fall through */
        case SCHED_ALLOC_READY_QUEUE:
                _qnode_pool_destroy(new_sched->qnode_pool);
                /* fall through */
        case SCHED_ALLOC_QNODE_POOL:
                /* fall through */
        case SCHED_ALLOC_OK:
                break;
        }
        return alloc_status;
}


/*
 * Create a scheduler on the current lcore
 */
struct lthread_sched *_lthread_sched_create(size_t stack_size)
{
        int status;
        struct lthread_sched *new_sched;
        unsigned lcoreid = rte_lcore_id();

        RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);

        if (stack_size == 0)
                stack_size = LTHREAD_MAX_STACK_SIZE;

        new_sched =
             rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
                                RTE_CACHE_LINE_SIZE,
                                rte_socket_id());
        if (new_sched == NULL) {
                RTE_LOG(CRIT, LTHREAD,
                        "Failed to allocate memory for scheduler\n");
                return NULL;
        }

        _lthread_key_pool_init();

        new_sched->stack_size = stack_size;
        new_sched->birth = rte_rdtsc();
        THIS_SCHED = new_sched;

        status = _lthread_sched_alloc_resources(new_sched);
        if (status != SCHED_ALLOC_OK) {
                RTE_LOG(CRIT, LTHREAD,
                        "Failed to allocate resources for scheduler code = %d\n",
                        status);
                rte_free(new_sched);
                return NULL;
        }

        bzero(&new_sched->ctx, sizeof(struct ctx));

        new_sched->lcore_id = lcoreid;

        schedcore[lcoreid] = new_sched;

        new_sched->run_flag = 1;

        DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);

        rte_wmb();
        return new_sched;
}

/*
 * Set the number of schedulers in the system
 */
int lthread_num_schedulers_set(int num)
{
        rte_atomic16_set(&num_schedulers, num);
        return (int)rte_atomic16_read(&num_schedulers);
}
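
/*
 * Startup-ordering sketch (illustrative; launch_schedulers() is
 * hypothetical and reuses sched_main() from the sketch above; assumes
 * the usual EAL launch API, rte_eal_remote_launch() and
 * rte_lcore_is_enabled(), declared in rte_launch.h and rte_lcore.h):
 * the scheduler count must be set before the schedulers are launched,
 * because lthread_run() waits for that many schedulers to start before
 * entering its loop.
 */
static __rte_unused void
launch_schedulers(void)
{
        unsigned int lcore_id;

        /* one scheduler per enabled lcore, including the current one */
        lthread_num_schedulers_set((int)rte_lcore_count());

        for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
                if (lcore_id == rte_lcore_id() ||
                    !rte_lcore_is_enabled(lcore_id))
                        continue;
                rte_eal_remote_launch(sched_main, NULL, lcore_id);
        }

        /* run a scheduler on the current lcore as well */
        sched_main(NULL);
}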

/*
 * Return the number of schedulers active
 */
int lthread_active_schedulers(void)
{
        return (int)rte_atomic16_read(&active_schedulers);
}


/**
 * shutdown the scheduler running on the specified lcore
 */
void lthread_scheduler_shutdown(unsigned lcoreid)
{
        uint64_t coreid = (uint64_t) lcoreid;

        if (coreid < LTHREAD_MAX_LCORES) {
                if (schedcore[coreid] != NULL)
                        schedcore[coreid]->run_flag = 0;
        }
}

/**
 * shutdown all schedulers
 */
void lthread_scheduler_shutdown_all(void)
{
        uint64_t i;

        /*
         * give time for all schedulers to have started
         * Note we use sched_yield() rather than pthread_yield() to allow
         * for the possibility of a pthread wrapper on lthread_yield(),
         * something that is not possible unless the scheduler is running.
         */
        while (rte_atomic16_read(&active_schedulers) <
               rte_atomic16_read(&num_schedulers))
                sched_yield();

        for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
                if (schedcore[i] != NULL)
                        schedcore[i]->run_flag = 0;
        }
}
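
/*
 * Shutdown sketch (illustrative; work_done is a hypothetical
 * application flag): some lthread, e.g. a supervisor on the initial
 * lcore, requests shutdown once the application has finished; every
 * lthread_run() loop then exits as soon as its queues are drained and
 * it has no blocked lthreads left.
 */
static volatile int work_done;	/* hypothetical completion flag */

static __rte_unused void
supervisor_lthread(void *arg __rte_unused)
{
        /* yield cooperatively until the application signals completion */
        while (!work_done)
                lthread_yield();

        /* ask every scheduler to stop running */
        lthread_scheduler_shutdown_all();
}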

/*
 * Resume a suspended lthread
 */
static __rte_always_inline void
_lthread_resume(struct lthread *lt);
static inline void _lthread_resume(struct lthread *lt)
{
        struct lthread_sched *sched = THIS_SCHED;
        struct lthread_stack *s;
        uint64_t state = lt->state;
#if LTHREAD_DIAG
        int init = 0;
#endif

        sched->current_lthread = lt;

        if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
                /* if detached we can free the thread now */
                if (state & BIT(ST_LT_DETACH)) {
                        _lthread_free(lt);
                        sched->current_lthread = NULL;
                        return;
                }
        }

        if (state & BIT(ST_LT_INIT)) {
                /* first time this thread has been run */
                /* assign thread to this scheduler */
                lt->sched = THIS_SCHED;

                /* allocate stack */
                s = _stack_alloc();

                lt->stack_container = s;
                _lthread_set_stack(lt, s->stack, s->stack_size);

                /* allocate memory for TLS used by this thread */
                _lthread_tls_alloc(lt);

                lt->state = BIT(ST_LT_READY);
#if LTHREAD_DIAG
                init = 1;
#endif
        }

        DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);

        /* switch to the new thread */
        ctx_switch(&lt->ctx, &sched->ctx);

        /* If posting to a queue that could be read by another lcore,
         * we defer the queue write until now to ensure that the context has
         * been saved before the other core tries to resume it.
         * This applies to blocking on mutex, cond, and to set_affinity.
         */
        if (lt->pending_wr_queue != NULL) {
                struct lthread_queue *dest = lt->pending_wr_queue;

                lt->pending_wr_queue = NULL;

                /* queue the current thread to the specified queue */
                _lthread_queue_insert_mp(dest, lt);
        }

        sched->current_lthread = NULL;
}

/*
 * Handle sleep timer expiry
 */
void
_sched_timer_cb(struct rte_timer *tim, void *arg)
{
        struct lthread *lt = (struct lthread *) arg;
        uint64_t state = lt->state;

        DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);

        rte_timer_stop(tim);

        if (lt->state & BIT(ST_LT_CANCELLED))
                (THIS_SCHED)->nb_blocked_threads--;

        lt->state = state | BIT(ST_LT_EXPIRED);
        _lthread_resume(lt);
        lt->state = state & CLEARBIT(ST_LT_EXPIRED);
}



/*
 * Return 1 if the scheduler is done (no pending work) and can exit,
 * or 0 if there is still work pending.
 */
static inline int _lthread_sched_isdone(struct lthread_sched *sched)
{
        return (sched->run_flag == 0) &&
                        (_lthread_queue_empty(sched->ready)) &&
                        (_lthread_queue_empty(sched->pready)) &&
                        (sched->nb_blocked_threads == 0);
}

/*
 * Wait for all schedulers to start
 */
static inline void _lthread_schedulers_sync_start(void)
{
        rte_atomic16_inc(&active_schedulers);

        /* wait for lthread schedulers
         * Note we use sched_yield() rather than pthread_yield() to allow
         * for the possibility of a pthread wrapper on lthread_yield(),
         * something that is not possible unless the scheduler is running.
         */
        while (rte_atomic16_read(&active_schedulers) <
               rte_atomic16_read(&num_schedulers))
                sched_yield();

}

/*
 * Wait for all schedulers to stop
 */
static inline void _lthread_schedulers_sync_stop(void)
{
        rte_atomic16_dec(&active_schedulers);
        rte_atomic16_dec(&num_schedulers);

        /* wait for schedulers
         * Note we use sched_yield() rather than pthread_yield() to allow
         * for the possibility of a pthread wrapper on lthread_yield(),
         * something that is not possible unless the scheduler is running.
         */
        while (rte_atomic16_read(&active_schedulers) > 0)
                sched_yield();

}


/*
 * Run the lthread scheduler
 * This loop is the heart of the system
 */
void lthread_run(void)
{

        struct lthread_sched *sched = THIS_SCHED;
        struct lthread *lt = NULL;

        RTE_LOG(INFO, LTHREAD,
                "starting scheduler %p on lcore %u phys core %u\n",
                sched, rte_lcore_id(),
                rte_lcore_index(rte_lcore_id()));

        /* if more than one, wait for all schedulers to start */
        _lthread_schedulers_sync_start();


        /*
         * This is the main scheduling loop.
         * So long as there are tasks in existence, we run this loop.
         * We check for:
         *   expired timers,
         *   the local ready queue,
         *   and the peer ready queue,
         *
         * and resume lthreads ad infinitum.
         */
        while (!_lthread_sched_isdone(sched)) {

                rte_timer_manage();

                lt = _lthread_queue_poll(sched->ready);
                if (lt != NULL)
                        _lthread_resume(lt);
                lt = _lthread_queue_poll(sched->pready);
                if (lt != NULL)
                        _lthread_resume(lt);
        }


        /* if more than one, wait for all schedulers to stop */
        _lthread_schedulers_sync_stop();

        (THIS_SCHED) = NULL;

        RTE_LOG(INFO, LTHREAD,
                "stopping scheduler %p on lcore %u phys core %u\n",
                sched, rte_lcore_id(),
                rte_lcore_index(rte_lcore_id()));
        fflush(stdout);
}

/*
 * Return the scheduler for this lcore
 *
 */
struct lthread_sched *_lthread_sched_get(unsigned int lcore_id)
{
        struct lthread_sched *res = NULL;

        if (lcore_id < LTHREAD_MAX_LCORES)
                res = schedcore[lcore_id];

        return res;
}

/*
 * migrate the current thread to another scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
        struct lthread *lt = THIS_LTHREAD;
        struct lthread_sched *dest_sched;

        if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
                return POSIX_ERRNO(EINVAL);

        DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

        dest_sched = schedcore[lcoreid];

        if (unlikely(dest_sched == NULL))
                return POSIX_ERRNO(EINVAL);

        if (likely(dest_sched != THIS_SCHED)) {
                lt->sched = dest_sched;
                lt->pending_wr_queue = dest_sched->pready;
                _affinitize();
                return 0;
        }
        return 0;
}
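
/*
 * Affinity sketch (illustrative; worker_lthread() and the destination
 * lcore chosen below are hypothetical): an lthread may migrate itself
 * to another lcore's scheduler; when lthread_set_affinity() returns
 * successfully, the lthread is already executing on the destination
 * lcore.
 */
static __rte_unused void
worker_lthread(void *arg __rte_unused)
{
        unsigned int dest_lcore = 1;	/* hypothetical destination lcore */

        /* ... work on the current lcore ... */

        /* migrate; on success the lthread resumes on dest_lcore */
        if (lthread_set_affinity(dest_lcore) != 0)
                return;	/* invalid lcore or no scheduler running there */

        /* ... continue, now scheduled by dest_lcore ... */
}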