qemu/net/colo-compare.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or
  12 * later.  See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu/error-report.h"
  17#include "trace.h"
  18#include "qemu-common.h"
  19#include "qapi/error.h"
  20#include "net/net.h"
  21#include "net/eth.h"
  22#include "qom/object_interfaces.h"
  23#include "qemu/iov.h"
  24#include "qom/object.h"
  25#include "net/queue.h"
  26#include "chardev/char-fe.h"
  27#include "qemu/sockets.h"
  28#include "colo.h"
  29#include "sysemu/iothread.h"
  30#include "net/colo-compare.h"
  31#include "migration/colo.h"
  32#include "migration/migration.h"
  33
  34#define TYPE_COLO_COMPARE "colo-compare"
  35#define COLO_COMPARE(obj) \
  36    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
  37
  38static QTAILQ_HEAD(, CompareState) net_compares =
  39       QTAILQ_HEAD_INITIALIZER(net_compares);
  40
  41static NotifierList colo_compare_notifiers =
  42    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
  43
  44#define COMPARE_READ_LEN_MAX NET_BUFSIZE
  45#define MAX_QUEUE_SIZE 1024
  46
  47#define COLO_COMPARE_FREE_PRIMARY     0x01
  48#define COLO_COMPARE_FREE_SECONDARY   0x02
  49
  50/* TODO: Should be configurable */
  51#define REGULAR_PACKET_CHECK_MS 3000
  52
  53static QemuMutex event_mtx;
  54static QemuCond event_complete_cond;
  55static int event_unhandled_count;
  56
  57/*
  58 *  + CompareState ++
  59 *  |               |
  60 *  +---------------+   +---------------+         +---------------+
  61 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
  62 *  +---------------+   +---------------+         +---------------+
  63 *  |               |     |           |             |          |
  64 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
  65 *                    |primary |  |secondary    |primary | |secondary
  66 *                    |packet  |  |packet  +    |packet  | |packet  +
  67 *                    +--------+  +--------+    +--------+ +--------+
  68 *                        |           |             |          |
  69 *                    +---v----+  +---v----+    +---v----+ +---v----+
  70 *                    |primary |  |secondary    |primary | |secondary
  71 *                    |packet  |  |packet  +    |packet  | |packet  +
  72 *                    +--------+  +--------+    +--------+ +--------+
  73 *                        |           |             |          |
  74 *                    +---v----+  +---v----+    +---v----+ +---v----+
  75 *                    |primary |  |secondary    |primary | |secondary
  76 *                    |packet  |  |packet  +    |packet  | |packet  +
  77 *                    +--------+  +--------+    +--------+ +--------+
  78 */
  79typedef struct CompareState {
  80    Object parent;
  81
  82    char *pri_indev;
  83    char *sec_indev;
  84    char *outdev;
  85    CharBackend chr_pri_in;
  86    CharBackend chr_sec_in;
  87    CharBackend chr_out;
  88    SocketReadState pri_rs;
  89    SocketReadState sec_rs;
  90    bool vnet_hdr;
  91
  92    /*
  93     * Record the connection that through the NIC
  94     * Element type: Connection
  95     */
  96    GQueue conn_list;
  97    /* Record the connection without repetition */
  98    GHashTable *connection_track_table;
  99
 100    IOThread *iothread;
 101    GMainContext *worker_context;
 102    QEMUTimer *packet_check_timer;
 103
 104    QEMUBH *event_bh;
 105    enum colo_event event;
 106
 107    QTAILQ_ENTRY(CompareState) next;
 108} CompareState;
 109
 110typedef struct CompareClass {
 111    ObjectClass parent_class;
 112} CompareClass;
 113
 114enum {
 115    PRIMARY_IN = 0,
 116    SECONDARY_IN,
 117};
 118
 119static void colo_compare_inconsistency_notify(void)
 120{
 121    notifier_list_notify(&colo_compare_notifiers,
 122                migrate_get_current());
 123}
 124
 125static int compare_chr_send(CompareState *s,
 126                            const uint8_t *buf,
 127                            uint32_t size,
 128                            uint32_t vnet_hdr_len);
 129
 130static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 131{
 132    struct tcphdr *atcp, *btcp;
 133
 134    atcp = (struct tcphdr *)(a->transport_header);
 135    btcp = (struct tcphdr *)(b->transport_header);
 136    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 137}
 138
 139static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 140{
 141    Packet *pkt = data;
 142    struct tcphdr *tcphd;
 143
 144    tcphd = (struct tcphdr *)pkt->transport_header;
 145
 146    pkt->tcp_seq = ntohl(tcphd->th_seq);
 147    pkt->tcp_ack = ntohl(tcphd->th_ack);
 148    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
 149    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
 150                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
 151    pkt->payload_size = pkt->size - pkt->header_size;
 152    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
 153    pkt->flags = tcphd->th_flags;
 154}
 155
 156/*
 157 * Return 1 on success, if return 0 means the
 158 * packet will be dropped
 159 */
 160static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 161{
 162    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
 163        if (pkt->ip->ip_p == IPPROTO_TCP) {
 164            fill_pkt_tcp_info(pkt, max_ack);
 165            g_queue_insert_sorted(queue,
 166                                  pkt,
 167                                  (GCompareDataFunc)seq_sorter,
 168                                  NULL);
 169        } else {
 170            g_queue_push_tail(queue, pkt);
 171        }
 172        return 1;
 173    }
 174    return 0;
 175}
 176
 177/*
 178 * Return 0 on success, if return -1 means the pkt
 179 * is unsupported(arp and ipv6) and will be sent later
 180 */
 181static int packet_enqueue(CompareState *s, int mode, Connection **con)
 182{
 183    ConnectionKey key;
 184    Packet *pkt = NULL;
 185    Connection *conn;
 186
 187    if (mode == PRIMARY_IN) {
 188        pkt = packet_new(s->pri_rs.buf,
 189                         s->pri_rs.packet_len,
 190                         s->pri_rs.vnet_hdr_len);
 191    } else {
 192        pkt = packet_new(s->sec_rs.buf,
 193                         s->sec_rs.packet_len,
 194                         s->sec_rs.vnet_hdr_len);
 195    }
 196
 197    if (parse_packet_early(pkt)) {
 198        packet_destroy(pkt, NULL);
 199        pkt = NULL;
 200        return -1;
 201    }
 202    fill_connection_key(pkt, &key);
 203
 204    conn = connection_get(s->connection_track_table,
 205                          &key,
 206                          &s->conn_list);
 207
 208    if (!conn->processing) {
 209        g_queue_push_tail(&s->conn_list, conn);
 210        conn->processing = true;
 211    }
 212
 213    if (mode == PRIMARY_IN) {
 214        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
 215            error_report("colo compare primary queue size too big,"
 216                         "drop packet");
 217        }
 218    } else {
 219        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
 220            error_report("colo compare secondary queue size too big,"
 221                         "drop packet");
 222        }
 223    }
 224    *con = conn;
 225
 226    return 0;
 227}
 228
 229static inline bool after(uint32_t seq1, uint32_t seq2)
 230{
 231        return (int32_t)(seq1 - seq2) > 0;
 232}
 233
 234static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
 235{
 236    int ret;
 237    ret = compare_chr_send(s,
 238                           pkt->data,
 239                           pkt->size,
 240                           pkt->vnet_hdr_len);
 241    if (ret < 0) {
 242        error_report("colo send primary packet failed");
 243    }
 244    trace_colo_compare_main("packet same and release packet");
 245    packet_destroy(pkt, NULL);
 246}
 247
 248/*
 249 * The IP packets sent by primary and secondary
 250 * will be compared in here
 251 * TODO support ip fragment, Out-Of-Order
 252 * return:    0  means packet same
 253 *            > 0 || < 0 means packet different
 254 */
 255static int colo_compare_packet_payload(Packet *ppkt,
 256                                       Packet *spkt,
 257                                       uint16_t poffset,
 258                                       uint16_t soffset,
 259                                       uint16_t len)
 260
 261{
 262    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 263        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 264
 265        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 266        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 267        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 268        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 269
 270        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 271                                   pri_ip_dst, spkt->size,
 272                                   sec_ip_src, sec_ip_dst);
 273    }
 274
 275    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
 276}
 277
 278/*
 279 * return true means that the payload is consist and
 280 * need to make the next comparison, false means do
 281 * the checkpoint
 282*/
 283static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 284                              int8_t *mark, uint32_t max_ack)
 285{
 286    *mark = 0;
 287
 288    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
 289        if (colo_compare_packet_payload(ppkt, spkt,
 290                                        ppkt->header_size, spkt->header_size,
 291                                        ppkt->payload_size)) {
 292            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
 293            return true;
 294        }
 295    }
 296    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
 297        if (colo_compare_packet_payload(ppkt, spkt,
 298                                        ppkt->header_size, spkt->header_size,
 299                                        ppkt->payload_size)) {
 300            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
 301            return true;
 302        }
 303    }
 304
 305    /* one part of secondary packet payload still need to be compared */
 306    if (!after(ppkt->seq_end, spkt->seq_end)) {
 307        if (colo_compare_packet_payload(ppkt, spkt,
 308                                        ppkt->header_size + ppkt->offset,
 309                                        spkt->header_size + spkt->offset,
 310                                        ppkt->payload_size - ppkt->offset)) {
 311            if (!after(ppkt->tcp_ack, max_ack)) {
 312                *mark = COLO_COMPARE_FREE_PRIMARY;
 313                spkt->offset += ppkt->payload_size - ppkt->offset;
 314                return true;
 315            } else {
 316                /* secondary guest hasn't ack the data, don't send
 317                 * out this packet
 318                 */
 319                return false;
 320            }
 321        }
 322    } else {
 323        /* primary packet is longer than secondary packet, compare
 324         * the same part and mark the primary packet offset
 325         */
 326        if (colo_compare_packet_payload(ppkt, spkt,
 327                                        ppkt->header_size + ppkt->offset,
 328                                        spkt->header_size + spkt->offset,
 329                                        spkt->payload_size - spkt->offset)) {
 330            *mark = COLO_COMPARE_FREE_SECONDARY;
 331            ppkt->offset += spkt->payload_size - spkt->offset;
 332            return true;
 333        }
 334    }
 335
 336    return false;
 337}
 338
 339static void colo_compare_tcp(CompareState *s, Connection *conn)
 340{
 341    Packet *ppkt = NULL, *spkt = NULL;
 342    int8_t mark;
 343
 344    /*
 345     * If ppkt and spkt have the same payload, but ppkt's ACK
 346     * is greater than spkt's ACK, in this case we can not
 347     * send the ppkt because it will cause the secondary guest
 348     * to miss sending some data in the next. Therefore, we
 349     * record the maximum ACK in the current queue at both
 350     * primary side and secondary side. Only when the ack is
 351     * less than the smaller of the two maximum ack, then we
 352     * can ensure that the packet's payload is acknowledged by
 353     * primary and secondary.
 354    */
 355    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
 356
 357pri:
 358    if (g_queue_is_empty(&conn->primary_list)) {
 359        return;
 360    }
 361    ppkt = g_queue_pop_head(&conn->primary_list);
 362sec:
 363    if (g_queue_is_empty(&conn->secondary_list)) {
 364        g_queue_push_head(&conn->primary_list, ppkt);
 365        return;
 366    }
 367    spkt = g_queue_pop_head(&conn->secondary_list);
 368
 369    if (ppkt->tcp_seq == ppkt->seq_end) {
 370        colo_release_primary_pkt(s, ppkt);
 371        ppkt = NULL;
 372    }
 373
 374    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
 375        trace_colo_compare_main("pri: this packet has compared");
 376        colo_release_primary_pkt(s, ppkt);
 377        ppkt = NULL;
 378    }
 379
 380    if (spkt->tcp_seq == spkt->seq_end) {
 381        packet_destroy(spkt, NULL);
 382        if (!ppkt) {
 383            goto pri;
 384        } else {
 385            goto sec;
 386        }
 387    } else {
 388        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
 389            trace_colo_compare_main("sec: this packet has compared");
 390            packet_destroy(spkt, NULL);
 391            if (!ppkt) {
 392                goto pri;
 393            } else {
 394                goto sec;
 395            }
 396        }
 397        if (!ppkt) {
 398            g_queue_push_head(&conn->secondary_list, spkt);
 399            goto pri;
 400        }
 401    }
 402
 403    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
 404        trace_colo_compare_tcp_info("pri",
 405                                    ppkt->tcp_seq, ppkt->tcp_ack,
 406                                    ppkt->header_size, ppkt->payload_size,
 407                                    ppkt->offset, ppkt->flags);
 408
 409        trace_colo_compare_tcp_info("sec",
 410                                    spkt->tcp_seq, spkt->tcp_ack,
 411                                    spkt->header_size, spkt->payload_size,
 412                                    spkt->offset, spkt->flags);
 413
 414        if (mark == COLO_COMPARE_FREE_PRIMARY) {
 415            conn->compare_seq = ppkt->seq_end;
 416            colo_release_primary_pkt(s, ppkt);
 417            g_queue_push_head(&conn->secondary_list, spkt);
 418            goto pri;
 419        }
 420        if (mark == COLO_COMPARE_FREE_SECONDARY) {
 421            conn->compare_seq = spkt->seq_end;
 422            packet_destroy(spkt, NULL);
 423            goto sec;
 424        }
 425        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
 426            conn->compare_seq = ppkt->seq_end;
 427            colo_release_primary_pkt(s, ppkt);
 428            packet_destroy(spkt, NULL);
 429            goto pri;
 430        }
 431    } else {
 432        g_queue_push_head(&conn->primary_list, ppkt);
 433        g_queue_push_head(&conn->secondary_list, spkt);
 434
 435        qemu_hexdump((char *)ppkt->data, stderr,
 436                     "colo-compare ppkt", ppkt->size);
 437        qemu_hexdump((char *)spkt->data, stderr,
 438                     "colo-compare spkt", spkt->size);
 439
 440        colo_compare_inconsistency_notify();
 441    }
 442}
 443
 444
 445/*
 446 * Called from the compare thread on the primary
 447 * for compare udp packet
 448 */
 449static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 450{
 451    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 452    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 453
 454    trace_colo_compare_main("compare udp");
 455
 456    /*
 457     * Because of ppkt and spkt are both in the same connection,
 458     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 459     * same with spkt. In addition, IP header's Identification is a random
 460     * field, we can handle it in IP fragmentation function later.
 461     * COLO just concern the response net packet payload from primary guest
 462     * and secondary guest are same or not, So we ignored all IP header include
 463     * other field like TOS,TTL,IP Checksum. we only need to compare
 464     * the ip payload here.
 465     */
 466    if (ppkt->size != spkt->size) {
 467        trace_colo_compare_main("UDP: payload size of packets are different");
 468        return -1;
 469    }
 470    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 471                                    ppkt->size - offset)) {
 472        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
 473        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
 474        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 475            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 476                         ppkt->size);
 477            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 478                         spkt->size);
 479        }
 480        return -1;
 481    } else {
 482        return 0;
 483    }
 484}
 485
 486/*
 487 * Called from the compare thread on the primary
 488 * for compare icmp packet
 489 */
 490static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 491{
 492    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 493    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 494
 495    trace_colo_compare_main("compare icmp");
 496
 497    /*
 498     * Because of ppkt and spkt are both in the same connection,
 499     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 500     * same with spkt. In addition, IP header's Identification is a random
 501     * field, we can handle it in IP fragmentation function later.
 502     * COLO just concern the response net packet payload from primary guest
 503     * and secondary guest are same or not, So we ignored all IP header include
 504     * other field like TOS,TTL,IP Checksum. we only need to compare
 505     * the ip payload here.
 506     */
 507    if (ppkt->size != spkt->size) {
 508        trace_colo_compare_main("ICMP: payload size of packets are different");
 509        return -1;
 510    }
 511    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 512                                    ppkt->size - offset)) {
 513        trace_colo_compare_icmp_miscompare("primary pkt size",
 514                                           ppkt->size);
 515        trace_colo_compare_icmp_miscompare("Secondary pkt size",
 516                                           spkt->size);
 517        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 518            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 519                         ppkt->size);
 520            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 521                         spkt->size);
 522        }
 523        return -1;
 524    } else {
 525        return 0;
 526    }
 527}
 528
 529/*
 530 * Called from the compare thread on the primary
 531 * for compare other packet
 532 */
 533static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 534{
 535    uint16_t offset = ppkt->vnet_hdr_len;
 536
 537    trace_colo_compare_main("compare other");
 538    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 539        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 540
 541        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 542        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 543        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 544        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 545
 546        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 547                                   pri_ip_dst, spkt->size,
 548                                   sec_ip_src, sec_ip_dst);
 549    }
 550
 551    if (ppkt->size != spkt->size) {
 552        trace_colo_compare_main("Other: payload size of packets are different");
 553        return -1;
 554    }
 555    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
 556                                       ppkt->size - offset);
 557}
 558
 559static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
 560{
 561    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 562
 563    if ((now - pkt->creation_ms) > (*check_time)) {
 564        trace_colo_old_packet_check_found(pkt->creation_ms);
 565        return 0;
 566    } else {
 567        return 1;
 568    }
 569}
 570
 571void colo_compare_register_notifier(Notifier *notify)
 572{
 573    notifier_list_add(&colo_compare_notifiers, notify);
 574}
 575
 576void colo_compare_unregister_notifier(Notifier *notify)
 577{
 578    notifier_remove(notify);
 579}
 580
 581static int colo_old_packet_check_one_conn(Connection *conn,
 582                                           void *user_data)
 583{
 584    GList *result = NULL;
 585    int64_t check_time = REGULAR_PACKET_CHECK_MS;
 586
 587    result = g_queue_find_custom(&conn->primary_list,
 588                                 &check_time,
 589                                 (GCompareFunc)colo_old_packet_check_one);
 590
 591    if (result) {
 592        /* Do checkpoint will flush old packet */
 593        colo_compare_inconsistency_notify();
 594        return 0;
 595    }
 596
 597    return 1;
 598}
 599
 600/*
 601 * Look for old packets that the secondary hasn't matched,
 602 * if we have some then we have to checkpoint to wake
 603 * the secondary up.
 604 */
 605static void colo_old_packet_check(void *opaque)
 606{
 607    CompareState *s = opaque;
 608
 609    /*
 610     * If we find one old packet, stop finding job and notify
 611     * COLO frame do checkpoint.
 612     */
 613    g_queue_find_custom(&s->conn_list, NULL,
 614                        (GCompareFunc)colo_old_packet_check_one_conn);
 615}
 616
 617static void colo_compare_packet(CompareState *s, Connection *conn,
 618                                int (*HandlePacket)(Packet *spkt,
 619                                Packet *ppkt))
 620{
 621    Packet *pkt = NULL;
 622    GList *result = NULL;
 623
 624    while (!g_queue_is_empty(&conn->primary_list) &&
 625           !g_queue_is_empty(&conn->secondary_list)) {
 626        pkt = g_queue_pop_head(&conn->primary_list);
 627        result = g_queue_find_custom(&conn->secondary_list,
 628                 pkt, (GCompareFunc)HandlePacket);
 629
 630        if (result) {
 631            colo_release_primary_pkt(s, pkt);
 632            g_queue_remove(&conn->secondary_list, result->data);
 633        } else {
 634            /*
 635             * If one packet arrive late, the secondary_list or
 636             * primary_list will be empty, so we can't compare it
 637             * until next comparison. If the packets in the list are
 638             * timeout, it will trigger a checkpoint request.
 639             */
 640            trace_colo_compare_main("packet different");
 641            g_queue_push_head(&conn->primary_list, pkt);
 642            colo_compare_inconsistency_notify();
 643            break;
 644        }
 645    }
 646}
 647
 648/*
 649 * Called from the compare thread on the primary
 650 * for compare packet with secondary list of the
 651 * specified connection when a new packet was
 652 * queued to it.
 653 */
 654static void colo_compare_connection(void *opaque, void *user_data)
 655{
 656    CompareState *s = user_data;
 657    Connection *conn = opaque;
 658
 659    switch (conn->ip_proto) {
 660    case IPPROTO_TCP:
 661        colo_compare_tcp(s, conn);
 662        break;
 663    case IPPROTO_UDP:
 664        colo_compare_packet(s, conn, colo_packet_compare_udp);
 665        break;
 666    case IPPROTO_ICMP:
 667        colo_compare_packet(s, conn, colo_packet_compare_icmp);
 668        break;
 669    default:
 670        colo_compare_packet(s, conn, colo_packet_compare_other);
 671        break;
 672    }
 673}
 674
 675static int compare_chr_send(CompareState *s,
 676                            const uint8_t *buf,
 677                            uint32_t size,
 678                            uint32_t vnet_hdr_len)
 679{
 680    int ret = 0;
 681    uint32_t len = htonl(size);
 682
 683    if (!size) {
 684        return 0;
 685    }
 686
 687    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 688    if (ret != sizeof(len)) {
 689        goto err;
 690    }
 691
 692    if (s->vnet_hdr) {
 693        /*
 694         * We send vnet header len make other module(like filter-redirector)
 695         * know how to parse net packet correctly.
 696         */
 697        len = htonl(vnet_hdr_len);
 698        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 699        if (ret != sizeof(len)) {
 700            goto err;
 701        }
 702    }
 703
 704    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
 705    if (ret != size) {
 706        goto err;
 707    }
 708
 709    return 0;
 710
 711err:
 712    return ret < 0 ? ret : -EIO;
 713}
 714
 715static int compare_chr_can_read(void *opaque)
 716{
 717    return COMPARE_READ_LEN_MAX;
 718}
 719
 720/*
 721 * Called from the main thread on the primary for packets
 722 * arriving over the socket from the primary.
 723 */
 724static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 725{
 726    CompareState *s = COLO_COMPARE(opaque);
 727    int ret;
 728
 729    ret = net_fill_rstate(&s->pri_rs, buf, size);
 730    if (ret == -1) {
 731        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
 732                                 NULL, NULL, true);
 733        error_report("colo-compare primary_in error");
 734    }
 735}
 736
 737/*
 738 * Called from the main thread on the primary for packets
 739 * arriving over the socket from the secondary.
 740 */
 741static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 742{
 743    CompareState *s = COLO_COMPARE(opaque);
 744    int ret;
 745
 746    ret = net_fill_rstate(&s->sec_rs, buf, size);
 747    if (ret == -1) {
 748        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
 749                                 NULL, NULL, true);
 750        error_report("colo-compare secondary_in error");
 751    }
 752}
 753
 754/*
 755 * Check old packet regularly so it can watch for any packets
 756 * that the secondary hasn't produced equivalents of.
 757 */
 758static void check_old_packet_regular(void *opaque)
 759{
 760    CompareState *s = opaque;
 761
 762    /* if have old packet we will notify checkpoint */
 763    colo_old_packet_check(s);
 764    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 765                REGULAR_PACKET_CHECK_MS);
 766}
 767
 768/* Public API, Used for COLO frame to notify compare event */
 769void colo_notify_compares_event(void *opaque, int event, Error **errp)
 770{
 771    CompareState *s;
 772
 773    qemu_mutex_lock(&event_mtx);
 774    QTAILQ_FOREACH(s, &net_compares, next) {
 775        s->event = event;
 776        qemu_bh_schedule(s->event_bh);
 777        event_unhandled_count++;
 778    }
 779    /* Wait all compare threads to finish handling this event */
 780    while (event_unhandled_count > 0) {
 781        qemu_cond_wait(&event_complete_cond, &event_mtx);
 782    }
 783
 784    qemu_mutex_unlock(&event_mtx);
 785}
 786
 787static void colo_compare_timer_init(CompareState *s)
 788{
 789    AioContext *ctx = iothread_get_aio_context(s->iothread);
 790
 791    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
 792                                SCALE_MS, check_old_packet_regular,
 793                                s);
 794    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 795                    REGULAR_PACKET_CHECK_MS);
 796}
 797
 798static void colo_compare_timer_del(CompareState *s)
 799{
 800    if (s->packet_check_timer) {
 801        timer_del(s->packet_check_timer);
 802        timer_free(s->packet_check_timer);
 803        s->packet_check_timer = NULL;
 804    }
 805 }
 806
 807static void colo_flush_packets(void *opaque, void *user_data);
 808
 809static void colo_compare_handle_event(void *opaque)
 810{
 811    CompareState *s = opaque;
 812
 813    switch (s->event) {
 814    case COLO_EVENT_CHECKPOINT:
 815        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 816        break;
 817    case COLO_EVENT_FAILOVER:
 818        break;
 819    default:
 820        break;
 821    }
 822
 823    assert(event_unhandled_count > 0);
 824
 825    qemu_mutex_lock(&event_mtx);
 826    event_unhandled_count--;
 827    qemu_cond_broadcast(&event_complete_cond);
 828    qemu_mutex_unlock(&event_mtx);
 829}
 830
 831static void colo_compare_iothread(CompareState *s)
 832{
 833    object_ref(OBJECT(s->iothread));
 834    s->worker_context = iothread_get_g_main_context(s->iothread);
 835
 836    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
 837                             compare_pri_chr_in, NULL, NULL,
 838                             s, s->worker_context, true);
 839    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
 840                             compare_sec_chr_in, NULL, NULL,
 841                             s, s->worker_context, true);
 842
 843    colo_compare_timer_init(s);
 844    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 845}
 846
 847static char *compare_get_pri_indev(Object *obj, Error **errp)
 848{
 849    CompareState *s = COLO_COMPARE(obj);
 850
 851    return g_strdup(s->pri_indev);
 852}
 853
 854static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
 855{
 856    CompareState *s = COLO_COMPARE(obj);
 857
 858    g_free(s->pri_indev);
 859    s->pri_indev = g_strdup(value);
 860}
 861
 862static char *compare_get_sec_indev(Object *obj, Error **errp)
 863{
 864    CompareState *s = COLO_COMPARE(obj);
 865
 866    return g_strdup(s->sec_indev);
 867}
 868
 869static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
 870{
 871    CompareState *s = COLO_COMPARE(obj);
 872
 873    g_free(s->sec_indev);
 874    s->sec_indev = g_strdup(value);
 875}
 876
 877static char *compare_get_outdev(Object *obj, Error **errp)
 878{
 879    CompareState *s = COLO_COMPARE(obj);
 880
 881    return g_strdup(s->outdev);
 882}
 883
 884static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 885{
 886    CompareState *s = COLO_COMPARE(obj);
 887
 888    g_free(s->outdev);
 889    s->outdev = g_strdup(value);
 890}
 891
 892static bool compare_get_vnet_hdr(Object *obj, Error **errp)
 893{
 894    CompareState *s = COLO_COMPARE(obj);
 895
 896    return s->vnet_hdr;
 897}
 898
 899static void compare_set_vnet_hdr(Object *obj,
 900                                 bool value,
 901                                 Error **errp)
 902{
 903    CompareState *s = COLO_COMPARE(obj);
 904
 905    s->vnet_hdr = value;
 906}
 907
 908static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 909{
 910    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
 911    Connection *conn = NULL;
 912
 913    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
 914        trace_colo_compare_main("primary: unsupported packet in");
 915        compare_chr_send(s,
 916                         pri_rs->buf,
 917                         pri_rs->packet_len,
 918                         pri_rs->vnet_hdr_len);
 919    } else {
 920        /* compare packet in the specified connection */
 921        colo_compare_connection(conn, s);
 922    }
 923}
 924
 925static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 926{
 927    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
 928    Connection *conn = NULL;
 929
 930    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
 931        trace_colo_compare_main("secondary: unsupported packet in");
 932    } else {
 933        /* compare packet in the specified connection */
 934        colo_compare_connection(conn, s);
 935    }
 936}
 937
 938
 939/*
 940 * Return 0 is success.
 941 * Return 1 is failed.
 942 */
 943static int find_and_check_chardev(Chardev **chr,
 944                                  char *chr_name,
 945                                  Error **errp)
 946{
 947    *chr = qemu_chr_find(chr_name);
 948    if (*chr == NULL) {
 949        error_setg(errp, "Device '%s' not found",
 950                   chr_name);
 951        return 1;
 952    }
 953
 954    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
 955        error_setg(errp, "chardev \"%s\" is not reconnectable",
 956                   chr_name);
 957        return 1;
 958    }
 959
 960    return 0;
 961}
 962
 963/*
 964 * Called from the main thread on the primary
 965 * to setup colo-compare.
 966 */
 967static void colo_compare_complete(UserCreatable *uc, Error **errp)
 968{
 969    CompareState *s = COLO_COMPARE(uc);
 970    Chardev *chr;
 971
 972    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
 973        error_setg(errp, "colo compare needs 'primary_in' ,"
 974                   "'secondary_in','outdev','iothread' property set");
 975        return;
 976    } else if (!strcmp(s->pri_indev, s->outdev) ||
 977               !strcmp(s->sec_indev, s->outdev) ||
 978               !strcmp(s->pri_indev, s->sec_indev)) {
 979        error_setg(errp, "'indev' and 'outdev' could not be same "
 980                   "for compare module");
 981        return;
 982    }
 983
 984    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
 985        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
 986        return;
 987    }
 988
 989    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
 990        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
 991        return;
 992    }
 993
 994    if (find_and_check_chardev(&chr, s->outdev, errp) ||
 995        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
 996        return;
 997    }
 998
 999    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1000    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1001
1002    QTAILQ_INSERT_TAIL(&net_compares, s, next);
1003
1004    g_queue_init(&s->conn_list);
1005
1006    qemu_mutex_init(&event_mtx);
1007    qemu_cond_init(&event_complete_cond);
1008
1009    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1010                                                      connection_key_equal,
1011                                                      g_free,
1012                                                      connection_destroy);
1013
1014    colo_compare_iothread(s);
1015    return;
1016}
1017
1018static void colo_flush_packets(void *opaque, void *user_data)
1019{
1020    CompareState *s = user_data;
1021    Connection *conn = opaque;
1022    Packet *pkt = NULL;
1023
1024    while (!g_queue_is_empty(&conn->primary_list)) {
1025        pkt = g_queue_pop_head(&conn->primary_list);
1026        compare_chr_send(s,
1027                         pkt->data,
1028                         pkt->size,
1029                         pkt->vnet_hdr_len);
1030        packet_destroy(pkt, NULL);
1031    }
1032    while (!g_queue_is_empty(&conn->secondary_list)) {
1033        pkt = g_queue_pop_head(&conn->secondary_list);
1034        packet_destroy(pkt, NULL);
1035    }
1036}
1037
1038static void colo_compare_class_init(ObjectClass *oc, void *data)
1039{
1040    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1041
1042    ucc->complete = colo_compare_complete;
1043}
1044
1045static void colo_compare_init(Object *obj)
1046{
1047    CompareState *s = COLO_COMPARE(obj);
1048
1049    object_property_add_str(obj, "primary_in",
1050                            compare_get_pri_indev, compare_set_pri_indev,
1051                            NULL);
1052    object_property_add_str(obj, "secondary_in",
1053                            compare_get_sec_indev, compare_set_sec_indev,
1054                            NULL);
1055    object_property_add_str(obj, "outdev",
1056                            compare_get_outdev, compare_set_outdev,
1057                            NULL);
1058    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1059                            (Object **)&s->iothread,
1060                            object_property_allow_set_link,
1061                            OBJ_PROP_LINK_STRONG, NULL);
1062
1063    s->vnet_hdr = false;
1064    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1065                             compare_set_vnet_hdr, NULL);
1066}
1067
1068static void colo_compare_finalize(Object *obj)
1069{
1070    CompareState *s = COLO_COMPARE(obj);
1071    CompareState *tmp = NULL;
1072
1073    qemu_chr_fe_deinit(&s->chr_pri_in, false);
1074    qemu_chr_fe_deinit(&s->chr_sec_in, false);
1075    qemu_chr_fe_deinit(&s->chr_out, false);
1076    if (s->iothread) {
1077        colo_compare_timer_del(s);
1078    }
1079
1080    qemu_bh_delete(s->event_bh);
1081
1082    QTAILQ_FOREACH(tmp, &net_compares, next) {
1083        if (tmp == s) {
1084            QTAILQ_REMOVE(&net_compares, s, next);
1085            break;
1086        }
1087    }
1088
1089    /* Release all unhandled packets after compare thead exited */
1090    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1091
1092    g_queue_clear(&s->conn_list);
1093
1094    if (s->connection_track_table) {
1095        g_hash_table_destroy(s->connection_track_table);
1096    }
1097
1098    if (s->iothread) {
1099        object_unref(OBJECT(s->iothread));
1100    }
1101
1102    qemu_mutex_destroy(&event_mtx);
1103    qemu_cond_destroy(&event_complete_cond);
1104
1105    g_free(s->pri_indev);
1106    g_free(s->sec_indev);
1107    g_free(s->outdev);
1108}
1109
1110static const TypeInfo colo_compare_info = {
1111    .name = TYPE_COLO_COMPARE,
1112    .parent = TYPE_OBJECT,
1113    .instance_size = sizeof(CompareState),
1114    .instance_init = colo_compare_init,
1115    .instance_finalize = colo_compare_finalize,
1116    .class_size = sizeof(CompareClass),
1117    .class_init = colo_compare_class_init,
1118    .interfaces = (InterfaceInfo[]) {
1119        { TYPE_USER_CREATABLE },
1120        { }
1121    }
1122};
1123
1124static void register_types(void)
1125{
1126    type_register_static(&colo_compare_info);
1127}
1128
1129type_init(register_types);
1130