dpdk/examples/pipeline/thread.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2020 Intel Corporation
   3 */
   4
   5#include <stdlib.h>
   6
   7#include <rte_common.h>
   8#include <rte_cycles.h>
   9#include <rte_lcore.h>
  10#include <rte_ring.h>
  11
  12#include <rte_table_acl.h>
  13#include <rte_table_array.h>
  14#include <rte_table_hash.h>
  15#include <rte_table_lpm.h>
  16#include <rte_table_lpm_ipv6.h>
  17
  18#include "obj.h"
  19#include "thread.h"
  20
  21#ifndef THREAD_PIPELINES_MAX
  22#define THREAD_PIPELINES_MAX                               256
  23#endif
  24
  25#ifndef THREAD_MSGQ_SIZE
  26#define THREAD_MSGQ_SIZE                                   64
  27#endif
  28
  29#ifndef THREAD_TIMER_PERIOD_MS
  30#define THREAD_TIMER_PERIOD_MS                             100
  31#endif
  32
  33/**
  34 * Control thread: data plane thread context
  35 */
  36struct thread {
  37        struct rte_ring *msgq_req;
  38        struct rte_ring *msgq_rsp;
  39
  40        uint32_t enabled;
  41};
  42
  43static struct thread thread[RTE_MAX_LCORE];
  44
  45/**
  46 * Data plane threads: context
  47 */
  48struct pipeline_data {
  49        struct rte_swx_pipeline *p;
  50        uint64_t timer_period; /* Measured in CPU cycles. */
  51        uint64_t time_next;
  52};
  53
  54struct thread_data {
  55        struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
  56        uint32_t n_pipelines;
  57
  58        struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
  59        struct rte_ring *msgq_req;
  60        struct rte_ring *msgq_rsp;
  61        uint64_t timer_period; /* Measured in CPU cycles. */
  62        uint64_t time_next;
  63        uint64_t time_next_min;
  64} __rte_cache_aligned;
  65
  66static struct thread_data thread_data[RTE_MAX_LCORE];
  67
  68/**
  69 * Control thread: data plane thread init
  70 */
  71static void
  72thread_free(void)
  73{
  74        uint32_t i;
  75
  76        for (i = 0; i < RTE_MAX_LCORE; i++) {
  77                struct thread *t = &thread[i];
  78
  79                if (!rte_lcore_is_enabled(i))
  80                        continue;
  81
  82                /* MSGQs */
  83                if (t->msgq_req)
  84                        rte_ring_free(t->msgq_req);
  85
  86                if (t->msgq_rsp)
  87                        rte_ring_free(t->msgq_rsp);
  88        }
  89}
  90
  91int
  92thread_init(void)
  93{
  94        uint32_t i;
  95
  96        RTE_LCORE_FOREACH_WORKER(i) {
  97                char name[NAME_MAX];
  98                struct rte_ring *msgq_req, *msgq_rsp;
  99                struct thread *t = &thread[i];
 100                struct thread_data *t_data = &thread_data[i];
 101                uint32_t cpu_id = rte_lcore_to_socket_id(i);
 102
 103                /* MSGQs */
 104                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
 105
 106                msgq_req = rte_ring_create(name,
 107                        THREAD_MSGQ_SIZE,
 108                        cpu_id,
 109                        RING_F_SP_ENQ | RING_F_SC_DEQ);
 110
 111                if (msgq_req == NULL) {
 112                        thread_free();
 113                        return -1;
 114                }
 115
 116                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
 117
 118                msgq_rsp = rte_ring_create(name,
 119                        THREAD_MSGQ_SIZE,
 120                        cpu_id,
 121                        RING_F_SP_ENQ | RING_F_SC_DEQ);
 122
 123                if (msgq_rsp == NULL) {
 124                        thread_free();
 125                        return -1;
 126                }
 127
 128                /* Control thread records */
 129                t->msgq_req = msgq_req;
 130                t->msgq_rsp = msgq_rsp;
 131                t->enabled = 1;
 132
 133                /* Data plane thread records */
 134                t_data->n_pipelines = 0;
 135                t_data->msgq_req = msgq_req;
 136                t_data->msgq_rsp = msgq_rsp;
 137                t_data->timer_period =
 138                        (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
 139                t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
 140                t_data->time_next_min = t_data->time_next;
 141        }
 142
 143        return 0;
 144}
 145
 146static inline int
 147thread_is_running(uint32_t thread_id)
 148{
 149        enum rte_lcore_state_t thread_state;
 150
 151        thread_state = rte_eal_get_lcore_state(thread_id);
 152        return (thread_state == RUNNING) ? 1 : 0;
 153}
 154
 155/**
 156 * Control thread & data plane threads: message passing
 157 */
 158enum thread_req_type {
 159        THREAD_REQ_PIPELINE_ENABLE = 0,
 160        THREAD_REQ_PIPELINE_DISABLE,
 161        THREAD_REQ_MAX
 162};
 163
 164struct thread_msg_req {
 165        enum thread_req_type type;
 166
 167        union {
 168                struct {
 169                        struct rte_swx_pipeline *p;
 170                        uint32_t timer_period_ms;
 171                } pipeline_enable;
 172
 173                struct {
 174                        struct rte_swx_pipeline *p;
 175                } pipeline_disable;
 176        };
 177};
 178
 179struct thread_msg_rsp {
 180        int status;
 181};
 182
 183/**
 184 * Control thread
 185 */
 186static struct thread_msg_req *
 187thread_msg_alloc(void)
 188{
 189        size_t size = RTE_MAX(sizeof(struct thread_msg_req),
 190                sizeof(struct thread_msg_rsp));
 191
 192        return calloc(1, size);
 193}
 194
 195static void
 196thread_msg_free(struct thread_msg_rsp *rsp)
 197{
 198        free(rsp);
 199}
 200
 201static struct thread_msg_rsp *
 202thread_msg_send_recv(uint32_t thread_id,
 203        struct thread_msg_req *req)
 204{
 205        struct thread *t = &thread[thread_id];
 206        struct rte_ring *msgq_req = t->msgq_req;
 207        struct rte_ring *msgq_rsp = t->msgq_rsp;
 208        struct thread_msg_rsp *rsp;
 209        int status;
 210
 211        /* send */
 212        do {
 213                status = rte_ring_sp_enqueue(msgq_req, req);
 214        } while (status == -ENOBUFS);
 215
 216        /* recv */
 217        do {
 218                status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
 219        } while (status != 0);
 220
 221        return rsp;
 222}
 223
 224int
 225thread_pipeline_enable(uint32_t thread_id,
 226        struct obj *obj,
 227        const char *pipeline_name)
 228{
 229        struct pipeline *p = pipeline_find(obj, pipeline_name);
 230        struct thread *t;
 231        struct thread_msg_req *req;
 232        struct thread_msg_rsp *rsp;
 233        int status;
 234
 235        /* Check input params */
 236        if ((thread_id >= RTE_MAX_LCORE) ||
 237                (p == NULL))
 238                return -1;
 239
 240        t = &thread[thread_id];
 241        if (t->enabled == 0)
 242                return -1;
 243
 244        if (!thread_is_running(thread_id)) {
 245                struct thread_data *td = &thread_data[thread_id];
 246                struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
 247
 248                if (td->n_pipelines >= THREAD_PIPELINES_MAX)
 249                        return -1;
 250
 251                /* Data plane thread */
 252                td->p[td->n_pipelines] = p->p;
 253
 254                tdp->p = p->p;
 255                tdp->timer_period =
 256                        (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
 257                tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
 258
 259                td->n_pipelines++;
 260
 261                /* Pipeline */
 262                p->thread_id = thread_id;
 263                p->enabled = 1;
 264
 265                return 0;
 266        }
 267
 268        /* Allocate request */
 269        req = thread_msg_alloc();
 270        if (req == NULL)
 271                return -1;
 272
 273        /* Write request */
 274        req->type = THREAD_REQ_PIPELINE_ENABLE;
 275        req->pipeline_enable.p = p->p;
 276        req->pipeline_enable.timer_period_ms = p->timer_period_ms;
 277
 278        /* Send request and wait for response */
 279        rsp = thread_msg_send_recv(thread_id, req);
 280
 281        /* Read response */
 282        status = rsp->status;
 283
 284        /* Free response */
 285        thread_msg_free(rsp);
 286
 287        /* Request completion */
 288        if (status)
 289                return status;
 290
 291        p->thread_id = thread_id;
 292        p->enabled = 1;
 293
 294        return 0;
 295}
 296
 297int
 298thread_pipeline_disable(uint32_t thread_id,
 299        struct obj *obj,
 300        const char *pipeline_name)
 301{
 302        struct pipeline *p = pipeline_find(obj, pipeline_name);
 303        struct thread *t;
 304        struct thread_msg_req *req;
 305        struct thread_msg_rsp *rsp;
 306        int status;
 307
 308        /* Check input params */
 309        if ((thread_id >= RTE_MAX_LCORE) ||
 310                (p == NULL))
 311                return -1;
 312
 313        t = &thread[thread_id];
 314        if (t->enabled == 0)
 315                return -1;
 316
 317        if (p->enabled == 0)
 318                return 0;
 319
 320        if (p->thread_id != thread_id)
 321                return -1;
 322
 323        if (!thread_is_running(thread_id)) {
 324                struct thread_data *td = &thread_data[thread_id];
 325                uint32_t i;
 326
 327                for (i = 0; i < td->n_pipelines; i++) {
 328                        struct pipeline_data *tdp = &td->pipeline_data[i];
 329
 330                        if (tdp->p != p->p)
 331                                continue;
 332
 333                        /* Data plane thread */
 334                        if (i < td->n_pipelines - 1) {
 335                                struct rte_swx_pipeline *pipeline_last =
 336                                        td->p[td->n_pipelines - 1];
 337                                struct pipeline_data *tdp_last =
 338                                        &td->pipeline_data[td->n_pipelines - 1];
 339
 340                                td->p[i] = pipeline_last;
 341                                memcpy(tdp, tdp_last, sizeof(*tdp));
 342                        }
 343
 344                        td->n_pipelines--;
 345
 346                        /* Pipeline */
 347                        p->enabled = 0;
 348
 349                        break;
 350                }
 351
 352                return 0;
 353        }
 354
 355        /* Allocate request */
 356        req = thread_msg_alloc();
 357        if (req == NULL)
 358                return -1;
 359
 360        /* Write request */
 361        req->type = THREAD_REQ_PIPELINE_DISABLE;
 362        req->pipeline_disable.p = p->p;
 363
 364        /* Send request and wait for response */
 365        rsp = thread_msg_send_recv(thread_id, req);
 366
 367        /* Read response */
 368        status = rsp->status;
 369
 370        /* Free response */
 371        thread_msg_free(rsp);
 372
 373        /* Request completion */
 374        if (status)
 375                return status;
 376
 377        p->enabled = 0;
 378
 379        return 0;
 380}
 381
 382/**
 383 * Data plane threads: message handling
 384 */
 385static inline struct thread_msg_req *
 386thread_msg_recv(struct rte_ring *msgq_req)
 387{
 388        struct thread_msg_req *req;
 389
 390        int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
 391
 392        if (status != 0)
 393                return NULL;
 394
 395        return req;
 396}
 397
 398static inline void
 399thread_msg_send(struct rte_ring *msgq_rsp,
 400        struct thread_msg_rsp *rsp)
 401{
 402        int status;
 403
 404        do {
 405                status = rte_ring_sp_enqueue(msgq_rsp, rsp);
 406        } while (status == -ENOBUFS);
 407}
 408
 409static struct thread_msg_rsp *
 410thread_msg_handle_pipeline_enable(struct thread_data *t,
 411        struct thread_msg_req *req)
 412{
 413        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
 414        struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
 415
 416        /* Request */
 417        if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
 418                rsp->status = -1;
 419                return rsp;
 420        }
 421
 422        t->p[t->n_pipelines] = req->pipeline_enable.p;
 423
 424        p->p = req->pipeline_enable.p;
 425        p->timer_period = (rte_get_tsc_hz() *
 426                req->pipeline_enable.timer_period_ms) / 1000;
 427        p->time_next = rte_get_tsc_cycles() + p->timer_period;
 428
 429        t->n_pipelines++;
 430
 431        /* Response */
 432        rsp->status = 0;
 433        return rsp;
 434}
 435
 436static struct thread_msg_rsp *
 437thread_msg_handle_pipeline_disable(struct thread_data *t,
 438        struct thread_msg_req *req)
 439{
 440        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
 441        uint32_t n_pipelines = t->n_pipelines;
 442        struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
 443        uint32_t i;
 444
 445        /* find pipeline */
 446        for (i = 0; i < n_pipelines; i++) {
 447                struct pipeline_data *p = &t->pipeline_data[i];
 448
 449                if (p->p != pipeline)
 450                        continue;
 451
 452                if (i < n_pipelines - 1) {
 453                        struct rte_swx_pipeline *pipeline_last =
 454                                t->p[n_pipelines - 1];
 455                        struct pipeline_data *p_last =
 456                                &t->pipeline_data[n_pipelines - 1];
 457
 458                        t->p[i] = pipeline_last;
 459                        memcpy(p, p_last, sizeof(*p));
 460                }
 461
 462                t->n_pipelines--;
 463
 464                rsp->status = 0;
 465                return rsp;
 466        }
 467
 468        /* should not get here */
 469        rsp->status = 0;
 470        return rsp;
 471}
 472
 473static void
 474thread_msg_handle(struct thread_data *t)
 475{
 476        for ( ; ; ) {
 477                struct thread_msg_req *req;
 478                struct thread_msg_rsp *rsp;
 479
 480                req = thread_msg_recv(t->msgq_req);
 481                if (req == NULL)
 482                        break;
 483
 484                switch (req->type) {
 485                case THREAD_REQ_PIPELINE_ENABLE:
 486                        rsp = thread_msg_handle_pipeline_enable(t, req);
 487                        break;
 488
 489                case THREAD_REQ_PIPELINE_DISABLE:
 490                        rsp = thread_msg_handle_pipeline_disable(t, req);
 491                        break;
 492
 493                default:
 494                        rsp = (struct thread_msg_rsp *) req;
 495                        rsp->status = -1;
 496                }
 497
 498                thread_msg_send(t->msgq_rsp, rsp);
 499        }
 500}
 501
 502/**
 503 * Data plane threads: main
 504 */
 505int
 506thread_main(void *arg __rte_unused)
 507{
 508        struct thread_data *t;
 509        uint32_t thread_id, i;
 510
 511        thread_id = rte_lcore_id();
 512        t = &thread_data[thread_id];
 513
 514        /* Dispatch loop */
 515        for (i = 0; ; i++) {
 516                uint32_t j;
 517
 518                /* Data Plane */
 519                for (j = 0; j < t->n_pipelines; j++)
 520                        rte_swx_pipeline_run(t->p[j], 1000000);
 521
 522                /* Control Plane */
 523                if ((i & 0xF) == 0) {
 524                        uint64_t time = rte_get_tsc_cycles();
 525                        uint64_t time_next_min = UINT64_MAX;
 526
 527                        if (time < t->time_next_min)
 528                                continue;
 529
 530                        /* Thread message queues */
 531                        {
 532                                uint64_t time_next = t->time_next;
 533
 534                                if (time_next <= time) {
 535                                        thread_msg_handle(t);
 536                                        time_next = time + t->timer_period;
 537                                        t->time_next = time_next;
 538                                }
 539
 540                                if (time_next < time_next_min)
 541                                        time_next_min = time_next;
 542                        }
 543
 544                        t->time_next_min = time_next_min;
 545                }
 546        }
 547
 548        return 0;
 549}
 550