qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/rcu.h"
  28#include "migration/failover.h"
  29#include "migration/ram.h"
  30#ifdef CONFIG_REPLICATION
  31#include "block/replication.h"
  32#endif
  33#include "net/colo-compare.h"
  34#include "net/colo.h"
  35#include "block/block.h"
  36#include "qapi/qapi-events-migration.h"
  37#include "qapi/qmp/qerror.h"
  38#include "sysemu/cpus.h"
  39#include "sysemu/runstate.h"
  40#include "net/filter.h"
  41
  42static bool vmstate_loading;
  43static Notifier packets_compare_notifier;
  44
  45/* User need to know colo mode after COLO failover */
  46static COLOMode last_colo_mode;
  47
  48#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  49
  50bool migration_in_colo_state(void)
  51{
  52    MigrationState *s = migrate_get_current();
  53
  54    return (s->state == MIGRATION_STATUS_COLO);
  55}
  56
  57bool migration_incoming_in_colo_state(void)
  58{
  59    MigrationIncomingState *mis = migration_incoming_get_current();
  60
  61    return mis && (mis->state == MIGRATION_STATUS_COLO);
  62}
  63
  64static bool colo_runstate_is_stopped(void)
  65{
  66    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  67}
  68
  69static void secondary_vm_do_failover(void)
  70{
  71/* COLO needs enable block-replication */
  72#ifdef CONFIG_REPLICATION
  73    int old_state;
  74    MigrationIncomingState *mis = migration_incoming_get_current();
  75    Error *local_err = NULL;
  76
  77    /* Can not do failover during the process of VM's loading VMstate, Or
  78     * it will break the secondary VM.
  79     */
  80    if (vmstate_loading) {
  81        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  82                        FAILOVER_STATUS_RELAUNCH);
  83        if (old_state != FAILOVER_STATUS_ACTIVE) {
  84            error_report("Unknown error while do failover for secondary VM,"
  85                         "old_state: %s", FailoverStatus_str(old_state));
  86        }
  87        return;
  88    }
  89
  90    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  91                      MIGRATION_STATUS_COMPLETED);
  92
  93    replication_stop_all(true, &local_err);
  94    if (local_err) {
  95        error_report_err(local_err);
  96        local_err = NULL;
  97    }
  98
  99    /* Notify all filters of all NIC to do checkpoint */
 100    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 101    if (local_err) {
 102        error_report_err(local_err);
 103    }
 104
 105    if (!autostart) {
 106        error_report("\"-S\" qemu option will be ignored in secondary side");
 107        /* recover runstate to normal migration finish state */
 108        autostart = true;
 109    }
 110    /*
 111     * Make sure COLO incoming thread not block in recv or send,
 112     * If mis->from_src_file and mis->to_src_file use the same fd,
 113     * The second shutdown() will return -1, we ignore this value,
 114     * It is harmless.
 115     */
 116    if (mis->from_src_file) {
 117        qemu_file_shutdown(mis->from_src_file);
 118    }
 119    if (mis->to_src_file) {
 120        qemu_file_shutdown(mis->to_src_file);
 121    }
 122
 123    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 124                                   FAILOVER_STATUS_COMPLETED);
 125    if (old_state != FAILOVER_STATUS_ACTIVE) {
 126        error_report("Incorrect state (%s) while doing failover for "
 127                     "secondary VM", FailoverStatus_str(old_state));
 128        return;
 129    }
 130    /* Notify COLO incoming thread that failover work is finished */
 131    qemu_sem_post(&mis->colo_incoming_sem);
 132
 133    /* For Secondary VM, jump to incoming co */
 134    if (mis->migration_incoming_co) {
 135        qemu_coroutine_enter(mis->migration_incoming_co);
 136    }
 137#else
 138    abort();
 139#endif
 140}
 141
 142static void primary_vm_do_failover(void)
 143{
 144#ifdef CONFIG_REPLICATION
 145    MigrationState *s = migrate_get_current();
 146    int old_state;
 147    Error *local_err = NULL;
 148
 149    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 150                      MIGRATION_STATUS_COMPLETED);
 151    /*
 152     * kick COLO thread which might wait at
 153     * qemu_sem_wait(&s->colo_checkpoint_sem).
 154     */
 155    colo_checkpoint_notify(s);
 156
 157    /*
 158     * Wake up COLO thread which may blocked in recv() or send(),
 159     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 160     * same fd, but we still shutdown the fd for twice, it is harmless.
 161     */
 162    if (s->to_dst_file) {
 163        qemu_file_shutdown(s->to_dst_file);
 164    }
 165    if (s->rp_state.from_dst_file) {
 166        qemu_file_shutdown(s->rp_state.from_dst_file);
 167    }
 168
 169    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 170                                   FAILOVER_STATUS_COMPLETED);
 171    if (old_state != FAILOVER_STATUS_ACTIVE) {
 172        error_report("Incorrect state (%s) while doing failover for Primary VM",
 173                     FailoverStatus_str(old_state));
 174        return;
 175    }
 176
 177    replication_stop_all(true, &local_err);
 178    if (local_err) {
 179        error_report_err(local_err);
 180        local_err = NULL;
 181    }
 182
 183    /* Notify COLO thread that failover work is finished */
 184    qemu_sem_post(&s->colo_exit_sem);
 185#else
 186    abort();
 187#endif
 188}
 189
 190COLOMode get_colo_mode(void)
 191{
 192    if (migration_in_colo_state()) {
 193        return COLO_MODE_PRIMARY;
 194    } else if (migration_incoming_in_colo_state()) {
 195        return COLO_MODE_SECONDARY;
 196    } else {
 197        return COLO_MODE_NONE;
 198    }
 199}
 200
 201void colo_do_failover(void)
 202{
 203    /* Make sure VM stopped while failover happened. */
 204    if (!colo_runstate_is_stopped()) {
 205        vm_stop_force_state(RUN_STATE_COLO);
 206    }
 207
 208    switch (last_colo_mode = get_colo_mode()) {
 209    case COLO_MODE_PRIMARY:
 210        primary_vm_do_failover();
 211        break;
 212    case COLO_MODE_SECONDARY:
 213        secondary_vm_do_failover();
 214        break;
 215    default:
 216        error_report("colo_do_failover failed because the colo mode"
 217                     " could not be obtained");
 218    }
 219}
 220
 221#ifdef CONFIG_REPLICATION
 222void qmp_xen_set_replication(bool enable, bool primary,
 223                             bool has_failover, bool failover,
 224                             Error **errp)
 225{
 226    ReplicationMode mode = primary ?
 227                           REPLICATION_MODE_PRIMARY :
 228                           REPLICATION_MODE_SECONDARY;
 229
 230    if (has_failover && enable) {
 231        error_setg(errp, "Parameter 'failover' is only for"
 232                   " stopping replication");
 233        return;
 234    }
 235
 236    if (enable) {
 237        replication_start_all(mode, errp);
 238    } else {
 239        if (!has_failover) {
 240            failover = NULL;
 241        }
 242        replication_stop_all(failover, failover ? NULL : errp);
 243    }
 244}
 245
 246ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 247{
 248    Error *err = NULL;
 249    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 250
 251    replication_get_error_all(&err);
 252    if (err) {
 253        s->error = true;
 254        s->has_desc = true;
 255        s->desc = g_strdup(error_get_pretty(err));
 256    } else {
 257        s->error = false;
 258    }
 259
 260    error_free(err);
 261    return s;
 262}
 263
 264void qmp_xen_colo_do_checkpoint(Error **errp)
 265{
 266    Error *err = NULL;
 267
 268    replication_do_checkpoint_all(&err);
 269    if (err) {
 270        error_propagate(errp, err);
 271        return;
 272    }
 273    /* Notify all filters of all NIC to do checkpoint */
 274    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 275}
 276#endif
 277
 278COLOStatus *qmp_query_colo_status(Error **errp)
 279{
 280    COLOStatus *s = g_new0(COLOStatus, 1);
 281
 282    s->mode = get_colo_mode();
 283    s->last_mode = last_colo_mode;
 284
 285    switch (failover_get_state()) {
 286    case FAILOVER_STATUS_NONE:
 287        s->reason = COLO_EXIT_REASON_NONE;
 288        break;
 289    case FAILOVER_STATUS_COMPLETED:
 290        s->reason = COLO_EXIT_REASON_REQUEST;
 291        break;
 292    default:
 293        if (migration_in_colo_state()) {
 294            s->reason = COLO_EXIT_REASON_PROCESSING;
 295        } else {
 296            s->reason = COLO_EXIT_REASON_ERROR;
 297        }
 298    }
 299
 300    return s;
 301}
 302
 303static void colo_send_message(QEMUFile *f, COLOMessage msg,
 304                              Error **errp)
 305{
 306    int ret;
 307
 308    if (msg >= COLO_MESSAGE__MAX) {
 309        error_setg(errp, "%s: Invalid message", __func__);
 310        return;
 311    }
 312    qemu_put_be32(f, msg);
 313    qemu_fflush(f);
 314
 315    ret = qemu_file_get_error(f);
 316    if (ret < 0) {
 317        error_setg_errno(errp, -ret, "Can't send COLO message");
 318    }
 319    trace_colo_send_message(COLOMessage_str(msg));
 320}
 321
 322static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 323                                    uint64_t value, Error **errp)
 324{
 325    Error *local_err = NULL;
 326    int ret;
 327
 328    colo_send_message(f, msg, &local_err);
 329    if (local_err) {
 330        error_propagate(errp, local_err);
 331        return;
 332    }
 333    qemu_put_be64(f, value);
 334    qemu_fflush(f);
 335
 336    ret = qemu_file_get_error(f);
 337    if (ret < 0) {
 338        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 339                         COLOMessage_str(msg));
 340    }
 341}
 342
 343static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 344{
 345    COLOMessage msg;
 346    int ret;
 347
 348    msg = qemu_get_be32(f);
 349    ret = qemu_file_get_error(f);
 350    if (ret < 0) {
 351        error_setg_errno(errp, -ret, "Can't receive COLO message");
 352        return msg;
 353    }
 354    if (msg >= COLO_MESSAGE__MAX) {
 355        error_setg(errp, "%s: Invalid message", __func__);
 356        return msg;
 357    }
 358    trace_colo_receive_message(COLOMessage_str(msg));
 359    return msg;
 360}
 361
 362static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 363                                       Error **errp)
 364{
 365    COLOMessage msg;
 366    Error *local_err = NULL;
 367
 368    msg = colo_receive_message(f, &local_err);
 369    if (local_err) {
 370        error_propagate(errp, local_err);
 371        return;
 372    }
 373    if (msg != expect_msg) {
 374        error_setg(errp, "Unexpected COLO message %d, expected %d",
 375                          msg, expect_msg);
 376    }
 377}
 378
 379static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 380                                           Error **errp)
 381{
 382    Error *local_err = NULL;
 383    uint64_t value;
 384    int ret;
 385
 386    colo_receive_check_message(f, expect_msg, &local_err);
 387    if (local_err) {
 388        error_propagate(errp, local_err);
 389        return 0;
 390    }
 391
 392    value = qemu_get_be64(f);
 393    ret = qemu_file_get_error(f);
 394    if (ret < 0) {
 395        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 396                         COLOMessage_str(expect_msg));
 397    }
 398    return value;
 399}
 400
 401static int colo_do_checkpoint_transaction(MigrationState *s,
 402                                          QIOChannelBuffer *bioc,
 403                                          QEMUFile *fb)
 404{
 405    Error *local_err = NULL;
 406    int ret = -1;
 407
 408    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 409                      &local_err);
 410    if (local_err) {
 411        goto out;
 412    }
 413
 414    colo_receive_check_message(s->rp_state.from_dst_file,
 415                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 416    if (local_err) {
 417        goto out;
 418    }
 419    /* Reset channel-buffer directly */
 420    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 421    bioc->usage = 0;
 422
 423    qemu_mutex_lock_iothread();
 424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 425        qemu_mutex_unlock_iothread();
 426        goto out;
 427    }
 428    vm_stop_force_state(RUN_STATE_COLO);
 429    qemu_mutex_unlock_iothread();
 430    trace_colo_vm_state_change("run", "stop");
 431    /*
 432     * Failover request bh could be called after vm_stop_force_state(),
 433     * So we need check failover_request_is_active() again.
 434     */
 435    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 436        goto out;
 437    }
 438    qemu_mutex_lock_iothread();
 439
 440#ifdef CONFIG_REPLICATION
 441    replication_do_checkpoint_all(&local_err);
 442    if (local_err) {
 443        qemu_mutex_unlock_iothread();
 444        goto out;
 445    }
 446#else
 447        abort();
 448#endif
 449
 450    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 451    if (local_err) {
 452        qemu_mutex_unlock_iothread();
 453        goto out;
 454    }
 455    /* Note: device state is saved into buffer */
 456    ret = qemu_save_device_state(fb);
 457
 458    qemu_mutex_unlock_iothread();
 459    if (ret < 0) {
 460        goto out;
 461    }
 462
 463    if (migrate_auto_converge()) {
 464        mig_throttle_counter_reset();
 465    }
 466    /*
 467     * Only save VM's live state, which not including device state.
 468     * TODO: We may need a timeout mechanism to prevent COLO process
 469     * to be blocked here.
 470     */
 471    qemu_savevm_live_state(s->to_dst_file);
 472
 473    qemu_fflush(fb);
 474
 475    /*
 476     * We need the size of the VMstate data in Secondary side,
 477     * With which we can decide how much data should be read.
 478     */
 479    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 480                            bioc->usage, &local_err);
 481    if (local_err) {
 482        goto out;
 483    }
 484
 485    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 486    qemu_fflush(s->to_dst_file);
 487    ret = qemu_file_get_error(s->to_dst_file);
 488    if (ret < 0) {
 489        goto out;
 490    }
 491
 492    colo_receive_check_message(s->rp_state.from_dst_file,
 493                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 494    if (local_err) {
 495        goto out;
 496    }
 497
 498    qemu_event_reset(&s->colo_checkpoint_event);
 499    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 500    if (local_err) {
 501        goto out;
 502    }
 503
 504    colo_receive_check_message(s->rp_state.from_dst_file,
 505                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 506    if (local_err) {
 507        goto out;
 508    }
 509
 510    ret = 0;
 511
 512    qemu_mutex_lock_iothread();
 513    vm_start();
 514    qemu_mutex_unlock_iothread();
 515    trace_colo_vm_state_change("stop", "run");
 516
 517out:
 518    if (local_err) {
 519        error_report_err(local_err);
 520    }
 521    return ret;
 522}
 523
 524static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 525{
 526    colo_checkpoint_notify(data);
 527}
 528
 529static void colo_process_checkpoint(MigrationState *s)
 530{
 531    QIOChannelBuffer *bioc;
 532    QEMUFile *fb = NULL;
 533    Error *local_err = NULL;
 534    int ret;
 535
 536    if (get_colo_mode() != COLO_MODE_PRIMARY) {
 537        error_report("COLO mode must be COLO_MODE_PRIMARY");
 538        return;
 539    }
 540
 541    failover_init_state();
 542
 543    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 544    if (!s->rp_state.from_dst_file) {
 545        error_report("Open QEMUFile from_dst_file failed");
 546        goto out;
 547    }
 548
 549    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 550    colo_compare_register_notifier(&packets_compare_notifier);
 551
 552    /*
 553     * Wait for Secondary finish loading VM states and enter COLO
 554     * restore.
 555     */
 556    colo_receive_check_message(s->rp_state.from_dst_file,
 557                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 558    if (local_err) {
 559        goto out;
 560    }
 561    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 562    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 563    object_unref(OBJECT(bioc));
 564
 565    qemu_mutex_lock_iothread();
 566#ifdef CONFIG_REPLICATION
 567    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 568    if (local_err) {
 569        qemu_mutex_unlock_iothread();
 570        goto out;
 571    }
 572#else
 573        abort();
 574#endif
 575
 576    vm_start();
 577    qemu_mutex_unlock_iothread();
 578    trace_colo_vm_state_change("stop", "run");
 579
 580    timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
 581              s->parameters.x_checkpoint_delay);
 582
 583    while (s->state == MIGRATION_STATUS_COLO) {
 584        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 585            error_report("failover request");
 586            goto out;
 587        }
 588
 589        qemu_event_wait(&s->colo_checkpoint_event);
 590
 591        if (s->state != MIGRATION_STATUS_COLO) {
 592            goto out;
 593        }
 594        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 595        if (ret < 0) {
 596            goto out;
 597        }
 598    }
 599
 600out:
 601    /* Throw the unreported error message after exited from loop */
 602    if (local_err) {
 603        error_report_err(local_err);
 604    }
 605
 606    if (fb) {
 607        qemu_fclose(fb);
 608    }
 609
 610    /*
 611     * There are only two reasons we can get here, some error happened
 612     * or the user triggered failover.
 613     */
 614    switch (failover_get_state()) {
 615    case FAILOVER_STATUS_COMPLETED:
 616        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 617                                  COLO_EXIT_REASON_REQUEST);
 618        break;
 619    default:
 620        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 621                                  COLO_EXIT_REASON_ERROR);
 622    }
 623
 624    /* Hope this not to be too long to wait here */
 625    qemu_sem_wait(&s->colo_exit_sem);
 626    qemu_sem_destroy(&s->colo_exit_sem);
 627
 628    /*
 629     * It is safe to unregister notifier after failover finished.
 630     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 631     * released before unregister notifier, or there will be use-after-free
 632     * error.
 633     */
 634    colo_compare_unregister_notifier(&packets_compare_notifier);
 635    timer_free(s->colo_delay_timer);
 636    qemu_event_destroy(&s->colo_checkpoint_event);
 637
 638    /*
 639     * Must be called after failover BH is completed,
 640     * Or the failover BH may shutdown the wrong fd that
 641     * re-used by other threads after we release here.
 642     */
 643    if (s->rp_state.from_dst_file) {
 644        qemu_fclose(s->rp_state.from_dst_file);
 645        s->rp_state.from_dst_file = NULL;
 646    }
 647}
 648
 649void colo_checkpoint_notify(void *opaque)
 650{
 651    MigrationState *s = opaque;
 652    int64_t next_notify_time;
 653
 654    qemu_event_set(&s->colo_checkpoint_event);
 655    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 656    next_notify_time = s->colo_checkpoint_time +
 657                    s->parameters.x_checkpoint_delay;
 658    timer_mod(s->colo_delay_timer, next_notify_time);
 659}
 660
 661void migrate_start_colo_process(MigrationState *s)
 662{
 663    qemu_mutex_unlock_iothread();
 664    qemu_event_init(&s->colo_checkpoint_event, false);
 665    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 666                                colo_checkpoint_notify, s);
 667
 668    qemu_sem_init(&s->colo_exit_sem, 0);
 669    colo_process_checkpoint(s);
 670    qemu_mutex_lock_iothread();
 671}
 672
 673static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 674                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 675{
 676    uint64_t total_size;
 677    uint64_t value;
 678    Error *local_err = NULL;
 679    int ret;
 680
 681    qemu_mutex_lock_iothread();
 682    vm_stop_force_state(RUN_STATE_COLO);
 683    qemu_mutex_unlock_iothread();
 684    trace_colo_vm_state_change("run", "stop");
 685
 686    /* FIXME: This is unnecessary for periodic checkpoint mode */
 687    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 688                 &local_err);
 689    if (local_err) {
 690        error_propagate(errp, local_err);
 691        return;
 692    }
 693
 694    colo_receive_check_message(mis->from_src_file,
 695                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 696    if (local_err) {
 697        error_propagate(errp, local_err);
 698        return;
 699    }
 700
 701    qemu_mutex_lock_iothread();
 702    cpu_synchronize_all_states();
 703    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 704    qemu_mutex_unlock_iothread();
 705
 706    if (ret < 0) {
 707        error_setg(errp, "Load VM's live state (ram) error");
 708        return;
 709    }
 710
 711    value = colo_receive_message_value(mis->from_src_file,
 712                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 713    if (local_err) {
 714        error_propagate(errp, local_err);
 715        return;
 716    }
 717
 718    /*
 719     * Read VM device state data into channel buffer,
 720     * It's better to re-use the memory allocated.
 721     * Here we need to handle the channel buffer directly.
 722     */
 723    if (value > bioc->capacity) {
 724        bioc->capacity = value;
 725        bioc->data = g_realloc(bioc->data, bioc->capacity);
 726    }
 727    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 728    if (total_size != value) {
 729        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 730                    " %" PRIu64, total_size, value);
 731        return;
 732    }
 733    bioc->usage = total_size;
 734    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 735
 736    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 737                 &local_err);
 738    if (local_err) {
 739        error_propagate(errp, local_err);
 740        return;
 741    }
 742
 743    qemu_mutex_lock_iothread();
 744    vmstate_loading = true;
 745    colo_flush_ram_cache();
 746    ret = qemu_load_device_state(fb);
 747    if (ret < 0) {
 748        error_setg(errp, "COLO: load device state failed");
 749        vmstate_loading = false;
 750        qemu_mutex_unlock_iothread();
 751        return;
 752    }
 753
 754#ifdef CONFIG_REPLICATION
 755    replication_get_error_all(&local_err);
 756    if (local_err) {
 757        error_propagate(errp, local_err);
 758        vmstate_loading = false;
 759        qemu_mutex_unlock_iothread();
 760        return;
 761    }
 762
 763    /* discard colo disk buffer */
 764    replication_do_checkpoint_all(&local_err);
 765    if (local_err) {
 766        error_propagate(errp, local_err);
 767        vmstate_loading = false;
 768        qemu_mutex_unlock_iothread();
 769        return;
 770    }
 771#else
 772    abort();
 773#endif
 774    /* Notify all filters of all NIC to do checkpoint */
 775    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 776
 777    if (local_err) {
 778        error_propagate(errp, local_err);
 779        vmstate_loading = false;
 780        qemu_mutex_unlock_iothread();
 781        return;
 782    }
 783
 784    vmstate_loading = false;
 785    vm_start();
 786    qemu_mutex_unlock_iothread();
 787    trace_colo_vm_state_change("stop", "run");
 788
 789    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 790        return;
 791    }
 792
 793    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 794                 &local_err);
 795    error_propagate(errp, local_err);
 796}
 797
 798static void colo_wait_handle_message(MigrationIncomingState *mis,
 799                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 800{
 801    COLOMessage msg;
 802    Error *local_err = NULL;
 803
 804    msg = colo_receive_message(mis->from_src_file, &local_err);
 805    if (local_err) {
 806        error_propagate(errp, local_err);
 807        return;
 808    }
 809
 810    switch (msg) {
 811    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 812        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 813        break;
 814    default:
 815        error_setg(errp, "Got unknown COLO message: %d", msg);
 816        break;
 817    }
 818}
 819
 820void colo_shutdown(void)
 821{
 822    MigrationIncomingState *mis = NULL;
 823    MigrationState *s = NULL;
 824
 825    switch (get_colo_mode()) {
 826    case COLO_MODE_PRIMARY:
 827        s = migrate_get_current();
 828        qemu_event_set(&s->colo_checkpoint_event);
 829        qemu_sem_post(&s->colo_exit_sem);
 830        break;
 831    case COLO_MODE_SECONDARY:
 832        mis = migration_incoming_get_current();
 833        qemu_sem_post(&mis->colo_incoming_sem);
 834        break;
 835    default:
 836        break;
 837    }
 838}
 839
 840void *colo_process_incoming_thread(void *opaque)
 841{
 842    MigrationIncomingState *mis = opaque;
 843    QEMUFile *fb = NULL;
 844    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 845    Error *local_err = NULL;
 846
 847    rcu_register_thread();
 848    qemu_sem_init(&mis->colo_incoming_sem, 0);
 849
 850    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 851                      MIGRATION_STATUS_COLO);
 852
 853    if (get_colo_mode() != COLO_MODE_SECONDARY) {
 854        error_report("COLO mode must be COLO_MODE_SECONDARY");
 855        return NULL;
 856    }
 857
 858    failover_init_state();
 859
 860    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 861    if (!mis->to_src_file) {
 862        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 863        goto out;
 864    }
 865    /*
 866     * Note: the communication between Primary side and Secondary side
 867     * should be sequential, we set the fd to unblocked in migration incoming
 868     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 869     * set the fd back to blocked.
 870     */
 871    qemu_file_set_blocking(mis->from_src_file, true);
 872
 873    colo_incoming_start_dirty_log();
 874
 875    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 876    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 877    object_unref(OBJECT(bioc));
 878
 879    qemu_mutex_lock_iothread();
 880#ifdef CONFIG_REPLICATION
 881    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 882    if (local_err) {
 883        qemu_mutex_unlock_iothread();
 884        goto out;
 885    }
 886#else
 887        abort();
 888#endif
 889    vm_start();
 890    qemu_mutex_unlock_iothread();
 891    trace_colo_vm_state_change("stop", "run");
 892
 893    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 894                      &local_err);
 895    if (local_err) {
 896        goto out;
 897    }
 898
 899    while (mis->state == MIGRATION_STATUS_COLO) {
 900        colo_wait_handle_message(mis, fb, bioc, &local_err);
 901        if (local_err) {
 902            error_report_err(local_err);
 903            break;
 904        }
 905
 906        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 907            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 908                            FAILOVER_STATUS_NONE);
 909            failover_request_active(NULL);
 910            break;
 911        }
 912
 913        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 914            error_report("failover request");
 915            break;
 916        }
 917    }
 918
 919out:
 920    /*
 921     * There are only two reasons we can get here, some error happened
 922     * or the user triggered failover.
 923     */
 924    switch (failover_get_state()) {
 925    case FAILOVER_STATUS_COMPLETED:
 926        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 927                                  COLO_EXIT_REASON_REQUEST);
 928        break;
 929    default:
 930        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 931                                  COLO_EXIT_REASON_ERROR);
 932    }
 933
 934    if (fb) {
 935        qemu_fclose(fb);
 936    }
 937
 938    /* Hope this not to be too long to loop here */
 939    qemu_sem_wait(&mis->colo_incoming_sem);
 940    qemu_sem_destroy(&mis->colo_incoming_sem);
 941
 942    rcu_unregister_thread();
 943    return NULL;
 944}
 945