qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "system/system.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "migration.h"
  18#include "qemu-file.h"
  19#include "savevm.h"
  20#include "migration/colo.h"
  21#include "io/channel-buffer.h"
  22#include "trace.h"
  23#include "qemu/error-report.h"
  24#include "qemu/main-loop.h"
  25#include "qemu/rcu.h"
  26#include "migration/failover.h"
  27#include "migration/ram.h"
  28#include "block/replication.h"
  29#include "net/colo-compare.h"
  30#include "net/colo.h"
  31#include "block/block.h"
  32#include "qapi/qapi-events-migration.h"
  33#include "system/cpus.h"
  34#include "system/runstate.h"
  35#include "net/filter.h"
  36#include "options.h"
  37
  38static bool vmstate_loading;
  39static Notifier packets_compare_notifier;
  40
  41/* User need to know colo mode after COLO failover */
  42static COLOMode last_colo_mode;
  43
  44#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  45
  46bool migration_in_colo_state(void)
  47{
  48    MigrationState *s = migrate_get_current();
  49
  50    return (s->state == MIGRATION_STATUS_COLO);
  51}
  52
  53bool migration_incoming_in_colo_state(void)
  54{
  55    MigrationIncomingState *mis = migration_incoming_get_current();
  56
  57    return mis && (mis->state == MIGRATION_STATUS_COLO);
  58}
  59
  60static bool colo_runstate_is_stopped(void)
  61{
  62    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  63}
  64
  65static void colo_checkpoint_notify(void)
  66{
  67    MigrationState *s = migrate_get_current();
  68    int64_t next_notify_time;
  69
  70    qemu_event_set(&s->colo_checkpoint_event);
  71    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  72    next_notify_time = s->colo_checkpoint_time + migrate_checkpoint_delay();
  73    timer_mod(s->colo_delay_timer, next_notify_time);
  74}
  75
  76static void colo_checkpoint_notify_timer(void *opaque)
  77{
  78    colo_checkpoint_notify();
  79}
  80
  81void colo_checkpoint_delay_set(void)
  82{
  83    if (migration_in_colo_state()) {
  84        colo_checkpoint_notify();
  85    }
  86}
  87
  88static void secondary_vm_do_failover(void)
  89{
  90/* COLO needs enable block-replication */
  91    int old_state;
  92    MigrationIncomingState *mis = migration_incoming_get_current();
  93    Error *local_err = NULL;
  94
  95    /* Can not do failover during the process of VM's loading VMstate, Or
  96     * it will break the secondary VM.
  97     */
  98    if (vmstate_loading) {
  99        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 100                        FAILOVER_STATUS_RELAUNCH);
 101        if (old_state != FAILOVER_STATUS_ACTIVE) {
 102            error_report("Unknown error while do failover for secondary VM,"
 103                         "old_state: %s", FailoverStatus_str(old_state));
 104        }
 105        return;
 106    }
 107
 108    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
 109                      MIGRATION_STATUS_COMPLETED);
 110
 111    replication_stop_all(true, &local_err);
 112    if (local_err) {
 113        error_report_err(local_err);
 114        local_err = NULL;
 115    }
 116
 117    /* Notify all filters of all NIC to do checkpoint */
 118    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 119    if (local_err) {
 120        error_report_err(local_err);
 121    }
 122
 123    if (!autostart) {
 124        error_report("\"-S\" qemu option will be ignored in secondary side");
 125        /* recover runstate to normal migration finish state */
 126        autostart = true;
 127    }
 128    /*
 129     * Make sure COLO incoming thread not block in recv or send,
 130     * If mis->from_src_file and mis->to_src_file use the same fd,
 131     * The second shutdown() will return -1, we ignore this value,
 132     * It is harmless.
 133     */
 134    if (mis->from_src_file) {
 135        qemu_file_shutdown(mis->from_src_file);
 136    }
 137    if (mis->to_src_file) {
 138        qemu_file_shutdown(mis->to_src_file);
 139    }
 140
 141    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 142                                   FAILOVER_STATUS_COMPLETED);
 143    if (old_state != FAILOVER_STATUS_ACTIVE) {
 144        error_report("Incorrect state (%s) while doing failover for "
 145                     "secondary VM", FailoverStatus_str(old_state));
 146        return;
 147    }
 148    /* Notify COLO incoming thread that failover work is finished */
 149    qemu_event_set(&mis->colo_incoming_event);
 150
 151    /* For Secondary VM, jump to incoming co */
 152    if (mis->colo_incoming_co) {
 153        qemu_coroutine_enter(mis->colo_incoming_co);
 154    }
 155}
 156
 157static void primary_vm_do_failover(void)
 158{
 159    MigrationState *s = migrate_get_current();
 160    int old_state;
 161    Error *local_err = NULL;
 162
 163    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 164                      MIGRATION_STATUS_COMPLETED);
 165    /*
 166     * kick COLO thread which might wait at
 167     * qemu_sem_wait(&s->colo_checkpoint_sem).
 168     */
 169    colo_checkpoint_notify();
 170
 171    /*
 172     * Wake up COLO thread which may blocked in recv() or send(),
 173     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 174     * same fd, but we still shutdown the fd for twice, it is harmless.
 175     */
 176    if (s->to_dst_file) {
 177        qemu_file_shutdown(s->to_dst_file);
 178    }
 179    if (s->rp_state.from_dst_file) {
 180        qemu_file_shutdown(s->rp_state.from_dst_file);
 181    }
 182
 183    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 184                                   FAILOVER_STATUS_COMPLETED);
 185    if (old_state != FAILOVER_STATUS_ACTIVE) {
 186        error_report("Incorrect state (%s) while doing failover for Primary VM",
 187                     FailoverStatus_str(old_state));
 188        return;
 189    }
 190
 191    replication_stop_all(true, &local_err);
 192    if (local_err) {
 193        error_report_err(local_err);
 194        local_err = NULL;
 195    }
 196
 197    /* Notify COLO thread that failover work is finished */
 198    qemu_event_set(&s->colo_exit_event);
 199}
 200
 201COLOMode get_colo_mode(void)
 202{
 203    if (migration_in_colo_state()) {
 204        return COLO_MODE_PRIMARY;
 205    } else if (migration_incoming_in_colo_state()) {
 206        return COLO_MODE_SECONDARY;
 207    } else {
 208        return COLO_MODE_NONE;
 209    }
 210}
 211
 212void colo_do_failover(void)
 213{
 214    /* Make sure VM stopped while failover happened. */
 215    if (!colo_runstate_is_stopped()) {
 216        vm_stop_force_state(RUN_STATE_COLO);
 217    }
 218
 219    switch (last_colo_mode = get_colo_mode()) {
 220    case COLO_MODE_PRIMARY:
 221        primary_vm_do_failover();
 222        break;
 223    case COLO_MODE_SECONDARY:
 224        secondary_vm_do_failover();
 225        break;
 226    default:
 227        error_report("colo_do_failover failed because the colo mode"
 228                     " could not be obtained");
 229    }
 230}
 231
 232void qmp_xen_set_replication(bool enable, bool primary,
 233                             bool has_failover, bool failover,
 234                             Error **errp)
 235{
 236    ReplicationMode mode = primary ?
 237                           REPLICATION_MODE_PRIMARY :
 238                           REPLICATION_MODE_SECONDARY;
 239
 240    if (has_failover && enable) {
 241        error_setg(errp, "Parameter 'failover' is only for"
 242                   " stopping replication");
 243        return;
 244    }
 245
 246    if (enable) {
 247        replication_start_all(mode, errp);
 248    } else {
 249        if (!has_failover) {
 250            failover = NULL;
 251        }
 252        replication_stop_all(failover, failover ? NULL : errp);
 253    }
 254}
 255
 256ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 257{
 258    Error *err = NULL;
 259    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 260
 261    replication_get_error_all(&err);
 262    if (err) {
 263        s->error = true;
 264        s->desc = g_strdup(error_get_pretty(err));
 265    } else {
 266        s->error = false;
 267    }
 268
 269    error_free(err);
 270    return s;
 271}
 272
 273void qmp_xen_colo_do_checkpoint(Error **errp)
 274{
 275    Error *err = NULL;
 276
 277    replication_do_checkpoint_all(&err);
 278    if (err) {
 279        error_propagate(errp, err);
 280        return;
 281    }
 282    /* Notify all filters of all NIC to do checkpoint */
 283    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 284}
 285
 286COLOStatus *qmp_query_colo_status(Error **errp)
 287{
 288    COLOStatus *s = g_new0(COLOStatus, 1);
 289
 290    s->mode = get_colo_mode();
 291    s->last_mode = last_colo_mode;
 292
 293    switch (failover_get_state()) {
 294    case FAILOVER_STATUS_NONE:
 295        s->reason = COLO_EXIT_REASON_NONE;
 296        break;
 297    case FAILOVER_STATUS_COMPLETED:
 298        s->reason = COLO_EXIT_REASON_REQUEST;
 299        break;
 300    default:
 301        if (migration_in_colo_state()) {
 302            s->reason = COLO_EXIT_REASON_PROCESSING;
 303        } else {
 304            s->reason = COLO_EXIT_REASON_ERROR;
 305        }
 306    }
 307
 308    return s;
 309}
 310
 311static void colo_send_message(QEMUFile *f, COLOMessage msg,
 312                              Error **errp)
 313{
 314    int ret;
 315
 316    if (msg >= COLO_MESSAGE__MAX) {
 317        error_setg(errp, "%s: Invalid message", __func__);
 318        return;
 319    }
 320    qemu_put_be32(f, msg);
 321    ret = qemu_fflush(f);
 322    if (ret < 0) {
 323        error_setg_errno(errp, -ret, "Can't send COLO message");
 324    }
 325    trace_colo_send_message(COLOMessage_str(msg));
 326}
 327
 328static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 329                                    uint64_t value, Error **errp)
 330{
 331    Error *local_err = NULL;
 332    int ret;
 333
 334    colo_send_message(f, msg, &local_err);
 335    if (local_err) {
 336        error_propagate(errp, local_err);
 337        return;
 338    }
 339    qemu_put_be64(f, value);
 340    ret = qemu_fflush(f);
 341    if (ret < 0) {
 342        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 343                         COLOMessage_str(msg));
 344    }
 345}
 346
 347static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 348{
 349    COLOMessage msg;
 350    int ret;
 351
 352    msg = qemu_get_be32(f);
 353    ret = qemu_file_get_error(f);
 354    if (ret < 0) {
 355        error_setg_errno(errp, -ret, "Can't receive COLO message");
 356        return msg;
 357    }
 358    if (msg >= COLO_MESSAGE__MAX) {
 359        error_setg(errp, "%s: Invalid message", __func__);
 360        return msg;
 361    }
 362    trace_colo_receive_message(COLOMessage_str(msg));
 363    return msg;
 364}
 365
 366static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 367                                       Error **errp)
 368{
 369    COLOMessage msg;
 370    Error *local_err = NULL;
 371
 372    msg = colo_receive_message(f, &local_err);
 373    if (local_err) {
 374        error_propagate(errp, local_err);
 375        return;
 376    }
 377    if (msg != expect_msg) {
 378        error_setg(errp, "Unexpected COLO message %d, expected %d",
 379                          msg, expect_msg);
 380    }
 381}
 382
 383static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 384                                           Error **errp)
 385{
 386    Error *local_err = NULL;
 387    uint64_t value;
 388    int ret;
 389
 390    colo_receive_check_message(f, expect_msg, &local_err);
 391    if (local_err) {
 392        error_propagate(errp, local_err);
 393        return 0;
 394    }
 395
 396    value = qemu_get_be64(f);
 397    ret = qemu_file_get_error(f);
 398    if (ret < 0) {
 399        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 400                         COLOMessage_str(expect_msg));
 401    }
 402    return value;
 403}
 404
 405static int colo_do_checkpoint_transaction(MigrationState *s,
 406                                          QIOChannelBuffer *bioc,
 407                                          QEMUFile *fb)
 408{
 409    Error *local_err = NULL;
 410    int ret = -1;
 411
 412    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 413                      &local_err);
 414    if (local_err) {
 415        goto out;
 416    }
 417
 418    colo_receive_check_message(s->rp_state.from_dst_file,
 419                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 420    if (local_err) {
 421        goto out;
 422    }
 423    /* Reset channel-buffer directly */
 424    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 425    bioc->usage = 0;
 426
 427    bql_lock();
 428    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 429        bql_unlock();
 430        goto out;
 431    }
 432    vm_stop_force_state(RUN_STATE_COLO);
 433    bql_unlock();
 434    trace_colo_vm_state_change("run", "stop");
 435    /*
 436     * Failover request bh could be called after vm_stop_force_state(),
 437     * So we need check failover_request_is_active() again.
 438     */
 439    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 440        goto out;
 441    }
 442    bql_lock();
 443
 444    replication_do_checkpoint_all(&local_err);
 445    if (local_err) {
 446        bql_unlock();
 447        goto out;
 448    }
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        bql_unlock();
 453        goto out;
 454    }
 455
 456    qemu_savevm_maybe_send_switchover_start(s->to_dst_file);
 457
 458    /* Note: device state is saved into buffer */
 459    ret = qemu_save_device_state(fb);
 460
 461    bql_unlock();
 462    if (ret < 0) {
 463        goto out;
 464    }
 465
 466    if (migrate_auto_converge()) {
 467        mig_throttle_counter_reset();
 468    }
 469    /*
 470     * Only save VM's live state, which not including device state.
 471     * TODO: We may need a timeout mechanism to prevent COLO process
 472     * to be blocked here.
 473     */
 474    qemu_savevm_live_state(s->to_dst_file);
 475
 476    qemu_fflush(fb);
 477
 478    /*
 479     * We need the size of the VMstate data in Secondary side,
 480     * With which we can decide how much data should be read.
 481     */
 482    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 483                            bioc->usage, &local_err);
 484    if (local_err) {
 485        goto out;
 486    }
 487
 488    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 489    ret = qemu_fflush(s->to_dst_file);
 490    if (ret < 0) {
 491        goto out;
 492    }
 493
 494    colo_receive_check_message(s->rp_state.from_dst_file,
 495                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 496    if (local_err) {
 497        goto out;
 498    }
 499
 500    qemu_event_reset(&s->colo_checkpoint_event);
 501    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 502    if (local_err) {
 503        goto out;
 504    }
 505
 506    colo_receive_check_message(s->rp_state.from_dst_file,
 507                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 508    if (local_err) {
 509        goto out;
 510    }
 511
 512    ret = 0;
 513
 514    bql_lock();
 515    vm_start();
 516    bql_unlock();
 517    trace_colo_vm_state_change("stop", "run");
 518
 519out:
 520    if (local_err) {
 521        error_report_err(local_err);
 522    }
 523    return ret;
 524}
 525
 526static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 527{
 528    colo_checkpoint_notify();
 529}
 530
 531static void colo_process_checkpoint(MigrationState *s)
 532{
 533    QIOChannelBuffer *bioc;
 534    QEMUFile *fb = NULL;
 535    Error *local_err = NULL;
 536    int ret;
 537
 538    if (get_colo_mode() != COLO_MODE_PRIMARY) {
 539        error_report("COLO mode must be COLO_MODE_PRIMARY");
 540        return;
 541    }
 542
 543    failover_init_state();
 544
 545    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 546    if (!s->rp_state.from_dst_file) {
 547        error_report("Open QEMUFile from_dst_file failed");
 548        goto out;
 549    }
 550
 551    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 552    colo_compare_register_notifier(&packets_compare_notifier);
 553
 554    /*
 555     * Wait for Secondary finish loading VM states and enter COLO
 556     * restore.
 557     */
 558    colo_receive_check_message(s->rp_state.from_dst_file,
 559                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 560    if (local_err) {
 561        goto out;
 562    }
 563    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 564    fb = qemu_file_new_output(QIO_CHANNEL(bioc));
 565    object_unref(OBJECT(bioc));
 566
 567    bql_lock();
 568    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 569    if (local_err) {
 570        bql_unlock();
 571        goto out;
 572    }
 573
 574    vm_start();
 575    bql_unlock();
 576    trace_colo_vm_state_change("stop", "run");
 577
 578    timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
 579              migrate_checkpoint_delay());
 580
 581    while (s->state == MIGRATION_STATUS_COLO) {
 582        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 583            error_report("failover request");
 584            goto out;
 585        }
 586
 587        qemu_event_wait(&s->colo_checkpoint_event);
 588
 589        if (s->state != MIGRATION_STATUS_COLO) {
 590            goto out;
 591        }
 592        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 593        if (ret < 0) {
 594            goto out;
 595        }
 596    }
 597
 598out:
 599    /* Throw the unreported error message after exited from loop */
 600    if (local_err) {
 601        error_report_err(local_err);
 602    }
 603
 604    if (fb) {
 605        qemu_fclose(fb);
 606    }
 607
 608    /*
 609     * There are only two reasons we can get here, some error happened
 610     * or the user triggered failover.
 611     */
 612    switch (failover_get_state()) {
 613    case FAILOVER_STATUS_COMPLETED:
 614        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 615                                  COLO_EXIT_REASON_REQUEST);
 616        break;
 617    default:
 618        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 619                                  COLO_EXIT_REASON_ERROR);
 620    }
 621
 622    /* Hope this not to be too long to wait here */
 623    qemu_event_wait(&s->colo_exit_event);
 624    qemu_event_destroy(&s->colo_exit_event);
 625
 626    /*
 627     * It is safe to unregister notifier after failover finished.
 628     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 629     * released before unregister notifier, or there will be use-after-free
 630     * error.
 631     */
 632    colo_compare_unregister_notifier(&packets_compare_notifier);
 633    timer_free(s->colo_delay_timer);
 634    qemu_event_destroy(&s->colo_checkpoint_event);
 635
 636    /*
 637     * Must be called after failover BH is completed,
 638     * Or the failover BH may shutdown the wrong fd that
 639     * re-used by other threads after we release here.
 640     */
 641    if (s->rp_state.from_dst_file) {
 642        qemu_fclose(s->rp_state.from_dst_file);
 643        s->rp_state.from_dst_file = NULL;
 644    }
 645}
 646
 647void migrate_start_colo_process(MigrationState *s)
 648{
 649    bql_unlock();
 650    qemu_event_init(&s->colo_checkpoint_event, false);
 651    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 652                                colo_checkpoint_notify_timer, NULL);
 653
 654    qemu_event_init(&s->colo_exit_event, false);
 655    colo_process_checkpoint(s);
 656    bql_lock();
 657}
 658
 659static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 660                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 661{
 662    uint64_t total_size;
 663    uint64_t value;
 664    Error *local_err = NULL;
 665    int ret;
 666
 667    bql_lock();
 668    vm_stop_force_state(RUN_STATE_COLO);
 669    bql_unlock();
 670    trace_colo_vm_state_change("run", "stop");
 671
 672    /* FIXME: This is unnecessary for periodic checkpoint mode */
 673    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 674                 &local_err);
 675    if (local_err) {
 676        error_propagate(errp, local_err);
 677        return;
 678    }
 679
 680    colo_receive_check_message(mis->from_src_file,
 681                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 682    if (local_err) {
 683        error_propagate(errp, local_err);
 684        return;
 685    }
 686
 687    bql_lock();
 688    cpu_synchronize_all_states();
 689    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 690    bql_unlock();
 691
 692    if (ret < 0) {
 693        error_setg(errp, "Load VM's live state (ram) error");
 694        return;
 695    }
 696
 697    value = colo_receive_message_value(mis->from_src_file,
 698                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 699    if (local_err) {
 700        error_propagate(errp, local_err);
 701        return;
 702    }
 703
 704    /*
 705     * Read VM device state data into channel buffer,
 706     * It's better to re-use the memory allocated.
 707     * Here we need to handle the channel buffer directly.
 708     */
 709    if (value > bioc->capacity) {
 710        bioc->capacity = value;
 711        bioc->data = g_realloc(bioc->data, bioc->capacity);
 712    }
 713    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 714    if (total_size != value) {
 715        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 716                    " %" PRIu64, total_size, value);
 717        return;
 718    }
 719    bioc->usage = total_size;
 720    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 721
 722    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 723                 &local_err);
 724    if (local_err) {
 725        error_propagate(errp, local_err);
 726        return;
 727    }
 728
 729    bql_lock();
 730    vmstate_loading = true;
 731    colo_flush_ram_cache();
 732    ret = qemu_load_device_state(fb);
 733    if (ret < 0) {
 734        error_setg(errp, "COLO: load device state failed");
 735        vmstate_loading = false;
 736        bql_unlock();
 737        return;
 738    }
 739
 740    replication_get_error_all(&local_err);
 741    if (local_err) {
 742        error_propagate(errp, local_err);
 743        vmstate_loading = false;
 744        bql_unlock();
 745        return;
 746    }
 747
 748    /* discard colo disk buffer */
 749    replication_do_checkpoint_all(&local_err);
 750    if (local_err) {
 751        error_propagate(errp, local_err);
 752        vmstate_loading = false;
 753        bql_unlock();
 754        return;
 755    }
 756    /* Notify all filters of all NIC to do checkpoint */
 757    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 758
 759    if (local_err) {
 760        error_propagate(errp, local_err);
 761        vmstate_loading = false;
 762        bql_unlock();
 763        return;
 764    }
 765
 766    vmstate_loading = false;
 767    vm_start();
 768    bql_unlock();
 769    trace_colo_vm_state_change("stop", "run");
 770
 771    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 772        return;
 773    }
 774
 775    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 776                 &local_err);
 777    error_propagate(errp, local_err);
 778}
 779
 780static void colo_wait_handle_message(MigrationIncomingState *mis,
 781                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 782{
 783    COLOMessage msg;
 784    Error *local_err = NULL;
 785
 786    msg = colo_receive_message(mis->from_src_file, &local_err);
 787    if (local_err) {
 788        error_propagate(errp, local_err);
 789        return;
 790    }
 791
 792    switch (msg) {
 793    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 794        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 795        break;
 796    default:
 797        error_setg(errp, "Got unknown COLO message: %d", msg);
 798        break;
 799    }
 800}
 801
 802void colo_shutdown(void)
 803{
 804    MigrationIncomingState *mis = NULL;
 805    MigrationState *s = NULL;
 806
 807    switch (get_colo_mode()) {
 808    case COLO_MODE_PRIMARY:
 809        s = migrate_get_current();
 810        qemu_event_set(&s->colo_checkpoint_event);
 811        qemu_event_set(&s->colo_exit_event);
 812        break;
 813    case COLO_MODE_SECONDARY:
 814        mis = migration_incoming_get_current();
 815        qemu_event_set(&mis->colo_incoming_event);
 816        break;
 817    default:
 818        break;
 819    }
 820}
 821
 822static void *colo_process_incoming_thread(void *opaque)
 823{
 824    MigrationIncomingState *mis = opaque;
 825    QEMUFile *fb = NULL;
 826    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 827    Error *local_err = NULL;
 828
 829    rcu_register_thread();
 830    qemu_event_init(&mis->colo_incoming_event, false);
 831
 832    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 833                      MIGRATION_STATUS_COLO);
 834
 835    if (get_colo_mode() != COLO_MODE_SECONDARY) {
 836        error_report("COLO mode must be COLO_MODE_SECONDARY");
 837        return NULL;
 838    }
 839
 840    /* Make sure all file formats throw away their mutable metadata */
 841    bql_lock();
 842    migration_block_activate(&local_err);
 843    bql_unlock();
 844    if (local_err) {
 845        error_report_err(local_err);
 846        return NULL;
 847    }
 848
 849    failover_init_state();
 850
 851    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 852    if (!mis->to_src_file) {
 853        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 854        goto out;
 855    }
 856    /*
 857     * Note: the communication between Primary side and Secondary side
 858     * should be sequential, we set the fd to unblocked in migration incoming
 859     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 860     * set the fd back to blocked.
 861     */
 862    qemu_file_set_blocking(mis->from_src_file, true);
 863
 864    colo_incoming_start_dirty_log();
 865
 866    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 867    fb = qemu_file_new_input(QIO_CHANNEL(bioc));
 868    object_unref(OBJECT(bioc));
 869
 870    bql_lock();
 871    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 872    if (local_err) {
 873        bql_unlock();
 874        goto out;
 875    }
 876    vm_start();
 877    bql_unlock();
 878    trace_colo_vm_state_change("stop", "run");
 879
 880    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 881                      &local_err);
 882    if (local_err) {
 883        goto out;
 884    }
 885
 886    while (mis->state == MIGRATION_STATUS_COLO) {
 887        colo_wait_handle_message(mis, fb, bioc, &local_err);
 888        if (local_err) {
 889            error_report_err(local_err);
 890            break;
 891        }
 892
 893        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 894            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 895                            FAILOVER_STATUS_NONE);
 896            failover_request_active(NULL);
 897            break;
 898        }
 899
 900        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 901            error_report("failover request");
 902            break;
 903        }
 904    }
 905
 906out:
 907    /*
 908     * There are only two reasons we can get here, some error happened
 909     * or the user triggered failover.
 910     */
 911    switch (failover_get_state()) {
 912    case FAILOVER_STATUS_COMPLETED:
 913        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 914                                  COLO_EXIT_REASON_REQUEST);
 915        break;
 916    default:
 917        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 918                                  COLO_EXIT_REASON_ERROR);
 919    }
 920
 921    if (fb) {
 922        qemu_fclose(fb);
 923    }
 924
 925    /* Hope this not to be too long to loop here */
 926    qemu_event_wait(&mis->colo_incoming_event);
 927    qemu_event_destroy(&mis->colo_incoming_event);
 928
 929    rcu_unregister_thread();
 930    return NULL;
 931}
 932
 933void coroutine_fn colo_incoming_co(void)
 934{
 935    MigrationIncomingState *mis = migration_incoming_get_current();
 936    QemuThread th;
 937
 938    assert(bql_locked());
 939    assert(migration_incoming_colo_enabled());
 940
 941    qemu_thread_create(&th, MIGRATION_THREAD_DST_COLO,
 942                       colo_process_incoming_thread,
 943                       mis, QEMU_THREAD_JOINABLE);
 944
 945    mis->colo_incoming_co = qemu_coroutine_self();
 946    qemu_coroutine_yield();
 947    mis->colo_incoming_co = NULL;
 948
 949    bql_unlock();
 950    /* Wait checkpoint incoming thread exit before free resource */
 951    qemu_thread_join(&th);
 952    bql_lock();
 953
 954    /* We hold the global BQL, so it is safe here */
 955    colo_release_ram_cache();
 956}
 957