qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "migration/failover.h"
  27#ifdef CONFIG_REPLICATION
  28#include "replication.h"
  29#endif
  30#include "net/colo-compare.h"
  31#include "net/colo.h"
  32#include "block/block.h"
  33#include "qapi/qapi-events-migration.h"
  34#include "qapi/qmp/qerror.h"
  35#include "sysemu/cpus.h"
  36#include "net/filter.h"
  37
  38static bool vmstate_loading;
  39static Notifier packets_compare_notifier;
  40
   41/* The user needs to know the COLO mode after COLO failover */
  42static COLOMode last_colo_mode;
  43
  44#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  45
  46bool migration_in_colo_state(void)
  47{
  48    MigrationState *s = migrate_get_current();
  49
  50    return (s->state == MIGRATION_STATUS_COLO);
  51}
  52
  53bool migration_incoming_in_colo_state(void)
  54{
  55    MigrationIncomingState *mis = migration_incoming_get_current();
  56
  57    return mis && (mis->state == MIGRATION_STATUS_COLO);
  58}
  59
  60static bool colo_runstate_is_stopped(void)
  61{
  62    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  63}
  64
  65static void secondary_vm_do_failover(void)
  66{
  67/* COLO needs enable block-replication */
  68#ifdef CONFIG_REPLICATION
  69    int old_state;
  70    MigrationIncomingState *mis = migration_incoming_get_current();
  71    Error *local_err = NULL;
  72
  73    /* Can not do failover during the process of VM's loading VMstate, Or
  74     * it will break the secondary VM.
  75     */
  76    if (vmstate_loading) {
  77        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  78                        FAILOVER_STATUS_RELAUNCH);
  79        if (old_state != FAILOVER_STATUS_ACTIVE) {
  80            error_report("Unknown error while do failover for secondary VM,"
  81                         "old_state: %s", FailoverStatus_str(old_state));
  82        }
  83        return;
  84    }
  85
  86    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  87                      MIGRATION_STATUS_COMPLETED);
  88
  89    replication_stop_all(true, &local_err);
  90    if (local_err) {
  91        error_report_err(local_err);
  92    }
  93
  94    /* Notify all filters of all NIC to do checkpoint */
  95    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
  96    if (local_err) {
  97        error_report_err(local_err);
  98    }
  99
 100    if (!autostart) {
 101        error_report("\"-S\" qemu option will be ignored in secondary side");
 102        /* recover runstate to normal migration finish state */
 103        autostart = true;
 104    }
 105    /*
 106     * Make sure COLO incoming thread not block in recv or send,
 107     * If mis->from_src_file and mis->to_src_file use the same fd,
 108     * The second shutdown() will return -1, we ignore this value,
 109     * It is harmless.
 110     */
 111    if (mis->from_src_file) {
 112        qemu_file_shutdown(mis->from_src_file);
 113    }
 114    if (mis->to_src_file) {
 115        qemu_file_shutdown(mis->to_src_file);
 116    }
 117
 118    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 119                                   FAILOVER_STATUS_COMPLETED);
 120    if (old_state != FAILOVER_STATUS_ACTIVE) {
 121        error_report("Incorrect state (%s) while doing failover for "
 122                     "secondary VM", FailoverStatus_str(old_state));
 123        return;
 124    }
 125    /* Notify COLO incoming thread that failover work is finished */
 126    qemu_sem_post(&mis->colo_incoming_sem);
 127
 128    /* For Secondary VM, jump to incoming co */
 129    if (mis->migration_incoming_co) {
 130        qemu_coroutine_enter(mis->migration_incoming_co);
 131    }
 132#else
 133    abort();
 134#endif
 135}
 136
 137static void primary_vm_do_failover(void)
 138{
 139#ifdef CONFIG_REPLICATION
 140    MigrationState *s = migrate_get_current();
 141    int old_state;
 142    Error *local_err = NULL;
 143
 144    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 145                      MIGRATION_STATUS_COMPLETED);
 146    /*
 147     * kick COLO thread which might wait at
 148     * qemu_sem_wait(&s->colo_checkpoint_sem).
 149     */
 150    colo_checkpoint_notify(migrate_get_current());
 151
 152    /*
 153     * Wake up COLO thread which may blocked in recv() or send(),
 154     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 155     * same fd, but we still shutdown the fd for twice, it is harmless.
 156     */
 157    if (s->to_dst_file) {
 158        qemu_file_shutdown(s->to_dst_file);
 159    }
 160    if (s->rp_state.from_dst_file) {
 161        qemu_file_shutdown(s->rp_state.from_dst_file);
 162    }
 163
 164    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 165                                   FAILOVER_STATUS_COMPLETED);
 166    if (old_state != FAILOVER_STATUS_ACTIVE) {
 167        error_report("Incorrect state (%s) while doing failover for Primary VM",
 168                     FailoverStatus_str(old_state));
 169        return;
 170    }
 171
 172    replication_stop_all(true, &local_err);
 173    if (local_err) {
 174        error_report_err(local_err);
 175        local_err = NULL;
 176    }
 177
 178    /* Notify COLO thread that failover work is finished */
 179    qemu_sem_post(&s->colo_exit_sem);
 180#else
 181    abort();
 182#endif
 183}
 184
 185COLOMode get_colo_mode(void)
 186{
 187    if (migration_in_colo_state()) {
 188        return COLO_MODE_PRIMARY;
 189    } else if (migration_incoming_in_colo_state()) {
 190        return COLO_MODE_SECONDARY;
 191    } else {
 192        return COLO_MODE_NONE;
 193    }
 194}
 195
 196void colo_do_failover(MigrationState *s)
 197{
 198    /* Make sure VM stopped while failover happened. */
 199    if (!colo_runstate_is_stopped()) {
 200        vm_stop_force_state(RUN_STATE_COLO);
 201    }
 202
 203    switch (get_colo_mode()) {
 204    case COLO_MODE_PRIMARY:
 205        primary_vm_do_failover();
 206        break;
 207    case COLO_MODE_SECONDARY:
 208        secondary_vm_do_failover();
 209        break;
 210    default:
 211        error_report("colo_do_failover failed because the colo mode"
 212                     " could not be obtained");
 213    }
 214}
 215
 216#ifdef CONFIG_REPLICATION
 217void qmp_xen_set_replication(bool enable, bool primary,
 218                             bool has_failover, bool failover,
 219                             Error **errp)
 220{
 221    ReplicationMode mode = primary ?
 222                           REPLICATION_MODE_PRIMARY :
 223                           REPLICATION_MODE_SECONDARY;
 224
 225    if (has_failover && enable) {
 226        error_setg(errp, "Parameter 'failover' is only for"
 227                   " stopping replication");
 228        return;
 229    }
 230
 231    if (enable) {
 232        replication_start_all(mode, errp);
 233    } else {
 234        if (!has_failover) {
 235            failover = NULL;
 236        }
 237        replication_stop_all(failover, failover ? NULL : errp);
 238    }
 239}
 240
 241ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 242{
 243    Error *err = NULL;
 244    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 245
 246    replication_get_error_all(&err);
 247    if (err) {
 248        s->error = true;
 249        s->has_desc = true;
 250        s->desc = g_strdup(error_get_pretty(err));
 251    } else {
 252        s->error = false;
 253    }
 254
 255    error_free(err);
 256    return s;
 257}
 258
 259void qmp_xen_colo_do_checkpoint(Error **errp)
 260{
 261    replication_do_checkpoint_all(errp);
 262}
 263#endif
 264
 265COLOStatus *qmp_query_colo_status(Error **errp)
 266{
 267    COLOStatus *s = g_new0(COLOStatus, 1);
 268
 269    s->mode = get_colo_mode();
 270    s->last_mode = last_colo_mode;
 271
 272    switch (failover_get_state()) {
 273    case FAILOVER_STATUS_NONE:
 274        s->reason = COLO_EXIT_REASON_NONE;
 275        break;
 276    case FAILOVER_STATUS_COMPLETED:
 277        s->reason = COLO_EXIT_REASON_REQUEST;
 278        break;
 279    default:
 280        if (migration_in_colo_state()) {
 281            s->reason = COLO_EXIT_REASON_PROCESSING;
 282        } else {
 283            s->reason = COLO_EXIT_REASON_ERROR;
 284        }
 285    }
 286
 287    return s;
 288}
 289
 290static void colo_send_message(QEMUFile *f, COLOMessage msg,
 291                              Error **errp)
 292{
 293    int ret;
 294
 295    if (msg >= COLO_MESSAGE__MAX) {
 296        error_setg(errp, "%s: Invalid message", __func__);
 297        return;
 298    }
 299    qemu_put_be32(f, msg);
 300    qemu_fflush(f);
 301
 302    ret = qemu_file_get_error(f);
 303    if (ret < 0) {
 304        error_setg_errno(errp, -ret, "Can't send COLO message");
 305    }
 306    trace_colo_send_message(COLOMessage_str(msg));
 307}
 308
 309static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 310                                    uint64_t value, Error **errp)
 311{
 312    Error *local_err = NULL;
 313    int ret;
 314
 315    colo_send_message(f, msg, &local_err);
 316    if (local_err) {
 317        error_propagate(errp, local_err);
 318        return;
 319    }
 320    qemu_put_be64(f, value);
 321    qemu_fflush(f);
 322
 323    ret = qemu_file_get_error(f);
 324    if (ret < 0) {
 325        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 326                         COLOMessage_str(msg));
 327    }
 328}
 329
 330static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 331{
 332    COLOMessage msg;
 333    int ret;
 334
 335    msg = qemu_get_be32(f);
 336    ret = qemu_file_get_error(f);
 337    if (ret < 0) {
 338        error_setg_errno(errp, -ret, "Can't receive COLO message");
 339        return msg;
 340    }
 341    if (msg >= COLO_MESSAGE__MAX) {
 342        error_setg(errp, "%s: Invalid message", __func__);
 343        return msg;
 344    }
 345    trace_colo_receive_message(COLOMessage_str(msg));
 346    return msg;
 347}
 348
 349static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 350                                       Error **errp)
 351{
 352    COLOMessage msg;
 353    Error *local_err = NULL;
 354
 355    msg = colo_receive_message(f, &local_err);
 356    if (local_err) {
 357        error_propagate(errp, local_err);
 358        return;
 359    }
 360    if (msg != expect_msg) {
 361        error_setg(errp, "Unexpected COLO message %d, expected %d",
 362                          msg, expect_msg);
 363    }
 364}
 365
 366static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 367                                           Error **errp)
 368{
 369    Error *local_err = NULL;
 370    uint64_t value;
 371    int ret;
 372
 373    colo_receive_check_message(f, expect_msg, &local_err);
 374    if (local_err) {
 375        error_propagate(errp, local_err);
 376        return 0;
 377    }
 378
 379    value = qemu_get_be64(f);
 380    ret = qemu_file_get_error(f);
 381    if (ret < 0) {
 382        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 383                         COLOMessage_str(expect_msg));
 384    }
 385    return value;
 386}
 387
 388static int colo_do_checkpoint_transaction(MigrationState *s,
 389                                          QIOChannelBuffer *bioc,
 390                                          QEMUFile *fb)
 391{
 392    Error *local_err = NULL;
 393    int ret = -1;
 394
 395    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 396                      &local_err);
 397    if (local_err) {
 398        goto out;
 399    }
 400
 401    colo_receive_check_message(s->rp_state.from_dst_file,
 402                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 403    if (local_err) {
 404        goto out;
 405    }
 406    /* Reset channel-buffer directly */
 407    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 408    bioc->usage = 0;
 409
 410    qemu_mutex_lock_iothread();
 411    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 412        qemu_mutex_unlock_iothread();
 413        goto out;
 414    }
 415    vm_stop_force_state(RUN_STATE_COLO);
 416    qemu_mutex_unlock_iothread();
 417    trace_colo_vm_state_change("run", "stop");
 418    /*
 419     * Failover request bh could be called after vm_stop_force_state(),
 420     * So we need check failover_request_is_active() again.
 421     */
 422    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 423        goto out;
 424    }
 425
 426    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 427    if (local_err) {
 428        goto out;
 429    }
 430
 431    /* Disable block migration */
 432    migrate_set_block_enabled(false, &local_err);
 433    qemu_mutex_lock_iothread();
 434
 435#ifdef CONFIG_REPLICATION
 436    replication_do_checkpoint_all(&local_err);
 437    if (local_err) {
 438        qemu_mutex_unlock_iothread();
 439        goto out;
 440    }
 441#else
 442        abort();
 443#endif
 444
 445    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 446    if (local_err) {
 447        qemu_mutex_unlock_iothread();
 448        goto out;
 449    }
 450    /* Note: device state is saved into buffer */
 451    ret = qemu_save_device_state(fb);
 452
 453    qemu_mutex_unlock_iothread();
 454    if (ret < 0) {
 455        goto out;
 456    }
 457    /*
 458     * Only save VM's live state, which not including device state.
 459     * TODO: We may need a timeout mechanism to prevent COLO process
 460     * to be blocked here.
 461     */
 462    qemu_savevm_live_state(s->to_dst_file);
 463
 464    qemu_fflush(fb);
 465
 466    /*
 467     * We need the size of the VMstate data in Secondary side,
 468     * With which we can decide how much data should be read.
 469     */
 470    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 471                            bioc->usage, &local_err);
 472    if (local_err) {
 473        goto out;
 474    }
 475
 476    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 477    qemu_fflush(s->to_dst_file);
 478    ret = qemu_file_get_error(s->to_dst_file);
 479    if (ret < 0) {
 480        goto out;
 481    }
 482
 483    colo_receive_check_message(s->rp_state.from_dst_file,
 484                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 485    if (local_err) {
 486        goto out;
 487    }
 488
 489    colo_receive_check_message(s->rp_state.from_dst_file,
 490                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 491    if (local_err) {
 492        goto out;
 493    }
 494
 495    ret = 0;
 496
 497    qemu_mutex_lock_iothread();
 498    vm_start();
 499    qemu_mutex_unlock_iothread();
 500    trace_colo_vm_state_change("stop", "run");
 501
 502out:
 503    if (local_err) {
 504        error_report_err(local_err);
 505    }
 506    return ret;
 507}
 508
 509static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 510{
 511    colo_checkpoint_notify(data);
 512}
 513
 514static void colo_process_checkpoint(MigrationState *s)
 515{
 516    QIOChannelBuffer *bioc;
 517    QEMUFile *fb = NULL;
 518    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 519    Error *local_err = NULL;
 520    int ret;
 521
 522    last_colo_mode = get_colo_mode();
 523    if (last_colo_mode != COLO_MODE_PRIMARY) {
 524        error_report("COLO mode must be COLO_MODE_PRIMARY");
 525        return;
 526    }
 527
 528    failover_init_state();
 529
 530    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 531    if (!s->rp_state.from_dst_file) {
 532        error_report("Open QEMUFile from_dst_file failed");
 533        goto out;
 534    }
 535
 536    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 537    colo_compare_register_notifier(&packets_compare_notifier);
 538
 539    /*
 540     * Wait for Secondary finish loading VM states and enter COLO
 541     * restore.
 542     */
 543    colo_receive_check_message(s->rp_state.from_dst_file,
 544                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 545    if (local_err) {
 546        goto out;
 547    }
 548    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 549    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 550    object_unref(OBJECT(bioc));
 551
 552    qemu_mutex_lock_iothread();
 553#ifdef CONFIG_REPLICATION
 554    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 555    if (local_err) {
 556        qemu_mutex_unlock_iothread();
 557        goto out;
 558    }
 559#else
 560        abort();
 561#endif
 562
 563    vm_start();
 564    qemu_mutex_unlock_iothread();
 565    trace_colo_vm_state_change("stop", "run");
 566
 567    timer_mod(s->colo_delay_timer,
 568            current_time + s->parameters.x_checkpoint_delay);
 569
 570    while (s->state == MIGRATION_STATUS_COLO) {
 571        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 572            error_report("failover request");
 573            goto out;
 574        }
 575
 576        qemu_sem_wait(&s->colo_checkpoint_sem);
 577
 578        if (s->state != MIGRATION_STATUS_COLO) {
 579            goto out;
 580        }
 581        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 582        if (ret < 0) {
 583            goto out;
 584        }
 585    }
 586
 587out:
 588    /* Throw the unreported error message after exited from loop */
 589    if (local_err) {
 590        error_report_err(local_err);
 591    }
 592
 593    if (fb) {
 594        qemu_fclose(fb);
 595    }
 596
 597    /*
 598     * There are only two reasons we can get here, some error happened
 599     * or the user triggered failover.
 600     */
 601    switch (failover_get_state()) {
 602    case FAILOVER_STATUS_COMPLETED:
 603        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 604                                  COLO_EXIT_REASON_REQUEST);
 605        break;
 606    default:
 607        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 608                                  COLO_EXIT_REASON_ERROR);
 609    }
 610
 611    /* Hope this not to be too long to wait here */
 612    qemu_sem_wait(&s->colo_exit_sem);
 613    qemu_sem_destroy(&s->colo_exit_sem);
 614
 615    /*
 616     * It is safe to unregister notifier after failover finished.
 617     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 618     * released befor unregister notifier, or there will be use-after-free
 619     * error.
 620     */
 621    colo_compare_unregister_notifier(&packets_compare_notifier);
 622    timer_del(s->colo_delay_timer);
 623    timer_free(s->colo_delay_timer);
 624    qemu_sem_destroy(&s->colo_checkpoint_sem);
 625
 626    /*
 627     * Must be called after failover BH is completed,
 628     * Or the failover BH may shutdown the wrong fd that
 629     * re-used by other threads after we release here.
 630     */
 631    if (s->rp_state.from_dst_file) {
 632        qemu_fclose(s->rp_state.from_dst_file);
 633    }
 634}
 635
 636void colo_checkpoint_notify(void *opaque)
 637{
 638    MigrationState *s = opaque;
 639    int64_t next_notify_time;
 640
 641    qemu_sem_post(&s->colo_checkpoint_sem);
 642    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 643    next_notify_time = s->colo_checkpoint_time +
 644                    s->parameters.x_checkpoint_delay;
 645    timer_mod(s->colo_delay_timer, next_notify_time);
 646}
 647
 648void migrate_start_colo_process(MigrationState *s)
 649{
 650    qemu_mutex_unlock_iothread();
 651    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 652    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 653                                colo_checkpoint_notify, s);
 654
 655    qemu_sem_init(&s->colo_exit_sem, 0);
 656    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 657                      MIGRATION_STATUS_COLO);
 658    colo_process_checkpoint(s);
 659    qemu_mutex_lock_iothread();
 660}
 661
 662static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 663                                     Error **errp)
 664{
 665    COLOMessage msg;
 666    Error *local_err = NULL;
 667
 668    msg = colo_receive_message(f, &local_err);
 669    if (local_err) {
 670        error_propagate(errp, local_err);
 671        return;
 672    }
 673
 674    switch (msg) {
 675    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 676        *checkpoint_request = 1;
 677        break;
 678    default:
 679        *checkpoint_request = 0;
 680        error_setg(errp, "Got unknown COLO message: %d", msg);
 681        break;
 682    }
 683}
 684
 685void *colo_process_incoming_thread(void *opaque)
 686{
 687    MigrationIncomingState *mis = opaque;
 688    QEMUFile *fb = NULL;
 689    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 690    uint64_t total_size;
 691    uint64_t value;
 692    Error *local_err = NULL;
 693    int ret;
 694
 695    rcu_register_thread();
 696    qemu_sem_init(&mis->colo_incoming_sem, 0);
 697
 698    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 699                      MIGRATION_STATUS_COLO);
 700
 701    last_colo_mode = get_colo_mode();
 702    if (last_colo_mode != COLO_MODE_SECONDARY) {
 703        error_report("COLO mode must be COLO_MODE_SECONDARY");
 704        return NULL;
 705    }
 706
 707    failover_init_state();
 708
 709    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 710    if (!mis->to_src_file) {
 711        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 712        goto out;
 713    }
 714    /*
 715     * Note: the communication between Primary side and Secondary side
 716     * should be sequential, we set the fd to unblocked in migration incoming
 717     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 718     * set the fd back to blocked.
 719     */
 720    qemu_file_set_blocking(mis->from_src_file, true);
 721
 722    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 723    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 724    object_unref(OBJECT(bioc));
 725
 726    qemu_mutex_lock_iothread();
 727#ifdef CONFIG_REPLICATION
 728    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 729    if (local_err) {
 730        qemu_mutex_unlock_iothread();
 731        goto out;
 732    }
 733#else
 734        abort();
 735#endif
 736    vm_start();
 737    trace_colo_vm_state_change("stop", "run");
 738    qemu_mutex_unlock_iothread();
 739
 740    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 741                      &local_err);
 742    if (local_err) {
 743        goto out;
 744    }
 745
 746    while (mis->state == MIGRATION_STATUS_COLO) {
 747        int request = 0;
 748
 749        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 750        if (local_err) {
 751            goto out;
 752        }
 753        assert(request);
 754        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 755            error_report("failover request");
 756            goto out;
 757        }
 758
 759        qemu_mutex_lock_iothread();
 760        vm_stop_force_state(RUN_STATE_COLO);
 761        trace_colo_vm_state_change("run", "stop");
 762        qemu_mutex_unlock_iothread();
 763
 764        /* FIXME: This is unnecessary for periodic checkpoint mode */
 765        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 766                     &local_err);
 767        if (local_err) {
 768            goto out;
 769        }
 770
 771        colo_receive_check_message(mis->from_src_file,
 772                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 773        if (local_err) {
 774            goto out;
 775        }
 776
 777        qemu_mutex_lock_iothread();
 778        cpu_synchronize_all_pre_loadvm();
 779        ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 780        qemu_mutex_unlock_iothread();
 781
 782        if (ret < 0) {
 783            error_report("Load VM's live state (ram) error");
 784            goto out;
 785        }
 786
 787        value = colo_receive_message_value(mis->from_src_file,
 788                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 789        if (local_err) {
 790            goto out;
 791        }
 792
 793        /*
 794         * Read VM device state data into channel buffer,
 795         * It's better to re-use the memory allocated.
 796         * Here we need to handle the channel buffer directly.
 797         */
 798        if (value > bioc->capacity) {
 799            bioc->capacity = value;
 800            bioc->data = g_realloc(bioc->data, bioc->capacity);
 801        }
 802        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 803        if (total_size != value) {
 804            error_report("Got %" PRIu64 " VMState data, less than expected"
 805                        " %" PRIu64, total_size, value);
 806            goto out;
 807        }
 808        bioc->usage = total_size;
 809        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 810
 811        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 812                     &local_err);
 813        if (local_err) {
 814            goto out;
 815        }
 816
 817        qemu_mutex_lock_iothread();
 818        vmstate_loading = true;
 819        ret = qemu_load_device_state(fb);
 820        if (ret < 0) {
 821            error_report("COLO: load device state failed");
 822            qemu_mutex_unlock_iothread();
 823            goto out;
 824        }
 825
 826#ifdef CONFIG_REPLICATION
 827        replication_get_error_all(&local_err);
 828        if (local_err) {
 829            qemu_mutex_unlock_iothread();
 830            goto out;
 831        }
 832
 833        /* discard colo disk buffer */
 834        replication_do_checkpoint_all(&local_err);
 835        if (local_err) {
 836            qemu_mutex_unlock_iothread();
 837            goto out;
 838        }
 839#else
 840        abort();
 841#endif
 842        /* Notify all filters of all NIC to do checkpoint */
 843        colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 844
 845        if (local_err) {
 846            qemu_mutex_unlock_iothread();
 847            goto out;
 848        }
 849
 850        vmstate_loading = false;
 851        vm_start();
 852        trace_colo_vm_state_change("stop", "run");
 853        qemu_mutex_unlock_iothread();
 854
 855        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 856            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 857                            FAILOVER_STATUS_NONE);
 858            failover_request_active(NULL);
 859            goto out;
 860        }
 861
 862        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 863                     &local_err);
 864        if (local_err) {
 865            goto out;
 866        }
 867    }
 868
 869out:
 870    vmstate_loading = false;
 871    /* Throw the unreported error message after exited from loop */
 872    if (local_err) {
 873        error_report_err(local_err);
 874    }
 875
 876    /*
 877     * There are only two reasons we can get here, some error happened
 878     * or the user triggered failover.
 879     */
 880    switch (failover_get_state()) {
 881    case FAILOVER_STATUS_COMPLETED:
 882        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 883                                  COLO_EXIT_REASON_REQUEST);
 884        break;
 885    default:
 886        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 887                                  COLO_EXIT_REASON_ERROR);
 888    }
 889
 890    if (fb) {
 891        qemu_fclose(fb);
 892    }
 893
 894    /* Hope this not to be too long to loop here */
 895    qemu_sem_wait(&mis->colo_incoming_sem);
 896    qemu_sem_destroy(&mis->colo_incoming_sem);
 897    /* Must be called after failover BH is completed */
 898    if (mis->to_src_file) {
 899        qemu_fclose(mis->to_src_file);
 900        mis->to_src_file = NULL;
 901    }
 902
 903    rcu_unregister_thread();
 904    return NULL;
 905}
 906