/* qemu/migration/colo.c */
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/rcu.h"
  28#include "migration/failover.h"
  29#include "migration/ram.h"
  30#ifdef CONFIG_REPLICATION
  31#include "block/replication.h"
  32#endif
  33#include "net/colo-compare.h"
  34#include "net/colo.h"
  35#include "block/block.h"
  36#include "qapi/qapi-events-migration.h"
  37#include "qapi/qmp/qerror.h"
  38#include "sysemu/cpus.h"
  39#include "sysemu/runstate.h"
  40#include "net/filter.h"
  41
/*
 * Set while the Secondary is loading a checkpoint's device state;
 * secondary_vm_do_failover() checks it to avoid running in that window.
 */
static bool vmstate_loading;
/* Notifier registered with colo-compare so it can request a checkpoint */
static Notifier packets_compare_notifier;

/* The user needs to know the COLO mode after a COLO failover */
static COLOMode last_colo_mode;

/* Initial size of the channel buffer that caches device state */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  49
  50bool migration_in_colo_state(void)
  51{
  52    MigrationState *s = migrate_get_current();
  53
  54    return (s->state == MIGRATION_STATUS_COLO);
  55}
  56
  57bool migration_incoming_in_colo_state(void)
  58{
  59    MigrationIncomingState *mis = migration_incoming_get_current();
  60
  61    return mis && (mis->state == MIGRATION_STATUS_COLO);
  62}
  63
  64static bool colo_runstate_is_stopped(void)
  65{
  66    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  67}
  68
  69static void secondary_vm_do_failover(void)
  70{
  71/* COLO needs enable block-replication */
  72#ifdef CONFIG_REPLICATION
  73    int old_state;
  74    MigrationIncomingState *mis = migration_incoming_get_current();
  75    Error *local_err = NULL;
  76
  77    /* Can not do failover during the process of VM's loading VMstate, Or
  78     * it will break the secondary VM.
  79     */
  80    if (vmstate_loading) {
  81        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  82                        FAILOVER_STATUS_RELAUNCH);
  83        if (old_state != FAILOVER_STATUS_ACTIVE) {
  84            error_report("Unknown error while do failover for secondary VM,"
  85                         "old_state: %s", FailoverStatus_str(old_state));
  86        }
  87        return;
  88    }
  89
  90    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  91                      MIGRATION_STATUS_COMPLETED);
  92
  93    replication_stop_all(true, &local_err);
  94    if (local_err) {
  95        error_report_err(local_err);
  96        local_err = NULL;
  97    }
  98
  99    /* Notify all filters of all NIC to do checkpoint */
 100    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 101    if (local_err) {
 102        error_report_err(local_err);
 103    }
 104
 105    if (!autostart) {
 106        error_report("\"-S\" qemu option will be ignored in secondary side");
 107        /* recover runstate to normal migration finish state */
 108        autostart = true;
 109    }
 110    /*
 111     * Make sure COLO incoming thread not block in recv or send,
 112     * If mis->from_src_file and mis->to_src_file use the same fd,
 113     * The second shutdown() will return -1, we ignore this value,
 114     * It is harmless.
 115     */
 116    if (mis->from_src_file) {
 117        qemu_file_shutdown(mis->from_src_file);
 118    }
 119    if (mis->to_src_file) {
 120        qemu_file_shutdown(mis->to_src_file);
 121    }
 122
 123    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 124                                   FAILOVER_STATUS_COMPLETED);
 125    if (old_state != FAILOVER_STATUS_ACTIVE) {
 126        error_report("Incorrect state (%s) while doing failover for "
 127                     "secondary VM", FailoverStatus_str(old_state));
 128        return;
 129    }
 130    /* Notify COLO incoming thread that failover work is finished */
 131    qemu_sem_post(&mis->colo_incoming_sem);
 132
 133    /* For Secondary VM, jump to incoming co */
 134    if (mis->migration_incoming_co) {
 135        qemu_coroutine_enter(mis->migration_incoming_co);
 136    }
 137#else
 138    abort();
 139#endif
 140}
 141
 142static void primary_vm_do_failover(void)
 143{
 144#ifdef CONFIG_REPLICATION
 145    MigrationState *s = migrate_get_current();
 146    int old_state;
 147    Error *local_err = NULL;
 148
 149    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 150                      MIGRATION_STATUS_COMPLETED);
 151    /*
 152     * kick COLO thread which might wait at
 153     * qemu_sem_wait(&s->colo_checkpoint_sem).
 154     */
 155    colo_checkpoint_notify(migrate_get_current());
 156
 157    /*
 158     * Wake up COLO thread which may blocked in recv() or send(),
 159     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 160     * same fd, but we still shutdown the fd for twice, it is harmless.
 161     */
 162    if (s->to_dst_file) {
 163        qemu_file_shutdown(s->to_dst_file);
 164    }
 165    if (s->rp_state.from_dst_file) {
 166        qemu_file_shutdown(s->rp_state.from_dst_file);
 167    }
 168
 169    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 170                                   FAILOVER_STATUS_COMPLETED);
 171    if (old_state != FAILOVER_STATUS_ACTIVE) {
 172        error_report("Incorrect state (%s) while doing failover for Primary VM",
 173                     FailoverStatus_str(old_state));
 174        return;
 175    }
 176
 177    replication_stop_all(true, &local_err);
 178    if (local_err) {
 179        error_report_err(local_err);
 180        local_err = NULL;
 181    }
 182
 183    /* Notify COLO thread that failover work is finished */
 184    qemu_sem_post(&s->colo_exit_sem);
 185#else
 186    abort();
 187#endif
 188}
 189
 190COLOMode get_colo_mode(void)
 191{
 192    if (migration_in_colo_state()) {
 193        return COLO_MODE_PRIMARY;
 194    } else if (migration_incoming_in_colo_state()) {
 195        return COLO_MODE_SECONDARY;
 196    } else {
 197        return COLO_MODE_NONE;
 198    }
 199}
 200
 201void colo_do_failover(void)
 202{
 203    /* Make sure VM stopped while failover happened. */
 204    if (!colo_runstate_is_stopped()) {
 205        vm_stop_force_state(RUN_STATE_COLO);
 206    }
 207
 208    switch (get_colo_mode()) {
 209    case COLO_MODE_PRIMARY:
 210        primary_vm_do_failover();
 211        break;
 212    case COLO_MODE_SECONDARY:
 213        secondary_vm_do_failover();
 214        break;
 215    default:
 216        error_report("colo_do_failover failed because the colo mode"
 217                     " could not be obtained");
 218    }
 219}
 220
 221#ifdef CONFIG_REPLICATION
 222void qmp_xen_set_replication(bool enable, bool primary,
 223                             bool has_failover, bool failover,
 224                             Error **errp)
 225{
 226    ReplicationMode mode = primary ?
 227                           REPLICATION_MODE_PRIMARY :
 228                           REPLICATION_MODE_SECONDARY;
 229
 230    if (has_failover && enable) {
 231        error_setg(errp, "Parameter 'failover' is only for"
 232                   " stopping replication");
 233        return;
 234    }
 235
 236    if (enable) {
 237        replication_start_all(mode, errp);
 238    } else {
 239        if (!has_failover) {
 240            failover = NULL;
 241        }
 242        replication_stop_all(failover, failover ? NULL : errp);
 243    }
 244}
 245
 246ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 247{
 248    Error *err = NULL;
 249    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 250
 251    replication_get_error_all(&err);
 252    if (err) {
 253        s->error = true;
 254        s->has_desc = true;
 255        s->desc = g_strdup(error_get_pretty(err));
 256    } else {
 257        s->error = false;
 258    }
 259
 260    error_free(err);
 261    return s;
 262}
 263
 264void qmp_xen_colo_do_checkpoint(Error **errp)
 265{
 266    Error *err = NULL;
 267
 268    replication_do_checkpoint_all(&err);
 269    if (err) {
 270        error_propagate(errp, err);
 271        return;
 272    }
 273    /* Notify all filters of all NIC to do checkpoint */
 274    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 275}
 276#endif
 277
 278COLOStatus *qmp_query_colo_status(Error **errp)
 279{
 280    COLOStatus *s = g_new0(COLOStatus, 1);
 281
 282    s->mode = get_colo_mode();
 283    s->last_mode = last_colo_mode;
 284
 285    switch (failover_get_state()) {
 286    case FAILOVER_STATUS_NONE:
 287        s->reason = COLO_EXIT_REASON_NONE;
 288        break;
 289    case FAILOVER_STATUS_COMPLETED:
 290        s->reason = COLO_EXIT_REASON_REQUEST;
 291        break;
 292    default:
 293        if (migration_in_colo_state()) {
 294            s->reason = COLO_EXIT_REASON_PROCESSING;
 295        } else {
 296            s->reason = COLO_EXIT_REASON_ERROR;
 297        }
 298    }
 299
 300    return s;
 301}
 302
 303static void colo_send_message(QEMUFile *f, COLOMessage msg,
 304                              Error **errp)
 305{
 306    int ret;
 307
 308    if (msg >= COLO_MESSAGE__MAX) {
 309        error_setg(errp, "%s: Invalid message", __func__);
 310        return;
 311    }
 312    qemu_put_be32(f, msg);
 313    qemu_fflush(f);
 314
 315    ret = qemu_file_get_error(f);
 316    if (ret < 0) {
 317        error_setg_errno(errp, -ret, "Can't send COLO message");
 318    }
 319    trace_colo_send_message(COLOMessage_str(msg));
 320}
 321
 322static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 323                                    uint64_t value, Error **errp)
 324{
 325    Error *local_err = NULL;
 326    int ret;
 327
 328    colo_send_message(f, msg, &local_err);
 329    if (local_err) {
 330        error_propagate(errp, local_err);
 331        return;
 332    }
 333    qemu_put_be64(f, value);
 334    qemu_fflush(f);
 335
 336    ret = qemu_file_get_error(f);
 337    if (ret < 0) {
 338        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 339                         COLOMessage_str(msg));
 340    }
 341}
 342
 343static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 344{
 345    COLOMessage msg;
 346    int ret;
 347
 348    msg = qemu_get_be32(f);
 349    ret = qemu_file_get_error(f);
 350    if (ret < 0) {
 351        error_setg_errno(errp, -ret, "Can't receive COLO message");
 352        return msg;
 353    }
 354    if (msg >= COLO_MESSAGE__MAX) {
 355        error_setg(errp, "%s: Invalid message", __func__);
 356        return msg;
 357    }
 358    trace_colo_receive_message(COLOMessage_str(msg));
 359    return msg;
 360}
 361
 362static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 363                                       Error **errp)
 364{
 365    COLOMessage msg;
 366    Error *local_err = NULL;
 367
 368    msg = colo_receive_message(f, &local_err);
 369    if (local_err) {
 370        error_propagate(errp, local_err);
 371        return;
 372    }
 373    if (msg != expect_msg) {
 374        error_setg(errp, "Unexpected COLO message %d, expected %d",
 375                          msg, expect_msg);
 376    }
 377}
 378
 379static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 380                                           Error **errp)
 381{
 382    Error *local_err = NULL;
 383    uint64_t value;
 384    int ret;
 385
 386    colo_receive_check_message(f, expect_msg, &local_err);
 387    if (local_err) {
 388        error_propagate(errp, local_err);
 389        return 0;
 390    }
 391
 392    value = qemu_get_be64(f);
 393    ret = qemu_file_get_error(f);
 394    if (ret < 0) {
 395        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 396                         COLOMessage_str(expect_msg));
 397    }
 398    return value;
 399}
 400
 401static int colo_do_checkpoint_transaction(MigrationState *s,
 402                                          QIOChannelBuffer *bioc,
 403                                          QEMUFile *fb)
 404{
 405    Error *local_err = NULL;
 406    int ret = -1;
 407
 408    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 409                      &local_err);
 410    if (local_err) {
 411        goto out;
 412    }
 413
 414    colo_receive_check_message(s->rp_state.from_dst_file,
 415                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 416    if (local_err) {
 417        goto out;
 418    }
 419    /* Reset channel-buffer directly */
 420    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 421    bioc->usage = 0;
 422
 423    qemu_mutex_lock_iothread();
 424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 425        qemu_mutex_unlock_iothread();
 426        goto out;
 427    }
 428    vm_stop_force_state(RUN_STATE_COLO);
 429    qemu_mutex_unlock_iothread();
 430    trace_colo_vm_state_change("run", "stop");
 431    /*
 432     * Failover request bh could be called after vm_stop_force_state(),
 433     * So we need check failover_request_is_active() again.
 434     */
 435    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 436        goto out;
 437    }
 438    qemu_mutex_lock_iothread();
 439
 440#ifdef CONFIG_REPLICATION
 441    replication_do_checkpoint_all(&local_err);
 442    if (local_err) {
 443        qemu_mutex_unlock_iothread();
 444        goto out;
 445    }
 446#else
 447        abort();
 448#endif
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        qemu_mutex_unlock_iothread();
 453        goto out;
 454    }
 455    /* Note: device state is saved into buffer */
 456    ret = qemu_save_device_state(fb);
 457
 458    qemu_mutex_unlock_iothread();
 459    if (ret < 0) {
 460        goto out;
 461    }
 462    /*
 463     * Only save VM's live state, which not including device state.
 464     * TODO: We may need a timeout mechanism to prevent COLO process
 465     * to be blocked here.
 466     */
 467    qemu_savevm_live_state(s->to_dst_file);
 468
 469    qemu_fflush(fb);
 470
 471    /*
 472     * We need the size of the VMstate data in Secondary side,
 473     * With which we can decide how much data should be read.
 474     */
 475    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 476                            bioc->usage, &local_err);
 477    if (local_err) {
 478        goto out;
 479    }
 480
 481    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 482    qemu_fflush(s->to_dst_file);
 483    ret = qemu_file_get_error(s->to_dst_file);
 484    if (ret < 0) {
 485        goto out;
 486    }
 487
 488    colo_receive_check_message(s->rp_state.from_dst_file,
 489                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 490    if (local_err) {
 491        goto out;
 492    }
 493
 494    qemu_event_reset(&s->colo_checkpoint_event);
 495    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 496    if (local_err) {
 497        goto out;
 498    }
 499
 500    colo_receive_check_message(s->rp_state.from_dst_file,
 501                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 502    if (local_err) {
 503        goto out;
 504    }
 505
 506    ret = 0;
 507
 508    qemu_mutex_lock_iothread();
 509    vm_start();
 510    qemu_mutex_unlock_iothread();
 511    trace_colo_vm_state_change("stop", "run");
 512
 513out:
 514    if (local_err) {
 515        error_report_err(local_err);
 516    }
 517    return ret;
 518}
 519
 520static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 521{
 522    colo_checkpoint_notify(data);
 523}
 524
 525static void colo_process_checkpoint(MigrationState *s)
 526{
 527    QIOChannelBuffer *bioc;
 528    QEMUFile *fb = NULL;
 529    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 530    Error *local_err = NULL;
 531    int ret;
 532
 533    last_colo_mode = get_colo_mode();
 534    if (last_colo_mode != COLO_MODE_PRIMARY) {
 535        error_report("COLO mode must be COLO_MODE_PRIMARY");
 536        return;
 537    }
 538
 539    failover_init_state();
 540
 541    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 542    if (!s->rp_state.from_dst_file) {
 543        error_report("Open QEMUFile from_dst_file failed");
 544        goto out;
 545    }
 546
 547    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 548    colo_compare_register_notifier(&packets_compare_notifier);
 549
 550    /*
 551     * Wait for Secondary finish loading VM states and enter COLO
 552     * restore.
 553     */
 554    colo_receive_check_message(s->rp_state.from_dst_file,
 555                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 556    if (local_err) {
 557        goto out;
 558    }
 559    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 560    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 561    object_unref(OBJECT(bioc));
 562
 563    qemu_mutex_lock_iothread();
 564#ifdef CONFIG_REPLICATION
 565    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 566    if (local_err) {
 567        qemu_mutex_unlock_iothread();
 568        goto out;
 569    }
 570#else
 571        abort();
 572#endif
 573
 574    vm_start();
 575    qemu_mutex_unlock_iothread();
 576    trace_colo_vm_state_change("stop", "run");
 577
 578    timer_mod(s->colo_delay_timer,
 579            current_time + s->parameters.x_checkpoint_delay);
 580
 581    while (s->state == MIGRATION_STATUS_COLO) {
 582        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 583            error_report("failover request");
 584            goto out;
 585        }
 586
 587        qemu_event_wait(&s->colo_checkpoint_event);
 588
 589        if (s->state != MIGRATION_STATUS_COLO) {
 590            goto out;
 591        }
 592        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 593        if (ret < 0) {
 594            goto out;
 595        }
 596    }
 597
 598out:
 599    /* Throw the unreported error message after exited from loop */
 600    if (local_err) {
 601        error_report_err(local_err);
 602    }
 603
 604    if (fb) {
 605        qemu_fclose(fb);
 606    }
 607
 608    /*
 609     * There are only two reasons we can get here, some error happened
 610     * or the user triggered failover.
 611     */
 612    switch (failover_get_state()) {
 613    case FAILOVER_STATUS_COMPLETED:
 614        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 615                                  COLO_EXIT_REASON_REQUEST);
 616        break;
 617    default:
 618        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 619                                  COLO_EXIT_REASON_ERROR);
 620    }
 621
 622    /* Hope this not to be too long to wait here */
 623    qemu_sem_wait(&s->colo_exit_sem);
 624    qemu_sem_destroy(&s->colo_exit_sem);
 625
 626    /*
 627     * It is safe to unregister notifier after failover finished.
 628     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 629     * released before unregister notifier, or there will be use-after-free
 630     * error.
 631     */
 632    colo_compare_unregister_notifier(&packets_compare_notifier);
 633    timer_free(s->colo_delay_timer);
 634    qemu_event_destroy(&s->colo_checkpoint_event);
 635
 636    /*
 637     * Must be called after failover BH is completed,
 638     * Or the failover BH may shutdown the wrong fd that
 639     * re-used by other threads after we release here.
 640     */
 641    if (s->rp_state.from_dst_file) {
 642        qemu_fclose(s->rp_state.from_dst_file);
 643    }
 644}
 645
 646void colo_checkpoint_notify(void *opaque)
 647{
 648    MigrationState *s = opaque;
 649    int64_t next_notify_time;
 650
 651    qemu_event_set(&s->colo_checkpoint_event);
 652    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 653    next_notify_time = s->colo_checkpoint_time +
 654                    s->parameters.x_checkpoint_delay;
 655    timer_mod(s->colo_delay_timer, next_notify_time);
 656}
 657
 658void migrate_start_colo_process(MigrationState *s)
 659{
 660    qemu_mutex_unlock_iothread();
 661    qemu_event_init(&s->colo_checkpoint_event, false);
 662    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 663                                colo_checkpoint_notify, s);
 664
 665    qemu_sem_init(&s->colo_exit_sem, 0);
 666    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 667                      MIGRATION_STATUS_COLO);
 668    colo_process_checkpoint(s);
 669    qemu_mutex_lock_iothread();
 670}
 671
 672static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 673                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 674{
 675    uint64_t total_size;
 676    uint64_t value;
 677    Error *local_err = NULL;
 678    int ret;
 679
 680    qemu_mutex_lock_iothread();
 681    vm_stop_force_state(RUN_STATE_COLO);
 682    trace_colo_vm_state_change("run", "stop");
 683    qemu_mutex_unlock_iothread();
 684
 685    /* FIXME: This is unnecessary for periodic checkpoint mode */
 686    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 687                 &local_err);
 688    if (local_err) {
 689        error_propagate(errp, local_err);
 690        return;
 691    }
 692
 693    colo_receive_check_message(mis->from_src_file,
 694                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 695    if (local_err) {
 696        error_propagate(errp, local_err);
 697        return;
 698    }
 699
 700    qemu_mutex_lock_iothread();
 701    cpu_synchronize_all_states();
 702    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 703    qemu_mutex_unlock_iothread();
 704
 705    if (ret < 0) {
 706        error_setg(errp, "Load VM's live state (ram) error");
 707        return;
 708    }
 709
 710    value = colo_receive_message_value(mis->from_src_file,
 711                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 712    if (local_err) {
 713        error_propagate(errp, local_err);
 714        return;
 715    }
 716
 717    /*
 718     * Read VM device state data into channel buffer,
 719     * It's better to re-use the memory allocated.
 720     * Here we need to handle the channel buffer directly.
 721     */
 722    if (value > bioc->capacity) {
 723        bioc->capacity = value;
 724        bioc->data = g_realloc(bioc->data, bioc->capacity);
 725    }
 726    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 727    if (total_size != value) {
 728        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 729                    " %" PRIu64, total_size, value);
 730        return;
 731    }
 732    bioc->usage = total_size;
 733    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 734
 735    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 736                 &local_err);
 737    if (local_err) {
 738        error_propagate(errp, local_err);
 739        return;
 740    }
 741
 742    qemu_mutex_lock_iothread();
 743    vmstate_loading = true;
 744    colo_flush_ram_cache();
 745    ret = qemu_load_device_state(fb);
 746    if (ret < 0) {
 747        error_setg(errp, "COLO: load device state failed");
 748        vmstate_loading = false;
 749        qemu_mutex_unlock_iothread();
 750        return;
 751    }
 752
 753#ifdef CONFIG_REPLICATION
 754    replication_get_error_all(&local_err);
 755    if (local_err) {
 756        error_propagate(errp, local_err);
 757        vmstate_loading = false;
 758        qemu_mutex_unlock_iothread();
 759        return;
 760    }
 761
 762    /* discard colo disk buffer */
 763    replication_do_checkpoint_all(&local_err);
 764    if (local_err) {
 765        error_propagate(errp, local_err);
 766        vmstate_loading = false;
 767        qemu_mutex_unlock_iothread();
 768        return;
 769    }
 770#else
 771    abort();
 772#endif
 773    /* Notify all filters of all NIC to do checkpoint */
 774    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 775
 776    if (local_err) {
 777        error_propagate(errp, local_err);
 778        vmstate_loading = false;
 779        qemu_mutex_unlock_iothread();
 780        return;
 781    }
 782
 783    vmstate_loading = false;
 784    vm_start();
 785    trace_colo_vm_state_change("stop", "run");
 786    qemu_mutex_unlock_iothread();
 787
 788    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 789        return;
 790    }
 791
 792    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 793                 &local_err);
 794    error_propagate(errp, local_err);
 795}
 796
 797static void colo_wait_handle_message(MigrationIncomingState *mis,
 798                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 799{
 800    COLOMessage msg;
 801    Error *local_err = NULL;
 802
 803    msg = colo_receive_message(mis->from_src_file, &local_err);
 804    if (local_err) {
 805        error_propagate(errp, local_err);
 806        return;
 807    }
 808
 809    switch (msg) {
 810    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 811        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 812        break;
 813    default:
 814        error_setg(errp, "Got unknown COLO message: %d", msg);
 815        break;
 816    }
 817}
 818
 819void *colo_process_incoming_thread(void *opaque)
 820{
 821    MigrationIncomingState *mis = opaque;
 822    QEMUFile *fb = NULL;
 823    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 824    Error *local_err = NULL;
 825
 826    rcu_register_thread();
 827    qemu_sem_init(&mis->colo_incoming_sem, 0);
 828
 829    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 830                      MIGRATION_STATUS_COLO);
 831
 832    last_colo_mode = get_colo_mode();
 833    if (last_colo_mode != COLO_MODE_SECONDARY) {
 834        error_report("COLO mode must be COLO_MODE_SECONDARY");
 835        return NULL;
 836    }
 837
 838    failover_init_state();
 839
 840    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 841    if (!mis->to_src_file) {
 842        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 843        goto out;
 844    }
 845    /*
 846     * Note: the communication between Primary side and Secondary side
 847     * should be sequential, we set the fd to unblocked in migration incoming
 848     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 849     * set the fd back to blocked.
 850     */
 851    qemu_file_set_blocking(mis->from_src_file, true);
 852
 853    colo_incoming_start_dirty_log();
 854
 855    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 856    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 857    object_unref(OBJECT(bioc));
 858
 859    qemu_mutex_lock_iothread();
 860#ifdef CONFIG_REPLICATION
 861    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 862    if (local_err) {
 863        qemu_mutex_unlock_iothread();
 864        goto out;
 865    }
 866#else
 867        abort();
 868#endif
 869    vm_start();
 870    trace_colo_vm_state_change("stop", "run");
 871    qemu_mutex_unlock_iothread();
 872
 873    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 874                      &local_err);
 875    if (local_err) {
 876        goto out;
 877    }
 878
 879    while (mis->state == MIGRATION_STATUS_COLO) {
 880        colo_wait_handle_message(mis, fb, bioc, &local_err);
 881        if (local_err) {
 882            error_report_err(local_err);
 883            break;
 884        }
 885
 886        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 887            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 888                            FAILOVER_STATUS_NONE);
 889            failover_request_active(NULL);
 890            break;
 891        }
 892
 893        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 894            error_report("failover request");
 895            break;
 896        }
 897    }
 898
 899out:
 900    /*
 901     * There are only two reasons we can get here, some error happened
 902     * or the user triggered failover.
 903     */
 904    switch (failover_get_state()) {
 905    case FAILOVER_STATUS_COMPLETED:
 906        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 907                                  COLO_EXIT_REASON_REQUEST);
 908        break;
 909    default:
 910        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 911                                  COLO_EXIT_REASON_ERROR);
 912    }
 913
 914    if (fb) {
 915        qemu_fclose(fb);
 916    }
 917
 918    /* Hope this not to be too long to loop here */
 919    qemu_sem_wait(&mis->colo_incoming_sem);
 920    qemu_sem_destroy(&mis->colo_incoming_sem);
 921    /* Must be called after failover BH is completed */
 922    if (mis->to_src_file) {
 923        qemu_fclose(mis->to_src_file);
 924        mis->to_src_file = NULL;
 925    }
 926
 927    rcu_unregister_thread();
 928    return NULL;
 929}
 930