dpdk/app/test-eventdev/test_pipeline_atq.c
<<
>>
Prefs
   1/*
   2 * SPDX-License-Identifier: BSD-3-Clause
   3 * Copyright 2017 Cavium, Inc.
   4 */
   5
   6#include "test_pipeline_common.h"
   7
   8/* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
   9
  10static __rte_always_inline int
  11pipeline_atq_nb_event_queues(struct evt_options *opt)
  12{
  13        RTE_SET_USED(opt);
  14
  15        return rte_eth_dev_count_avail();
  16}
  17
  18static __rte_noinline int
  19pipeline_atq_worker_single_stage_tx(void *arg)
  20{
  21        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  22
  23        while (t->done == false) {
  24                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  25
  26                if (!event) {
  27                        rte_pause();
  28                        continue;
  29                }
  30
  31                pipeline_event_tx(dev, port, &ev);
  32                w->processed_pkts++;
  33        }
  34
  35        return 0;
  36}
  37
  38static __rte_noinline int
  39pipeline_atq_worker_single_stage_fwd(void *arg)
  40{
  41        PIPELINE_WORKER_SINGLE_STAGE_INIT;
  42        const uint8_t *tx_queue = t->tx_evqueue_id;
  43
  44        while (t->done == false) {
  45                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
  46
  47                if (!event) {
  48                        rte_pause();
  49                        continue;
  50                }
  51
  52                ev.queue_id = tx_queue[ev.mbuf->port];
  53                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
  54                pipeline_event_enqueue(dev, port, &ev);
  55                w->processed_pkts++;
  56        }
  57
  58        return 0;
  59}
  60
  61static __rte_noinline int
  62pipeline_atq_worker_single_stage_burst_tx(void *arg)
  63{
  64        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
  65
  66        while (t->done == false) {
  67                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
  68                                BURST_SIZE, 0);
  69
  70                if (!nb_rx) {
  71                        rte_pause();
  72                        continue;
  73                }
  74
  75                for (i = 0; i < nb_rx; i++) {
  76                        rte_prefetch0(ev[i + 1].mbuf);
  77                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
  78                }
  79
  80                pipeline_event_tx_burst(dev, port, ev, nb_rx);
  81                w->processed_pkts += nb_rx;
  82        }
  83
  84        return 0;
  85}
  86
  87static __rte_noinline int
  88pipeline_atq_worker_single_stage_burst_fwd(void *arg)
  89{
  90        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
  91        const uint8_t *tx_queue = t->tx_evqueue_id;
  92
  93        while (t->done == false) {
  94                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
  95                                BURST_SIZE, 0);
  96
  97                if (!nb_rx) {
  98                        rte_pause();
  99                        continue;
 100                }
 101
 102                for (i = 0; i < nb_rx; i++) {
 103                        rte_prefetch0(ev[i + 1].mbuf);
 104                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
 105                        ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 106                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
 107                }
 108
 109                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 110                w->processed_pkts += nb_rx;
 111        }
 112
 113        return 0;
 114}
 115
 116static __rte_noinline int
 117pipeline_atq_worker_multi_stage_tx(void *arg)
 118{
 119        PIPELINE_WORKER_MULTI_STAGE_INIT;
 120
 121        while (t->done == false) {
 122                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 123
 124                if (!event) {
 125                        rte_pause();
 126                        continue;
 127                }
 128
 129                cq_id = ev.sub_event_type % nb_stages;
 130
 131                if (cq_id == last_queue) {
 132                        pipeline_event_tx(dev, port, &ev);
 133                        w->processed_pkts++;
 134                        continue;
 135                }
 136
 137                ev.sub_event_type++;
 138                pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 139                pipeline_event_enqueue(dev, port, &ev);
 140        }
 141
 142        return 0;
 143}
 144
 145static __rte_noinline int
 146pipeline_atq_worker_multi_stage_fwd(void *arg)
 147{
 148        PIPELINE_WORKER_MULTI_STAGE_INIT;
 149        const uint8_t *tx_queue = t->tx_evqueue_id;
 150
 151        while (t->done == false) {
 152                uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 153
 154                if (!event) {
 155                        rte_pause();
 156                        continue;
 157                }
 158
 159                cq_id = ev.sub_event_type % nb_stages;
 160
 161                if (cq_id == last_queue) {
 162                        ev.queue_id = tx_queue[ev.mbuf->port];
 163                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
 164                        w->processed_pkts++;
 165                } else {
 166                        ev.sub_event_type++;
 167                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
 168                }
 169
 170                pipeline_event_enqueue(dev, port, &ev);
 171        }
 172
 173        return 0;
 174}
 175
 176static __rte_noinline int
 177pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 178{
 179        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 180
 181        while (t->done == false) {
 182                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
 183                                BURST_SIZE, 0);
 184
 185                if (!nb_rx) {
 186                        rte_pause();
 187                        continue;
 188                }
 189
 190                for (i = 0; i < nb_rx; i++) {
 191                        rte_prefetch0(ev[i + 1].mbuf);
 192                        cq_id = ev[i].sub_event_type % nb_stages;
 193
 194                        if (cq_id == last_queue) {
 195                                pipeline_event_tx(dev, port, &ev[i]);
 196                                ev[i].op = RTE_EVENT_OP_RELEASE;
 197                                w->processed_pkts++;
 198                                continue;
 199                        }
 200
 201                        ev[i].sub_event_type++;
 202                        pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
 203                }
 204
 205                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 206        }
 207
 208        return 0;
 209}
 210
 211static __rte_noinline int
 212pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 213{
 214        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
 215        const uint8_t *tx_queue = t->tx_evqueue_id;
 216
 217        while (t->done == false) {
 218                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
 219                                BURST_SIZE, 0);
 220
 221                if (!nb_rx) {
 222                        rte_pause();
 223                        continue;
 224                }
 225
 226                for (i = 0; i < nb_rx; i++) {
 227                        rte_prefetch0(ev[i + 1].mbuf);
 228                        cq_id = ev[i].sub_event_type % nb_stages;
 229
 230                        if (cq_id == last_queue) {
 231                                w->processed_pkts++;
 232                                ev[i].queue_id = tx_queue[ev[i].mbuf->port];
 233                                pipeline_fwd_event(&ev[i],
 234                                                RTE_SCHED_TYPE_ATOMIC);
 235                        } else {
 236                                ev[i].sub_event_type++;
 237                                pipeline_fwd_event(&ev[i],
 238                                                sched_type_list[cq_id]);
 239                        }
 240                }
 241
 242                pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
 243        }
 244
 245        return 0;
 246}
 247
 248static int
 249worker_wrapper(void *arg)
 250{
 251        struct worker_data *w  = arg;
 252        struct evt_options *opt = w->t->opt;
 253        const bool burst = evt_has_burst_mode(w->dev_id);
 254        const bool internal_port = w->t->internal_port;
 255        const uint8_t nb_stages = opt->nb_stages;
 256        RTE_SET_USED(opt);
 257
 258        if (nb_stages == 1) {
 259                if (!burst && internal_port)
 260                        return pipeline_atq_worker_single_stage_tx(arg);
 261                else if (!burst && !internal_port)
 262                        return pipeline_atq_worker_single_stage_fwd(arg);
 263                else if (burst && internal_port)
 264                        return pipeline_atq_worker_single_stage_burst_tx(arg);
 265                else if (burst && !internal_port)
 266                        return pipeline_atq_worker_single_stage_burst_fwd(arg);
 267        } else {
 268                if (!burst && internal_port)
 269                        return pipeline_atq_worker_multi_stage_tx(arg);
 270                else if (!burst && !internal_port)
 271                        return pipeline_atq_worker_multi_stage_fwd(arg);
 272                if (burst && internal_port)
 273                        return pipeline_atq_worker_multi_stage_burst_tx(arg);
 274                else if (burst && !internal_port)
 275                        return pipeline_atq_worker_multi_stage_burst_fwd(arg);
 276        }
 277
 278        rte_panic("invalid worker\n");
 279}
 280
 281static int
 282pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
 283{
 284        return pipeline_launch_lcores(test, opt, worker_wrapper);
 285}
 286
 287static int
 288pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
 289{
 290        int ret;
 291        int nb_ports;
 292        int nb_queues;
 293        uint8_t queue;
 294        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
 295        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
 296        uint8_t nb_worker_queues = 0;
 297        uint8_t tx_evport_id = 0;
 298        uint16_t prod = 0;
 299        struct rte_event_dev_info info;
 300        struct test_pipeline *t = evt_test_priv(test);
 301
 302        nb_ports = evt_nr_active_lcores(opt->wlcores);
 303        nb_queues = rte_eth_dev_count_avail();
 304
 305        memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
 306        memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
 307        /* One queue for Tx adapter per port */
 308        if (!t->internal_port) {
 309                RTE_ETH_FOREACH_DEV(prod) {
 310                        tx_evqueue_id[prod] = nb_queues;
 311                        nb_queues++;
 312                }
 313        }
 314
 315        rte_event_dev_info_get(opt->dev_id, &info);
 316
 317        ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
 318        if (ret) {
 319                evt_err("failed to configure eventdev %d", opt->dev_id);
 320                return ret;
 321        }
 322
 323        struct rte_event_queue_conf q_conf = {
 324                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 325                .nb_atomic_flows = opt->nb_flows,
 326                .nb_atomic_order_sequences = opt->nb_flows,
 327        };
 328        /* queue configurations */
 329        for (queue = 0; queue < nb_queues; queue++) {
 330                q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
 331
 332                if (!t->internal_port) {
 333                        RTE_ETH_FOREACH_DEV(prod) {
 334                                if (queue == tx_evqueue_id[prod]) {
 335                                        q_conf.event_queue_cfg =
 336                                                RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
 337                                } else {
 338                                        queue_arr[nb_worker_queues] = queue;
 339                                        nb_worker_queues++;
 340                                }
 341                        }
 342                }
 343
 344                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
 345                if (ret) {
 346                        evt_err("failed to setup queue=%d", queue);
 347                        return ret;
 348                }
 349        }
 350
 351        if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
 352                opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
 353
 354        /* port configuration */
 355        const struct rte_event_port_conf p_conf = {
 356                .dequeue_depth = opt->wkr_deq_dep,
 357                .enqueue_depth = info.max_event_port_dequeue_depth,
 358                .new_event_threshold = info.max_num_events,
 359        };
 360
 361        if (!t->internal_port)
 362                ret = pipeline_event_port_setup(test, opt, queue_arr,
 363                                nb_worker_queues, p_conf);
 364        else
 365                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
 366                                p_conf);
 367
 368        if (ret)
 369                return ret;
 370
 371        /*
 372         * The pipelines are setup in the following manner:
 373         *
 374         * eth_dev_count = 2, nb_stages = 2, atq mode
 375         *
 376         * eth0, eth1 have Internal port capability :
 377         *      queues = 2
 378         *      stride = 1
 379         *
 380         *      event queue pipelines:
 381         *      eth0 -> q0 ->Tx
 382         *      eth1 -> q1 ->Tx
 383         *
 384         *      q0, q1 are configured as ATQ so, all the different stages can
 385         *      be enqueued on the same queue.
 386         *
 387         * eth0, eth1 use Tx adapters service core :
 388         *      queues = 4
 389         *      stride = 1
 390         *
 391         *      event queue pipelines:
 392         *      eth0 -> q0  -> q2 -> Tx
 393         *      eth1 -> q1  -> q3 -> Tx
 394         *
 395         *      q0, q1 are configured as stated above.
 396         *      q2, q3 configured as SINGLE_LINK.
 397         */
 398        ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
 399        if (ret)
 400                return ret;
 401        ret = pipeline_event_tx_adapter_setup(opt, p_conf);
 402        if (ret)
 403                return ret;
 404
 405        if (!evt_has_distributed_sched(opt->dev_id)) {
 406                uint32_t service_id;
 407                rte_event_dev_service_id_get(opt->dev_id, &service_id);
 408                ret = evt_service_setup(service_id);
 409                if (ret) {
 410                        evt_err("No service lcore found to run event dev.");
 411                        return ret;
 412                }
 413        }
 414
 415        /* Connect the tx_evqueue_id to the Tx adapter port */
 416        if (!t->internal_port) {
 417                RTE_ETH_FOREACH_DEV(prod) {
 418                        ret = rte_event_eth_tx_adapter_event_port_get(prod,
 419                                        &tx_evport_id);
 420                        if (ret) {
 421                                evt_err("Unable to get Tx adapter[%d]", prod);
 422                                return ret;
 423                        }
 424
 425                        if (rte_event_port_link(opt->dev_id, tx_evport_id,
 426                                                &tx_evqueue_id[prod],
 427                                                NULL, 1) != 1) {
 428                                evt_err("Unable to link Tx adptr[%d] evprt[%d]",
 429                                                prod, tx_evport_id);
 430                                return ret;
 431                        }
 432                }
 433        }
 434
 435        ret = rte_event_dev_start(opt->dev_id);
 436        if (ret) {
 437                evt_err("failed to start eventdev %d", opt->dev_id);
 438                return ret;
 439        }
 440
 441
 442        RTE_ETH_FOREACH_DEV(prod) {
 443                ret = rte_eth_dev_start(prod);
 444                if (ret) {
 445                        evt_err("Ethernet dev [%d] failed to start."
 446                                        " Using synthetic producer", prod);
 447                        return ret;
 448                }
 449        }
 450
 451        RTE_ETH_FOREACH_DEV(prod) {
 452                ret = rte_event_eth_rx_adapter_start(prod);
 453                if (ret) {
 454                        evt_err("Rx adapter[%d] start failed", prod);
 455                        return ret;
 456                }
 457
 458                ret = rte_event_eth_tx_adapter_start(prod);
 459                if (ret) {
 460                        evt_err("Tx adapter[%d] start failed", prod);
 461                        return ret;
 462                }
 463        }
 464
 465        memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
 466                        RTE_MAX_ETHPORTS);
 467
 468        return 0;
 469}
 470
 471static void
 472pipeline_atq_opt_dump(struct evt_options *opt)
 473{
 474        pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
 475}
 476
 477static int
 478pipeline_atq_opt_check(struct evt_options *opt)
 479{
 480        return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
 481}
 482
 483static bool
 484pipeline_atq_capability_check(struct evt_options *opt)
 485{
 486        struct rte_event_dev_info dev_info;
 487
 488        rte_event_dev_info_get(opt->dev_id, &dev_info);
 489        if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
 490                        dev_info.max_event_ports <
 491                        evt_nr_active_lcores(opt->wlcores)) {
 492                evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
 493                        pipeline_atq_nb_event_queues(opt),
 494                        dev_info.max_event_queues,
 495                        evt_nr_active_lcores(opt->wlcores),
 496                        dev_info.max_event_ports);
 497        }
 498        if (!evt_has_all_types_queue(opt->dev_id))
 499                return false;
 500
 501        return true;
 502}
 503
 504static const struct evt_test_ops pipeline_atq =  {
 505        .cap_check          = pipeline_atq_capability_check,
 506        .opt_check          = pipeline_atq_opt_check,
 507        .opt_dump           = pipeline_atq_opt_dump,
 508        .test_setup         = pipeline_test_setup,
 509        .mempool_setup      = pipeline_mempool_setup,
 510        .ethdev_setup       = pipeline_ethdev_setup,
 511        .eventdev_setup     = pipeline_atq_eventdev_setup,
 512        .launch_lcores      = pipeline_atq_launch_lcores,
 513        .eventdev_destroy   = pipeline_eventdev_destroy,
 514        .mempool_destroy    = pipeline_mempool_destroy,
 515        .ethdev_destroy     = pipeline_ethdev_destroy,
 516        .test_result        = pipeline_test_result,
 517        .test_destroy       = pipeline_test_destroy,
 518};
 519
 520EVT_TEST_REGISTER(pipeline_atq);
 521