dpdk/app/test-eventdev/test_pipeline_queue.c
<<
>>
Prefs
   1/*
   2 * SPDX-License-Identifier: BSD-3-Clause
   3 * Copyright 2017 Cavium, Inc.
   4 */
   5
   6#include "test_pipeline_common.h"
   7
   8/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
   9
  10static __rte_always_inline int
  11pipeline_queue_nb_event_queues(struct evt_options *opt)
  12{
  13        uint16_t eth_count = rte_eth_dev_count_avail();
  14
  15        return (eth_count * opt->nb_stages) + eth_count;
  16}
  17
  18typedef int (*pipeline_queue_worker_t)(void *arg);
  19
  20static __rte_noinline int
  21pipeline_queue_worker_single_stage_tx(void *arg)
  22{
  23        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  24
  25        while (t->done == false) {
  26                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  27
  28                if (!event) {
  29                        rte_pause();
  30                        continue;
  31                }
  32
  33                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
  34                        pipeline_event_tx(dev, port, &ev);
  35                        w->processed_pkts++;
  36                } else {
  37                        ev.queue_id++;
  38                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
  39                        pipeline_event_enqueue(dev, port, &ev);
  40                }
  41        }
  42
  43        return 0;
  44}
  45
  46static __rte_noinline int
  47pipeline_queue_worker_single_stage_fwd(void *arg)
  48{
  49        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  50        const uint8_t *tx_queue = t->tx_evqueue_id;
  51
  52        while (t->done == false) {
  53                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  54
  55                if (!event) {
  56                        rte_pause();
  57                        continue;
  58                }
  59
  60                ev.queue_id = tx_queue[ev.mbuf->port];
  61                rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
  62                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
  63                pipeline_event_enqueue(dev, port, &ev);
  64                w->processed_pkts++;
  65        }
  66
  67        return 0;
  68}
  69
  70static __rte_noinline int
  71pipeline_queue_worker_single_stage_burst_tx(void *arg)
  72{
  73        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
  74
  75        while (t->done == false) {
  76                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
  77                                BURST_SIZE, 0);
  78
  79                if (!nb_rx) {
  80                        rte_pause();
  81                        continue;
  82                }
  83
  84                for (i = 0; i < nb_rx; i++) {
  85                        rte_prefetch0(ev[i + 1].mbuf);
  86                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
  87                                pipeline_event_tx(dev, port, &ev[i]);
  88                                w->processed_pkts++;
  89                        } else {
  90                                ev[i].queue_id++;
  91                                pipeline_fwd_event(&ev[i],
  92                                                RTE_SCHED_TYPE_ATOMIC);
  93                                pipeline_event_enqueue_burst(dev, port, ev,
  94                                                nb_rx);
  95                        }
  96                }
  97        }
  98
  99        return 0;
 100}
 101
 102static __rte_noinline int
 103pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 104{
 105        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 106        const uint8_t *tx_queue = t->tx_evqueue_id;
 107
 108        while (t->done == false) {
 109                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
 110                                BURST_SIZE, 0);
 111
 112                if (!nb_rx) {
 113                        rte_pause();
 114                        continue;
 115                }
 116
 117                for (i = 0; i < nb_rx; i++) {
 118                        rte_prefetch0(ev[i + 1].mbuf);
 119                        ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 120                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 121                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
 122                }
 123
 124                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 125                w->processed_pkts += nb_rx;
 126        }
 127
 128        return 0;
 129}
 130
 131static __rte_noinline int
 132pipeline_queue_worker_single_stage_tx_vector(void *arg)
 133{
 134        PIPELINE_WORKER_SINGLE_STAGE_INIT;
 135        uint16_t vector_sz;
 136
 137        while (!t->done) {
 138                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 139
 140                if (!event) {
 141                        rte_pause();
 142                        continue;
 143                }
 144
 145                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
 146                        vector_sz = ev.vec->nb_elem;
 147                        pipeline_event_tx_vector(dev, port, &ev);
 148                        w->processed_pkts += vector_sz;
 149                } else {
 150                        ev.queue_id++;
 151                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
 152                        pipeline_event_enqueue(dev, port, &ev);
 153                }
 154        }
 155
 156        return 0;
 157}
 158
 159static __rte_noinline int
 160pipeline_queue_worker_single_stage_fwd_vector(void *arg)
 161{
 162        PIPELINE_WORKER_SINGLE_STAGE_INIT;
 163        const uint8_t *tx_queue = t->tx_evqueue_id;
 164        uint16_t vector_sz;
 165
 166        while (!t->done) {
 167                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 168
 169                if (!event) {
 170                        rte_pause();
 171                        continue;
 172                }
 173
 174                ev.queue_id = tx_queue[ev.vec->port];
 175                ev.vec->queue = 0;
 176                vector_sz = ev.vec->nb_elem;
 177                pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
 178                pipeline_event_enqueue(dev, port, &ev);
 179                w->processed_pkts += vector_sz;
 180        }
 181
 182        return 0;
 183}
 184
 185static __rte_noinline int
 186pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
 187{
 188        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 189        uint16_t vector_sz;
 190
 191        while (!t->done) {
 192                uint16_t nb_rx =
 193                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 194
 195                if (!nb_rx) {
 196                        rte_pause();
 197                        continue;
 198                }
 199
 200                for (i = 0; i < nb_rx; i++) {
 201                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
 202                                vector_sz = ev[i].vec->nb_elem;
 203                                pipeline_event_tx_vector(dev, port, &ev[i]);
 204                                ev[i].op = RTE_EVENT_OP_RELEASE;
 205                                w->processed_pkts += vector_sz;
 206                        } else {
 207                                ev[i].queue_id++;
 208                                pipeline_fwd_event_vector(
 209                                        &ev[i], RTE_SCHED_TYPE_ATOMIC);
 210                        }
 211                }
 212
 213                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 214        }
 215
 216        return 0;
 217}
 218
 219static __rte_noinline int
 220pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
 221{
 222        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 223        const uint8_t *tx_queue = t->tx_evqueue_id;
 224        uint16_t vector_sz;
 225
 226        while (!t->done) {
 227                uint16_t nb_rx =
 228                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 229
 230                if (!nb_rx) {
 231                        rte_pause();
 232                        continue;
 233                }
 234
 235                vector_sz = 0;
 236                for (i = 0; i < nb_rx; i++) {
 237                        ev[i].queue_id = tx_queue[ev[i].vec->port];
 238                        ev[i].vec->queue = 0;
 239                        vector_sz += ev[i].vec->nb_elem;
 240                        pipeline_fwd_event_vector(&ev[i],
 241                                                  RTE_SCHED_TYPE_ATOMIC);
 242                }
 243
 244                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 245                w->processed_pkts += vector_sz;
 246        }
 247
 248        return 0;
 249}
 250
 251static __rte_noinline int
 252pipeline_queue_worker_multi_stage_tx(void *arg)
 253{
 254        PIPELINE_WORKER_MULTI_STAGE_INIT;
 255        const uint8_t *tx_queue = t->tx_evqueue_id;
 256
 257        while (t->done == false) {
 258                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 259
 260                if (!event) {
 261                        rte_pause();
 262                        continue;
 263                }
 264
 265                cq_id = ev.queue_id % nb_stages;
 266
 267                if (ev.queue_id == tx_queue[ev.mbuf->port]) {
 268                        pipeline_event_tx(dev, port, &ev);
 269                        w->processed_pkts++;
 270                        continue;
 271                }
 272
 273                ev.queue_id++;
 274                pipeline_fwd_event(&ev, cq_id != last_queue ?
 275                                sched_type_list[cq_id] :
 276                                RTE_SCHED_TYPE_ATOMIC);
 277                pipeline_event_enqueue(dev, port, &ev);
 278        }
 279
 280        return 0;
 281}
 282
 283static __rte_noinline int
 284pipeline_queue_worker_multi_stage_fwd(void *arg)
 285{
 286        PIPELINE_WORKER_MULTI_STAGE_INIT;
 287        const uint8_t *tx_queue = t->tx_evqueue_id;
 288
 289        while (t->done == false) {
 290                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 291
 292                if (!event) {
 293                        rte_pause();
 294                        continue;
 295                }
 296
 297                cq_id = ev.queue_id % nb_stages;
 298
 299                if (cq_id == last_queue) {
 300                        ev.queue_id = tx_queue[ev.mbuf->port];
 301                        rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
 302                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 303                        pipeline_event_enqueue(dev, port, &ev);
 304                        w->processed_pkts++;
 305                } else {
 306                        ev.queue_id++;
 307                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 308                        pipeline_event_enqueue(dev, port, &ev);
 309                }
 310        }
 311
 312        return 0;
 313}
 314
 315static __rte_noinline int
 316pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 317{
 318        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 319        const uint8_t *tx_queue = t->tx_evqueue_id;
 320
 321        while (t->done == false) {
 322                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
 323                                BURST_SIZE, 0);
 324
 325                if (!nb_rx) {
 326                        rte_pause();
 327                        continue;
 328                }
 329
 330                for (i = 0; i < nb_rx; i++) {
 331                        rte_prefetch0(ev[i + 1].mbuf);
 332                        cq_id = ev[i].queue_id % nb_stages;
 333
 334                        if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
 335                                pipeline_event_tx(dev, port, &ev[i]);
 336                                w->processed_pkts++;
 337                                continue;
 338                        }
 339
 340                        ev[i].queue_id++;
 341                        pipeline_fwd_event(&ev[i], cq_id != last_queue ?
 342                                        sched_type_list[cq_id] :
 343                                        RTE_SCHED_TYPE_ATOMIC);
 344                        pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 345                }
 346        }
 347
 348        return 0;
 349}
 350
 351static __rte_noinline int
 352pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 353{
 354        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 355        const uint8_t *tx_queue = t->tx_evqueue_id;
 356
 357        while (t->done == false) {
 358                uint16_t processed_pkts = 0;
 359                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
 360                                BURST_SIZE, 0);
 361
 362                if (!nb_rx) {
 363                        rte_pause();
 364                        continue;
 365                }
 366
 367                for (i = 0; i < nb_rx; i++) {
 368                        rte_prefetch0(ev[i + 1].mbuf);
 369                        cq_id = ev[i].queue_id % nb_stages;
 370
 371                        if (cq_id == last_queue) {
 372                                ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 373                                rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 374                                pipeline_fwd_event(&ev[i],
 375                                                RTE_SCHED_TYPE_ATOMIC);
 376                                processed_pkts++;
 377                        } else {
 378                                ev[i].queue_id++;
 379                                pipeline_fwd_event(&ev[i],
 380                                                sched_type_list[cq_id]);
 381                        }
 382                }
 383
 384                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 385                w->processed_pkts += processed_pkts;
 386        }
 387
 388        return 0;
 389}
 390
 391static __rte_noinline int
 392pipeline_queue_worker_multi_stage_tx_vector(void *arg)
 393{
 394        PIPELINE_WORKER_MULTI_STAGE_INIT;
 395        const uint8_t *tx_queue = t->tx_evqueue_id;
 396        uint16_t vector_sz;
 397
 398        while (!t->done) {
 399                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 400
 401                if (!event) {
 402                        rte_pause();
 403                        continue;
 404                }
 405
 406                cq_id = ev.queue_id % nb_stages;
 407
 408                if (ev.queue_id == tx_queue[ev.vec->port]) {
 409                        vector_sz = ev.vec->nb_elem;
 410                        pipeline_event_tx_vector(dev, port, &ev);
 411                        w->processed_pkts += vector_sz;
 412                        continue;
 413                }
 414
 415                ev.queue_id++;
 416                pipeline_fwd_event_vector(&ev, cq_id != last_queue
 417                                                       ? sched_type_list[cq_id]
 418                                                       : RTE_SCHED_TYPE_ATOMIC);
 419                pipeline_event_enqueue(dev, port, &ev);
 420        }
 421
 422        return 0;
 423}
 424
 425static __rte_noinline int
 426pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
 427{
 428        PIPELINE_WORKER_MULTI_STAGE_INIT;
 429        const uint8_t *tx_queue = t->tx_evqueue_id;
 430        uint16_t vector_sz;
 431
 432        while (!t->done) {
 433                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 434
 435                if (!event) {
 436                        rte_pause();
 437                        continue;
 438                }
 439
 440                cq_id = ev.queue_id % nb_stages;
 441
 442                if (cq_id == last_queue) {
 443                        vector_sz = ev.vec->nb_elem;
 444                        ev.queue_id = tx_queue[ev.vec->port];
 445                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
 446                        w->processed_pkts += vector_sz;
 447                } else {
 448                        ev.queue_id++;
 449                        pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
 450                }
 451
 452                pipeline_event_enqueue(dev, port, &ev);
 453        }
 454
 455        return 0;
 456}
 457
 458static __rte_noinline int
 459pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
 460{
 461        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 462        const uint8_t *tx_queue = t->tx_evqueue_id;
 463        uint16_t vector_sz;
 464
 465        while (!t->done) {
 466                uint16_t nb_rx =
 467                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 468
 469                if (!nb_rx) {
 470                        rte_pause();
 471                        continue;
 472                }
 473
 474                for (i = 0; i < nb_rx; i++) {
 475                        cq_id = ev[i].queue_id % nb_stages;
 476
 477                        if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
 478                                vector_sz = ev[i].vec->nb_elem;
 479                                pipeline_event_tx_vector(dev, port, &ev[i]);
 480                                ev[i].op = RTE_EVENT_OP_RELEASE;
 481                                w->processed_pkts += vector_sz;
 482                                continue;
 483                        }
 484
 485                        ev[i].queue_id++;
 486                        pipeline_fwd_event_vector(
 487                                &ev[i], cq_id != last_queue
 488                                                ? sched_type_list[cq_id]
 489                                                : RTE_SCHED_TYPE_ATOMIC);
 490                }
 491
 492                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 493        }
 494
 495        return 0;
 496}
 497
 498static __rte_noinline int
 499pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
 500{
 501        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 502        const uint8_t *tx_queue = t->tx_evqueue_id;
 503        uint16_t vector_sz;
 504
 505        while (!t->done) {
 506                uint16_t nb_rx =
 507                        rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 508
 509                if (!nb_rx) {
 510                        rte_pause();
 511                        continue;
 512                }
 513
 514                for (i = 0; i < nb_rx; i++) {
 515                        cq_id = ev[i].queue_id % nb_stages;
 516
 517                        if (cq_id == last_queue) {
 518                                ev[i].queue_id = tx_queue[ev[i].vec->port];
 519                                vector_sz = ev[i].vec->nb_elem;
 520                                pipeline_fwd_event_vector(
 521                                        &ev[i], RTE_SCHED_TYPE_ATOMIC);
 522                                w->processed_pkts += vector_sz;
 523                        } else {
 524                                ev[i].queue_id++;
 525                                pipeline_fwd_event_vector(
 526                                        &ev[i], sched_type_list[cq_id]);
 527                        }
 528                }
 529
 530                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 531        }
 532
 533        return 0;
 534}
 535
 536static int
 537worker_wrapper(void *arg)
 538{
 539        struct worker_data *w  = arg;
 540        struct evt_options *opt = w->t->opt;
 541        const bool burst = evt_has_burst_mode(w->dev_id);
 542        const bool internal_port = w->t->internal_port;
 543        const uint8_t nb_stages = opt->nb_stages;
 544        /*vector/burst/internal_port*/
 545        const pipeline_queue_worker_t
 546        pipeline_queue_worker_single_stage[2][2][2] = {
 547                [0][0][0] = pipeline_queue_worker_single_stage_fwd,
 548                [0][0][1] = pipeline_queue_worker_single_stage_tx,
 549                [0][1][0] = pipeline_queue_worker_single_stage_burst_fwd,
 550                [0][1][1] = pipeline_queue_worker_single_stage_burst_tx,
 551                [1][0][0] = pipeline_queue_worker_single_stage_fwd_vector,
 552                [1][0][1] = pipeline_queue_worker_single_stage_tx_vector,
 553                [1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector,
 554                [1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector,
 555        };
 556        const pipeline_queue_worker_t
 557        pipeline_queue_worker_multi_stage[2][2][2] = {
 558                [0][0][0] = pipeline_queue_worker_multi_stage_fwd,
 559                [0][0][1] = pipeline_queue_worker_multi_stage_tx,
 560                [0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd,
 561                [0][1][1] = pipeline_queue_worker_multi_stage_burst_tx,
 562                [1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector,
 563                [1][0][1] = pipeline_queue_worker_multi_stage_tx_vector,
 564                [1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector,
 565                [1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector,
 566        };
 567
 568        if (nb_stages == 1)
 569                return (pipeline_queue_worker_single_stage[opt->ena_vector]
 570                                                          [burst]
 571                                                          [internal_port])(arg);
 572        else
 573                return (pipeline_queue_worker_multi_stage[opt->ena_vector]
 574                                                         [burst]
 575                                                         [internal_port])(arg);
 576
 577        rte_panic("invalid worker\n");
 578}
 579
 580static int
 581pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
 582{
 583        return pipeline_launch_lcores(test, opt, worker_wrapper);
 584}
 585
 586static int
 587pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 588{
 589        int ret;
 590        int nb_ports;
 591        int nb_queues;
 592        int nb_stages = opt->nb_stages;
 593        uint8_t queue;
 594        uint8_t tx_evport_id = 0;
 595        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
 596        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 597        uint8_t nb_worker_queues = 0;
 598        uint16_t prod = 0;
 599        struct rte_event_dev_info info;
 600        struct test_pipeline *t = evt_test_priv(test);
 601
 602        nb_ports = evt_nr_active_lcores(opt->wlcores);
 603        nb_queues = rte_eth_dev_count_avail() * (nb_stages);
 604
 605        /* One queue for Tx adapter per port */
 606        nb_queues += rte_eth_dev_count_avail();
 607
 608        memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
 609        memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
 610
 611        rte_event_dev_info_get(opt->dev_id, &info);
 612        ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
 613        if (ret) {
 614                evt_err("failed to configure eventdev %d", opt->dev_id);
 615                return ret;
 616        }
 617
 618        struct rte_event_queue_conf q_conf = {
 619                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 620                        .nb_atomic_flows = opt->nb_flows,
 621                        .nb_atomic_order_sequences = opt->nb_flows,
 622        };
 623        /* queue configurations */
 624        for (queue = 0; queue < nb_queues; queue++) {
 625                uint8_t slot;
 626
 627                q_conf.event_queue_cfg = 0;
 628                slot = queue % (nb_stages + 1);
 629                if (slot == nb_stages) {
 630                        q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
 631                        if (!t->internal_port) {
 632                                q_conf.event_queue_cfg =
 633                                        RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
 634                        }
 635                        tx_evqueue_id[prod++] = queue;
 636                } else {
 637                        q_conf.schedule_type = opt->sched_type_list[slot];
 638                        queue_arr[nb_worker_queues] = queue;
 639                        nb_worker_queues++;
 640                }
 641
 642                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
 643                if (ret) {
 644                        evt_err("failed to setup queue=%d", queue);
 645                        return ret;
 646                }
 647        }
 648
 649        if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
 650                opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
 651
 652        /* port configuration */
 653        const struct rte_event_port_conf p_conf = {
 654                        .dequeue_depth = opt->wkr_deq_dep,
 655                        .enqueue_depth = info.max_event_port_dequeue_depth,
 656                        .new_event_threshold = info.max_num_events,
 657        };
 658
 659        if (!t->internal_port) {
 660                ret = pipeline_event_port_setup(test, opt, queue_arr,
 661                                nb_worker_queues, p_conf);
 662                if (ret)
 663                        return ret;
 664        } else
 665                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
 666                                p_conf);
 667
 668        if (ret)
 669                return ret;
 670        /*
 671         * The pipelines are setup in the following manner:
 672         *
 673         * eth_dev_count = 2, nb_stages = 2.
 674         *
 675         *      queues = 6
 676         *      stride = 3
 677         *
 678         *      event queue pipelines:
 679         *      eth0 -> q0 -> q1 -> (q2->tx)
 680         *      eth1 -> q3 -> q4 -> (q5->tx)
 681         *
 682         *      q2, q5 configured as ATOMIC | SINGLE_LINK
 683         *
 684         */
 685        ret = pipeline_event_rx_adapter_setup(opt, nb_stages + 1, p_conf);
 686        if (ret)
 687                return ret;
 688
 689        ret = pipeline_event_tx_adapter_setup(opt, p_conf);
 690        if (ret)
 691                return ret;
 692
 693        if (!evt_has_distributed_sched(opt->dev_id)) {
 694                uint32_t service_id;
 695                rte_event_dev_service_id_get(opt->dev_id, &service_id);
 696                ret = evt_service_setup(service_id);
 697                if (ret) {
 698                        evt_err("No service lcore found to run event dev.");
 699                        return ret;
 700                }
 701        }
 702
 703        /* Connect the tx_evqueue_id to the Tx adapter port */
 704        if (!t->internal_port) {
 705                RTE_ETH_FOREACH_DEV(prod) {
 706                        ret = rte_event_eth_tx_adapter_event_port_get(prod,
 707                                        &tx_evport_id);
 708                        if (ret) {
 709                                evt_err("Unable to get Tx adptr[%d] evprt[%d]",
 710                                                prod, tx_evport_id);
 711                                return ret;
 712                        }
 713
 714                        if (rte_event_port_link(opt->dev_id, tx_evport_id,
 715                                                &tx_evqueue_id[prod],
 716                                                NULL, 1) != 1) {
 717                                evt_err("Unable to link Tx adptr[%d] evprt[%d]",
 718                                                prod, tx_evport_id);
 719                                return ret;
 720                        }
 721                }
 722        }
 723
 724        ret = rte_event_dev_start(opt->dev_id);
 725        if (ret) {
 726                evt_err("failed to start eventdev %d", opt->dev_id);
 727                return ret;
 728        }
 729
 730
 731        RTE_ETH_FOREACH_DEV(prod) {
 732                ret = rte_eth_dev_start(prod);
 733                if (ret) {
 734                        evt_err("Ethernet dev [%d] failed to start."
 735                                        " Using synthetic producer", prod);
 736                        return ret;
 737                }
 738
 739        }
 740
 741        RTE_ETH_FOREACH_DEV(prod) {
 742                ret = rte_event_eth_rx_adapter_start(prod);
 743                if (ret) {
 744                        evt_err("Rx adapter[%d] start failed", prod);
 745                        return ret;
 746                }
 747
 748                ret = rte_event_eth_tx_adapter_start(prod);
 749                if (ret) {
 750                        evt_err("Tx adapter[%d] start failed", prod);
 751                        return ret;
 752                }
 753        }
 754
 755        memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
 756                        RTE_MAX_ETHPORTS);
 757
 758        return 0;
 759}
 760
 761static void
 762pipeline_queue_opt_dump(struct evt_options *opt)
 763{
 764        pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
 765}
 766
 767static int
 768pipeline_queue_opt_check(struct evt_options *opt)
 769{
 770        return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
 771}
 772
 773static bool
 774pipeline_queue_capability_check(struct evt_options *opt)
 775{
 776        struct rte_event_dev_info dev_info;
 777
 778        rte_event_dev_info_get(opt->dev_id, &dev_info);
 779        if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
 780                        dev_info.max_event_ports <
 781                        evt_nr_active_lcores(opt->wlcores)) {
 782                evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
 783                        pipeline_queue_nb_event_queues(opt),
 784                        dev_info.max_event_queues,
 785                        evt_nr_active_lcores(opt->wlcores),
 786                        dev_info.max_event_ports);
 787        }
 788
 789        return true;
 790}
 791
 792static const struct evt_test_ops pipeline_queue =  {
 793        .cap_check          = pipeline_queue_capability_check,
 794        .opt_check          = pipeline_queue_opt_check,
 795        .opt_dump           = pipeline_queue_opt_dump,
 796        .test_setup         = pipeline_test_setup,
 797        .mempool_setup      = pipeline_mempool_setup,
 798        .ethdev_setup       = pipeline_ethdev_setup,
 799        .eventdev_setup     = pipeline_queue_eventdev_setup,
 800        .launch_lcores      = pipeline_queue_launch_lcores,
 801        .eventdev_destroy   = pipeline_eventdev_destroy,
 802        .mempool_destroy    = pipeline_mempool_destroy,
 803        .ethdev_destroy     = pipeline_ethdev_destroy,
 804        .test_result        = pipeline_test_result,
 805        .test_destroy       = pipeline_test_destroy,
 806};
 807
 808EVT_TEST_REGISTER(pipeline_queue);
 809