dpdk/examples/pipeline/thread.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2020 Intel Corporation
   3 */
   4
   5#include <stdlib.h>
   6
   7#include <rte_common.h>
   8#include <rte_cycles.h>
   9#include <rte_lcore.h>
  10#include <rte_ring.h>
  11
  12#include <rte_table_acl.h>
  13#include <rte_table_array.h>
  14#include <rte_table_hash.h>
  15#include <rte_table_lpm.h>
  16#include <rte_table_lpm_ipv6.h>
  17
  18#include "obj.h"
  19#include "thread.h"
  20
/* Capacity of the per-thread pipeline arrays in struct thread_data. */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX                               256
#endif

/* Size passed to rte_ring_create() for each per-thread request/response ring. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE                                   64
#endif

/* Period, in milliseconds, at which a data plane thread services its
 * message queues. Converted to TSC cycles in thread_init().
 */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS                             100
#endif

/* Pipeline instruction quanta: the value passed to rte_swx_pipeline_run() for
 * each pipeline on every iteration of the dispatch loop. It needs to be large
 * enough to do some meaningful work, yet small enough that the other pipelines
 * mapped to the same thread are not starved. For a pipeline that executes 10
 * instructions per packet, a quanta of 1000 instructions equates to processing
 * 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA                              1000
#endif
  41
  42/**
  43 * Control thread: data plane thread context
  44 */
struct thread {
        /* Ring on which the control thread enqueues requests. */
        struct rte_ring *msgq_req;
        /* Ring from which the control thread dequeues responses. */
        struct rte_ring *msgq_rsp;

        /* Set to 1 by thread_init() once both rings have been created. */
        uint32_t enabled;
};

/* Control-side records, indexed by lcore ID. */
static struct thread thread[RTE_MAX_LCORE];
  53
  54/**
  55 * Data plane threads: context
  56 */
/* Per-pipeline record kept in the thread_data of the thread it is mapped to. */
struct pipeline_data {
        /* Pipeline handle; compared against when a pipeline is disabled. */
        struct rte_swx_pipeline *p;
        uint64_t timer_period; /* Measured in CPU cycles. */
        /* TSC value; initialized to (enable time + timer_period). */
        uint64_t time_next;
};
  62
/* Per-lcore state read by the dispatch loop in thread_main(). */
struct thread_data {
        /* Pipelines run by the dispatch loop; entries [0, n_pipelines) are valid. */
        struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
        uint32_t n_pipelines;

        /* Parallel to p[]: the enable/disable code updates both at the same index. */
        struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
        /* Same ring pointers as stored in the matching struct thread record. */
        struct rte_ring *msgq_req;
        struct rte_ring *msgq_rsp;
        uint64_t timer_period; /* Measured in CPU cycles. */
        /* TSC value at or after which the message queues are serviced next. */
        uint64_t time_next;
        /* thread_main() skips its control-plane work while the TSC is below this. */
        uint64_t time_next_min;
} __rte_cache_aligned;

/* Data plane records, indexed by lcore ID. */
static struct thread_data thread_data[RTE_MAX_LCORE];
  76
  77/**
  78 * Control thread: data plane thread init
  79 */
  80static void
  81thread_free(void)
  82{
  83        uint32_t i;
  84
  85        for (i = 0; i < RTE_MAX_LCORE; i++) {
  86                struct thread *t = &thread[i];
  87
  88                if (!rte_lcore_is_enabled(i))
  89                        continue;
  90
  91                /* MSGQs */
  92                rte_ring_free(t->msgq_req);
  93
  94                rte_ring_free(t->msgq_rsp);
  95        }
  96}
  97
  98int
  99thread_init(void)
 100{
 101        uint32_t i;
 102
 103        RTE_LCORE_FOREACH_WORKER(i) {
 104                char name[NAME_MAX];
 105                struct rte_ring *msgq_req, *msgq_rsp;
 106                struct thread *t = &thread[i];
 107                struct thread_data *t_data = &thread_data[i];
 108                uint32_t cpu_id = rte_lcore_to_socket_id(i);
 109
 110                /* MSGQs */
 111                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
 112
 113                msgq_req = rte_ring_create(name,
 114                        THREAD_MSGQ_SIZE,
 115                        cpu_id,
 116                        RING_F_SP_ENQ | RING_F_SC_DEQ);
 117
 118                if (msgq_req == NULL) {
 119                        thread_free();
 120                        return -1;
 121                }
 122
 123                snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
 124
 125                msgq_rsp = rte_ring_create(name,
 126                        THREAD_MSGQ_SIZE,
 127                        cpu_id,
 128                        RING_F_SP_ENQ | RING_F_SC_DEQ);
 129
 130                if (msgq_rsp == NULL) {
 131                        thread_free();
 132                        return -1;
 133                }
 134
 135                /* Control thread records */
 136                t->msgq_req = msgq_req;
 137                t->msgq_rsp = msgq_rsp;
 138                t->enabled = 1;
 139
 140                /* Data plane thread records */
 141                t_data->n_pipelines = 0;
 142                t_data->msgq_req = msgq_req;
 143                t_data->msgq_rsp = msgq_rsp;
 144                t_data->timer_period =
 145                        (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
 146                t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
 147                t_data->time_next_min = t_data->time_next;
 148        }
 149
 150        return 0;
 151}
 152
 153static inline int
 154thread_is_running(uint32_t thread_id)
 155{
 156        enum rte_lcore_state_t thread_state;
 157
 158        thread_state = rte_eal_get_lcore_state(thread_id);
 159        return (thread_state == RUNNING) ? 1 : 0;
 160}
 161
 162/**
 163 * Control thread & data plane threads: message passing
 164 */
/* Request kinds dispatched by thread_msg_handle(). */
enum thread_req_type {
        /* Add a pipeline to the receiving thread. */
        THREAD_REQ_PIPELINE_ENABLE = 0,
        /* Remove a pipeline from the receiving thread. */
        THREAD_REQ_PIPELINE_DISABLE,
        /* Number of request kinds; not a request itself. */
        THREAD_REQ_MAX
};
 170
/* Request message; the union member in use is selected by "type". */
struct thread_msg_req {
        enum thread_req_type type;

        union {
                /* THREAD_REQ_PIPELINE_ENABLE */
                struct {
                        struct rte_swx_pipeline *p;
                        /* Converted to TSC cycles by the request handler. */
                        uint32_t timer_period_ms;
                } pipeline_enable;

                /* THREAD_REQ_PIPELINE_DISABLE */
                struct {
                        struct rte_swx_pipeline *p;
                } pipeline_disable;
        };
};
 185
/* Response message. The request handlers write it into the request's own
 * buffer, so it overlays struct thread_msg_req.
 */
struct thread_msg_rsp {
        /* 0 on success, -1 on failure (as set by the request handlers). */
        int status;
};
 189
 190/**
 191 * Control thread
 192 */
 193static struct thread_msg_req *
 194thread_msg_alloc(void)
 195{
 196        size_t size = RTE_MAX(sizeof(struct thread_msg_req),
 197                sizeof(struct thread_msg_rsp));
 198
 199        return calloc(1, size);
 200}
 201
/*
 * Release a message buffer with free(). The request handlers return the
 * request's own buffer as the response, so this frees the memory obtained
 * from thread_msg_alloc().
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
        free(rsp);
}
 207
 208static struct thread_msg_rsp *
 209thread_msg_send_recv(uint32_t thread_id,
 210        struct thread_msg_req *req)
 211{
 212        struct thread *t = &thread[thread_id];
 213        struct rte_ring *msgq_req = t->msgq_req;
 214        struct rte_ring *msgq_rsp = t->msgq_rsp;
 215        struct thread_msg_rsp *rsp;
 216        int status;
 217
 218        /* send */
 219        do {
 220                status = rte_ring_sp_enqueue(msgq_req, req);
 221        } while (status == -ENOBUFS);
 222
 223        /* recv */
 224        do {
 225                status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
 226        } while (status != 0);
 227
 228        return rsp;
 229}
 230
 231int
 232thread_pipeline_enable(uint32_t thread_id,
 233        struct obj *obj,
 234        const char *pipeline_name)
 235{
 236        struct pipeline *p = pipeline_find(obj, pipeline_name);
 237        struct thread *t;
 238        struct thread_msg_req *req;
 239        struct thread_msg_rsp *rsp;
 240        int status;
 241
 242        /* Check input params */
 243        if ((thread_id >= RTE_MAX_LCORE) ||
 244                (p == NULL))
 245                return -1;
 246
 247        t = &thread[thread_id];
 248        if (t->enabled == 0)
 249                return -1;
 250
 251        if (!thread_is_running(thread_id)) {
 252                struct thread_data *td = &thread_data[thread_id];
 253                struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
 254
 255                if (td->n_pipelines >= THREAD_PIPELINES_MAX)
 256                        return -1;
 257
 258                /* Data plane thread */
 259                td->p[td->n_pipelines] = p->p;
 260
 261                tdp->p = p->p;
 262                tdp->timer_period =
 263                        (rte_get_tsc_hz() * p->timer_period_ms) / 1000;
 264                tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
 265
 266                td->n_pipelines++;
 267
 268                /* Pipeline */
 269                p->thread_id = thread_id;
 270                p->enabled = 1;
 271
 272                return 0;
 273        }
 274
 275        /* Allocate request */
 276        req = thread_msg_alloc();
 277        if (req == NULL)
 278                return -1;
 279
 280        /* Write request */
 281        req->type = THREAD_REQ_PIPELINE_ENABLE;
 282        req->pipeline_enable.p = p->p;
 283        req->pipeline_enable.timer_period_ms = p->timer_period_ms;
 284
 285        /* Send request and wait for response */
 286        rsp = thread_msg_send_recv(thread_id, req);
 287
 288        /* Read response */
 289        status = rsp->status;
 290
 291        /* Free response */
 292        thread_msg_free(rsp);
 293
 294        /* Request completion */
 295        if (status)
 296                return status;
 297
 298        p->thread_id = thread_id;
 299        p->enabled = 1;
 300
 301        return 0;
 302}
 303
 304int
 305thread_pipeline_disable(uint32_t thread_id,
 306        struct obj *obj,
 307        const char *pipeline_name)
 308{
 309        struct pipeline *p = pipeline_find(obj, pipeline_name);
 310        struct thread *t;
 311        struct thread_msg_req *req;
 312        struct thread_msg_rsp *rsp;
 313        int status;
 314
 315        /* Check input params */
 316        if ((thread_id >= RTE_MAX_LCORE) ||
 317                (p == NULL))
 318                return -1;
 319
 320        t = &thread[thread_id];
 321        if (t->enabled == 0)
 322                return -1;
 323
 324        if (p->enabled == 0)
 325                return 0;
 326
 327        if (p->thread_id != thread_id)
 328                return -1;
 329
 330        if (!thread_is_running(thread_id)) {
 331                struct thread_data *td = &thread_data[thread_id];
 332                uint32_t i;
 333
 334                for (i = 0; i < td->n_pipelines; i++) {
 335                        struct pipeline_data *tdp = &td->pipeline_data[i];
 336
 337                        if (tdp->p != p->p)
 338                                continue;
 339
 340                        /* Data plane thread */
 341                        if (i < td->n_pipelines - 1) {
 342                                struct rte_swx_pipeline *pipeline_last =
 343                                        td->p[td->n_pipelines - 1];
 344                                struct pipeline_data *tdp_last =
 345                                        &td->pipeline_data[td->n_pipelines - 1];
 346
 347                                td->p[i] = pipeline_last;
 348                                memcpy(tdp, tdp_last, sizeof(*tdp));
 349                        }
 350
 351                        td->n_pipelines--;
 352
 353                        /* Pipeline */
 354                        p->enabled = 0;
 355
 356                        break;
 357                }
 358
 359                return 0;
 360        }
 361
 362        /* Allocate request */
 363        req = thread_msg_alloc();
 364        if (req == NULL)
 365                return -1;
 366
 367        /* Write request */
 368        req->type = THREAD_REQ_PIPELINE_DISABLE;
 369        req->pipeline_disable.p = p->p;
 370
 371        /* Send request and wait for response */
 372        rsp = thread_msg_send_recv(thread_id, req);
 373
 374        /* Read response */
 375        status = rsp->status;
 376
 377        /* Free response */
 378        thread_msg_free(rsp);
 379
 380        /* Request completion */
 381        if (status)
 382                return status;
 383
 384        p->enabled = 0;
 385
 386        return 0;
 387}
 388
 389/**
 390 * Data plane threads: message handling
 391 */
 392static inline struct thread_msg_req *
 393thread_msg_recv(struct rte_ring *msgq_req)
 394{
 395        struct thread_msg_req *req;
 396
 397        int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);
 398
 399        if (status != 0)
 400                return NULL;
 401
 402        return req;
 403}
 404
 405static inline void
 406thread_msg_send(struct rte_ring *msgq_rsp,
 407        struct thread_msg_rsp *rsp)
 408{
 409        int status;
 410
 411        do {
 412                status = rte_ring_sp_enqueue(msgq_rsp, rsp);
 413        } while (status == -ENOBUFS);
 414}
 415
 416static struct thread_msg_rsp *
 417thread_msg_handle_pipeline_enable(struct thread_data *t,
 418        struct thread_msg_req *req)
 419{
 420        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
 421        struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
 422
 423        /* Request */
 424        if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
 425                rsp->status = -1;
 426                return rsp;
 427        }
 428
 429        t->p[t->n_pipelines] = req->pipeline_enable.p;
 430
 431        p->p = req->pipeline_enable.p;
 432        p->timer_period = (rte_get_tsc_hz() *
 433                req->pipeline_enable.timer_period_ms) / 1000;
 434        p->time_next = rte_get_tsc_cycles() + p->timer_period;
 435
 436        t->n_pipelines++;
 437
 438        /* Response */
 439        rsp->status = 0;
 440        return rsp;
 441}
 442
 443static struct thread_msg_rsp *
 444thread_msg_handle_pipeline_disable(struct thread_data *t,
 445        struct thread_msg_req *req)
 446{
 447        struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
 448        uint32_t n_pipelines = t->n_pipelines;
 449        struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
 450        uint32_t i;
 451
 452        /* find pipeline */
 453        for (i = 0; i < n_pipelines; i++) {
 454                struct pipeline_data *p = &t->pipeline_data[i];
 455
 456                if (p->p != pipeline)
 457                        continue;
 458
 459                if (i < n_pipelines - 1) {
 460                        struct rte_swx_pipeline *pipeline_last =
 461                                t->p[n_pipelines - 1];
 462                        struct pipeline_data *p_last =
 463                                &t->pipeline_data[n_pipelines - 1];
 464
 465                        t->p[i] = pipeline_last;
 466                        memcpy(p, p_last, sizeof(*p));
 467                }
 468
 469                t->n_pipelines--;
 470
 471                rsp->status = 0;
 472                return rsp;
 473        }
 474
 475        /* should not get here */
 476        rsp->status = 0;
 477        return rsp;
 478}
 479
 480static void
 481thread_msg_handle(struct thread_data *t)
 482{
 483        for ( ; ; ) {
 484                struct thread_msg_req *req;
 485                struct thread_msg_rsp *rsp;
 486
 487                req = thread_msg_recv(t->msgq_req);
 488                if (req == NULL)
 489                        break;
 490
 491                switch (req->type) {
 492                case THREAD_REQ_PIPELINE_ENABLE:
 493                        rsp = thread_msg_handle_pipeline_enable(t, req);
 494                        break;
 495
 496                case THREAD_REQ_PIPELINE_DISABLE:
 497                        rsp = thread_msg_handle_pipeline_disable(t, req);
 498                        break;
 499
 500                default:
 501                        rsp = (struct thread_msg_rsp *) req;
 502                        rsp->status = -1;
 503                }
 504
 505                thread_msg_send(t->msgq_rsp, rsp);
 506        }
 507}
 508
 509/**
 510 * Data plane threads: main
 511 */
 512int
 513thread_main(void *arg __rte_unused)
 514{
 515        struct thread_data *t;
 516        uint32_t thread_id, i;
 517
 518        thread_id = rte_lcore_id();
 519        t = &thread_data[thread_id];
 520
 521        /* Dispatch loop */
 522        for (i = 0; ; i++) {
 523                uint32_t j;
 524
 525                /* Data Plane */
 526                for (j = 0; j < t->n_pipelines; j++)
 527                        rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
 528
 529                /* Control Plane */
 530                if ((i & 0xF) == 0) {
 531                        uint64_t time = rte_get_tsc_cycles();
 532                        uint64_t time_next_min = UINT64_MAX;
 533
 534                        if (time < t->time_next_min)
 535                                continue;
 536
 537                        /* Thread message queues */
 538                        {
 539                                uint64_t time_next = t->time_next;
 540
 541                                if (time_next <= time) {
 542                                        thread_msg_handle(t);
 543                                        time_next = time + t->timer_period;
 544                                        t->time_next = time_next;
 545                                }
 546
 547                                if (time_next < time_next_min)
 548                                        time_next_min = time_next;
 549                        }
 550
 551                        t->time_next_min = time_next_min;
 552                }
 553        }
 554
 555        return 0;
 556}
 557