dpdk/examples/vm_power_manager/channel_monitor.c
<<
>>
Prefs
   1/* SPDX-License-Identifier: BSD-3-Clause
   2 * Copyright(c) 2010-2014 Intel Corporation
   3 */
   4
   5#include <unistd.h>
   6#include <stdio.h>
   7#include <stdlib.h>
   8#include <stdint.h>
   9#include <signal.h>
  10#include <errno.h>
  11#include <string.h>
  12#include <fcntl.h>
  13#include <sys/types.h>
  14#include <sys/epoll.h>
  15#include <sys/queue.h>
  16#include <sys/time.h>
  17#include <sys/socket.h>
  18#include <sys/select.h>
  19#ifdef USE_JANSSON
  20#include <jansson.h>
  21#else
  22#pragma message "Jansson dev libs unavailable, not including JSON parsing"
  23#endif
  24#include <rte_string_fns.h>
  25#include <rte_log.h>
  26#include <rte_memory.h>
  27#include <rte_malloc.h>
  28#include <rte_cycles.h>
  29#include <rte_ethdev.h>
  30#ifdef RTE_NET_I40E
  31#include <rte_pmd_i40e.h>
  32#endif
  33#include <rte_power.h>
  34
  35#include <libvirt/libvirt.h>
  36#include "channel_monitor.h"
  37#include "channel_manager.h"
  38#include "power_manager.h"
  39#include "oob_monitor.h"
  40
  41#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
  42
  43#define MAX_EVENTS 256
  44
  45uint64_t vsi_pkt_count_prev[384];
  46uint64_t rdtsc_prev[384];
  47#define MAX_JSON_STRING_LEN 1024
  48char json_data[MAX_JSON_STRING_LEN];
  49
  50double time_period_ms = 1;
  51static volatile unsigned run_loop = 1;
  52static int global_event_fd;
  53static unsigned int policy_is_set;
  54static struct epoll_event *global_events_list;
  55static struct policy policies[RTE_MAX_LCORE];
  56
  57#ifdef USE_JANSSON
  58
  59union PFID {
  60        struct rte_ether_addr addr;
  61        uint64_t pfid;
  62};
  63
  64static int
  65str_to_ether_addr(const char *a, struct rte_ether_addr *ether_addr)
  66{
  67        int i;
  68        char *end;
  69        unsigned long o[RTE_ETHER_ADDR_LEN];
  70
  71        i = 0;
  72        do {
  73                errno = 0;
  74                o[i] = strtoul(a, &end, 16);
  75                if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
  76                        return -1;
  77                a = end + 1;
  78        } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
  79
  80        /* Junk at the end of line */
  81        if (end[0] != 0)
  82                return -1;
  83
  84        /* Support the format XX:XX:XX:XX:XX:XX */
  85        if (i == RTE_ETHER_ADDR_LEN) {
  86                while (i-- != 0) {
  87                        if (o[i] > UINT8_MAX)
  88                                return -1;
  89                        ether_addr->addr_bytes[i] = (uint8_t)o[i];
  90                }
  91        /* Support the format XXXX:XXXX:XXXX */
  92        } else if (i == RTE_ETHER_ADDR_LEN / 2) {
  93                while (i-- != 0) {
  94                        if (o[i] > UINT16_MAX)
  95                                return -1;
  96                        ether_addr->addr_bytes[i * 2] =
  97                                        (uint8_t)(o[i] >> 8);
  98                        ether_addr->addr_bytes[i * 2 + 1] =
  99                                        (uint8_t)(o[i] & 0xff);
 100                }
 101        /* unknown format */
 102        } else
 103                return -1;
 104
 105        return 0;
 106}
 107
 108static int
 109set_policy_mac(struct rte_power_channel_packet *pkt, int idx, char *mac)
 110{
 111        union PFID pfid;
 112        int ret;
 113
 114        /* Use port MAC address as the vfid */
 115        ret = str_to_ether_addr(mac, &pfid.addr);
 116
 117        if (ret != 0) {
 118                RTE_LOG(ERR, CHANNEL_MONITOR,
 119                        "Invalid mac address received in JSON\n");
 120                pkt->vfid[idx] = 0;
 121                return -1;
 122        }
 123
 124        printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
 125                        "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
 126                        RTE_ETHER_ADDR_BYTES(&pfid.addr));
 127
 128        pkt->vfid[idx] = pfid.pfid;
 129        return 0;
 130}
 131
 132static char*
 133get_resource_name_from_chn_path(const char *channel_path)
 134{
 135        char *substr = NULL;
 136
 137        substr = strstr(channel_path, CHANNEL_MGR_FIFO_PATTERN_NAME);
 138
 139        return substr;
 140}
 141
 142static int
 143get_resource_id_from_vmname(const char *vm_name)
 144{
 145        int result = -1;
 146        int off = 0;
 147
 148        if (vm_name == NULL)
 149                return -1;
 150
 151        while (vm_name[off] != '\0') {
 152                if (isdigit(vm_name[off]))
 153                        break;
 154                off++;
 155        }
 156        result = atoi(&vm_name[off]);
 157        if ((result == 0) && (vm_name[off] != '0'))
 158                return -1;
 159
 160        return result;
 161}
 162
 163static int
 164parse_json_to_pkt(json_t *element, struct rte_power_channel_packet *pkt,
 165                                        const char *vm_name)
 166{
 167        const char *key;
 168        json_t *value;
 169        int ret;
 170        int resource_id;
 171
 172        memset(pkt, 0, sizeof(*pkt));
 173
 174        pkt->nb_mac_to_monitor = 0;
 175        pkt->t_boost_status.tbEnabled = false;
 176        pkt->workload = RTE_POWER_WL_LOW;
 177        pkt->policy_to_use = RTE_POWER_POLICY_TIME;
 178        pkt->command = RTE_POWER_PKT_POLICY;
 179        pkt->core_type = RTE_POWER_CORE_TYPE_PHYSICAL;
 180
 181        if (vm_name == NULL) {
 182                RTE_LOG(ERR, CHANNEL_MONITOR,
 183                        "vm_name is NULL, request rejected !\n");
 184                return -1;
 185        }
 186
 187        json_object_foreach(element, key, value) {
 188                if (!strcmp(key, "policy")) {
 189                        /* Recurse in to get the contents of profile */
 190                        ret = parse_json_to_pkt(value, pkt, vm_name);
 191                        if (ret)
 192                                return ret;
 193                } else if (!strcmp(key, "instruction")) {
 194                        /* Recurse in to get the contents of instruction */
 195                        ret = parse_json_to_pkt(value, pkt, vm_name);
 196                        if (ret)
 197                                return ret;
 198                } else if (!strcmp(key, "command")) {
 199                        char command[32];
 200                        strlcpy(command, json_string_value(value), 32);
 201                        if (!strcmp(command, "power")) {
 202                                pkt->command = RTE_POWER_CPU_POWER;
 203                        } else if (!strcmp(command, "create")) {
 204                                pkt->command = RTE_POWER_PKT_POLICY;
 205                        } else if (!strcmp(command, "destroy")) {
 206                                pkt->command = RTE_POWER_PKT_POLICY_REMOVE;
 207                        } else {
 208                                RTE_LOG(ERR, CHANNEL_MONITOR,
 209                                        "Invalid command received in JSON\n");
 210                                return -1;
 211                        }
 212                } else if (!strcmp(key, "policy_type")) {
 213                        char command[32];
 214                        strlcpy(command, json_string_value(value), 32);
 215                        if (!strcmp(command, "TIME")) {
 216                                pkt->policy_to_use =
 217                                                RTE_POWER_POLICY_TIME;
 218                        } else if (!strcmp(command, "TRAFFIC")) {
 219                                pkt->policy_to_use =
 220                                                RTE_POWER_POLICY_TRAFFIC;
 221                        } else if (!strcmp(command, "WORKLOAD")) {
 222                                pkt->policy_to_use =
 223                                                RTE_POWER_POLICY_WORKLOAD;
 224                        } else if (!strcmp(command, "BRANCH_RATIO")) {
 225                                pkt->policy_to_use =
 226                                                RTE_POWER_POLICY_BRANCH_RATIO;
 227                        } else {
 228                                RTE_LOG(ERR, CHANNEL_MONITOR,
 229                                        "Wrong policy_type received in JSON\n");
 230                                return -1;
 231                        }
 232                } else if (!strcmp(key, "workload")) {
 233                        char command[32];
 234                        strlcpy(command, json_string_value(value), 32);
 235                        if (!strcmp(command, "HIGH")) {
 236                                pkt->workload = RTE_POWER_WL_HIGH;
 237                        } else if (!strcmp(command, "MEDIUM")) {
 238                                pkt->workload = RTE_POWER_WL_MEDIUM;
 239                        } else if (!strcmp(command, "LOW")) {
 240                                pkt->workload = RTE_POWER_WL_LOW;
 241                        } else {
 242                                RTE_LOG(ERR, CHANNEL_MONITOR,
 243                                        "Wrong workload received in JSON\n");
 244                                return -1;
 245                        }
 246                } else if (!strcmp(key, "busy_hours")) {
 247                        unsigned int i;
 248                        size_t size = json_array_size(value);
 249
 250                        for (i = 0; i < size; i++) {
 251                                int hour = (int)json_integer_value(
 252                                                json_array_get(value, i));
 253                                pkt->timer_policy.busy_hours[i] = hour;
 254                        }
 255                } else if (!strcmp(key, "quiet_hours")) {
 256                        unsigned int i;
 257                        size_t size = json_array_size(value);
 258
 259                        for (i = 0; i < size; i++) {
 260                                int hour = (int)json_integer_value(
 261                                                json_array_get(value, i));
 262                                pkt->timer_policy.quiet_hours[i] = hour;
 263                        }
 264                } else if (!strcmp(key, "mac_list")) {
 265                        unsigned int i;
 266                        size_t size = json_array_size(value);
 267
 268                        for (i = 0; i < size; i++) {
 269                                char mac[32];
 270                                strlcpy(mac,
 271                                        json_string_value(json_array_get(value, i)),
 272                                        32);
 273                                set_policy_mac(pkt, i, mac);
 274                        }
 275                        pkt->nb_mac_to_monitor = size;
 276                } else if (!strcmp(key, "avg_packet_thresh")) {
 277                        pkt->traffic_policy.avg_max_packet_thresh =
 278                                        (uint32_t)json_integer_value(value);
 279                } else if (!strcmp(key, "max_packet_thresh")) {
 280                        pkt->traffic_policy.max_max_packet_thresh =
 281                                        (uint32_t)json_integer_value(value);
 282                } else if (!strcmp(key, "unit")) {
 283                        char unit[32];
 284                        strlcpy(unit, json_string_value(value), 32);
 285                        if (!strcmp(unit, "SCALE_UP")) {
 286                                pkt->unit = RTE_POWER_SCALE_UP;
 287                        } else if (!strcmp(unit, "SCALE_DOWN")) {
 288                                pkt->unit = RTE_POWER_SCALE_DOWN;
 289                        } else if (!strcmp(unit, "SCALE_MAX")) {
 290                                pkt->unit = RTE_POWER_SCALE_MAX;
 291                        } else if (!strcmp(unit, "SCALE_MIN")) {
 292                                pkt->unit = RTE_POWER_SCALE_MIN;
 293                        } else if (!strcmp(unit, "ENABLE_TURBO")) {
 294                                pkt->unit = RTE_POWER_ENABLE_TURBO;
 295                        } else if (!strcmp(unit, "DISABLE_TURBO")) {
 296                                pkt->unit = RTE_POWER_DISABLE_TURBO;
 297                        } else {
 298                                RTE_LOG(ERR, CHANNEL_MONITOR,
 299                                        "Invalid command received in JSON\n");
 300                                return -1;
 301                        }
 302                } else {
 303                        RTE_LOG(ERR, CHANNEL_MONITOR,
 304                                "Unknown key received in JSON string: %s\n",
 305                                key);
 306                }
 307
 308                resource_id = get_resource_id_from_vmname(vm_name);
 309                if (resource_id < 0) {
 310                        RTE_LOG(ERR, CHANNEL_MONITOR,
 311                                "Could not get resource_id from vm_name:%s\n",
 312                                vm_name);
 313                        return -1;
 314                }
 315                strlcpy(pkt->vm_name, vm_name, RTE_POWER_VM_MAX_NAME_SZ);
 316                pkt->resource_id = resource_id;
 317        }
 318        return 0;
 319}
 320#endif
 321
 322void channel_monitor_exit(void)
 323{
 324        run_loop = 0;
 325        rte_free(global_events_list);
 326}
 327
 328static void
 329core_share(int pNo, int z, int x, int t)
 330{
 331        if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
 332                if (strcmp(policies[pNo].pkt.vm_name,
 333                                lvm_info[x].vm_name) != 0) {
 334                        policies[pNo].core_share[z].status = 1;
 335                        power_manager_scale_core_max(
 336                                        policies[pNo].core_share[z].pcpu);
 337                }
 338        }
 339}
 340
 341static void
 342core_share_status(int pNo)
 343{
 344
 345        int noVms = 0, noVcpus = 0, z, x, t;
 346
 347        get_all_vm(&noVms, &noVcpus);
 348
 349        /* Reset Core Share Status. */
 350        for (z = 0; z < noVcpus; z++)
 351                policies[pNo].core_share[z].status = 0;
 352
 353        /* Foreach vcpu in a policy. */
 354        for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
 355                /* Foreach VM on the platform. */
 356                for (x = 0; x < noVms; x++) {
 357                        /* Foreach vcpu of VMs on platform. */
 358                        for (t = 0; t < lvm_info[x].num_cpus; t++)
 359                                core_share(pNo, z, x, t);
 360                }
 361        }
 362}
 363
 364
 365static int
 366pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
 367{
 368        int ret = 0;
 369
 370        if (pol->pkt.policy_to_use == RTE_POWER_POLICY_BRANCH_RATIO) {
 371                ci->cd[pcpu].oob_enabled = 1;
 372                ret = add_core_to_monitor(pcpu);
 373                if (ret == 0)
 374                        RTE_LOG(INFO, CHANNEL_MONITOR,
 375                                        "Monitoring pcpu %d OOB for %s\n",
 376                                        pcpu, pol->pkt.vm_name);
 377                else
 378                        RTE_LOG(ERR, CHANNEL_MONITOR,
 379                                        "Error monitoring pcpu %d OOB for %s\n",
 380                                        pcpu, pol->pkt.vm_name);
 381
 382        } else {
 383                pol->core_share[count].pcpu = pcpu;
 384                RTE_LOG(INFO, CHANNEL_MONITOR,
 385                                "Monitoring pcpu %d for %s\n",
 386                                pcpu, pol->pkt.vm_name);
 387        }
 388        return ret;
 389}
 390
 391static void
 392get_pcpu_to_control(struct policy *pol)
 393{
 394
 395        /* Convert vcpu to pcpu. */
 396        struct vm_info info;
 397        int pcpu, count;
 398        struct core_info *ci;
 399
 400        ci = get_core_info();
 401
 402        RTE_LOG(DEBUG, CHANNEL_MONITOR,
 403                        "Looking for pcpu for %s\n", pol->pkt.vm_name);
 404
 405        /*
 406         * So now that we're handling virtual and physical cores, we need to
 407         * differentiate between them when adding them to the branch monitor.
 408         * Virtual cores need to be converted to physical cores.
 409         */
 410        if (pol->pkt.core_type == RTE_POWER_CORE_TYPE_VIRTUAL) {
 411                /*
 412                 * If the cores in the policy are virtual, we need to map them
 413                 * to physical core. We look up the vm info and use that for
 414                 * the mapping.
 415                 */
 416                get_info_vm(pol->pkt.vm_name, &info);
 417                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 418                        pcpu = info.pcpu_map[pol->pkt.vcpu_to_control[count]];
 419                        pcpu_monitor(pol, ci, pcpu, count);
 420                }
 421        } else {
 422                /*
 423                 * If the cores in the policy are physical, we just use
 424                 * those core id's directly.
 425                 */
 426                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 427                        pcpu = pol->pkt.vcpu_to_control[count];
 428                        pcpu_monitor(pol, ci, pcpu, count);
 429                }
 430        }
 431}
 432
 433static int
 434get_pfid(struct policy *pol)
 435{
 436
 437        int i, x, ret = 0;
 438
 439        for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
 440
 441                RTE_ETH_FOREACH_DEV(x) {
 442#ifdef RTE_NET_I40E
 443                        ret = rte_pmd_i40e_query_vfid_by_mac(x,
 444                                (struct rte_ether_addr *)&(pol->pkt.vfid[i]));
 445#else
 446                        ret = -ENOTSUP;
 447#endif
 448                        if (ret != -EINVAL) {
 449                                pol->port[i] = x;
 450                                break;
 451                        }
 452                }
 453                if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
 454                        RTE_LOG(INFO, CHANNEL_MONITOR,
 455                                "Error with Policy. MAC not found on "
 456                                "attached ports ");
 457                        pol->enabled = 0;
 458                        return ret;
 459                }
 460                pol->pfid[i] = ret;
 461        }
 462        return 1;
 463}
 464
 465static int
 466update_policy(struct rte_power_channel_packet *pkt)
 467{
 468
 469        unsigned int updated = 0;
 470        unsigned int i;
 471
 472
 473        RTE_LOG(INFO, CHANNEL_MONITOR,
 474                        "Applying policy for %s\n", pkt->vm_name);
 475
 476        for (i = 0; i < RTE_DIM(policies); i++) {
 477                if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
 478                        /* Copy the contents of *pkt into the policy.pkt */
 479                        policies[i].pkt = *pkt;
 480                        get_pcpu_to_control(&policies[i]);
 481                        /* Check Eth dev only for Traffic policy */
 482                        if (policies[i].pkt.policy_to_use ==
 483                                        RTE_POWER_POLICY_TRAFFIC) {
 484                                if (get_pfid(&policies[i]) < 0) {
 485                                        updated = 1;
 486                                        break;
 487                                }
 488                        }
 489                        core_share_status(i);
 490                        policies[i].enabled = 1;
 491                        updated = 1;
 492                }
 493        }
 494        if (!updated) {
 495                for (i = 0; i < RTE_DIM(policies); i++) {
 496                        if (policies[i].enabled == 0) {
 497                                policies[i].pkt = *pkt;
 498                                get_pcpu_to_control(&policies[i]);
 499                                /* Check Eth dev only for Traffic policy */
 500                                if (policies[i].pkt.policy_to_use ==
 501                                                RTE_POWER_POLICY_TRAFFIC) {
 502                                        if (get_pfid(&policies[i]) < 0) {
 503                                                updated = 1;
 504                                                break;
 505                                        }
 506                                }
 507                                core_share_status(i);
 508                                policies[i].enabled = 1;
 509                                break;
 510                        }
 511                }
 512        }
 513        return 0;
 514}
 515
 516static int
 517remove_policy(struct rte_power_channel_packet *pkt __rte_unused)
 518{
 519        unsigned int i;
 520
 521        /*
 522         * Disabling the policy is simply a case of setting
 523         * enabled to 0
 524         */
 525        for (i = 0; i < RTE_DIM(policies); i++) {
 526                if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
 527                        policies[i].enabled = 0;
 528                        return 0;
 529                }
 530        }
 531        return -1;
 532}
 533
 534static uint64_t
 535get_pkt_diff(struct policy *pol)
 536{
 537
 538        uint64_t vsi_pkt_count,
 539                vsi_pkt_total = 0,
 540                vsi_pkt_count_prev_total = 0;
 541        double rdtsc_curr, rdtsc_diff, diff;
 542        int x;
 543#ifdef RTE_NET_I40E
 544        struct rte_eth_stats vf_stats;
 545#endif
 546
 547        for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
 548
 549#ifdef RTE_NET_I40E
 550                /*Read vsi stats*/
 551                if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
 552                        vsi_pkt_count = vf_stats.ipackets;
 553                else
 554                        vsi_pkt_count = -1;
 555#else
 556                vsi_pkt_count = -1;
 557#endif
 558
 559                vsi_pkt_total += vsi_pkt_count;
 560
 561                vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
 562                vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
 563        }
 564
 565        rdtsc_curr = rte_rdtsc_precise();
 566        rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
 567        rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
 568
 569        diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
 570                        ((double)rte_get_tsc_hz() / rdtsc_diff);
 571
 572        return diff;
 573}
 574
 575static void
 576apply_traffic_profile(struct policy *pol)
 577{
 578
 579        int count;
 580        uint64_t diff = 0;
 581
 582        diff = get_pkt_diff(pol);
 583
 584        if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
 585                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 586                        if (pol->core_share[count].status != 1)
 587                                power_manager_scale_core_max(
 588                                                pol->core_share[count].pcpu);
 589                }
 590        } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
 591                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 592                        if (pol->core_share[count].status != 1)
 593                                power_manager_scale_core_med(
 594                                                pol->core_share[count].pcpu);
 595                }
 596        } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
 597                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 598                        if (pol->core_share[count].status != 1)
 599                                power_manager_scale_core_min(
 600                                                pol->core_share[count].pcpu);
 601                }
 602        }
 603}
 604
 605static void
 606apply_time_profile(struct policy *pol)
 607{
 608
 609        int count, x;
 610        struct timeval tv;
 611        struct tm *ptm;
 612        char time_string[40];
 613
 614        /* Obtain the time of day, and convert it to a tm struct. */
 615        gettimeofday(&tv, NULL);
 616        ptm = localtime(&tv.tv_sec);
 617        /* Format the date and time, down to a single second. */
 618        strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
 619
 620        for (x = 0; x < RTE_POWER_HOURS_PER_DAY; x++) {
 621
 622                if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
 623                        for (count = 0; count < pol->pkt.num_vcpu; count++) {
 624                                if (pol->core_share[count].status != 1) {
 625                                        power_manager_scale_core_max(
 626                                                pol->core_share[count].pcpu);
 627                                }
 628                        }
 629                        break;
 630                } else if (ptm->tm_hour ==
 631                                pol->pkt.timer_policy.quiet_hours[x]) {
 632                        for (count = 0; count < pol->pkt.num_vcpu; count++) {
 633                                if (pol->core_share[count].status != 1) {
 634                                        power_manager_scale_core_min(
 635                                                pol->core_share[count].pcpu);
 636                        }
 637                }
 638                        break;
 639                } else if (ptm->tm_hour ==
 640                        pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
 641                        apply_traffic_profile(pol);
 642                        break;
 643                }
 644        }
 645}
 646
 647static void
 648apply_workload_profile(struct policy *pol)
 649{
 650
 651        int count;
 652
 653        if (pol->pkt.workload == RTE_POWER_WL_HIGH) {
 654                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 655                        if (pol->core_share[count].status != 1)
 656                                power_manager_scale_core_max(
 657                                                pol->core_share[count].pcpu);
 658                }
 659        } else if (pol->pkt.workload == RTE_POWER_WL_MEDIUM) {
 660                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 661                        if (pol->core_share[count].status != 1)
 662                                power_manager_scale_core_med(
 663                                                pol->core_share[count].pcpu);
 664                }
 665        } else if (pol->pkt.workload == RTE_POWER_WL_LOW) {
 666                for (count = 0; count < pol->pkt.num_vcpu; count++) {
 667                        if (pol->core_share[count].status != 1)
 668                                power_manager_scale_core_min(
 669                                                pol->core_share[count].pcpu);
 670                }
 671        }
 672}
 673
 674static void
 675apply_policy(struct policy *pol)
 676{
 677
 678        struct rte_power_channel_packet *pkt = &pol->pkt;
 679
 680        /*Check policy to use*/
 681        if (pkt->policy_to_use == RTE_POWER_POLICY_TRAFFIC)
 682                apply_traffic_profile(pol);
 683        else if (pkt->policy_to_use == RTE_POWER_POLICY_TIME)
 684                apply_time_profile(pol);
 685        else if (pkt->policy_to_use == RTE_POWER_POLICY_WORKLOAD)
 686                apply_workload_profile(pol);
 687}
 688
 689static int
 690write_binary_packet(void *buffer,
 691                size_t buffer_len,
 692                struct channel_info *chan_info)
 693{
 694        int ret;
 695
 696        if (buffer_len == 0 || buffer == NULL)
 697                return -1;
 698
 699        if (chan_info->fd < 0) {
 700                RTE_LOG(ERR, CHANNEL_MONITOR, "Channel is not connected\n");
 701                return -1;
 702        }
 703
 704        while (buffer_len > 0) {
 705                ret = write(chan_info->fd, buffer, buffer_len);
 706                if (ret == -1) {
 707                        if (errno == EINTR)
 708                                continue;
 709                        RTE_LOG(ERR, CHANNEL_MONITOR, "Write function failed due to %s.\n",
 710                                        strerror(errno));
 711                        return -1;
 712                }
 713                buffer = (char *)buffer + ret;
 714                buffer_len -= ret;
 715        }
 716        return 0;
 717}
 718
 719static int
 720send_freq(struct rte_power_channel_packet *pkt,
 721                struct channel_info *chan_info,
 722                bool freq_list)
 723{
 724        unsigned int vcore_id = pkt->resource_id;
 725        struct rte_power_channel_packet_freq_list channel_pkt_freq_list;
 726        struct vm_info info;
 727
 728        if (get_info_vm(pkt->vm_name, &info) != 0)
 729                return -1;
 730
 731        if (!freq_list && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
 732                return -1;
 733
 734        if (!info.allow_query)
 735                return -1;
 736
 737        channel_pkt_freq_list.command = RTE_POWER_FREQ_LIST;
 738        channel_pkt_freq_list.num_vcpu = info.num_vcpus;
 739
 740        if (freq_list) {
 741                unsigned int i;
 742                for (i = 0; i < info.num_vcpus; i++)
 743                        channel_pkt_freq_list.freq_list[i] =
 744                          power_manager_get_current_frequency(info.pcpu_map[i]);
 745        } else {
 746                channel_pkt_freq_list.freq_list[vcore_id] =
 747                  power_manager_get_current_frequency(info.pcpu_map[vcore_id]);
 748        }
 749
 750        return write_binary_packet(&channel_pkt_freq_list,
 751                        sizeof(channel_pkt_freq_list),
 752                        chan_info);
 753}
 754
 755static int
 756send_capabilities(struct rte_power_channel_packet *pkt,
 757                struct channel_info *chan_info,
 758                bool list_requested)
 759{
 760        unsigned int vcore_id = pkt->resource_id;
 761        struct rte_power_channel_packet_caps_list channel_pkt_caps_list;
 762        struct vm_info info;
 763        struct rte_power_core_capabilities caps;
 764        int ret;
 765
 766        if (get_info_vm(pkt->vm_name, &info) != 0)
 767                return -1;
 768
 769        if (!list_requested && vcore_id >= RTE_POWER_MAX_VCPU_PER_VM)
 770                return -1;
 771
 772        if (!info.allow_query)
 773                return -1;
 774
 775        channel_pkt_caps_list.command = RTE_POWER_CAPS_LIST;
 776        channel_pkt_caps_list.num_vcpu = info.num_vcpus;
 777
 778        if (list_requested) {
 779                unsigned int i;
 780                for (i = 0; i < info.num_vcpus; i++) {
 781                        ret = rte_power_get_capabilities(info.pcpu_map[i],
 782                                        &caps);
 783                        if (ret == 0) {
 784                                channel_pkt_caps_list.turbo[i] =
 785                                                caps.turbo;
 786                                channel_pkt_caps_list.priority[i] =
 787                                                caps.priority;
 788                        } else
 789                                return -1;
 790
 791                }
 792        } else {
 793                ret = rte_power_get_capabilities(info.pcpu_map[vcore_id],
 794                                &caps);
 795                if (ret == 0) {
 796                        channel_pkt_caps_list.turbo[vcore_id] =
 797                                        caps.turbo;
 798                        channel_pkt_caps_list.priority[vcore_id] =
 799                                        caps.priority;
 800                } else
 801                        return -1;
 802        }
 803
 804        return write_binary_packet(&channel_pkt_caps_list,
 805                        sizeof(channel_pkt_caps_list),
 806                        chan_info);
 807}
 808
 809static int
 810send_ack_for_received_cmd(struct rte_power_channel_packet *pkt,
 811                struct channel_info *chan_info,
 812                uint32_t command)
 813{
 814        pkt->command = command;
 815        return write_binary_packet(pkt,
 816                        sizeof(*pkt),
 817                        chan_info);
 818}
 819
 820static int
 821process_request(struct rte_power_channel_packet *pkt,
 822                struct channel_info *chan_info)
 823{
 824        int ret;
 825
 826        if (chan_info == NULL)
 827                return -1;
 828
 829        uint32_t channel_connected = CHANNEL_MGR_CHANNEL_CONNECTED;
 830        if (__atomic_compare_exchange_n(&(chan_info->status), &channel_connected,
 831                CHANNEL_MGR_CHANNEL_PROCESSING, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED) == 0)
 832                return -1;
 833
 834        if (pkt->command == RTE_POWER_CPU_POWER) {
 835                unsigned int core_num;
 836
 837                if (pkt->core_type == RTE_POWER_CORE_TYPE_VIRTUAL)
 838                        core_num = get_pcpu(chan_info, pkt->resource_id);
 839                else
 840                        core_num = pkt->resource_id;
 841
 842                RTE_LOG(DEBUG, CHANNEL_MONITOR, "Processing requested cmd for cpu:%d\n",
 843                        core_num);
 844
 845                int scale_res;
 846                bool valid_unit = true;
 847
 848                switch (pkt->unit) {
 849                case(RTE_POWER_SCALE_MIN):
 850                        scale_res = power_manager_scale_core_min(core_num);
 851                        break;
 852                case(RTE_POWER_SCALE_MAX):
 853                        scale_res = power_manager_scale_core_max(core_num);
 854                        break;
 855                case(RTE_POWER_SCALE_DOWN):
 856                        scale_res = power_manager_scale_core_down(core_num);
 857                        break;
 858                case(RTE_POWER_SCALE_UP):
 859                        scale_res = power_manager_scale_core_up(core_num);
 860                        break;
 861                case(RTE_POWER_ENABLE_TURBO):
 862                        scale_res = power_manager_enable_turbo_core(core_num);
 863                        break;
 864                case(RTE_POWER_DISABLE_TURBO):
 865                        scale_res = power_manager_disable_turbo_core(core_num);
 866                        break;
 867                default:
 868                        valid_unit = false;
 869                        break;
 870                }
 871
 872                if (valid_unit) {
 873                        ret = send_ack_for_received_cmd(pkt,
 874                                        chan_info,
 875                                        scale_res >= 0 ?
 876                                                RTE_POWER_CMD_ACK :
 877                                                RTE_POWER_CMD_NACK);
 878                        if (ret < 0)
 879                                RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
 880                } else
 881                        RTE_LOG(ERR, CHANNEL_MONITOR, "Unexpected unit type.\n");
 882
 883        }
 884
 885        if (pkt->command == RTE_POWER_PKT_POLICY) {
 886                RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
 887                                pkt->vm_name);
 888                int ret = send_ack_for_received_cmd(pkt,
 889                                chan_info,
 890                                RTE_POWER_CMD_ACK);
 891                if (ret < 0)
 892                        RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending ack command.\n");
 893                update_policy(pkt);
 894                policy_is_set = 1;
 895        }
 896
 897        if (pkt->command == RTE_POWER_PKT_POLICY_REMOVE) {
 898                ret = remove_policy(pkt);
 899                if (ret == 0)
 900                        RTE_LOG(INFO, CHANNEL_MONITOR,
 901                                 "Removed policy %s\n", pkt->vm_name);
 902                else
 903                        RTE_LOG(INFO, CHANNEL_MONITOR,
 904                                 "Policy %s does not exist\n", pkt->vm_name);
 905        }
 906
 907        if (pkt->command == RTE_POWER_QUERY_FREQ_LIST ||
 908                pkt->command == RTE_POWER_QUERY_FREQ) {
 909
 910                RTE_LOG(INFO, CHANNEL_MONITOR,
 911                        "Frequency for %s requested.\n", pkt->vm_name);
 912                int ret = send_freq(pkt,
 913                                chan_info,
 914                                pkt->command == RTE_POWER_QUERY_FREQ_LIST);
 915                if (ret < 0)
 916                        RTE_LOG(ERR, CHANNEL_MONITOR, "Error during frequency sending.\n");
 917        }
 918
 919        if (pkt->command == RTE_POWER_QUERY_CAPS_LIST ||
 920                pkt->command == RTE_POWER_QUERY_CAPS) {
 921
 922                RTE_LOG(INFO, CHANNEL_MONITOR,
 923                        "Capabilities for %s requested.\n", pkt->vm_name);
 924                int ret = send_capabilities(pkt,
 925                                chan_info,
 926                                pkt->command == RTE_POWER_QUERY_CAPS_LIST);
 927                if (ret < 0)
 928                        RTE_LOG(ERR, CHANNEL_MONITOR, "Error during sending capabilities.\n");
 929        }
 930
 931        /*
 932         * Return is not checked as channel status may have been set to DISABLED
 933         * from management thread
 934         */
 935        uint32_t channel_processing = CHANNEL_MGR_CHANNEL_PROCESSING;
 936        __atomic_compare_exchange_n(&(chan_info->status), &channel_processing,
 937                CHANNEL_MGR_CHANNEL_CONNECTED, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
 938        return 0;
 939
 940}
 941
 942int
 943add_channel_to_monitor(struct channel_info **chan_info)
 944{
 945        struct channel_info *info = *chan_info;
 946        struct epoll_event event;
 947
 948        event.events = EPOLLIN;
 949        event.data.ptr = info;
 950        if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
 951                RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
 952                                "to epoll\n", info->channel_path);
 953                return -1;
 954        }
 955        RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
 956                        "to monitor\n", info->channel_path);
 957        return 0;
 958}
 959
 960int
 961remove_channel_from_monitor(struct channel_info *chan_info)
 962{
 963        if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
 964                        chan_info->fd, NULL) < 0) {
 965                RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
 966                                "from epoll\n", chan_info->channel_path);
 967                return -1;
 968        }
 969        return 0;
 970}
 971
 972int
 973channel_monitor_init(void)
 974{
 975        global_event_fd = epoll_create1(0);
 976        if (global_event_fd == 0) {
 977                RTE_LOG(ERR, CHANNEL_MONITOR,
 978                                "Error creating epoll context with error %s\n",
 979                                strerror(errno));
 980                return -1;
 981        }
 982        global_events_list = rte_malloc("epoll_events",
 983                        sizeof(*global_events_list)
 984                        * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
 985        if (global_events_list == NULL) {
 986                RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
 987                                "epoll events\n");
 988                return -1;
 989        }
 990        return 0;
 991}
 992
 993static void
 994read_binary_packet(struct channel_info *chan_info)
 995{
 996        struct rte_power_channel_packet pkt;
 997        void *buffer = &pkt;
 998        int buffer_len = sizeof(pkt);
 999        int n_bytes, err = 0;
1000
1001        while (buffer_len > 0) {
1002                n_bytes = read(chan_info->fd,
1003                                buffer, buffer_len);
1004                if (n_bytes == buffer_len)
1005                        break;
1006                if (n_bytes < 0) {
1007                        err = errno;
1008                        RTE_LOG(DEBUG, CHANNEL_MONITOR,
1009                                "Received error on "
1010                                "channel '%s' read: %s\n",
1011                                chan_info->channel_path,
1012                                strerror(err));
1013                        remove_channel(&chan_info);
1014                        break;
1015                }
1016                buffer = (char *)buffer + n_bytes;
1017                buffer_len -= n_bytes;
1018        }
1019        if (!err)
1020                process_request(&pkt, chan_info);
1021}
1022
1023#ifdef USE_JANSSON
1024static void
1025read_json_packet(struct channel_info *chan_info)
1026{
1027        struct rte_power_channel_packet pkt;
1028        int n_bytes, ret;
1029        json_t *root;
1030        json_error_t error;
1031        const char *resource_name;
1032        char *start, *end;
1033        uint32_t n;
1034
1035
1036        /* read opening brace to closing brace */
1037        do {
1038                int idx = 0;
1039                int indent = 0;
1040                do {
1041                        n_bytes = read(chan_info->fd, &json_data[idx], 1);
1042                        if (n_bytes == 0)
1043                                break;
1044                        if (json_data[idx] == '{')
1045                                indent++;
1046                        if (json_data[idx] == '}')
1047                                indent--;
1048                        if ((indent > 0) || (idx > 0))
1049                                idx++;
1050                        if (indent <= 0)
1051                                json_data[idx] = 0;
1052                        if (idx >= MAX_JSON_STRING_LEN-1)
1053                                break;
1054                } while (indent > 0);
1055
1056                json_data[idx] = '\0';
1057
1058                if (strlen(json_data) == 0)
1059                        continue;
1060
1061                printf("got [%s]\n", json_data);
1062
1063                root = json_loads(json_data, 0, &error);
1064
1065                if (root) {
1066                        resource_name = get_resource_name_from_chn_path(
1067                                chan_info->channel_path);
1068                        /*
1069                         * Because our data is now in the json
1070                         * object, we can overwrite the pkt
1071                         * with a rte_power_channel_packet struct, using
1072                         * parse_json_to_pkt()
1073                         */
1074                        ret = parse_json_to_pkt(root, &pkt, resource_name);
1075                        json_decref(root);
1076                        if (ret) {
1077                                RTE_LOG(ERR, CHANNEL_MONITOR,
1078                                        "Error validating JSON profile data\n");
1079                                break;
1080                        }
1081                        start = strstr(pkt.vm_name,
1082                                        CHANNEL_MGR_FIFO_PATTERN_NAME);
1083                        if (start != NULL) {
1084                                /* move past pattern to start of fifo id */
1085                                start += strlen(CHANNEL_MGR_FIFO_PATTERN_NAME);
1086
1087                                end = start;
1088                                n = (uint32_t)strtoul(start, &end, 10);
1089
1090                                if (end[0] == '\0') {
1091                                        /* Add core id to core list */
1092                                        pkt.num_vcpu = 1;
1093                                        pkt.vcpu_to_control[0] = n;
1094                                        process_request(&pkt, chan_info);
1095                                } else {
1096                                        RTE_LOG(ERR, CHANNEL_MONITOR,
1097                                                "Cannot extract core id from fifo name\n");
1098                                }
1099                        } else {
1100                                process_request(&pkt, chan_info);
1101                        }
1102                } else {
1103                        RTE_LOG(ERR, CHANNEL_MONITOR,
1104                                        "JSON error on line %d: %s\n",
1105                                        error.line, error.text);
1106                }
1107        } while (n_bytes > 0);
1108}
1109#endif
1110
1111void
1112run_channel_monitor(void)
1113{
1114        while (run_loop) {
1115                int n_events, i;
1116
1117                n_events = epoll_wait(global_event_fd, global_events_list,
1118                                MAX_EVENTS, 1);
1119                if (!run_loop)
1120                        break;
1121                for (i = 0; i < n_events; i++) {
1122                        struct channel_info *chan_info = (struct channel_info *)
1123                                        global_events_list[i].data.ptr;
1124                        if ((global_events_list[i].events & EPOLLERR) ||
1125                                (global_events_list[i].events & EPOLLHUP)) {
1126                                RTE_LOG(INFO, CHANNEL_MONITOR,
1127                                                "Remote closed connection for "
1128                                                "channel '%s'\n",
1129                                                chan_info->channel_path);
1130                                remove_channel(&chan_info);
1131                                continue;
1132                        }
1133                        if (global_events_list[i].events & EPOLLIN) {
1134
1135                                switch (chan_info->type) {
1136                                case CHANNEL_TYPE_BINARY:
1137                                        read_binary_packet(chan_info);
1138                                        break;
1139#ifdef USE_JANSSON
1140                                case CHANNEL_TYPE_JSON:
1141                                        read_json_packet(chan_info);
1142                                        break;
1143#endif
1144                                default:
1145                                        break;
1146                                }
1147                        }
1148                }
1149                rte_delay_us(time_period_ms*1000);
1150                if (policy_is_set) {
1151                        unsigned int j;
1152
1153                        for (j = 0; j < RTE_DIM(policies); j++) {
1154                                if (policies[j].enabled == 1)
1155                                        apply_policy(&policies[j]);
1156                        }
1157                }
1158        }
1159}
1160