dpdk/app/test-eventdev/test_pipeline_atq.c
<<
>>
Prefs
   1/*
   2 * SPDX-License-Identifier: BSD-3-Clause
   3 * Copyright 2017 Cavium, Inc.
   4 */
   5
   6#include "test_pipeline_common.h"
   7
   8/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
   9
  10static __rte_always_inline int
  11pipeline_atq_nb_event_queues(struct evt_options *opt)
  12{
  13        RTE_SET_USED(opt);
  14
  15        return rte_eth_dev_count_avail();
  16}
  17
  18typedef int (*pipeline_atq_worker_t)(void *arg);
  19
  20static __rte_noinline int
  21pipeline_atq_worker_single_stage_tx(void *arg)
  22{
  23        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  24        uint8_t enq = 0, deq = 0;
  25
  26        while (t->done == false) {
  27                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  28
  29                if (!deq) {
  30                        rte_pause();
  31                        continue;
  32                }
  33
  34                deq = pipeline_event_tx(dev, port, &ev, t);
  35                w->processed_pkts++;
  36        }
  37        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
  38
  39        return 0;
  40}
  41
  42static __rte_noinline int
  43pipeline_atq_worker_single_stage_fwd(void *arg)
  44{
  45        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  46        const uint8_t *tx_queue = t->tx_evqueue_id;
  47        uint8_t enq = 0, deq = 0;
  48
  49        while (t->done == false) {
  50                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  51
  52                if (!deq) {
  53                        rte_pause();
  54                        continue;
  55                }
  56
  57                ev.queue_id = tx_queue[ev.mbuf->port];
  58                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
  59                enq = pipeline_event_enqueue(dev, port, &ev, t);
  60                w->processed_pkts++;
  61        }
  62        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
  63
  64        return 0;
  65}
  66
  67static __rte_noinline int
  68pipeline_atq_worker_single_stage_burst_tx(void *arg)
  69{
  70        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
  71        uint16_t nb_rx = 0, nb_tx = 0;
  72
  73        while (t->done == false) {
  74                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
  75
  76                if (!nb_rx) {
  77                        rte_pause();
  78                        continue;
  79                }
  80
  81                for (i = 0; i < nb_rx; i++) {
  82                        rte_prefetch0(ev[i + 1].mbuf);
  83                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
  84                }
  85
  86                nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
  87                w->processed_pkts += nb_tx;
  88        }
  89        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
  90
  91        return 0;
  92}
  93
  94static __rte_noinline int
  95pipeline_atq_worker_single_stage_burst_fwd(void *arg)
  96{
  97        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
  98        const uint8_t *tx_queue = t->tx_evqueue_id;
  99        uint16_t nb_rx = 0, nb_tx = 0;
 100
 101        while (t->done == false) {
 102                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 103
 104                if (!nb_rx) {
 105                        rte_pause();
 106                        continue;
 107                }
 108
 109                for (i = 0; i < nb_rx; i++) {
 110                        rte_prefetch0(ev[i + 1].mbuf);
 111                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 112                        ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 113                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
 114                }
 115
 116                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 117                w->processed_pkts += nb_tx;
 118        }
 119        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 120
 121        return 0;
 122}
 123
 124static __rte_noinline int
 125pipeline_atq_worker_single_stage_tx_vector(void *arg)
 126{
 127        PIPELINE_WORKER_SINGLE_STAGE_INIT;
 128        uint8_t enq = 0, deq = 0;
 129        uint16_t vector_sz;
 130
 131        while (!t->done) {
 132                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 133
 134                if (!deq) {
 135                        rte_pause();
 136                        continue;
 137                }
 138                vector_sz = ev.vec->nb_elem;
 139                enq = pipeline_event_tx_vector(dev, port, &ev, t);
 140                w->processed_pkts += vector_sz;
 141        }
 142        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 143
 144        return 0;
 145}
 146
 147static __rte_noinline int
 148pipeline_atq_worker_single_stage_fwd_vector(void *arg)
 149{
 150        PIPELINE_WORKER_SINGLE_STAGE_INIT;
 151        const uint8_t *tx_queue = t->tx_evqueue_id;
 152        uint8_t enq = 0, deq = 0;
 153        uint16_t vector_sz;
 154
 155        while (!t->done) {
 156                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 157
 158                if (!deq) {
 159                        rte_pause();
 160                        continue;
 161                }
 162
 163                vector_sz = ev.vec->nb_elem;
 164                ev.queue_id = tx_queue[ev.vec->port];
 165                ev.vec->queue = 0;
 166                pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
 167                enq = pipeline_event_enqueue(dev, port, &ev, t);
 168                w->processed_pkts += vector_sz;
 169        }
 170        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 171
 172        return 0;
 173}
 174
 175static __rte_noinline int
 176pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
 177{
 178        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 179        uint16_t nb_rx = 0, nb_tx = 0;
 180        uint16_t vector_sz;
 181
 182        while (!t->done) {
 183                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 184
 185                if (!nb_rx) {
 186                        rte_pause();
 187                        continue;
 188                }
 189                vector_sz = 0;
 190                for (i = 0; i < nb_rx; i++) {
 191                        vector_sz += ev[i].vec->nb_elem;
 192                        ev[i].vec->queue = 0;
 193                }
 194
 195                nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
 196                w->processed_pkts += vector_sz;
 197        }
 198        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 199
 200        return 0;
 201}
 202
 203static __rte_noinline int
 204pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
 205{
 206        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
 207        const uint8_t *tx_queue = t->tx_evqueue_id;
 208        uint16_t nb_rx = 0, nb_tx = 0;
 209        uint16_t vector_sz;
 210
 211        while (!t->done) {
 212                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 213
 214                if (!nb_rx) {
 215                        rte_pause();
 216                        continue;
 217                }
 218
 219                vector_sz = 0;
 220                for (i = 0; i < nb_rx; i++) {
 221                        ev[i].queue_id = tx_queue[ev[i].vec->port];
 222                        ev[i].vec->queue = 0;
 223                        vector_sz += ev[i].vec->nb_elem;
 224                        pipeline_fwd_event_vector(&ev[i],
 225                                                  RTE_SCHED_TYPE_ATOMIC);
 226                }
 227
 228                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 229                w->processed_pkts += vector_sz;
 230        }
 231        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 232
 233        return 0;
 234}
 235
 236static __rte_noinline int
 237pipeline_atq_worker_multi_stage_tx(void *arg)
 238{
 239        PIPELINE_WORKER_MULTI_STAGE_INIT;
 240        uint8_t enq = 0, deq = 0;
 241
 242        while (t->done == false) {
 243                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 244
 245                if (!deq) {
 246                        rte_pause();
 247                        continue;
 248                }
 249
 250                cq_id = ev.sub_event_type % nb_stages;
 251
 252                if (cq_id == last_queue) {
 253                        enq = pipeline_event_tx(dev, port, &ev, t);
 254                        w->processed_pkts++;
 255                        continue;
 256                }
 257
 258                ev.sub_event_type++;
 259                pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 260                enq = pipeline_event_enqueue(dev, port, &ev, t);
 261        }
 262        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 263
 264        return 0;
 265}
 266
 267static __rte_noinline int
 268pipeline_atq_worker_multi_stage_fwd(void *arg)
 269{
 270        PIPELINE_WORKER_MULTI_STAGE_INIT;
 271        const uint8_t *tx_queue = t->tx_evqueue_id;
 272        uint8_t enq = 0, deq = 0;
 273
 274        while (t->done == false) {
 275                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 276
 277                if (!deq) {
 278                        rte_pause();
 279                        continue;
 280                }
 281
 282                cq_id = ev.sub_event_type % nb_stages;
 283
 284                if (cq_id == last_queue) {
 285                        ev.queue_id = tx_queue[ev.mbuf->port];
 286                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 287                        w->processed_pkts++;
 288                } else {
 289                        ev.sub_event_type++;
 290                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 291                }
 292
 293                enq = pipeline_event_enqueue(dev, port, &ev, t);
 294        }
 295        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 296
 297        return 0;
 298}
 299
 300static __rte_noinline int
 301pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 302{
 303        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 304        uint16_t nb_rx = 0, nb_tx = 0;
 305
 306        while (t->done == false) {
 307                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 308
 309                if (!nb_rx) {
 310                        rte_pause();
 311                        continue;
 312                }
 313
 314                for (i = 0; i < nb_rx; i++) {
 315                        rte_prefetch0(ev[i + 1].mbuf);
 316                        cq_id = ev[i].sub_event_type % nb_stages;
 317
 318                        if (cq_id == last_queue) {
 319                                pipeline_event_tx(dev, port, &ev[i], t);
 320                                ev[i].op = RTE_EVENT_OP_RELEASE;
 321                                w->processed_pkts++;
 322                                continue;
 323                        }
 324
 325                        ev[i].sub_event_type++;
 326                        pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
 327                }
 328
 329                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 330        }
 331        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 332
 333        return 0;
 334}
 335
 336static __rte_noinline int
 337pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 338{
 339        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 340        const uint8_t *tx_queue = t->tx_evqueue_id;
 341        uint16_t nb_rx = 0, nb_tx = 0;
 342
 343        while (t->done == false) {
 344                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 345
 346                if (!nb_rx) {
 347                        rte_pause();
 348                        continue;
 349                }
 350
 351                for (i = 0; i < nb_rx; i++) {
 352                        rte_prefetch0(ev[i + 1].mbuf);
 353                        cq_id = ev[i].sub_event_type % nb_stages;
 354
 355                        if (cq_id == last_queue) {
 356                                w->processed_pkts++;
 357                                ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 358                                pipeline_fwd_event(&ev[i],
 359                                                RTE_SCHED_TYPE_ATOMIC);
 360                        } else {
 361                                ev[i].sub_event_type++;
 362                                pipeline_fwd_event(&ev[i],
 363                                                sched_type_list[cq_id]);
 364                        }
 365                }
 366
 367                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 368        }
 369        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 370
 371        return 0;
 372}
 373
 374static __rte_noinline int
 375pipeline_atq_worker_multi_stage_tx_vector(void *arg)
 376{
 377        PIPELINE_WORKER_MULTI_STAGE_INIT;
 378        uint8_t enq = 0, deq = 0;
 379        uint16_t vector_sz;
 380
 381        while (!t->done) {
 382                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 383
 384                if (!deq) {
 385                        rte_pause();
 386                        continue;
 387                }
 388
 389                cq_id = ev.sub_event_type % nb_stages;
 390
 391                if (cq_id == last_queue) {
 392                        vector_sz = ev.vec->nb_elem;
 393                        enq = pipeline_event_tx_vector(dev, port, &ev, t);
 394                        w->processed_pkts += vector_sz;
 395                        continue;
 396                }
 397
 398                ev.sub_event_type++;
 399                pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
 400                enq = pipeline_event_enqueue(dev, port, &ev, t);
 401        }
 402        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 403
 404        return 0;
 405}
 406
 407static __rte_noinline int
 408pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
 409{
 410        PIPELINE_WORKER_MULTI_STAGE_INIT;
 411        const uint8_t *tx_queue = t->tx_evqueue_id;
 412        uint8_t enq = 0, deq = 0;
 413        uint16_t vector_sz;
 414
 415        while (!t->done) {
 416                deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 417
 418                if (!deq) {
 419                        rte_pause();
 420                        continue;
 421                }
 422
 423                cq_id = ev.sub_event_type % nb_stages;
 424
 425                if (cq_id == last_queue) {
 426                        ev.queue_id = tx_queue[ev.vec->port];
 427                        ev.vec->queue = 0;
 428                        vector_sz = ev.vec->nb_elem;
 429                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
 430                        enq = pipeline_event_enqueue(dev, port, &ev, t);
 431                        w->processed_pkts += vector_sz;
 432                } else {
 433                        ev.sub_event_type++;
 434                        pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
 435                        enq = pipeline_event_enqueue(dev, port, &ev, t);
 436                }
 437        }
 438        pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 439
 440        return 0;
 441}
 442
 443static __rte_noinline int
 444pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 445{
 446        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 447        uint16_t nb_rx = 0, nb_tx = 0;
 448        uint16_t vector_sz;
 449
 450        while (!t->done) {
 451                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 452
 453                if (!nb_rx) {
 454                        rte_pause();
 455                        continue;
 456                }
 457
 458                for (i = 0; i < nb_rx; i++) {
 459                        cq_id = ev[i].sub_event_type % nb_stages;
 460
 461                        if (cq_id == last_queue) {
 462                                vector_sz = ev[i].vec->nb_elem;
 463                                pipeline_event_tx_vector(dev, port, &ev[i], t);
 464                                ev[i].op = RTE_EVENT_OP_RELEASE;
 465                                w->processed_pkts += vector_sz;
 466                                continue;
 467                        }
 468
 469                        ev[i].sub_event_type++;
 470                        pipeline_fwd_event_vector(&ev[i],
 471                                                  sched_type_list[cq_id]);
 472                }
 473
 474                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 475        }
 476        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 477
 478        return 0;
 479}
 480
 481static __rte_noinline int
 482pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
 483{
 484        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 485        const uint8_t *tx_queue = t->tx_evqueue_id;
 486        uint16_t nb_rx = 0, nb_tx = 0;
 487        uint16_t vector_sz;
 488
 489        while (!t->done) {
 490                nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 491
 492                if (!nb_rx) {
 493                        rte_pause();
 494                        continue;
 495                }
 496
 497                for (i = 0; i < nb_rx; i++) {
 498                        cq_id = ev[i].sub_event_type % nb_stages;
 499
 500                        if (cq_id == last_queue) {
 501                                vector_sz = ev[i].vec->nb_elem;
 502                                ev[i].queue_id = tx_queue[ev[i].vec->port];
 503                                ev[i].vec->queue = 0;
 504                                pipeline_fwd_event_vector(
 505                                        &ev[i], RTE_SCHED_TYPE_ATOMIC);
 506                                w->processed_pkts += vector_sz;
 507                        } else {
 508                                ev[i].sub_event_type++;
 509                                pipeline_fwd_event_vector(
 510                                        &ev[i], sched_type_list[cq_id]);
 511                        }
 512                }
 513
 514                nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
 515        }
 516        pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 517
 518        return 0;
 519}
 520
 521static int
 522worker_wrapper(void *arg)
 523{
 524        struct worker_data *w  = arg;
 525        struct evt_options *opt = w->t->opt;
 526        const bool burst = evt_has_burst_mode(w->dev_id);
 527        const bool internal_port = w->t->internal_port;
 528        const uint8_t nb_stages = opt->nb_stages;
 529        /*vector/burst/internal_port*/
 530        const pipeline_atq_worker_t
 531        pipeline_atq_worker_single_stage[2][2][2] = {
 532                [0][0][0] = pipeline_atq_worker_single_stage_fwd,
 533                [0][0][1] = pipeline_atq_worker_single_stage_tx,
 534                [0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
 535                [0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
 536                [1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
 537                [1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
 538                [1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
 539                [1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
 540        };
 541        const pipeline_atq_worker_t
 542        pipeline_atq_worker_multi_stage[2][2][2] = {
 543                [0][0][0] = pipeline_atq_worker_multi_stage_fwd,
 544                [0][0][1] = pipeline_atq_worker_multi_stage_tx,
 545                [0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
 546                [0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
 547                [1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
 548                [1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
 549                [1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
 550                [1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
 551        };
 552
 553        if (nb_stages == 1)
 554                return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
 555                                                        [internal_port])(arg);
 556        else
 557                return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
 558                                                       [internal_port])(arg);
 559
 560        rte_panic("invalid worker\n");
 561}
 562
 563static int
 564pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
 565{
 566        return pipeline_launch_lcores(test, opt, worker_wrapper);
 567}
 568
 569static int
 570pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 571{
 572        int ret;
 573        int nb_ports;
 574        int nb_queues;
 575        uint8_t queue, is_prod;
 576        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
 577        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 578        uint8_t nb_worker_queues = 0;
 579        uint8_t tx_evport_id = 0;
 580        uint16_t prod = 0;
 581        struct rte_event_dev_info info;
 582        struct test_pipeline *t = evt_test_priv(test);
 583
 584        nb_ports = evt_nr_active_lcores(opt->wlcores);
 585        nb_queues = rte_eth_dev_count_avail();
 586
 587        memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
 588        memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
 589        /* One queue for Tx adapter per port */
 590        if (!t->internal_port) {
 591                RTE_ETH_FOREACH_DEV(prod) {
 592                        tx_evqueue_id[prod] = nb_queues;
 593                        nb_queues++;
 594                }
 595        }
 596
 597        rte_event_dev_info_get(opt->dev_id, &info);
 598
 599        ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
 600        if (ret) {
 601                evt_err("failed to configure eventdev %d", opt->dev_id);
 602                return ret;
 603        }
 604
 605        struct rte_event_queue_conf q_conf = {
 606                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 607                .nb_atomic_flows = opt->nb_flows,
 608                .nb_atomic_order_sequences = opt->nb_flows,
 609        };
 610        /* queue configurations */
 611        for (queue = 0; queue < nb_queues; queue++) {
 612                q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
 613
 614                if (!t->internal_port) {
 615                        is_prod = false;
 616                        RTE_ETH_FOREACH_DEV(prod) {
 617                                if (queue == tx_evqueue_id[prod]) {
 618                                        q_conf.event_queue_cfg =
 619                                                RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
 620                                        is_prod = true;
 621                                        break;
 622                                }
 623                        }
 624                        if (!is_prod) {
 625                                queue_arr[nb_worker_queues] = queue;
 626                                nb_worker_queues++;
 627                        }
 628                }
 629
 630                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
 631                if (ret) {
 632                        evt_err("failed to setup queue=%d", queue);
 633                        return ret;
 634                }
 635        }
 636
 637        if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
 638                opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
 639
 640        /* port configuration */
 641        const struct rte_event_port_conf p_conf = {
 642                .dequeue_depth = opt->wkr_deq_dep,
 643                .enqueue_depth = info.max_event_port_dequeue_depth,
 644                .new_event_threshold = info.max_num_events,
 645        };
 646
 647        if (!t->internal_port)
 648                ret = pipeline_event_port_setup(test, opt, queue_arr,
 649                                nb_worker_queues, p_conf);
 650        else
 651                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
 652                                p_conf);
 653
 654        if (ret)
 655                return ret;
 656
 657        /*
 658         * The pipelines are setup in the following manner:
 659         *
 660         * eth_dev_count = 2, nb_stages = 2, atq mode
 661         *
 662         * eth0, eth1 have Internal port capability :
 663         *      queues = 2
 664         *      stride = 1
 665         *
 666         *      event queue pipelines:
 667         *      eth0 -> q0 ->Tx
 668         *      eth1 -> q1 ->Tx
 669         *
 670         *      q0, q1 are configured as ATQ so, all the different stages can
 671         *      be enqueued on the same queue.
 672         *
 673         * eth0, eth1 use Tx adapters service core :
 674         *      queues = 4
 675         *      stride = 1
 676         *
 677         *      event queue pipelines:
 678         *      eth0 -> q0  -> q2 -> Tx
 679         *      eth1 -> q1  -> q3 -> Tx
 680         *
 681         *      q0, q1 are configured as stated above.
 682         *      q2, q3 configured as SINGLE_LINK.
 683         */
 684        ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
 685        if (ret)
 686                return ret;
 687        ret = pipeline_event_tx_adapter_setup(opt, p_conf);
 688        if (ret)
 689                return ret;
 690
 691        if (!evt_has_distributed_sched(opt->dev_id)) {
 692                uint32_t service_id;
 693                rte_event_dev_service_id_get(opt->dev_id, &service_id);
 694                ret = evt_service_setup(service_id);
 695                if (ret) {
 696                        evt_err("No service lcore found to run event dev.");
 697                        return ret;
 698                }
 699        }
 700
 701        /* Connect the tx_evqueue_id to the Tx adapter port */
 702        if (!t->internal_port) {
 703                RTE_ETH_FOREACH_DEV(prod) {
 704                        ret = rte_event_eth_tx_adapter_event_port_get(prod,
 705                                        &tx_evport_id);
 706                        if (ret) {
 707                                evt_err("Unable to get Tx adapter[%d]", prod);
 708                                return ret;
 709                        }
 710
 711                        if (rte_event_port_link(opt->dev_id, tx_evport_id,
 712                                                &tx_evqueue_id[prod],
 713                                                NULL, 1) != 1) {
 714                                evt_err("Unable to link Tx adptr[%d] evprt[%d]",
 715                                                prod, tx_evport_id);
 716                                return ret;
 717                        }
 718                }
 719        }
 720
 721        ret = rte_event_dev_start(opt->dev_id);
 722        if (ret) {
 723                evt_err("failed to start eventdev %d", opt->dev_id);
 724                return ret;
 725        }
 726
 727
 728        RTE_ETH_FOREACH_DEV(prod) {
 729                ret = rte_eth_dev_start(prod);
 730                if (ret) {
 731                        evt_err("Ethernet dev [%d] failed to start."
 732                                        " Using synthetic producer", prod);
 733                        return ret;
 734                }
 735        }
 736
 737        RTE_ETH_FOREACH_DEV(prod) {
 738                ret = rte_event_eth_rx_adapter_start(prod);
 739                if (ret) {
 740                        evt_err("Rx adapter[%d] start failed", prod);
 741                        return ret;
 742                }
 743
 744                ret = rte_event_eth_tx_adapter_start(prod);
 745                if (ret) {
 746                        evt_err("Tx adapter[%d] start failed", prod);
 747                        return ret;
 748                }
 749        }
 750
 751        memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
 752                        RTE_MAX_ETHPORTS);
 753
 754        return 0;
 755}
 756
 757static void
 758pipeline_atq_opt_dump(struct evt_options *opt)
 759{
 760        pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
 761}
 762
 763static int
 764pipeline_atq_opt_check(struct evt_options *opt)
 765{
 766        return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
 767}
 768
 769static bool
 770pipeline_atq_capability_check(struct evt_options *opt)
 771{
 772        struct rte_event_dev_info dev_info;
 773
 774        rte_event_dev_info_get(opt->dev_id, &dev_info);
 775        if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
 776                        dev_info.max_event_ports <
 777                        evt_nr_active_lcores(opt->wlcores)) {
 778                evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
 779                        pipeline_atq_nb_event_queues(opt),
 780                        dev_info.max_event_queues,
 781                        evt_nr_active_lcores(opt->wlcores),
 782                        dev_info.max_event_ports);
 783        }
 784        if (!evt_has_all_types_queue(opt->dev_id))
 785                return false;
 786
 787        return true;
 788}
 789
 790static const struct evt_test_ops pipeline_atq =  {
 791        .cap_check          = pipeline_atq_capability_check,
 792        .opt_check          = pipeline_atq_opt_check,
 793        .opt_dump           = pipeline_atq_opt_dump,
 794        .test_setup         = pipeline_test_setup,
 795        .mempool_setup      = pipeline_mempool_setup,
 796        .ethdev_setup       = pipeline_ethdev_setup,
 797        .eventdev_setup     = pipeline_atq_eventdev_setup,
 798        .launch_lcores      = pipeline_atq_launch_lcores,
 799        .ethdev_rx_stop     = pipeline_ethdev_rx_stop,
 800        .eventdev_destroy   = pipeline_eventdev_destroy,
 801        .mempool_destroy    = pipeline_mempool_destroy,
 802        .ethdev_destroy     = pipeline_ethdev_destroy,
 803        .test_result        = pipeline_test_result,
 804        .test_destroy       = pipeline_test_destroy,
 805};
 806
 807EVT_TEST_REGISTER(pipeline_atq);
 808