qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/rcu.h"
  28#include "migration/failover.h"
  29#ifdef CONFIG_REPLICATION
  30#include "replication.h"
  31#endif
  32#include "net/colo-compare.h"
  33#include "net/colo.h"
  34#include "block/block.h"
  35#include "qapi/qapi-events-migration.h"
  36#include "qapi/qmp/qerror.h"
  37#include "sysemu/cpus.h"
  38#include "sysemu/runstate.h"
  39#include "net/filter.h"
  40
  41static bool vmstate_loading;
  42static Notifier packets_compare_notifier;
  43
  44/* User need to know colo mode after COLO failover */
  45static COLOMode last_colo_mode;
  46
  47#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  48
  49bool migration_in_colo_state(void)
  50{
  51    MigrationState *s = migrate_get_current();
  52
  53    return (s->state == MIGRATION_STATUS_COLO);
  54}
  55
  56bool migration_incoming_in_colo_state(void)
  57{
  58    MigrationIncomingState *mis = migration_incoming_get_current();
  59
  60    return mis && (mis->state == MIGRATION_STATUS_COLO);
  61}
  62
  63static bool colo_runstate_is_stopped(void)
  64{
  65    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  66}
  67
  68static void secondary_vm_do_failover(void)
  69{
  70/* COLO needs enable block-replication */
  71#ifdef CONFIG_REPLICATION
  72    int old_state;
  73    MigrationIncomingState *mis = migration_incoming_get_current();
  74    Error *local_err = NULL;
  75
  76    /* Can not do failover during the process of VM's loading VMstate, Or
  77     * it will break the secondary VM.
  78     */
  79    if (vmstate_loading) {
  80        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  81                        FAILOVER_STATUS_RELAUNCH);
  82        if (old_state != FAILOVER_STATUS_ACTIVE) {
  83            error_report("Unknown error while do failover for secondary VM,"
  84                         "old_state: %s", FailoverStatus_str(old_state));
  85        }
  86        return;
  87    }
  88
  89    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  90                      MIGRATION_STATUS_COMPLETED);
  91
  92    replication_stop_all(true, &local_err);
  93    if (local_err) {
  94        error_report_err(local_err);
  95    }
  96
  97    /* Notify all filters of all NIC to do checkpoint */
  98    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
  99    if (local_err) {
 100        error_report_err(local_err);
 101    }
 102
 103    if (!autostart) {
 104        error_report("\"-S\" qemu option will be ignored in secondary side");
 105        /* recover runstate to normal migration finish state */
 106        autostart = true;
 107    }
 108    /*
 109     * Make sure COLO incoming thread not block in recv or send,
 110     * If mis->from_src_file and mis->to_src_file use the same fd,
 111     * The second shutdown() will return -1, we ignore this value,
 112     * It is harmless.
 113     */
 114    if (mis->from_src_file) {
 115        qemu_file_shutdown(mis->from_src_file);
 116    }
 117    if (mis->to_src_file) {
 118        qemu_file_shutdown(mis->to_src_file);
 119    }
 120
 121    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 122                                   FAILOVER_STATUS_COMPLETED);
 123    if (old_state != FAILOVER_STATUS_ACTIVE) {
 124        error_report("Incorrect state (%s) while doing failover for "
 125                     "secondary VM", FailoverStatus_str(old_state));
 126        return;
 127    }
 128    /* Notify COLO incoming thread that failover work is finished */
 129    qemu_sem_post(&mis->colo_incoming_sem);
 130
 131    /* For Secondary VM, jump to incoming co */
 132    if (mis->migration_incoming_co) {
 133        qemu_coroutine_enter(mis->migration_incoming_co);
 134    }
 135#else
 136    abort();
 137#endif
 138}
 139
 140static void primary_vm_do_failover(void)
 141{
 142#ifdef CONFIG_REPLICATION
 143    MigrationState *s = migrate_get_current();
 144    int old_state;
 145    Error *local_err = NULL;
 146
 147    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 148                      MIGRATION_STATUS_COMPLETED);
 149    /*
 150     * kick COLO thread which might wait at
 151     * qemu_sem_wait(&s->colo_checkpoint_sem).
 152     */
 153    colo_checkpoint_notify(migrate_get_current());
 154
 155    /*
 156     * Wake up COLO thread which may blocked in recv() or send(),
 157     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 158     * same fd, but we still shutdown the fd for twice, it is harmless.
 159     */
 160    if (s->to_dst_file) {
 161        qemu_file_shutdown(s->to_dst_file);
 162    }
 163    if (s->rp_state.from_dst_file) {
 164        qemu_file_shutdown(s->rp_state.from_dst_file);
 165    }
 166
 167    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 168                                   FAILOVER_STATUS_COMPLETED);
 169    if (old_state != FAILOVER_STATUS_ACTIVE) {
 170        error_report("Incorrect state (%s) while doing failover for Primary VM",
 171                     FailoverStatus_str(old_state));
 172        return;
 173    }
 174
 175    replication_stop_all(true, &local_err);
 176    if (local_err) {
 177        error_report_err(local_err);
 178        local_err = NULL;
 179    }
 180
 181    /* Notify COLO thread that failover work is finished */
 182    qemu_sem_post(&s->colo_exit_sem);
 183#else
 184    abort();
 185#endif
 186}
 187
 188COLOMode get_colo_mode(void)
 189{
 190    if (migration_in_colo_state()) {
 191        return COLO_MODE_PRIMARY;
 192    } else if (migration_incoming_in_colo_state()) {
 193        return COLO_MODE_SECONDARY;
 194    } else {
 195        return COLO_MODE_NONE;
 196    }
 197}
 198
 199void colo_do_failover(void)
 200{
 201    /* Make sure VM stopped while failover happened. */
 202    if (!colo_runstate_is_stopped()) {
 203        vm_stop_force_state(RUN_STATE_COLO);
 204    }
 205
 206    switch (get_colo_mode()) {
 207    case COLO_MODE_PRIMARY:
 208        primary_vm_do_failover();
 209        break;
 210    case COLO_MODE_SECONDARY:
 211        secondary_vm_do_failover();
 212        break;
 213    default:
 214        error_report("colo_do_failover failed because the colo mode"
 215                     " could not be obtained");
 216    }
 217}
 218
 219#ifdef CONFIG_REPLICATION
 220void qmp_xen_set_replication(bool enable, bool primary,
 221                             bool has_failover, bool failover,
 222                             Error **errp)
 223{
 224    ReplicationMode mode = primary ?
 225                           REPLICATION_MODE_PRIMARY :
 226                           REPLICATION_MODE_SECONDARY;
 227
 228    if (has_failover && enable) {
 229        error_setg(errp, "Parameter 'failover' is only for"
 230                   " stopping replication");
 231        return;
 232    }
 233
 234    if (enable) {
 235        replication_start_all(mode, errp);
 236    } else {
 237        if (!has_failover) {
 238            failover = NULL;
 239        }
 240        replication_stop_all(failover, failover ? NULL : errp);
 241    }
 242}
 243
 244ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 245{
 246    Error *err = NULL;
 247    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 248
 249    replication_get_error_all(&err);
 250    if (err) {
 251        s->error = true;
 252        s->has_desc = true;
 253        s->desc = g_strdup(error_get_pretty(err));
 254    } else {
 255        s->error = false;
 256    }
 257
 258    error_free(err);
 259    return s;
 260}
 261
 262void qmp_xen_colo_do_checkpoint(Error **errp)
 263{
 264    replication_do_checkpoint_all(errp);
 265    /* Notify all filters of all NIC to do checkpoint */
 266    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 267}
 268#endif
 269
 270COLOStatus *qmp_query_colo_status(Error **errp)
 271{
 272    COLOStatus *s = g_new0(COLOStatus, 1);
 273
 274    s->mode = get_colo_mode();
 275    s->last_mode = last_colo_mode;
 276
 277    switch (failover_get_state()) {
 278    case FAILOVER_STATUS_NONE:
 279        s->reason = COLO_EXIT_REASON_NONE;
 280        break;
 281    case FAILOVER_STATUS_COMPLETED:
 282        s->reason = COLO_EXIT_REASON_REQUEST;
 283        break;
 284    default:
 285        if (migration_in_colo_state()) {
 286            s->reason = COLO_EXIT_REASON_PROCESSING;
 287        } else {
 288            s->reason = COLO_EXIT_REASON_ERROR;
 289        }
 290    }
 291
 292    return s;
 293}
 294
 295static void colo_send_message(QEMUFile *f, COLOMessage msg,
 296                              Error **errp)
 297{
 298    int ret;
 299
 300    if (msg >= COLO_MESSAGE__MAX) {
 301        error_setg(errp, "%s: Invalid message", __func__);
 302        return;
 303    }
 304    qemu_put_be32(f, msg);
 305    qemu_fflush(f);
 306
 307    ret = qemu_file_get_error(f);
 308    if (ret < 0) {
 309        error_setg_errno(errp, -ret, "Can't send COLO message");
 310    }
 311    trace_colo_send_message(COLOMessage_str(msg));
 312}
 313
 314static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 315                                    uint64_t value, Error **errp)
 316{
 317    Error *local_err = NULL;
 318    int ret;
 319
 320    colo_send_message(f, msg, &local_err);
 321    if (local_err) {
 322        error_propagate(errp, local_err);
 323        return;
 324    }
 325    qemu_put_be64(f, value);
 326    qemu_fflush(f);
 327
 328    ret = qemu_file_get_error(f);
 329    if (ret < 0) {
 330        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 331                         COLOMessage_str(msg));
 332    }
 333}
 334
 335static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 336{
 337    COLOMessage msg;
 338    int ret;
 339
 340    msg = qemu_get_be32(f);
 341    ret = qemu_file_get_error(f);
 342    if (ret < 0) {
 343        error_setg_errno(errp, -ret, "Can't receive COLO message");
 344        return msg;
 345    }
 346    if (msg >= COLO_MESSAGE__MAX) {
 347        error_setg(errp, "%s: Invalid message", __func__);
 348        return msg;
 349    }
 350    trace_colo_receive_message(COLOMessage_str(msg));
 351    return msg;
 352}
 353
 354static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 355                                       Error **errp)
 356{
 357    COLOMessage msg;
 358    Error *local_err = NULL;
 359
 360    msg = colo_receive_message(f, &local_err);
 361    if (local_err) {
 362        error_propagate(errp, local_err);
 363        return;
 364    }
 365    if (msg != expect_msg) {
 366        error_setg(errp, "Unexpected COLO message %d, expected %d",
 367                          msg, expect_msg);
 368    }
 369}
 370
 371static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 372                                           Error **errp)
 373{
 374    Error *local_err = NULL;
 375    uint64_t value;
 376    int ret;
 377
 378    colo_receive_check_message(f, expect_msg, &local_err);
 379    if (local_err) {
 380        error_propagate(errp, local_err);
 381        return 0;
 382    }
 383
 384    value = qemu_get_be64(f);
 385    ret = qemu_file_get_error(f);
 386    if (ret < 0) {
 387        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 388                         COLOMessage_str(expect_msg));
 389    }
 390    return value;
 391}
 392
 393static int colo_do_checkpoint_transaction(MigrationState *s,
 394                                          QIOChannelBuffer *bioc,
 395                                          QEMUFile *fb)
 396{
 397    Error *local_err = NULL;
 398    int ret = -1;
 399
 400    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 401                      &local_err);
 402    if (local_err) {
 403        goto out;
 404    }
 405
 406    colo_receive_check_message(s->rp_state.from_dst_file,
 407                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 408    if (local_err) {
 409        goto out;
 410    }
 411    /* Reset channel-buffer directly */
 412    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 413    bioc->usage = 0;
 414
 415    qemu_mutex_lock_iothread();
 416    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 417        qemu_mutex_unlock_iothread();
 418        goto out;
 419    }
 420    vm_stop_force_state(RUN_STATE_COLO);
 421    qemu_mutex_unlock_iothread();
 422    trace_colo_vm_state_change("run", "stop");
 423    /*
 424     * Failover request bh could be called after vm_stop_force_state(),
 425     * So we need check failover_request_is_active() again.
 426     */
 427    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 428        goto out;
 429    }
 430
 431    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 432    if (local_err) {
 433        goto out;
 434    }
 435
 436    /* Disable block migration */
 437    migrate_set_block_enabled(false, &local_err);
 438    qemu_mutex_lock_iothread();
 439
 440#ifdef CONFIG_REPLICATION
 441    replication_do_checkpoint_all(&local_err);
 442    if (local_err) {
 443        qemu_mutex_unlock_iothread();
 444        goto out;
 445    }
 446#else
 447        abort();
 448#endif
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        qemu_mutex_unlock_iothread();
 453        goto out;
 454    }
 455    /* Note: device state is saved into buffer */
 456    ret = qemu_save_device_state(fb);
 457
 458    qemu_mutex_unlock_iothread();
 459    if (ret < 0) {
 460        goto out;
 461    }
 462    /*
 463     * Only save VM's live state, which not including device state.
 464     * TODO: We may need a timeout mechanism to prevent COLO process
 465     * to be blocked here.
 466     */
 467    qemu_savevm_live_state(s->to_dst_file);
 468
 469    qemu_fflush(fb);
 470
 471    /*
 472     * We need the size of the VMstate data in Secondary side,
 473     * With which we can decide how much data should be read.
 474     */
 475    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 476                            bioc->usage, &local_err);
 477    if (local_err) {
 478        goto out;
 479    }
 480
 481    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 482    qemu_fflush(s->to_dst_file);
 483    ret = qemu_file_get_error(s->to_dst_file);
 484    if (ret < 0) {
 485        goto out;
 486    }
 487
 488    colo_receive_check_message(s->rp_state.from_dst_file,
 489                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 490    if (local_err) {
 491        goto out;
 492    }
 493
 494    colo_receive_check_message(s->rp_state.from_dst_file,
 495                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 496    if (local_err) {
 497        goto out;
 498    }
 499
 500    ret = 0;
 501
 502    qemu_mutex_lock_iothread();
 503    vm_start();
 504    qemu_mutex_unlock_iothread();
 505    trace_colo_vm_state_change("stop", "run");
 506
 507out:
 508    if (local_err) {
 509        error_report_err(local_err);
 510    }
 511    return ret;
 512}
 513
 514static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 515{
 516    colo_checkpoint_notify(data);
 517}
 518
 519static void colo_process_checkpoint(MigrationState *s)
 520{
 521    QIOChannelBuffer *bioc;
 522    QEMUFile *fb = NULL;
 523    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 524    Error *local_err = NULL;
 525    int ret;
 526
 527    last_colo_mode = get_colo_mode();
 528    if (last_colo_mode != COLO_MODE_PRIMARY) {
 529        error_report("COLO mode must be COLO_MODE_PRIMARY");
 530        return;
 531    }
 532
 533    failover_init_state();
 534
 535    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 536    if (!s->rp_state.from_dst_file) {
 537        error_report("Open QEMUFile from_dst_file failed");
 538        goto out;
 539    }
 540
 541    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 542    colo_compare_register_notifier(&packets_compare_notifier);
 543
 544    /*
 545     * Wait for Secondary finish loading VM states and enter COLO
 546     * restore.
 547     */
 548    colo_receive_check_message(s->rp_state.from_dst_file,
 549                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 550    if (local_err) {
 551        goto out;
 552    }
 553    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 554    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 555    object_unref(OBJECT(bioc));
 556
 557    qemu_mutex_lock_iothread();
 558#ifdef CONFIG_REPLICATION
 559    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 560    if (local_err) {
 561        qemu_mutex_unlock_iothread();
 562        goto out;
 563    }
 564#else
 565        abort();
 566#endif
 567
 568    vm_start();
 569    qemu_mutex_unlock_iothread();
 570    trace_colo_vm_state_change("stop", "run");
 571
 572    timer_mod(s->colo_delay_timer,
 573            current_time + s->parameters.x_checkpoint_delay);
 574
 575    while (s->state == MIGRATION_STATUS_COLO) {
 576        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 577            error_report("failover request");
 578            goto out;
 579        }
 580
 581        qemu_sem_wait(&s->colo_checkpoint_sem);
 582
 583        if (s->state != MIGRATION_STATUS_COLO) {
 584            goto out;
 585        }
 586        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 587        if (ret < 0) {
 588            goto out;
 589        }
 590    }
 591
 592out:
 593    /* Throw the unreported error message after exited from loop */
 594    if (local_err) {
 595        error_report_err(local_err);
 596    }
 597
 598    if (fb) {
 599        qemu_fclose(fb);
 600    }
 601
 602    /*
 603     * There are only two reasons we can get here, some error happened
 604     * or the user triggered failover.
 605     */
 606    switch (failover_get_state()) {
 607    case FAILOVER_STATUS_COMPLETED:
 608        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 609                                  COLO_EXIT_REASON_REQUEST);
 610        break;
 611    default:
 612        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 613                                  COLO_EXIT_REASON_ERROR);
 614    }
 615
 616    /* Hope this not to be too long to wait here */
 617    qemu_sem_wait(&s->colo_exit_sem);
 618    qemu_sem_destroy(&s->colo_exit_sem);
 619
 620    /*
 621     * It is safe to unregister notifier after failover finished.
 622     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 623     * released befor unregister notifier, or there will be use-after-free
 624     * error.
 625     */
 626    colo_compare_unregister_notifier(&packets_compare_notifier);
 627    timer_del(s->colo_delay_timer);
 628    timer_free(s->colo_delay_timer);
 629    qemu_sem_destroy(&s->colo_checkpoint_sem);
 630
 631    /*
 632     * Must be called after failover BH is completed,
 633     * Or the failover BH may shutdown the wrong fd that
 634     * re-used by other threads after we release here.
 635     */
 636    if (s->rp_state.from_dst_file) {
 637        qemu_fclose(s->rp_state.from_dst_file);
 638    }
 639}
 640
 641void colo_checkpoint_notify(void *opaque)
 642{
 643    MigrationState *s = opaque;
 644    int64_t next_notify_time;
 645
 646    qemu_sem_post(&s->colo_checkpoint_sem);
 647    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 648    next_notify_time = s->colo_checkpoint_time +
 649                    s->parameters.x_checkpoint_delay;
 650    timer_mod(s->colo_delay_timer, next_notify_time);
 651}
 652
 653void migrate_start_colo_process(MigrationState *s)
 654{
 655    qemu_mutex_unlock_iothread();
 656    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 657    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 658                                colo_checkpoint_notify, s);
 659
 660    qemu_sem_init(&s->colo_exit_sem, 0);
 661    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 662                      MIGRATION_STATUS_COLO);
 663    colo_process_checkpoint(s);
 664    qemu_mutex_lock_iothread();
 665}
 666
 667static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 668                                     Error **errp)
 669{
 670    COLOMessage msg;
 671    Error *local_err = NULL;
 672
 673    msg = colo_receive_message(f, &local_err);
 674    if (local_err) {
 675        error_propagate(errp, local_err);
 676        return;
 677    }
 678
 679    switch (msg) {
 680    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 681        *checkpoint_request = 1;
 682        break;
 683    default:
 684        *checkpoint_request = 0;
 685        error_setg(errp, "Got unknown COLO message: %d", msg);
 686        break;
 687    }
 688}
 689
 690void *colo_process_incoming_thread(void *opaque)
 691{
 692    MigrationIncomingState *mis = opaque;
 693    QEMUFile *fb = NULL;
 694    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 695    uint64_t total_size;
 696    uint64_t value;
 697    Error *local_err = NULL;
 698    int ret;
 699
 700    rcu_register_thread();
 701    qemu_sem_init(&mis->colo_incoming_sem, 0);
 702
 703    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 704                      MIGRATION_STATUS_COLO);
 705
 706    last_colo_mode = get_colo_mode();
 707    if (last_colo_mode != COLO_MODE_SECONDARY) {
 708        error_report("COLO mode must be COLO_MODE_SECONDARY");
 709        return NULL;
 710    }
 711
 712    failover_init_state();
 713
 714    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 715    if (!mis->to_src_file) {
 716        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 717        goto out;
 718    }
 719    /*
 720     * Note: the communication between Primary side and Secondary side
 721     * should be sequential, we set the fd to unblocked in migration incoming
 722     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 723     * set the fd back to blocked.
 724     */
 725    qemu_file_set_blocking(mis->from_src_file, true);
 726
 727    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 728    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 729    object_unref(OBJECT(bioc));
 730
 731    qemu_mutex_lock_iothread();
 732#ifdef CONFIG_REPLICATION
 733    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 734    if (local_err) {
 735        qemu_mutex_unlock_iothread();
 736        goto out;
 737    }
 738#else
 739        abort();
 740#endif
 741    vm_start();
 742    trace_colo_vm_state_change("stop", "run");
 743    qemu_mutex_unlock_iothread();
 744
 745    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 746                      &local_err);
 747    if (local_err) {
 748        goto out;
 749    }
 750
 751    while (mis->state == MIGRATION_STATUS_COLO) {
 752        int request = 0;
 753
 754        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 755        if (local_err) {
 756            goto out;
 757        }
 758        assert(request);
 759        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 760            error_report("failover request");
 761            goto out;
 762        }
 763
 764        qemu_mutex_lock_iothread();
 765        vm_stop_force_state(RUN_STATE_COLO);
 766        trace_colo_vm_state_change("run", "stop");
 767        qemu_mutex_unlock_iothread();
 768
 769        /* FIXME: This is unnecessary for periodic checkpoint mode */
 770        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 771                     &local_err);
 772        if (local_err) {
 773            goto out;
 774        }
 775
 776        colo_receive_check_message(mis->from_src_file,
 777                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 778        if (local_err) {
 779            goto out;
 780        }
 781
 782        qemu_mutex_lock_iothread();
 783        cpu_synchronize_all_pre_loadvm();
 784        ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 785        qemu_mutex_unlock_iothread();
 786
 787        if (ret < 0) {
 788            error_report("Load VM's live state (ram) error");
 789            goto out;
 790        }
 791
 792        value = colo_receive_message_value(mis->from_src_file,
 793                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 794        if (local_err) {
 795            goto out;
 796        }
 797
 798        /*
 799         * Read VM device state data into channel buffer,
 800         * It's better to re-use the memory allocated.
 801         * Here we need to handle the channel buffer directly.
 802         */
 803        if (value > bioc->capacity) {
 804            bioc->capacity = value;
 805            bioc->data = g_realloc(bioc->data, bioc->capacity);
 806        }
 807        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 808        if (total_size != value) {
 809            error_report("Got %" PRIu64 " VMState data, less than expected"
 810                        " %" PRIu64, total_size, value);
 811            goto out;
 812        }
 813        bioc->usage = total_size;
 814        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 815
 816        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 817                     &local_err);
 818        if (local_err) {
 819            goto out;
 820        }
 821
 822        qemu_mutex_lock_iothread();
 823        vmstate_loading = true;
 824        ret = qemu_load_device_state(fb);
 825        if (ret < 0) {
 826            error_report("COLO: load device state failed");
 827            qemu_mutex_unlock_iothread();
 828            goto out;
 829        }
 830
 831#ifdef CONFIG_REPLICATION
 832        replication_get_error_all(&local_err);
 833        if (local_err) {
 834            qemu_mutex_unlock_iothread();
 835            goto out;
 836        }
 837
 838        /* discard colo disk buffer */
 839        replication_do_checkpoint_all(&local_err);
 840        if (local_err) {
 841            qemu_mutex_unlock_iothread();
 842            goto out;
 843        }
 844#else
 845        abort();
 846#endif
 847        /* Notify all filters of all NIC to do checkpoint */
 848        colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 849
 850        if (local_err) {
 851            qemu_mutex_unlock_iothread();
 852            goto out;
 853        }
 854
 855        vmstate_loading = false;
 856        vm_start();
 857        trace_colo_vm_state_change("stop", "run");
 858        qemu_mutex_unlock_iothread();
 859
 860        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 861            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 862                            FAILOVER_STATUS_NONE);
 863            failover_request_active(NULL);
 864            goto out;
 865        }
 866
 867        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 868                     &local_err);
 869        if (local_err) {
 870            goto out;
 871        }
 872    }
 873
 874out:
 875    vmstate_loading = false;
 876    /* Throw the unreported error message after exited from loop */
 877    if (local_err) {
 878        error_report_err(local_err);
 879    }
 880
 881    /*
 882     * There are only two reasons we can get here, some error happened
 883     * or the user triggered failover.
 884     */
 885    switch (failover_get_state()) {
 886    case FAILOVER_STATUS_COMPLETED:
 887        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 888                                  COLO_EXIT_REASON_REQUEST);
 889        break;
 890    default:
 891        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 892                                  COLO_EXIT_REASON_ERROR);
 893    }
 894
 895    if (fb) {
 896        qemu_fclose(fb);
 897    }
 898
 899    /* Hope this not to be too long to loop here */
 900    qemu_sem_wait(&mis->colo_incoming_sem);
 901    qemu_sem_destroy(&mis->colo_incoming_sem);
 902    /* Must be called after failover BH is completed */
 903    if (mis->to_src_file) {
 904        qemu_fclose(mis->to_src_file);
 905        mis->to_src_file = NULL;
 906    }
 907
 908    rcu_unregister_thread();
 909    return NULL;
 910}
 911