qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/rcu.h"
  28#include "migration/failover.h"
  29#include "migration/ram.h"
  30#ifdef CONFIG_REPLICATION
  31#include "block/replication.h"
  32#endif
  33#include "net/colo-compare.h"
  34#include "net/colo.h"
  35#include "block/block.h"
  36#include "qapi/qapi-events-migration.h"
  37#include "qapi/qmp/qerror.h"
  38#include "sysemu/cpus.h"
  39#include "sysemu/runstate.h"
  40#include "net/filter.h"
  41
  42static bool vmstate_loading;
  43static Notifier packets_compare_notifier;
  44
  45/* User need to know colo mode after COLO failover */
  46static COLOMode last_colo_mode;
  47
  48#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  49
  50bool migration_in_colo_state(void)
  51{
  52    MigrationState *s = migrate_get_current();
  53
  54    return (s->state == MIGRATION_STATUS_COLO);
  55}
  56
  57bool migration_incoming_in_colo_state(void)
  58{
  59    MigrationIncomingState *mis = migration_incoming_get_current();
  60
  61    return mis && (mis->state == MIGRATION_STATUS_COLO);
  62}
  63
  64static bool colo_runstate_is_stopped(void)
  65{
  66    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  67}
  68
  69static void secondary_vm_do_failover(void)
  70{
  71/* COLO needs enable block-replication */
  72#ifdef CONFIG_REPLICATION
  73    int old_state;
  74    MigrationIncomingState *mis = migration_incoming_get_current();
  75    Error *local_err = NULL;
  76
  77    /* Can not do failover during the process of VM's loading VMstate, Or
  78     * it will break the secondary VM.
  79     */
  80    if (vmstate_loading) {
  81        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  82                        FAILOVER_STATUS_RELAUNCH);
  83        if (old_state != FAILOVER_STATUS_ACTIVE) {
  84            error_report("Unknown error while do failover for secondary VM,"
  85                         "old_state: %s", FailoverStatus_str(old_state));
  86        }
  87        return;
  88    }
  89
  90    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  91                      MIGRATION_STATUS_COMPLETED);
  92
  93    replication_stop_all(true, &local_err);
  94    if (local_err) {
  95        error_report_err(local_err);
  96        local_err = NULL;
  97    }
  98
  99    /* Notify all filters of all NIC to do checkpoint */
 100    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 101    if (local_err) {
 102        error_report_err(local_err);
 103    }
 104
 105    if (!autostart) {
 106        error_report("\"-S\" qemu option will be ignored in secondary side");
 107        /* recover runstate to normal migration finish state */
 108        autostart = true;
 109    }
 110    /*
 111     * Make sure COLO incoming thread not block in recv or send,
 112     * If mis->from_src_file and mis->to_src_file use the same fd,
 113     * The second shutdown() will return -1, we ignore this value,
 114     * It is harmless.
 115     */
 116    if (mis->from_src_file) {
 117        qemu_file_shutdown(mis->from_src_file);
 118    }
 119    if (mis->to_src_file) {
 120        qemu_file_shutdown(mis->to_src_file);
 121    }
 122
 123    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 124                                   FAILOVER_STATUS_COMPLETED);
 125    if (old_state != FAILOVER_STATUS_ACTIVE) {
 126        error_report("Incorrect state (%s) while doing failover for "
 127                     "secondary VM", FailoverStatus_str(old_state));
 128        return;
 129    }
 130    /* Notify COLO incoming thread that failover work is finished */
 131    qemu_sem_post(&mis->colo_incoming_sem);
 132
 133    /* For Secondary VM, jump to incoming co */
 134    if (mis->migration_incoming_co) {
 135        qemu_coroutine_enter(mis->migration_incoming_co);
 136    }
 137#else
 138    abort();
 139#endif
 140}
 141
 142static void primary_vm_do_failover(void)
 143{
 144#ifdef CONFIG_REPLICATION
 145    MigrationState *s = migrate_get_current();
 146    int old_state;
 147    Error *local_err = NULL;
 148
 149    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 150                      MIGRATION_STATUS_COMPLETED);
 151    /*
 152     * kick COLO thread which might wait at
 153     * qemu_sem_wait(&s->colo_checkpoint_sem).
 154     */
 155    colo_checkpoint_notify(s);
 156
 157    /*
 158     * Wake up COLO thread which may blocked in recv() or send(),
 159     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 160     * same fd, but we still shutdown the fd for twice, it is harmless.
 161     */
 162    if (s->to_dst_file) {
 163        qemu_file_shutdown(s->to_dst_file);
 164    }
 165    if (s->rp_state.from_dst_file) {
 166        qemu_file_shutdown(s->rp_state.from_dst_file);
 167    }
 168
 169    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 170                                   FAILOVER_STATUS_COMPLETED);
 171    if (old_state != FAILOVER_STATUS_ACTIVE) {
 172        error_report("Incorrect state (%s) while doing failover for Primary VM",
 173                     FailoverStatus_str(old_state));
 174        return;
 175    }
 176
 177    replication_stop_all(true, &local_err);
 178    if (local_err) {
 179        error_report_err(local_err);
 180        local_err = NULL;
 181    }
 182
 183    /* Notify COLO thread that failover work is finished */
 184    qemu_sem_post(&s->colo_exit_sem);
 185#else
 186    abort();
 187#endif
 188}
 189
 190COLOMode get_colo_mode(void)
 191{
 192    if (migration_in_colo_state()) {
 193        return COLO_MODE_PRIMARY;
 194    } else if (migration_incoming_in_colo_state()) {
 195        return COLO_MODE_SECONDARY;
 196    } else {
 197        return COLO_MODE_NONE;
 198    }
 199}
 200
 201void colo_do_failover(void)
 202{
 203    /* Make sure VM stopped while failover happened. */
 204    if (!colo_runstate_is_stopped()) {
 205        vm_stop_force_state(RUN_STATE_COLO);
 206    }
 207
 208    switch (last_colo_mode = get_colo_mode()) {
 209    case COLO_MODE_PRIMARY:
 210        primary_vm_do_failover();
 211        break;
 212    case COLO_MODE_SECONDARY:
 213        secondary_vm_do_failover();
 214        break;
 215    default:
 216        error_report("colo_do_failover failed because the colo mode"
 217                     " could not be obtained");
 218    }
 219}
 220
 221#ifdef CONFIG_REPLICATION
 222void qmp_xen_set_replication(bool enable, bool primary,
 223                             bool has_failover, bool failover,
 224                             Error **errp)
 225{
 226    ReplicationMode mode = primary ?
 227                           REPLICATION_MODE_PRIMARY :
 228                           REPLICATION_MODE_SECONDARY;
 229
 230    if (has_failover && enable) {
 231        error_setg(errp, "Parameter 'failover' is only for"
 232                   " stopping replication");
 233        return;
 234    }
 235
 236    if (enable) {
 237        replication_start_all(mode, errp);
 238    } else {
 239        if (!has_failover) {
 240            failover = NULL;
 241        }
 242        replication_stop_all(failover, failover ? NULL : errp);
 243    }
 244}
 245
 246ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 247{
 248    Error *err = NULL;
 249    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 250
 251    replication_get_error_all(&err);
 252    if (err) {
 253        s->error = true;
 254        s->has_desc = true;
 255        s->desc = g_strdup(error_get_pretty(err));
 256    } else {
 257        s->error = false;
 258    }
 259
 260    error_free(err);
 261    return s;
 262}
 263
 264void qmp_xen_colo_do_checkpoint(Error **errp)
 265{
 266    Error *err = NULL;
 267
 268    replication_do_checkpoint_all(&err);
 269    if (err) {
 270        error_propagate(errp, err);
 271        return;
 272    }
 273    /* Notify all filters of all NIC to do checkpoint */
 274    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 275}
 276#endif
 277
 278COLOStatus *qmp_query_colo_status(Error **errp)
 279{
 280    COLOStatus *s = g_new0(COLOStatus, 1);
 281
 282    s->mode = get_colo_mode();
 283    s->last_mode = last_colo_mode;
 284
 285    switch (failover_get_state()) {
 286    case FAILOVER_STATUS_NONE:
 287        s->reason = COLO_EXIT_REASON_NONE;
 288        break;
 289    case FAILOVER_STATUS_COMPLETED:
 290        s->reason = COLO_EXIT_REASON_REQUEST;
 291        break;
 292    default:
 293        if (migration_in_colo_state()) {
 294            s->reason = COLO_EXIT_REASON_PROCESSING;
 295        } else {
 296            s->reason = COLO_EXIT_REASON_ERROR;
 297        }
 298    }
 299
 300    return s;
 301}
 302
 303static void colo_send_message(QEMUFile *f, COLOMessage msg,
 304                              Error **errp)
 305{
 306    int ret;
 307
 308    if (msg >= COLO_MESSAGE__MAX) {
 309        error_setg(errp, "%s: Invalid message", __func__);
 310        return;
 311    }
 312    qemu_put_be32(f, msg);
 313    qemu_fflush(f);
 314
 315    ret = qemu_file_get_error(f);
 316    if (ret < 0) {
 317        error_setg_errno(errp, -ret, "Can't send COLO message");
 318    }
 319    trace_colo_send_message(COLOMessage_str(msg));
 320}
 321
 322static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 323                                    uint64_t value, Error **errp)
 324{
 325    Error *local_err = NULL;
 326    int ret;
 327
 328    colo_send_message(f, msg, &local_err);
 329    if (local_err) {
 330        error_propagate(errp, local_err);
 331        return;
 332    }
 333    qemu_put_be64(f, value);
 334    qemu_fflush(f);
 335
 336    ret = qemu_file_get_error(f);
 337    if (ret < 0) {
 338        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 339                         COLOMessage_str(msg));
 340    }
 341}
 342
 343static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 344{
 345    COLOMessage msg;
 346    int ret;
 347
 348    msg = qemu_get_be32(f);
 349    ret = qemu_file_get_error(f);
 350    if (ret < 0) {
 351        error_setg_errno(errp, -ret, "Can't receive COLO message");
 352        return msg;
 353    }
 354    if (msg >= COLO_MESSAGE__MAX) {
 355        error_setg(errp, "%s: Invalid message", __func__);
 356        return msg;
 357    }
 358    trace_colo_receive_message(COLOMessage_str(msg));
 359    return msg;
 360}
 361
 362static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 363                                       Error **errp)
 364{
 365    COLOMessage msg;
 366    Error *local_err = NULL;
 367
 368    msg = colo_receive_message(f, &local_err);
 369    if (local_err) {
 370        error_propagate(errp, local_err);
 371        return;
 372    }
 373    if (msg != expect_msg) {
 374        error_setg(errp, "Unexpected COLO message %d, expected %d",
 375                          msg, expect_msg);
 376    }
 377}
 378
 379static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 380                                           Error **errp)
 381{
 382    Error *local_err = NULL;
 383    uint64_t value;
 384    int ret;
 385
 386    colo_receive_check_message(f, expect_msg, &local_err);
 387    if (local_err) {
 388        error_propagate(errp, local_err);
 389        return 0;
 390    }
 391
 392    value = qemu_get_be64(f);
 393    ret = qemu_file_get_error(f);
 394    if (ret < 0) {
 395        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 396                         COLOMessage_str(expect_msg));
 397    }
 398    return value;
 399}
 400
 401static int colo_do_checkpoint_transaction(MigrationState *s,
 402                                          QIOChannelBuffer *bioc,
 403                                          QEMUFile *fb)
 404{
 405    Error *local_err = NULL;
 406    int ret = -1;
 407
 408    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 409                      &local_err);
 410    if (local_err) {
 411        goto out;
 412    }
 413
 414    colo_receive_check_message(s->rp_state.from_dst_file,
 415                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 416    if (local_err) {
 417        goto out;
 418    }
 419    /* Reset channel-buffer directly */
 420    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 421    bioc->usage = 0;
 422
 423    qemu_mutex_lock_iothread();
 424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 425        qemu_mutex_unlock_iothread();
 426        goto out;
 427    }
 428    vm_stop_force_state(RUN_STATE_COLO);
 429    qemu_mutex_unlock_iothread();
 430    trace_colo_vm_state_change("run", "stop");
 431    /*
 432     * Failover request bh could be called after vm_stop_force_state(),
 433     * So we need check failover_request_is_active() again.
 434     */
 435    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 436        goto out;
 437    }
 438    qemu_mutex_lock_iothread();
 439
 440#ifdef CONFIG_REPLICATION
 441    replication_do_checkpoint_all(&local_err);
 442    if (local_err) {
 443        qemu_mutex_unlock_iothread();
 444        goto out;
 445    }
 446#else
 447        abort();
 448#endif
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        qemu_mutex_unlock_iothread();
 453        goto out;
 454    }
 455    /* Note: device state is saved into buffer */
 456    ret = qemu_save_device_state(fb);
 457
 458    qemu_mutex_unlock_iothread();
 459    if (ret < 0) {
 460        goto out;
 461    }
 462
 463    if (migrate_auto_converge()) {
 464        mig_throttle_counter_reset();
 465    }
 466    /*
 467     * Only save VM's live state, which not including device state.
 468     * TODO: We may need a timeout mechanism to prevent COLO process
 469     * to be blocked here.
 470     */
 471    qemu_savevm_live_state(s->to_dst_file);
 472
 473    qemu_fflush(fb);
 474
 475    /*
 476     * We need the size of the VMstate data in Secondary side,
 477     * With which we can decide how much data should be read.
 478     */
 479    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 480                            bioc->usage, &local_err);
 481    if (local_err) {
 482        goto out;
 483    }
 484
 485    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 486    qemu_fflush(s->to_dst_file);
 487    ret = qemu_file_get_error(s->to_dst_file);
 488    if (ret < 0) {
 489        goto out;
 490    }
 491
 492    colo_receive_check_message(s->rp_state.from_dst_file,
 493                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 494    if (local_err) {
 495        goto out;
 496    }
 497
 498    qemu_event_reset(&s->colo_checkpoint_event);
 499    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 500    if (local_err) {
 501        goto out;
 502    }
 503
 504    colo_receive_check_message(s->rp_state.from_dst_file,
 505                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 506    if (local_err) {
 507        goto out;
 508    }
 509
 510    ret = 0;
 511
 512    qemu_mutex_lock_iothread();
 513    vm_start();
 514    qemu_mutex_unlock_iothread();
 515    trace_colo_vm_state_change("stop", "run");
 516
 517out:
 518    if (local_err) {
 519        error_report_err(local_err);
 520    }
 521    return ret;
 522}
 523
 524static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 525{
 526    colo_checkpoint_notify(data);
 527}
 528
 529static void colo_process_checkpoint(MigrationState *s)
 530{
 531    QIOChannelBuffer *bioc;
 532    QEMUFile *fb = NULL;
 533    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 534    Error *local_err = NULL;
 535    int ret;
 536
 537    if (get_colo_mode() != COLO_MODE_PRIMARY) {
 538        error_report("COLO mode must be COLO_MODE_PRIMARY");
 539        return;
 540    }
 541
 542    failover_init_state();
 543
 544    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 545    if (!s->rp_state.from_dst_file) {
 546        error_report("Open QEMUFile from_dst_file failed");
 547        goto out;
 548    }
 549
 550    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 551    colo_compare_register_notifier(&packets_compare_notifier);
 552
 553    /*
 554     * Wait for Secondary finish loading VM states and enter COLO
 555     * restore.
 556     */
 557    colo_receive_check_message(s->rp_state.from_dst_file,
 558                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 559    if (local_err) {
 560        goto out;
 561    }
 562    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 563    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 564    object_unref(OBJECT(bioc));
 565
 566    qemu_mutex_lock_iothread();
 567#ifdef CONFIG_REPLICATION
 568    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 569    if (local_err) {
 570        qemu_mutex_unlock_iothread();
 571        goto out;
 572    }
 573#else
 574        abort();
 575#endif
 576
 577    vm_start();
 578    qemu_mutex_unlock_iothread();
 579    trace_colo_vm_state_change("stop", "run");
 580
 581    timer_mod(s->colo_delay_timer,
 582            current_time + s->parameters.x_checkpoint_delay);
 583
 584    while (s->state == MIGRATION_STATUS_COLO) {
 585        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 586            error_report("failover request");
 587            goto out;
 588        }
 589
 590        qemu_event_wait(&s->colo_checkpoint_event);
 591
 592        if (s->state != MIGRATION_STATUS_COLO) {
 593            goto out;
 594        }
 595        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 596        if (ret < 0) {
 597            goto out;
 598        }
 599    }
 600
 601out:
 602    /* Throw the unreported error message after exited from loop */
 603    if (local_err) {
 604        error_report_err(local_err);
 605    }
 606
 607    if (fb) {
 608        qemu_fclose(fb);
 609    }
 610
 611    /*
 612     * There are only two reasons we can get here, some error happened
 613     * or the user triggered failover.
 614     */
 615    switch (failover_get_state()) {
 616    case FAILOVER_STATUS_COMPLETED:
 617        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 618                                  COLO_EXIT_REASON_REQUEST);
 619        break;
 620    default:
 621        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 622                                  COLO_EXIT_REASON_ERROR);
 623    }
 624
 625    /* Hope this not to be too long to wait here */
 626    qemu_sem_wait(&s->colo_exit_sem);
 627    qemu_sem_destroy(&s->colo_exit_sem);
 628
 629    /*
 630     * It is safe to unregister notifier after failover finished.
 631     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 632     * released before unregister notifier, or there will be use-after-free
 633     * error.
 634     */
 635    colo_compare_unregister_notifier(&packets_compare_notifier);
 636    timer_free(s->colo_delay_timer);
 637    qemu_event_destroy(&s->colo_checkpoint_event);
 638
 639    /*
 640     * Must be called after failover BH is completed,
 641     * Or the failover BH may shutdown the wrong fd that
 642     * re-used by other threads after we release here.
 643     */
 644    if (s->rp_state.from_dst_file) {
 645        qemu_fclose(s->rp_state.from_dst_file);
 646        s->rp_state.from_dst_file = NULL;
 647    }
 648}
 649
 650void colo_checkpoint_notify(void *opaque)
 651{
 652    MigrationState *s = opaque;
 653    int64_t next_notify_time;
 654
 655    qemu_event_set(&s->colo_checkpoint_event);
 656    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 657    next_notify_time = s->colo_checkpoint_time +
 658                    s->parameters.x_checkpoint_delay;
 659    timer_mod(s->colo_delay_timer, next_notify_time);
 660}
 661
 662void migrate_start_colo_process(MigrationState *s)
 663{
 664    qemu_mutex_unlock_iothread();
 665    qemu_event_init(&s->colo_checkpoint_event, false);
 666    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 667                                colo_checkpoint_notify, s);
 668
 669    qemu_sem_init(&s->colo_exit_sem, 0);
 670    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 671                      MIGRATION_STATUS_COLO);
 672    colo_process_checkpoint(s);
 673    qemu_mutex_lock_iothread();
 674}
 675
 676static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 677                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 678{
 679    uint64_t total_size;
 680    uint64_t value;
 681    Error *local_err = NULL;
 682    int ret;
 683
 684    qemu_mutex_lock_iothread();
 685    vm_stop_force_state(RUN_STATE_COLO);
 686    trace_colo_vm_state_change("run", "stop");
 687    qemu_mutex_unlock_iothread();
 688
 689    /* FIXME: This is unnecessary for periodic checkpoint mode */
 690    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 691                 &local_err);
 692    if (local_err) {
 693        error_propagate(errp, local_err);
 694        return;
 695    }
 696
 697    colo_receive_check_message(mis->from_src_file,
 698                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 699    if (local_err) {
 700        error_propagate(errp, local_err);
 701        return;
 702    }
 703
 704    qemu_mutex_lock_iothread();
 705    cpu_synchronize_all_states();
 706    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 707    qemu_mutex_unlock_iothread();
 708
 709    if (ret < 0) {
 710        error_setg(errp, "Load VM's live state (ram) error");
 711        return;
 712    }
 713
 714    value = colo_receive_message_value(mis->from_src_file,
 715                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 716    if (local_err) {
 717        error_propagate(errp, local_err);
 718        return;
 719    }
 720
 721    /*
 722     * Read VM device state data into channel buffer,
 723     * It's better to re-use the memory allocated.
 724     * Here we need to handle the channel buffer directly.
 725     */
 726    if (value > bioc->capacity) {
 727        bioc->capacity = value;
 728        bioc->data = g_realloc(bioc->data, bioc->capacity);
 729    }
 730    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 731    if (total_size != value) {
 732        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 733                    " %" PRIu64, total_size, value);
 734        return;
 735    }
 736    bioc->usage = total_size;
 737    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 738
 739    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 740                 &local_err);
 741    if (local_err) {
 742        error_propagate(errp, local_err);
 743        return;
 744    }
 745
 746    qemu_mutex_lock_iothread();
 747    vmstate_loading = true;
 748    colo_flush_ram_cache();
 749    ret = qemu_load_device_state(fb);
 750    if (ret < 0) {
 751        error_setg(errp, "COLO: load device state failed");
 752        vmstate_loading = false;
 753        qemu_mutex_unlock_iothread();
 754        return;
 755    }
 756
 757#ifdef CONFIG_REPLICATION
 758    replication_get_error_all(&local_err);
 759    if (local_err) {
 760        error_propagate(errp, local_err);
 761        vmstate_loading = false;
 762        qemu_mutex_unlock_iothread();
 763        return;
 764    }
 765
 766    /* discard colo disk buffer */
 767    replication_do_checkpoint_all(&local_err);
 768    if (local_err) {
 769        error_propagate(errp, local_err);
 770        vmstate_loading = false;
 771        qemu_mutex_unlock_iothread();
 772        return;
 773    }
 774#else
 775    abort();
 776#endif
 777    /* Notify all filters of all NIC to do checkpoint */
 778    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 779
 780    if (local_err) {
 781        error_propagate(errp, local_err);
 782        vmstate_loading = false;
 783        qemu_mutex_unlock_iothread();
 784        return;
 785    }
 786
 787    vmstate_loading = false;
 788    vm_start();
 789    trace_colo_vm_state_change("stop", "run");
 790    qemu_mutex_unlock_iothread();
 791
 792    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 793        return;
 794    }
 795
 796    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 797                 &local_err);
 798    error_propagate(errp, local_err);
 799}
 800
 801static void colo_wait_handle_message(MigrationIncomingState *mis,
 802                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 803{
 804    COLOMessage msg;
 805    Error *local_err = NULL;
 806
 807    msg = colo_receive_message(mis->from_src_file, &local_err);
 808    if (local_err) {
 809        error_propagate(errp, local_err);
 810        return;
 811    }
 812
 813    switch (msg) {
 814    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 815        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 816        break;
 817    default:
 818        error_setg(errp, "Got unknown COLO message: %d", msg);
 819        break;
 820    }
 821}
 822
 823void *colo_process_incoming_thread(void *opaque)
 824{
 825    MigrationIncomingState *mis = opaque;
 826    QEMUFile *fb = NULL;
 827    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 828    Error *local_err = NULL;
 829
 830    rcu_register_thread();
 831    qemu_sem_init(&mis->colo_incoming_sem, 0);
 832
 833    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 834                      MIGRATION_STATUS_COLO);
 835
 836    if (get_colo_mode() != COLO_MODE_SECONDARY) {
 837        error_report("COLO mode must be COLO_MODE_SECONDARY");
 838        return NULL;
 839    }
 840
 841    failover_init_state();
 842
 843    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 844    if (!mis->to_src_file) {
 845        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 846        goto out;
 847    }
 848    /*
 849     * Note: the communication between Primary side and Secondary side
 850     * should be sequential, we set the fd to unblocked in migration incoming
 851     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 852     * set the fd back to blocked.
 853     */
 854    qemu_file_set_blocking(mis->from_src_file, true);
 855
 856    colo_incoming_start_dirty_log();
 857
 858    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 859    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 860    object_unref(OBJECT(bioc));
 861
 862    qemu_mutex_lock_iothread();
 863#ifdef CONFIG_REPLICATION
 864    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 865    if (local_err) {
 866        qemu_mutex_unlock_iothread();
 867        goto out;
 868    }
 869#else
 870        abort();
 871#endif
 872    vm_start();
 873    trace_colo_vm_state_change("stop", "run");
 874    qemu_mutex_unlock_iothread();
 875
 876    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 877                      &local_err);
 878    if (local_err) {
 879        goto out;
 880    }
 881
 882    while (mis->state == MIGRATION_STATUS_COLO) {
 883        colo_wait_handle_message(mis, fb, bioc, &local_err);
 884        if (local_err) {
 885            error_report_err(local_err);
 886            break;
 887        }
 888
 889        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 890            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 891                            FAILOVER_STATUS_NONE);
 892            failover_request_active(NULL);
 893            break;
 894        }
 895
 896        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 897            error_report("failover request");
 898            break;
 899        }
 900    }
 901
 902out:
 903    /*
 904     * There are only two reasons we can get here, some error happened
 905     * or the user triggered failover.
 906     */
 907    switch (failover_get_state()) {
 908    case FAILOVER_STATUS_COMPLETED:
 909        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 910                                  COLO_EXIT_REASON_REQUEST);
 911        break;
 912    default:
 913        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 914                                  COLO_EXIT_REASON_ERROR);
 915    }
 916
 917    if (fb) {
 918        qemu_fclose(fb);
 919    }
 920
 921    /* Hope this not to be too long to loop here */
 922    qemu_sem_wait(&mis->colo_incoming_sem);
 923    qemu_sem_destroy(&mis->colo_incoming_sem);
 924
 925    rcu_unregister_thread();
 926    return NULL;
 927}
 928