dpdk/examples/bbdev_app/main.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2017 Intel Corporation
   3 */
   4
   5#include <stdio.h>
   6#include <stdlib.h>
   7#include <string.h>
   8#include <stdint.h>
   9#include <inttypes.h>
  10#include <sys/types.h>
  11#include <sys/unistd.h>
  12#include <sys/queue.h>
  13#include <stdarg.h>
  14#include <ctype.h>
  15#include <errno.h>
  16#include <math.h>
  17#include <assert.h>
  18#include <getopt.h>
  19#include <signal.h>
  20
  21#include <rte_atomic.h>
  22#include <rte_common.h>
  23#include <rte_eal.h>
  24#include <rte_cycles.h>
  25#include <rte_ether.h>
  26#include <rte_ethdev.h>
  27#include <rte_ip.h>
  28#include <rte_lcore.h>
  29#include <rte_malloc.h>
  30#include <rte_mbuf.h>
  31#include <rte_mbuf_dyn.h>
  32#include <rte_memory.h>
  33#include <rte_mempool.h>
  34#include <rte_log.h>
  35#include <rte_bbdev.h>
  36#include <rte_bbdev_op.h>
  37
  38/* LLR values - negative value for '1' bit */
  39#define LLR_1_BIT 0x81
  40#define LLR_0_BIT 0x7F
  41
  42#define MAX_PKT_BURST 32
  43#define NB_MBUF 8191
  44#define MEMPOOL_CACHE_SIZE 256
  45
  46/* Hardcoded K value */
  47#define K 40
  48#define NCB (3 * RTE_ALIGN_CEIL(K + 4, 32))
  49
  50#define CRC_24B_LEN 3
  51
  52/* Configurable number of RX/TX ring descriptors */
  53#define RTE_TEST_RX_DESC_DEFAULT 128
  54#define RTE_TEST_TX_DESC_DEFAULT 512
  55
  56#define BBDEV_ASSERT(a) do { \
  57        if (!(a)) { \
  58                usage(prgname); \
  59                return -1; \
  60        } \
  61} while (0)
  62
  63static int input_dynfield_offset = -1;
  64
  65static inline struct rte_mbuf **
  66mbuf_input(struct rte_mbuf *mbuf)
  67{
  68        return RTE_MBUF_DYNFIELD(mbuf,
  69                        input_dynfield_offset, struct rte_mbuf **);
  70}
  71
  72static const struct rte_eth_conf port_conf = {
  73        .rxmode = {
  74                .mq_mode = ETH_MQ_RX_NONE,
  75                .max_rx_pkt_len = RTE_ETHER_MAX_LEN,
  76                .split_hdr_size = 0,
  77        },
  78        .txmode = {
  79                .mq_mode = ETH_MQ_TX_NONE,
  80        },
  81};
  82
  83struct rte_bbdev_op_turbo_enc def_op_enc = {
  84        /* These values are arbitrarily put, and does not map to the real
  85         * values for the data received from ethdev ports
  86         */
  87        .rv_index = 0,
  88        .code_block_mode = 1,
  89        .cb_params = {
  90                .k = K,
  91        },
  92        .op_flags = RTE_BBDEV_TURBO_CRC_24A_ATTACH
  93};
  94
  95struct rte_bbdev_op_turbo_dec def_op_dec = {
  96        /* These values are arbitrarily put, and does not map to the real
  97         * values for the data received from ethdev ports
  98         */
  99        .code_block_mode = 1,
 100        .cb_params = {
 101                .k = K,
 102        },
 103        .rv_index = 0,
 104        .iter_max = 8,
 105        .iter_min = 4,
 106        .ext_scale = 15,
 107        .num_maps = 0,
 108        .op_flags = RTE_BBDEV_TURBO_NEG_LLR_1_BIT_IN
 109};
 110
 111struct app_config_params {
 112        /* Placeholders for app params */
 113        uint16_t port_id;
 114        uint16_t bbdev_id;
 115        uint64_t enc_core_mask;
 116        uint64_t dec_core_mask;
 117
 118        /* Values filled during init time */
 119        uint16_t enc_queue_ids[RTE_MAX_LCORE];
 120        uint16_t dec_queue_ids[RTE_MAX_LCORE];
 121        uint16_t num_enc_cores;
 122        uint16_t num_dec_cores;
 123};
 124
 125struct lcore_statistics {
 126        unsigned int enqueued;
 127        unsigned int dequeued;
 128        unsigned int rx_lost_packets;
 129        unsigned int enc_to_dec_lost_packets;
 130        unsigned int tx_lost_packets;
 131} __rte_cache_aligned;
 132
 133/** each lcore configuration */
 134struct lcore_conf {
 135        uint64_t core_type;
 136
 137        unsigned int port_id;
 138        unsigned int rx_queue_id;
 139        unsigned int tx_queue_id;
 140
 141        unsigned int bbdev_id;
 142        unsigned int enc_queue_id;
 143        unsigned int dec_queue_id;
 144
 145        uint8_t llr_temp_buf[NCB];
 146
 147        struct rte_mempool *bbdev_dec_op_pool;
 148        struct rte_mempool *bbdev_enc_op_pool;
 149        struct rte_mempool *enc_out_pool;
 150        struct rte_ring *enc_to_dec_ring;
 151
 152        struct lcore_statistics *lcore_stats;
 153} __rte_cache_aligned;
 154
 155struct stats_lcore_params {
 156        struct lcore_conf *lconf;
 157        struct app_config_params *app_params;
 158};
 159
 160
 161static const struct app_config_params def_app_config = {
 162        .port_id = 0,
 163        .bbdev_id = 0,
 164        .enc_core_mask = 0x2,
 165        .dec_core_mask = 0x4,
 166        .num_enc_cores = 1,
 167        .num_dec_cores = 1,
 168};
 169
 170static rte_atomic16_t global_exit_flag;
 171
 172/* display usage */
 173static inline void
 174usage(const char *prgname)
 175{
 176        printf("%s [EAL options] "
 177                        "  --\n"
 178                        "  --enc_cores - number of encoding cores (default = 0x2)\n"
 179                        "  --dec_cores - number of decoding cores (default = 0x4)\n"
 180                        "  --port_id - Ethernet port ID (default = 0)\n"
 181                        "  --bbdev_id - BBDev ID (default = 0)\n"
 182                        "\n", prgname);
 183}
 184
 185/* parse core mask */
 186static inline
 187uint16_t bbdev_parse_mask(const char *mask)
 188{
 189        char *end = NULL;
 190        unsigned long pm;
 191
 192        /* parse hexadecimal string */
 193        pm = strtoul(mask, &end, 16);
 194        if ((mask[0] == '\0') || (end == NULL) || (*end != '\0'))
 195                return 0;
 196
 197        return pm;
 198}
 199
 200/* parse core mask */
 201static inline
 202uint16_t bbdev_parse_number(const char *mask)
 203{
 204        char *end = NULL;
 205        unsigned long pm;
 206
 207        /* parse hexadecimal string */
 208        pm = strtoul(mask, &end, 10);
 209        if ((mask[0] == '\0') || (end == NULL) || (*end != '\0'))
 210                return 0;
 211
 212        return pm;
 213}
 214
 215static int
 216bbdev_parse_args(int argc, char **argv,
 217                struct app_config_params *app_params)
 218{
 219        int optind = 0;
 220        int opt;
 221        int opt_indx = 0;
 222        char *prgname = argv[0];
 223
 224        static struct option lgopts[] = {
 225                { "enc_core_mask", required_argument, 0, 'e' },
 226                { "dec_core_mask", required_argument, 0, 'd' },
 227                { "port_id", required_argument, 0, 'p' },
 228                { "bbdev_id", required_argument, 0, 'b' },
 229                { NULL, 0, 0, 0 }
 230        };
 231
 232        BBDEV_ASSERT(argc != 0);
 233        BBDEV_ASSERT(argv != NULL);
 234        BBDEV_ASSERT(app_params != NULL);
 235
 236        while ((opt = getopt_long(argc, argv, "e:d:p:b:", lgopts, &opt_indx)) !=
 237                EOF) {
 238                switch (opt) {
 239                case 'e':
 240                        app_params->enc_core_mask =
 241                                bbdev_parse_mask(optarg);
 242                        if (app_params->enc_core_mask == 0) {
 243                                usage(prgname);
 244                                return -1;
 245                        }
 246                        app_params->num_enc_cores =
 247                                __builtin_popcount(app_params->enc_core_mask);
 248                        break;
 249
 250                case 'd':
 251                        app_params->dec_core_mask =
 252                                bbdev_parse_mask(optarg);
 253                        if (app_params->dec_core_mask == 0) {
 254                                usage(prgname);
 255                                return -1;
 256                        }
 257                        app_params->num_dec_cores =
 258                                __builtin_popcount(app_params->dec_core_mask);
 259                        break;
 260
 261                case 'p':
 262                        app_params->port_id = bbdev_parse_number(optarg);
 263                        break;
 264
 265                case 'b':
 266                        app_params->bbdev_id = bbdev_parse_number(optarg);
 267                        break;
 268
 269                default:
 270                        usage(prgname);
 271                        return -1;
 272                }
 273        }
 274        optind = 0;
 275        return optind;
 276}
 277
 278static void
 279signal_handler(int signum)
 280{
 281        printf("\nSignal %d received\n", signum);
 282        rte_atomic16_set(&global_exit_flag, 1);
 283}
 284
 285static void
 286print_mac(unsigned int portid, struct rte_ether_addr *bbdev_ports_eth_address)
 287{
 288        printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
 289                        (unsigned int) portid,
 290                        bbdev_ports_eth_address->addr_bytes[0],
 291                        bbdev_ports_eth_address->addr_bytes[1],
 292                        bbdev_ports_eth_address->addr_bytes[2],
 293                        bbdev_ports_eth_address->addr_bytes[3],
 294                        bbdev_ports_eth_address->addr_bytes[4],
 295                        bbdev_ports_eth_address->addr_bytes[5]);
 296}
 297
 298static inline void
 299pktmbuf_free_bulk(struct rte_mbuf **mbufs, unsigned int nb_to_free)
 300{
 301        unsigned int i;
 302        for (i = 0; i < nb_to_free; ++i)
 303                rte_pktmbuf_free(mbufs[i]);
 304}
 305
 306static inline void
 307pktmbuf_input_free_bulk(struct rte_mbuf **mbufs, unsigned int nb_to_free)
 308{
 309        unsigned int i;
 310        for (i = 0; i < nb_to_free; ++i) {
 311                struct rte_mbuf *rx_pkt = *mbuf_input(mbufs[i]);
 312                rte_pktmbuf_free(rx_pkt);
 313                rte_pktmbuf_free(mbufs[i]);
 314        }
 315}
 316
 317/* Check the link status of all ports in up to 9s, and print them finally */
 318static int
 319check_port_link_status(uint16_t port_id)
 320{
 321#define CHECK_INTERVAL 100 /* 100ms */
 322#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
 323        uint8_t count;
 324        struct rte_eth_link link;
 325        int link_get_err = -EINVAL;
 326
 327        printf("\nChecking link status.");
 328        fflush(stdout);
 329
 330        for (count = 0; count <= MAX_CHECK_TIME &&
 331                        !rte_atomic16_read(&global_exit_flag); count++) {
 332                memset(&link, 0, sizeof(link));
 333                link_get_err = rte_eth_link_get_nowait(port_id, &link);
 334
 335                if (link_get_err >= 0 && link.link_status) {
 336                        const char *dp = (link.link_duplex ==
 337                                ETH_LINK_FULL_DUPLEX) ?
 338                                "full-duplex" : "half-duplex";
 339                        printf("\nPort %u Link Up - speed %s - %s\n",
 340                                port_id,
 341                                rte_eth_link_speed_to_str(link.link_speed),
 342                                dp);
 343                        return 0;
 344                }
 345                printf(".");
 346                fflush(stdout);
 347                rte_delay_ms(CHECK_INTERVAL);
 348        }
 349
 350        if (link_get_err >= 0)
 351                printf("\nPort %d Link Down\n", port_id);
 352        else
 353                printf("\nGet link failed (port %d): %s\n", port_id,
 354                       rte_strerror(-link_get_err));
 355
 356        return 0;
 357}
 358
 359static inline void
 360add_ether_hdr(struct rte_mbuf *pkt_src, struct rte_mbuf *pkt_dst)
 361{
 362        struct rte_ether_hdr *eth_from;
 363        struct rte_ether_hdr *eth_to;
 364
 365        eth_from = rte_pktmbuf_mtod(pkt_src, struct rte_ether_hdr *);
 366        eth_to = rte_pktmbuf_mtod(pkt_dst, struct rte_ether_hdr *);
 367
 368        /* copy header */
 369        rte_memcpy(eth_to, eth_from, sizeof(struct rte_ether_hdr));
 370}
 371
 372static inline void
 373add_awgn(struct rte_mbuf **mbufs, uint16_t num_pkts)
 374{
 375        RTE_SET_USED(mbufs);
 376        RTE_SET_USED(num_pkts);
 377}
 378
 379/* Encoder output to Decoder input adapter. The Decoder accepts only soft input
 380 * so each bit of the encoder output must be translated into one byte of LLR. If
 381 * Sub-block Deinterleaver is bypassed, which is the case, the padding bytes
 382 * must additionally be insterted at the end of each sub-block.
 383 */
 384static inline void
 385transform_enc_out_dec_in(struct rte_mbuf **mbufs, uint8_t *temp_buf,
 386                uint16_t num_pkts, uint16_t k)
 387{
 388        uint16_t i, l, j;
 389        uint16_t start_bit_idx;
 390        uint16_t out_idx;
 391        uint16_t d = k + 4;
 392        uint16_t kpi = RTE_ALIGN_CEIL(d, 32);
 393        uint16_t nd = kpi - d;
 394        uint16_t ncb = 3 * kpi;
 395
 396        for (i = 0; i < num_pkts; ++i) {
 397                uint16_t pkt_data_len = rte_pktmbuf_data_len(mbufs[i]) -
 398                                sizeof(struct rte_ether_hdr);
 399
 400                /* Resize the packet if needed */
 401                if (pkt_data_len < ncb) {
 402                        char *data = rte_pktmbuf_append(mbufs[i],
 403                                        ncb - pkt_data_len);
 404                        if (data == NULL)
 405                                printf(
 406                                        "Not enough space in decoder input packet");
 407                }
 408
 409                /* Translate each bit into 1 LLR byte. */
 410                start_bit_idx = 0;
 411                out_idx = 0;
 412                for (j = 0; j < 3; ++j) {
 413                        for (l = start_bit_idx; l < start_bit_idx + d; ++l) {
 414                                uint8_t *data = rte_pktmbuf_mtod_offset(
 415                                        mbufs[i], uint8_t *,
 416                                        sizeof(struct rte_ether_hdr) +
 417                                        (l >> 3));
 418                                if (*data & (0x80 >> (l & 7)))
 419                                        temp_buf[out_idx] = LLR_1_BIT;
 420                                else
 421                                        temp_buf[out_idx] = LLR_0_BIT;
 422                                ++out_idx;
 423                        }
 424                        /* Padding bytes should be at the end of the sub-block.
 425                         */
 426                        memset(&temp_buf[out_idx], 0, nd);
 427                        out_idx += nd;
 428                        start_bit_idx += d;
 429                }
 430
 431                rte_memcpy(rte_pktmbuf_mtod_offset(mbufs[i], uint8_t *,
 432                                sizeof(struct rte_ether_hdr)), temp_buf, ncb);
 433        }
 434}
 435
 436static inline void
 437verify_data(struct rte_mbuf **mbufs, uint16_t num_pkts)
 438{
 439        uint16_t i;
 440        for (i = 0; i < num_pkts; ++i) {
 441                struct rte_mbuf *out = mbufs[i];
 442                struct rte_mbuf *in = *mbuf_input(out);
 443
 444                if (memcmp(rte_pktmbuf_mtod_offset(in, uint8_t *,
 445                                sizeof(struct rte_ether_hdr)),
 446                                rte_pktmbuf_mtod_offset(out, uint8_t *,
 447                                sizeof(struct rte_ether_hdr)),
 448                                K / 8 - CRC_24B_LEN))
 449                        printf("Input and output buffers are not equal!\n");
 450        }
 451}
 452
 453static int
 454initialize_ports(struct app_config_params *app_params,
 455                struct rte_mempool *ethdev_mbuf_mempool)
 456{
 457        int ret;
 458        uint16_t port_id = app_params->port_id;
 459        uint16_t q;
 460        /* ethernet addresses of ports */
 461        struct rte_ether_addr bbdev_port_eth_addr;
 462
 463        /* initialize ports */
 464        printf("\nInitializing port %u...\n", app_params->port_id);
 465        ret = rte_eth_dev_configure(port_id, app_params->num_enc_cores,
 466                app_params->num_dec_cores, &port_conf);
 467
 468        if (ret < 0) {
 469                printf("Cannot configure device: err=%d, port=%u\n",
 470                        ret, port_id);
 471                return -1;
 472        }
 473
 474        /* initialize RX queues for encoder */
 475        for (q = 0; q < app_params->num_enc_cores; q++) {
 476                ret = rte_eth_rx_queue_setup(port_id, q,
 477                        RTE_TEST_RX_DESC_DEFAULT,
 478                        rte_eth_dev_socket_id(port_id),
 479                        NULL, ethdev_mbuf_mempool);
 480                if (ret < 0) {
 481                        printf("rte_eth_rx_queue_setup: err=%d, queue=%u\n",
 482                                ret, q);
 483                        return -1;
 484                }
 485        }
 486        /* initialize TX queues for decoder */
 487        for (q = 0; q < app_params->num_dec_cores; q++) {
 488                ret = rte_eth_tx_queue_setup(port_id, q,
 489                        RTE_TEST_TX_DESC_DEFAULT,
 490                        rte_eth_dev_socket_id(port_id), NULL);
 491                if (ret < 0) {
 492                        printf("rte_eth_tx_queue_setup: err=%d, queue=%u\n",
 493                                ret, q);
 494                        return -1;
 495                }
 496        }
 497
 498        ret = rte_eth_promiscuous_enable(port_id);
 499        if (ret != 0) {
 500                printf("Cannot enable promiscuous mode: err=%s, port=%u\n",
 501                        rte_strerror(-ret), port_id);
 502                return ret;
 503        }
 504
 505        ret = rte_eth_macaddr_get(port_id, &bbdev_port_eth_addr);
 506        if (ret < 0) {
 507                printf("rte_eth_macaddr_get: err=%d, queue=%u\n",
 508                        ret, q);
 509                return -1;
 510        }
 511
 512        print_mac(port_id, &bbdev_port_eth_addr);
 513
 514        return 0;
 515}
 516
 517static void
 518lcore_conf_init(struct app_config_params *app_params,
 519                struct lcore_conf *lcore_conf,
 520                struct rte_mempool **bbdev_op_pools,
 521                struct rte_mempool *bbdev_mbuf_mempool,
 522                struct rte_ring *enc_to_dec_ring,
 523                struct lcore_statistics *lcore_stats)
 524{
 525        unsigned int lcore_id;
 526        struct lcore_conf *lconf;
 527        uint16_t rx_queue_id = 0;
 528        uint16_t tx_queue_id = 0;
 529        uint16_t enc_q_id = 0;
 530        uint16_t dec_q_id = 0;
 531
 532        /* Configure lcores */
 533        for (lcore_id = 0; lcore_id < 8 * sizeof(uint64_t); ++lcore_id) {
 534                lconf = &lcore_conf[lcore_id];
 535                lconf->core_type = 0;
 536
 537                if ((1ULL << lcore_id) & app_params->enc_core_mask) {
 538                        lconf->core_type |= (1 << RTE_BBDEV_OP_TURBO_ENC);
 539                        lconf->rx_queue_id = rx_queue_id++;
 540                        lconf->enc_queue_id =
 541                                        app_params->enc_queue_ids[enc_q_id++];
 542                }
 543
 544                if ((1ULL << lcore_id) & app_params->dec_core_mask) {
 545                        lconf->core_type |= (1 << RTE_BBDEV_OP_TURBO_DEC);
 546                        lconf->tx_queue_id = tx_queue_id++;
 547                        lconf->dec_queue_id =
 548                                        app_params->dec_queue_ids[dec_q_id++];
 549                }
 550
 551                lconf->bbdev_enc_op_pool =
 552                                bbdev_op_pools[RTE_BBDEV_OP_TURBO_ENC];
 553                lconf->bbdev_dec_op_pool =
 554                                bbdev_op_pools[RTE_BBDEV_OP_TURBO_DEC];
 555                lconf->bbdev_id = app_params->bbdev_id;
 556                lconf->port_id = app_params->port_id;
 557                lconf->enc_out_pool = bbdev_mbuf_mempool;
 558                lconf->enc_to_dec_ring = enc_to_dec_ring;
 559                lconf->lcore_stats = &lcore_stats[lcore_id];
 560        }
 561}
 562
 563static void
 564print_lcore_stats(struct lcore_statistics *lstats, unsigned int lcore_id)
 565{
 566        static const char *stats_border = "_______";
 567
 568        printf("\nLcore %d: %s enqueued count:\t\t%u\n",
 569                        lcore_id, stats_border, lstats->enqueued);
 570        printf("Lcore %d: %s dequeued count:\t\t%u\n",
 571                        lcore_id, stats_border, lstats->dequeued);
 572        printf("Lcore %d: %s RX lost packets count:\t\t%u\n",
 573                        lcore_id, stats_border, lstats->rx_lost_packets);
 574        printf("Lcore %d: %s encoder-to-decoder lost count:\t%u\n",
 575                        lcore_id, stats_border,
 576                        lstats->enc_to_dec_lost_packets);
 577        printf("Lcore %d: %s TX lost packets count:\t\t%u\n",
 578                        lcore_id, stats_border, lstats->tx_lost_packets);
 579}
 580
 581static void
 582print_stats(struct stats_lcore_params *stats_lcore)
 583{
 584        unsigned int l_id;
 585        unsigned int bbdev_id = stats_lcore->app_params->bbdev_id;
 586        unsigned int port_id = stats_lcore->app_params->port_id;
 587        int len, ret, i;
 588
 589        struct rte_eth_xstat *xstats;
 590        struct rte_eth_xstat_name *xstats_names;
 591        struct rte_bbdev_stats bbstats;
 592        static const char *stats_border = "_______";
 593
 594        const char clr[] = { 27, '[', '2', 'J', '\0' };
 595        const char topLeft[] = { 27, '[', '1', ';', '1', 'H', '\0' };
 596
 597        /* Clear screen and move to top left */
 598        printf("%s%s", clr, topLeft);
 599
 600        printf("PORT STATISTICS:\n================\n");
 601        len = rte_eth_xstats_get(port_id, NULL, 0);
 602        if (len < 0)
 603                rte_exit(EXIT_FAILURE,
 604                                "rte_eth_xstats_get(%u) failed: %d", port_id,
 605                                len);
 606
 607        xstats = calloc(len, sizeof(*xstats));
 608        if (xstats == NULL)
 609                rte_exit(EXIT_FAILURE,
 610                                "Failed to calloc memory for xstats");
 611
 612        ret = rte_eth_xstats_get(port_id, xstats, len);
 613        if (ret < 0 || ret > len) {
 614                free(xstats);
 615                rte_exit(EXIT_FAILURE,
 616                                "rte_eth_xstats_get(%u) len%i failed: %d",
 617                                port_id, len, ret);
 618        }
 619
 620        xstats_names = calloc(len, sizeof(*xstats_names));
 621        if (xstats_names == NULL) {
 622                free(xstats);
 623                rte_exit(EXIT_FAILURE,
 624                                "Failed to calloc memory for xstats_names");
 625        }
 626
 627        ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
 628        if (ret < 0 || ret > len) {
 629                free(xstats);
 630                free(xstats_names);
 631                rte_exit(EXIT_FAILURE,
 632                                "rte_eth_xstats_get_names(%u) len%i failed: %d",
 633                                port_id, len, ret);
 634        }
 635
 636        for (i = 0; i < len; i++) {
 637                if (xstats[i].value > 0)
 638                        printf("Port %u: %s %s:\t\t%"PRIu64"\n",
 639                                        port_id, stats_border,
 640                                        xstats_names[i].name,
 641                                        xstats[i].value);
 642        }
 643
 644        ret = rte_bbdev_stats_get(bbdev_id, &bbstats);
 645        if (ret < 0) {
 646                free(xstats);
 647                free(xstats_names);
 648                rte_exit(EXIT_FAILURE,
 649                                "ERROR(%d): Failure to get BBDEV %u statistics\n",
 650                                ret, bbdev_id);
 651        }
 652
 653        printf("\nBBDEV STATISTICS:\n=================\n");
 654        printf("BBDEV %u: %s enqueue count:\t\t%"PRIu64"\n",
 655                        bbdev_id, stats_border,
 656                        bbstats.enqueued_count);
 657        printf("BBDEV %u: %s dequeue count:\t\t%"PRIu64"\n",
 658                        bbdev_id, stats_border,
 659                        bbstats.dequeued_count);
 660        printf("BBDEV %u: %s enqueue error count:\t\t%"PRIu64"\n",
 661                        bbdev_id, stats_border,
 662                        bbstats.enqueue_err_count);
 663        printf("BBDEV %u: %s dequeue error count:\t\t%"PRIu64"\n\n",
 664                        bbdev_id, stats_border,
 665                        bbstats.dequeue_err_count);
 666
 667        printf("LCORE STATISTICS:\n=================\n");
 668        for (l_id = 0; l_id < RTE_MAX_LCORE; ++l_id) {
 669                if (stats_lcore->lconf[l_id].core_type == 0)
 670                        continue;
 671                print_lcore_stats(stats_lcore->lconf[l_id].lcore_stats, l_id);
 672        }
 673
 674        fflush(stdout);
 675
 676        free(xstats);
 677        free(xstats_names);
 678}
 679
 680static int
 681stats_loop(void *arg)
 682{
 683        struct stats_lcore_params *stats_lcore = arg;
 684
 685        while (!rte_atomic16_read(&global_exit_flag)) {
 686                print_stats(stats_lcore);
 687                rte_delay_ms(500);
 688        }
 689
 690        return 0;
 691}
 692
 693static inline void
 694run_encoding(struct lcore_conf *lcore_conf)
 695{
 696        uint16_t i;
 697        uint16_t port_id, rx_queue_id;
 698        uint16_t bbdev_id, enc_queue_id;
 699        uint16_t nb_rx, nb_enq, nb_deq, nb_sent;
 700        struct rte_mbuf *rx_pkts_burst[MAX_PKT_BURST];
 701        struct rte_mbuf *enc_out_pkts[MAX_PKT_BURST];
 702        struct rte_bbdev_enc_op *bbdev_ops_burst[MAX_PKT_BURST];
 703        struct lcore_statistics *lcore_stats;
 704        struct rte_mempool *bbdev_op_pool, *enc_out_pool;
 705        struct rte_ring *enc_to_dec_ring;
 706        const int in_data_len = (def_op_enc.cb_params.k / 8) - CRC_24B_LEN;
 707
 708        lcore_stats = lcore_conf->lcore_stats;
 709        port_id = lcore_conf->port_id;
 710        rx_queue_id = lcore_conf->rx_queue_id;
 711        bbdev_id = lcore_conf->bbdev_id;
 712        enc_queue_id = lcore_conf->enc_queue_id;
 713        bbdev_op_pool = lcore_conf->bbdev_enc_op_pool;
 714        enc_out_pool = lcore_conf->enc_out_pool;
 715        enc_to_dec_ring = lcore_conf->enc_to_dec_ring;
 716
 717        /* Read packet from RX queues*/
 718        nb_rx = rte_eth_rx_burst(port_id, rx_queue_id, rx_pkts_burst,
 719                        MAX_PKT_BURST);
 720        if (!nb_rx)
 721                return;
 722
 723        if (unlikely(rte_mempool_get_bulk(enc_out_pool, (void **)enc_out_pkts,
 724                        nb_rx) != 0)) {
 725                pktmbuf_free_bulk(rx_pkts_burst, nb_rx);
 726                lcore_stats->rx_lost_packets += nb_rx;
 727                return;
 728        }
 729
 730        if (unlikely(rte_bbdev_enc_op_alloc_bulk(bbdev_op_pool, bbdev_ops_burst,
 731                        nb_rx) != 0)) {
 732                pktmbuf_free_bulk(enc_out_pkts, nb_rx);
 733                pktmbuf_free_bulk(rx_pkts_burst, nb_rx);
 734                lcore_stats->rx_lost_packets += nb_rx;
 735                return;
 736        }
 737
 738        for (i = 0; i < nb_rx; i++) {
 739                char *data;
 740                const uint16_t pkt_data_len =
 741                                rte_pktmbuf_data_len(rx_pkts_burst[i]) -
 742                                sizeof(struct rte_ether_hdr);
 743                /* save input mbuf pointer for later comparison */
 744                *mbuf_input(enc_out_pkts[i]) = rx_pkts_burst[i];
 745
 746                /* copy ethernet header */
 747                rte_pktmbuf_reset(enc_out_pkts[i]);
 748                data = rte_pktmbuf_append(enc_out_pkts[i],
 749                                sizeof(struct rte_ether_hdr));
 750                if (data == NULL) {
 751                        printf(
 752                                "Not enough space for ethernet header in encoder output mbuf\n");
 753                        continue;
 754                }
 755                add_ether_hdr(rx_pkts_burst[i], enc_out_pkts[i]);
 756
 757                /* set op */
 758                bbdev_ops_burst[i]->turbo_enc = def_op_enc;
 759
 760                bbdev_ops_burst[i]->turbo_enc.input.data =
 761                                rx_pkts_burst[i];
 762                bbdev_ops_burst[i]->turbo_enc.input.offset =
 763                                sizeof(struct rte_ether_hdr);
 764                /* Encoder will attach the CRC24B, adjust the length */
 765                bbdev_ops_burst[i]->turbo_enc.input.length = in_data_len;
 766
 767                if (in_data_len < pkt_data_len)
 768                        rte_pktmbuf_trim(rx_pkts_burst[i], pkt_data_len -
 769                                        in_data_len);
 770                else if (in_data_len > pkt_data_len) {
 771                        data = rte_pktmbuf_append(rx_pkts_burst[i],
 772                                        in_data_len - pkt_data_len);
 773                        if (data == NULL)
 774                                printf(
 775                                        "Not enough storage in mbuf to perform the encoding\n");
 776                }
 777
 778                bbdev_ops_burst[i]->turbo_enc.output.data =
 779                                enc_out_pkts[i];
 780                bbdev_ops_burst[i]->turbo_enc.output.offset =
 781                                sizeof(struct rte_ether_hdr);
 782        }
 783
 784        /* Enqueue packets on BBDevice */
 785        nb_enq = rte_bbdev_enqueue_enc_ops(bbdev_id, enc_queue_id,
 786                        bbdev_ops_burst, nb_rx);
 787        if (unlikely(nb_enq < nb_rx)) {
 788                pktmbuf_input_free_bulk(&enc_out_pkts[nb_enq],
 789                                nb_rx - nb_enq);
 790                rte_bbdev_enc_op_free_bulk(&bbdev_ops_burst[nb_enq],
 791                                nb_rx - nb_enq);
 792                lcore_stats->rx_lost_packets += nb_rx - nb_enq;
 793
 794                if (!nb_enq)
 795                        return;
 796        }
 797
 798        lcore_stats->enqueued += nb_enq;
 799
 800        /* Dequeue packets from bbdev device*/
 801        nb_deq = 0;
 802        do {
 803                nb_deq += rte_bbdev_dequeue_enc_ops(bbdev_id, enc_queue_id,
 804                                &bbdev_ops_burst[nb_deq], nb_enq - nb_deq);
 805        } while (unlikely(nb_deq < nb_enq));
 806
 807        lcore_stats->dequeued += nb_deq;
 808
 809        /* Generate and add AWGN */
 810        add_awgn(enc_out_pkts, nb_deq);
 811
 812        rte_bbdev_enc_op_free_bulk(bbdev_ops_burst, nb_deq);
 813
 814        /* Enqueue packets to encoder-to-decoder ring */
 815        nb_sent = rte_ring_enqueue_burst(enc_to_dec_ring, (void **)enc_out_pkts,
 816                        nb_deq, NULL);
 817        if (unlikely(nb_sent < nb_deq)) {
 818                pktmbuf_input_free_bulk(&enc_out_pkts[nb_sent],
 819                                nb_deq - nb_sent);
 820                lcore_stats->enc_to_dec_lost_packets += nb_deq - nb_sent;
 821        }
 822}
 823
 824static void
 825run_decoding(struct lcore_conf *lcore_conf)
 826{
 827        uint16_t i;
 828        uint16_t port_id, tx_queue_id;
 829        uint16_t bbdev_id, bbdev_queue_id;
 830        uint16_t nb_recv, nb_enq, nb_deq, nb_tx;
 831        uint8_t *llr_temp_buf;
 832        struct rte_mbuf *recv_pkts_burst[MAX_PKT_BURST];
 833        struct rte_bbdev_dec_op *bbdev_ops_burst[MAX_PKT_BURST];
 834        struct lcore_statistics *lcore_stats;
 835        struct rte_mempool *bbdev_op_pool;
 836        struct rte_ring *enc_to_dec_ring;
 837
 838        lcore_stats = lcore_conf->lcore_stats;
 839        port_id = lcore_conf->port_id;
 840        tx_queue_id = lcore_conf->tx_queue_id;
 841        bbdev_id = lcore_conf->bbdev_id;
 842        bbdev_queue_id = lcore_conf->dec_queue_id;
 843        bbdev_op_pool = lcore_conf->bbdev_dec_op_pool;
 844        enc_to_dec_ring = lcore_conf->enc_to_dec_ring;
 845        llr_temp_buf = lcore_conf->llr_temp_buf;
 846
 847        /* Dequeue packets from the ring */
 848        nb_recv = rte_ring_dequeue_burst(enc_to_dec_ring,
 849                        (void **)recv_pkts_burst, MAX_PKT_BURST, NULL);
 850        if (!nb_recv)
 851                return;
 852
 853        if (unlikely(rte_bbdev_dec_op_alloc_bulk(bbdev_op_pool, bbdev_ops_burst,
 854                        nb_recv) != 0)) {
 855                pktmbuf_input_free_bulk(recv_pkts_burst, nb_recv);
 856                lcore_stats->rx_lost_packets += nb_recv;
 857                return;
 858        }
 859
 860        transform_enc_out_dec_in(recv_pkts_burst, llr_temp_buf, nb_recv,
 861                        def_op_dec.cb_params.k);
 862
 863        for (i = 0; i < nb_recv; i++) {
 864                /* set op */
 865                bbdev_ops_burst[i]->turbo_dec = def_op_dec;
 866
 867                bbdev_ops_burst[i]->turbo_dec.input.data = recv_pkts_burst[i];
 868                bbdev_ops_burst[i]->turbo_dec.input.offset =
 869                                sizeof(struct rte_ether_hdr);
 870                bbdev_ops_burst[i]->turbo_dec.input.length =
 871                                rte_pktmbuf_data_len(recv_pkts_burst[i])
 872                                - sizeof(struct rte_ether_hdr);
 873
 874                bbdev_ops_burst[i]->turbo_dec.hard_output.data =
 875                                recv_pkts_burst[i];
 876                bbdev_ops_burst[i]->turbo_dec.hard_output.offset =
 877                                sizeof(struct rte_ether_hdr);
 878        }
 879
 880        /* Enqueue packets on BBDevice */
 881        nb_enq = rte_bbdev_enqueue_dec_ops(bbdev_id, bbdev_queue_id,
 882                        bbdev_ops_burst, nb_recv);
 883        if (unlikely(nb_enq < nb_recv)) {
 884                pktmbuf_input_free_bulk(&recv_pkts_burst[nb_enq],
 885                                nb_recv - nb_enq);
 886                rte_bbdev_dec_op_free_bulk(&bbdev_ops_burst[nb_enq],
 887                                nb_recv - nb_enq);
 888                lcore_stats->rx_lost_packets += nb_recv - nb_enq;
 889
 890                if (!nb_enq)
 891                        return;
 892        }
 893
 894        lcore_stats->enqueued += nb_enq;
 895
 896        /* Dequeue packets from BBDevice */
 897        nb_deq = 0;
 898        do {
 899                nb_deq += rte_bbdev_dequeue_dec_ops(bbdev_id, bbdev_queue_id,
 900                                &bbdev_ops_burst[nb_deq], nb_enq - nb_deq);
 901        } while (unlikely(nb_deq < nb_enq));
 902
 903        lcore_stats->dequeued += nb_deq;
 904
 905        rte_bbdev_dec_op_free_bulk(bbdev_ops_burst, nb_deq);
 906
 907        verify_data(recv_pkts_burst, nb_deq);
 908
 909        /* Free the RX mbufs after verification */
 910        for (i = 0; i < nb_deq; ++i)
 911                rte_pktmbuf_free(*mbuf_input(recv_pkts_burst[i]));
 912
 913        /* Transmit the packets */
 914        nb_tx = rte_eth_tx_burst(port_id, tx_queue_id, recv_pkts_burst, nb_deq);
 915        if (unlikely(nb_tx < nb_deq)) {
 916                pktmbuf_input_free_bulk(&recv_pkts_burst[nb_tx],
 917                                nb_deq - nb_tx);
 918                lcore_stats->tx_lost_packets += nb_deq - nb_tx;
 919        }
 920}
 921
 922static int
 923processing_loop(void *arg)
 924{
 925        struct lcore_conf *lcore_conf = arg;
 926        const bool run_encoder = (lcore_conf->core_type &
 927                        (1 << RTE_BBDEV_OP_TURBO_ENC));
 928        const bool run_decoder = (lcore_conf->core_type &
 929                        (1 << RTE_BBDEV_OP_TURBO_DEC));
 930
 931        while (!rte_atomic16_read(&global_exit_flag)) {
 932                if (run_encoder)
 933                        run_encoding(lcore_conf);
 934                if (run_decoder)
 935                        run_decoding(lcore_conf);
 936        }
 937
 938        return 0;
 939}
 940
 941static int
 942prepare_bbdev_device(unsigned int dev_id, struct rte_bbdev_info *info,
 943                struct app_config_params *app_params)
 944{
 945        int ret;
 946        unsigned int q_id, dec_q_id, enc_q_id;
 947        struct rte_bbdev_queue_conf qconf = {0};
 948        uint16_t dec_qs_nb = app_params->num_dec_cores;
 949        uint16_t enc_qs_nb = app_params->num_enc_cores;
 950        uint16_t tot_qs = dec_qs_nb + enc_qs_nb;
 951
 952        ret = rte_bbdev_setup_queues(dev_id, tot_qs, info->socket_id);
 953        if (ret < 0)
 954                rte_exit(EXIT_FAILURE,
 955                                "ERROR(%d): BBDEV %u not configured properly\n",
 956                                ret, dev_id);
 957
 958        /* setup device DEC queues */
 959        qconf.socket = info->socket_id;
 960        qconf.queue_size = info->drv.queue_size_lim;
 961        qconf.op_type = RTE_BBDEV_OP_TURBO_DEC;
 962
 963        for (q_id = 0, dec_q_id = 0; q_id < dec_qs_nb; q_id++) {
 964                ret = rte_bbdev_queue_configure(dev_id, q_id, &qconf);
 965                if (ret < 0)
 966                        rte_exit(EXIT_FAILURE,
 967                                        "ERROR(%d): BBDEV %u DEC queue %u not configured properly\n",
 968                                        ret, dev_id, q_id);
 969                app_params->dec_queue_ids[dec_q_id++] = q_id;
 970        }
 971
 972        /* setup device ENC queues */
 973        qconf.op_type = RTE_BBDEV_OP_TURBO_ENC;
 974
 975        for (q_id = dec_qs_nb, enc_q_id = 0; q_id < tot_qs; q_id++) {
 976                ret = rte_bbdev_queue_configure(dev_id, q_id, &qconf);
 977                if (ret < 0)
 978                        rte_exit(EXIT_FAILURE,
 979                                        "ERROR(%d): BBDEV %u ENC queue %u not configured properly\n",
 980                                        ret, dev_id, q_id);
 981                app_params->enc_queue_ids[enc_q_id++] = q_id;
 982        }
 983
 984        ret = rte_bbdev_start(dev_id);
 985
 986        if (ret != 0)
 987                rte_exit(EXIT_FAILURE, "ERROR(%d): BBDEV %u not started\n",
 988                        ret, dev_id);
 989
 990        printf("BBdev %u started\n", dev_id);
 991
 992        return 0;
 993}
 994
 995static inline bool
 996check_matching_capabilities(uint64_t mask, uint64_t required_mask)
 997{
 998        return (mask & required_mask) == required_mask;
 999}
1000
1001static void
1002enable_bbdev(struct app_config_params *app_params)
1003{
1004        struct rte_bbdev_info dev_info;
1005        const struct rte_bbdev_op_cap *op_cap;
1006        uint16_t bbdev_id = app_params->bbdev_id;
1007        bool encoder_capable = false;
1008        bool decoder_capable = false;
1009
1010        rte_bbdev_info_get(bbdev_id, &dev_info);
1011        op_cap = dev_info.drv.capabilities;
1012
1013        while (op_cap->type != RTE_BBDEV_OP_NONE) {
1014                if (op_cap->type == RTE_BBDEV_OP_TURBO_ENC) {
1015                        if (check_matching_capabilities(
1016                                        op_cap->cap.turbo_enc.capability_flags,
1017                                        def_op_enc.op_flags))
1018                                encoder_capable = true;
1019                }
1020
1021                if (op_cap->type == RTE_BBDEV_OP_TURBO_DEC) {
1022                        if (check_matching_capabilities(
1023                                        op_cap->cap.turbo_dec.capability_flags,
1024                                        def_op_dec.op_flags))
1025                                decoder_capable = true;
1026                }
1027
1028                op_cap++;
1029        }
1030
1031        if (encoder_capable == false)
1032                rte_exit(EXIT_FAILURE,
1033                        "The specified BBDev %u doesn't have required encoder capabilities!\n",
1034                        bbdev_id);
1035        if (decoder_capable == false)
1036                rte_exit(EXIT_FAILURE,
1037                        "The specified BBDev %u doesn't have required decoder capabilities!\n",
1038                        bbdev_id);
1039
1040        prepare_bbdev_device(bbdev_id, &dev_info, app_params);
1041}
1042
1043int
1044main(int argc, char **argv)
1045{
1046        int ret;
1047        unsigned int nb_bbdevs, flags, lcore_id;
1048        void *sigret;
1049        struct app_config_params app_params = def_app_config;
1050        struct rte_mempool *ethdev_mbuf_mempool, *bbdev_mbuf_mempool;
1051        struct rte_mempool *bbdev_op_pools[RTE_BBDEV_OP_TYPE_COUNT];
1052        struct lcore_conf lcore_conf[RTE_MAX_LCORE] = { {0} };
1053        struct lcore_statistics lcore_stats[RTE_MAX_LCORE] = { {0} };
1054        struct stats_lcore_params stats_lcore;
1055        struct rte_ring *enc_to_dec_ring;
1056        bool stats_thread_started = false;
1057        unsigned int main_lcore_id = rte_get_main_lcore();
1058
1059        static const struct rte_mbuf_dynfield input_dynfield_desc = {
1060                .name = "example_bbdev_dynfield_input",
1061                .size = sizeof(struct rte_mbuf *),
1062                .align = __alignof__(struct rte_mbuf *),
1063        };
1064
1065        rte_atomic16_init(&global_exit_flag);
1066
1067        sigret = signal(SIGTERM, signal_handler);
1068        if (sigret == SIG_ERR)
1069                rte_exit(EXIT_FAILURE, "signal(%d, ...) failed", SIGTERM);
1070
1071        sigret = signal(SIGINT, signal_handler);
1072        if (sigret == SIG_ERR)
1073                rte_exit(EXIT_FAILURE, "signal(%d, ...) failed", SIGINT);
1074
1075        ret = rte_eal_init(argc, argv);
1076        if (ret < 0)
1077                rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
1078
1079        argc -= ret;
1080        argv += ret;
1081
1082        /* parse application arguments (after the EAL ones) */
1083        ret = bbdev_parse_args(argc, argv, &app_params);
1084        if (ret < 0)
1085                rte_exit(EXIT_FAILURE, "Invalid BBDEV arguments\n");
1086
1087        /*create bbdev op pools*/
1088        bbdev_op_pools[RTE_BBDEV_OP_TURBO_DEC] =
1089                        rte_bbdev_op_pool_create("bbdev_op_pool_dec",
1090                        RTE_BBDEV_OP_TURBO_DEC, NB_MBUF, 128, rte_socket_id());
1091        bbdev_op_pools[RTE_BBDEV_OP_TURBO_ENC] =
1092                        rte_bbdev_op_pool_create("bbdev_op_pool_enc",
1093                        RTE_BBDEV_OP_TURBO_ENC, NB_MBUF, 128, rte_socket_id());
1094
1095        if ((bbdev_op_pools[RTE_BBDEV_OP_TURBO_DEC] == NULL) ||
1096                        (bbdev_op_pools[RTE_BBDEV_OP_TURBO_ENC] == NULL))
1097                rte_exit(EXIT_FAILURE, "Cannot create bbdev op pools\n");
1098
1099        /* Create encoder to decoder ring */
1100        flags = (app_params.num_enc_cores == 1) ? RING_F_SP_ENQ : 0;
1101        if (app_params.num_dec_cores == 1)
1102                flags |= RING_F_SC_DEQ;
1103
1104        enc_to_dec_ring = rte_ring_create("enc_to_dec_ring",
1105                rte_align32pow2(NB_MBUF), rte_socket_id(), flags);
1106
1107        /* Get the number of available bbdev devices */
1108        nb_bbdevs = rte_bbdev_count();
1109        if (nb_bbdevs <= app_params.bbdev_id)
1110                rte_exit(EXIT_FAILURE,
1111                                "%u BBDevs detected, cannot use BBDev with ID %u!\n",
1112                                nb_bbdevs, app_params.bbdev_id);
1113        printf("Number of bbdevs detected: %d\n", nb_bbdevs);
1114
1115        if (!rte_eth_dev_is_valid_port(app_params.port_id))
1116                rte_exit(EXIT_FAILURE,
1117                                "cannot use port with ID %u!\n",
1118                                app_params.port_id);
1119
1120        /* create the mbuf mempool for ethdev pkts */
1121        ethdev_mbuf_mempool = rte_pktmbuf_pool_create("ethdev_mbuf_pool",
1122                        NB_MBUF, MEMPOOL_CACHE_SIZE, 0,
1123                        RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
1124        if (ethdev_mbuf_mempool == NULL)
1125                rte_exit(EXIT_FAILURE, "Cannot create ethdev mbuf mempool\n");
1126
1127        /* create the mbuf mempool for encoder output */
1128        bbdev_mbuf_mempool = rte_pktmbuf_pool_create("bbdev_mbuf_pool",
1129                        NB_MBUF, MEMPOOL_CACHE_SIZE, 0,
1130                        RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
1131        if (bbdev_mbuf_mempool == NULL)
1132                rte_exit(EXIT_FAILURE, "Cannot create ethdev mbuf mempool\n");
1133
1134        /* register mbuf field to store input pointer */
1135        input_dynfield_offset =
1136                rte_mbuf_dynfield_register(&input_dynfield_desc);
1137        if (input_dynfield_offset < 0)
1138                rte_exit(EXIT_FAILURE, "Cannot register mbuf field\n");
1139
1140        /* initialize ports */
1141        ret = initialize_ports(&app_params, ethdev_mbuf_mempool);
1142
1143        /* Check if all requested lcores are available */
1144        for (lcore_id = 0; lcore_id < 8 * sizeof(uint64_t); ++lcore_id)
1145                if (((1ULL << lcore_id) & app_params.enc_core_mask) ||
1146                                ((1ULL << lcore_id) & app_params.dec_core_mask))
1147                        if (!rte_lcore_is_enabled(lcore_id))
1148                                rte_exit(EXIT_FAILURE,
1149                                                "Requested lcore_id %u is not enabled!\n",
1150                                                lcore_id);
1151
1152        /* Start ethernet port */
1153        ret = rte_eth_dev_start(app_params.port_id);
1154        if (ret < 0)
1155                rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n",
1156                                ret, app_params.port_id);
1157
1158        ret = check_port_link_status(app_params.port_id);
1159        if (ret < 0)
1160                exit(EXIT_FAILURE);
1161
1162        /* start BBDevice and save BBDev queue IDs */
1163        enable_bbdev(&app_params);
1164
1165        /* Initialize the port/queue configuration of each logical core */
1166        lcore_conf_init(&app_params, lcore_conf, bbdev_op_pools,
1167                        bbdev_mbuf_mempool, enc_to_dec_ring, lcore_stats);
1168
1169        stats_lcore.app_params = &app_params;
1170        stats_lcore.lconf = lcore_conf;
1171
1172        RTE_LCORE_FOREACH_WORKER(lcore_id) {
1173                if (lcore_conf[lcore_id].core_type != 0)
1174                        /* launch per-lcore processing loop on worker lcores */
1175                        rte_eal_remote_launch(processing_loop,
1176                                        &lcore_conf[lcore_id], lcore_id);
1177                else if (!stats_thread_started) {
1178                        /* launch statistics printing loop */
1179                        rte_eal_remote_launch(stats_loop, &stats_lcore,
1180                                        lcore_id);
1181                        stats_thread_started = true;
1182                }
1183        }
1184
1185        if (!stats_thread_started &&
1186                        lcore_conf[main_lcore_id].core_type != 0)
1187                rte_exit(EXIT_FAILURE,
1188                                "Not enough lcores to run the statistics printing loop!");
1189        else if (lcore_conf[main_lcore_id].core_type != 0)
1190                processing_loop(&lcore_conf[main_lcore_id]);
1191        else if (!stats_thread_started)
1192                stats_loop(&stats_lcore);
1193
1194        RTE_LCORE_FOREACH_WORKER(lcore_id) {
1195                ret |= rte_eal_wait_lcore(lcore_id);
1196        }
1197
1198        return ret;
1199}
1200