dpdk/drivers/event/opdl/opdl_evdev_init.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2017 Intel Corporation
   3 */
   4
   5#include <inttypes.h>
   6#include <string.h>
   7
   8#include <rte_bus_vdev.h>
   9#include <rte_errno.h>
  10#include <rte_cycles.h>
  11#include <rte_memzone.h>
  12
  13#include "opdl_evdev.h"
  14#include "opdl_ring.h"
  15#include "opdl_log.h"
  16
  17
  18static __rte_always_inline uint32_t
  19enqueue_check(struct opdl_port *p,
  20                const struct rte_event ev[],
  21                uint16_t num,
  22                uint16_t num_events)
  23{
  24        uint16_t i;
  25
  26        if (p->opdl->do_validation) {
  27
  28                for (i = 0; i < num; i++) {
  29                        if (ev[i].queue_id != p->next_external_qid) {
  30                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
  31                                             "ERROR - port:[%u] - event wants"
  32                                             " to enq to q_id[%u],"
  33                                             " but should be [%u]",
  34                                             opdl_pmd_dev_id(p->opdl),
  35                                             p->id,
  36                                             ev[i].queue_id,
  37                                             p->next_external_qid);
  38                                rte_errno = EINVAL;
  39                                return 0;
  40                        }
  41                }
  42
  43                /* Stats */
  44                if (p->p_type == OPDL_PURE_RX_PORT ||
  45                                p->p_type == OPDL_ASYNC_PORT) {
  46                        /* Stats */
  47                        if (num_events) {
  48                                p->port_stat[claim_pkts_requested] += num;
  49                                p->port_stat[claim_pkts_granted] += num_events;
  50                                p->port_stat[claim_non_empty]++;
  51                                p->start_cycles = rte_rdtsc();
  52                        } else {
  53                                p->port_stat[claim_empty]++;
  54                                p->start_cycles = 0;
  55                        }
  56                } else {
  57                        if (p->start_cycles) {
  58                                uint64_t end_cycles = rte_rdtsc();
  59                                p->port_stat[total_cycles] +=
  60                                        end_cycles - p->start_cycles;
  61                        }
  62                }
  63        } else {
  64                if (num > 0 &&
  65                                ev[0].queue_id != p->next_external_qid) {
  66                        rte_errno = EINVAL;
  67                        return 0;
  68                }
  69        }
  70
  71        return num;
  72}
  73
  74static __rte_always_inline void
  75update_on_dequeue(struct opdl_port *p,
  76                struct rte_event ev[],
  77                uint16_t num,
  78                uint16_t num_events)
  79{
  80        if (p->opdl->do_validation) {
  81                int16_t i;
  82                for (i = 0; i < num; i++)
  83                        ev[i].queue_id =
  84                                p->opdl->queue[p->queue_id].external_qid;
  85
  86                /* Stats */
  87                if (num_events) {
  88                        p->port_stat[claim_pkts_requested] += num;
  89                        p->port_stat[claim_pkts_granted] += num_events;
  90                        p->port_stat[claim_non_empty]++;
  91                        p->start_cycles = rte_rdtsc();
  92                } else {
  93                        p->port_stat[claim_empty]++;
  94                        p->start_cycles = 0;
  95                }
  96        } else {
  97                if (num > 0)
  98                        ev[0].queue_id =
  99                                p->opdl->queue[p->queue_id].external_qid;
 100        }
 101}
 102
 103
 104/*
 105 * Error RX enqueue:
 106 *
 107 *
 108 */
 109
 110static uint16_t
 111opdl_rx_error_enqueue(struct opdl_port *p,
 112                const struct rte_event ev[],
 113                uint16_t num)
 114{
 115        RTE_SET_USED(p);
 116        RTE_SET_USED(ev);
 117        RTE_SET_USED(num);
 118
 119        rte_errno = ENOSPC;
 120
 121        return 0;
 122}
 123
 124/*
 125 * RX enqueue:
 126 *
 127 * This function handles enqueue for a single input stage_inst with
 128 *      threadsafe disabled or enabled. eg 1 thread using a stage_inst or
 129 *      multiple threads sharing a stage_inst
 130 */
 131
 132static uint16_t
 133opdl_rx_enqueue(struct opdl_port *p,
 134                const struct rte_event ev[],
 135                uint16_t num)
 136{
 137        uint16_t enqueued = 0;
 138
 139        enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
 140                                   ev,
 141                                   num,
 142                                   false);
 143        if (!enqueue_check(p, ev, num, enqueued))
 144                return 0;
 145
 146
 147        if (enqueued < num)
 148                rte_errno = ENOSPC;
 149
 150        return enqueued;
 151}
 152
 153/*
 154 * Error TX handler
 155 *
 156 */
 157
 158static uint16_t
 159opdl_tx_error_dequeue(struct opdl_port *p,
 160                struct rte_event ev[],
 161                uint16_t num)
 162{
 163        RTE_SET_USED(p);
 164        RTE_SET_USED(ev);
 165        RTE_SET_USED(num);
 166
 167        rte_errno = ENOSPC;
 168
 169        return 0;
 170}
 171
 172/*
 173 * TX single threaded claim
 174 *
 175 * This function handles dequeue for a single worker stage_inst with
 176 *      threadsafe disabled. eg 1 thread using an stage_inst
 177 */
 178
 179static uint16_t
 180opdl_tx_dequeue_single_thread(struct opdl_port *p,
 181                        struct rte_event ev[],
 182                        uint16_t num)
 183{
 184        uint16_t returned;
 185
 186        struct opdl_ring  *ring;
 187
 188        ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
 189
 190        returned = opdl_ring_copy_to_burst(ring,
 191                                           p->deq_stage_inst,
 192                                           ev,
 193                                           num,
 194                                           false);
 195
 196        update_on_dequeue(p, ev, num, returned);
 197
 198        return returned;
 199}
 200
 201/*
 202 * TX multi threaded claim
 203 *
 204 * This function handles dequeue for multiple worker stage_inst with
 205 *      threadsafe disabled. eg multiple stage_inst each with its own instance
 206 */
 207
 208static uint16_t
 209opdl_tx_dequeue_multi_inst(struct opdl_port *p,
 210                        struct rte_event ev[],
 211                        uint16_t num)
 212{
 213        uint32_t num_events = 0;
 214
 215        num_events = opdl_stage_claim(p->deq_stage_inst,
 216                                    (void *)ev,
 217                                    num,
 218                                    NULL,
 219                                    false,
 220                                    false);
 221
 222        update_on_dequeue(p, ev, num, num_events);
 223
 224        return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
 225}
 226
 227
 228/*
 229 * Worker thread claim
 230 *
 231 */
 232
 233static uint16_t
 234opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
 235{
 236        uint32_t num_events = 0;
 237
 238        if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
 239                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 240                             "Attempt to dequeue num of events larger than port (%d) max",
 241                             opdl_pmd_dev_id(p->opdl),
 242                             p->id);
 243                rte_errno = EINVAL;
 244                return 0;
 245        }
 246
 247
 248        num_events = opdl_stage_claim(p->deq_stage_inst,
 249                        (void *)ev,
 250                        num,
 251                        NULL,
 252                        false,
 253                        p->atomic_claim);
 254
 255
 256        update_on_dequeue(p, ev, num, num_events);
 257
 258        return num_events;
 259}
 260
 261/*
 262 * Worker thread disclaim
 263 */
 264
 265static uint16_t
 266opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
 267{
 268        uint16_t enqueued = 0;
 269
 270        uint32_t i = 0;
 271
 272        for (i = 0; i < num; i++)
 273                opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
 274                                i, p->atomic_claim);
 275
 276        enqueued = opdl_stage_disclaim(p->enq_stage_inst,
 277                                       num,
 278                                       false);
 279
 280        return enqueue_check(p, ev, num, enqueued);
 281}
 282
 283static __rte_always_inline struct opdl_stage *
 284stage_for_port(struct opdl_queue *q, unsigned int i)
 285{
 286        if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
 287                return q->ports[i]->enq_stage_inst;
 288        else
 289                return q->ports[i]->deq_stage_inst;
 290}
 291
 292static int opdl_add_deps(struct opdl_evdev *device,
 293                         int q_id,
 294                         int deps_q_id)
 295{
 296        unsigned int i, j;
 297        int status;
 298        struct opdl_ring  *ring;
 299        struct opdl_queue *queue = &device->queue[q_id];
 300        struct opdl_queue *queue_deps = &device->queue[deps_q_id];
 301        struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
 302
 303        /* sanity check that all stages are for same opdl ring */
 304        for (i = 0; i < queue->nb_ports; i++) {
 305                struct opdl_ring *r =
 306                        opdl_stage_get_opdl_ring(stage_for_port(queue, i));
 307                for (j = 0; j < queue_deps->nb_ports; j++) {
 308                        struct opdl_ring *rj =
 309                                opdl_stage_get_opdl_ring(
 310                                                stage_for_port(queue_deps, j));
 311                        if (r != rj) {
 312                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 313                                             "Stages and dependents"
 314                                             " are not for same opdl ring",
 315                                             opdl_pmd_dev_id(device));
 316                                uint32_t k;
 317                                for (k = 0; k < device->nb_opdls; k++) {
 318                                        opdl_ring_dump(device->opdl[k],
 319                                                        stdout);
 320                                }
 321                                return -EINVAL;
 322                        }
 323                }
 324        }
 325
 326        /* Gather all stages instance in deps */
 327        for (i = 0; i < queue_deps->nb_ports; i++)
 328                dep_stages[i] = stage_for_port(queue_deps, i);
 329
 330
 331        /* Add all deps for each port->stage_inst in this queue */
 332        for (i = 0; i < queue->nb_ports; i++) {
 333
 334                ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
 335
 336                status = opdl_stage_deps_add(ring,
 337                                stage_for_port(queue, i),
 338                                queue->ports[i]->num_instance,
 339                                queue->ports[i]->instance_id,
 340                                dep_stages,
 341                                queue_deps->nb_ports);
 342                if (status < 0)
 343                        return -EINVAL;
 344        }
 345
 346        return 0;
 347}
 348
 349int
 350opdl_add_event_handlers(struct rte_eventdev *dev)
 351{
 352        int err = 0;
 353
 354        struct opdl_evdev *device = opdl_pmd_priv(dev);
 355        unsigned int i;
 356
 357        for (i = 0; i < device->max_port_nb; i++) {
 358
 359                struct opdl_port *port = &device->ports[i];
 360
 361                if (port->configured) {
 362                        if (port->p_type == OPDL_PURE_RX_PORT) {
 363                                port->enq = opdl_rx_enqueue;
 364                                port->deq = opdl_tx_error_dequeue;
 365
 366                        } else if (port->p_type == OPDL_PURE_TX_PORT) {
 367
 368                                port->enq = opdl_rx_error_enqueue;
 369
 370                                if (port->num_instance == 1)
 371                                        port->deq =
 372                                                opdl_tx_dequeue_single_thread;
 373                                else
 374                                        port->deq = opdl_tx_dequeue_multi_inst;
 375
 376                        } else if (port->p_type == OPDL_REGULAR_PORT) {
 377
 378                                port->enq = opdl_disclaim;
 379                                port->deq = opdl_claim;
 380
 381                        } else if (port->p_type == OPDL_ASYNC_PORT) {
 382
 383                                port->enq = opdl_rx_enqueue;
 384
 385                                /* Always single instance */
 386                                port->deq = opdl_tx_dequeue_single_thread;
 387                        } else {
 388                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 389                                             "port:[%u] has invalid port type - ",
 390                                             opdl_pmd_dev_id(port->opdl),
 391                                             port->id);
 392                                err = -EINVAL;
 393                                break;
 394                        }
 395                        port->initialized = 1;
 396                }
 397        }
 398
 399        if (!err)
 400                fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
 401        return err;
 402}
 403
 404int
 405build_all_dependencies(struct rte_eventdev *dev)
 406{
 407
 408        int err = 0;
 409        unsigned int i;
 410        struct opdl_evdev *device = opdl_pmd_priv(dev);
 411
 412        uint8_t start_qid = 0;
 413
 414        for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
 415                struct opdl_queue *queue = &device->queue[i];
 416                if (!queue->initialized)
 417                        break;
 418
 419                if (queue->q_pos == OPDL_Q_POS_START) {
 420                        start_qid = i;
 421                        continue;
 422                }
 423
 424                if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
 425                        err = opdl_add_deps(device, i, i-1);
 426                        if (err < 0) {
 427                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 428                                             "dependency addition for queue:[%u] - FAILED",
 429                                             dev->data->dev_id,
 430                                             queue->external_qid);
 431                                break;
 432                        }
 433                }
 434
 435                if (queue->q_pos == OPDL_Q_POS_END) {
 436                        /* Add this dependency */
 437                        err = opdl_add_deps(device, i, i-1);
 438                        if (err < 0) {
 439                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 440                                             "dependency addition for queue:[%u] - FAILED",
 441                                             dev->data->dev_id,
 442                                             queue->external_qid);
 443                                break;
 444                        }
 445                        /* Add dependency for rx on tx */
 446                        err = opdl_add_deps(device, start_qid, i);
 447                        if (err < 0) {
 448                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 449                                             "dependency addition for queue:[%u] - FAILED",
 450                                             dev->data->dev_id,
 451                                             queue->external_qid);
 452                                break;
 453                        }
 454                }
 455        }
 456
 457        if (!err)
 458                fprintf(stdout, "Success - dependencies built\n");
 459
 460        return err;
 461}
 462int
 463check_queues_linked(struct rte_eventdev *dev)
 464{
 465
 466        int err = 0;
 467        unsigned int i;
 468        struct opdl_evdev *device = opdl_pmd_priv(dev);
 469        uint32_t nb_iq = 0;
 470
 471        for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
 472                struct opdl_queue *queue = &device->queue[i];
 473
 474                if (!queue->initialized)
 475                        break;
 476
 477                if (queue->external_qid == OPDL_INVALID_QID)
 478                        nb_iq++;
 479
 480                if (queue->nb_ports == 0) {
 481                        PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 482                                     "queue:[%u] has no associated ports",
 483                                     dev->data->dev_id,
 484                                     i);
 485                        err = -EINVAL;
 486                        break;
 487                }
 488        }
 489        if (!err) {
 490                if ((i - nb_iq) != device->max_queue_nb) {
 491                        PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 492                                     "%u queues counted but should be %u",
 493                                     dev->data->dev_id,
 494                                     i - nb_iq,
 495                                     device->max_queue_nb);
 496                        err = -1;
 497                }
 498
 499        }
 500        return err;
 501}
 502
 503void
 504destroy_queues_and_rings(struct rte_eventdev *dev)
 505{
 506        struct opdl_evdev *device = opdl_pmd_priv(dev);
 507        uint32_t i;
 508
 509        for (i = 0; i < device->nb_opdls; i++) {
 510                if (device->opdl[i])
 511                        opdl_ring_free(device->opdl[i]);
 512        }
 513
 514        memset(&device->queue,
 515                        0,
 516                        sizeof(struct opdl_queue)
 517                        * RTE_EVENT_MAX_QUEUES_PER_DEV);
 518}
 519
 520#define OPDL_ID(d)(d->nb_opdls - 1)
 521
 522static __rte_always_inline void
 523initialise_queue(struct opdl_evdev *device,
 524                enum queue_pos pos,
 525                int32_t i)
 526{
 527        struct opdl_queue *queue = &device->queue[device->nb_queues];
 528
 529        if (i == -1) {
 530                queue->q_type = OPDL_Q_TYPE_ORDERED;
 531                queue->external_qid = OPDL_INVALID_QID;
 532        } else {
 533                queue->q_type = device->q_md[i].type;
 534                queue->external_qid = device->q_md[i].ext_id;
 535                /* Add ex->in for queues setup */
 536                device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
 537        }
 538        queue->opdl_id = OPDL_ID(device);
 539        queue->q_pos = pos;
 540        queue->nb_ports = 0;
 541        queue->configured = 1;
 542
 543        device->nb_queues++;
 544}
 545
 546
 547static __rte_always_inline int
 548create_opdl(struct opdl_evdev *device)
 549{
 550        int err = 0;
 551
 552        char name[RTE_MEMZONE_NAMESIZE];
 553
 554        snprintf(name, RTE_MEMZONE_NAMESIZE,
 555                        "%s_%u", device->service_name, device->nb_opdls);
 556
 557        device->opdl[device->nb_opdls] =
 558                opdl_ring_create(name,
 559                                device->nb_events_limit,
 560                                sizeof(struct rte_event),
 561                                device->max_port_nb * 2,
 562                                device->socket);
 563
 564        if (!device->opdl[device->nb_opdls]) {
 565                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 566                             "opdl ring %u creation - FAILED",
 567                             opdl_pmd_dev_id(device),
 568                             device->nb_opdls);
 569                err = -EINVAL;
 570        } else {
 571                device->nb_opdls++;
 572        }
 573        return err;
 574}
 575
 576static __rte_always_inline int
 577create_link_opdl(struct opdl_evdev *device, uint32_t index)
 578{
 579
 580        int err = 0;
 581
 582        if (device->q_md[index + 1].type !=
 583                        OPDL_Q_TYPE_SINGLE_LINK) {
 584
 585                /* async queue with regular
 586                 * queue following it
 587                 */
 588
 589                /* create a new opdl ring */
 590                err = create_opdl(device);
 591                if (!err) {
 592                        /* create an initial
 593                         * dummy queue for new opdl
 594                         */
 595                        initialise_queue(device,
 596                                        OPDL_Q_POS_START,
 597                                        -1);
 598                } else {
 599                        err = -EINVAL;
 600                }
 601        } else {
 602                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 603                             "queue %u, two consecutive"
 604                             " SINGLE_LINK queues, not allowed",
 605                             opdl_pmd_dev_id(device),
 606                             index);
 607                err = -EINVAL;
 608        }
 609
 610        return err;
 611}
 612
 613int
 614create_queues_and_rings(struct rte_eventdev *dev)
 615{
 616        int err = 0;
 617
 618        struct opdl_evdev *device = opdl_pmd_priv(dev);
 619
 620        device->nb_queues = 0;
 621
 622        if (device->nb_ports != device->max_port_nb) {
 623                PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
 624                                " number:%u for this device",
 625                                device->nb_ports,
 626                                device->max_port_nb);
 627                err = -1;
 628        }
 629
 630        if (!err) {
 631                /* We will have at least one opdl so create it now */
 632                err = create_opdl(device);
 633        }
 634
 635        if (!err) {
 636
 637                /* Create 1st "dummy" queue */
 638                initialise_queue(device,
 639                                 OPDL_Q_POS_START,
 640                                 -1);
 641
 642                uint32_t i;
 643                for (i = 0; i < device->nb_q_md; i++) {
 644
 645                        /* Check */
 646                        if (!device->q_md[i].setup) {
 647
 648                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 649                                             "queue meta data slot %u"
 650                                             " not setup - FAILING",
 651                                             dev->data->dev_id,
 652                                             i);
 653                                err = -EINVAL;
 654                                break;
 655                        } else if (device->q_md[i].type !=
 656                                        OPDL_Q_TYPE_SINGLE_LINK) {
 657
 658                                if (!device->q_md[i + 1].setup) {
 659                                        /* Create a simple ORDERED/ATOMIC
 660                                         * queue at the end
 661                                         */
 662                                        initialise_queue(device,
 663                                                        OPDL_Q_POS_END,
 664                                                        i);
 665
 666                                } else {
 667                                        /* Create a simple ORDERED/ATOMIC
 668                                         * queue in the middle
 669                                         */
 670                                        initialise_queue(device,
 671                                                        OPDL_Q_POS_MIDDLE,
 672                                                        i);
 673                                }
 674                        } else if (device->q_md[i].type ==
 675                                        OPDL_Q_TYPE_SINGLE_LINK) {
 676
 677                                /* create last queue for this opdl */
 678                                initialise_queue(device,
 679                                                OPDL_Q_POS_END,
 680                                                i);
 681
 682                                err = create_link_opdl(device, i);
 683
 684                                if (err)
 685                                        break;
 686
 687
 688                        }
 689                }
 690        }
 691        if (err)
 692                destroy_queues_and_rings(dev);
 693
 694        return err;
 695}
 696
 697
 698int
 699initialise_all_other_ports(struct rte_eventdev *dev)
 700{
 701        int err = 0;
 702        struct opdl_stage *stage_inst = NULL;
 703
 704        struct opdl_evdev *device = opdl_pmd_priv(dev);
 705
 706        uint32_t i;
 707        for (i = 0; i < device->nb_ports; i++) {
 708                struct opdl_port *port = &device->ports[i];
 709                struct opdl_queue *queue = &device->queue[port->queue_id];
 710
 711                if (port->queue_id == 0) {
 712                        continue;
 713                } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
 714
 715                        if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
 716
 717                                /* Regular port with claim/disclaim */
 718                                stage_inst = opdl_stage_add(
 719                                        device->opdl[queue->opdl_id],
 720                                                false,
 721                                                false);
 722                                port->deq_stage_inst = stage_inst;
 723                                port->enq_stage_inst = stage_inst;
 724
 725                                if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
 726                                        port->atomic_claim = true;
 727                                else
 728                                        port->atomic_claim = false;
 729
 730                                port->p_type =  OPDL_REGULAR_PORT;
 731
 732                                /* Add the port to the queue array of ports */
 733                                queue->ports[queue->nb_ports] = port;
 734                                port->instance_id = queue->nb_ports;
 735                                queue->nb_ports++;
 736                                opdl_stage_set_queue_id(stage_inst,
 737                                                port->queue_id);
 738
 739                        } else if (queue->q_pos == OPDL_Q_POS_END) {
 740
 741                                /* tx port  */
 742                                stage_inst = opdl_stage_add(
 743                                        device->opdl[queue->opdl_id],
 744                                                false,
 745                                                false);
 746                                port->deq_stage_inst = stage_inst;
 747                                port->enq_stage_inst = NULL;
 748                                port->p_type = OPDL_PURE_TX_PORT;
 749
 750                                /* Add the port to the queue array of ports */
 751                                queue->ports[queue->nb_ports] = port;
 752                                port->instance_id = queue->nb_ports;
 753                                queue->nb_ports++;
 754                        } else {
 755
 756                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 757                                             "port %u:, linked incorrectly"
 758                                             " to a q_pos START/INVALID %u",
 759                                             opdl_pmd_dev_id(port->opdl),
 760                                             port->id,
 761                                             queue->q_pos);
 762                                err = -EINVAL;
 763                                break;
 764                        }
 765
 766                } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
 767
 768                        port->p_type = OPDL_ASYNC_PORT;
 769
 770                        /* -- tx -- */
 771                        stage_inst = opdl_stage_add(
 772                                device->opdl[queue->opdl_id],
 773                                        false,
 774                                        false); /* First stage */
 775                        port->deq_stage_inst = stage_inst;
 776
 777                        /* Add the port to the queue array of ports */
 778                        queue->ports[queue->nb_ports] = port;
 779                        port->instance_id = queue->nb_ports;
 780                        queue->nb_ports++;
 781
 782                        if (queue->nb_ports > 1) {
 783                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 784                                             "queue %u:, setup as SINGLE_LINK"
 785                                             " but has more than one port linked",
 786                                             opdl_pmd_dev_id(port->opdl),
 787                                             queue->external_qid);
 788                                err = -EINVAL;
 789                                break;
 790                        }
 791
 792                        /* -- single instance rx for next opdl -- */
 793                        uint8_t next_qid =
 794                                device->q_map_ex_to_in[queue->external_qid] + 1;
 795                        if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
 796                                        device->queue[next_qid].configured) {
 797
 798                                /* Remap the queue */
 799                                queue = &device->queue[next_qid];
 800
 801                                stage_inst = opdl_stage_add(
 802                                        device->opdl[queue->opdl_id],
 803                                                false,
 804                                                true);
 805                                port->enq_stage_inst = stage_inst;
 806
 807                                /* Add the port to the queue array of ports */
 808                                queue->ports[queue->nb_ports] = port;
 809                                port->instance_id = queue->nb_ports;
 810                                queue->nb_ports++;
 811                                if (queue->nb_ports > 1) {
 812                                        PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 813                                                "dummy queue %u: for "
 814                                                "port %u, "
 815                                                "SINGLE_LINK but has more "
 816                                                "than one port linked",
 817                                                opdl_pmd_dev_id(port->opdl),
 818                                                next_qid,
 819                                                port->id);
 820                                        err = -EINVAL;
 821                                        break;
 822                                }
 823                                /* Set this queue to initialized as it is never
 824                                 * referenced by any ports
 825                                 */
 826                                queue->initialized = 1;
 827                        }
 828                }
 829        }
 830
 831        /* Now that all ports are initialised we need to
 832         * setup the last bit of stage md
 833         */
 834        if (!err) {
 835                for (i = 0; i < device->nb_ports; i++) {
 836                        struct opdl_port *port = &device->ports[i];
 837                        struct opdl_queue *queue =
 838                                &device->queue[port->queue_id];
 839
 840                        if (port->configured &&
 841                                        (port->queue_id != OPDL_INVALID_QID)) {
 842                                if (queue->nb_ports == 0) {
 843                                        PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 844                                                "queue:[%u] has no ports"
 845                                                " linked to it",
 846                                                opdl_pmd_dev_id(port->opdl),
 847                                                port->id);
 848                                        err = -EINVAL;
 849                                        break;
 850                                }
 851
 852                                port->num_instance = queue->nb_ports;
 853                                port->initialized = 1;
 854                                queue->initialized = 1;
 855                        } else {
 856                                PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
 857                                             "Port:[%u] not configured  invalid"
 858                                             " queue configuration",
 859                                             opdl_pmd_dev_id(port->opdl),
 860                                             port->id);
 861                                err = -EINVAL;
 862                                break;
 863                        }
 864                }
 865        }
 866        return err;
 867}
 868
 869int
 870initialise_queue_zero_ports(struct rte_eventdev *dev)
 871{
 872        int err = 0;
 873        uint8_t mt_rx = 0;
 874        struct opdl_stage *stage_inst = NULL;
 875        struct opdl_queue *queue = NULL;
 876
 877        struct opdl_evdev *device = opdl_pmd_priv(dev);
 878
 879        /* Assign queue zero and figure out how many Q0 ports we have */
 880        uint32_t i;
 881        for (i = 0; i < device->nb_ports; i++) {
 882                struct opdl_port *port = &device->ports[i];
 883                if (port->queue_id == OPDL_INVALID_QID) {
 884                        port->queue_id = 0;
 885                        port->external_qid = OPDL_INVALID_QID;
 886                        port->p_type = OPDL_PURE_RX_PORT;
 887                        mt_rx++;
 888                }
 889        }
 890
 891        /* Create the stage */
 892        stage_inst = opdl_stage_add(device->opdl[0],
 893                        (mt_rx > 1 ? true : false),
 894                        true);
 895        if (stage_inst) {
 896
 897                /* Assign the new created input stage to all relevant ports */
 898                for (i = 0; i < device->nb_ports; i++) {
 899                        struct opdl_port *port = &device->ports[i];
 900                        if (port->queue_id == 0) {
 901                                queue = &device->queue[port->queue_id];
 902                                port->enq_stage_inst = stage_inst;
 903                                port->deq_stage_inst = NULL;
 904                                port->configured = 1;
 905                                port->initialized = 1;
 906
 907                                queue->ports[queue->nb_ports] = port;
 908                                port->instance_id = queue->nb_ports;
 909                                queue->nb_ports++;
 910                        }
 911                }
 912        } else {
 913                err = -1;
 914        }
 915        return err;
 916}
 917
 918int
 919assign_internal_queue_ids(struct rte_eventdev *dev)
 920{
 921        int err = 0;
 922        struct opdl_evdev *device = opdl_pmd_priv(dev);
 923        uint32_t i;
 924
 925        for (i = 0; i < device->nb_ports; i++) {
 926                struct opdl_port *port = &device->ports[i];
 927                if (port->external_qid != OPDL_INVALID_QID) {
 928                        port->queue_id =
 929                                device->q_map_ex_to_in[port->external_qid];
 930
 931                        /* Now do the external_qid of the next queue */
 932                        struct opdl_queue *queue =
 933                                &device->queue[port->queue_id];
 934                        if (queue->q_pos == OPDL_Q_POS_END)
 935                                port->next_external_qid =
 936                                device->queue[port->queue_id + 2].external_qid;
 937                        else
 938                                port->next_external_qid =
 939                                device->queue[port->queue_id + 1].external_qid;
 940                }
 941        }
 942        return err;
 943}
 944