qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "migration.h"
  18#include "qemu-file.h"
  19#include "savevm.h"
  20#include "migration/colo.h"
  21#include "block.h"
  22#include "io/channel-buffer.h"
  23#include "trace.h"
  24#include "qemu/error-report.h"
  25#include "qemu/main-loop.h"
  26#include "qemu/rcu.h"
  27#include "migration/failover.h"
  28#include "migration/ram.h"
  29#include "block/replication.h"
  30#include "net/colo-compare.h"
  31#include "net/colo.h"
  32#include "block/block.h"
  33#include "qapi/qapi-events-migration.h"
  34#include "sysemu/cpus.h"
  35#include "sysemu/runstate.h"
  36#include "net/filter.h"
  37#include "options.h"
  38
/*
 * True while colo_incoming_process_checkpoint() is loading device state;
 * secondary_vm_do_failover() checks it and defers the failover
 * (FAILOVER_STATUS_RELAUNCH) instead of running it mid-load.
 */
static bool vmstate_loading;
/* Notifier registered with colo-compare by colo_process_checkpoint(). */
static Notifier packets_compare_notifier;

/*
 * Users need to know the COLO mode after a COLO failover: written by
 * colo_do_failover() and reported by qmp_query_colo_status().
 */
static COLOMode last_colo_mode;

/* Initial size handed to qio_channel_buffer_new() for the device-state buffer */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  46
  47bool migration_in_colo_state(void)
  48{
  49    MigrationState *s = migrate_get_current();
  50
  51    return (s->state == MIGRATION_STATUS_COLO);
  52}
  53
  54bool migration_incoming_in_colo_state(void)
  55{
  56    MigrationIncomingState *mis = migration_incoming_get_current();
  57
  58    return mis && (mis->state == MIGRATION_STATUS_COLO);
  59}
  60
  61static bool colo_runstate_is_stopped(void)
  62{
  63    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  64}
  65
  66static void colo_checkpoint_notify(void *opaque)
  67{
  68    MigrationState *s = opaque;
  69    int64_t next_notify_time;
  70
  71    qemu_event_set(&s->colo_checkpoint_event);
  72    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  73    next_notify_time = s->colo_checkpoint_time + migrate_checkpoint_delay();
  74    timer_mod(s->colo_delay_timer, next_notify_time);
  75}
  76
  77void colo_checkpoint_delay_set(void)
  78{
  79    if (migration_in_colo_state()) {
  80        colo_checkpoint_notify(migrate_get_current());
  81    }
  82}
  83
  84static void secondary_vm_do_failover(void)
  85{
  86/* COLO needs enable block-replication */
  87    int old_state;
  88    MigrationIncomingState *mis = migration_incoming_get_current();
  89    Error *local_err = NULL;
  90
  91    /* Can not do failover during the process of VM's loading VMstate, Or
  92     * it will break the secondary VM.
  93     */
  94    if (vmstate_loading) {
  95        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  96                        FAILOVER_STATUS_RELAUNCH);
  97        if (old_state != FAILOVER_STATUS_ACTIVE) {
  98            error_report("Unknown error while do failover for secondary VM,"
  99                         "old_state: %s", FailoverStatus_str(old_state));
 100        }
 101        return;
 102    }
 103
 104    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
 105                      MIGRATION_STATUS_COMPLETED);
 106
 107    replication_stop_all(true, &local_err);
 108    if (local_err) {
 109        error_report_err(local_err);
 110        local_err = NULL;
 111    }
 112
 113    /* Notify all filters of all NIC to do checkpoint */
 114    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 115    if (local_err) {
 116        error_report_err(local_err);
 117    }
 118
 119    if (!autostart) {
 120        error_report("\"-S\" qemu option will be ignored in secondary side");
 121        /* recover runstate to normal migration finish state */
 122        autostart = true;
 123    }
 124    /*
 125     * Make sure COLO incoming thread not block in recv or send,
 126     * If mis->from_src_file and mis->to_src_file use the same fd,
 127     * The second shutdown() will return -1, we ignore this value,
 128     * It is harmless.
 129     */
 130    if (mis->from_src_file) {
 131        qemu_file_shutdown(mis->from_src_file);
 132    }
 133    if (mis->to_src_file) {
 134        qemu_file_shutdown(mis->to_src_file);
 135    }
 136
 137    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 138                                   FAILOVER_STATUS_COMPLETED);
 139    if (old_state != FAILOVER_STATUS_ACTIVE) {
 140        error_report("Incorrect state (%s) while doing failover for "
 141                     "secondary VM", FailoverStatus_str(old_state));
 142        return;
 143    }
 144    /* Notify COLO incoming thread that failover work is finished */
 145    qemu_sem_post(&mis->colo_incoming_sem);
 146
 147    /* For Secondary VM, jump to incoming co */
 148    if (mis->colo_incoming_co) {
 149        qemu_coroutine_enter(mis->colo_incoming_co);
 150    }
 151}
 152
 153static void primary_vm_do_failover(void)
 154{
 155    MigrationState *s = migrate_get_current();
 156    int old_state;
 157    Error *local_err = NULL;
 158
 159    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 160                      MIGRATION_STATUS_COMPLETED);
 161    /*
 162     * kick COLO thread which might wait at
 163     * qemu_sem_wait(&s->colo_checkpoint_sem).
 164     */
 165    colo_checkpoint_notify(s);
 166
 167    /*
 168     * Wake up COLO thread which may blocked in recv() or send(),
 169     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 170     * same fd, but we still shutdown the fd for twice, it is harmless.
 171     */
 172    if (s->to_dst_file) {
 173        qemu_file_shutdown(s->to_dst_file);
 174    }
 175    if (s->rp_state.from_dst_file) {
 176        qemu_file_shutdown(s->rp_state.from_dst_file);
 177    }
 178
 179    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 180                                   FAILOVER_STATUS_COMPLETED);
 181    if (old_state != FAILOVER_STATUS_ACTIVE) {
 182        error_report("Incorrect state (%s) while doing failover for Primary VM",
 183                     FailoverStatus_str(old_state));
 184        return;
 185    }
 186
 187    replication_stop_all(true, &local_err);
 188    if (local_err) {
 189        error_report_err(local_err);
 190        local_err = NULL;
 191    }
 192
 193    /* Notify COLO thread that failover work is finished */
 194    qemu_sem_post(&s->colo_exit_sem);
 195}
 196
 197COLOMode get_colo_mode(void)
 198{
 199    if (migration_in_colo_state()) {
 200        return COLO_MODE_PRIMARY;
 201    } else if (migration_incoming_in_colo_state()) {
 202        return COLO_MODE_SECONDARY;
 203    } else {
 204        return COLO_MODE_NONE;
 205    }
 206}
 207
 208void colo_do_failover(void)
 209{
 210    /* Make sure VM stopped while failover happened. */
 211    if (!colo_runstate_is_stopped()) {
 212        vm_stop_force_state(RUN_STATE_COLO);
 213    }
 214
 215    switch (last_colo_mode = get_colo_mode()) {
 216    case COLO_MODE_PRIMARY:
 217        primary_vm_do_failover();
 218        break;
 219    case COLO_MODE_SECONDARY:
 220        secondary_vm_do_failover();
 221        break;
 222    default:
 223        error_report("colo_do_failover failed because the colo mode"
 224                     " could not be obtained");
 225    }
 226}
 227
 228void qmp_xen_set_replication(bool enable, bool primary,
 229                             bool has_failover, bool failover,
 230                             Error **errp)
 231{
 232    ReplicationMode mode = primary ?
 233                           REPLICATION_MODE_PRIMARY :
 234                           REPLICATION_MODE_SECONDARY;
 235
 236    if (has_failover && enable) {
 237        error_setg(errp, "Parameter 'failover' is only for"
 238                   " stopping replication");
 239        return;
 240    }
 241
 242    if (enable) {
 243        replication_start_all(mode, errp);
 244    } else {
 245        if (!has_failover) {
 246            failover = NULL;
 247        }
 248        replication_stop_all(failover, failover ? NULL : errp);
 249    }
 250}
 251
 252ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 253{
 254    Error *err = NULL;
 255    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 256
 257    replication_get_error_all(&err);
 258    if (err) {
 259        s->error = true;
 260        s->desc = g_strdup(error_get_pretty(err));
 261    } else {
 262        s->error = false;
 263    }
 264
 265    error_free(err);
 266    return s;
 267}
 268
 269void qmp_xen_colo_do_checkpoint(Error **errp)
 270{
 271    Error *err = NULL;
 272
 273    replication_do_checkpoint_all(&err);
 274    if (err) {
 275        error_propagate(errp, err);
 276        return;
 277    }
 278    /* Notify all filters of all NIC to do checkpoint */
 279    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 280}
 281
 282COLOStatus *qmp_query_colo_status(Error **errp)
 283{
 284    COLOStatus *s = g_new0(COLOStatus, 1);
 285
 286    s->mode = get_colo_mode();
 287    s->last_mode = last_colo_mode;
 288
 289    switch (failover_get_state()) {
 290    case FAILOVER_STATUS_NONE:
 291        s->reason = COLO_EXIT_REASON_NONE;
 292        break;
 293    case FAILOVER_STATUS_COMPLETED:
 294        s->reason = COLO_EXIT_REASON_REQUEST;
 295        break;
 296    default:
 297        if (migration_in_colo_state()) {
 298            s->reason = COLO_EXIT_REASON_PROCESSING;
 299        } else {
 300            s->reason = COLO_EXIT_REASON_ERROR;
 301        }
 302    }
 303
 304    return s;
 305}
 306
 307static void colo_send_message(QEMUFile *f, COLOMessage msg,
 308                              Error **errp)
 309{
 310    int ret;
 311
 312    if (msg >= COLO_MESSAGE__MAX) {
 313        error_setg(errp, "%s: Invalid message", __func__);
 314        return;
 315    }
 316    qemu_put_be32(f, msg);
 317    qemu_fflush(f);
 318
 319    ret = qemu_file_get_error(f);
 320    if (ret < 0) {
 321        error_setg_errno(errp, -ret, "Can't send COLO message");
 322    }
 323    trace_colo_send_message(COLOMessage_str(msg));
 324}
 325
 326static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 327                                    uint64_t value, Error **errp)
 328{
 329    Error *local_err = NULL;
 330    int ret;
 331
 332    colo_send_message(f, msg, &local_err);
 333    if (local_err) {
 334        error_propagate(errp, local_err);
 335        return;
 336    }
 337    qemu_put_be64(f, value);
 338    qemu_fflush(f);
 339
 340    ret = qemu_file_get_error(f);
 341    if (ret < 0) {
 342        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 343                         COLOMessage_str(msg));
 344    }
 345}
 346
 347static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 348{
 349    COLOMessage msg;
 350    int ret;
 351
 352    msg = qemu_get_be32(f);
 353    ret = qemu_file_get_error(f);
 354    if (ret < 0) {
 355        error_setg_errno(errp, -ret, "Can't receive COLO message");
 356        return msg;
 357    }
 358    if (msg >= COLO_MESSAGE__MAX) {
 359        error_setg(errp, "%s: Invalid message", __func__);
 360        return msg;
 361    }
 362    trace_colo_receive_message(COLOMessage_str(msg));
 363    return msg;
 364}
 365
 366static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 367                                       Error **errp)
 368{
 369    COLOMessage msg;
 370    Error *local_err = NULL;
 371
 372    msg = colo_receive_message(f, &local_err);
 373    if (local_err) {
 374        error_propagate(errp, local_err);
 375        return;
 376    }
 377    if (msg != expect_msg) {
 378        error_setg(errp, "Unexpected COLO message %d, expected %d",
 379                          msg, expect_msg);
 380    }
 381}
 382
 383static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 384                                           Error **errp)
 385{
 386    Error *local_err = NULL;
 387    uint64_t value;
 388    int ret;
 389
 390    colo_receive_check_message(f, expect_msg, &local_err);
 391    if (local_err) {
 392        error_propagate(errp, local_err);
 393        return 0;
 394    }
 395
 396    value = qemu_get_be64(f);
 397    ret = qemu_file_get_error(f);
 398    if (ret < 0) {
 399        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 400                         COLOMessage_str(expect_msg));
 401    }
 402    return value;
 403}
 404
 405static int colo_do_checkpoint_transaction(MigrationState *s,
 406                                          QIOChannelBuffer *bioc,
 407                                          QEMUFile *fb)
 408{
 409    Error *local_err = NULL;
 410    int ret = -1;
 411
 412    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 413                      &local_err);
 414    if (local_err) {
 415        goto out;
 416    }
 417
 418    colo_receive_check_message(s->rp_state.from_dst_file,
 419                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 420    if (local_err) {
 421        goto out;
 422    }
 423    /* Reset channel-buffer directly */
 424    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 425    bioc->usage = 0;
 426
 427    qemu_mutex_lock_iothread();
 428    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 429        qemu_mutex_unlock_iothread();
 430        goto out;
 431    }
 432    vm_stop_force_state(RUN_STATE_COLO);
 433    qemu_mutex_unlock_iothread();
 434    trace_colo_vm_state_change("run", "stop");
 435    /*
 436     * Failover request bh could be called after vm_stop_force_state(),
 437     * So we need check failover_request_is_active() again.
 438     */
 439    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 440        goto out;
 441    }
 442    qemu_mutex_lock_iothread();
 443
 444    replication_do_checkpoint_all(&local_err);
 445    if (local_err) {
 446        qemu_mutex_unlock_iothread();
 447        goto out;
 448    }
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        qemu_mutex_unlock_iothread();
 453        goto out;
 454    }
 455    /* Note: device state is saved into buffer */
 456    ret = qemu_save_device_state(fb);
 457
 458    qemu_mutex_unlock_iothread();
 459    if (ret < 0) {
 460        goto out;
 461    }
 462
 463    if (migrate_auto_converge()) {
 464        mig_throttle_counter_reset();
 465    }
 466    /*
 467     * Only save VM's live state, which not including device state.
 468     * TODO: We may need a timeout mechanism to prevent COLO process
 469     * to be blocked here.
 470     */
 471    qemu_savevm_live_state(s->to_dst_file);
 472
 473    qemu_fflush(fb);
 474
 475    /*
 476     * We need the size of the VMstate data in Secondary side,
 477     * With which we can decide how much data should be read.
 478     */
 479    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 480                            bioc->usage, &local_err);
 481    if (local_err) {
 482        goto out;
 483    }
 484
 485    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 486    qemu_fflush(s->to_dst_file);
 487    ret = qemu_file_get_error(s->to_dst_file);
 488    if (ret < 0) {
 489        goto out;
 490    }
 491
 492    colo_receive_check_message(s->rp_state.from_dst_file,
 493                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 494    if (local_err) {
 495        goto out;
 496    }
 497
 498    qemu_event_reset(&s->colo_checkpoint_event);
 499    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 500    if (local_err) {
 501        goto out;
 502    }
 503
 504    colo_receive_check_message(s->rp_state.from_dst_file,
 505                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 506    if (local_err) {
 507        goto out;
 508    }
 509
 510    ret = 0;
 511
 512    qemu_mutex_lock_iothread();
 513    vm_start();
 514    qemu_mutex_unlock_iothread();
 515    trace_colo_vm_state_change("stop", "run");
 516
 517out:
 518    if (local_err) {
 519        error_report_err(local_err);
 520    }
 521    return ret;
 522}
 523
 524static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 525{
 526    colo_checkpoint_notify(data);
 527}
 528
/*
 * Main loop of the COLO thread on the Primary side.
 *
 * Opens the return path from the Secondary, registers the colo-compare
 * notifier, waits for COLO_MESSAGE_CHECKPOINT_READY, starts replication and
 * the VM, and then runs one checkpoint transaction each time
 * colo_checkpoint_event is set, for as long as the migration state stays
 * MIGRATION_STATUS_COLO.
 *
 * On exit (error or failover) a COLO_EXIT event is sent, and the function
 * blocks on colo_exit_sem (posted by primary_vm_do_failover() and
 * colo_shutdown()) before releasing the notifier, timer, event and return
 * path.
 */
static void colo_process_checkpoint(MigrationState *s)
{
    QIOChannelBuffer *bioc;
    QEMUFile *fb = NULL;
    Error *local_err = NULL;
    int ret;

    if (get_colo_mode() != COLO_MODE_PRIMARY) {
        error_report("COLO mode must be COLO_MODE_PRIMARY");
        return;
    }

    failover_init_state();

    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
    if (!s->rp_state.from_dst_file) {
        error_report("Open QEMUFile from_dst_file failed");
        goto out;
    }

    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
    colo_compare_register_notifier(&packets_compare_notifier);

    /*
     * Wait for the Secondary to finish loading VM state and enter COLO
     * restore.
     */
    colo_receive_check_message(s->rp_state.from_dst_file,
                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
    if (local_err) {
        goto out;
    }
    /* fb writes into bioc; the local bioc reference is dropped right away */
    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
    fb = qemu_file_new_output(QIO_CHANNEL(bioc));
    object_unref(OBJECT(bioc));

    qemu_mutex_lock_iothread();
    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
    if (local_err) {
        qemu_mutex_unlock_iothread();
        goto out;
    }

    vm_start();
    qemu_mutex_unlock_iothread();
    trace_colo_vm_state_change("stop", "run");

    /* Arm the periodic checkpoint timer for the first time */
    timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
              migrate_checkpoint_delay());

    while (s->state == MIGRATION_STATUS_COLO) {
        if (failover_get_state() != FAILOVER_STATUS_NONE) {
            error_report("failover request");
            goto out;
        }

        /* Sleep until the timer, colo-compare or failover sets the event */
        qemu_event_wait(&s->colo_checkpoint_event);

        /* The state may have changed while we were waiting */
        if (s->state != MIGRATION_STATUS_COLO) {
            goto out;
        }
        ret = colo_do_checkpoint_transaction(s, bioc, fb);
        if (ret < 0) {
            goto out;
        }
    }

out:
    /* Report any still-unreported error after leaving the loop */
    if (local_err) {
        error_report_err(local_err);
    }

    if (fb) {
        qemu_fclose(fb);
    }

    /*
     * There are only two reasons we can get here: some error happened
     * or the user triggered failover.
     */
    switch (failover_get_state()) {
    case FAILOVER_STATUS_COMPLETED:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_REQUEST);
        break;
    default:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_ERROR);
    }

    /* Hopefully the wait here is not too long */
    qemu_sem_wait(&s->colo_exit_sem);
    qemu_sem_destroy(&s->colo_exit_sem);

    /*
     * It is safe to unregister the notifier after failover has finished.
     * Besides, colo_delay_timer and colo_checkpoint_event can't be
     * released before the notifier is unregistered, or there will be a
     * use-after-free error.
     */
    colo_compare_unregister_notifier(&packets_compare_notifier);
    timer_free(s->colo_delay_timer);
    qemu_event_destroy(&s->colo_checkpoint_event);

    /*
     * Must be called after the failover BH has completed,
     * or the failover BH may shut down the wrong fd, one that has been
     * re-used by other threads after we release it here.
     */
    if (s->rp_state.from_dst_file) {
        qemu_fclose(s->rp_state.from_dst_file);
        s->rp_state.from_dst_file = NULL;
    }
}
 644
 645void migrate_start_colo_process(MigrationState *s)
 646{
 647    qemu_mutex_unlock_iothread();
 648    qemu_event_init(&s->colo_checkpoint_event, false);
 649    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 650                                colo_checkpoint_notify, s);
 651
 652    qemu_sem_init(&s->colo_exit_sem, 0);
 653    colo_process_checkpoint(s);
 654    qemu_mutex_lock_iothread();
 655}
 656
 657static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 658                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 659{
 660    uint64_t total_size;
 661    uint64_t value;
 662    Error *local_err = NULL;
 663    int ret;
 664
 665    qemu_mutex_lock_iothread();
 666    vm_stop_force_state(RUN_STATE_COLO);
 667    qemu_mutex_unlock_iothread();
 668    trace_colo_vm_state_change("run", "stop");
 669
 670    /* FIXME: This is unnecessary for periodic checkpoint mode */
 671    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 672                 &local_err);
 673    if (local_err) {
 674        error_propagate(errp, local_err);
 675        return;
 676    }
 677
 678    colo_receive_check_message(mis->from_src_file,
 679                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 680    if (local_err) {
 681        error_propagate(errp, local_err);
 682        return;
 683    }
 684
 685    qemu_mutex_lock_iothread();
 686    cpu_synchronize_all_states();
 687    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 688    qemu_mutex_unlock_iothread();
 689
 690    if (ret < 0) {
 691        error_setg(errp, "Load VM's live state (ram) error");
 692        return;
 693    }
 694
 695    value = colo_receive_message_value(mis->from_src_file,
 696                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 697    if (local_err) {
 698        error_propagate(errp, local_err);
 699        return;
 700    }
 701
 702    /*
 703     * Read VM device state data into channel buffer,
 704     * It's better to re-use the memory allocated.
 705     * Here we need to handle the channel buffer directly.
 706     */
 707    if (value > bioc->capacity) {
 708        bioc->capacity = value;
 709        bioc->data = g_realloc(bioc->data, bioc->capacity);
 710    }
 711    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 712    if (total_size != value) {
 713        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 714                    " %" PRIu64, total_size, value);
 715        return;
 716    }
 717    bioc->usage = total_size;
 718    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 719
 720    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 721                 &local_err);
 722    if (local_err) {
 723        error_propagate(errp, local_err);
 724        return;
 725    }
 726
 727    qemu_mutex_lock_iothread();
 728    vmstate_loading = true;
 729    colo_flush_ram_cache();
 730    ret = qemu_load_device_state(fb);
 731    if (ret < 0) {
 732        error_setg(errp, "COLO: load device state failed");
 733        vmstate_loading = false;
 734        qemu_mutex_unlock_iothread();
 735        return;
 736    }
 737
 738    replication_get_error_all(&local_err);
 739    if (local_err) {
 740        error_propagate(errp, local_err);
 741        vmstate_loading = false;
 742        qemu_mutex_unlock_iothread();
 743        return;
 744    }
 745
 746    /* discard colo disk buffer */
 747    replication_do_checkpoint_all(&local_err);
 748    if (local_err) {
 749        error_propagate(errp, local_err);
 750        vmstate_loading = false;
 751        qemu_mutex_unlock_iothread();
 752        return;
 753    }
 754    /* Notify all filters of all NIC to do checkpoint */
 755    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 756
 757    if (local_err) {
 758        error_propagate(errp, local_err);
 759        vmstate_loading = false;
 760        qemu_mutex_unlock_iothread();
 761        return;
 762    }
 763
 764    vmstate_loading = false;
 765    vm_start();
 766    qemu_mutex_unlock_iothread();
 767    trace_colo_vm_state_change("stop", "run");
 768
 769    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 770        return;
 771    }
 772
 773    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 774                 &local_err);
 775    error_propagate(errp, local_err);
 776}
 777
 778static void colo_wait_handle_message(MigrationIncomingState *mis,
 779                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 780{
 781    COLOMessage msg;
 782    Error *local_err = NULL;
 783
 784    msg = colo_receive_message(mis->from_src_file, &local_err);
 785    if (local_err) {
 786        error_propagate(errp, local_err);
 787        return;
 788    }
 789
 790    switch (msg) {
 791    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 792        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 793        break;
 794    default:
 795        error_setg(errp, "Got unknown COLO message: %d", msg);
 796        break;
 797    }
 798}
 799
 800void colo_shutdown(void)
 801{
 802    MigrationIncomingState *mis = NULL;
 803    MigrationState *s = NULL;
 804
 805    switch (get_colo_mode()) {
 806    case COLO_MODE_PRIMARY:
 807        s = migrate_get_current();
 808        qemu_event_set(&s->colo_checkpoint_event);
 809        qemu_sem_post(&s->colo_exit_sem);
 810        break;
 811    case COLO_MODE_SECONDARY:
 812        mis = migration_incoming_get_current();
 813        qemu_sem_post(&mis->colo_incoming_sem);
 814        break;
 815    default:
 816        break;
 817    }
 818}
 819
 820static void *colo_process_incoming_thread(void *opaque)
 821{
 822    MigrationIncomingState *mis = opaque;
 823    QEMUFile *fb = NULL;
 824    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 825    Error *local_err = NULL;
 826
 827    rcu_register_thread();
 828    qemu_sem_init(&mis->colo_incoming_sem, 0);
 829
 830    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 831                      MIGRATION_STATUS_COLO);
 832
 833    if (get_colo_mode() != COLO_MODE_SECONDARY) {
 834        error_report("COLO mode must be COLO_MODE_SECONDARY");
 835        return NULL;
 836    }
 837
 838    failover_init_state();
 839
 840    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 841    if (!mis->to_src_file) {
 842        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 843        goto out;
 844    }
 845    /*
 846     * Note: the communication between Primary side and Secondary side
 847     * should be sequential, we set the fd to unblocked in migration incoming
 848     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 849     * set the fd back to blocked.
 850     */
 851    qemu_file_set_blocking(mis->from_src_file, true);
 852
 853    colo_incoming_start_dirty_log();
 854
 855    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 856    fb = qemu_file_new_input(QIO_CHANNEL(bioc));
 857    object_unref(OBJECT(bioc));
 858
 859    qemu_mutex_lock_iothread();
 860    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 861    if (local_err) {
 862        qemu_mutex_unlock_iothread();
 863        goto out;
 864    }
 865    vm_start();
 866    qemu_mutex_unlock_iothread();
 867    trace_colo_vm_state_change("stop", "run");
 868
 869    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 870                      &local_err);
 871    if (local_err) {
 872        goto out;
 873    }
 874
 875    while (mis->state == MIGRATION_STATUS_COLO) {
 876        colo_wait_handle_message(mis, fb, bioc, &local_err);
 877        if (local_err) {
 878            error_report_err(local_err);
 879            break;
 880        }
 881
 882        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 883            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 884                            FAILOVER_STATUS_NONE);
 885            failover_request_active(NULL);
 886            break;
 887        }
 888
 889        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 890            error_report("failover request");
 891            break;
 892        }
 893    }
 894
 895out:
 896    /*
 897     * There are only two reasons we can get here, some error happened
 898     * or the user triggered failover.
 899     */
 900    switch (failover_get_state()) {
 901    case FAILOVER_STATUS_COMPLETED:
 902        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 903                                  COLO_EXIT_REASON_REQUEST);
 904        break;
 905    default:
 906        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 907                                  COLO_EXIT_REASON_ERROR);
 908    }
 909
 910    if (fb) {
 911        qemu_fclose(fb);
 912    }
 913
 914    /* Hope this not to be too long to loop here */
 915    qemu_sem_wait(&mis->colo_incoming_sem);
 916    qemu_sem_destroy(&mis->colo_incoming_sem);
 917
 918    rcu_unregister_thread();
 919    return NULL;
 920}
 921
 922int coroutine_fn colo_incoming_co(void)
 923{
 924    MigrationIncomingState *mis = migration_incoming_get_current();
 925    Error *local_err = NULL;
 926    QemuThread th;
 927
 928    assert(qemu_mutex_iothread_locked());
 929
 930    if (!migration_incoming_colo_enabled()) {
 931        return 0;
 932    }
 933
 934    /* Make sure all file formats throw away their mutable metadata */
 935    bdrv_activate_all(&local_err);
 936    if (local_err) {
 937        error_report_err(local_err);
 938        return -EINVAL;
 939    }
 940
 941    qemu_thread_create(&th, "COLO incoming", colo_process_incoming_thread,
 942                       mis, QEMU_THREAD_JOINABLE);
 943
 944    mis->colo_incoming_co = qemu_coroutine_self();
 945    qemu_coroutine_yield();
 946    mis->colo_incoming_co = NULL;
 947
 948    qemu_mutex_unlock_iothread();
 949    /* Wait checkpoint incoming thread exit before free resource */
 950    qemu_thread_join(&th);
 951    qemu_mutex_lock_iothread();
 952
 953    /* We hold the global iothread lock, so it is safe here */
 954    colo_release_ram_cache();
 955
 956    return 0;
 957}
 958