qemu/net/colo-compare.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or
  12 * later.  See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu-common.h"
  17#include "qemu/error-report.h"
  18#include "trace.h"
  19#include "qapi/error.h"
  20#include "net/net.h"
  21#include "net/eth.h"
  22#include "qom/object_interfaces.h"
  23#include "qemu/iov.h"
  24#include "qom/object.h"
  25#include "net/queue.h"
  26#include "chardev/char-fe.h"
  27#include "qemu/sockets.h"
  28#include "colo.h"
  29#include "sysemu/iothread.h"
  30#include "net/colo-compare.h"
  31#include "migration/colo.h"
  32#include "migration/migration.h"
  33#include "util.h"
  34
  35#define TYPE_COLO_COMPARE "colo-compare"  /* type name used by COLO_COMPARE() */
  36#define COLO_COMPARE(obj) \
  37    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
  38
/* Every CompareState on this list is signalled by colo_notify_compares_event(). */
  39static QTAILQ_HEAD(, CompareState) net_compares =
  40       QTAILQ_HEAD_INITIALIZER(net_compares);
  41
/*
 * Notified by colo_compare_inconsistency_notify() when no notify_dev is
 * configured; entries are added with colo_compare_register_notifier().
 */
  42static NotifierList colo_compare_notifiers =
  43    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
  44
  45#define COMPARE_READ_LEN_MAX NET_BUFSIZE  /* returned by compare_chr_can_read() */
  46#define MAX_QUEUE_SIZE 1024  /* queue length bound in colo_insert_packet() */
  47
/* Bits written to "mark" by colo_mark_tcp_pkt() and acted on by colo_compare_tcp(). */
  48#define COLO_COMPARE_FREE_PRIMARY     0x01
  49#define COLO_COMPARE_FREE_SECONDARY   0x02
  50
/*
 * NOTE(review): neither constant is referenced in the visible part of this
 * file; presumably the defaults for expired_scan_cycle and compare_timeout,
 * confirm against the code that initialises those fields.
 */
  51#define REGULAR_PACKET_CHECK_MS 3000
  52#define DEFAULT_TIME_OUT_MS 3000
  53
/*
 * event_unhandled_count is incremented in colo_notify_compares_event() and
 * decremented in colo_compare_handle_event(), both while holding event_mtx;
 * event_complete_cond is broadcast after each decrement.
 */
  54static QemuMutex event_mtx;
  55static QemuCond event_complete_cond;
  56static int event_unhandled_count;
  57
  58/*
  59 *  + CompareState ++
  60 *  |               |
  61 *  +---------------+   +---------------+         +---------------+
  62 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
  63 *  +---------------+   +---------------+         +---------------+
  64 *  |               |     |           |             |          |
  65 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
  66 *                    |primary |  |secondary    |primary | |secondary
  67 *                    |packet  |  |packet  +    |packet  | |packet  +
  68 *                    +--------+  +--------+    +--------+ +--------+
  69 *                        |           |             |          |
  70 *                    +---v----+  +---v----+    +---v----+ +---v----+
  71 *                    |primary |  |secondary    |primary | |secondary
  72 *                    |packet  |  |packet  +    |packet  | |packet  +
  73 *                    +--------+  +--------+    +--------+ +--------+
  74 *                        |           |             |          |
  75 *                    +---v----+  +---v----+    +---v----+ +---v----+
  76 *                    |primary |  |secondary    |primary | |secondary
  77 *                    |packet  |  |packet  +    |packet  | |packet  +
  78 *                    +--------+  +--------+    +--------+ +--------+
  79 */
  80typedef struct CompareState {
  81    Object parent;
  82
  83    char *pri_indev;                /* set by compare_set_pri_indev() */
  84    char *sec_indev;                /* set by compare_set_sec_indev() */
  85    char *outdev;                   /* set by compare_set_outdev() */
  86    char *notify_dev;               /* non-NULL: notify via chr_notify_dev */
  87    CharBackend chr_pri_in;         /* data handled by compare_pri_chr_in() */
  88    CharBackend chr_sec_in;         /* data handled by compare_sec_chr_in() */
  89    CharBackend chr_out;            /* compare_chr_send() packet target */
  90    CharBackend chr_notify_dev;     /* compare_chr_send() notify target */
  91    SocketReadState pri_rs;         /* fed by compare_pri_chr_in() */
  92    SocketReadState sec_rs;         /* fed by compare_sec_chr_in() */
  93    SocketReadState notify_rs;      /* fed by compare_notify_chr() */
  94    bool vnet_hdr;                  /* also send vnet header len on chr_out */
  95    uint32_t compare_timeout;       /* max packet age in ms before notify */
  96    uint32_t expired_scan_cycle;    /* ms between old-packet scans */
  97
  98    /*
  99     * Record the connection that through the NIC
 100     * Element type: Connection
 101     */
 102    GQueue conn_list;
 103    /* Record the connection without repetition */
 104    GHashTable *connection_track_table;
 105
 106    IOThread *iothread;             /* source of worker_context and timer */
 107    GMainContext *worker_context;   /* given to qemu_chr_fe_set_handlers() */
 108    QEMUTimer *packet_check_timer;  /* fires check_old_packet_regular() */
 109
 110    QEMUBH *event_bh;               /* runs colo_compare_handle_event() */
 111    enum colo_event event;          /* set by colo_notify_compares_event() */
 112
 113    QTAILQ_ENTRY(CompareState) next; /* link in net_compares */
 114} CompareState;
 115
 116typedef struct CompareClass {
 117    ObjectClass parent_class;
 118} CompareClass;
 119
/* "mode" argument of packet_enqueue(): selects which read state is parsed. */
 120enum {
 121    PRIMARY_IN = 0,     /* take the packet from pri_rs */
 122    SECONDARY_IN,       /* take the packet from sec_rs */
 123};
 124
 125
 126static int compare_chr_send(CompareState *s,
 127                            const uint8_t *buf,
 128                            uint32_t size,
 129                            uint32_t vnet_hdr_len,
 130                            bool notify_remote_frame);
 131
 132static bool packet_matches_str(const char *str,
 133                               const uint8_t *buf,
 134                               uint32_t packet_len)
 135{
 136    if (packet_len != strlen(str)) {
 137        return false;
 138    }
 139
 140    return !memcmp(str, buf, strlen(str));
 141}
 142
 143static void notify_remote_frame(CompareState *s)
 144{
 145    char msg[] = "DO_CHECKPOINT";
 146    int ret = 0;
 147
 148    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
 149    if (ret < 0) {
 150        error_report("Notify Xen COLO-frame failed");
 151    }
 152}
 153
 154static void colo_compare_inconsistency_notify(CompareState *s)
 155{
 156    if (s->notify_dev) {
 157        notify_remote_frame(s);
 158    } else {
 159        notifier_list_notify(&colo_compare_notifiers,
 160                             migrate_get_current());
 161    }
 162}
 163
 164static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 165{
 166    struct tcp_hdr *atcp, *btcp;
 167
 168    atcp = (struct tcp_hdr *)(a->transport_header);
 169    btcp = (struct tcp_hdr *)(b->transport_header);
 170    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 171}
 172
 173static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 174{
 175    Packet *pkt = data;
 176    struct tcp_hdr *tcphd;
 177
 178    tcphd = (struct tcp_hdr *)pkt->transport_header;
 179
 180    pkt->tcp_seq = ntohl(tcphd->th_seq);
 181    pkt->tcp_ack = ntohl(tcphd->th_ack);
 182    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
 183    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
 184                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
 185    pkt->payload_size = pkt->size - pkt->header_size;
 186    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
 187    pkt->flags = tcphd->th_flags;
 188}
 189
 190/*
 191 * Return 1 on success, if return 0 means the
 192 * packet will be dropped
 193 */
 194static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 195{
 196    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
 197        if (pkt->ip->ip_p == IPPROTO_TCP) {
 198            fill_pkt_tcp_info(pkt, max_ack);
 199            g_queue_insert_sorted(queue,
 200                                  pkt,
 201                                  (GCompareDataFunc)seq_sorter,
 202                                  NULL);
 203        } else {
 204            g_queue_push_tail(queue, pkt);
 205        }
 206        return 1;
 207    }
 208    return 0;
 209}
 210
 211/*
 212 * Return 0 on success, if return -1 means the pkt
 213 * is unsupported(arp and ipv6) and will be sent later
 214 */
 215static int packet_enqueue(CompareState *s, int mode, Connection **con)
 216{
 217    ConnectionKey key;
 218    Packet *pkt = NULL;
 219    Connection *conn;
 220
 221    if (mode == PRIMARY_IN) {
 222        pkt = packet_new(s->pri_rs.buf,
 223                         s->pri_rs.packet_len,
 224                         s->pri_rs.vnet_hdr_len);
 225    } else {
 226        pkt = packet_new(s->sec_rs.buf,
 227                         s->sec_rs.packet_len,
 228                         s->sec_rs.vnet_hdr_len);
 229    }
 230
 231    if (parse_packet_early(pkt)) {
 232        packet_destroy(pkt, NULL);
 233        pkt = NULL;
 234        return -1;
 235    }
 236    fill_connection_key(pkt, &key);
 237
 238    conn = connection_get(s->connection_track_table,
 239                          &key,
 240                          &s->conn_list);
 241
 242    if (!conn->processing) {
 243        g_queue_push_tail(&s->conn_list, conn);
 244        conn->processing = true;
 245    }
 246
 247    if (mode == PRIMARY_IN) {
 248        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
 249            error_report("colo compare primary queue size too big,"
 250                         "drop packet");
 251        }
 252    } else {
 253        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
 254            error_report("colo compare secondary queue size too big,"
 255                         "drop packet");
 256        }
 257    }
 258    *con = conn;
 259
 260    return 0;
 261}
 262
 263static inline bool after(uint32_t seq1, uint32_t seq2)
 264{
 265        return (int32_t)(seq1 - seq2) > 0;
 266}
 267
 268static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
 269{
 270    int ret;
 271    ret = compare_chr_send(s,
 272                           pkt->data,
 273                           pkt->size,
 274                           pkt->vnet_hdr_len,
 275                           false);
 276    if (ret < 0) {
 277        error_report("colo send primary packet failed");
 278    }
 279    trace_colo_compare_main("packet same and release packet");
 280    packet_destroy(pkt, NULL);
 281}
 282
 283/*
 284 * The IP packets sent by primary and secondary
 285 * will be compared in here
 286 * TODO support ip fragment, Out-Of-Order
 287 * return:    0  means packet same
 288 *            > 0 || < 0 means packet different
 289 */
 290static int colo_compare_packet_payload(Packet *ppkt,
 291                                       Packet *spkt,
 292                                       uint16_t poffset,
 293                                       uint16_t soffset,
 294                                       uint16_t len)
 295
 296{
 297    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 298        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 299
 300        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 301        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 302        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 303        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 304
 305        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 306                                   pri_ip_dst, spkt->size,
 307                                   sec_ip_src, sec_ip_dst);
 308    }
 309
 310    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
 311}
 312
 313/*
 314 * return true means that the payload is consist and
 315 * need to make the next comparison, false means do
 316 * the checkpoint
 317*/
 318static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 319                              int8_t *mark, uint32_t max_ack)
 320{
 321    *mark = 0;
 322
 323    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
 324        if (!colo_compare_packet_payload(ppkt, spkt,
 325                                        ppkt->header_size, spkt->header_size,
 326                                        ppkt->payload_size)) {
 327            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
 328            return true;
 329        }
 330    }
 331
 332    /* one part of secondary packet payload still need to be compared */
 333    if (!after(ppkt->seq_end, spkt->seq_end)) {
 334        if (!colo_compare_packet_payload(ppkt, spkt,
 335                                        ppkt->header_size + ppkt->offset,
 336                                        spkt->header_size + spkt->offset,
 337                                        ppkt->payload_size - ppkt->offset)) {
 338            if (!after(ppkt->tcp_ack, max_ack)) {
 339                *mark = COLO_COMPARE_FREE_PRIMARY;
 340                spkt->offset += ppkt->payload_size - ppkt->offset;
 341                return true;
 342            } else {
 343                /* secondary guest hasn't ack the data, don't send
 344                 * out this packet
 345                 */
 346                return false;
 347            }
 348        }
 349    } else {
 350        /* primary packet is longer than secondary packet, compare
 351         * the same part and mark the primary packet offset
 352         */
 353        if (!colo_compare_packet_payload(ppkt, spkt,
 354                                        ppkt->header_size + ppkt->offset,
 355                                        spkt->header_size + spkt->offset,
 356                                        spkt->payload_size - spkt->offset)) {
 357            *mark = COLO_COMPARE_FREE_SECONDARY;
 358            ppkt->offset += spkt->payload_size - spkt->offset;
 359            return true;
 360        }
 361    }
 362
 363    return false;
 364}
 365
 366static void colo_compare_tcp(CompareState *s, Connection *conn)
 367{
 368    Packet *ppkt = NULL, *spkt = NULL;
 369    int8_t mark;
 370
 371    /*
 372     * If ppkt and spkt have the same payload, but ppkt's ACK
 373     * is greater than spkt's ACK, in this case we can not
 374     * send the ppkt because it will cause the secondary guest
 375     * to miss sending some data in the next. Therefore, we
 376     * record the maximum ACK in the current queue at both
 377     * primary side and secondary side. Only when the ack is
 378     * less than the smaller of the two maximum ack, then we
 379     * can ensure that the packet's payload is acknowledged by
 380     * primary and secondary.
 381    */
 382    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
 383
 384pri:
 385    if (g_queue_is_empty(&conn->primary_list)) {
 386        return;
 387    }
 388    ppkt = g_queue_pop_head(&conn->primary_list);
 389sec:
 390    if (g_queue_is_empty(&conn->secondary_list)) {
 391        g_queue_push_head(&conn->primary_list, ppkt);
 392        return;
 393    }
 394    spkt = g_queue_pop_head(&conn->secondary_list);
 395
 396    if (ppkt->tcp_seq == ppkt->seq_end) {
 397        colo_release_primary_pkt(s, ppkt);
 398        ppkt = NULL;
 399    }
 400
 401    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
 402        trace_colo_compare_main("pri: this packet has compared");
 403        colo_release_primary_pkt(s, ppkt);
 404        ppkt = NULL;
 405    }
 406
 407    if (spkt->tcp_seq == spkt->seq_end) {
 408        packet_destroy(spkt, NULL);
 409        if (!ppkt) {
 410            goto pri;
 411        } else {
 412            goto sec;
 413        }
 414    } else {
 415        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
 416            trace_colo_compare_main("sec: this packet has compared");
 417            packet_destroy(spkt, NULL);
 418            if (!ppkt) {
 419                goto pri;
 420            } else {
 421                goto sec;
 422            }
 423        }
 424        if (!ppkt) {
 425            g_queue_push_head(&conn->secondary_list, spkt);
 426            goto pri;
 427        }
 428    }
 429
 430    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
 431        trace_colo_compare_tcp_info("pri",
 432                                    ppkt->tcp_seq, ppkt->tcp_ack,
 433                                    ppkt->header_size, ppkt->payload_size,
 434                                    ppkt->offset, ppkt->flags);
 435
 436        trace_colo_compare_tcp_info("sec",
 437                                    spkt->tcp_seq, spkt->tcp_ack,
 438                                    spkt->header_size, spkt->payload_size,
 439                                    spkt->offset, spkt->flags);
 440
 441        if (mark == COLO_COMPARE_FREE_PRIMARY) {
 442            conn->compare_seq = ppkt->seq_end;
 443            colo_release_primary_pkt(s, ppkt);
 444            g_queue_push_head(&conn->secondary_list, spkt);
 445            goto pri;
 446        }
 447        if (mark == COLO_COMPARE_FREE_SECONDARY) {
 448            conn->compare_seq = spkt->seq_end;
 449            packet_destroy(spkt, NULL);
 450            goto sec;
 451        }
 452        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
 453            conn->compare_seq = ppkt->seq_end;
 454            colo_release_primary_pkt(s, ppkt);
 455            packet_destroy(spkt, NULL);
 456            goto pri;
 457        }
 458    } else {
 459        g_queue_push_head(&conn->primary_list, ppkt);
 460        g_queue_push_head(&conn->secondary_list, spkt);
 461
 462        qemu_hexdump((char *)ppkt->data, stderr,
 463                     "colo-compare ppkt", ppkt->size);
 464        qemu_hexdump((char *)spkt->data, stderr,
 465                     "colo-compare spkt", spkt->size);
 466
 467        colo_compare_inconsistency_notify(s);
 468    }
 469}
 470
 471
 472/*
 473 * Called from the compare thread on the primary
 474 * for compare udp packet
 475 */
 476static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 477{
 478    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 479    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 480
 481    trace_colo_compare_main("compare udp");
 482
 483    /*
 484     * Because of ppkt and spkt are both in the same connection,
 485     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 486     * same with spkt. In addition, IP header's Identification is a random
 487     * field, we can handle it in IP fragmentation function later.
 488     * COLO just concern the response net packet payload from primary guest
 489     * and secondary guest are same or not, So we ignored all IP header include
 490     * other field like TOS,TTL,IP Checksum. we only need to compare
 491     * the ip payload here.
 492     */
 493    if (ppkt->size != spkt->size) {
 494        trace_colo_compare_main("UDP: payload size of packets are different");
 495        return -1;
 496    }
 497    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 498                                    ppkt->size - offset)) {
 499        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
 500        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
 501        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 502            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 503                         ppkt->size);
 504            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 505                         spkt->size);
 506        }
 507        return -1;
 508    } else {
 509        return 0;
 510    }
 511}
 512
 513/*
 514 * Called from the compare thread on the primary
 515 * for compare icmp packet
 516 */
 517static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 518{
 519    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 520    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 521
 522    trace_colo_compare_main("compare icmp");
 523
 524    /*
 525     * Because of ppkt and spkt are both in the same connection,
 526     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 527     * same with spkt. In addition, IP header's Identification is a random
 528     * field, we can handle it in IP fragmentation function later.
 529     * COLO just concern the response net packet payload from primary guest
 530     * and secondary guest are same or not, So we ignored all IP header include
 531     * other field like TOS,TTL,IP Checksum. we only need to compare
 532     * the ip payload here.
 533     */
 534    if (ppkt->size != spkt->size) {
 535        trace_colo_compare_main("ICMP: payload size of packets are different");
 536        return -1;
 537    }
 538    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 539                                    ppkt->size - offset)) {
 540        trace_colo_compare_icmp_miscompare("primary pkt size",
 541                                           ppkt->size);
 542        trace_colo_compare_icmp_miscompare("Secondary pkt size",
 543                                           spkt->size);
 544        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 545            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 546                         ppkt->size);
 547            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 548                         spkt->size);
 549        }
 550        return -1;
 551    } else {
 552        return 0;
 553    }
 554}
 555
 556/*
 557 * Called from the compare thread on the primary
 558 * for compare other packet
 559 */
 560static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 561{
 562    uint16_t offset = ppkt->vnet_hdr_len;
 563
 564    trace_colo_compare_main("compare other");
 565    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 566        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 567
 568        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 569        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 570        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 571        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 572
 573        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 574                                   pri_ip_dst, spkt->size,
 575                                   sec_ip_src, sec_ip_dst);
 576    }
 577
 578    if (ppkt->size != spkt->size) {
 579        trace_colo_compare_main("Other: payload size of packets are different");
 580        return -1;
 581    }
 582    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
 583                                       ppkt->size - offset);
 584}
 585
 586static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
 587{
 588    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 589
 590    if ((now - pkt->creation_ms) > (*check_time)) {
 591        trace_colo_old_packet_check_found(pkt->creation_ms);
 592        return 0;
 593    } else {
 594        return 1;
 595    }
 596}
 597
 598void colo_compare_register_notifier(Notifier *notify)
 599{
 600    notifier_list_add(&colo_compare_notifiers, notify);
 601}
 602
 603void colo_compare_unregister_notifier(Notifier *notify)
 604{
 605    notifier_remove(notify);
 606}
 607
 608static int colo_old_packet_check_one_conn(Connection *conn,
 609                                          CompareState *s)
 610{
 611    GList *result = NULL;
 612
 613    result = g_queue_find_custom(&conn->primary_list,
 614                                 &s->compare_timeout,
 615                                 (GCompareFunc)colo_old_packet_check_one);
 616
 617    if (result) {
 618        /* Do checkpoint will flush old packet */
 619        colo_compare_inconsistency_notify(s);
 620        return 0;
 621    }
 622
 623    return 1;
 624}
 625
 626/*
 627 * Look for old packets that the secondary hasn't matched,
 628 * if we have some then we have to checkpoint to wake
 629 * the secondary up.
 630 */
 631static void colo_old_packet_check(void *opaque)
 632{
 633    CompareState *s = opaque;
 634
 635    /*
 636     * If we find one old packet, stop finding job and notify
 637     * COLO frame do checkpoint.
 638     */
 639    g_queue_find_custom(&s->conn_list, s,
 640                        (GCompareFunc)colo_old_packet_check_one_conn);
 641}
 642
 643static void colo_compare_packet(CompareState *s, Connection *conn,
 644                                int (*HandlePacket)(Packet *spkt,
 645                                Packet *ppkt))
 646{
 647    Packet *pkt = NULL;
 648    GList *result = NULL;
 649
 650    while (!g_queue_is_empty(&conn->primary_list) &&
 651           !g_queue_is_empty(&conn->secondary_list)) {
 652        pkt = g_queue_pop_head(&conn->primary_list);
 653        result = g_queue_find_custom(&conn->secondary_list,
 654                 pkt, (GCompareFunc)HandlePacket);
 655
 656        if (result) {
 657            colo_release_primary_pkt(s, pkt);
 658            g_queue_remove(&conn->secondary_list, result->data);
 659        } else {
 660            /*
 661             * If one packet arrive late, the secondary_list or
 662             * primary_list will be empty, so we can't compare it
 663             * until next comparison. If the packets in the list are
 664             * timeout, it will trigger a checkpoint request.
 665             */
 666            trace_colo_compare_main("packet different");
 667            g_queue_push_head(&conn->primary_list, pkt);
 668
 669            colo_compare_inconsistency_notify(s);
 670            break;
 671        }
 672    }
 673}
 674
 675/*
 676 * Called from the compare thread on the primary
 677 * for compare packet with secondary list of the
 678 * specified connection when a new packet was
 679 * queued to it.
 680 */
 681static void colo_compare_connection(void *opaque, void *user_data)
 682{
 683    CompareState *s = user_data;
 684    Connection *conn = opaque;
 685
 686    switch (conn->ip_proto) {
 687    case IPPROTO_TCP:
 688        colo_compare_tcp(s, conn);
 689        break;
 690    case IPPROTO_UDP:
 691        colo_compare_packet(s, conn, colo_packet_compare_udp);
 692        break;
 693    case IPPROTO_ICMP:
 694        colo_compare_packet(s, conn, colo_packet_compare_icmp);
 695        break;
 696    default:
 697        colo_compare_packet(s, conn, colo_packet_compare_other);
 698        break;
 699    }
 700}
 701
 702static int compare_chr_send(CompareState *s,
 703                            const uint8_t *buf,
 704                            uint32_t size,
 705                            uint32_t vnet_hdr_len,
 706                            bool notify_remote_frame)
 707{
 708    int ret = 0;
 709    uint32_t len = htonl(size);
 710
 711    if (!size) {
 712        return 0;
 713    }
 714
 715    if (notify_remote_frame) {
 716        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
 717                                    (uint8_t *)&len,
 718                                    sizeof(len));
 719    } else {
 720        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 721    }
 722
 723    if (ret != sizeof(len)) {
 724        goto err;
 725    }
 726
 727    if (s->vnet_hdr) {
 728        /*
 729         * We send vnet header len make other module(like filter-redirector)
 730         * know how to parse net packet correctly.
 731         */
 732        len = htonl(vnet_hdr_len);
 733
 734        if (!notify_remote_frame) {
 735            ret = qemu_chr_fe_write_all(&s->chr_out,
 736                                        (uint8_t *)&len,
 737                                        sizeof(len));
 738        }
 739
 740        if (ret != sizeof(len)) {
 741            goto err;
 742        }
 743    }
 744
 745    if (notify_remote_frame) {
 746        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
 747                                    (uint8_t *)buf,
 748                                    size);
 749    } else {
 750        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
 751    }
 752
 753    if (ret != size) {
 754        goto err;
 755    }
 756
 757    return 0;
 758
 759err:
 760    return ret < 0 ? ret : -EIO;
 761}
 762
 763static int compare_chr_can_read(void *opaque)
 764{
 765    return COMPARE_READ_LEN_MAX;
 766}
 767
 768/*
 769 * Called from the main thread on the primary for packets
 770 * arriving over the socket from the primary.
 771 */
 772static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 773{
 774    CompareState *s = COLO_COMPARE(opaque);
 775    int ret;
 776
 777    ret = net_fill_rstate(&s->pri_rs, buf, size);
 778    if (ret == -1) {
 779        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
 780                                 NULL, NULL, true);
 781        error_report("colo-compare primary_in error");
 782    }
 783}
 784
 785/*
 786 * Called from the main thread on the primary for packets
 787 * arriving over the socket from the secondary.
 788 */
 789static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 790{
 791    CompareState *s = COLO_COMPARE(opaque);
 792    int ret;
 793
 794    ret = net_fill_rstate(&s->sec_rs, buf, size);
 795    if (ret == -1) {
 796        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
 797                                 NULL, NULL, true);
 798        error_report("colo-compare secondary_in error");
 799    }
 800}
 801
 802static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
 803{
 804    CompareState *s = COLO_COMPARE(opaque);
 805    int ret;
 806
 807    ret = net_fill_rstate(&s->notify_rs, buf, size);
 808    if (ret == -1) {
 809        qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
 810                                 NULL, NULL, true);
 811        error_report("colo-compare notify_dev error");
 812    }
 813}
 814
 815/*
 816 * Check old packet regularly so it can watch for any packets
 817 * that the secondary hasn't produced equivalents of.
 818 */
 819static void check_old_packet_regular(void *opaque)
 820{
 821    CompareState *s = opaque;
 822
 823    /* if have old packet we will notify checkpoint */
 824    colo_old_packet_check(s);
 825    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 826              s->expired_scan_cycle);
 827}
 828
 829/* Public API, Used for COLO frame to notify compare event */
 830void colo_notify_compares_event(void *opaque, int event, Error **errp)
 831{
 832    CompareState *s;
 833
 834    qemu_mutex_lock(&event_mtx);
 835    QTAILQ_FOREACH(s, &net_compares, next) {
 836        s->event = event;
 837        qemu_bh_schedule(s->event_bh);
 838        event_unhandled_count++;
 839    }
 840    /* Wait all compare threads to finish handling this event */
 841    while (event_unhandled_count > 0) {
 842        qemu_cond_wait(&event_complete_cond, &event_mtx);
 843    }
 844
 845    qemu_mutex_unlock(&event_mtx);
 846}
 847
 848static void colo_compare_timer_init(CompareState *s)
 849{
 850    AioContext *ctx = iothread_get_aio_context(s->iothread);
 851
 852    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
 853                                SCALE_MS, check_old_packet_regular,
 854                                s);
 855    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 856              s->expired_scan_cycle);
 857}
 858
 859static void colo_compare_timer_del(CompareState *s)
 860{
 861    if (s->packet_check_timer) {
 862        timer_del(s->packet_check_timer);
 863        timer_free(s->packet_check_timer);
 864        s->packet_check_timer = NULL;
 865    }
 866 }
 867
 868static void colo_flush_packets(void *opaque, void *user_data);
 869
 870static void colo_compare_handle_event(void *opaque)
 871{
 872    CompareState *s = opaque;
 873
 874    switch (s->event) {
 875    case COLO_EVENT_CHECKPOINT:
 876        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 877        break;
 878    case COLO_EVENT_FAILOVER:
 879        break;
 880    default:
 881        break;
 882    }
 883
 884    qemu_mutex_lock(&event_mtx);
 885    assert(event_unhandled_count > 0);
 886    event_unhandled_count--;
 887    qemu_cond_broadcast(&event_complete_cond);
 888    qemu_mutex_unlock(&event_mtx);
 889}
 890
 891static void colo_compare_iothread(CompareState *s)
 892{
 893    object_ref(OBJECT(s->iothread));
 894    s->worker_context = iothread_get_g_main_context(s->iothread);
 895
 896    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
 897                             compare_pri_chr_in, NULL, NULL,
 898                             s, s->worker_context, true);
 899    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
 900                             compare_sec_chr_in, NULL, NULL,
 901                             s, s->worker_context, true);
 902    if (s->notify_dev) {
 903        qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
 904                                 compare_notify_chr, NULL, NULL,
 905                                 s, s->worker_context, true);
 906    }
 907
 908    colo_compare_timer_init(s);
 909    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 910}
 911
 912static char *compare_get_pri_indev(Object *obj, Error **errp)
 913{
 914    CompareState *s = COLO_COMPARE(obj);
 915
 916    return g_strdup(s->pri_indev);
 917}
 918
 919static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
 920{
 921    CompareState *s = COLO_COMPARE(obj);
 922
 923    g_free(s->pri_indev);
 924    s->pri_indev = g_strdup(value);
 925}
 926
 927static char *compare_get_sec_indev(Object *obj, Error **errp)
 928{
 929    CompareState *s = COLO_COMPARE(obj);
 930
 931    return g_strdup(s->sec_indev);
 932}
 933
 934static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
 935{
 936    CompareState *s = COLO_COMPARE(obj);
 937
 938    g_free(s->sec_indev);
 939    s->sec_indev = g_strdup(value);
 940}
 941
 942static char *compare_get_outdev(Object *obj, Error **errp)
 943{
 944    CompareState *s = COLO_COMPARE(obj);
 945
 946    return g_strdup(s->outdev);
 947}
 948
 949static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 950{
 951    CompareState *s = COLO_COMPARE(obj);
 952
 953    g_free(s->outdev);
 954    s->outdev = g_strdup(value);
 955}
 956
 957static bool compare_get_vnet_hdr(Object *obj, Error **errp)
 958{
 959    CompareState *s = COLO_COMPARE(obj);
 960
 961    return s->vnet_hdr;
 962}
 963
 964static void compare_set_vnet_hdr(Object *obj,
 965                                 bool value,
 966                                 Error **errp)
 967{
 968    CompareState *s = COLO_COMPARE(obj);
 969
 970    s->vnet_hdr = value;
 971}
 972
 973static char *compare_get_notify_dev(Object *obj, Error **errp)
 974{
 975    CompareState *s = COLO_COMPARE(obj);
 976
 977    return g_strdup(s->notify_dev);
 978}
 979
 980static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
 981{
 982    CompareState *s = COLO_COMPARE(obj);
 983
 984    g_free(s->notify_dev);
 985    s->notify_dev = g_strdup(value);
 986}
 987
 988static void compare_get_timeout(Object *obj, Visitor *v,
 989                                const char *name, void *opaque,
 990                                Error **errp)
 991{
 992    CompareState *s = COLO_COMPARE(obj);
 993    uint32_t value = s->compare_timeout;
 994
 995    visit_type_uint32(v, name, &value, errp);
 996}
 997
 998static void compare_set_timeout(Object *obj, Visitor *v,
 999                                const char *name, void *opaque,
1000                                Error **errp)
1001{
1002    CompareState *s = COLO_COMPARE(obj);
1003    Error *local_err = NULL;
1004    uint32_t value;
1005
1006    visit_type_uint32(v, name, &value, &local_err);
1007    if (local_err) {
1008        goto out;
1009    }
1010    if (!value) {
1011        error_setg(&local_err, "Property '%s.%s' requires a positive value",
1012                   object_get_typename(obj), name);
1013        goto out;
1014    }
1015    s->compare_timeout = value;
1016
1017out:
1018    error_propagate(errp, local_err);
1019}
1020
1021static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1022                                           const char *name, void *opaque,
1023                                           Error **errp)
1024{
1025    CompareState *s = COLO_COMPARE(obj);
1026    uint32_t value = s->expired_scan_cycle;
1027
1028    visit_type_uint32(v, name, &value, errp);
1029}
1030
1031static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1032                                           const char *name, void *opaque,
1033                                           Error **errp)
1034{
1035    CompareState *s = COLO_COMPARE(obj);
1036    Error *local_err = NULL;
1037    uint32_t value;
1038
1039    visit_type_uint32(v, name, &value, &local_err);
1040    if (local_err) {
1041        goto out;
1042    }
1043    if (!value) {
1044        error_setg(&local_err, "Property '%s.%s' requires a positive value",
1045                   object_get_typename(obj), name);
1046        goto out;
1047    }
1048    s->expired_scan_cycle = value;
1049
1050out:
1051    error_propagate(errp, local_err);
1052}
1053
1054static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1055{
1056    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1057    Connection *conn = NULL;
1058
1059    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1060        trace_colo_compare_main("primary: unsupported packet in");
1061        compare_chr_send(s,
1062                         pri_rs->buf,
1063                         pri_rs->packet_len,
1064                         pri_rs->vnet_hdr_len,
1065                         false);
1066    } else {
1067        /* compare packet in the specified connection */
1068        colo_compare_connection(conn, s);
1069    }
1070}
1071
1072static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1073{
1074    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1075    Connection *conn = NULL;
1076
1077    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1078        trace_colo_compare_main("secondary: unsupported packet in");
1079    } else {
1080        /* compare packet in the specified connection */
1081        colo_compare_connection(conn, s);
1082    }
1083}
1084
1085static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1086{
1087    CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1088
1089    const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1090    int ret;
1091
1092    if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1093                           notify_rs->buf,
1094                           notify_rs->packet_len)) {
1095        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
1096        if (ret < 0) {
1097            error_report("Notify Xen COLO-frame INIT failed");
1098        }
1099    } else if (packet_matches_str("COLO_CHECKPOINT",
1100                                  notify_rs->buf,
1101                                  notify_rs->packet_len)) {
1102        /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1103        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1104    } else {
1105        error_report("COLO compare got unsupported instruction");
1106    }
1107}
1108
1109/*
1110 * Return 0 is success.
1111 * Return 1 is failed.
1112 */
1113static int find_and_check_chardev(Chardev **chr,
1114                                  char *chr_name,
1115                                  Error **errp)
1116{
1117    *chr = qemu_chr_find(chr_name);
1118    if (*chr == NULL) {
1119        error_setg(errp, "Device '%s' not found",
1120                   chr_name);
1121        return 1;
1122    }
1123
1124    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1125        error_setg(errp, "chardev \"%s\" is not reconnectable",
1126                   chr_name);
1127        return 1;
1128    }
1129
1130    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1131        error_setg(errp, "chardev \"%s\" cannot switch context",
1132                   chr_name);
1133        return 1;
1134    }
1135
1136    return 0;
1137}
1138
1139/*
1140 * Called from the main thread on the primary
1141 * to setup colo-compare.
1142 */
1143static void colo_compare_complete(UserCreatable *uc, Error **errp)
1144{
1145    CompareState *s = COLO_COMPARE(uc);
1146    Chardev *chr;
1147
1148    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1149        error_setg(errp, "colo compare needs 'primary_in' ,"
1150                   "'secondary_in','outdev','iothread' property set");
1151        return;
1152    } else if (!strcmp(s->pri_indev, s->outdev) ||
1153               !strcmp(s->sec_indev, s->outdev) ||
1154               !strcmp(s->pri_indev, s->sec_indev)) {
1155        error_setg(errp, "'indev' and 'outdev' could not be same "
1156                   "for compare module");
1157        return;
1158    }
1159
1160    if (!s->compare_timeout) {
1161        /* Set default value to 3000 MS */
1162        s->compare_timeout = DEFAULT_TIME_OUT_MS;
1163    }
1164
1165    if (!s->expired_scan_cycle) {
1166        /* Set default value to 3000 MS */
1167        s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1168    }
1169
1170    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1171        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1172        return;
1173    }
1174
1175    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1176        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1177        return;
1178    }
1179
1180    if (find_and_check_chardev(&chr, s->outdev, errp) ||
1181        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1182        return;
1183    }
1184
1185    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1186    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1187
1188    /* Try to enable remote notify chardev, currently just for Xen COLO */
1189    if (s->notify_dev) {
1190        if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1191            !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1192            return;
1193        }
1194
1195        net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1196                           s->vnet_hdr);
1197    }
1198
1199    QTAILQ_INSERT_TAIL(&net_compares, s, next);
1200
1201    g_queue_init(&s->conn_list);
1202
1203    qemu_mutex_init(&event_mtx);
1204    qemu_cond_init(&event_complete_cond);
1205
1206    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1207                                                      connection_key_equal,
1208                                                      g_free,
1209                                                      connection_destroy);
1210
1211    colo_compare_iothread(s);
1212    return;
1213}
1214
1215static void colo_flush_packets(void *opaque, void *user_data)
1216{
1217    CompareState *s = user_data;
1218    Connection *conn = opaque;
1219    Packet *pkt = NULL;
1220
1221    while (!g_queue_is_empty(&conn->primary_list)) {
1222        pkt = g_queue_pop_head(&conn->primary_list);
1223        compare_chr_send(s,
1224                         pkt->data,
1225                         pkt->size,
1226                         pkt->vnet_hdr_len,
1227                         false);
1228        packet_destroy(pkt, NULL);
1229    }
1230    while (!g_queue_is_empty(&conn->secondary_list)) {
1231        pkt = g_queue_pop_head(&conn->secondary_list);
1232        packet_destroy(pkt, NULL);
1233    }
1234}
1235
1236static void colo_compare_class_init(ObjectClass *oc, void *data)
1237{
1238    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1239
1240    ucc->complete = colo_compare_complete;
1241}
1242
1243static void colo_compare_init(Object *obj)
1244{
1245    CompareState *s = COLO_COMPARE(obj);
1246
1247    object_property_add_str(obj, "primary_in",
1248                            compare_get_pri_indev, compare_set_pri_indev,
1249                            NULL);
1250    object_property_add_str(obj, "secondary_in",
1251                            compare_get_sec_indev, compare_set_sec_indev,
1252                            NULL);
1253    object_property_add_str(obj, "outdev",
1254                            compare_get_outdev, compare_set_outdev,
1255                            NULL);
1256    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1257                            (Object **)&s->iothread,
1258                            object_property_allow_set_link,
1259                            OBJ_PROP_LINK_STRONG, NULL);
1260    /* This parameter just for Xen COLO */
1261    object_property_add_str(obj, "notify_dev",
1262                            compare_get_notify_dev, compare_set_notify_dev,
1263                            NULL);
1264
1265    object_property_add(obj, "compare_timeout", "uint32",
1266                        compare_get_timeout,
1267                        compare_set_timeout, NULL, NULL, NULL);
1268
1269    object_property_add(obj, "expired_scan_cycle", "uint32",
1270                        compare_get_expired_scan_cycle,
1271                        compare_set_expired_scan_cycle, NULL, NULL, NULL);
1272
1273    s->vnet_hdr = false;
1274    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1275                             compare_set_vnet_hdr, NULL);
1276}
1277
1278static void colo_compare_finalize(Object *obj)
1279{
1280    CompareState *s = COLO_COMPARE(obj);
1281    CompareState *tmp = NULL;
1282
1283    qemu_chr_fe_deinit(&s->chr_pri_in, false);
1284    qemu_chr_fe_deinit(&s->chr_sec_in, false);
1285    qemu_chr_fe_deinit(&s->chr_out, false);
1286    if (s->notify_dev) {
1287        qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1288    }
1289
1290    if (s->iothread) {
1291        colo_compare_timer_del(s);
1292    }
1293
1294    qemu_bh_delete(s->event_bh);
1295
1296    QTAILQ_FOREACH(tmp, &net_compares, next) {
1297        if (tmp == s) {
1298            QTAILQ_REMOVE(&net_compares, s, next);
1299            break;
1300        }
1301    }
1302
1303    /* Release all unhandled packets after compare thead exited */
1304    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1305
1306    g_queue_clear(&s->conn_list);
1307
1308    if (s->connection_track_table) {
1309        g_hash_table_destroy(s->connection_track_table);
1310    }
1311
1312    if (s->iothread) {
1313        object_unref(OBJECT(s->iothread));
1314    }
1315
1316    qemu_mutex_destroy(&event_mtx);
1317    qemu_cond_destroy(&event_complete_cond);
1318
1319    g_free(s->pri_indev);
1320    g_free(s->sec_indev);
1321    g_free(s->outdev);
1322    g_free(s->notify_dev);
1323}
1324
1325static const TypeInfo colo_compare_info = {
1326    .name = TYPE_COLO_COMPARE,
1327    .parent = TYPE_OBJECT,
1328    .instance_size = sizeof(CompareState),
1329    .instance_init = colo_compare_init,
1330    .instance_finalize = colo_compare_finalize,
1331    .class_size = sizeof(CompareClass),
1332    .class_init = colo_compare_class_init,
1333    .interfaces = (InterfaceInfo[]) {
1334        { TYPE_USER_CREATABLE },
1335        { }
1336    }
1337};
1338
1339static void register_types(void)
1340{
1341    type_register_static(&colo_compare_info);
1342}
1343
1344type_init(register_types);
1345