qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "migration/failover.h"
  27#include "replication.h"
  28#include "net/colo-compare.h"
  29#include "net/colo.h"
  30#include "block/block.h"
  31#include "qapi/qapi-events-migration.h"
  32#include "qapi/qmp/qerror.h"
  33#include "sysemu/cpus.h"
  34#include "net/filter.h"
  35
/*
 * Set by the COLO incoming thread just before it loads device state and
 * cleared once the checkpoint has been applied (or the thread exits).
 * secondary_vm_do_failover() checks it and postpones failover while set.
 */
static bool vmstate_loading;
/* Notifier handed to colo-compare by colo_process_checkpoint(). */
static Notifier packets_compare_notifier;

/* Initial size passed to qio_channel_buffer_new() for device-state buffers. */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  40
  41bool migration_in_colo_state(void)
  42{
  43    MigrationState *s = migrate_get_current();
  44
  45    return (s->state == MIGRATION_STATUS_COLO);
  46}
  47
  48bool migration_incoming_in_colo_state(void)
  49{
  50    MigrationIncomingState *mis = migration_incoming_get_current();
  51
  52    return mis && (mis->state == MIGRATION_STATUS_COLO);
  53}
  54
  55static bool colo_runstate_is_stopped(void)
  56{
  57    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  58}
  59
  60static void secondary_vm_do_failover(void)
  61{
  62/* COLO needs enable block-replication */
  63#ifdef CONFIG_REPLICATION
  64    int old_state;
  65    MigrationIncomingState *mis = migration_incoming_get_current();
  66    Error *local_err = NULL;
  67
  68    /* Can not do failover during the process of VM's loading VMstate, Or
  69     * it will break the secondary VM.
  70     */
  71    if (vmstate_loading) {
  72        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  73                        FAILOVER_STATUS_RELAUNCH);
  74        if (old_state != FAILOVER_STATUS_ACTIVE) {
  75            error_report("Unknown error while do failover for secondary VM,"
  76                         "old_state: %s", FailoverStatus_str(old_state));
  77        }
  78        return;
  79    }
  80
  81    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  82                      MIGRATION_STATUS_COMPLETED);
  83
  84    replication_stop_all(true, &local_err);
  85    if (local_err) {
  86        error_report_err(local_err);
  87    }
  88
  89    /* Notify all filters of all NIC to do checkpoint */
  90    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
  91    if (local_err) {
  92        error_report_err(local_err);
  93    }
  94
  95    if (!autostart) {
  96        error_report("\"-S\" qemu option will be ignored in secondary side");
  97        /* recover runstate to normal migration finish state */
  98        autostart = true;
  99    }
 100    /*
 101     * Make sure COLO incoming thread not block in recv or send,
 102     * If mis->from_src_file and mis->to_src_file use the same fd,
 103     * The second shutdown() will return -1, we ignore this value,
 104     * It is harmless.
 105     */
 106    if (mis->from_src_file) {
 107        qemu_file_shutdown(mis->from_src_file);
 108    }
 109    if (mis->to_src_file) {
 110        qemu_file_shutdown(mis->to_src_file);
 111    }
 112
 113    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 114                                   FAILOVER_STATUS_COMPLETED);
 115    if (old_state != FAILOVER_STATUS_ACTIVE) {
 116        error_report("Incorrect state (%s) while doing failover for "
 117                     "secondary VM", FailoverStatus_str(old_state));
 118        return;
 119    }
 120    /* Notify COLO incoming thread that failover work is finished */
 121    qemu_sem_post(&mis->colo_incoming_sem);
 122    /* For Secondary VM, jump to incoming co */
 123    if (mis->migration_incoming_co) {
 124        qemu_coroutine_enter(mis->migration_incoming_co);
 125    }
 126#else
 127    abort();
 128#endif
 129}
 130
 131static void primary_vm_do_failover(void)
 132{
 133#ifdef CONFIG_REPLICATION
 134    MigrationState *s = migrate_get_current();
 135    int old_state;
 136    Error *local_err = NULL;
 137
 138    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 139                      MIGRATION_STATUS_COMPLETED);
 140    /*
 141     * kick COLO thread which might wait at
 142     * qemu_sem_wait(&s->colo_checkpoint_sem).
 143     */
 144    colo_checkpoint_notify(migrate_get_current());
 145
 146    /*
 147     * Wake up COLO thread which may blocked in recv() or send(),
 148     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 149     * same fd, but we still shutdown the fd for twice, it is harmless.
 150     */
 151    if (s->to_dst_file) {
 152        qemu_file_shutdown(s->to_dst_file);
 153    }
 154    if (s->rp_state.from_dst_file) {
 155        qemu_file_shutdown(s->rp_state.from_dst_file);
 156    }
 157
 158    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 159                                   FAILOVER_STATUS_COMPLETED);
 160    if (old_state != FAILOVER_STATUS_ACTIVE) {
 161        error_report("Incorrect state (%s) while doing failover for Primary VM",
 162                     FailoverStatus_str(old_state));
 163        return;
 164    }
 165
 166    replication_stop_all(true, &local_err);
 167    if (local_err) {
 168        error_report_err(local_err);
 169        local_err = NULL;
 170    }
 171
 172    /* Notify COLO thread that failover work is finished */
 173    qemu_sem_post(&s->colo_exit_sem);
 174#else
 175    abort();
 176#endif
 177}
 178
 179COLOMode get_colo_mode(void)
 180{
 181    if (migration_in_colo_state()) {
 182        return COLO_MODE_PRIMARY;
 183    } else if (migration_incoming_in_colo_state()) {
 184        return COLO_MODE_SECONDARY;
 185    } else {
 186        return COLO_MODE_NONE;
 187    }
 188}
 189
 190void colo_do_failover(MigrationState *s)
 191{
 192    /* Make sure VM stopped while failover happened. */
 193    if (!colo_runstate_is_stopped()) {
 194        vm_stop_force_state(RUN_STATE_COLO);
 195    }
 196
 197    if (get_colo_mode() == COLO_MODE_PRIMARY) {
 198        primary_vm_do_failover();
 199    } else {
 200        secondary_vm_do_failover();
 201    }
 202}
 203
 204void qmp_xen_set_replication(bool enable, bool primary,
 205                             bool has_failover, bool failover,
 206                             Error **errp)
 207{
 208#ifdef CONFIG_REPLICATION
 209    ReplicationMode mode = primary ?
 210                           REPLICATION_MODE_PRIMARY :
 211                           REPLICATION_MODE_SECONDARY;
 212
 213    if (has_failover && enable) {
 214        error_setg(errp, "Parameter 'failover' is only for"
 215                   " stopping replication");
 216        return;
 217    }
 218
 219    if (enable) {
 220        replication_start_all(mode, errp);
 221    } else {
 222        if (!has_failover) {
 223            failover = NULL;
 224        }
 225        replication_stop_all(failover, failover ? NULL : errp);
 226    }
 227#else
 228    abort();
 229#endif
 230}
 231
 232ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 233{
 234#ifdef CONFIG_REPLICATION
 235    Error *err = NULL;
 236    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 237
 238    replication_get_error_all(&err);
 239    if (err) {
 240        s->error = true;
 241        s->has_desc = true;
 242        s->desc = g_strdup(error_get_pretty(err));
 243    } else {
 244        s->error = false;
 245    }
 246
 247    error_free(err);
 248    return s;
 249#else
 250    abort();
 251#endif
 252}
 253
 254void qmp_xen_colo_do_checkpoint(Error **errp)
 255{
 256#ifdef CONFIG_REPLICATION
 257    replication_do_checkpoint_all(errp);
 258#else
 259    abort();
 260#endif
 261}
 262
 263COLOStatus *qmp_query_colo_status(Error **errp)
 264{
 265    COLOStatus *s = g_new0(COLOStatus, 1);
 266
 267    s->mode = get_colo_mode();
 268
 269    switch (failover_get_state()) {
 270    case FAILOVER_STATUS_NONE:
 271        s->reason = COLO_EXIT_REASON_NONE;
 272        break;
 273    case FAILOVER_STATUS_REQUIRE:
 274        s->reason = COLO_EXIT_REASON_REQUEST;
 275        break;
 276    default:
 277        s->reason = COLO_EXIT_REASON_ERROR;
 278    }
 279
 280    return s;
 281}
 282
 283static void colo_send_message(QEMUFile *f, COLOMessage msg,
 284                              Error **errp)
 285{
 286    int ret;
 287
 288    if (msg >= COLO_MESSAGE__MAX) {
 289        error_setg(errp, "%s: Invalid message", __func__);
 290        return;
 291    }
 292    qemu_put_be32(f, msg);
 293    qemu_fflush(f);
 294
 295    ret = qemu_file_get_error(f);
 296    if (ret < 0) {
 297        error_setg_errno(errp, -ret, "Can't send COLO message");
 298    }
 299    trace_colo_send_message(COLOMessage_str(msg));
 300}
 301
 302static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 303                                    uint64_t value, Error **errp)
 304{
 305    Error *local_err = NULL;
 306    int ret;
 307
 308    colo_send_message(f, msg, &local_err);
 309    if (local_err) {
 310        error_propagate(errp, local_err);
 311        return;
 312    }
 313    qemu_put_be64(f, value);
 314    qemu_fflush(f);
 315
 316    ret = qemu_file_get_error(f);
 317    if (ret < 0) {
 318        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 319                         COLOMessage_str(msg));
 320    }
 321}
 322
 323static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 324{
 325    COLOMessage msg;
 326    int ret;
 327
 328    msg = qemu_get_be32(f);
 329    ret = qemu_file_get_error(f);
 330    if (ret < 0) {
 331        error_setg_errno(errp, -ret, "Can't receive COLO message");
 332        return msg;
 333    }
 334    if (msg >= COLO_MESSAGE__MAX) {
 335        error_setg(errp, "%s: Invalid message", __func__);
 336        return msg;
 337    }
 338    trace_colo_receive_message(COLOMessage_str(msg));
 339    return msg;
 340}
 341
 342static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 343                                       Error **errp)
 344{
 345    COLOMessage msg;
 346    Error *local_err = NULL;
 347
 348    msg = colo_receive_message(f, &local_err);
 349    if (local_err) {
 350        error_propagate(errp, local_err);
 351        return;
 352    }
 353    if (msg != expect_msg) {
 354        error_setg(errp, "Unexpected COLO message %d, expected %d",
 355                          msg, expect_msg);
 356    }
 357}
 358
 359static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 360                                           Error **errp)
 361{
 362    Error *local_err = NULL;
 363    uint64_t value;
 364    int ret;
 365
 366    colo_receive_check_message(f, expect_msg, &local_err);
 367    if (local_err) {
 368        error_propagate(errp, local_err);
 369        return 0;
 370    }
 371
 372    value = qemu_get_be64(f);
 373    ret = qemu_file_get_error(f);
 374    if (ret < 0) {
 375        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 376                         COLOMessage_str(expect_msg));
 377    }
 378    return value;
 379}
 380
 381static int colo_do_checkpoint_transaction(MigrationState *s,
 382                                          QIOChannelBuffer *bioc,
 383                                          QEMUFile *fb)
 384{
 385    Error *local_err = NULL;
 386    int ret = -1;
 387
 388    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 389                      &local_err);
 390    if (local_err) {
 391        goto out;
 392    }
 393
 394    colo_receive_check_message(s->rp_state.from_dst_file,
 395                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 396    if (local_err) {
 397        goto out;
 398    }
 399    /* Reset channel-buffer directly */
 400    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 401    bioc->usage = 0;
 402
 403    qemu_mutex_lock_iothread();
 404    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 405        qemu_mutex_unlock_iothread();
 406        goto out;
 407    }
 408    vm_stop_force_state(RUN_STATE_COLO);
 409    qemu_mutex_unlock_iothread();
 410    trace_colo_vm_state_change("run", "stop");
 411    /*
 412     * Failover request bh could be called after vm_stop_force_state(),
 413     * So we need check failover_request_is_active() again.
 414     */
 415    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 416        goto out;
 417    }
 418
 419    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 420    if (local_err) {
 421        goto out;
 422    }
 423
 424    /* Disable block migration */
 425    migrate_set_block_enabled(false, &local_err);
 426    qemu_mutex_lock_iothread();
 427
 428#ifdef CONFIG_REPLICATION
 429    replication_do_checkpoint_all(&local_err);
 430    if (local_err) {
 431        qemu_mutex_unlock_iothread();
 432        goto out;
 433    }
 434#else
 435        abort();
 436#endif
 437
 438    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 439    if (local_err) {
 440        qemu_mutex_unlock_iothread();
 441        goto out;
 442    }
 443    /* Note: device state is saved into buffer */
 444    ret = qemu_save_device_state(fb);
 445
 446    qemu_mutex_unlock_iothread();
 447    if (ret < 0) {
 448        goto out;
 449    }
 450    /*
 451     * Only save VM's live state, which not including device state.
 452     * TODO: We may need a timeout mechanism to prevent COLO process
 453     * to be blocked here.
 454     */
 455    qemu_savevm_live_state(s->to_dst_file);
 456
 457    qemu_fflush(fb);
 458
 459    /*
 460     * We need the size of the VMstate data in Secondary side,
 461     * With which we can decide how much data should be read.
 462     */
 463    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 464                            bioc->usage, &local_err);
 465    if (local_err) {
 466        goto out;
 467    }
 468
 469    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 470    qemu_fflush(s->to_dst_file);
 471    ret = qemu_file_get_error(s->to_dst_file);
 472    if (ret < 0) {
 473        goto out;
 474    }
 475
 476    colo_receive_check_message(s->rp_state.from_dst_file,
 477                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 478    if (local_err) {
 479        goto out;
 480    }
 481
 482    colo_receive_check_message(s->rp_state.from_dst_file,
 483                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 484    if (local_err) {
 485        goto out;
 486    }
 487
 488    ret = 0;
 489
 490    qemu_mutex_lock_iothread();
 491    vm_start();
 492    qemu_mutex_unlock_iothread();
 493    trace_colo_vm_state_change("stop", "run");
 494
 495out:
 496    if (local_err) {
 497        error_report_err(local_err);
 498    }
 499    return ret;
 500}
 501
 502static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 503{
 504    colo_checkpoint_notify(data);
 505}
 506
 507static void colo_process_checkpoint(MigrationState *s)
 508{
 509    QIOChannelBuffer *bioc;
 510    QEMUFile *fb = NULL;
 511    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 512    Error *local_err = NULL;
 513    int ret;
 514
 515    failover_init_state();
 516
 517    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 518    if (!s->rp_state.from_dst_file) {
 519        error_report("Open QEMUFile from_dst_file failed");
 520        goto out;
 521    }
 522
 523    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 524    colo_compare_register_notifier(&packets_compare_notifier);
 525
 526    /*
 527     * Wait for Secondary finish loading VM states and enter COLO
 528     * restore.
 529     */
 530    colo_receive_check_message(s->rp_state.from_dst_file,
 531                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 532    if (local_err) {
 533        goto out;
 534    }
 535    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 536    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 537    object_unref(OBJECT(bioc));
 538
 539    qemu_mutex_lock_iothread();
 540#ifdef CONFIG_REPLICATION
 541    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 542    if (local_err) {
 543        qemu_mutex_unlock_iothread();
 544        goto out;
 545    }
 546#else
 547        abort();
 548#endif
 549
 550    vm_start();
 551    qemu_mutex_unlock_iothread();
 552    trace_colo_vm_state_change("stop", "run");
 553
 554    timer_mod(s->colo_delay_timer,
 555            current_time + s->parameters.x_checkpoint_delay);
 556
 557    while (s->state == MIGRATION_STATUS_COLO) {
 558        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 559            error_report("failover request");
 560            goto out;
 561        }
 562
 563        qemu_sem_wait(&s->colo_checkpoint_sem);
 564
 565        if (s->state != MIGRATION_STATUS_COLO) {
 566            goto out;
 567        }
 568        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 569        if (ret < 0) {
 570            goto out;
 571        }
 572    }
 573
 574out:
 575    /* Throw the unreported error message after exited from loop */
 576    if (local_err) {
 577        error_report_err(local_err);
 578    }
 579
 580    if (fb) {
 581        qemu_fclose(fb);
 582    }
 583
 584    /*
 585     * There are only two reasons we can get here, some error happened
 586     * or the user triggered failover.
 587     */
 588    switch (failover_get_state()) {
 589    case FAILOVER_STATUS_NONE:
 590        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 591                                  COLO_EXIT_REASON_ERROR);
 592        break;
 593    case FAILOVER_STATUS_REQUIRE:
 594        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 595                                  COLO_EXIT_REASON_REQUEST);
 596        break;
 597    default:
 598        abort();
 599    }
 600
 601    /* Hope this not to be too long to wait here */
 602    qemu_sem_wait(&s->colo_exit_sem);
 603    qemu_sem_destroy(&s->colo_exit_sem);
 604
 605    /*
 606     * It is safe to unregister notifier after failover finished.
 607     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 608     * released befor unregister notifier, or there will be use-after-free
 609     * error.
 610     */
 611    colo_compare_unregister_notifier(&packets_compare_notifier);
 612    timer_del(s->colo_delay_timer);
 613    timer_free(s->colo_delay_timer);
 614    qemu_sem_destroy(&s->colo_checkpoint_sem);
 615
 616    /*
 617     * Must be called after failover BH is completed,
 618     * Or the failover BH may shutdown the wrong fd that
 619     * re-used by other threads after we release here.
 620     */
 621    if (s->rp_state.from_dst_file) {
 622        qemu_fclose(s->rp_state.from_dst_file);
 623    }
 624}
 625
 626void colo_checkpoint_notify(void *opaque)
 627{
 628    MigrationState *s = opaque;
 629    int64_t next_notify_time;
 630
 631    qemu_sem_post(&s->colo_checkpoint_sem);
 632    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 633    next_notify_time = s->colo_checkpoint_time +
 634                    s->parameters.x_checkpoint_delay;
 635    timer_mod(s->colo_delay_timer, next_notify_time);
 636}
 637
 638void migrate_start_colo_process(MigrationState *s)
 639{
 640    qemu_mutex_unlock_iothread();
 641    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 642    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 643                                colo_checkpoint_notify, s);
 644
 645    qemu_sem_init(&s->colo_exit_sem, 0);
 646    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 647                      MIGRATION_STATUS_COLO);
 648    colo_process_checkpoint(s);
 649    qemu_mutex_lock_iothread();
 650}
 651
 652static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 653                                     Error **errp)
 654{
 655    COLOMessage msg;
 656    Error *local_err = NULL;
 657
 658    msg = colo_receive_message(f, &local_err);
 659    if (local_err) {
 660        error_propagate(errp, local_err);
 661        return;
 662    }
 663
 664    switch (msg) {
 665    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 666        *checkpoint_request = 1;
 667        break;
 668    default:
 669        *checkpoint_request = 0;
 670        error_setg(errp, "Got unknown COLO message: %d", msg);
 671        break;
 672    }
 673}
 674
 675void *colo_process_incoming_thread(void *opaque)
 676{
 677    MigrationIncomingState *mis = opaque;
 678    QEMUFile *fb = NULL;
 679    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 680    uint64_t total_size;
 681    uint64_t value;
 682    Error *local_err = NULL;
 683    int ret;
 684
 685    rcu_register_thread();
 686    qemu_sem_init(&mis->colo_incoming_sem, 0);
 687
 688    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 689                      MIGRATION_STATUS_COLO);
 690
 691    failover_init_state();
 692
 693    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 694    if (!mis->to_src_file) {
 695        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 696        goto out;
 697    }
 698    /*
 699     * Note: the communication between Primary side and Secondary side
 700     * should be sequential, we set the fd to unblocked in migration incoming
 701     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 702     * set the fd back to blocked.
 703     */
 704    qemu_file_set_blocking(mis->from_src_file, true);
 705
 706    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 707    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 708    object_unref(OBJECT(bioc));
 709
 710    qemu_mutex_lock_iothread();
 711#ifdef CONFIG_REPLICATION
 712    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 713    if (local_err) {
 714        qemu_mutex_unlock_iothread();
 715        goto out;
 716    }
 717#else
 718        abort();
 719#endif
 720    vm_start();
 721    trace_colo_vm_state_change("stop", "run");
 722    qemu_mutex_unlock_iothread();
 723
 724    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 725                      &local_err);
 726    if (local_err) {
 727        goto out;
 728    }
 729
 730    while (mis->state == MIGRATION_STATUS_COLO) {
 731        int request = 0;
 732
 733        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 734        if (local_err) {
 735            goto out;
 736        }
 737        assert(request);
 738        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 739            error_report("failover request");
 740            goto out;
 741        }
 742
 743        qemu_mutex_lock_iothread();
 744        vm_stop_force_state(RUN_STATE_COLO);
 745        trace_colo_vm_state_change("run", "stop");
 746        qemu_mutex_unlock_iothread();
 747
 748        /* FIXME: This is unnecessary for periodic checkpoint mode */
 749        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 750                     &local_err);
 751        if (local_err) {
 752            goto out;
 753        }
 754
 755        colo_receive_check_message(mis->from_src_file,
 756                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 757        if (local_err) {
 758            goto out;
 759        }
 760
 761        qemu_mutex_lock_iothread();
 762        cpu_synchronize_all_pre_loadvm();
 763        ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 764        qemu_mutex_unlock_iothread();
 765
 766        if (ret < 0) {
 767            error_report("Load VM's live state (ram) error");
 768            goto out;
 769        }
 770
 771        value = colo_receive_message_value(mis->from_src_file,
 772                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 773        if (local_err) {
 774            goto out;
 775        }
 776
 777        /*
 778         * Read VM device state data into channel buffer,
 779         * It's better to re-use the memory allocated.
 780         * Here we need to handle the channel buffer directly.
 781         */
 782        if (value > bioc->capacity) {
 783            bioc->capacity = value;
 784            bioc->data = g_realloc(bioc->data, bioc->capacity);
 785        }
 786        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 787        if (total_size != value) {
 788            error_report("Got %" PRIu64 " VMState data, less than expected"
 789                        " %" PRIu64, total_size, value);
 790            goto out;
 791        }
 792        bioc->usage = total_size;
 793        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 794
 795        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 796                     &local_err);
 797        if (local_err) {
 798            goto out;
 799        }
 800
 801        qemu_mutex_lock_iothread();
 802        vmstate_loading = true;
 803        ret = qemu_load_device_state(fb);
 804        if (ret < 0) {
 805            error_report("COLO: load device state failed");
 806            qemu_mutex_unlock_iothread();
 807            goto out;
 808        }
 809
 810#ifdef CONFIG_REPLICATION
 811        replication_get_error_all(&local_err);
 812        if (local_err) {
 813            qemu_mutex_unlock_iothread();
 814            goto out;
 815        }
 816
 817        /* discard colo disk buffer */
 818        replication_do_checkpoint_all(&local_err);
 819        if (local_err) {
 820            qemu_mutex_unlock_iothread();
 821            goto out;
 822        }
 823#else
 824        abort();
 825#endif
 826        /* Notify all filters of all NIC to do checkpoint */
 827        colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 828
 829        if (local_err) {
 830            qemu_mutex_unlock_iothread();
 831            goto out;
 832        }
 833
 834        vmstate_loading = false;
 835        vm_start();
 836        trace_colo_vm_state_change("stop", "run");
 837        qemu_mutex_unlock_iothread();
 838
 839        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 840            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 841                            FAILOVER_STATUS_NONE);
 842            failover_request_active(NULL);
 843            goto out;
 844        }
 845
 846        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 847                     &local_err);
 848        if (local_err) {
 849            goto out;
 850        }
 851    }
 852
 853out:
 854    vmstate_loading = false;
 855    /* Throw the unreported error message after exited from loop */
 856    if (local_err) {
 857        error_report_err(local_err);
 858    }
 859
 860    switch (failover_get_state()) {
 861    case FAILOVER_STATUS_NONE:
 862        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 863                                  COLO_EXIT_REASON_ERROR);
 864        break;
 865    case FAILOVER_STATUS_REQUIRE:
 866        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 867                                  COLO_EXIT_REASON_REQUEST);
 868        break;
 869    default:
 870        abort();
 871    }
 872
 873    if (fb) {
 874        qemu_fclose(fb);
 875    }
 876
 877    /* Hope this not to be too long to loop here */
 878    qemu_sem_wait(&mis->colo_incoming_sem);
 879    qemu_sem_destroy(&mis->colo_incoming_sem);
 880    /* Must be called after failover BH is completed */
 881    if (mis->to_src_file) {
 882        qemu_fclose(mis->to_src_file);
 883    }
 884    migration_incoming_disable_colo();
 885
 886    rcu_unregister_thread();
 887    return NULL;
 888}
 889