dpdk/app/test-eventdev/test_pipeline_atq.c
/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */

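/*
 * In ATQ (all types queue) mode every pipeline stage is enqueued to the same
 * queue, so the baseline queue count is one per available ethernet device
 * (extra single-link Tx adapter queues are added during setup when the
 * driver has no internal Tx port).
 */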
static __rte_always_inline int
pipeline_atq_nb_event_queues(struct evt_options *opt)
{
        RTE_SET_USED(opt);

        return rte_eth_dev_count_avail();
}

typedef int (*pipeline_atq_worker_t)(void *arg);

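/*
 * Worker variants below follow a common naming scheme:
 *   - single_stage/multi_stage: number of pipeline stages (--nb_stages)
 *   - burst:  dequeue up to BURST_SIZE events per call
 *   - vector: events carry rte_event_vector bursts of mbufs
 *   - tx:     transmit directly (driver has an internal Tx port)
 *   - fwd:    forward to a single-link Tx adapter queue
 * worker_wrapper() selects the matching variant at launch time.
 */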
static __rte_noinline int
pipeline_atq_worker_single_stage_tx(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                pipeline_event_tx(dev, port, &ev);
                w->processed_pkts++;
        }

        return 0;
}

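/*
 * fwd variant: there is no internal Tx port, so look up the single-link Tx
 * adapter queue for the mbuf's ethdev port and forward the event there with
 * atomic scheduling.
 */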
static __rte_noinline int
pipeline_atq_worker_single_stage_fwd(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                ev.queue_id = tx_queue[ev.mbuf->port];
                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
                pipeline_event_enqueue(dev, port, &ev);
                w->processed_pkts++;
        }

        return 0;
}

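/*
 * Burst variants amortize scheduler overhead by dequeueing up to BURST_SIZE
 * events per call; the next event's mbuf is prefetched while the current one
 * is processed and rte_event_eth_tx_adapter_txq_set() marks each mbuf for
 * transmission on ethdev Tx queue 0.
 */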
static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
                }

                pipeline_event_tx_burst(dev, port, ev, nb_rx);
                w->processed_pkts += nb_rx;
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
                        ev[i].queue_id = tx_queue[ev[i].mbuf->port];
                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
                w->processed_pkts += nb_rx;
        }

        return 0;
}

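/*
 * Vector variants: each event carries an rte_event_vector (ev.vec), so the
 * packet counter is advanced by vec->nb_elem rather than by one per event.
 */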
static __rte_noinline int
pipeline_atq_worker_single_stage_tx_vector(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }
                vector_sz = ev.vec->nb_elem;
                pipeline_event_tx_vector(dev, port, &ev);
                w->processed_pkts += vector_sz;
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_fwd_vector(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                vector_sz = ev.vec->nb_elem;
                ev.queue_id = tx_queue[ev.vec->port];
                ev.vec->queue = 0;
                pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
                pipeline_event_enqueue(dev, port, &ev);
                w->processed_pkts += vector_sz;
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t nb_rx =
                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }
                vector_sz = 0;
                for (i = 0; i < nb_rx; i++) {
                        vector_sz += ev[i].vec->nb_elem;
                        ev[i].vec->queue = 0;
                }

                pipeline_event_tx_burst(dev, port, ev, nb_rx);
                w->processed_pkts += vector_sz;
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
{
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t nb_rx =
                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                vector_sz = 0;
                for (i = 0; i < nb_rx; i++) {
                        ev[i].queue_id = tx_queue[ev[i].vec->port];
                        ev[i].vec->queue = 0;
                        vector_sz += ev[i].vec->nb_elem;
                        pipeline_fwd_event_vector(&ev[i],
                                                  RTE_SCHED_TYPE_ATOMIC);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
                w->processed_pkts += vector_sz;
        }

        return 0;
}

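/*
 * Multi-stage variants: the current stage is tracked in ev.sub_event_type and
 * the event is re-enqueued to the same all types queue until the last stage,
 * where it is either transmitted directly (tx) or handed off to the Tx
 * adapter queue (fwd).
 */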
static __rte_noinline int
pipeline_atq_worker_multi_stage_tx(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        pipeline_event_tx(dev, port, &ev);
                        w->processed_pkts++;
                        continue;
                }

                ev.sub_event_type++;
                pipeline_fwd_event(&ev, sched_type_list[cq_id]);
                pipeline_event_enqueue(dev, port, &ev);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        ev.queue_id = tx_queue[ev.mbuf->port];
                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
                        w->processed_pkts++;
                } else {
                        ev.sub_event_type++;
                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
                }

                pipeline_event_enqueue(dev, port, &ev);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                pipeline_event_tx(dev, port, &ev[i]);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts++;
                                continue;
                        }

                        ev[i].sub_event_type++;
                        pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;

        while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
                                BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                w->processed_pkts++;
                                ev[i].queue_id = tx_queue[ev[i].mbuf->port];
                                pipeline_fwd_event(&ev[i],
                                                RTE_SCHED_TYPE_ATOMIC);
                        } else {
                                ev[i].sub_event_type++;
                                pipeline_fwd_event(&ev[i],
                                                sched_type_list[cq_id]);
                        }
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_tx_vector(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        vector_sz = ev.vec->nb_elem;
                        pipeline_event_tx_vector(dev, port, &ev);
                        w->processed_pkts += vector_sz;
                        continue;
                }

                ev.sub_event_type++;
                pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
                pipeline_event_enqueue(dev, port, &ev);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

                if (!event) {
                        rte_pause();
                        continue;
                }

                cq_id = ev.sub_event_type % nb_stages;

                if (cq_id == last_queue) {
                        ev.queue_id = tx_queue[ev.vec->port];
                        ev.vec->queue = 0;
                        vector_sz = ev.vec->nb_elem;
                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
                        pipeline_event_enqueue(dev, port, &ev);
                        w->processed_pkts += vector_sz;
                } else {
                        ev.sub_event_type++;
                        pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
                        pipeline_event_enqueue(dev, port, &ev);
                }
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t nb_rx =
                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                vector_sz = ev[i].vec->nb_elem;
                                pipeline_event_tx_vector(dev, port, &ev[i]);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts += vector_sz;
                                continue;
                        }

                        ev[i].sub_event_type++;
                        pipeline_fwd_event_vector(&ev[i],
                                                  sched_type_list[cq_id]);
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
{
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
        uint16_t vector_sz;

        while (!t->done) {
                uint16_t nb_rx =
                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);

                if (!nb_rx) {
                        rte_pause();
                        continue;
                }

                for (i = 0; i < nb_rx; i++) {
                        cq_id = ev[i].sub_event_type % nb_stages;

                        if (cq_id == last_queue) {
                                vector_sz = ev[i].vec->nb_elem;
                                ev[i].queue_id = tx_queue[ev[i].vec->port];
                                ev[i].vec->queue = 0;
                                pipeline_fwd_event_vector(
                                        &ev[i], RTE_SCHED_TYPE_ATOMIC);
                                w->processed_pkts += vector_sz;
                        } else {
                                ev[i].sub_event_type++;
                                pipeline_fwd_event_vector(
                                        &ev[i], sched_type_list[cq_id]);
                        }
                }

                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
        }

        return 0;
}

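/*
 * Resolve the worker function from the runtime configuration; the lookup
 * tables are indexed as [ena_vector][burst capability][internal Tx port].
 */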
static int
worker_wrapper(void *arg)
{
        struct worker_data *w  = arg;
        struct evt_options *opt = w->t->opt;
        const bool burst = evt_has_burst_mode(w->dev_id);
        const bool internal_port = w->t->internal_port;
        const uint8_t nb_stages = opt->nb_stages;
        /*vector/burst/internal_port*/
        const pipeline_atq_worker_t
        pipeline_atq_worker_single_stage[2][2][2] = {
                [0][0][0] = pipeline_atq_worker_single_stage_fwd,
                [0][0][1] = pipeline_atq_worker_single_stage_tx,
                [0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
                [0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
                [1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
                [1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
                [1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
                [1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
        };
        const pipeline_atq_worker_t
        pipeline_atq_worker_multi_stage[2][2][2] = {
                [0][0][0] = pipeline_atq_worker_multi_stage_fwd,
                [0][0][1] = pipeline_atq_worker_multi_stage_tx,
                [0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
                [0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
                [1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
                [1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
                [1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
                [1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
        };

        if (nb_stages == 1)
                return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
                                                        [internal_port])(arg);
        else
                return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
                                                       [internal_port])(arg);

        rte_panic("invalid worker\n");
}

static int
pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
        return pipeline_launch_lcores(test, opt, worker_wrapper);
}

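/*
 * Eventdev setup, in order: size queues/ports, configure the device, create
 * all types worker queues (plus single-link Tx adapter queues when there is
 * no internal Tx port), set up worker ports and the Rx/Tx adapters, reserve
 * a service core for non-distributed schedulers, link the Tx adapter event
 * ports, then start the eventdev, the ethdevs and the adapters.
 */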
static int
pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
        int ret;
        int nb_ports;
        int nb_queues;
        uint8_t queue, is_prod;
        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
        uint8_t nb_worker_queues = 0;
        uint8_t tx_evport_id = 0;
        uint16_t prod = 0;
        struct rte_event_dev_info info;
        struct test_pipeline *t = evt_test_priv(test);

        nb_ports = evt_nr_active_lcores(opt->wlcores);
        nb_queues = rte_eth_dev_count_avail();

        memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
        memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
        /* One queue for Tx adapter per port */
        if (!t->internal_port) {
                RTE_ETH_FOREACH_DEV(prod) {
                        tx_evqueue_id[prod] = nb_queues;
                        nb_queues++;
                }
        }

        rte_event_dev_info_get(opt->dev_id, &info);

        ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
        if (ret) {
                evt_err("failed to configure eventdev %d", opt->dev_id);
                return ret;
        }

        struct rte_event_queue_conf q_conf = {
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                .nb_atomic_flows = opt->nb_flows,
                .nb_atomic_order_sequences = opt->nb_flows,
        };
        /* queue configurations */
        for (queue = 0; queue < nb_queues; queue++) {
                q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;

                if (!t->internal_port) {
                        is_prod = false;
                        RTE_ETH_FOREACH_DEV(prod) {
                                if (queue == tx_evqueue_id[prod]) {
                                        q_conf.event_queue_cfg =
                                                RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
                                        is_prod = true;
                                        break;
                                }
                        }
                        if (!is_prod) {
                                queue_arr[nb_worker_queues] = queue;
                                nb_worker_queues++;
                        }
                }

                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
                if (ret) {
                        evt_err("failed to setup queue=%d", queue);
                        return ret;
                }
        }

        if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
                opt->wkr_deq_dep = info.max_event_port_dequeue_depth;

        /* port configuration */
        const struct rte_event_port_conf p_conf = {
                .dequeue_depth = opt->wkr_deq_dep,
                .enqueue_depth = info.max_event_port_dequeue_depth,
                .new_event_threshold = info.max_num_events,
        };

        if (!t->internal_port)
                ret = pipeline_event_port_setup(test, opt, queue_arr,
                                nb_worker_queues, p_conf);
        else
                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
                                p_conf);

        if (ret)
                return ret;

        /*
         * The pipelines are setup in the following manner:
         *
         * eth_dev_count = 2, nb_stages = 2, atq mode
         *
         * eth0, eth1 have Internal port capability :
         *      queues = 2
         *      stride = 1
         *
         *      event queue pipelines:
         *      eth0 -> q0 ->Tx
         *      eth1 -> q1 ->Tx
         *
         *      q0, q1 are configured as ATQ so, all the different stages can
         *      be enqueued on the same queue.
         *
         * eth0, eth1 use Tx adapters service core :
         *      queues = 4
         *      stride = 1
         *
         *      event queue pipelines:
         *      eth0 -> q0  -> q2 -> Tx
         *      eth1 -> q1  -> q3 -> Tx
         *
         *      q0, q1 are configured as stated above.
         *      q2, q3 configured as SINGLE_LINK.
         */
        ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
        if (ret)
                return ret;
        ret = pipeline_event_tx_adapter_setup(opt, p_conf);
        if (ret)
                return ret;

        if (!evt_has_distributed_sched(opt->dev_id)) {
                uint32_t service_id;
                rte_event_dev_service_id_get(opt->dev_id, &service_id);
                ret = evt_service_setup(service_id);
                if (ret) {
                        evt_err("No service lcore found to run event dev.");
                        return ret;
                }
        }

        /* Connect the tx_evqueue_id to the Tx adapter port */
        if (!t->internal_port) {
                RTE_ETH_FOREACH_DEV(prod) {
                        ret = rte_event_eth_tx_adapter_event_port_get(prod,
                                        &tx_evport_id);
                        if (ret) {
                                evt_err("Unable to get Tx adapter[%d]", prod);
                                return ret;
                        }

                        if (rte_event_port_link(opt->dev_id, tx_evport_id,
                                                &tx_evqueue_id[prod],
                                                NULL, 1) != 1) {
                                evt_err("Unable to link Tx adptr[%d] evprt[%d]",
                                                prod, tx_evport_id);
                                return ret;
                        }
                }
        }

        ret = rte_event_dev_start(opt->dev_id);
        if (ret) {
                evt_err("failed to start eventdev %d", opt->dev_id);
                return ret;
        }

        RTE_ETH_FOREACH_DEV(prod) {
                ret = rte_eth_dev_start(prod);
                if (ret) {
                        evt_err("Ethernet dev [%d] failed to start."
                                        " Using synthetic producer", prod);
                        return ret;
                }
        }

        RTE_ETH_FOREACH_DEV(prod) {
                ret = rte_event_eth_rx_adapter_start(prod);
                if (ret) {
                        evt_err("Rx adapter[%d] start failed", prod);
                        return ret;
                }

                ret = rte_event_eth_tx_adapter_start(prod);
                if (ret) {
                        evt_err("Tx adapter[%d] start failed", prod);
                        return ret;
                }
        }

        memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
                        RTE_MAX_ETHPORTS);

        return 0;
}

static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
        pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
}

static int
pipeline_atq_opt_check(struct evt_options *opt)
{
        return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
}

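/*
 * The test needs a device that supports all types queues (checked via
 * evt_has_all_types_queue()) and enough event queues/ports for the
 * configured ethdevs and worker lcores.
 */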
static bool
pipeline_atq_capability_check(struct evt_options *opt)
{
        struct rte_event_dev_info dev_info;

        rte_event_dev_info_get(opt->dev_id, &dev_info);
        if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
                        dev_info.max_event_ports <
                        evt_nr_active_lcores(opt->wlcores)) {
                evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
                        pipeline_atq_nb_event_queues(opt),
                        dev_info.max_event_queues,
                        evt_nr_active_lcores(opt->wlcores),
                        dev_info.max_event_ports);
                return false;
        }
        if (!evt_has_all_types_queue(opt->dev_id))
                return false;

        return true;
}

static const struct evt_test_ops pipeline_atq =  {
        .cap_check          = pipeline_atq_capability_check,
        .opt_check          = pipeline_atq_opt_check,
        .opt_dump           = pipeline_atq_opt_dump,
        .test_setup         = pipeline_test_setup,
        .mempool_setup      = pipeline_mempool_setup,
        .ethdev_setup       = pipeline_ethdev_setup,
        .eventdev_setup     = pipeline_atq_eventdev_setup,
        .launch_lcores      = pipeline_atq_launch_lcores,
        .eventdev_destroy   = pipeline_eventdev_destroy,
        .mempool_destroy    = pipeline_mempool_destroy,
        .ethdev_destroy     = pipeline_ethdev_destroy,
        .test_result        = pipeline_test_result,
        .test_destroy       = pipeline_test_destroy,
};

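/* Register the "pipeline_atq" test, selectable with --test=pipeline_atq. */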
EVT_TEST_REGISTER(pipeline_atq);