qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/rcu.h"
  28#include "migration/failover.h"
  29#include "migration/ram.h"
  30#ifdef CONFIG_REPLICATION
  31#include "replication.h"
  32#endif
  33#include "net/colo-compare.h"
  34#include "net/colo.h"
  35#include "block/block.h"
  36#include "qapi/qapi-events-migration.h"
  37#include "qapi/qmp/qerror.h"
  38#include "sysemu/cpus.h"
  39#include "sysemu/runstate.h"
  40#include "net/filter.h"
  41
/*
 * Set by colo_incoming_process_checkpoint() while the Secondary VM is
 * loading checkpointed device state.  secondary_vm_do_failover() checks
 * it and, when set, only moves the failover state to
 * FAILOVER_STATUS_RELAUNCH instead of performing the failover.
 */
static bool vmstate_loading;
/* Notifier that colo_process_checkpoint() registers with colo-compare */
static Notifier packets_compare_notifier;

/* Users need to know the COLO mode after COLO failover */
static COLOMode last_colo_mode;

/* Size passed to qio_channel_buffer_new() for the device-state buffers */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  49
  50bool migration_in_colo_state(void)
  51{
  52    MigrationState *s = migrate_get_current();
  53
  54    return (s->state == MIGRATION_STATUS_COLO);
  55}
  56
  57bool migration_incoming_in_colo_state(void)
  58{
  59    MigrationIncomingState *mis = migration_incoming_get_current();
  60
  61    return mis && (mis->state == MIGRATION_STATUS_COLO);
  62}
  63
  64static bool colo_runstate_is_stopped(void)
  65{
  66    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  67}
  68
  69static void secondary_vm_do_failover(void)
  70{
  71/* COLO needs enable block-replication */
  72#ifdef CONFIG_REPLICATION
  73    int old_state;
  74    MigrationIncomingState *mis = migration_incoming_get_current();
  75    Error *local_err = NULL;
  76
  77    /* Can not do failover during the process of VM's loading VMstate, Or
  78     * it will break the secondary VM.
  79     */
  80    if (vmstate_loading) {
  81        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  82                        FAILOVER_STATUS_RELAUNCH);
  83        if (old_state != FAILOVER_STATUS_ACTIVE) {
  84            error_report("Unknown error while do failover for secondary VM,"
  85                         "old_state: %s", FailoverStatus_str(old_state));
  86        }
  87        return;
  88    }
  89
  90    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  91                      MIGRATION_STATUS_COMPLETED);
  92
  93    replication_stop_all(true, &local_err);
  94    if (local_err) {
  95        error_report_err(local_err);
  96        local_err = NULL;
  97    }
  98
  99    /* Notify all filters of all NIC to do checkpoint */
 100    colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
 101    if (local_err) {
 102        error_report_err(local_err);
 103    }
 104
 105    if (!autostart) {
 106        error_report("\"-S\" qemu option will be ignored in secondary side");
 107        /* recover runstate to normal migration finish state */
 108        autostart = true;
 109    }
 110    /*
 111     * Make sure COLO incoming thread not block in recv or send,
 112     * If mis->from_src_file and mis->to_src_file use the same fd,
 113     * The second shutdown() will return -1, we ignore this value,
 114     * It is harmless.
 115     */
 116    if (mis->from_src_file) {
 117        qemu_file_shutdown(mis->from_src_file);
 118    }
 119    if (mis->to_src_file) {
 120        qemu_file_shutdown(mis->to_src_file);
 121    }
 122
 123    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 124                                   FAILOVER_STATUS_COMPLETED);
 125    if (old_state != FAILOVER_STATUS_ACTIVE) {
 126        error_report("Incorrect state (%s) while doing failover for "
 127                     "secondary VM", FailoverStatus_str(old_state));
 128        return;
 129    }
 130    /* Notify COLO incoming thread that failover work is finished */
 131    qemu_sem_post(&mis->colo_incoming_sem);
 132
 133    /* For Secondary VM, jump to incoming co */
 134    if (mis->migration_incoming_co) {
 135        qemu_coroutine_enter(mis->migration_incoming_co);
 136    }
 137#else
 138    abort();
 139#endif
 140}
 141
 142static void primary_vm_do_failover(void)
 143{
 144#ifdef CONFIG_REPLICATION
 145    MigrationState *s = migrate_get_current();
 146    int old_state;
 147    Error *local_err = NULL;
 148
 149    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 150                      MIGRATION_STATUS_COMPLETED);
 151    /*
 152     * kick COLO thread which might wait at
 153     * qemu_sem_wait(&s->colo_checkpoint_sem).
 154     */
 155    colo_checkpoint_notify(migrate_get_current());
 156
 157    /*
 158     * Wake up COLO thread which may blocked in recv() or send(),
 159     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 160     * same fd, but we still shutdown the fd for twice, it is harmless.
 161     */
 162    if (s->to_dst_file) {
 163        qemu_file_shutdown(s->to_dst_file);
 164    }
 165    if (s->rp_state.from_dst_file) {
 166        qemu_file_shutdown(s->rp_state.from_dst_file);
 167    }
 168
 169    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 170                                   FAILOVER_STATUS_COMPLETED);
 171    if (old_state != FAILOVER_STATUS_ACTIVE) {
 172        error_report("Incorrect state (%s) while doing failover for Primary VM",
 173                     FailoverStatus_str(old_state));
 174        return;
 175    }
 176
 177    replication_stop_all(true, &local_err);
 178    if (local_err) {
 179        error_report_err(local_err);
 180        local_err = NULL;
 181    }
 182
 183    /* Notify COLO thread that failover work is finished */
 184    qemu_sem_post(&s->colo_exit_sem);
 185#else
 186    abort();
 187#endif
 188}
 189
 190COLOMode get_colo_mode(void)
 191{
 192    if (migration_in_colo_state()) {
 193        return COLO_MODE_PRIMARY;
 194    } else if (migration_incoming_in_colo_state()) {
 195        return COLO_MODE_SECONDARY;
 196    } else {
 197        return COLO_MODE_NONE;
 198    }
 199}
 200
 201void colo_do_failover(void)
 202{
 203    /* Make sure VM stopped while failover happened. */
 204    if (!colo_runstate_is_stopped()) {
 205        vm_stop_force_state(RUN_STATE_COLO);
 206    }
 207
 208    switch (get_colo_mode()) {
 209    case COLO_MODE_PRIMARY:
 210        primary_vm_do_failover();
 211        break;
 212    case COLO_MODE_SECONDARY:
 213        secondary_vm_do_failover();
 214        break;
 215    default:
 216        error_report("colo_do_failover failed because the colo mode"
 217                     " could not be obtained");
 218    }
 219}
 220
 221#ifdef CONFIG_REPLICATION
 222void qmp_xen_set_replication(bool enable, bool primary,
 223                             bool has_failover, bool failover,
 224                             Error **errp)
 225{
 226    ReplicationMode mode = primary ?
 227                           REPLICATION_MODE_PRIMARY :
 228                           REPLICATION_MODE_SECONDARY;
 229
 230    if (has_failover && enable) {
 231        error_setg(errp, "Parameter 'failover' is only for"
 232                   " stopping replication");
 233        return;
 234    }
 235
 236    if (enable) {
 237        replication_start_all(mode, errp);
 238    } else {
 239        if (!has_failover) {
 240            failover = NULL;
 241        }
 242        replication_stop_all(failover, failover ? NULL : errp);
 243    }
 244}
 245
 246ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 247{
 248    Error *err = NULL;
 249    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 250
 251    replication_get_error_all(&err);
 252    if (err) {
 253        s->error = true;
 254        s->has_desc = true;
 255        s->desc = g_strdup(error_get_pretty(err));
 256    } else {
 257        s->error = false;
 258    }
 259
 260    error_free(err);
 261    return s;
 262}
 263
 264void qmp_xen_colo_do_checkpoint(Error **errp)
 265{
 266    Error *err = NULL;
 267
 268    replication_do_checkpoint_all(&err);
 269    if (err) {
 270        error_propagate(errp, err);
 271        return;
 272    }
 273    /* Notify all filters of all NIC to do checkpoint */
 274    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
 275}
 276#endif
 277
 278COLOStatus *qmp_query_colo_status(Error **errp)
 279{
 280    COLOStatus *s = g_new0(COLOStatus, 1);
 281
 282    s->mode = get_colo_mode();
 283    s->last_mode = last_colo_mode;
 284
 285    switch (failover_get_state()) {
 286    case FAILOVER_STATUS_NONE:
 287        s->reason = COLO_EXIT_REASON_NONE;
 288        break;
 289    case FAILOVER_STATUS_COMPLETED:
 290        s->reason = COLO_EXIT_REASON_REQUEST;
 291        break;
 292    default:
 293        if (migration_in_colo_state()) {
 294            s->reason = COLO_EXIT_REASON_PROCESSING;
 295        } else {
 296            s->reason = COLO_EXIT_REASON_ERROR;
 297        }
 298    }
 299
 300    return s;
 301}
 302
 303static void colo_send_message(QEMUFile *f, COLOMessage msg,
 304                              Error **errp)
 305{
 306    int ret;
 307
 308    if (msg >= COLO_MESSAGE__MAX) {
 309        error_setg(errp, "%s: Invalid message", __func__);
 310        return;
 311    }
 312    qemu_put_be32(f, msg);
 313    qemu_fflush(f);
 314
 315    ret = qemu_file_get_error(f);
 316    if (ret < 0) {
 317        error_setg_errno(errp, -ret, "Can't send COLO message");
 318    }
 319    trace_colo_send_message(COLOMessage_str(msg));
 320}
 321
 322static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 323                                    uint64_t value, Error **errp)
 324{
 325    Error *local_err = NULL;
 326    int ret;
 327
 328    colo_send_message(f, msg, &local_err);
 329    if (local_err) {
 330        error_propagate(errp, local_err);
 331        return;
 332    }
 333    qemu_put_be64(f, value);
 334    qemu_fflush(f);
 335
 336    ret = qemu_file_get_error(f);
 337    if (ret < 0) {
 338        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 339                         COLOMessage_str(msg));
 340    }
 341}
 342
 343static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 344{
 345    COLOMessage msg;
 346    int ret;
 347
 348    msg = qemu_get_be32(f);
 349    ret = qemu_file_get_error(f);
 350    if (ret < 0) {
 351        error_setg_errno(errp, -ret, "Can't receive COLO message");
 352        return msg;
 353    }
 354    if (msg >= COLO_MESSAGE__MAX) {
 355        error_setg(errp, "%s: Invalid message", __func__);
 356        return msg;
 357    }
 358    trace_colo_receive_message(COLOMessage_str(msg));
 359    return msg;
 360}
 361
 362static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 363                                       Error **errp)
 364{
 365    COLOMessage msg;
 366    Error *local_err = NULL;
 367
 368    msg = colo_receive_message(f, &local_err);
 369    if (local_err) {
 370        error_propagate(errp, local_err);
 371        return;
 372    }
 373    if (msg != expect_msg) {
 374        error_setg(errp, "Unexpected COLO message %d, expected %d",
 375                          msg, expect_msg);
 376    }
 377}
 378
 379static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 380                                           Error **errp)
 381{
 382    Error *local_err = NULL;
 383    uint64_t value;
 384    int ret;
 385
 386    colo_receive_check_message(f, expect_msg, &local_err);
 387    if (local_err) {
 388        error_propagate(errp, local_err);
 389        return 0;
 390    }
 391
 392    value = qemu_get_be64(f);
 393    ret = qemu_file_get_error(f);
 394    if (ret < 0) {
 395        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 396                         COLOMessage_str(expect_msg));
 397    }
 398    return value;
 399}
 400
 401static int colo_do_checkpoint_transaction(MigrationState *s,
 402                                          QIOChannelBuffer *bioc,
 403                                          QEMUFile *fb)
 404{
 405    Error *local_err = NULL;
 406    int ret = -1;
 407
 408    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 409                      &local_err);
 410    if (local_err) {
 411        goto out;
 412    }
 413
 414    colo_receive_check_message(s->rp_state.from_dst_file,
 415                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 416    if (local_err) {
 417        goto out;
 418    }
 419    /* Reset channel-buffer directly */
 420    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 421    bioc->usage = 0;
 422
 423    qemu_mutex_lock_iothread();
 424    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 425        qemu_mutex_unlock_iothread();
 426        goto out;
 427    }
 428    vm_stop_force_state(RUN_STATE_COLO);
 429    qemu_mutex_unlock_iothread();
 430    trace_colo_vm_state_change("run", "stop");
 431    /*
 432     * Failover request bh could be called after vm_stop_force_state(),
 433     * So we need check failover_request_is_active() again.
 434     */
 435    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 436        goto out;
 437    }
 438
 439    /* Disable block migration */
 440    migrate_set_block_enabled(false, &local_err);
 441    if (local_err) {
 442        goto out;
 443    }
 444    qemu_mutex_lock_iothread();
 445
 446#ifdef CONFIG_REPLICATION
 447    replication_do_checkpoint_all(&local_err);
 448    if (local_err) {
 449        qemu_mutex_unlock_iothread();
 450        goto out;
 451    }
 452#else
 453        abort();
 454#endif
 455
 456    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 457    if (local_err) {
 458        qemu_mutex_unlock_iothread();
 459        goto out;
 460    }
 461    /* Note: device state is saved into buffer */
 462    ret = qemu_save_device_state(fb);
 463
 464    qemu_mutex_unlock_iothread();
 465    if (ret < 0) {
 466        goto out;
 467    }
 468    /*
 469     * Only save VM's live state, which not including device state.
 470     * TODO: We may need a timeout mechanism to prevent COLO process
 471     * to be blocked here.
 472     */
 473    qemu_savevm_live_state(s->to_dst_file);
 474
 475    qemu_fflush(fb);
 476
 477    /*
 478     * We need the size of the VMstate data in Secondary side,
 479     * With which we can decide how much data should be read.
 480     */
 481    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 482                            bioc->usage, &local_err);
 483    if (local_err) {
 484        goto out;
 485    }
 486
 487    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 488    qemu_fflush(s->to_dst_file);
 489    ret = qemu_file_get_error(s->to_dst_file);
 490    if (ret < 0) {
 491        goto out;
 492    }
 493
 494    colo_receive_check_message(s->rp_state.from_dst_file,
 495                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 496    if (local_err) {
 497        goto out;
 498    }
 499
 500    qemu_event_reset(&s->colo_checkpoint_event);
 501    colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
 502    if (local_err) {
 503        goto out;
 504    }
 505
 506    colo_receive_check_message(s->rp_state.from_dst_file,
 507                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 508    if (local_err) {
 509        goto out;
 510    }
 511
 512    ret = 0;
 513
 514    qemu_mutex_lock_iothread();
 515    vm_start();
 516    qemu_mutex_unlock_iothread();
 517    trace_colo_vm_state_change("stop", "run");
 518
 519out:
 520    if (local_err) {
 521        error_report_err(local_err);
 522    }
 523    return ret;
 524}
 525
 526static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
 527{
 528    colo_checkpoint_notify(data);
 529}
 530
 531static void colo_process_checkpoint(MigrationState *s)
 532{
 533    QIOChannelBuffer *bioc;
 534    QEMUFile *fb = NULL;
 535    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 536    Error *local_err = NULL;
 537    int ret;
 538
 539    last_colo_mode = get_colo_mode();
 540    if (last_colo_mode != COLO_MODE_PRIMARY) {
 541        error_report("COLO mode must be COLO_MODE_PRIMARY");
 542        return;
 543    }
 544
 545    failover_init_state();
 546
 547    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 548    if (!s->rp_state.from_dst_file) {
 549        error_report("Open QEMUFile from_dst_file failed");
 550        goto out;
 551    }
 552
 553    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
 554    colo_compare_register_notifier(&packets_compare_notifier);
 555
 556    /*
 557     * Wait for Secondary finish loading VM states and enter COLO
 558     * restore.
 559     */
 560    colo_receive_check_message(s->rp_state.from_dst_file,
 561                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 562    if (local_err) {
 563        goto out;
 564    }
 565    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 566    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 567    object_unref(OBJECT(bioc));
 568
 569    qemu_mutex_lock_iothread();
 570#ifdef CONFIG_REPLICATION
 571    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
 572    if (local_err) {
 573        qemu_mutex_unlock_iothread();
 574        goto out;
 575    }
 576#else
 577        abort();
 578#endif
 579
 580    vm_start();
 581    qemu_mutex_unlock_iothread();
 582    trace_colo_vm_state_change("stop", "run");
 583
 584    timer_mod(s->colo_delay_timer,
 585            current_time + s->parameters.x_checkpoint_delay);
 586
 587    while (s->state == MIGRATION_STATUS_COLO) {
 588        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 589            error_report("failover request");
 590            goto out;
 591        }
 592
 593        qemu_event_wait(&s->colo_checkpoint_event);
 594
 595        if (s->state != MIGRATION_STATUS_COLO) {
 596            goto out;
 597        }
 598        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 599        if (ret < 0) {
 600            goto out;
 601        }
 602    }
 603
 604out:
 605    /* Throw the unreported error message after exited from loop */
 606    if (local_err) {
 607        error_report_err(local_err);
 608    }
 609
 610    if (fb) {
 611        qemu_fclose(fb);
 612    }
 613
 614    /*
 615     * There are only two reasons we can get here, some error happened
 616     * or the user triggered failover.
 617     */
 618    switch (failover_get_state()) {
 619    case FAILOVER_STATUS_COMPLETED:
 620        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 621                                  COLO_EXIT_REASON_REQUEST);
 622        break;
 623    default:
 624        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
 625                                  COLO_EXIT_REASON_ERROR);
 626    }
 627
 628    /* Hope this not to be too long to wait here */
 629    qemu_sem_wait(&s->colo_exit_sem);
 630    qemu_sem_destroy(&s->colo_exit_sem);
 631
 632    /*
 633     * It is safe to unregister notifier after failover finished.
 634     * Besides, colo_delay_timer and colo_checkpoint_sem can't be
 635     * released before unregister notifier, or there will be use-after-free
 636     * error.
 637     */
 638    colo_compare_unregister_notifier(&packets_compare_notifier);
 639    timer_free(s->colo_delay_timer);
 640    qemu_event_destroy(&s->colo_checkpoint_event);
 641
 642    /*
 643     * Must be called after failover BH is completed,
 644     * Or the failover BH may shutdown the wrong fd that
 645     * re-used by other threads after we release here.
 646     */
 647    if (s->rp_state.from_dst_file) {
 648        qemu_fclose(s->rp_state.from_dst_file);
 649    }
 650}
 651
 652void colo_checkpoint_notify(void *opaque)
 653{
 654    MigrationState *s = opaque;
 655    int64_t next_notify_time;
 656
 657    qemu_event_set(&s->colo_checkpoint_event);
 658    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 659    next_notify_time = s->colo_checkpoint_time +
 660                    s->parameters.x_checkpoint_delay;
 661    timer_mod(s->colo_delay_timer, next_notify_time);
 662}
 663
 664void migrate_start_colo_process(MigrationState *s)
 665{
 666    qemu_mutex_unlock_iothread();
 667    qemu_event_init(&s->colo_checkpoint_event, false);
 668    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 669                                colo_checkpoint_notify, s);
 670
 671    qemu_sem_init(&s->colo_exit_sem, 0);
 672    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 673                      MIGRATION_STATUS_COLO);
 674    colo_process_checkpoint(s);
 675    qemu_mutex_lock_iothread();
 676}
 677
 678static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
 679                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 680{
 681    uint64_t total_size;
 682    uint64_t value;
 683    Error *local_err = NULL;
 684    int ret;
 685
 686    qemu_mutex_lock_iothread();
 687    vm_stop_force_state(RUN_STATE_COLO);
 688    trace_colo_vm_state_change("run", "stop");
 689    qemu_mutex_unlock_iothread();
 690
 691    /* FIXME: This is unnecessary for periodic checkpoint mode */
 692    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 693                 &local_err);
 694    if (local_err) {
 695        error_propagate(errp, local_err);
 696        return;
 697    }
 698
 699    colo_receive_check_message(mis->from_src_file,
 700                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
 701    if (local_err) {
 702        error_propagate(errp, local_err);
 703        return;
 704    }
 705
 706    qemu_mutex_lock_iothread();
 707    cpu_synchronize_all_states();
 708    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
 709    qemu_mutex_unlock_iothread();
 710
 711    if (ret < 0) {
 712        error_setg(errp, "Load VM's live state (ram) error");
 713        return;
 714    }
 715
 716    value = colo_receive_message_value(mis->from_src_file,
 717                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 718    if (local_err) {
 719        error_propagate(errp, local_err);
 720        return;
 721    }
 722
 723    /*
 724     * Read VM device state data into channel buffer,
 725     * It's better to re-use the memory allocated.
 726     * Here we need to handle the channel buffer directly.
 727     */
 728    if (value > bioc->capacity) {
 729        bioc->capacity = value;
 730        bioc->data = g_realloc(bioc->data, bioc->capacity);
 731    }
 732    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 733    if (total_size != value) {
 734        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
 735                    " %" PRIu64, total_size, value);
 736        return;
 737    }
 738    bioc->usage = total_size;
 739    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 740
 741    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 742                 &local_err);
 743    if (local_err) {
 744        error_propagate(errp, local_err);
 745        return;
 746    }
 747
 748    qemu_mutex_lock_iothread();
 749    vmstate_loading = true;
 750    colo_flush_ram_cache();
 751    ret = qemu_load_device_state(fb);
 752    if (ret < 0) {
 753        error_setg(errp, "COLO: load device state failed");
 754        vmstate_loading = false;
 755        qemu_mutex_unlock_iothread();
 756        return;
 757    }
 758
 759#ifdef CONFIG_REPLICATION
 760    replication_get_error_all(&local_err);
 761    if (local_err) {
 762        error_propagate(errp, local_err);
 763        vmstate_loading = false;
 764        qemu_mutex_unlock_iothread();
 765        return;
 766    }
 767
 768    /* discard colo disk buffer */
 769    replication_do_checkpoint_all(&local_err);
 770    if (local_err) {
 771        error_propagate(errp, local_err);
 772        vmstate_loading = false;
 773        qemu_mutex_unlock_iothread();
 774        return;
 775    }
 776#else
 777    abort();
 778#endif
 779    /* Notify all filters of all NIC to do checkpoint */
 780    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
 781
 782    if (local_err) {
 783        error_propagate(errp, local_err);
 784        vmstate_loading = false;
 785        qemu_mutex_unlock_iothread();
 786        return;
 787    }
 788
 789    vmstate_loading = false;
 790    vm_start();
 791    trace_colo_vm_state_change("stop", "run");
 792    qemu_mutex_unlock_iothread();
 793
 794    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 795        return;
 796    }
 797
 798    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 799                 &local_err);
 800    error_propagate(errp, local_err);
 801}
 802
 803static void colo_wait_handle_message(MigrationIncomingState *mis,
 804                QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
 805{
 806    COLOMessage msg;
 807    Error *local_err = NULL;
 808
 809    msg = colo_receive_message(mis->from_src_file, &local_err);
 810    if (local_err) {
 811        error_propagate(errp, local_err);
 812        return;
 813    }
 814
 815    switch (msg) {
 816    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 817        colo_incoming_process_checkpoint(mis, fb, bioc, errp);
 818        break;
 819    default:
 820        error_setg(errp, "Got unknown COLO message: %d", msg);
 821        break;
 822    }
 823}
 824
 825void *colo_process_incoming_thread(void *opaque)
 826{
 827    MigrationIncomingState *mis = opaque;
 828    QEMUFile *fb = NULL;
 829    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 830    Error *local_err = NULL;
 831
 832    rcu_register_thread();
 833    qemu_sem_init(&mis->colo_incoming_sem, 0);
 834
 835    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 836                      MIGRATION_STATUS_COLO);
 837
 838    last_colo_mode = get_colo_mode();
 839    if (last_colo_mode != COLO_MODE_SECONDARY) {
 840        error_report("COLO mode must be COLO_MODE_SECONDARY");
 841        return NULL;
 842    }
 843
 844    failover_init_state();
 845
 846    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 847    if (!mis->to_src_file) {
 848        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 849        goto out;
 850    }
 851    /*
 852     * Note: the communication between Primary side and Secondary side
 853     * should be sequential, we set the fd to unblocked in migration incoming
 854     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 855     * set the fd back to blocked.
 856     */
 857    qemu_file_set_blocking(mis->from_src_file, true);
 858
 859    colo_incoming_start_dirty_log();
 860
 861    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 862    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 863    object_unref(OBJECT(bioc));
 864
 865    qemu_mutex_lock_iothread();
 866#ifdef CONFIG_REPLICATION
 867    replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
 868    if (local_err) {
 869        qemu_mutex_unlock_iothread();
 870        goto out;
 871    }
 872#else
 873        abort();
 874#endif
 875    vm_start();
 876    trace_colo_vm_state_change("stop", "run");
 877    qemu_mutex_unlock_iothread();
 878
 879    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 880                      &local_err);
 881    if (local_err) {
 882        goto out;
 883    }
 884
 885    while (mis->state == MIGRATION_STATUS_COLO) {
 886        colo_wait_handle_message(mis, fb, bioc, &local_err);
 887        if (local_err) {
 888            error_report_err(local_err);
 889            break;
 890        }
 891
 892        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 893            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 894                            FAILOVER_STATUS_NONE);
 895            failover_request_active(NULL);
 896            break;
 897        }
 898
 899        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 900            error_report("failover request");
 901            break;
 902        }
 903    }
 904
 905out:
 906    /*
 907     * There are only two reasons we can get here, some error happened
 908     * or the user triggered failover.
 909     */
 910    switch (failover_get_state()) {
 911    case FAILOVER_STATUS_COMPLETED:
 912        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 913                                  COLO_EXIT_REASON_REQUEST);
 914        break;
 915    default:
 916        qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
 917                                  COLO_EXIT_REASON_ERROR);
 918    }
 919
 920    if (fb) {
 921        qemu_fclose(fb);
 922    }
 923
 924    /* Hope this not to be too long to loop here */
 925    qemu_sem_wait(&mis->colo_incoming_sem);
 926    qemu_sem_destroy(&mis->colo_incoming_sem);
 927    /* Must be called after failover BH is completed */
 928    if (mis->to_src_file) {
 929        qemu_fclose(mis->to_src_file);
 930        mis->to_src_file = NULL;
 931    }
 932
 933    rcu_unregister_thread();
 934    return NULL;
 935}
 936