qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "migration.h"
  18#include "qemu-file.h"
  19#include "savevm.h"
  20#include "migration/colo.h"
  21#include "block.h"
  22#include "io/channel-buffer.h"
  23#include "trace.h"
  24#include "qemu/error-report.h"
  25#include "qemu/main-loop.h"
  26#include "qemu/rcu.h"
  27#include "migration/failover.h"
  28#include "migration/ram.h"
  29#ifdef CONFIG_REPLICATION
  30#include "block/replication.h"
  31#endif
  32#include "net/colo-compare.h"
  33#include "net/colo.h"
  34#include "block/block.h"
  35#include "qapi/qapi-events-migration.h"
  36#include "qapi/qmp/qerror.h"
  37#include "sysemu/cpus.h"
  38#include "sysemu/runstate.h"
  39#include "net/filter.h"
  40
  41static bool vmstate_loading;
  42static Notifier packets_compare_notifier;
  43
  44/* User need to know colo mode after COLO failover */
  45static COLOMode last_colo_mode;
  46
  47#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  48
  49bool migration_in_colo_state(void)
  50{
  51    MigrationState *s = migrate_get_current();
  52
  53    return (s->state == MIGRATION_STATUS_COLO);
  54}
  55
  56bool migration_incoming_in_colo_state(void)
  57{
  58    MigrationIncomingState *mis = migration_incoming_get_current();
  59
  60    return mis && (mis->state == MIGRATION_STATUS_COLO);
  61}
  62
  63static bool colo_runstate_is_stopped(void)
  64{
  65    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  66}
  67
  68static void secondary_vm_do_failover(void)
  69{
  70/* COLO needs enable block-replication */
  71#ifdef CONFIG_REPLICATION
  72    int old_state;
  73    MigrationIncomingState *mis = migration_incoming_get_current();
  74    Error *local_err = NULL;
  75
  76    /* Can not do failover during the process of VM's loading VMstate, Or
  77     * it will break the secondary VM.
  78     */
  79    if (vmstate_loading) {
  80        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  81                        FAILOVER_STATUS_RELAUNCH);
  82        if (old_state != FAILOVER_STATUS_ACTIVE) {
  83            error_report("Unknown error while do failover for secondary VM,"
  84                         "old_state: %s", FailoverStatus_str(old_state));
  85        }
  86        return;
  87    }
  88
  89    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  90                      MIGRATION_STATUS_COMPLETED);
  91
  92    replication_stop_all(true, &local_err);
  93    if (local_err) {
  94        error_report_err(local_err);
  95        local_err = NULL;
  96    }
  97
  98    /* Notify all filters of all NIC to do checkpoint */
  99    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 100    if (local_err) {
 101        error_report_err(local_err);
 102    }
 103
 104    if (!autostart) {
 105        error_report("\"-S\" qemu option will be ignored in secondary side");
 106        /* recover runstate to normal migration finish state */
 107        autostart = true;
 108    }
 109    /*
 110     * Make sure COLO incoming thread not block in recv or send,
 111     * If mis->from_src_file and mis->to_src_file use the same fd,
 112     * The second shutdown() will return -1, we ignore this value,
 113     * It is harmless.
 114     */
 115    if (mis->from_src_file) {
 116        qemu_file_shutdown(mis->from_src_file);
 117    }
 118    if (mis->to_src_file) {
 119        qemu_file_shutdown(mis->to_src_file);
 120    }
 121
 122    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 123                                   FAILOVER_STATUS_COMPLETED);
 124    if (old_state != FAILOVER_STATUS_ACTIVE) {
 125        error_report("Incorrect state (%s) while doing failover for "
 126                     "secondary VM", FailoverStatus_str(old_state));
 127        return;
 128    }
 129    /* Notify COLO incoming thread that failover work is finished */
 130    qemu_sem_post(&mis->colo_incoming_sem);
 131
 132    /* For Secondary VM, jump to incoming co */
 133    if (mis->migration_incoming_co) {
 134        qemu_coroutine_enter(mis->migration_incoming_co);
 135    }
 136#else
 137    abort();
 138#endif
 139}
 140
 141static void primary_vm_do_failover(void)
 142{
 143#ifdef CONFIG_REPLICATION
 144    MigrationState *s = migrate_get_current();
 145    int old_state;
 146    Error *local_err = NULL;
 147
 148    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 149                      MIGRATION_STATUS_COMPLETED);
 150    /*
 151     * kick COLO thread which might wait at
 152     * qemu_sem_wait(&s->colo_checkpoint_sem).
 153     */
 154    colo_checkpoint_notify(s);
 155
 156    /*
 157     * Wake up COLO thread which may blocked in recv() or send(),
 158     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 159     * same fd, but we still shutdown the fd for twice, it is harmless.
 160     */
 161    if (s->to_dst_file) {
 162        qemu_file_shutdown(s->to_dst_file);
 163    }
 164    if (s->rp_state.from_dst_file) {
 165        qemu_file_shutdown(s->rp_state.from_dst_file);
 166    }
 167
 168    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 169                                   FAILOVER_STATUS_COMPLETED);
 170    if (old_state != FAILOVER_STATUS_ACTIVE) {
 171        error_report("Incorrect state (%s) while doing failover for Primary VM",
 172                     FailoverStatus_str(old_state));
 173        return;
 174    }
 175
 176    replication_stop_all(true, &local_err);
 177    if (local_err) {
 178        error_report_err(local_err);
 179        local_err = NULL;
 180    }
 181
 182    /* Notify COLO thread that failover work is finished */
 183    qemu_sem_post(&s->colo_exit_sem);
 184#else
 185    abort();
 186#endif
 187}
 188
 189COLOMode get_colo_mode(void)
 190{
 191    if (migration_in_colo_state()) {
 192        return COLO_MODE_PRIMARY;
 193    } else if (migration_incoming_in_colo_state()) {
 194        return COLO_MODE_SECONDARY;
 195    } else {
 196        return COLO_MODE_NONE;
 197    }
 198}
 199
 200void colo_do_failover(void)
 201{
 202    /* Make sure VM stopped while failover happened. */
 203    if (!colo_runstate_is_stopped()) {
 204        vm_stop_force_state(RUN_STATE_COLO);
 205    }
 206
 207    switch (last_colo_mode = get_colo_mode()) {
 208    case COLO_MODE_PRIMARY:
 209        primary_vm_do_failover();
 210        break;
 211    case COLO_MODE_SECONDARY:
 212        secondary_vm_do_failover();
 213        break;
 214    default:
 215        error_report("colo_do_failover failed because the colo mode"
 216                     " could not be obtained");
 217    }
 218}
 219
 220#ifdef CONFIG_REPLICATION
 221void qmp_xen_set_replication(bool enable, bool primary,
 222                             bool has_failover, bool failover,
 223                             Error **errp)
 224{
 225    ReplicationMode mode = primary ?
 226                           REPLICATION_MODE_PRIMARY :
 227                           REPLICATION_MODE_SECONDARY;
 228
 229    if (has_failover && enable) {
 230        error_setg(errp, "Parameter 'failover' is only for"
 231                   " stopping replication");
 232        return;
 233    }
 234
 235    if (enable) {
 236        replication_start_all(mode, errp);
 237    } else {
 238        if (!has_failover) {
 239            failover = NULL;
 240        }
 241        replication_stop_all(failover, failover ? NULL : errp);
 242    }
 243}
 244
 245ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 246{
 247    Error *err = NULL;
 248    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 249
 250    replication_get_error_all(&err);
 251    if (err) {
 252        s->error = true;
 253        s->has_desc = true;
 254        s->desc = g_strdup(error_get_pretty(err));
 255    } else {
 256        s->error = false;
 257    }
 258
 259    error_free(err);
 260    return s;
 261}
 262
 263void qmp_xen_colo_do_checkpoint(Error **errp)
 264{
 265    Error *err = NULL;
 266
 267    replication_do_checkpoint_all(&err);
 268    if (err) {
 269        error_propagate(errp, err);
 270        return;
 271    }
 272    /* Notify all filters of all NIC to do checkpoint */
 273    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 274}
 275#endif
 276
 277COLOStatus *qmp_query_colo_status(Error **errp)
 278{
 279    COLOStatus *s = g_new0(COLOStatus, 1);
 280
 281    s->mode = get_colo_mode();
 282    s->last_mode = last_colo_mode;
 283
 284    switch (failover_get_state()) {
 285    case FAILOVER_STATUS_NONE:
 286        s->reason = COLO_EXIT_REASON_NONE;
 287        break;
 288    case FAILOVER_STATUS_COMPLETED:
 289        s->reason = COLO_EXIT_REASON_REQUEST;
 290        break;
 291    default:
 292        if (migration_in_colo_state()) {
 293            s->reason = COLO_EXIT_REASON_PROCESSING;
 294        } else {
 295            s->reason = COLO_EXIT_REASON_ERROR;
 296        }
 297    }
 298
 299    return s;
 300}
 301
 302static void colo_send_message(QEMUFile *f, COLOMessage msg,
 303                              Error **errp)
 304{
 305    int ret;
 306
 307    if (msg >= COLO_MESSAGE__MAX) {
 308        error_setg(errp, "%s: Invalid message", __func__);
 309        return;
 310    }
 311    qemu_put_be32(f, msg);
 312    qemu_fflush(f);
 313
 314    ret = qemu_file_get_error(f);
 315    if (ret < 0) {
 316        error_setg_errno(errp, -ret, "Can't send COLO message");
 317    }
 318    trace_colo_send_message(COLOMessage_str(msg));
 319}
 320
 321static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 322                                    uint64_t value, Error **errp)
 323{
 324    Error *local_err = NULL;
 325    int ret;
 326
 327    colo_send_message(f, msg, &local_err);
 328    if (local_err) {
 329        error_propagate(errp, local_err);
 330        return;
 331    }
 332    qemu_put_be64(f, value);
 333    qemu_fflush(f);
 334
 335    ret = qemu_file_get_error(f);
 336    if (ret < 0) {
 337        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 338                         COLOMessage_str(msg));
 339    }
 340}
 341
 342static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 343{
 344    COLOMessage msg;
 345    int ret;
 346
 347    msg = qemu_get_be32(f);
 348    ret = qemu_file_get_error(f);
 349    if (ret < 0) {
 350        error_setg_errno(errp, -ret, "Can't receive COLO message");
 351        return msg;
 352    }
 353    if (msg >= COLO_MESSAGE__MAX) {
 354        error_setg(errp, "%s: Invalid message", __func__);
 355        return msg;
 356    }
 357    trace_colo_receive_message(COLOMessage_str(msg));
 358    return msg;
 359}
 360
 361static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 362                                       Error **errp)
 363{
 364    COLOMessage msg;
 365    Error *local_err = NULL;
 366
 367    msg = colo_receive_message(f, &local_err);
 368    if (local_err) {
 369        error_propagate(errp, local_err);
 370        return;
 371    }
 372    if (msg != expect_msg) {
 373        error_setg(errp, "Unexpected COLO message %d, expected %d",
 374                          msg, expect_msg);
 375    }
 376}
 377
 378static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 379                                           Error **errp)
 380{
 381    Error *local_err = NULL;
 382    uint64_t value;
 383    int ret;
 384
 385    colo_receive_check_message(f, expect_msg, &local_err);
 386    if (local_err) {
 387        error_propagate(errp, local_err);
 388        return 0;
 389    }
 390
 391    value = qemu_get_be64(f);
 392    ret = qemu_file_get_error(f);
 393    if (ret < 0) {
 394        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 395                         COLOMessage_str(expect_msg));
 396    }
 397    return value;
 398}
 399
 400static int colo_do_checkpoint_transaction(MigrationState *s,
 401                                          QIOChannelBuffer *bioc,
 402                                          QEMUFile *fb)
 403{
 404    Error *local_err = NULL;
 405    int ret = -1;
 406
 407    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 408                      &local_err);
 409    if (local_err) {
 410        goto out;
 411    }
 412
 413    colo_receive_check_message(s->rp_state.from_dst_file,
 414                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 415    if (local_err) {
 416        goto out;
 417    }
 418    /* Reset channel-buffer directly */
 419    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 420    bioc->usage = 0;
 421
 422    qemu_mutex_lock_iothread();
 423    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 424        qemu_mutex_unlock_iothread();
 425        goto out;
 426    }
 427    vm_stop_force_state(RUN_STATE_COLO);
 428    qemu_mutex_unlock_iothread();
 429    trace_colo_vm_state_change("run", "stop");
 430    /*
 431     * Failover request bh could be called after vm_stop_force_state(),
 432     * So we need check failover_request_is_active() again.
 433     */
 434    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 435        goto out;
 436    }
 437    qemu_mutex_lock_iothread();
 438
 439#ifdef CONFIG_REPLICATION
 440    replication_do_checkpoint_all(&local_err);
 441    if (local_err) {
 442        qemu_mutex_unlock_iothread();
 443        goto out;
 444    }
 445#else
 446        abort();
 447#endif
 448
 449    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 450    if (local_err) {
 451        qemu_mutex_unlock_iothread();
 452        goto out;
 453    }
 454    /* Note: device state is saved into buffer */
 455    ret = qemu_save_device_state(fb);
 456
 457    qemu_mutex_unlock_iothread();
 458    if (ret < 0) {
 459        goto out;
 460    }
 461
 462    if (migrate_auto_converge()) {
 463        mig_throttle_counter_reset();
 464    }
 465    /*
 466     * Only save VM's live state, which not including device state.
 467     * TODO: We may need a timeout mechanism to prevent COLO process
 468     * to be blocked here.
 469     */
 470    qemu_savevm_live_state(s->to_dst_file);
 471
 472    qemu_fflush(fb);
 473
 474    /*
 475     * We need the size of the VMstate data in Secondary side,
 476     * With which we can decide how much data should be read.
 477     */
 478    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 479                            bioc->usage, &local_err);
 480    if (local_err) {
 481        goto out;
 482    }
 483
 484    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 485    qemu_fflush(s->to_dst_file);
 486    ret = qemu_file_get_error(s->to_dst_file);
 487    if (ret < 0) {
 488        goto out;
 489    }
 490
 491    colo_receive_check_message(s->rp_state.from_dst_file,
 492                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 493    if (local_err) {
 494        goto out;
 495    }
 496
 497    qemu_event_reset(&s->colo_checkpoint_event);
 498    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 499    if (local_err) {
 500        goto out;
 501    }
 502
 503    colo_receive_check_message(s->rp_state.from_dst_file,
 504                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 505    if (local_err) {
 506        goto out;
 507    }
 508
 509    ret = 0;
 510
 511    qemu_mutex_lock_iothread();
 512    vm_start();
 513    qemu_mutex_unlock_iothread();
 514    trace_colo_vm_state_change("stop", "run");
 515
 516out:
 517    if (local_err) {
 518        error_report_err(local_err);
 519    }
 520    return ret;
 521}
 522
 523static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 524{
 525    colo_checkpoint_notify(data);
 526}
 527
 528static void colo_process_checkpoint(MigrationState *s)
 529{
 530    QIOChannelBuffer *bioc;
 531    QEMUFile *fb = NULL;
 532    Error *local_err = NULL;
 533    int ret;
 534
 535    if (get_colo_mode() != COLO_MODE_PRIMARY) {
 536        error_report("COLO mode must be COLO_MODE_PRIMARY");
 537        return;
 538    }
 539
 540    failover_init_state();
 541
 542    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 543    if (!s->rp_state.from_dst_file) {
 544        error_report("Open QEMUFile from_dst_file failed");
 545        goto out;
 546    }
 547
 548    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 549    colo_compare_register_notifier(&packets_compare_notifier);
 550
 551    /*
 552     * Wait for Secondary finish loading VM states and enter COLO
 553     * restore.
 554     */
 555    colo_receive_check_message(s->rp_state.from_dst_file,
 556                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 557    if (local_err) {
 558        goto out;
 559    }
 560    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 561    fb = qemu_file_new_output(QIO_CHANNEL(bioc));
 562    object_unref(OBJECT(bioc));
 563
 564    qemu_mutex_lock_iothread();
 565#ifdef CONFIG_REPLICATION
 566    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 567    if (local_err) {
 568        qemu_mutex_unlock_iothread();
 569        goto out;
 570    }
 571#else
 572        abort();
 573#endif
 574
 575    vm_start();
 576    qemu_mutex_unlock_iothread();
 577    trace_colo_vm_state_change("stop", "run");
 578
 579    timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
 580              s->parameters.x_checkpoint_delay);
 581
 582    while (s->state == MIGRATION_STATUS_COLO) {
 583        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 584            error_report("failover request");
 585            goto out;
 586        }
 587
 588        qemu_event_wait(&s->colo_checkpoint_event);
 589
 590        if (s->state != MIGRATION_STATUS_COLO) {
 591            goto out;
 592        }
 593        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 594        if (ret < 0) {
 595            goto out;
 596        }
 597    }
 598
 599out:
 600    /* Throw the unreported error message after exited from loop */
 601    if (local_err) {
 602        error_report_err(local_err);
 603    }
 604
 605    if (fb) {
 606        qemu_fclose(fb);
 607    }
 608
 609    /*
 610     * There are only two reasons we can get here, some error happened
 611     * or the user triggered failover.
 612     */
 613    switch (failover_get_state()) {
 614    case FAILOVER_STATUS_COMPLETED:
 615        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 616                                  COLO_EXIT_REASON_REQUEST);
 617        break;
 618    default:
 619        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 620                                  COLO_EXIT_REASON_ERROR);
 621    }
 622
 623    /* Hope this not to be too long to wait here */
 624    qemu_sem_wait(&s->colo_exit_sem);
 625    qemu_sem_destroy(&s->colo_exit_sem);
 626
 627    /*
 628     * It is safe to unregister notifier after failover finished.
 629     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 630     * released before unregister notifier, or there will be use-after-free
 631     * error.
 632     */
 633    colo_compare_unregister_notifier(&packets_compare_notifier);
 634    timer_free(s->colo_delay_timer);
 635    qemu_event_destroy(&s->colo_checkpoint_event);
 636
 637    /*
 638     * Must be called after failover BH is completed,
 639     * Or the failover BH may shutdown the wrong fd that
 640     * re-used by other threads after we release here.
 641     */
 642    if (s->rp_state.from_dst_file) {
 643        qemu_fclose(s->rp_state.from_dst_file);
 644        s->rp_state.from_dst_file = NULL;
 645    }
 646}
 647
 648void colo_checkpoint_notify(void *opaque)
 649{
 650    MigrationState *s = opaque;
 651    int64_t next_notify_time;
 652
 653    qemu_event_set(&s->colo_checkpoint_event);
 654    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 655    next_notify_time = s->colo_checkpoint_time +
 656                    s->parameters.x_checkpoint_delay;
 657    timer_mod(s->colo_delay_timer, next_notify_time);
 658}
 659
 660void migrate_start_colo_process(MigrationState *s)
 661{
 662    qemu_mutex_unlock_iothread();
 663    qemu_event_init(&s->colo_checkpoint_event, false);
 664    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 665                                colo_checkpoint_notify, s);
 666
 667    qemu_sem_init(&s->colo_exit_sem, 0);
 668    colo_process_checkpoint(s);
 669    qemu_mutex_lock_iothread();
 670}
 671
 672static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 673                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 674{
 675    uint64_t total_size;
 676    uint64_t value;
 677    Error *local_err = NULL;
 678    int ret;
 679
 680    qemu_mutex_lock_iothread();
 681    vm_stop_force_state(RUN_STATE_COLO);
 682    qemu_mutex_unlock_iothread();
 683    trace_colo_vm_state_change("run", "stop");
 684
 685    /* FIXME: This is unnecessary for periodic checkpoint mode */
 686    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 687                 &local_err);
 688    if (local_err) {
 689        error_propagate(errp, local_err);
 690        return;
 691    }
 692
 693    colo_receive_check_message(mis->from_src_file,
 694                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 695    if (local_err) {
 696        error_propagate(errp, local_err);
 697        return;
 698    }
 699
 700    qemu_mutex_lock_iothread();
 701    cpu_synchronize_all_states();
 702    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 703    qemu_mutex_unlock_iothread();
 704
 705    if (ret < 0) {
 706        error_setg(errp, "Load VM's live state (ram) error");
 707        return;
 708    }
 709
 710    value = colo_receive_message_value(mis->from_src_file,
 711                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 712    if (local_err) {
 713        error_propagate(errp, local_err);
 714        return;
 715    }
 716
 717    /*
 718     * Read VM device state data into channel buffer,
 719     * It's better to re-use the memory allocated.
 720     * Here we need to handle the channel buffer directly.
 721     */
 722    if (value > bioc->capacity) {
 723        bioc->capacity = value;
 724        bioc->data = g_realloc(bioc->data, bioc->capacity);
 725    }
 726    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 727    if (total_size != value) {
 728        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 729                    " %" PRIu64, total_size, value);
 730        return;
 731    }
 732    bioc->usage = total_size;
 733    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 734
 735    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 736                 &local_err);
 737    if (local_err) {
 738        error_propagate(errp, local_err);
 739        return;
 740    }
 741
 742    qemu_mutex_lock_iothread();
 743    vmstate_loading = true;
 744    colo_flush_ram_cache();
 745    ret = qemu_load_device_state(fb);
 746    if (ret < 0) {
 747        error_setg(errp, "COLO: load device state failed");
 748        vmstate_loading = false;
 749        qemu_mutex_unlock_iothread();
 750        return;
 751    }
 752
 753#ifdef CONFIG_REPLICATION
 754    replication_get_error_all(&local_err);
 755    if (local_err) {
 756        error_propagate(errp, local_err);
 757        vmstate_loading = false;
 758        qemu_mutex_unlock_iothread();
 759        return;
 760    }
 761
 762    /* discard colo disk buffer */
 763    replication_do_checkpoint_all(&local_err);
 764    if (local_err) {
 765        error_propagate(errp, local_err);
 766        vmstate_loading = false;
 767        qemu_mutex_unlock_iothread();
 768        return;
 769    }
 770#else
 771    abort();
 772#endif
 773    /* Notify all filters of all NIC to do checkpoint */
 774    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 775
 776    if (local_err) {
 777        error_propagate(errp, local_err);
 778        vmstate_loading = false;
 779        qemu_mutex_unlock_iothread();
 780        return;
 781    }
 782
 783    vmstate_loading = false;
 784    vm_start();
 785    qemu_mutex_unlock_iothread();
 786    trace_colo_vm_state_change("stop", "run");
 787
 788    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 789        return;
 790    }
 791
 792    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 793                 &local_err);
 794    error_propagate(errp, local_err);
 795}
 796
 797static void colo_wait_handle_message(MigrationIncomingState *mis,
 798                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 799{
 800    COLOMessage msg;
 801    Error *local_err = NULL;
 802
 803    msg = colo_receive_message(mis->from_src_file, &local_err);
 804    if (local_err) {
 805        error_propagate(errp, local_err);
 806        return;
 807    }
 808
 809    switch (msg) {
 810    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 811        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 812        break;
 813    default:
 814        error_setg(errp, "Got unknown COLO message: %d", msg);
 815        break;
 816    }
 817}
 818
 819void colo_shutdown(void)
 820{
 821    MigrationIncomingState *mis = NULL;
 822    MigrationState *s = NULL;
 823
 824    switch (get_colo_mode()) {
 825    case COLO_MODE_PRIMARY:
 826        s = migrate_get_current();
 827        qemu_event_set(&s->colo_checkpoint_event);
 828        qemu_sem_post(&s->colo_exit_sem);
 829        break;
 830    case COLO_MODE_SECONDARY:
 831        mis = migration_incoming_get_current();
 832        qemu_sem_post(&mis->colo_incoming_sem);
 833        break;
 834    default:
 835        break;
 836    }
 837}
 838
 839void *colo_process_incoming_thread(void *opaque)
 840{
 841    MigrationIncomingState *mis = opaque;
 842    QEMUFile *fb = NULL;
 843    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 844    Error *local_err = NULL;
 845
 846    rcu_register_thread();
 847    qemu_sem_init(&mis->colo_incoming_sem, 0);
 848
 849    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 850                      MIGRATION_STATUS_COLO);
 851
 852    if (get_colo_mode() != COLO_MODE_SECONDARY) {
 853        error_report("COLO mode must be COLO_MODE_SECONDARY");
 854        return NULL;
 855    }
 856
 857    failover_init_state();
 858
 859    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 860    if (!mis->to_src_file) {
 861        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 862        goto out;
 863    }
 864    /*
 865     * Note: the communication between Primary side and Secondary side
 866     * should be sequential, we set the fd to unblocked in migration incoming
 867     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 868     * set the fd back to blocked.
 869     */
 870    qemu_file_set_blocking(mis->from_src_file, true);
 871
 872    colo_incoming_start_dirty_log();
 873
 874    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 875    fb = qemu_file_new_input(QIO_CHANNEL(bioc));
 876    object_unref(OBJECT(bioc));
 877
 878    qemu_mutex_lock_iothread();
 879#ifdef CONFIG_REPLICATION
 880    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 881    if (local_err) {
 882        qemu_mutex_unlock_iothread();
 883        goto out;
 884    }
 885#else
 886        abort();
 887#endif
 888    vm_start();
 889    qemu_mutex_unlock_iothread();
 890    trace_colo_vm_state_change("stop", "run");
 891
 892    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 893                      &local_err);
 894    if (local_err) {
 895        goto out;
 896    }
 897
 898    while (mis->state == MIGRATION_STATUS_COLO) {
 899        colo_wait_handle_message(mis, fb, bioc, &local_err);
 900        if (local_err) {
 901            error_report_err(local_err);
 902            break;
 903        }
 904
 905        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 906            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 907                            FAILOVER_STATUS_NONE);
 908            failover_request_active(NULL);
 909            break;
 910        }
 911
 912        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 913            error_report("failover request");
 914            break;
 915        }
 916    }
 917
 918out:
 919    /*
 920     * There are only two reasons we can get here, some error happened
 921     * or the user triggered failover.
 922     */
 923    switch (failover_get_state()) {
 924    case FAILOVER_STATUS_COMPLETED:
 925        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 926                                  COLO_EXIT_REASON_REQUEST);
 927        break;
 928    default:
 929        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 930                                  COLO_EXIT_REASON_ERROR);
 931    }
 932
 933    if (fb) {
 934        qemu_fclose(fb);
 935    }
 936
 937    /* Hope this not to be too long to loop here */
 938    qemu_sem_wait(&mis->colo_incoming_sem);
 939    qemu_sem_destroy(&mis->colo_incoming_sem);
 940
 941    rcu_unregister_thread();
 942    return NULL;
 943}
 944