qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "migration/failover.h"
  27#ifdef CONFIG_REPLICATION
  28#include "replication.h"
  29#endif
  30#include "net/colo-compare.h"
  31#include "net/colo.h"
  32#include "block/block.h"
  33#include "qapi/qapi-events-migration.h"
  34#include "qapi/qmp/qerror.h"
  35#include "sysemu/cpus.h"
  36#include "net/filter.h"
  37
  38static bool vmstate_loading;
  39static Notifier packets_compare_notifier;
  40
  41/* User need to know colo mode after COLO failover */
  42static COLOMode last_colo_mode;
  43
  44#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  45
  46bool migration_in_colo_state(void)
  47{
  48    MigrationState *s = migrate_get_current();
  49
  50    return (s->state == MIGRATION_STATUS_COLO);
  51}
  52
  53bool migration_incoming_in_colo_state(void)
  54{
  55    MigrationIncomingState *mis = migration_incoming_get_current();
  56
  57    return mis && (mis->state == MIGRATION_STATUS_COLO);
  58}
  59
  60static bool colo_runstate_is_stopped(void)
  61{
  62    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  63}
  64
  65static void secondary_vm_do_failover(void)
  66{
  67/* COLO needs enable block-replication */
  68#ifdef CONFIG_REPLICATION
  69    int old_state;
  70    MigrationIncomingState *mis = migration_incoming_get_current();
  71    Error *local_err = NULL;
  72
  73    /* Can not do failover during the process of VM's loading VMstate, Or
  74     * it will break the secondary VM.
  75     */
  76    if (vmstate_loading) {
  77        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  78                        FAILOVER_STATUS_RELAUNCH);
  79        if (old_state != FAILOVER_STATUS_ACTIVE) {
  80            error_report("Unknown error while do failover for secondary VM,"
  81                         "old_state: %s", FailoverStatus_str(old_state));
  82        }
  83        return;
  84    }
  85
  86    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  87                      MIGRATION_STATUS_COMPLETED);
  88
  89    replication_stop_all(true, &local_err);
  90    if (local_err) {
  91        error_report_err(local_err);
  92    }
  93
  94    /* Notify all filters of all NIC to do checkpoint */
  95    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
  96    if (local_err) {
  97        error_report_err(local_err);
  98    }
  99
 100    if (!autostart) {
 101        error_report("\"-S\" qemu option will be ignored in secondary side");
 102        /* recover runstate to normal migration finish state */
 103        autostart = true;
 104    }
 105    /*
 106     * Make sure COLO incoming thread not block in recv or send,
 107     * If mis->from_src_file and mis->to_src_file use the same fd,
 108     * The second shutdown() will return -1, we ignore this value,
 109     * It is harmless.
 110     */
 111    if (mis->from_src_file) {
 112        qemu_file_shutdown(mis->from_src_file);
 113    }
 114    if (mis->to_src_file) {
 115        qemu_file_shutdown(mis->to_src_file);
 116    }
 117
 118    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 119                                   FAILOVER_STATUS_COMPLETED);
 120    if (old_state != FAILOVER_STATUS_ACTIVE) {
 121        error_report("Incorrect state (%s) while doing failover for "
 122                     "secondary VM", FailoverStatus_str(old_state));
 123        return;
 124    }
 125    /* Notify COLO incoming thread that failover work is finished */
 126    qemu_sem_post(&mis->colo_incoming_sem);
 127
 128    /* For Secondary VM, jump to incoming co */
 129    if (mis->migration_incoming_co) {
 130        qemu_coroutine_enter(mis->migration_incoming_co);
 131    }
 132#else
 133    abort();
 134#endif
 135}
 136
 137static void primary_vm_do_failover(void)
 138{
 139#ifdef CONFIG_REPLICATION
 140    MigrationState *s = migrate_get_current();
 141    int old_state;
 142    Error *local_err = NULL;
 143
 144    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 145                      MIGRATION_STATUS_COMPLETED);
 146    /*
 147     * kick COLO thread which might wait at
 148     * qemu_sem_wait(&s->colo_checkpoint_sem).
 149     */
 150    colo_checkpoint_notify(migrate_get_current());
 151
 152    /*
 153     * Wake up COLO thread which may blocked in recv() or send(),
 154     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 155     * same fd, but we still shutdown the fd for twice, it is harmless.
 156     */
 157    if (s->to_dst_file) {
 158        qemu_file_shutdown(s->to_dst_file);
 159    }
 160    if (s->rp_state.from_dst_file) {
 161        qemu_file_shutdown(s->rp_state.from_dst_file);
 162    }
 163
 164    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 165                                   FAILOVER_STATUS_COMPLETED);
 166    if (old_state != FAILOVER_STATUS_ACTIVE) {
 167        error_report("Incorrect state (%s) while doing failover for Primary VM",
 168                     FailoverStatus_str(old_state));
 169        return;
 170    }
 171
 172    replication_stop_all(true, &local_err);
 173    if (local_err) {
 174        error_report_err(local_err);
 175        local_err = NULL;
 176    }
 177
 178    /* Notify COLO thread that failover work is finished */
 179    qemu_sem_post(&s->colo_exit_sem);
 180#else
 181    abort();
 182#endif
 183}
 184
 185COLOMode get_colo_mode(void)
 186{
 187    if (migration_in_colo_state()) {
 188        return COLO_MODE_PRIMARY;
 189    } else if (migration_incoming_in_colo_state()) {
 190        return COLO_MODE_SECONDARY;
 191    } else {
 192        return COLO_MODE_NONE;
 193    }
 194}
 195
 196void colo_do_failover(void)
 197{
 198    /* Make sure VM stopped while failover happened. */
 199    if (!colo_runstate_is_stopped()) {
 200        vm_stop_force_state(RUN_STATE_COLO);
 201    }
 202
 203    switch (get_colo_mode()) {
 204    case COLO_MODE_PRIMARY:
 205        primary_vm_do_failover();
 206        break;
 207    case COLO_MODE_SECONDARY:
 208        secondary_vm_do_failover();
 209        break;
 210    default:
 211        error_report("colo_do_failover failed because the colo mode"
 212                     " could not be obtained");
 213    }
 214}
 215
 216#ifdef CONFIG_REPLICATION
 217void qmp_xen_set_replication(bool enable, bool primary,
 218                             bool has_failover, bool failover,
 219                             Error **errp)
 220{
 221    ReplicationMode mode = primary ?
 222                           REPLICATION_MODE_PRIMARY :
 223                           REPLICATION_MODE_SECONDARY;
 224
 225    if (has_failover && enable) {
 226        error_setg(errp, "Parameter 'failover' is only for"
 227                   " stopping replication");
 228        return;
 229    }
 230
 231    if (enable) {
 232        replication_start_all(mode, errp);
 233    } else {
 234        if (!has_failover) {
 235            failover = NULL;
 236        }
 237        replication_stop_all(failover, failover ? NULL : errp);
 238    }
 239}
 240
 241ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 242{
 243    Error *err = NULL;
 244    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 245
 246    replication_get_error_all(&err);
 247    if (err) {
 248        s->error = true;
 249        s->has_desc = true;
 250        s->desc = g_strdup(error_get_pretty(err));
 251    } else {
 252        s->error = false;
 253    }
 254
 255    error_free(err);
 256    return s;
 257}
 258
 259void qmp_xen_colo_do_checkpoint(Error **errp)
 260{
 261    replication_do_checkpoint_all(errp);
 262    /* Notify all filters of all NIC to do checkpoint */
 263    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 264}
 265#endif
 266
 267COLOStatus *qmp_query_colo_status(Error **errp)
 268{
 269    COLOStatus *s = g_new0(COLOStatus, 1);
 270
 271    s->mode = get_colo_mode();
 272    s->last_mode = last_colo_mode;
 273
 274    switch (failover_get_state()) {
 275    case FAILOVER_STATUS_NONE:
 276        s->reason = COLO_EXIT_REASON_NONE;
 277        break;
 278    case FAILOVER_STATUS_COMPLETED:
 279        s->reason = COLO_EXIT_REASON_REQUEST;
 280        break;
 281    default:
 282        if (migration_in_colo_state()) {
 283            s->reason = COLO_EXIT_REASON_PROCESSING;
 284        } else {
 285            s->reason = COLO_EXIT_REASON_ERROR;
 286        }
 287    }
 288
 289    return s;
 290}
 291
 292static void colo_send_message(QEMUFile *f, COLOMessage msg,
 293                              Error **errp)
 294{
 295    int ret;
 296
 297    if (msg >= COLO_MESSAGE__MAX) {
 298        error_setg(errp, "%s: Invalid message", __func__);
 299        return;
 300    }
 301    qemu_put_be32(f, msg);
 302    qemu_fflush(f);
 303
 304    ret = qemu_file_get_error(f);
 305    if (ret < 0) {
 306        error_setg_errno(errp, -ret, "Can't send COLO message");
 307    }
 308    trace_colo_send_message(COLOMessage_str(msg));
 309}
 310
 311static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 312                                    uint64_t value, Error **errp)
 313{
 314    Error *local_err = NULL;
 315    int ret;
 316
 317    colo_send_message(f, msg, &local_err);
 318    if (local_err) {
 319        error_propagate(errp, local_err);
 320        return;
 321    }
 322    qemu_put_be64(f, value);
 323    qemu_fflush(f);
 324
 325    ret = qemu_file_get_error(f);
 326    if (ret < 0) {
 327        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 328                         COLOMessage_str(msg));
 329    }
 330}
 331
 332static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 333{
 334    COLOMessage msg;
 335    int ret;
 336
 337    msg = qemu_get_be32(f);
 338    ret = qemu_file_get_error(f);
 339    if (ret < 0) {
 340        error_setg_errno(errp, -ret, "Can't receive COLO message");
 341        return msg;
 342    }
 343    if (msg >= COLO_MESSAGE__MAX) {
 344        error_setg(errp, "%s: Invalid message", __func__);
 345        return msg;
 346    }
 347    trace_colo_receive_message(COLOMessage_str(msg));
 348    return msg;
 349}
 350
 351static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 352                                       Error **errp)
 353{
 354    COLOMessage msg;
 355    Error *local_err = NULL;
 356
 357    msg = colo_receive_message(f, &local_err);
 358    if (local_err) {
 359        error_propagate(errp, local_err);
 360        return;
 361    }
 362    if (msg != expect_msg) {
 363        error_setg(errp, "Unexpected COLO message %d, expected %d",
 364                          msg, expect_msg);
 365    }
 366}
 367
 368static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 369                                           Error **errp)
 370{
 371    Error *local_err = NULL;
 372    uint64_t value;
 373    int ret;
 374
 375    colo_receive_check_message(f, expect_msg, &local_err);
 376    if (local_err) {
 377        error_propagate(errp, local_err);
 378        return 0;
 379    }
 380
 381    value = qemu_get_be64(f);
 382    ret = qemu_file_get_error(f);
 383    if (ret < 0) {
 384        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 385                         COLOMessage_str(expect_msg));
 386    }
 387    return value;
 388}
 389
 390static int colo_do_checkpoint_transaction(MigrationState *s,
 391                                          QIOChannelBuffer *bioc,
 392                                          QEMUFile *fb)
 393{
 394    Error *local_err = NULL;
 395    int ret = -1;
 396
 397    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 398                      &local_err);
 399    if (local_err) {
 400        goto out;
 401    }
 402
 403    colo_receive_check_message(s->rp_state.from_dst_file,
 404                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 405    if (local_err) {
 406        goto out;
 407    }
 408    /* Reset channel-buffer directly */
 409    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 410    bioc->usage = 0;
 411
 412    qemu_mutex_lock_iothread();
 413    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 414        qemu_mutex_unlock_iothread();
 415        goto out;
 416    }
 417    vm_stop_force_state(RUN_STATE_COLO);
 418    qemu_mutex_unlock_iothread();
 419    trace_colo_vm_state_change("run", "stop");
 420    /*
 421     * Failover request bh could be called after vm_stop_force_state(),
 422     * So we need check failover_request_is_active() again.
 423     */
 424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 425        goto out;
 426    }
 427
 428    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 429    if (local_err) {
 430        goto out;
 431    }
 432
 433    /* Disable block migration */
 434    migrate_set_block_enabled(false, &local_err);
 435    qemu_mutex_lock_iothread();
 436
 437#ifdef CONFIG_REPLICATION
 438    replication_do_checkpoint_all(&local_err);
 439    if (local_err) {
 440        qemu_mutex_unlock_iothread();
 441        goto out;
 442    }
 443#else
 444        abort();
 445#endif
 446
 447    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 448    if (local_err) {
 449        qemu_mutex_unlock_iothread();
 450        goto out;
 451    }
 452    /* Note: device state is saved into buffer */
 453    ret = qemu_save_device_state(fb);
 454
 455    qemu_mutex_unlock_iothread();
 456    if (ret < 0) {
 457        goto out;
 458    }
 459    /*
 460     * Only save VM's live state, which not including device state.
 461     * TODO: We may need a timeout mechanism to prevent COLO process
 462     * to be blocked here.
 463     */
 464    qemu_savevm_live_state(s->to_dst_file);
 465
 466    qemu_fflush(fb);
 467
 468    /*
 469     * We need the size of the VMstate data in Secondary side,
 470     * With which we can decide how much data should be read.
 471     */
 472    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 473                            bioc->usage, &local_err);
 474    if (local_err) {
 475        goto out;
 476    }
 477
 478    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 479    qemu_fflush(s->to_dst_file);
 480    ret = qemu_file_get_error(s->to_dst_file);
 481    if (ret < 0) {
 482        goto out;
 483    }
 484
 485    colo_receive_check_message(s->rp_state.from_dst_file,
 486                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 487    if (local_err) {
 488        goto out;
 489    }
 490
 491    colo_receive_check_message(s->rp_state.from_dst_file,
 492                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 493    if (local_err) {
 494        goto out;
 495    }
 496
 497    ret = 0;
 498
 499    qemu_mutex_lock_iothread();
 500    vm_start();
 501    qemu_mutex_unlock_iothread();
 502    trace_colo_vm_state_change("stop", "run");
 503
 504out:
 505    if (local_err) {
 506        error_report_err(local_err);
 507    }
 508    return ret;
 509}
 510
 511static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 512{
 513    colo_checkpoint_notify(data);
 514}
 515
 516static void colo_process_checkpoint(MigrationState *s)
 517{
 518    QIOChannelBuffer *bioc;
 519    QEMUFile *fb = NULL;
 520    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 521    Error *local_err = NULL;
 522    int ret;
 523
 524    last_colo_mode = get_colo_mode();
 525    if (last_colo_mode != COLO_MODE_PRIMARY) {
 526        error_report("COLO mode must be COLO_MODE_PRIMARY");
 527        return;
 528    }
 529
 530    failover_init_state();
 531
 532    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 533    if (!s->rp_state.from_dst_file) {
 534        error_report("Open QEMUFile from_dst_file failed");
 535        goto out;
 536    }
 537
 538    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 539    colo_compare_register_notifier(&packets_compare_notifier);
 540
 541    /*
 542     * Wait for Secondary finish loading VM states and enter COLO
 543     * restore.
 544     */
 545    colo_receive_check_message(s->rp_state.from_dst_file,
 546                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 547    if (local_err) {
 548        goto out;
 549    }
 550    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 551    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 552    object_unref(OBJECT(bioc));
 553
 554    qemu_mutex_lock_iothread();
 555#ifdef CONFIG_REPLICATION
 556    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 557    if (local_err) {
 558        qemu_mutex_unlock_iothread();
 559        goto out;
 560    }
 561#else
 562        abort();
 563#endif
 564
 565    vm_start();
 566    qemu_mutex_unlock_iothread();
 567    trace_colo_vm_state_change("stop", "run");
 568
 569    timer_mod(s->colo_delay_timer,
 570            current_time + s->parameters.x_checkpoint_delay);
 571
 572    while (s->state == MIGRATION_STATUS_COLO) {
 573        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 574            error_report("failover request");
 575            goto out;
 576        }
 577
 578        qemu_sem_wait(&s->colo_checkpoint_sem);
 579
 580        if (s->state != MIGRATION_STATUS_COLO) {
 581            goto out;
 582        }
 583        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 584        if (ret < 0) {
 585            goto out;
 586        }
 587    }
 588
 589out:
 590    /* Throw the unreported error message after exited from loop */
 591    if (local_err) {
 592        error_report_err(local_err);
 593    }
 594
 595    if (fb) {
 596        qemu_fclose(fb);
 597    }
 598
 599    /*
 600     * There are only two reasons we can get here, some error happened
 601     * or the user triggered failover.
 602     */
 603    switch (failover_get_state()) {
 604    case FAILOVER_STATUS_COMPLETED:
 605        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 606                                  COLO_EXIT_REASON_REQUEST);
 607        break;
 608    default:
 609        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 610                                  COLO_EXIT_REASON_ERROR);
 611    }
 612
 613    /* Hope this not to be too long to wait here */
 614    qemu_sem_wait(&s->colo_exit_sem);
 615    qemu_sem_destroy(&s->colo_exit_sem);
 616
 617    /*
 618     * It is safe to unregister notifier after failover finished.
 619     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 620     * released befor unregister notifier, or there will be use-after-free
 621     * error.
 622     */
 623    colo_compare_unregister_notifier(&packets_compare_notifier);
 624    timer_del(s->colo_delay_timer);
 625    timer_free(s->colo_delay_timer);
 626    qemu_sem_destroy(&s->colo_checkpoint_sem);
 627
 628    /*
 629     * Must be called after failover BH is completed,
 630     * Or the failover BH may shutdown the wrong fd that
 631     * re-used by other threads after we release here.
 632     */
 633    if (s->rp_state.from_dst_file) {
 634        qemu_fclose(s->rp_state.from_dst_file);
 635    }
 636}
 637
 638void colo_checkpoint_notify(void *opaque)
 639{
 640    MigrationState *s = opaque;
 641    int64_t next_notify_time;
 642
 643    qemu_sem_post(&s->colo_checkpoint_sem);
 644    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 645    next_notify_time = s->colo_checkpoint_time +
 646                    s->parameters.x_checkpoint_delay;
 647    timer_mod(s->colo_delay_timer, next_notify_time);
 648}
 649
 650void migrate_start_colo_process(MigrationState *s)
 651{
 652    qemu_mutex_unlock_iothread();
 653    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 654    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 655                                colo_checkpoint_notify, s);
 656
 657    qemu_sem_init(&s->colo_exit_sem, 0);
 658    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 659                      MIGRATION_STATUS_COLO);
 660    colo_process_checkpoint(s);
 661    qemu_mutex_lock_iothread();
 662}
 663
 664static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 665                                     Error **errp)
 666{
 667    COLOMessage msg;
 668    Error *local_err = NULL;
 669
 670    msg = colo_receive_message(f, &local_err);
 671    if (local_err) {
 672        error_propagate(errp, local_err);
 673        return;
 674    }
 675
 676    switch (msg) {
 677    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 678        *checkpoint_request = 1;
 679        break;
 680    default:
 681        *checkpoint_request = 0;
 682        error_setg(errp, "Got unknown COLO message: %d", msg);
 683        break;
 684    }
 685}
 686
 687void *colo_process_incoming_thread(void *opaque)
 688{
 689    MigrationIncomingState *mis = opaque;
 690    QEMUFile *fb = NULL;
 691    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 692    uint64_t total_size;
 693    uint64_t value;
 694    Error *local_err = NULL;
 695    int ret;
 696
 697    rcu_register_thread();
 698    qemu_sem_init(&mis->colo_incoming_sem, 0);
 699
 700    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 701                      MIGRATION_STATUS_COLO);
 702
 703    last_colo_mode = get_colo_mode();
 704    if (last_colo_mode != COLO_MODE_SECONDARY) {
 705        error_report("COLO mode must be COLO_MODE_SECONDARY");
 706        return NULL;
 707    }
 708
 709    failover_init_state();
 710
 711    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 712    if (!mis->to_src_file) {
 713        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 714        goto out;
 715    }
 716    /*
 717     * Note: the communication between Primary side and Secondary side
 718     * should be sequential, we set the fd to unblocked in migration incoming
 719     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 720     * set the fd back to blocked.
 721     */
 722    qemu_file_set_blocking(mis->from_src_file, true);
 723
 724    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 725    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 726    object_unref(OBJECT(bioc));
 727
 728    qemu_mutex_lock_iothread();
 729#ifdef CONFIG_REPLICATION
 730    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 731    if (local_err) {
 732        qemu_mutex_unlock_iothread();
 733        goto out;
 734    }
 735#else
 736        abort();
 737#endif
 738    vm_start();
 739    trace_colo_vm_state_change("stop", "run");
 740    qemu_mutex_unlock_iothread();
 741
 742    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 743                      &local_err);
 744    if (local_err) {
 745        goto out;
 746    }
 747
 748    while (mis->state == MIGRATION_STATUS_COLO) {
 749        int request = 0;
 750
 751        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 752        if (local_err) {
 753            goto out;
 754        }
 755        assert(request);
 756        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 757            error_report("failover request");
 758            goto out;
 759        }
 760
 761        qemu_mutex_lock_iothread();
 762        vm_stop_force_state(RUN_STATE_COLO);
 763        trace_colo_vm_state_change("run", "stop");
 764        qemu_mutex_unlock_iothread();
 765
 766        /* FIXME: This is unnecessary for periodic checkpoint mode */
 767        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 768                     &local_err);
 769        if (local_err) {
 770            goto out;
 771        }
 772
 773        colo_receive_check_message(mis->from_src_file,
 774                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 775        if (local_err) {
 776            goto out;
 777        }
 778
 779        qemu_mutex_lock_iothread();
 780        cpu_synchronize_all_pre_loadvm();
 781        ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 782        qemu_mutex_unlock_iothread();
 783
 784        if (ret < 0) {
 785            error_report("Load VM's live state (ram) error");
 786            goto out;
 787        }
 788
 789        value = colo_receive_message_value(mis->from_src_file,
 790                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 791        if (local_err) {
 792            goto out;
 793        }
 794
 795        /*
 796         * Read VM device state data into channel buffer,
 797         * It's better to re-use the memory allocated.
 798         * Here we need to handle the channel buffer directly.
 799         */
 800        if (value > bioc->capacity) {
 801            bioc->capacity = value;
 802            bioc->data = g_realloc(bioc->data, bioc->capacity);
 803        }
 804        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 805        if (total_size != value) {
 806            error_report("Got %" PRIu64 " VMState data, less than expected"
 807                        " %" PRIu64, total_size, value);
 808            goto out;
 809        }
 810        bioc->usage = total_size;
 811        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 812
 813        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 814                     &local_err);
 815        if (local_err) {
 816            goto out;
 817        }
 818
 819        qemu_mutex_lock_iothread();
 820        vmstate_loading = true;
 821        ret = qemu_load_device_state(fb);
 822        if (ret < 0) {
 823            error_report("COLO: load device state failed");
 824            qemu_mutex_unlock_iothread();
 825            goto out;
 826        }
 827
 828#ifdef CONFIG_REPLICATION
 829        replication_get_error_all(&local_err);
 830        if (local_err) {
 831            qemu_mutex_unlock_iothread();
 832            goto out;
 833        }
 834
 835        /* discard colo disk buffer */
 836        replication_do_checkpoint_all(&local_err);
 837        if (local_err) {
 838            qemu_mutex_unlock_iothread();
 839            goto out;
 840        }
 841#else
 842        abort();
 843#endif
 844        /* Notify all filters of all NIC to do checkpoint */
 845        colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 846
 847        if (local_err) {
 848            qemu_mutex_unlock_iothread();
 849            goto out;
 850        }
 851
 852        vmstate_loading = false;
 853        vm_start();
 854        trace_colo_vm_state_change("stop", "run");
 855        qemu_mutex_unlock_iothread();
 856
 857        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 858            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 859                            FAILOVER_STATUS_NONE);
 860            failover_request_active(NULL);
 861            goto out;
 862        }
 863
 864        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 865                     &local_err);
 866        if (local_err) {
 867            goto out;
 868        }
 869    }
 870
 871out:
 872    vmstate_loading = false;
 873    /* Throw the unreported error message after exited from loop */
 874    if (local_err) {
 875        error_report_err(local_err);
 876    }
 877
 878    /*
 879     * There are only two reasons we can get here, some error happened
 880     * or the user triggered failover.
 881     */
 882    switch (failover_get_state()) {
 883    case FAILOVER_STATUS_COMPLETED:
 884        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 885                                  COLO_EXIT_REASON_REQUEST);
 886        break;
 887    default:
 888        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 889                                  COLO_EXIT_REASON_ERROR);
 890    }
 891
 892    if (fb) {
 893        qemu_fclose(fb);
 894    }
 895
 896    /* Hope this not to be too long to loop here */
 897    qemu_sem_wait(&mis->colo_incoming_sem);
 898    qemu_sem_destroy(&mis->colo_incoming_sem);
 899    /* Must be called after failover BH is completed */
 900    if (mis->to_src_file) {
 901        qemu_fclose(mis->to_src_file);
 902        mis->to_src_file = NULL;
 903    }
 904
 905    rcu_unregister_thread();
 906    return NULL;
 907}
 908