dpdk/lib/eventdev/rte_event_eth_tx_adapter.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2018 Intel Corporation.
   3 */
   4#include <rte_spinlock.h>
   5#include <rte_service_component.h>
   6#include <ethdev_driver.h>
   7
   8#include "eventdev_pmd.h"
   9#include "eventdev_trace.h"
  10#include "rte_event_eth_tx_adapter.h"
  11
  12#define TXA_BATCH_SIZE          32
  13#define TXA_SERVICE_NAME_LEN    32
  14#define TXA_MEM_NAME_LEN        32
  15#define TXA_FLUSH_THRESHOLD     1024
  16#define TXA_RETRY_CNT           100
  17#define TXA_MAX_NB_TX           128
  18#define TXA_INVALID_DEV_ID      INT32_C(-1)
  19#define TXA_INVALID_SERVICE_ID  INT64_C(-1)
  20
  21#define txa_evdev(id) (&rte_eventdevs[txa_dev_id_array[(id)]])
  22
  23#define txa_dev_caps_get(id) txa_evdev((id))->dev_ops->eth_tx_adapter_caps_get
  24
  25#define txa_dev_adapter_create(t) txa_evdev(t)->dev_ops->eth_tx_adapter_create
  26
  27#define txa_dev_adapter_create_ext(t) \
  28                                txa_evdev(t)->dev_ops->eth_tx_adapter_create
  29
  30#define txa_dev_adapter_free(t) txa_evdev(t)->dev_ops->eth_tx_adapter_free
  31
  32#define txa_dev_queue_add(id) txa_evdev(id)->dev_ops->eth_tx_adapter_queue_add
  33
  34#define txa_dev_queue_del(t) txa_evdev(t)->dev_ops->eth_tx_adapter_queue_del
  35
  36#define txa_dev_start(t) txa_evdev(t)->dev_ops->eth_tx_adapter_start
  37
  38#define txa_dev_stop(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stop
  39
  40#define txa_dev_stats_reset(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_reset
  41
  42#define txa_dev_stats_get(t) txa_evdev(t)->dev_ops->eth_tx_adapter_stats_get
  43
  44#define RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
  45do { \
  46        if (!txa_valid_id(id)) { \
  47                RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
  48                return retval; \
  49        } \
  50} while (0)
  51
  52#define TXA_CHECK_OR_ERR_RET(id) \
  53do {\
  54        int ret; \
  55        RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET((id), -EINVAL); \
  56        ret = txa_init(); \
  57        if (ret != 0) \
  58                return ret; \
  59        if (!txa_adapter_exist((id))) \
  60                return -EINVAL; \
  61} while (0)
  62
  63#define TXA_CHECK_TXQ(dev, queue) \
  64do {\
  65        if ((dev)->data->nb_tx_queues == 0) { \
  66                RTE_EDEV_LOG_ERR("No tx queues configured"); \
  67                return -EINVAL; \
  68        } \
  69        if ((queue) != -1 && \
  70                (uint16_t)(queue) >= (dev)->data->nb_tx_queues) { \
  71                RTE_EDEV_LOG_ERR("Invalid tx queue_id %" PRIu16, \
  72                                (uint16_t)(queue)); \
  73                return -EINVAL; \
  74        } \
  75} while (0)
  76
  77/* Tx retry callback structure */
  78struct txa_retry {
  79        /* Ethernet port id */
  80        uint16_t port_id;
  81        /* Tx queue */
  82        uint16_t tx_queue;
  83        /* Adapter ID */
  84        uint8_t id;
  85};
  86
  87/* Per queue structure */
  88struct txa_service_queue_info {
  89        /* Queue has been added */
  90        uint8_t added;
  91        /* Retry callback argument */
  92        struct txa_retry txa_retry;
  93        /* Tx buffer */
  94        struct rte_eth_dev_tx_buffer *tx_buf;
  95};
  96
  97/* PMD private structure */
  98struct txa_service_data {
  99        /* Max mbufs processed in any service function invocation */
 100        uint32_t max_nb_tx;
 101        /* Number of Tx queues in adapter */
 102        uint32_t nb_queues;
 103        /*  Synchronization with data path */
 104        rte_spinlock_t tx_lock;
 105        /* Event port ID */
 106        uint8_t port_id;
 107        /* Event device identifier */
 108        uint8_t eventdev_id;
 109        /* Highest port id supported + 1 */
 110        uint16_t dev_count;
 111        /* Loop count to flush Tx buffers */
 112        int loop_cnt;
 113        /* Per ethernet device structure */
 114        struct txa_service_ethdev *txa_ethdev;
 115        /* Statistics */
 116        struct rte_event_eth_tx_adapter_stats stats;
 117        /* Adapter Identifier */
 118        uint8_t id;
 119        /* Conf arg must be freed */
 120        uint8_t conf_free;
 121        /* Configuration callback */
 122        rte_event_eth_tx_adapter_conf_cb conf_cb;
 123        /* Configuration callback argument */
 124        void *conf_arg;
 125        /* socket id */
 126        int socket_id;
 127        /* Per adapter EAL service */
 128        int64_t service_id;
 129        /* Memory allocation name */
 130        char mem_name[TXA_MEM_NAME_LEN];
 131} __rte_cache_aligned;
 132
 133/* Per eth device structure */
 134struct txa_service_ethdev {
 135        /* Pointer to ethernet device */
 136        struct rte_eth_dev *dev;
 137        /* Number of queues added */
 138        uint16_t nb_queues;
 139        /* PMD specific queue data */
 140        void *queues;
 141};
 142
 143/* Array of adapter instances, initialized with event device id
 144 * when adapter is created
 145 */
 146static int *txa_dev_id_array;
 147
 148/* Array of pointers to service implementation data */
 149static struct txa_service_data **txa_service_data_array;
 150
 151static int32_t txa_service_func(void *args);
 152static int txa_service_adapter_create_ext(uint8_t id,
 153                        struct rte_eventdev *dev,
 154                        rte_event_eth_tx_adapter_conf_cb conf_cb,
 155                        void *conf_arg);
 156static int txa_service_queue_del(uint8_t id,
 157                                const struct rte_eth_dev *dev,
 158                                int32_t tx_queue_id);
 159
 160static int
 161txa_adapter_exist(uint8_t id)
 162{
 163        return txa_dev_id_array[id] != TXA_INVALID_DEV_ID;
 164}
 165
 166static inline int
 167txa_valid_id(uint8_t id)
 168{
 169        return id < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE;
 170}
 171
 172static void *
 173txa_memzone_array_get(const char *name, unsigned int elt_size, int nb_elems)
 174{
 175        const struct rte_memzone *mz;
 176        unsigned int sz;
 177
 178        sz = elt_size * nb_elems;
 179        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
 180
 181        mz = rte_memzone_lookup(name);
 182        if (mz == NULL) {
 183                mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
 184                                                 RTE_CACHE_LINE_SIZE);
 185                if (mz == NULL) {
 186                        RTE_EDEV_LOG_ERR("failed to reserve memzone"
 187                                        " name = %s err = %"
 188                                        PRId32, name, rte_errno);
 189                        return NULL;
 190                }
 191        }
 192
 193        return  mz->addr;
 194}
 195
 196static int
 197txa_dev_id_array_init(void)
 198{
 199        if (txa_dev_id_array == NULL) {
 200                int i;
 201
 202                txa_dev_id_array = txa_memzone_array_get("txa_adapter_array",
 203                                        sizeof(int),
 204                                        RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
 205                if (txa_dev_id_array == NULL)
 206                        return -ENOMEM;
 207
 208                for (i = 0; i < RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE; i++)
 209                        txa_dev_id_array[i] = TXA_INVALID_DEV_ID;
 210        }
 211
 212        return 0;
 213}
 214
 215static int
 216txa_init(void)
 217{
 218        return txa_dev_id_array_init();
 219}
 220
 221static int
 222txa_service_data_init(void)
 223{
 224        if (txa_service_data_array == NULL) {
 225                txa_service_data_array =
 226                                txa_memzone_array_get("txa_service_data_array",
 227                                        sizeof(int),
 228                                        RTE_EVENT_ETH_TX_ADAPTER_MAX_INSTANCE);
 229                if (txa_service_data_array == NULL)
 230                        return -ENOMEM;
 231        }
 232
 233        return 0;
 234}
 235
 236static inline struct txa_service_data *
 237txa_service_id_to_data(uint8_t id)
 238{
 239        return txa_service_data_array[id];
 240}
 241
 242static inline struct txa_service_queue_info *
 243txa_service_queue(struct txa_service_data *txa, uint16_t port_id,
 244                uint16_t tx_queue_id)
 245{
 246        struct txa_service_queue_info *tqi;
 247
 248        if (unlikely(txa->txa_ethdev == NULL || txa->dev_count < port_id + 1))
 249                return NULL;
 250
 251        tqi = txa->txa_ethdev[port_id].queues;
 252
 253        return likely(tqi != NULL) ? tqi + tx_queue_id : NULL;
 254}
 255
 256static int
 257txa_service_conf_cb(uint8_t __rte_unused id, uint8_t dev_id,
 258                struct rte_event_eth_tx_adapter_conf *conf, void *arg)
 259{
 260        int ret;
 261        struct rte_eventdev *dev;
 262        struct rte_event_port_conf *pc;
 263        struct rte_event_dev_config dev_conf;
 264        int started;
 265        uint8_t port_id;
 266
 267        pc = arg;
 268        dev = &rte_eventdevs[dev_id];
 269        dev_conf = dev->data->dev_conf;
 270
 271        started = dev->data->dev_started;
 272        if (started)
 273                rte_event_dev_stop(dev_id);
 274
 275        port_id = dev_conf.nb_event_ports;
 276        dev_conf.nb_event_ports += 1;
 277
 278        ret = rte_event_dev_configure(dev_id, &dev_conf);
 279        if (ret) {
 280                RTE_EDEV_LOG_ERR("failed to configure event dev %u",
 281                                                dev_id);
 282                if (started) {
 283                        if (rte_event_dev_start(dev_id))
 284                                return -EIO;
 285                }
 286                return ret;
 287        }
 288
 289        ret = rte_event_port_setup(dev_id, port_id, pc);
 290        if (ret) {
 291                RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
 292                                        port_id);
 293                if (started) {
 294                        if (rte_event_dev_start(dev_id))
 295                                return -EIO;
 296                }
 297                return ret;
 298        }
 299
 300        conf->event_port_id = port_id;
 301        conf->max_nb_tx = TXA_MAX_NB_TX;
 302        if (started)
 303                ret = rte_event_dev_start(dev_id);
 304        return ret;
 305}
 306
 307static int
 308txa_service_ethdev_alloc(struct txa_service_data *txa)
 309{
 310        struct txa_service_ethdev *txa_ethdev;
 311        uint16_t i, dev_count;
 312
 313        dev_count = rte_eth_dev_count_avail();
 314        if (txa->txa_ethdev && dev_count == txa->dev_count)
 315                return 0;
 316
 317        txa_ethdev = rte_zmalloc_socket(txa->mem_name,
 318                                        dev_count * sizeof(*txa_ethdev),
 319                                        0,
 320                                        txa->socket_id);
 321        if (txa_ethdev == NULL) {
 322                RTE_EDEV_LOG_ERR("Failed to alloc txa::txa_ethdev ");
 323                return -ENOMEM;
 324        }
 325
 326        if (txa->dev_count)
 327                memcpy(txa_ethdev, txa->txa_ethdev,
 328                        txa->dev_count * sizeof(*txa_ethdev));
 329
 330        RTE_ETH_FOREACH_DEV(i) {
 331                if (i == dev_count)
 332                        break;
 333                txa_ethdev[i].dev = &rte_eth_devices[i];
 334        }
 335
 336        txa->txa_ethdev = txa_ethdev;
 337        txa->dev_count = dev_count;
 338        return 0;
 339}
 340
 341static int
 342txa_service_queue_array_alloc(struct txa_service_data *txa,
 343                        uint16_t port_id)
 344{
 345        struct txa_service_queue_info *tqi;
 346        uint16_t nb_queue;
 347        int ret;
 348
 349        ret = txa_service_ethdev_alloc(txa);
 350        if (ret != 0)
 351                return ret;
 352
 353        if (txa->txa_ethdev[port_id].queues)
 354                return 0;
 355
 356        nb_queue = txa->txa_ethdev[port_id].dev->data->nb_tx_queues;
 357        tqi = rte_zmalloc_socket(txa->mem_name,
 358                                nb_queue *
 359                                sizeof(struct txa_service_queue_info), 0,
 360                                txa->socket_id);
 361        if (tqi == NULL)
 362                return -ENOMEM;
 363        txa->txa_ethdev[port_id].queues = tqi;
 364        return 0;
 365}
 366
 367static void
 368txa_service_queue_array_free(struct txa_service_data *txa,
 369                        uint16_t port_id)
 370{
 371        struct txa_service_ethdev *txa_ethdev;
 372        struct txa_service_queue_info *tqi;
 373
 374        txa_ethdev = &txa->txa_ethdev[port_id];
 375        if (txa->txa_ethdev == NULL || txa_ethdev->nb_queues != 0)
 376                return;
 377
 378        tqi = txa_ethdev->queues;
 379        txa_ethdev->queues = NULL;
 380        rte_free(tqi);
 381
 382        if (txa->nb_queues == 0) {
 383                rte_free(txa->txa_ethdev);
 384                txa->txa_ethdev = NULL;
 385        }
 386}
 387
 388static void
 389txa_service_unregister(struct txa_service_data *txa)
 390{
 391        if (txa->service_id != TXA_INVALID_SERVICE_ID) {
 392                rte_service_component_runstate_set(txa->service_id, 0);
 393                while (rte_service_may_be_active(txa->service_id))
 394                        rte_pause();
 395                rte_service_component_unregister(txa->service_id);
 396        }
 397        txa->service_id = TXA_INVALID_SERVICE_ID;
 398}
 399
 400static int
 401txa_service_register(struct txa_service_data *txa)
 402{
 403        int ret;
 404        struct rte_service_spec service;
 405        struct rte_event_eth_tx_adapter_conf conf;
 406
 407        if (txa->service_id != TXA_INVALID_SERVICE_ID)
 408                return 0;
 409
 410        memset(&service, 0, sizeof(service));
 411        snprintf(service.name, TXA_SERVICE_NAME_LEN, "txa_%d", txa->id);
 412        service.socket_id = txa->socket_id;
 413        service.callback = txa_service_func;
 414        service.callback_userdata = txa;
 415        service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
 416        ret = rte_service_component_register(&service,
 417                                        (uint32_t *)&txa->service_id);
 418        if (ret) {
 419                RTE_EDEV_LOG_ERR("failed to register service %s err = %"
 420                                 PRId32, service.name, ret);
 421                return ret;
 422        }
 423
 424        ret = txa->conf_cb(txa->id, txa->eventdev_id, &conf, txa->conf_arg);
 425        if (ret) {
 426                txa_service_unregister(txa);
 427                return ret;
 428        }
 429
 430        rte_service_component_runstate_set(txa->service_id, 1);
 431        txa->port_id = conf.event_port_id;
 432        txa->max_nb_tx = conf.max_nb_tx;
 433        return 0;
 434}
 435
 436static struct rte_eth_dev_tx_buffer *
 437txa_service_tx_buf_alloc(struct txa_service_data *txa,
 438                        const struct rte_eth_dev *dev)
 439{
 440        struct rte_eth_dev_tx_buffer *tb;
 441        uint16_t port_id;
 442
 443        port_id = dev->data->port_id;
 444        tb = rte_zmalloc_socket(txa->mem_name,
 445                                RTE_ETH_TX_BUFFER_SIZE(TXA_BATCH_SIZE),
 446                                0,
 447                                rte_eth_dev_socket_id(port_id));
 448        if (tb == NULL)
 449                RTE_EDEV_LOG_ERR("Failed to allocate memory for tx buffer");
 450        return tb;
 451}
 452
 453static int
 454txa_service_is_queue_added(struct txa_service_data *txa,
 455                        const struct rte_eth_dev *dev,
 456                        uint16_t tx_queue_id)
 457{
 458        struct txa_service_queue_info *tqi;
 459
 460        tqi = txa_service_queue(txa, dev->data->port_id, tx_queue_id);
 461        return tqi && tqi->added;
 462}
 463
 464static int
 465txa_service_ctrl(uint8_t id, int start)
 466{
 467        int ret;
 468        struct txa_service_data *txa;
 469
 470        txa = txa_service_id_to_data(id);
 471        if (txa->service_id == TXA_INVALID_SERVICE_ID)
 472                return 0;
 473
 474        ret = rte_service_runstate_set(txa->service_id, start);
 475        if (ret == 0 && !start) {
 476                while (rte_service_may_be_active(txa->service_id))
 477                        rte_pause();
 478        }
 479        return ret;
 480}
 481
 482static void
 483txa_service_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
 484                        void *userdata)
 485{
 486        struct txa_retry *tr;
 487        struct txa_service_data *data;
 488        struct rte_event_eth_tx_adapter_stats *stats;
 489        uint16_t sent = 0;
 490        unsigned int retry = 0;
 491        uint16_t i, n;
 492
 493        tr = (struct txa_retry *)(uintptr_t)userdata;
 494        data = txa_service_id_to_data(tr->id);
 495        stats = &data->stats;
 496
 497        do {
 498                n = rte_eth_tx_burst(tr->port_id, tr->tx_queue,
 499                               &pkts[sent], unsent - sent);
 500
 501                sent += n;
 502        } while (sent != unsent && retry++ < TXA_RETRY_CNT);
 503
 504        for (i = sent; i < unsent; i++)
 505                rte_pktmbuf_free(pkts[i]);
 506
 507        stats->tx_retry += retry;
 508        stats->tx_packets += sent;
 509        stats->tx_dropped += unsent - sent;
 510}
 511
 512static uint16_t
 513txa_process_event_vector(struct txa_service_data *txa,
 514                         struct rte_event_vector *vec)
 515{
 516        struct txa_service_queue_info *tqi;
 517        uint16_t port, queue, nb_tx = 0;
 518        struct rte_mbuf **mbufs;
 519        int i;
 520
 521        mbufs = (struct rte_mbuf **)vec->mbufs;
 522        if (vec->attr_valid) {
 523                port = vec->port;
 524                queue = vec->queue;
 525                tqi = txa_service_queue(txa, port, queue);
 526                if (unlikely(tqi == NULL || !tqi->added)) {
 527                        rte_pktmbuf_free_bulk(mbufs, vec->nb_elem);
 528                        rte_mempool_put(rte_mempool_from_obj(vec), vec);
 529                        return 0;
 530                }
 531                for (i = 0; i < vec->nb_elem; i++) {
 532                        nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
 533                                                   mbufs[i]);
 534                }
 535        } else {
 536                for (i = 0; i < vec->nb_elem; i++) {
 537                        port = mbufs[i]->port;
 538                        queue = rte_event_eth_tx_adapter_txq_get(mbufs[i]);
 539                        tqi = txa_service_queue(txa, port, queue);
 540                        if (unlikely(tqi == NULL || !tqi->added)) {
 541                                rte_pktmbuf_free(mbufs[i]);
 542                                continue;
 543                        }
 544                        nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf,
 545                                                   mbufs[i]);
 546                }
 547        }
 548        rte_mempool_put(rte_mempool_from_obj(vec), vec);
 549
 550        return nb_tx;
 551}
 552
 553static void
 554txa_service_tx(struct txa_service_data *txa, struct rte_event *ev,
 555        uint32_t n)
 556{
 557        uint32_t i;
 558        uint16_t nb_tx;
 559        struct rte_event_eth_tx_adapter_stats *stats;
 560
 561        stats = &txa->stats;
 562
 563        nb_tx = 0;
 564        for (i = 0; i < n; i++) {
 565                uint16_t port;
 566                uint16_t queue;
 567                struct txa_service_queue_info *tqi;
 568
 569                if (!(ev[i].event_type & RTE_EVENT_TYPE_VECTOR)) {
 570                        struct rte_mbuf *m;
 571
 572                        m = ev[i].mbuf;
 573                        port = m->port;
 574                        queue = rte_event_eth_tx_adapter_txq_get(m);
 575
 576                        tqi = txa_service_queue(txa, port, queue);
 577                        if (unlikely(tqi == NULL || !tqi->added)) {
 578                                rte_pktmbuf_free(m);
 579                                continue;
 580                        }
 581
 582                        nb_tx += rte_eth_tx_buffer(port, queue, tqi->tx_buf, m);
 583                } else {
 584                        nb_tx += txa_process_event_vector(txa, ev[i].vec);
 585                }
 586        }
 587
 588        stats->tx_packets += nb_tx;
 589}
 590
 591static int32_t
 592txa_service_func(void *args)
 593{
 594        struct txa_service_data *txa = args;
 595        uint8_t dev_id;
 596        uint8_t port;
 597        uint16_t n;
 598        uint32_t nb_tx, max_nb_tx;
 599        struct rte_event ev[TXA_BATCH_SIZE];
 600
 601        dev_id = txa->eventdev_id;
 602        max_nb_tx = txa->max_nb_tx;
 603        port = txa->port_id;
 604
 605        if (txa->nb_queues == 0)
 606                return 0;
 607
 608        if (!rte_spinlock_trylock(&txa->tx_lock))
 609                return 0;
 610
 611        for (nb_tx = 0; nb_tx < max_nb_tx; nb_tx += n) {
 612
 613                n = rte_event_dequeue_burst(dev_id, port, ev, RTE_DIM(ev), 0);
 614                if (!n)
 615                        break;
 616                txa_service_tx(txa, ev, n);
 617        }
 618
 619        if ((txa->loop_cnt++ & (TXA_FLUSH_THRESHOLD - 1)) == 0) {
 620
 621                struct txa_service_ethdev *tdi;
 622                struct txa_service_queue_info *tqi;
 623                struct rte_eth_dev *dev;
 624                uint16_t i;
 625
 626                tdi = txa->txa_ethdev;
 627                nb_tx = 0;
 628
 629                RTE_ETH_FOREACH_DEV(i) {
 630                        uint16_t q;
 631
 632                        if (i == txa->dev_count)
 633                                break;
 634
 635                        dev = tdi[i].dev;
 636                        if (tdi[i].nb_queues == 0)
 637                                continue;
 638                        for (q = 0; q < dev->data->nb_tx_queues; q++) {
 639
 640                                tqi = txa_service_queue(txa, i, q);
 641                                if (unlikely(tqi == NULL || !tqi->added))
 642                                        continue;
 643
 644                                nb_tx += rte_eth_tx_buffer_flush(i, q,
 645                                                        tqi->tx_buf);
 646                        }
 647                }
 648
 649                txa->stats.tx_packets += nb_tx;
 650        }
 651        rte_spinlock_unlock(&txa->tx_lock);
 652        return 0;
 653}
 654
 655static int
 656txa_service_adapter_create(uint8_t id, struct rte_eventdev *dev,
 657                        struct rte_event_port_conf *port_conf)
 658{
 659        struct txa_service_data *txa;
 660        struct rte_event_port_conf *cb_conf;
 661        int ret;
 662
 663        cb_conf = rte_malloc(NULL, sizeof(*cb_conf), 0);
 664        if (cb_conf == NULL)
 665                return -ENOMEM;
 666
 667        *cb_conf = *port_conf;
 668        ret = txa_service_adapter_create_ext(id, dev, txa_service_conf_cb,
 669                                        cb_conf);
 670        if (ret) {
 671                rte_free(cb_conf);
 672                return ret;
 673        }
 674
 675        txa = txa_service_id_to_data(id);
 676        txa->conf_free = 1;
 677        return ret;
 678}
 679
 680static int
 681txa_service_adapter_create_ext(uint8_t id, struct rte_eventdev *dev,
 682                        rte_event_eth_tx_adapter_conf_cb conf_cb,
 683                        void *conf_arg)
 684{
 685        struct txa_service_data *txa;
 686        int socket_id;
 687        char mem_name[TXA_SERVICE_NAME_LEN];
 688        int ret;
 689
 690        if (conf_cb == NULL)
 691                return -EINVAL;
 692
 693        socket_id = dev->data->socket_id;
 694        snprintf(mem_name, TXA_MEM_NAME_LEN,
 695                "rte_event_eth_txa_%d",
 696                id);
 697
 698        ret = txa_service_data_init();
 699        if (ret != 0)
 700                return ret;
 701
 702        txa = rte_zmalloc_socket(mem_name,
 703                                sizeof(*txa),
 704                                RTE_CACHE_LINE_SIZE, socket_id);
 705        if (txa == NULL) {
 706                RTE_EDEV_LOG_ERR("failed to get mem for tx adapter");
 707                return -ENOMEM;
 708        }
 709
 710        txa->id = id;
 711        txa->eventdev_id = dev->data->dev_id;
 712        txa->socket_id = socket_id;
 713        strncpy(txa->mem_name, mem_name, TXA_SERVICE_NAME_LEN);
 714        txa->conf_cb = conf_cb;
 715        txa->conf_arg = conf_arg;
 716        txa->service_id = TXA_INVALID_SERVICE_ID;
 717        rte_spinlock_init(&txa->tx_lock);
 718        txa_service_data_array[id] = txa;
 719
 720        return 0;
 721}
 722
 723static int
 724txa_service_event_port_get(uint8_t id, uint8_t *port)
 725{
 726        struct txa_service_data *txa;
 727
 728        txa = txa_service_id_to_data(id);
 729        if (txa->service_id == TXA_INVALID_SERVICE_ID)
 730                return -ENODEV;
 731
 732        *port = txa->port_id;
 733        return 0;
 734}
 735
 736static int
 737txa_service_adapter_free(uint8_t id)
 738{
 739        struct txa_service_data *txa;
 740
 741        txa = txa_service_id_to_data(id);
 742        if (txa->nb_queues) {
 743                RTE_EDEV_LOG_ERR("%" PRIu16 " Tx queues not deleted",
 744                                txa->nb_queues);
 745                return -EBUSY;
 746        }
 747
 748        if (txa->conf_free)
 749                rte_free(txa->conf_arg);
 750        rte_free(txa);
 751        return 0;
 752}
 753
 754static int
 755txa_service_queue_add(uint8_t id,
 756                __rte_unused struct rte_eventdev *dev,
 757                const struct rte_eth_dev *eth_dev,
 758                int32_t tx_queue_id)
 759{
 760        struct txa_service_data *txa;
 761        struct txa_service_ethdev *tdi;
 762        struct txa_service_queue_info *tqi;
 763        struct rte_eth_dev_tx_buffer *tb;
 764        struct txa_retry *txa_retry;
 765        int ret = 0;
 766
 767        txa = txa_service_id_to_data(id);
 768
 769        if (tx_queue_id == -1) {
 770                int nb_queues;
 771                uint16_t i, j;
 772                uint16_t *qdone;
 773
 774                nb_queues = eth_dev->data->nb_tx_queues;
 775                if (txa->dev_count > eth_dev->data->port_id) {
 776                        tdi = &txa->txa_ethdev[eth_dev->data->port_id];
 777                        nb_queues -= tdi->nb_queues;
 778                }
 779
 780                qdone = rte_zmalloc(txa->mem_name,
 781                                nb_queues * sizeof(*qdone), 0);
 782                if (qdone == NULL)
 783                        return -ENOMEM;
 784                j = 0;
 785                for (i = 0; i < nb_queues; i++) {
 786                        if (txa_service_is_queue_added(txa, eth_dev, i))
 787                                continue;
 788                        ret = txa_service_queue_add(id, dev, eth_dev, i);
 789                        if (ret == 0)
 790                                qdone[j++] = i;
 791                        else
 792                                break;
 793                }
 794
 795                if (i != nb_queues) {
 796                        for (i = 0; i < j; i++)
 797                                txa_service_queue_del(id, eth_dev, qdone[i]);
 798                }
 799                rte_free(qdone);
 800                return ret;
 801        }
 802
 803        ret = txa_service_register(txa);
 804        if (ret)
 805                return ret;
 806
 807        rte_spinlock_lock(&txa->tx_lock);
 808
 809        if (txa_service_is_queue_added(txa, eth_dev, tx_queue_id))
 810                goto ret_unlock;
 811
 812        ret = txa_service_queue_array_alloc(txa, eth_dev->data->port_id);
 813        if (ret)
 814                goto err_unlock;
 815
 816        tb = txa_service_tx_buf_alloc(txa, eth_dev);
 817        if (tb == NULL)
 818                goto err_unlock;
 819
 820        tdi = &txa->txa_ethdev[eth_dev->data->port_id];
 821        tqi = txa_service_queue(txa, eth_dev->data->port_id, tx_queue_id);
 822        if (tqi == NULL)
 823                goto err_unlock;
 824
 825        txa_retry = &tqi->txa_retry;
 826        txa_retry->id = txa->id;
 827        txa_retry->port_id = eth_dev->data->port_id;
 828        txa_retry->tx_queue = tx_queue_id;
 829
 830        rte_eth_tx_buffer_init(tb, TXA_BATCH_SIZE);
 831        rte_eth_tx_buffer_set_err_callback(tb,
 832                txa_service_buffer_retry, txa_retry);
 833
 834        tqi->tx_buf = tb;
 835        tqi->added = 1;
 836        tdi->nb_queues++;
 837        txa->nb_queues++;
 838
 839ret_unlock:
 840        rte_spinlock_unlock(&txa->tx_lock);
 841        return 0;
 842
 843err_unlock:
 844        if (txa->nb_queues == 0) {
 845                txa_service_queue_array_free(txa,
 846                                        eth_dev->data->port_id);
 847                txa_service_unregister(txa);
 848        }
 849
 850        rte_spinlock_unlock(&txa->tx_lock);
 851        return -1;
 852}
 853
 854static int
 855txa_service_queue_del(uint8_t id,
 856                const struct rte_eth_dev *dev,
 857                int32_t tx_queue_id)
 858{
 859        struct txa_service_data *txa;
 860        struct txa_service_queue_info *tqi;
 861        struct rte_eth_dev_tx_buffer *tb;
 862        uint16_t port_id;
 863
 864        txa = txa_service_id_to_data(id);
 865        port_id = dev->data->port_id;
 866
 867        if (tx_queue_id == -1) {
 868                uint16_t i, q, nb_queues;
 869                int ret = 0;
 870
 871                nb_queues = txa->txa_ethdev[port_id].nb_queues;
 872                if (nb_queues == 0)
 873                        return 0;
 874
 875                i = 0;
 876                q = 0;
 877                tqi = txa->txa_ethdev[port_id].queues;
 878
 879                while (i < nb_queues) {
 880
 881                        if (tqi[q].added) {
 882                                ret = txa_service_queue_del(id, dev, q);
 883                                if (ret != 0)
 884                                        break;
 885                        }
 886                        i++;
 887                        q++;
 888                }
 889                return ret;
 890        }
 891
 892        txa = txa_service_id_to_data(id);
 893
 894        tqi = txa_service_queue(txa, port_id, tx_queue_id);
 895        if (tqi == NULL || !tqi->added)
 896                return 0;
 897
 898        tb = tqi->tx_buf;
 899        tqi->added = 0;
 900        tqi->tx_buf = NULL;
 901        rte_free(tb);
 902        txa->nb_queues--;
 903        txa->txa_ethdev[port_id].nb_queues--;
 904
 905        txa_service_queue_array_free(txa, port_id);
 906        return 0;
 907}
 908
 909static int
 910txa_service_id_get(uint8_t id, uint32_t *service_id)
 911{
 912        struct txa_service_data *txa;
 913
 914        txa = txa_service_id_to_data(id);
 915        if (txa->service_id == TXA_INVALID_SERVICE_ID)
 916                return -ESRCH;
 917
 918        if (service_id == NULL)
 919                return -EINVAL;
 920
 921        *service_id = txa->service_id;
 922        return 0;
 923}
 924
 925static int
 926txa_service_start(uint8_t id)
 927{
 928        return txa_service_ctrl(id, 1);
 929}
 930
 931static int
 932txa_service_stats_get(uint8_t id,
 933                struct rte_event_eth_tx_adapter_stats *stats)
 934{
 935        struct txa_service_data *txa;
 936
 937        txa = txa_service_id_to_data(id);
 938        *stats = txa->stats;
 939        return 0;
 940}
 941
 942static int
 943txa_service_stats_reset(uint8_t id)
 944{
 945        struct txa_service_data *txa;
 946
 947        txa = txa_service_id_to_data(id);
 948        memset(&txa->stats, 0, sizeof(txa->stats));
 949        return 0;
 950}
 951
 952static int
 953txa_service_stop(uint8_t id)
 954{
 955        return txa_service_ctrl(id, 0);
 956}
 957
 958
 959int
 960rte_event_eth_tx_adapter_create(uint8_t id, uint8_t dev_id,
 961                                struct rte_event_port_conf *port_conf)
 962{
 963        struct rte_eventdev *dev;
 964        int ret;
 965
 966        if (port_conf == NULL)
 967                return -EINVAL;
 968
 969        RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
 970        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
 971
 972        dev = &rte_eventdevs[dev_id];
 973
 974        ret = txa_init();
 975        if (ret != 0)
 976                return ret;
 977
 978        if (txa_adapter_exist(id))
 979                return -EEXIST;
 980
 981        txa_dev_id_array[id] = dev_id;
 982        if (txa_dev_adapter_create(id))
 983                ret = txa_dev_adapter_create(id)(id, dev);
 984
 985        if (ret != 0) {
 986                txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
 987                return ret;
 988        }
 989
 990        ret = txa_service_adapter_create(id, dev, port_conf);
 991        if (ret != 0) {
 992                if (txa_dev_adapter_free(id))
 993                        txa_dev_adapter_free(id)(id, dev);
 994                txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
 995                return ret;
 996        }
 997        rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, NULL, port_conf,
 998                ret);
 999        txa_dev_id_array[id] = dev_id;
1000        return 0;
1001}
1002
1003int
1004rte_event_eth_tx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1005                                rte_event_eth_tx_adapter_conf_cb conf_cb,
1006                                void *conf_arg)
1007{
1008        struct rte_eventdev *dev;
1009        int ret;
1010
1011        RTE_EVENT_ETH_TX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1012        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1013
1014        ret = txa_init();
1015        if (ret != 0)
1016                return ret;
1017
1018        if (txa_adapter_exist(id))
1019                return -EINVAL;
1020
1021        dev = &rte_eventdevs[dev_id];
1022
1023        txa_dev_id_array[id] = dev_id;
1024        if (txa_dev_adapter_create_ext(id))
1025                ret = txa_dev_adapter_create_ext(id)(id, dev);
1026
1027        if (ret != 0) {
1028                txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1029                return ret;
1030        }
1031
1032        ret = txa_service_adapter_create_ext(id, dev, conf_cb, conf_arg);
1033        if (ret != 0) {
1034                if (txa_dev_adapter_free(id))
1035                        txa_dev_adapter_free(id)(id, dev);
1036                txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1037                return ret;
1038        }
1039
1040        rte_eventdev_trace_eth_tx_adapter_create(id, dev_id, conf_cb, conf_arg,
1041                ret);
1042        txa_dev_id_array[id] = dev_id;
1043        return 0;
1044}
1045
1046
1047int
1048rte_event_eth_tx_adapter_event_port_get(uint8_t id, uint8_t *event_port_id)
1049{
1050        TXA_CHECK_OR_ERR_RET(id);
1051
1052        return txa_service_event_port_get(id, event_port_id);
1053}
1054
1055int
1056rte_event_eth_tx_adapter_free(uint8_t id)
1057{
1058        int ret;
1059
1060        TXA_CHECK_OR_ERR_RET(id);
1061
1062        ret = txa_dev_adapter_free(id) ?
1063                txa_dev_adapter_free(id)(id, txa_evdev(id)) :
1064                0;
1065
1066        if (ret == 0)
1067                ret = txa_service_adapter_free(id);
1068        txa_dev_id_array[id] = TXA_INVALID_DEV_ID;
1069
1070        rte_eventdev_trace_eth_tx_adapter_free(id, ret);
1071        return ret;
1072}
1073
1074int
1075rte_event_eth_tx_adapter_queue_add(uint8_t id,
1076                                uint16_t eth_dev_id,
1077                                int32_t queue)
1078{
1079        struct rte_eth_dev *eth_dev;
1080        int ret;
1081        uint32_t caps;
1082
1083        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1084        TXA_CHECK_OR_ERR_RET(id);
1085
1086        eth_dev = &rte_eth_devices[eth_dev_id];
1087        TXA_CHECK_TXQ(eth_dev, queue);
1088
1089        caps = 0;
1090        if (txa_dev_caps_get(id))
1091                txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1092
1093        if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1094                ret =  txa_dev_queue_add(id) ?
1095                                        txa_dev_queue_add(id)(id,
1096                                                        txa_evdev(id),
1097                                                        eth_dev,
1098                                                        queue) : 0;
1099        else
1100                ret = txa_service_queue_add(id, txa_evdev(id), eth_dev, queue);
1101
1102        rte_eventdev_trace_eth_tx_adapter_queue_add(id, eth_dev_id, queue,
1103                ret);
1104        return ret;
1105}
1106
1107int
1108rte_event_eth_tx_adapter_queue_del(uint8_t id,
1109                                uint16_t eth_dev_id,
1110                                int32_t queue)
1111{
1112        struct rte_eth_dev *eth_dev;
1113        int ret;
1114        uint32_t caps;
1115
1116        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1117        TXA_CHECK_OR_ERR_RET(id);
1118
1119        eth_dev = &rte_eth_devices[eth_dev_id];
1120
1121        caps = 0;
1122
1123        if (txa_dev_caps_get(id))
1124                txa_dev_caps_get(id)(txa_evdev(id), eth_dev, &caps);
1125
1126        if (caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT)
1127                ret =  txa_dev_queue_del(id) ?
1128                                        txa_dev_queue_del(id)(id, txa_evdev(id),
1129                                                        eth_dev,
1130                                                        queue) : 0;
1131        else
1132                ret = txa_service_queue_del(id, eth_dev, queue);
1133
1134        rte_eventdev_trace_eth_tx_adapter_queue_del(id, eth_dev_id, queue,
1135                ret);
1136        return ret;
1137}
1138
1139int
1140rte_event_eth_tx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1141{
1142        TXA_CHECK_OR_ERR_RET(id);
1143
1144        return txa_service_id_get(id, service_id);
1145}
1146
1147int
1148rte_event_eth_tx_adapter_start(uint8_t id)
1149{
1150        int ret;
1151
1152        TXA_CHECK_OR_ERR_RET(id);
1153
1154        ret = txa_dev_start(id) ? txa_dev_start(id)(id, txa_evdev(id)) : 0;
1155        if (ret == 0)
1156                ret = txa_service_start(id);
1157        rte_eventdev_trace_eth_tx_adapter_start(id, ret);
1158        return ret;
1159}
1160
1161int
1162rte_event_eth_tx_adapter_stats_get(uint8_t id,
1163                                struct rte_event_eth_tx_adapter_stats *stats)
1164{
1165        int ret;
1166
1167        TXA_CHECK_OR_ERR_RET(id);
1168
1169        if (stats == NULL)
1170                return -EINVAL;
1171
1172        *stats = (struct rte_event_eth_tx_adapter_stats){0};
1173
1174        ret = txa_dev_stats_get(id) ?
1175                        txa_dev_stats_get(id)(id, txa_evdev(id), stats) : 0;
1176
1177        if (ret == 0 && txa_service_id_get(id, NULL) != ESRCH) {
1178                if (txa_dev_stats_get(id)) {
1179                        struct rte_event_eth_tx_adapter_stats service_stats;
1180
1181                        ret = txa_service_stats_get(id, &service_stats);
1182                        if (ret == 0) {
1183                                stats->tx_retry += service_stats.tx_retry;
1184                                stats->tx_packets += service_stats.tx_packets;
1185                                stats->tx_dropped += service_stats.tx_dropped;
1186                        }
1187                } else
1188                        ret = txa_service_stats_get(id, stats);
1189        }
1190
1191        return ret;
1192}
1193
1194int
1195rte_event_eth_tx_adapter_stats_reset(uint8_t id)
1196{
1197        int ret;
1198
1199        TXA_CHECK_OR_ERR_RET(id);
1200
1201        ret = txa_dev_stats_reset(id) ?
1202                txa_dev_stats_reset(id)(id, txa_evdev(id)) : 0;
1203        if (ret == 0)
1204                ret = txa_service_stats_reset(id);
1205        return ret;
1206}
1207
1208int
1209rte_event_eth_tx_adapter_stop(uint8_t id)
1210{
1211        int ret;
1212
1213        TXA_CHECK_OR_ERR_RET(id);
1214
1215        ret = txa_dev_stop(id) ? txa_dev_stop(id)(id,  txa_evdev(id)) : 0;
1216        if (ret == 0)
1217                ret = txa_service_stop(id);
1218        rte_eventdev_trace_eth_tx_adapter_stop(id, ret);
1219        return ret;
1220}
1221