qemu/net/colo-compare.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
  10 *
  11 * This work is licensed under the terms of the GNU GPL, version 2 or
  12 * later.  See the COPYING file in the top-level directory.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qemu-common.h"
  17#include "qemu/error-report.h"
  18#include "trace.h"
  19#include "qapi/error.h"
  20#include "net/net.h"
  21#include "net/eth.h"
  22#include "qom/object_interfaces.h"
  23#include "qemu/iov.h"
  24#include "qom/object.h"
  25#include "net/queue.h"
  26#include "chardev/char-fe.h"
  27#include "qemu/sockets.h"
  28#include "colo.h"
  29#include "sysemu/iothread.h"
  30#include "net/colo-compare.h"
  31#include "migration/colo.h"
  32#include "migration/migration.h"
  33#include "util.h"
  34
  35#define TYPE_COLO_COMPARE "colo-compare"
  36#define COLO_COMPARE(obj) \
  37    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
  38
  39static QTAILQ_HEAD(, CompareState) net_compares =
  40       QTAILQ_HEAD_INITIALIZER(net_compares);
  41
  42static NotifierList colo_compare_notifiers =
  43    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
  44
  45#define COMPARE_READ_LEN_MAX NET_BUFSIZE
  46#define MAX_QUEUE_SIZE 1024
  47
  48#define COLO_COMPARE_FREE_PRIMARY     0x01
  49#define COLO_COMPARE_FREE_SECONDARY   0x02
  50
  51/* TODO: Should be configurable */
  52#define REGULAR_PACKET_CHECK_MS 3000
  53
  54static QemuMutex event_mtx;
  55static QemuCond event_complete_cond;
  56static int event_unhandled_count;
  57
  58/*
  59 *  + CompareState ++
  60 *  |               |
  61 *  +---------------+   +---------------+         +---------------+
  62 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
  63 *  +---------------+   +---------------+         +---------------+
  64 *  |               |     |           |             |          |
  65 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
  66 *                    |primary |  |secondary    |primary | |secondary
  67 *                    |packet  |  |packet  +    |packet  | |packet  +
  68 *                    +--------+  +--------+    +--------+ +--------+
  69 *                        |           |             |          |
  70 *                    +---v----+  +---v----+    +---v----+ +---v----+
  71 *                    |primary |  |secondary    |primary | |secondary
  72 *                    |packet  |  |packet  +    |packet  | |packet  +
  73 *                    +--------+  +--------+    +--------+ +--------+
  74 *                        |           |             |          |
  75 *                    +---v----+  +---v----+    +---v----+ +---v----+
  76 *                    |primary |  |secondary    |primary | |secondary
  77 *                    |packet  |  |packet  +    |packet  | |packet  +
  78 *                    +--------+  +--------+    +--------+ +--------+
  79 */
  80typedef struct CompareState {
  81    Object parent;
  82
  83    char *pri_indev;
  84    char *sec_indev;
  85    char *outdev;
  86    char *notify_dev;
  87    CharBackend chr_pri_in;
  88    CharBackend chr_sec_in;
  89    CharBackend chr_out;
  90    CharBackend chr_notify_dev;
  91    SocketReadState pri_rs;
  92    SocketReadState sec_rs;
  93    SocketReadState notify_rs;
  94    bool vnet_hdr;
  95
  96    /*
  97     * Record the connection that through the NIC
  98     * Element type: Connection
  99     */
 100    GQueue conn_list;
 101    /* Record the connection without repetition */
 102    GHashTable *connection_track_table;
 103
 104    IOThread *iothread;
 105    GMainContext *worker_context;
 106    QEMUTimer *packet_check_timer;
 107
 108    QEMUBH *event_bh;
 109    enum colo_event event;
 110
 111    QTAILQ_ENTRY(CompareState) next;
 112} CompareState;
 113
 114typedef struct CompareClass {
 115    ObjectClass parent_class;
 116} CompareClass;
 117
 118enum {
 119    PRIMARY_IN = 0,
 120    SECONDARY_IN,
 121};
 122
 123
 124static int compare_chr_send(CompareState *s,
 125                            const uint8_t *buf,
 126                            uint32_t size,
 127                            uint32_t vnet_hdr_len,
 128                            bool notify_remote_frame);
 129
 130static bool packet_matches_str(const char *str,
 131                               const uint8_t *buf,
 132                               uint32_t packet_len)
 133{
 134    if (packet_len != strlen(str)) {
 135        return false;
 136    }
 137
 138    return !memcmp(str, buf, strlen(str));
 139}
 140
 141static void notify_remote_frame(CompareState *s)
 142{
 143    char msg[] = "DO_CHECKPOINT";
 144    int ret = 0;
 145
 146    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
 147    if (ret < 0) {
 148        error_report("Notify Xen COLO-frame failed");
 149    }
 150}
 151
 152static void colo_compare_inconsistency_notify(CompareState *s)
 153{
 154    if (s->notify_dev) {
 155        notify_remote_frame(s);
 156    } else {
 157        notifier_list_notify(&colo_compare_notifiers,
 158                             migrate_get_current());
 159    }
 160}
 161
 162static gint seq_sorter(Packet *a, Packet *b, gpointer data)
 163{
 164    struct tcp_hdr *atcp, *btcp;
 165
 166    atcp = (struct tcp_hdr *)(a->transport_header);
 167    btcp = (struct tcp_hdr *)(b->transport_header);
 168    return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
 169}
 170
 171static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
 172{
 173    Packet *pkt = data;
 174    struct tcp_hdr *tcphd;
 175
 176    tcphd = (struct tcp_hdr *)pkt->transport_header;
 177
 178    pkt->tcp_seq = ntohl(tcphd->th_seq);
 179    pkt->tcp_ack = ntohl(tcphd->th_ack);
 180    *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
 181    pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
 182                       + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
 183    pkt->payload_size = pkt->size - pkt->header_size;
 184    pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
 185    pkt->flags = tcphd->th_flags;
 186}
 187
 188/*
 189 * Return 1 on success, if return 0 means the
 190 * packet will be dropped
 191 */
 192static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 193{
 194    if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
 195        if (pkt->ip->ip_p == IPPROTO_TCP) {
 196            fill_pkt_tcp_info(pkt, max_ack);
 197            g_queue_insert_sorted(queue,
 198                                  pkt,
 199                                  (GCompareDataFunc)seq_sorter,
 200                                  NULL);
 201        } else {
 202            g_queue_push_tail(queue, pkt);
 203        }
 204        return 1;
 205    }
 206    return 0;
 207}
 208
 209/*
 210 * Return 0 on success, if return -1 means the pkt
 211 * is unsupported(arp and ipv6) and will be sent later
 212 */
 213static int packet_enqueue(CompareState *s, int mode, Connection **con)
 214{
 215    ConnectionKey key;
 216    Packet *pkt = NULL;
 217    Connection *conn;
 218
 219    if (mode == PRIMARY_IN) {
 220        pkt = packet_new(s->pri_rs.buf,
 221                         s->pri_rs.packet_len,
 222                         s->pri_rs.vnet_hdr_len);
 223    } else {
 224        pkt = packet_new(s->sec_rs.buf,
 225                         s->sec_rs.packet_len,
 226                         s->sec_rs.vnet_hdr_len);
 227    }
 228
 229    if (parse_packet_early(pkt)) {
 230        packet_destroy(pkt, NULL);
 231        pkt = NULL;
 232        return -1;
 233    }
 234    fill_connection_key(pkt, &key);
 235
 236    conn = connection_get(s->connection_track_table,
 237                          &key,
 238                          &s->conn_list);
 239
 240    if (!conn->processing) {
 241        g_queue_push_tail(&s->conn_list, conn);
 242        conn->processing = true;
 243    }
 244
 245    if (mode == PRIMARY_IN) {
 246        if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
 247            error_report("colo compare primary queue size too big,"
 248                         "drop packet");
 249        }
 250    } else {
 251        if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
 252            error_report("colo compare secondary queue size too big,"
 253                         "drop packet");
 254        }
 255    }
 256    *con = conn;
 257
 258    return 0;
 259}
 260
 261static inline bool after(uint32_t seq1, uint32_t seq2)
 262{
 263        return (int32_t)(seq1 - seq2) > 0;
 264}
 265
 266static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
 267{
 268    int ret;
 269    ret = compare_chr_send(s,
 270                           pkt->data,
 271                           pkt->size,
 272                           pkt->vnet_hdr_len,
 273                           false);
 274    if (ret < 0) {
 275        error_report("colo send primary packet failed");
 276    }
 277    trace_colo_compare_main("packet same and release packet");
 278    packet_destroy(pkt, NULL);
 279}
 280
 281/*
 282 * The IP packets sent by primary and secondary
 283 * will be compared in here
 284 * TODO support ip fragment, Out-Of-Order
 285 * return:    0  means packet same
 286 *            > 0 || < 0 means packet different
 287 */
 288static int colo_compare_packet_payload(Packet *ppkt,
 289                                       Packet *spkt,
 290                                       uint16_t poffset,
 291                                       uint16_t soffset,
 292                                       uint16_t len)
 293
 294{
 295    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 296        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 297
 298        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 299        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 300        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 301        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 302
 303        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 304                                   pri_ip_dst, spkt->size,
 305                                   sec_ip_src, sec_ip_dst);
 306    }
 307
 308    return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
 309}
 310
 311/*
 312 * return true means that the payload is consist and
 313 * need to make the next comparison, false means do
 314 * the checkpoint
 315*/
 316static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
 317                              int8_t *mark, uint32_t max_ack)
 318{
 319    *mark = 0;
 320
 321    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
 322        if (!colo_compare_packet_payload(ppkt, spkt,
 323                                        ppkt->header_size, spkt->header_size,
 324                                        ppkt->payload_size)) {
 325            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
 326            return true;
 327        }
 328    }
 329
 330    /* one part of secondary packet payload still need to be compared */
 331    if (!after(ppkt->seq_end, spkt->seq_end)) {
 332        if (!colo_compare_packet_payload(ppkt, spkt,
 333                                        ppkt->header_size + ppkt->offset,
 334                                        spkt->header_size + spkt->offset,
 335                                        ppkt->payload_size - ppkt->offset)) {
 336            if (!after(ppkt->tcp_ack, max_ack)) {
 337                *mark = COLO_COMPARE_FREE_PRIMARY;
 338                spkt->offset += ppkt->payload_size - ppkt->offset;
 339                return true;
 340            } else {
 341                /* secondary guest hasn't ack the data, don't send
 342                 * out this packet
 343                 */
 344                return false;
 345            }
 346        }
 347    } else {
 348        /* primary packet is longer than secondary packet, compare
 349         * the same part and mark the primary packet offset
 350         */
 351        if (!colo_compare_packet_payload(ppkt, spkt,
 352                                        ppkt->header_size + ppkt->offset,
 353                                        spkt->header_size + spkt->offset,
 354                                        spkt->payload_size - spkt->offset)) {
 355            *mark = COLO_COMPARE_FREE_SECONDARY;
 356            ppkt->offset += spkt->payload_size - spkt->offset;
 357            return true;
 358        }
 359    }
 360
 361    return false;
 362}
 363
 364static void colo_compare_tcp(CompareState *s, Connection *conn)
 365{
 366    Packet *ppkt = NULL, *spkt = NULL;
 367    int8_t mark;
 368
 369    /*
 370     * If ppkt and spkt have the same payload, but ppkt's ACK
 371     * is greater than spkt's ACK, in this case we can not
 372     * send the ppkt because it will cause the secondary guest
 373     * to miss sending some data in the next. Therefore, we
 374     * record the maximum ACK in the current queue at both
 375     * primary side and secondary side. Only when the ack is
 376     * less than the smaller of the two maximum ack, then we
 377     * can ensure that the packet's payload is acknowledged by
 378     * primary and secondary.
 379    */
 380    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
 381
 382pri:
 383    if (g_queue_is_empty(&conn->primary_list)) {
 384        return;
 385    }
 386    ppkt = g_queue_pop_head(&conn->primary_list);
 387sec:
 388    if (g_queue_is_empty(&conn->secondary_list)) {
 389        g_queue_push_head(&conn->primary_list, ppkt);
 390        return;
 391    }
 392    spkt = g_queue_pop_head(&conn->secondary_list);
 393
 394    if (ppkt->tcp_seq == ppkt->seq_end) {
 395        colo_release_primary_pkt(s, ppkt);
 396        ppkt = NULL;
 397    }
 398
 399    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
 400        trace_colo_compare_main("pri: this packet has compared");
 401        colo_release_primary_pkt(s, ppkt);
 402        ppkt = NULL;
 403    }
 404
 405    if (spkt->tcp_seq == spkt->seq_end) {
 406        packet_destroy(spkt, NULL);
 407        if (!ppkt) {
 408            goto pri;
 409        } else {
 410            goto sec;
 411        }
 412    } else {
 413        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
 414            trace_colo_compare_main("sec: this packet has compared");
 415            packet_destroy(spkt, NULL);
 416            if (!ppkt) {
 417                goto pri;
 418            } else {
 419                goto sec;
 420            }
 421        }
 422        if (!ppkt) {
 423            g_queue_push_head(&conn->secondary_list, spkt);
 424            goto pri;
 425        }
 426    }
 427
 428    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
 429        trace_colo_compare_tcp_info("pri",
 430                                    ppkt->tcp_seq, ppkt->tcp_ack,
 431                                    ppkt->header_size, ppkt->payload_size,
 432                                    ppkt->offset, ppkt->flags);
 433
 434        trace_colo_compare_tcp_info("sec",
 435                                    spkt->tcp_seq, spkt->tcp_ack,
 436                                    spkt->header_size, spkt->payload_size,
 437                                    spkt->offset, spkt->flags);
 438
 439        if (mark == COLO_COMPARE_FREE_PRIMARY) {
 440            conn->compare_seq = ppkt->seq_end;
 441            colo_release_primary_pkt(s, ppkt);
 442            g_queue_push_head(&conn->secondary_list, spkt);
 443            goto pri;
 444        }
 445        if (mark == COLO_COMPARE_FREE_SECONDARY) {
 446            conn->compare_seq = spkt->seq_end;
 447            packet_destroy(spkt, NULL);
 448            goto sec;
 449        }
 450        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
 451            conn->compare_seq = ppkt->seq_end;
 452            colo_release_primary_pkt(s, ppkt);
 453            packet_destroy(spkt, NULL);
 454            goto pri;
 455        }
 456    } else {
 457        g_queue_push_head(&conn->primary_list, ppkt);
 458        g_queue_push_head(&conn->secondary_list, spkt);
 459
 460        qemu_hexdump((char *)ppkt->data, stderr,
 461                     "colo-compare ppkt", ppkt->size);
 462        qemu_hexdump((char *)spkt->data, stderr,
 463                     "colo-compare spkt", spkt->size);
 464
 465        colo_compare_inconsistency_notify(s);
 466    }
 467}
 468
 469
 470/*
 471 * Called from the compare thread on the primary
 472 * for compare udp packet
 473 */
 474static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
 475{
 476    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 477    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 478
 479    trace_colo_compare_main("compare udp");
 480
 481    /*
 482     * Because of ppkt and spkt are both in the same connection,
 483     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 484     * same with spkt. In addition, IP header's Identification is a random
 485     * field, we can handle it in IP fragmentation function later.
 486     * COLO just concern the response net packet payload from primary guest
 487     * and secondary guest are same or not, So we ignored all IP header include
 488     * other field like TOS,TTL,IP Checksum. we only need to compare
 489     * the ip payload here.
 490     */
 491    if (ppkt->size != spkt->size) {
 492        trace_colo_compare_main("UDP: payload size of packets are different");
 493        return -1;
 494    }
 495    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 496                                    ppkt->size - offset)) {
 497        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
 498        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
 499        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 500            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 501                         ppkt->size);
 502            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 503                         spkt->size);
 504        }
 505        return -1;
 506    } else {
 507        return 0;
 508    }
 509}
 510
 511/*
 512 * Called from the compare thread on the primary
 513 * for compare icmp packet
 514 */
 515static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
 516{
 517    uint16_t network_header_length = ppkt->ip->ip_hl << 2;
 518    uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
 519
 520    trace_colo_compare_main("compare icmp");
 521
 522    /*
 523     * Because of ppkt and spkt are both in the same connection,
 524     * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
 525     * same with spkt. In addition, IP header's Identification is a random
 526     * field, we can handle it in IP fragmentation function later.
 527     * COLO just concern the response net packet payload from primary guest
 528     * and secondary guest are same or not, So we ignored all IP header include
 529     * other field like TOS,TTL,IP Checksum. we only need to compare
 530     * the ip payload here.
 531     */
 532    if (ppkt->size != spkt->size) {
 533        trace_colo_compare_main("ICMP: payload size of packets are different");
 534        return -1;
 535    }
 536    if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
 537                                    ppkt->size - offset)) {
 538        trace_colo_compare_icmp_miscompare("primary pkt size",
 539                                           ppkt->size);
 540        trace_colo_compare_icmp_miscompare("Secondary pkt size",
 541                                           spkt->size);
 542        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 543            qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
 544                         ppkt->size);
 545            qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
 546                         spkt->size);
 547        }
 548        return -1;
 549    } else {
 550        return 0;
 551    }
 552}
 553
 554/*
 555 * Called from the compare thread on the primary
 556 * for compare other packet
 557 */
 558static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 559{
 560    uint16_t offset = ppkt->vnet_hdr_len;
 561
 562    trace_colo_compare_main("compare other");
 563    if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
 564        char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
 565
 566        strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
 567        strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
 568        strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
 569        strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
 570
 571        trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
 572                                   pri_ip_dst, spkt->size,
 573                                   sec_ip_src, sec_ip_dst);
 574    }
 575
 576    if (ppkt->size != spkt->size) {
 577        trace_colo_compare_main("Other: payload size of packets are different");
 578        return -1;
 579    }
 580    return colo_compare_packet_payload(ppkt, spkt, offset, offset,
 581                                       ppkt->size - offset);
 582}
 583
 584static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
 585{
 586    int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 587
 588    if ((now - pkt->creation_ms) > (*check_time)) {
 589        trace_colo_old_packet_check_found(pkt->creation_ms);
 590        return 0;
 591    } else {
 592        return 1;
 593    }
 594}
 595
 596void colo_compare_register_notifier(Notifier *notify)
 597{
 598    notifier_list_add(&colo_compare_notifiers, notify);
 599}
 600
 601void colo_compare_unregister_notifier(Notifier *notify)
 602{
 603    notifier_remove(notify);
 604}
 605
 606static int colo_old_packet_check_one_conn(Connection *conn,
 607                                          CompareState *s)
 608{
 609    GList *result = NULL;
 610    int64_t check_time = REGULAR_PACKET_CHECK_MS;
 611
 612    result = g_queue_find_custom(&conn->primary_list,
 613                                 &check_time,
 614                                 (GCompareFunc)colo_old_packet_check_one);
 615
 616    if (result) {
 617        /* Do checkpoint will flush old packet */
 618        colo_compare_inconsistency_notify(s);
 619        return 0;
 620    }
 621
 622    return 1;
 623}
 624
 625/*
 626 * Look for old packets that the secondary hasn't matched,
 627 * if we have some then we have to checkpoint to wake
 628 * the secondary up.
 629 */
 630static void colo_old_packet_check(void *opaque)
 631{
 632    CompareState *s = opaque;
 633
 634    /*
 635     * If we find one old packet, stop finding job and notify
 636     * COLO frame do checkpoint.
 637     */
 638    g_queue_find_custom(&s->conn_list, s,
 639                        (GCompareFunc)colo_old_packet_check_one_conn);
 640}
 641
 642static void colo_compare_packet(CompareState *s, Connection *conn,
 643                                int (*HandlePacket)(Packet *spkt,
 644                                Packet *ppkt))
 645{
 646    Packet *pkt = NULL;
 647    GList *result = NULL;
 648
 649    while (!g_queue_is_empty(&conn->primary_list) &&
 650           !g_queue_is_empty(&conn->secondary_list)) {
 651        pkt = g_queue_pop_head(&conn->primary_list);
 652        result = g_queue_find_custom(&conn->secondary_list,
 653                 pkt, (GCompareFunc)HandlePacket);
 654
 655        if (result) {
 656            colo_release_primary_pkt(s, pkt);
 657            g_queue_remove(&conn->secondary_list, result->data);
 658        } else {
 659            /*
 660             * If one packet arrive late, the secondary_list or
 661             * primary_list will be empty, so we can't compare it
 662             * until next comparison. If the packets in the list are
 663             * timeout, it will trigger a checkpoint request.
 664             */
 665            trace_colo_compare_main("packet different");
 666            g_queue_push_head(&conn->primary_list, pkt);
 667
 668            colo_compare_inconsistency_notify(s);
 669            break;
 670        }
 671    }
 672}
 673
 674/*
 675 * Called from the compare thread on the primary
 676 * for compare packet with secondary list of the
 677 * specified connection when a new packet was
 678 * queued to it.
 679 */
 680static void colo_compare_connection(void *opaque, void *user_data)
 681{
 682    CompareState *s = user_data;
 683    Connection *conn = opaque;
 684
 685    switch (conn->ip_proto) {
 686    case IPPROTO_TCP:
 687        colo_compare_tcp(s, conn);
 688        break;
 689    case IPPROTO_UDP:
 690        colo_compare_packet(s, conn, colo_packet_compare_udp);
 691        break;
 692    case IPPROTO_ICMP:
 693        colo_compare_packet(s, conn, colo_packet_compare_icmp);
 694        break;
 695    default:
 696        colo_compare_packet(s, conn, colo_packet_compare_other);
 697        break;
 698    }
 699}
 700
 701static int compare_chr_send(CompareState *s,
 702                            const uint8_t *buf,
 703                            uint32_t size,
 704                            uint32_t vnet_hdr_len,
 705                            bool notify_remote_frame)
 706{
 707    int ret = 0;
 708    uint32_t len = htonl(size);
 709
 710    if (!size) {
 711        return 0;
 712    }
 713
 714    if (notify_remote_frame) {
 715        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
 716                                    (uint8_t *)&len,
 717                                    sizeof(len));
 718    } else {
 719        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
 720    }
 721
 722    if (ret != sizeof(len)) {
 723        goto err;
 724    }
 725
 726    if (s->vnet_hdr) {
 727        /*
 728         * We send vnet header len make other module(like filter-redirector)
 729         * know how to parse net packet correctly.
 730         */
 731        len = htonl(vnet_hdr_len);
 732
 733        if (!notify_remote_frame) {
 734            ret = qemu_chr_fe_write_all(&s->chr_out,
 735                                        (uint8_t *)&len,
 736                                        sizeof(len));
 737        }
 738
 739        if (ret != sizeof(len)) {
 740            goto err;
 741        }
 742    }
 743
 744    if (notify_remote_frame) {
 745        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
 746                                    (uint8_t *)buf,
 747                                    size);
 748    } else {
 749        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
 750    }
 751
 752    if (ret != size) {
 753        goto err;
 754    }
 755
 756    return 0;
 757
 758err:
 759    return ret < 0 ? ret : -EIO;
 760}
 761
 762static int compare_chr_can_read(void *opaque)
 763{
 764    return COMPARE_READ_LEN_MAX;
 765}
 766
 767/*
 768 * Called from the main thread on the primary for packets
 769 * arriving over the socket from the primary.
 770 */
 771static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 772{
 773    CompareState *s = COLO_COMPARE(opaque);
 774    int ret;
 775
 776    ret = net_fill_rstate(&s->pri_rs, buf, size);
 777    if (ret == -1) {
 778        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
 779                                 NULL, NULL, true);
 780        error_report("colo-compare primary_in error");
 781    }
 782}
 783
 784/*
 785 * Called from the main thread on the primary for packets
 786 * arriving over the socket from the secondary.
 787 */
 788static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 789{
 790    CompareState *s = COLO_COMPARE(opaque);
 791    int ret;
 792
 793    ret = net_fill_rstate(&s->sec_rs, buf, size);
 794    if (ret == -1) {
 795        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
 796                                 NULL, NULL, true);
 797        error_report("colo-compare secondary_in error");
 798    }
 799}
 800
 801static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
 802{
 803    CompareState *s = COLO_COMPARE(opaque);
 804    int ret;
 805
 806    ret = net_fill_rstate(&s->notify_rs, buf, size);
 807    if (ret == -1) {
 808        qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
 809                                 NULL, NULL, true);
 810        error_report("colo-compare notify_dev error");
 811    }
 812}
 813
 814/*
 815 * Check old packet regularly so it can watch for any packets
 816 * that the secondary hasn't produced equivalents of.
 817 */
 818static void check_old_packet_regular(void *opaque)
 819{
 820    CompareState *s = opaque;
 821
 822    /* if have old packet we will notify checkpoint */
 823    colo_old_packet_check(s);
 824    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 825                REGULAR_PACKET_CHECK_MS);
 826}
 827
 828/* Public API, Used for COLO frame to notify compare event */
 829void colo_notify_compares_event(void *opaque, int event, Error **errp)
 830{
 831    CompareState *s;
 832
 833    qemu_mutex_lock(&event_mtx);
 834    QTAILQ_FOREACH(s, &net_compares, next) {
 835        s->event = event;
 836        qemu_bh_schedule(s->event_bh);
 837        event_unhandled_count++;
 838    }
 839    /* Wait all compare threads to finish handling this event */
 840    while (event_unhandled_count > 0) {
 841        qemu_cond_wait(&event_complete_cond, &event_mtx);
 842    }
 843
 844    qemu_mutex_unlock(&event_mtx);
 845}
 846
 847static void colo_compare_timer_init(CompareState *s)
 848{
 849    AioContext *ctx = iothread_get_aio_context(s->iothread);
 850
 851    s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
 852                                SCALE_MS, check_old_packet_regular,
 853                                s);
 854    timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
 855                    REGULAR_PACKET_CHECK_MS);
 856}
 857
 858static void colo_compare_timer_del(CompareState *s)
 859{
 860    if (s->packet_check_timer) {
 861        timer_del(s->packet_check_timer);
 862        timer_free(s->packet_check_timer);
 863        s->packet_check_timer = NULL;
 864    }
 865 }
 866
 867static void colo_flush_packets(void *opaque, void *user_data);
 868
 869static void colo_compare_handle_event(void *opaque)
 870{
 871    CompareState *s = opaque;
 872
 873    switch (s->event) {
 874    case COLO_EVENT_CHECKPOINT:
 875        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 876        break;
 877    case COLO_EVENT_FAILOVER:
 878        break;
 879    default:
 880        break;
 881    }
 882
 883    qemu_mutex_lock(&event_mtx);
 884    assert(event_unhandled_count > 0);
 885    event_unhandled_count--;
 886    qemu_cond_broadcast(&event_complete_cond);
 887    qemu_mutex_unlock(&event_mtx);
 888}
 889
 890static void colo_compare_iothread(CompareState *s)
 891{
 892    object_ref(OBJECT(s->iothread));
 893    s->worker_context = iothread_get_g_main_context(s->iothread);
 894
 895    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
 896                             compare_pri_chr_in, NULL, NULL,
 897                             s, s->worker_context, true);
 898    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
 899                             compare_sec_chr_in, NULL, NULL,
 900                             s, s->worker_context, true);
 901    if (s->notify_dev) {
 902        qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
 903                                 compare_notify_chr, NULL, NULL,
 904                                 s, s->worker_context, true);
 905    }
 906
 907    colo_compare_timer_init(s);
 908    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 909}
 910
 911static char *compare_get_pri_indev(Object *obj, Error **errp)
 912{
 913    CompareState *s = COLO_COMPARE(obj);
 914
 915    return g_strdup(s->pri_indev);
 916}
 917
 918static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
 919{
 920    CompareState *s = COLO_COMPARE(obj);
 921
 922    g_free(s->pri_indev);
 923    s->pri_indev = g_strdup(value);
 924}
 925
 926static char *compare_get_sec_indev(Object *obj, Error **errp)
 927{
 928    CompareState *s = COLO_COMPARE(obj);
 929
 930    return g_strdup(s->sec_indev);
 931}
 932
 933static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
 934{
 935    CompareState *s = COLO_COMPARE(obj);
 936
 937    g_free(s->sec_indev);
 938    s->sec_indev = g_strdup(value);
 939}
 940
 941static char *compare_get_outdev(Object *obj, Error **errp)
 942{
 943    CompareState *s = COLO_COMPARE(obj);
 944
 945    return g_strdup(s->outdev);
 946}
 947
 948static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 949{
 950    CompareState *s = COLO_COMPARE(obj);
 951
 952    g_free(s->outdev);
 953    s->outdev = g_strdup(value);
 954}
 955
 956static bool compare_get_vnet_hdr(Object *obj, Error **errp)
 957{
 958    CompareState *s = COLO_COMPARE(obj);
 959
 960    return s->vnet_hdr;
 961}
 962
 963static void compare_set_vnet_hdr(Object *obj,
 964                                 bool value,
 965                                 Error **errp)
 966{
 967    CompareState *s = COLO_COMPARE(obj);
 968
 969    s->vnet_hdr = value;
 970}
 971
 972static char *compare_get_notify_dev(Object *obj, Error **errp)
 973{
 974    CompareState *s = COLO_COMPARE(obj);
 975
 976    return g_strdup(s->notify_dev);
 977}
 978
 979static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
 980{
 981    CompareState *s = COLO_COMPARE(obj);
 982
 983    g_free(s->notify_dev);
 984    s->notify_dev = g_strdup(value);
 985}
 986
 987static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 988{
 989    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
 990    Connection *conn = NULL;
 991
 992    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
 993        trace_colo_compare_main("primary: unsupported packet in");
 994        compare_chr_send(s,
 995                         pri_rs->buf,
 996                         pri_rs->packet_len,
 997                         pri_rs->vnet_hdr_len,
 998                         false);
 999    } else {
1000        /* compare packet in the specified connection */
1001        colo_compare_connection(conn, s);
1002    }
1003}
1004
1005static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1006{
1007    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1008    Connection *conn = NULL;
1009
1010    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1011        trace_colo_compare_main("secondary: unsupported packet in");
1012    } else {
1013        /* compare packet in the specified connection */
1014        colo_compare_connection(conn, s);
1015    }
1016}
1017
1018static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1019{
1020    CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1021
1022    const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1023    int ret;
1024
1025    if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1026                           notify_rs->buf,
1027                           notify_rs->packet_len)) {
1028        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
1029        if (ret < 0) {
1030            error_report("Notify Xen COLO-frame INIT failed");
1031        }
1032    } else if (packet_matches_str("COLO_CHECKPOINT",
1033                                  notify_rs->buf,
1034                                  notify_rs->packet_len)) {
1035        /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1036        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1037    } else {
1038        error_report("COLO compare got unsupported instruction");
1039    }
1040}
1041
1042/*
1043 * Return 0 is success.
1044 * Return 1 is failed.
1045 */
1046static int find_and_check_chardev(Chardev **chr,
1047                                  char *chr_name,
1048                                  Error **errp)
1049{
1050    *chr = qemu_chr_find(chr_name);
1051    if (*chr == NULL) {
1052        error_setg(errp, "Device '%s' not found",
1053                   chr_name);
1054        return 1;
1055    }
1056
1057    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1058        error_setg(errp, "chardev \"%s\" is not reconnectable",
1059                   chr_name);
1060        return 1;
1061    }
1062
1063    if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1064        error_setg(errp, "chardev \"%s\" cannot switch context",
1065                   chr_name);
1066        return 1;
1067    }
1068
1069    return 0;
1070}
1071
1072/*
1073 * Called from the main thread on the primary
1074 * to setup colo-compare.
1075 */
1076static void colo_compare_complete(UserCreatable *uc, Error **errp)
1077{
1078    CompareState *s = COLO_COMPARE(uc);
1079    Chardev *chr;
1080
1081    if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1082        error_setg(errp, "colo compare needs 'primary_in' ,"
1083                   "'secondary_in','outdev','iothread' property set");
1084        return;
1085    } else if (!strcmp(s->pri_indev, s->outdev) ||
1086               !strcmp(s->sec_indev, s->outdev) ||
1087               !strcmp(s->pri_indev, s->sec_indev)) {
1088        error_setg(errp, "'indev' and 'outdev' could not be same "
1089                   "for compare module");
1090        return;
1091    }
1092
1093    if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1094        !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1095        return;
1096    }
1097
1098    if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1099        !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1100        return;
1101    }
1102
1103    if (find_and_check_chardev(&chr, s->outdev, errp) ||
1104        !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1105        return;
1106    }
1107
1108    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1109    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1110
1111    /* Try to enable remote notify chardev, currently just for Xen COLO */
1112    if (s->notify_dev) {
1113        if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1114            !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1115            return;
1116        }
1117
1118        net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1119                           s->vnet_hdr);
1120    }
1121
1122    QTAILQ_INSERT_TAIL(&net_compares, s, next);
1123
1124    g_queue_init(&s->conn_list);
1125
1126    qemu_mutex_init(&event_mtx);
1127    qemu_cond_init(&event_complete_cond);
1128
1129    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1130                                                      connection_key_equal,
1131                                                      g_free,
1132                                                      connection_destroy);
1133
1134    colo_compare_iothread(s);
1135    return;
1136}
1137
1138static void colo_flush_packets(void *opaque, void *user_data)
1139{
1140    CompareState *s = user_data;
1141    Connection *conn = opaque;
1142    Packet *pkt = NULL;
1143
1144    while (!g_queue_is_empty(&conn->primary_list)) {
1145        pkt = g_queue_pop_head(&conn->primary_list);
1146        compare_chr_send(s,
1147                         pkt->data,
1148                         pkt->size,
1149                         pkt->vnet_hdr_len,
1150                         false);
1151        packet_destroy(pkt, NULL);
1152    }
1153    while (!g_queue_is_empty(&conn->secondary_list)) {
1154        pkt = g_queue_pop_head(&conn->secondary_list);
1155        packet_destroy(pkt, NULL);
1156    }
1157}
1158
1159static void colo_compare_class_init(ObjectClass *oc, void *data)
1160{
1161    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1162
1163    ucc->complete = colo_compare_complete;
1164}
1165
1166static void colo_compare_init(Object *obj)
1167{
1168    CompareState *s = COLO_COMPARE(obj);
1169
1170    object_property_add_str(obj, "primary_in",
1171                            compare_get_pri_indev, compare_set_pri_indev,
1172                            NULL);
1173    object_property_add_str(obj, "secondary_in",
1174                            compare_get_sec_indev, compare_set_sec_indev,
1175                            NULL);
1176    object_property_add_str(obj, "outdev",
1177                            compare_get_outdev, compare_set_outdev,
1178                            NULL);
1179    object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1180                            (Object **)&s->iothread,
1181                            object_property_allow_set_link,
1182                            OBJ_PROP_LINK_STRONG, NULL);
1183    /* This parameter just for Xen COLO */
1184    object_property_add_str(obj, "notify_dev",
1185                            compare_get_notify_dev, compare_set_notify_dev,
1186                            NULL);
1187
1188    s->vnet_hdr = false;
1189    object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1190                             compare_set_vnet_hdr, NULL);
1191}
1192
1193static void colo_compare_finalize(Object *obj)
1194{
1195    CompareState *s = COLO_COMPARE(obj);
1196    CompareState *tmp = NULL;
1197
1198    qemu_chr_fe_deinit(&s->chr_pri_in, false);
1199    qemu_chr_fe_deinit(&s->chr_sec_in, false);
1200    qemu_chr_fe_deinit(&s->chr_out, false);
1201    if (s->notify_dev) {
1202        qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1203    }
1204
1205    if (s->iothread) {
1206        colo_compare_timer_del(s);
1207    }
1208
1209    qemu_bh_delete(s->event_bh);
1210
1211    QTAILQ_FOREACH(tmp, &net_compares, next) {
1212        if (tmp == s) {
1213            QTAILQ_REMOVE(&net_compares, s, next);
1214            break;
1215        }
1216    }
1217
1218    /* Release all unhandled packets after compare thead exited */
1219    g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1220
1221    g_queue_clear(&s->conn_list);
1222
1223    if (s->connection_track_table) {
1224        g_hash_table_destroy(s->connection_track_table);
1225    }
1226
1227    if (s->iothread) {
1228        object_unref(OBJECT(s->iothread));
1229    }
1230
1231    qemu_mutex_destroy(&event_mtx);
1232    qemu_cond_destroy(&event_complete_cond);
1233
1234    g_free(s->pri_indev);
1235    g_free(s->sec_indev);
1236    g_free(s->outdev);
1237    g_free(s->notify_dev);
1238}
1239
1240static const TypeInfo colo_compare_info = {
1241    .name = TYPE_COLO_COMPARE,
1242    .parent = TYPE_OBJECT,
1243    .instance_size = sizeof(CompareState),
1244    .instance_init = colo_compare_init,
1245    .instance_finalize = colo_compare_finalize,
1246    .class_size = sizeof(CompareClass),
1247    .class_init = colo_compare_class_init,
1248    .interfaces = (InterfaceInfo[]) {
1249        { TYPE_USER_CREATABLE },
1250        { }
1251    }
1252};
1253
1254static void register_types(void)
1255{
1256    type_register_static(&colo_compare_info);
1257}
1258
1259type_init(register_types);
1260