qemu/net/colo-compare.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or
  12 * later.  See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu/error-report.h"
  17#include "trace.h"
  18#include "qemu-common.h"
  19#include "qapi/error.h"
  20#include "net/net.h"
  21#include "net/eth.h"
  22#include "qom/object_interfaces.h"
  23#include "qemu/iov.h"
  24#include "qom/object.h"
  25#include "net/queue.h"
  26#include "chardev/char-fe.h"
  27#include "qemu/sockets.h"
  28#include "colo.h"
  29#include "sysemu/iothread.h"
  30#include "net/colo-compare.h"
  31#include "migration/colo.h"
  32#include "migration/migration.h"
  33#include "util.h"
  34
/* QOM type name of the colo-compare object and its checked cast. */
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/*
 * Compare instances linked through CompareState.next; walked by
 * colo_notify_compares_event() to dispatch COLO events.
 */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Listeners told (with migrate_get_current() as data) that the primary
 * and secondary output diverged or that a primary packet went stale.
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

/* Maximum number of bytes accepted per chardev read callback. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* Per-connection, per-side packet queue limit used by colo_insert_packet(). */
#define MAX_QUEUE_SIZE 1024

/* Bits reported through the 'mark' output of colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/*
 * Period of the old-packet timer, and the age (ms) after which an
 * unmatched primary packet triggers an inconsistency notification.
 * TODO: Should be configurable
 */
#define REGULAR_PACKET_CHECK_MS 3000

/*
 * Event hand-off between colo_notify_compares_event() and the per-instance
 * event bottom halves: the notifier bumps event_unhandled_count for each
 * scheduled BH and waits on event_complete_cond until every
 * colo_compare_handle_event() has decremented it back to zero.
 */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
  57
/*
 * State of one colo-compare instance.  Packets read from the primary and
 * secondary inputs are grouped per connection, and every connection keeps
 * one queue per side:
 *
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
typedef struct CompareState {
    Object parent;

    /* Chardev names, read/written by the compare_{get,set}_* accessors. */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /* Backends: primary input, secondary input, and compared output. */
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    /* Frame reassembly state for each input, fed by net_fill_rstate(). */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    /* When true, compare_chr_send() also writes each packet's vnet hdr len. */
    bool vnet_hdr;

    /*
     * Record the connection that through the NIC
     * Element type: Connection
     */
    GQueue conn_list;
    /* Record the connection without repetition */
    GHashTable *connection_track_table;

    /* Iothread whose GMainContext runs the input handlers and the timer. */
    IOThread *iothread;
    GMainContext *worker_context;
    /* Periodic stale-packet scan, see check_old_packet_regular(). */
    QEMUTimer *packet_check_timer;

    /* BH running colo_compare_handle_event() for the pending 'event'. */
    QEMUBH *event_bh;
    enum colo_event event;

    /* Link in the file-level net_compares list. */
    QTAILQ_ENTRY(CompareState) next;
} CompareState;
 110
/* QOM class of colo-compare; adds nothing beyond ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Which input a packet came from; passed as 'mode' to packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
 119
 120static void colo_compare_inconsistency_notify(void)
 121{
 122    notifier_list_notify(&colo_compare_notifiers,
 123                migrate_get_current());
 124}
 125
 126static int compare_chr_send(CompareState *s,
 127                            const uint8_t *buf,
 128                            uint32_t size,
 129                            uint32_t vnet_hdr_len);
 130
 131static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 132{
 133    struct tcp_hdr *atcp, *btcp;
 134
 135    atcp = (struct tcp_hdr *)(a->transport_header);
 136    btcp = (struct tcp_hdr *)(b->transport_header);
 137    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 138}
 139
 140static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 141{
 142    Packet *pkt = data;
 143    struct tcp_hdr *tcphd;
 144
 145    tcphd = (struct tcp_hdr *)pkt->transport_header;
 146
 147    pkt->tcp_seq = ntohl(tcphd->th_seq);
 148    pkt->tcp_ack = ntohl(tcphd->th_ack);
 149    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
 150    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
 151                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
 152    pkt->payload_size = pkt->size - pkt->header_size;
 153    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
 154    pkt->flags = tcphd->th_flags;
 155}
 156
 157/*
 158 * Return 1 on success, if return 0 means the
 159 * packet will be dropped
 160 */
 161static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 162{
 163    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
 164        if (pkt->ip->ip_p == IPPROTO_TCP) {
 165            fill_pkt_tcp_info(pkt, max_ack);
 166            g_queue_insert_sorted(queue,
 167                                  pkt,
 168                                  (GCompareDataFunc)seq_sorter,
 169                                  NULL);
 170        } else {
 171            g_queue_push_tail(queue, pkt);
 172        }
 173        return 1;
 174    }
 175    return 0;
 176}
 177
 178/*
 179 * Return 0 on success, if return -1 means the pkt
 180 * is unsupported(arp and ipv6) and will be sent later
 181 */
 182static int packet_enqueue(CompareState *s, int mode, Connection **con)
 183{
 184    ConnectionKey key;
 185    Packet *pkt = NULL;
 186    Connection *conn;
 187
 188    if (mode == PRIMARY_IN) {
 189        pkt = packet_new(s->pri_rs.buf,
 190                         s->pri_rs.packet_len,
 191                         s->pri_rs.vnet_hdr_len);
 192    } else {
 193        pkt = packet_new(s->sec_rs.buf,
 194                         s->sec_rs.packet_len,
 195                         s->sec_rs.vnet_hdr_len);
 196    }
 197
 198    if (parse_packet_early(pkt)) {
 199        packet_destroy(pkt, NULL);
 200        pkt = NULL;
 201        return -1;
 202    }
 203    fill_connection_key(pkt, &key);
 204
 205    conn = connection_get(s->connection_track_table,
 206                          &key,
 207                          &s->conn_list);
 208
 209    if (!conn->processing) {
 210        g_queue_push_tail(&s->conn_list, conn);
 211        conn->processing = true;
 212    }
 213
 214    if (mode == PRIMARY_IN) {
 215        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
 216            error_report("colo compare primary queue size too big,"
 217                         "drop packet");
 218        }
 219    } else {
 220        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
 221            error_report("colo compare secondary queue size too big,"
 222                         "drop packet");
 223        }
 224    }
 225    *con = conn;
 226
 227    return 0;
 228}
 229
 230static inline bool after(uint32_t seq1, uint32_t seq2)
 231{
 232        return (int32_t)(seq1 - seq2) > 0;
 233}
 234
 235static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
 236{
 237    int ret;
 238    ret = compare_chr_send(s,
 239                           pkt->data,
 240                           pkt->size,
 241                           pkt->vnet_hdr_len);
 242    if (ret < 0) {
 243        error_report("colo send primary packet failed");
 244    }
 245    trace_colo_compare_main("packet same and release packet");
 246    packet_destroy(pkt, NULL);
 247}
 248
 249/*
 250 * The IP packets sent by primary and secondary
 251 * will be compared in here
 252 * TODO support ip fragment, Out-Of-Order
 253 * return:    0  means packet same
 254 *            > 0 || < 0 means packet different
 255 */
 256static int colo_compare_packet_payload(Packet *ppkt,
 257                                       Packet *spkt,
 258                                       uint16_t poffset,
 259                                       uint16_t soffset,
 260                                       uint16_t len)
 261
 262{
 263    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 264        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 265
 266        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 267        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 268        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 269        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 270
 271        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 272                                   pri_ip_dst, spkt->size,
 273                                   sec_ip_src, sec_ip_dst);
 274    }
 275
 276    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
 277}
 278
 279/*
 280 * return true means that the payload is consist and
 281 * need to make the next comparison, false means do
 282 * the checkpoint
 283*/
 284static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 285                              int8_t *mark, uint32_t max_ack)
 286{
 287    *mark = 0;
 288
 289    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
 290        if (colo_compare_packet_payload(ppkt, spkt,
 291                                        ppkt->header_size, spkt->header_size,
 292                                        ppkt->payload_size)) {
 293            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
 294            return true;
 295        }
 296    }
 297
 298    /* one part of secondary packet payload still need to be compared */
 299    if (!after(ppkt->seq_end, spkt->seq_end)) {
 300        if (colo_compare_packet_payload(ppkt, spkt,
 301                                        ppkt->header_size + ppkt->offset,
 302                                        spkt->header_size + spkt->offset,
 303                                        ppkt->payload_size - ppkt->offset)) {
 304            if (!after(ppkt->tcp_ack, max_ack)) {
 305                *mark = COLO_COMPARE_FREE_PRIMARY;
 306                spkt->offset += ppkt->payload_size - ppkt->offset;
 307                return true;
 308            } else {
 309                /* secondary guest hasn't ack the data, don't send
 310                 * out this packet
 311                 */
 312                return false;
 313            }
 314        }
 315    } else {
 316        /* primary packet is longer than secondary packet, compare
 317         * the same part and mark the primary packet offset
 318         */
 319        if (colo_compare_packet_payload(ppkt, spkt,
 320                                        ppkt->header_size + ppkt->offset,
 321                                        spkt->header_size + spkt->offset,
 322                                        spkt->payload_size - spkt->offset)) {
 323            *mark = COLO_COMPARE_FREE_SECONDARY;
 324            ppkt->offset += spkt->payload_size - spkt->offset;
 325            return true;
 326        }
 327    }
 328
 329    return false;
 330}
 331
 332static void colo_compare_tcp(CompareState *s, Connection *conn)
 333{
 334    Packet *ppkt = NULL, *spkt = NULL;
 335    int8_t mark;
 336
 337    /*
 338     * If ppkt and spkt have the same payload, but ppkt's ACK
 339     * is greater than spkt's ACK, in this case we can not
 340     * send the ppkt because it will cause the secondary guest
 341     * to miss sending some data in the next. Therefore, we
 342     * record the maximum ACK in the current queue at both
 343     * primary side and secondary side. Only when the ack is
 344     * less than the smaller of the two maximum ack, then we
 345     * can ensure that the packet's payload is acknowledged by
 346     * primary and secondary.
 347    */
 348    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
 349
 350pri:
 351    if (g_queue_is_empty(&conn->primary_list)) {
 352        return;
 353    }
 354    ppkt = g_queue_pop_head(&conn->primary_list);
 355sec:
 356    if (g_queue_is_empty(&conn->secondary_list)) {
 357        g_queue_push_head(&conn->primary_list, ppkt);
 358        return;
 359    }
 360    spkt = g_queue_pop_head(&conn->secondary_list);
 361
 362    if (ppkt->tcp_seq == ppkt->seq_end) {
 363        colo_release_primary_pkt(s, ppkt);
 364        ppkt = NULL;
 365    }
 366
 367    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
 368        trace_colo_compare_main("pri: this packet has compared");
 369        colo_release_primary_pkt(s, ppkt);
 370        ppkt = NULL;
 371    }
 372
 373    if (spkt->tcp_seq == spkt->seq_end) {
 374        packet_destroy(spkt, NULL);
 375        if (!ppkt) {
 376            goto pri;
 377        } else {
 378            goto sec;
 379        }
 380    } else {
 381        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
 382            trace_colo_compare_main("sec: this packet has compared");
 383            packet_destroy(spkt, NULL);
 384            if (!ppkt) {
 385                goto pri;
 386            } else {
 387                goto sec;
 388            }
 389        }
 390        if (!ppkt) {
 391            g_queue_push_head(&conn->secondary_list, spkt);
 392            goto pri;
 393        }
 394    }
 395
 396    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
 397        trace_colo_compare_tcp_info("pri",
 398                                    ppkt->tcp_seq, ppkt->tcp_ack,
 399                                    ppkt->header_size, ppkt->payload_size,
 400                                    ppkt->offset, ppkt->flags);
 401
 402        trace_colo_compare_tcp_info("sec",
 403                                    spkt->tcp_seq, spkt->tcp_ack,
 404                                    spkt->header_size, spkt->payload_size,
 405                                    spkt->offset, spkt->flags);
 406
 407        if (mark == COLO_COMPARE_FREE_PRIMARY) {
 408            conn->compare_seq = ppkt->seq_end;
 409            colo_release_primary_pkt(s, ppkt);
 410            g_queue_push_head(&conn->secondary_list, spkt);
 411            goto pri;
 412        }
 413        if (mark == COLO_COMPARE_FREE_SECONDARY) {
 414            conn->compare_seq = spkt->seq_end;
 415            packet_destroy(spkt, NULL);
 416            goto sec;
 417        }
 418        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
 419            conn->compare_seq = ppkt->seq_end;
 420            colo_release_primary_pkt(s, ppkt);
 421            packet_destroy(spkt, NULL);
 422            goto pri;
 423        }
 424    } else {
 425        g_queue_push_head(&conn->primary_list, ppkt);
 426        g_queue_push_head(&conn->secondary_list, spkt);
 427
 428        qemu_hexdump((char *)ppkt->data, stderr,
 429                     "colo-compare ppkt", ppkt->size);
 430        qemu_hexdump((char *)spkt->data, stderr,
 431                     "colo-compare spkt", spkt->size);
 432
 433        colo_compare_inconsistency_notify();
 434    }
 435}
 436
 437
 438/*
 439 * Called from the compare thread on the primary
 440 * for compare udp packet
 441 */
 442static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 443{
 444    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 445    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 446
 447    trace_colo_compare_main("compare udp");
 448
 449    /*
 450     * Because of ppkt and spkt are both in the same connection,
 451     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 452     * same with spkt. In addition, IP header's Identification is a random
 453     * field, we can handle it in IP fragmentation function later.
 454     * COLO just concern the response net packet payload from primary guest
 455     * and secondary guest are same or not, So we ignored all IP header include
 456     * other field like TOS,TTL,IP Checksum. we only need to compare
 457     * the ip payload here.
 458     */
 459    if (ppkt->size != spkt->size) {
 460        trace_colo_compare_main("UDP: payload size of packets are different");
 461        return -1;
 462    }
 463    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 464                                    ppkt->size - offset)) {
 465        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
 466        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
 467        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 468            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 469                         ppkt->size);
 470            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 471                         spkt->size);
 472        }
 473        return -1;
 474    } else {
 475        return 0;
 476    }
 477}
 478
 479/*
 480 * Called from the compare thread on the primary
 481 * for compare icmp packet
 482 */
 483static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 484{
 485    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 486    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 487
 488    trace_colo_compare_main("compare icmp");
 489
 490    /*
 491     * Because of ppkt and spkt are both in the same connection,
 492     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 493     * same with spkt. In addition, IP header's Identification is a random
 494     * field, we can handle it in IP fragmentation function later.
 495     * COLO just concern the response net packet payload from primary guest
 496     * and secondary guest are same or not, So we ignored all IP header include
 497     * other field like TOS,TTL,IP Checksum. we only need to compare
 498     * the ip payload here.
 499     */
 500    if (ppkt->size != spkt->size) {
 501        trace_colo_compare_main("ICMP: payload size of packets are different");
 502        return -1;
 503    }
 504    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 505                                    ppkt->size - offset)) {
 506        trace_colo_compare_icmp_miscompare("primary pkt size",
 507                                           ppkt->size);
 508        trace_colo_compare_icmp_miscompare("Secondary pkt size",
 509                                           spkt->size);
 510        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 511            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 512                         ppkt->size);
 513            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 514                         spkt->size);
 515        }
 516        return -1;
 517    } else {
 518        return 0;
 519    }
 520}
 521
 522/*
 523 * Called from the compare thread on the primary
 524 * for compare other packet
 525 */
 526static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 527{
 528    uint16_t offset = ppkt->vnet_hdr_len;
 529
 530    trace_colo_compare_main("compare other");
 531    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 532        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 533
 534        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 535        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 536        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 537        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 538
 539        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 540                                   pri_ip_dst, spkt->size,
 541                                   sec_ip_src, sec_ip_dst);
 542    }
 543
 544    if (ppkt->size != spkt->size) {
 545        trace_colo_compare_main("Other: payload size of packets are different");
 546        return -1;
 547    }
 548    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
 549                                       ppkt->size - offset);
 550}
 551
 552static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
 553{
 554    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 555
 556    if ((now - pkt->creation_ms) > (*check_time)) {
 557        trace_colo_old_packet_check_found(pkt->creation_ms);
 558        return 0;
 559    } else {
 560        return 1;
 561    }
 562}
 563
 564void colo_compare_register_notifier(Notifier *notify)
 565{
 566    notifier_list_add(&colo_compare_notifiers, notify);
 567}
 568
 569void colo_compare_unregister_notifier(Notifier *notify)
 570{
 571    notifier_remove(notify);
 572}
 573
 574static int colo_old_packet_check_one_conn(Connection *conn,
 575                                           void *user_data)
 576{
 577    GList *result = NULL;
 578    int64_t check_time = REGULAR_PACKET_CHECK_MS;
 579
 580    result = g_queue_find_custom(&conn->primary_list,
 581                                 &check_time,
 582                                 (GCompareFunc)colo_old_packet_check_one);
 583
 584    if (result) {
 585        /* Do checkpoint will flush old packet */
 586        colo_compare_inconsistency_notify();
 587        return 0;
 588    }
 589
 590    return 1;
 591}
 592
 593/*
 594 * Look for old packets that the secondary hasn't matched,
 595 * if we have some then we have to checkpoint to wake
 596 * the secondary up.
 597 */
 598static void colo_old_packet_check(void *opaque)
 599{
 600    CompareState *s = opaque;
 601
 602    /*
 603     * If we find one old packet, stop finding job and notify
 604     * COLO frame do checkpoint.
 605     */
 606    g_queue_find_custom(&s->conn_list, NULL,
 607                        (GCompareFunc)colo_old_packet_check_one_conn);
 608}
 609
 610static void colo_compare_packet(CompareState *s, Connection *conn,
 611                                int (*HandlePacket)(Packet *spkt,
 612                                Packet *ppkt))
 613{
 614    Packet *pkt = NULL;
 615    GList *result = NULL;
 616
 617    while (!g_queue_is_empty(&conn->primary_list) &&
 618           !g_queue_is_empty(&conn->secondary_list)) {
 619        pkt = g_queue_pop_head(&conn->primary_list);
 620        result = g_queue_find_custom(&conn->secondary_list,
 621                 pkt, (GCompareFunc)HandlePacket);
 622
 623        if (result) {
 624            colo_release_primary_pkt(s, pkt);
 625            g_queue_remove(&conn->secondary_list, result->data);
 626        } else {
 627            /*
 628             * If one packet arrive late, the secondary_list or
 629             * primary_list will be empty, so we can't compare it
 630             * until next comparison. If the packets in the list are
 631             * timeout, it will trigger a checkpoint request.
 632             */
 633            trace_colo_compare_main("packet different");
 634            g_queue_push_head(&conn->primary_list, pkt);
 635            colo_compare_inconsistency_notify();
 636            break;
 637        }
 638    }
 639}
 640
 641/*
 642 * Called from the compare thread on the primary
 643 * for compare packet with secondary list of the
 644 * specified connection when a new packet was
 645 * queued to it.
 646 */
 647static void colo_compare_connection(void *opaque, void *user_data)
 648{
 649    CompareState *s = user_data;
 650    Connection *conn = opaque;
 651
 652    switch (conn->ip_proto) {
 653    case IPPROTO_TCP:
 654        colo_compare_tcp(s, conn);
 655        break;
 656    case IPPROTO_UDP:
 657        colo_compare_packet(s, conn, colo_packet_compare_udp);
 658        break;
 659    case IPPROTO_ICMP:
 660        colo_compare_packet(s, conn, colo_packet_compare_icmp);
 661        break;
 662    default:
 663        colo_compare_packet(s, conn, colo_packet_compare_other);
 664        break;
 665    }
 666}
 667
 668static int compare_chr_send(CompareState *s,
 669                            const uint8_t *buf,
 670                            uint32_t size,
 671                            uint32_t vnet_hdr_len)
 672{
 673    int ret = 0;
 674    uint32_t len = htonl(size);
 675
 676    if (!size) {
 677        return 0;
 678    }
 679
 680    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 681    if (ret != sizeof(len)) {
 682        goto err;
 683    }
 684
 685    if (s->vnet_hdr) {
 686        /*
 687         * We send vnet header len make other module(like filter-redirector)
 688         * know how to parse net packet correctly.
 689         */
 690        len = htonl(vnet_hdr_len);
 691        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 692        if (ret != sizeof(len)) {
 693            goto err;
 694        }
 695    }
 696
 697    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
 698    if (ret != size) {
 699        goto err;
 700    }
 701
 702    return 0;
 703
 704err:
 705    return ret < 0 ? ret : -EIO;
 706}
 707
 708static int compare_chr_can_read(void *opaque)
 709{
 710    return COMPARE_READ_LEN_MAX;
 711}
 712
 713/*
 714 * Called from the main thread on the primary for packets
 715 * arriving over the socket from the primary.
 716 */
 717static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 718{
 719    CompareState *s = COLO_COMPARE(opaque);
 720    int ret;
 721
 722    ret = net_fill_rstate(&s->pri_rs, buf, size);
 723    if (ret == -1) {
 724        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
 725                                 NULL, NULL, true);
 726        error_report("colo-compare primary_in error");
 727    }
 728}
 729
 730/*
 731 * Called from the main thread on the primary for packets
 732 * arriving over the socket from the secondary.
 733 */
 734static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 735{
 736    CompareState *s = COLO_COMPARE(opaque);
 737    int ret;
 738
 739    ret = net_fill_rstate(&s->sec_rs, buf, size);
 740    if (ret == -1) {
 741        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
 742                                 NULL, NULL, true);
 743        error_report("colo-compare secondary_in error");
 744    }
 745}
 746
 747/*
 748 * Check old packet regularly so it can watch for any packets
 749 * that the secondary hasn't produced equivalents of.
 750 */
 751static void check_old_packet_regular(void *opaque)
 752{
 753    CompareState *s = opaque;
 754
 755    /* if have old packet we will notify checkpoint */
 756    colo_old_packet_check(s);
 757    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 758                REGULAR_PACKET_CHECK_MS);
 759}
 760
 761/* Public API, Used for COLO frame to notify compare event */
 762void colo_notify_compares_event(void *opaque, int event, Error **errp)
 763{
 764    CompareState *s;
 765
 766    qemu_mutex_lock(&event_mtx);
 767    QTAILQ_FOREACH(s, &net_compares, next) {
 768        s->event = event;
 769        qemu_bh_schedule(s->event_bh);
 770        event_unhandled_count++;
 771    }
 772    /* Wait all compare threads to finish handling this event */
 773    while (event_unhandled_count > 0) {
 774        qemu_cond_wait(&event_complete_cond, &event_mtx);
 775    }
 776
 777    qemu_mutex_unlock(&event_mtx);
 778}
 779
 780static void colo_compare_timer_init(CompareState *s)
 781{
 782    AioContext *ctx = iothread_get_aio_context(s->iothread);
 783
 784    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
 785                                SCALE_MS, check_old_packet_regular,
 786                                s);
 787    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 788                    REGULAR_PACKET_CHECK_MS);
 789}
 790
 791static void colo_compare_timer_del(CompareState *s)
 792{
 793    if (s->packet_check_timer) {
 794        timer_del(s->packet_check_timer);
 795        timer_free(s->packet_check_timer);
 796        s->packet_check_timer = NULL;
 797    }
 798 }
 799
 800static void colo_flush_packets(void *opaque, void *user_data);
 801
 802static void colo_compare_handle_event(void *opaque)
 803{
 804    CompareState *s = opaque;
 805
 806    switch (s->event) {
 807    case COLO_EVENT_CHECKPOINT:
 808        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 809        break;
 810    case COLO_EVENT_FAILOVER:
 811        break;
 812    default:
 813        break;
 814    }
 815
 816    assert(event_unhandled_count > 0);
 817
 818    qemu_mutex_lock(&event_mtx);
 819    event_unhandled_count--;
 820    qemu_cond_broadcast(&event_complete_cond);
 821    qemu_mutex_unlock(&event_mtx);
 822}
 823
 824static void colo_compare_iothread(CompareState *s)
 825{
 826    object_ref(OBJECT(s->iothread));
 827    s->worker_context = iothread_get_g_main_context(s->iothread);
 828
 829    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
 830                             compare_pri_chr_in, NULL, NULL,
 831                             s, s->worker_context, true);
 832    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
 833                             compare_sec_chr_in, NULL, NULL,
 834                             s, s->worker_context, true);
 835
 836    colo_compare_timer_init(s);
 837    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 838}
 839
 840static char *compare_get_pri_indev(Object *obj, Error **errp)
 841{
 842    CompareState *s = COLO_COMPARE(obj);
 843
 844    return g_strdup(s->pri_indev);
 845}
 846
 847static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
 848{
 849    CompareState *s = COLO_COMPARE(obj);
 850
 851    g_free(s->pri_indev);
 852    s->pri_indev = g_strdup(value);
 853}
 854
 855static char *compare_get_sec_indev(Object *obj, Error **errp)
 856{
 857    CompareState *s = COLO_COMPARE(obj);
 858
 859    return g_strdup(s->sec_indev);
 860}
 861
 862static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
 863{
 864    CompareState *s = COLO_COMPARE(obj);
 865
 866    g_free(s->sec_indev);
 867    s->sec_indev = g_strdup(value);
 868}
 869
 870static char *compare_get_outdev(Object *obj, Error **errp)
 871{
 872    CompareState *s = COLO_COMPARE(obj);
 873
 874    return g_strdup(s->outdev);
 875}
 876
 877static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 878{
 879    CompareState *s = COLO_COMPARE(obj);
 880
 881    g_free(s->outdev);
 882    s->outdev = g_strdup(value);
 883}
 884
 885static bool compare_get_vnet_hdr(Object *obj, Error **errp)
 886{
 887    CompareState *s = COLO_COMPARE(obj);
 888
 889    return s->vnet_hdr;
 890}
 891
 892static void compare_set_vnet_hdr(Object *obj,
 893                                 bool value,
 894                                 Error **errp)
 895{
 896    CompareState *s = COLO_COMPARE(obj);
 897
 898    s->vnet_hdr = value;
 899}
 900
 901static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 902{
 903    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
 904    Connection *conn = NULL;
 905
 906    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
 907        trace_colo_compare_main("primary: unsupported packet in");
 908        compare_chr_send(s,
 909                         pri_rs->buf,
 910                         pri_rs->packet_len,
 911                         pri_rs->vnet_hdr_len);
 912    } else {
 913        /* compare packet in the specified connection */
 914        colo_compare_connection(conn, s);
 915    }
 916}
 917
 918static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 919{
 920    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
 921    Connection *conn = NULL;
 922
 923    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
 924        trace_colo_compare_main("secondary: unsupported packet in");
 925    } else {
 926        /* compare packet in the specified connection */
 927        colo_compare_connection(conn, s);
 928    }
 929}
 930
 931
 932/*
 933 * Return 0 is success.
 934 * Return 1 is failed.
 935 */
 936static int find_and_check_chardev(Chardev **chr,
 937                                  char *chr_name,
 938                                  Error **errp)
 939{
 940    *chr = qemu_chr_find(chr_name);
 941    if (*chr == NULL) {
 942        error_setg(errp, "Device '%s' not found",
 943                   chr_name);
 944        return 1;
 945    }
 946
 947    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
 948        error_setg(errp, "chardev \"%s\" is not reconnectable",
 949                   chr_name);
 950        return 1;
 951    }
 952
 953    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
 954        error_setg(errp, "chardev \"%s\" cannot switch context",
 955                   chr_name);
 956        return 1;
 957    }
 958
 959    return 0;
 960}
 961
 962/*
 963 * Called from the main thread on the primary
 964 * to setup colo-compare.
 965 */
 966static void colo_compare_complete(UserCreatable *uc, Error **errp)
 967{
 968    CompareState *s = COLO_COMPARE(uc);
 969    Chardev *chr;
 970
 971    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
 972        error_setg(errp, "colo compare needs 'primary_in' ,"
 973                   "'secondary_in','outdev','iothread' property set");
 974        return;
 975    } else if (!strcmp(s->pri_indev, s->outdev) ||
 976               !strcmp(s->sec_indev, s->outdev) ||
 977               !strcmp(s->pri_indev, s->sec_indev)) {
 978        error_setg(errp, "'indev' and 'outdev' could not be same "
 979                   "for compare module");
 980        return;
 981    }
 982
 983    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
 984        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
 985        return;
 986    }
 987
 988    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
 989        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
 990        return;
 991    }
 992
 993    if (find_and_check_chardev(&chr, s->outdev, errp) ||
 994        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
 995        return;
 996    }
 997
 998    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
 999    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1000
1001    QTAILQ_INSERT_TAIL(&net_compares, s, next);
1002
1003    g_queue_init(&s->conn_list);
1004
1005    qemu_mutex_init(&event_mtx);
1006    qemu_cond_init(&event_complete_cond);
1007
1008    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1009                                                      connection_key_equal,
1010                                                      g_free,
1011                                                      connection_destroy);
1012
1013    colo_compare_iothread(s);
1014    return;
1015}
1016
1017static void colo_flush_packets(void *opaque, void *user_data)
1018{
1019    CompareState *s = user_data;
1020    Connection *conn = opaque;
1021    Packet *pkt = NULL;
1022
1023    while (!g_queue_is_empty(&conn->primary_list)) {
1024        pkt = g_queue_pop_head(&conn->primary_list);
1025        compare_chr_send(s,
1026                         pkt->data,
1027                         pkt->size,
1028                         pkt->vnet_hdr_len);
1029        packet_destroy(pkt, NULL);
1030    }
1031    while (!g_queue_is_empty(&conn->secondary_list)) {
1032        pkt = g_queue_pop_head(&conn->secondary_list);
1033        packet_destroy(pkt, NULL);
1034    }
1035}
1036
1037static void colo_compare_class_init(ObjectClass *oc, void *data)
1038{
1039    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1040
1041    ucc->complete = colo_compare_complete;
1042}
1043
1044static void colo_compare_init(Object *obj)
1045{
1046    CompareState *s = COLO_COMPARE(obj);
1047
1048    object_property_add_str(obj, "primary_in",
1049                            compare_get_pri_indev, compare_set_pri_indev,
1050                            NULL);
1051    object_property_add_str(obj, "secondary_in",
1052                            compare_get_sec_indev, compare_set_sec_indev,
1053                            NULL);
1054    object_property_add_str(obj, "outdev",
1055                            compare_get_outdev, compare_set_outdev,
1056                            NULL);
1057    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1058                            (Object **)&s->iothread,
1059                            object_property_allow_set_link,
1060                            OBJ_PROP_LINK_STRONG, NULL);
1061
1062    s->vnet_hdr = false;
1063    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1064                             compare_set_vnet_hdr, NULL);
1065}
1066
1067static void colo_compare_finalize(Object *obj)
1068{
1069    CompareState *s = COLO_COMPARE(obj);
1070    CompareState *tmp = NULL;
1071
1072    qemu_chr_fe_deinit(&s->chr_pri_in, false);
1073    qemu_chr_fe_deinit(&s->chr_sec_in, false);
1074    qemu_chr_fe_deinit(&s->chr_out, false);
1075    if (s->iothread) {
1076        colo_compare_timer_del(s);
1077    }
1078
1079    qemu_bh_delete(s->event_bh);
1080
1081    QTAILQ_FOREACH(tmp, &net_compares, next) {
1082        if (tmp == s) {
1083            QTAILQ_REMOVE(&net_compares, s, next);
1084            break;
1085        }
1086    }
1087
1088    /* Release all unhandled packets after compare thead exited */
1089    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1090
1091    g_queue_clear(&s->conn_list);
1092
1093    if (s->connection_track_table) {
1094        g_hash_table_destroy(s->connection_track_table);
1095    }
1096
1097    if (s->iothread) {
1098        object_unref(OBJECT(s->iothread));
1099    }
1100
1101    qemu_mutex_destroy(&event_mtx);
1102    qemu_cond_destroy(&event_complete_cond);
1103
1104    g_free(s->pri_indev);
1105    g_free(s->sec_indev);
1106    g_free(s->outdev);
1107}
1108
1109static const TypeInfo colo_compare_info = {
1110    .name = TYPE_COLO_COMPARE,
1111    .parent = TYPE_OBJECT,
1112    .instance_size = sizeof(CompareState),
1113    .instance_init = colo_compare_init,
1114    .instance_finalize = colo_compare_finalize,
1115    .class_size = sizeof(CompareClass),
1116    .class_init = colo_compare_class_init,
1117    .interfaces = (InterfaceInfo[]) {
1118        { TYPE_USER_CREATABLE },
1119        { }
1120    }
1121};
1122
1123static void register_types(void)
1124{
1125    type_register_static(&colo_compare_info);
1126}
1127
1128type_init(register_types);
1129