qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qapi/error.h"
  16#include "qapi/qapi-commands-migration.h"
  17#include "qemu-file-channel.h"
  18#include "migration.h"
  19#include "qemu-file.h"
  20#include "savevm.h"
  21#include "migration/colo.h"
  22#include "block.h"
  23#include "io/channel-buffer.h"
  24#include "trace.h"
  25#include "qemu/error-report.h"
  26#include "migration/failover.h"
  27#include "replication.h"
  28
  29static bool vmstate_loading;
  30
  31#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  32
  33bool migration_in_colo_state(void)
  34{
  35    MigrationState *s = migrate_get_current();
  36
  37    return (s->state == MIGRATION_STATUS_COLO);
  38}
  39
  40bool migration_incoming_in_colo_state(void)
  41{
  42    MigrationIncomingState *mis = migration_incoming_get_current();
  43
  44    return mis && (mis->state == MIGRATION_STATUS_COLO);
  45}
  46
  47static bool colo_runstate_is_stopped(void)
  48{
  49    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  50}
  51
  52static void secondary_vm_do_failover(void)
  53{
  54    int old_state;
  55    MigrationIncomingState *mis = migration_incoming_get_current();
  56
  57    /* Can not do failover during the process of VM's loading VMstate, Or
  58     * it will break the secondary VM.
  59     */
  60    if (vmstate_loading) {
  61        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  62                        FAILOVER_STATUS_RELAUNCH);
  63        if (old_state != FAILOVER_STATUS_ACTIVE) {
  64            error_report("Unknown error while do failover for secondary VM,"
  65                         "old_state: %s", FailoverStatus_str(old_state));
  66        }
  67        return;
  68    }
  69
  70    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  71                      MIGRATION_STATUS_COMPLETED);
  72
  73    if (!autostart) {
  74        error_report("\"-S\" qemu option will be ignored in secondary side");
  75        /* recover runstate to normal migration finish state */
  76        autostart = true;
  77    }
  78    /*
  79     * Make sure COLO incoming thread not block in recv or send,
  80     * If mis->from_src_file and mis->to_src_file use the same fd,
  81     * The second shutdown() will return -1, we ignore this value,
  82     * It is harmless.
  83     */
  84    if (mis->from_src_file) {
  85        qemu_file_shutdown(mis->from_src_file);
  86    }
  87    if (mis->to_src_file) {
  88        qemu_file_shutdown(mis->to_src_file);
  89    }
  90
  91    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  92                                   FAILOVER_STATUS_COMPLETED);
  93    if (old_state != FAILOVER_STATUS_ACTIVE) {
  94        error_report("Incorrect state (%s) while doing failover for "
  95                     "secondary VM", FailoverStatus_str(old_state));
  96        return;
  97    }
  98    /* Notify COLO incoming thread that failover work is finished */
  99    qemu_sem_post(&mis->colo_incoming_sem);
 100    /* For Secondary VM, jump to incoming co */
 101    if (mis->migration_incoming_co) {
 102        qemu_coroutine_enter(mis->migration_incoming_co);
 103    }
 104}
 105
 106static void primary_vm_do_failover(void)
 107{
 108    MigrationState *s = migrate_get_current();
 109    int old_state;
 110
 111    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 112                      MIGRATION_STATUS_COMPLETED);
 113
 114    /*
 115     * Wake up COLO thread which may blocked in recv() or send(),
 116     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 117     * same fd, but we still shutdown the fd for twice, it is harmless.
 118     */
 119    if (s->to_dst_file) {
 120        qemu_file_shutdown(s->to_dst_file);
 121    }
 122    if (s->rp_state.from_dst_file) {
 123        qemu_file_shutdown(s->rp_state.from_dst_file);
 124    }
 125
 126    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 127                                   FAILOVER_STATUS_COMPLETED);
 128    if (old_state != FAILOVER_STATUS_ACTIVE) {
 129        error_report("Incorrect state (%s) while doing failover for Primary VM",
 130                     FailoverStatus_str(old_state));
 131        return;
 132    }
 133    /* Notify COLO thread that failover work is finished */
 134    qemu_sem_post(&s->colo_exit_sem);
 135}
 136
 137void colo_do_failover(MigrationState *s)
 138{
 139    /* Make sure VM stopped while failover happened. */
 140    if (!colo_runstate_is_stopped()) {
 141        vm_stop_force_state(RUN_STATE_COLO);
 142    }
 143
 144    if (get_colo_mode() == COLO_MODE_PRIMARY) {
 145        primary_vm_do_failover();
 146    } else {
 147        secondary_vm_do_failover();
 148    }
 149}
 150
 151void qmp_xen_set_replication(bool enable, bool primary,
 152                             bool has_failover, bool failover,
 153                             Error **errp)
 154{
 155#ifdef CONFIG_REPLICATION
 156    ReplicationMode mode = primary ?
 157                           REPLICATION_MODE_PRIMARY :
 158                           REPLICATION_MODE_SECONDARY;
 159
 160    if (has_failover && enable) {
 161        error_setg(errp, "Parameter 'failover' is only for"
 162                   " stopping replication");
 163        return;
 164    }
 165
 166    if (enable) {
 167        replication_start_all(mode, errp);
 168    } else {
 169        if (!has_failover) {
 170            failover = NULL;
 171        }
 172        replication_stop_all(failover, failover ? NULL : errp);
 173    }
 174#else
 175    abort();
 176#endif
 177}
 178
 179ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 180{
 181#ifdef CONFIG_REPLICATION
 182    Error *err = NULL;
 183    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 184
 185    replication_get_error_all(&err);
 186    if (err) {
 187        s->error = true;
 188        s->has_desc = true;
 189        s->desc = g_strdup(error_get_pretty(err));
 190    } else {
 191        s->error = false;
 192    }
 193
 194    error_free(err);
 195    return s;
 196#else
 197    abort();
 198#endif
 199}
 200
 201void qmp_xen_colo_do_checkpoint(Error **errp)
 202{
 203#ifdef CONFIG_REPLICATION
 204    replication_do_checkpoint_all(errp);
 205#else
 206    abort();
 207#endif
 208}
 209
 210static void colo_send_message(QEMUFile *f, COLOMessage msg,
 211                              Error **errp)
 212{
 213    int ret;
 214
 215    if (msg >= COLO_MESSAGE__MAX) {
 216        error_setg(errp, "%s: Invalid message", __func__);
 217        return;
 218    }
 219    qemu_put_be32(f, msg);
 220    qemu_fflush(f);
 221
 222    ret = qemu_file_get_error(f);
 223    if (ret < 0) {
 224        error_setg_errno(errp, -ret, "Can't send COLO message");
 225    }
 226    trace_colo_send_message(COLOMessage_str(msg));
 227}
 228
 229static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 230                                    uint64_t value, Error **errp)
 231{
 232    Error *local_err = NULL;
 233    int ret;
 234
 235    colo_send_message(f, msg, &local_err);
 236    if (local_err) {
 237        error_propagate(errp, local_err);
 238        return;
 239    }
 240    qemu_put_be64(f, value);
 241    qemu_fflush(f);
 242
 243    ret = qemu_file_get_error(f);
 244    if (ret < 0) {
 245        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 246                         COLOMessage_str(msg));
 247    }
 248}
 249
 250static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 251{
 252    COLOMessage msg;
 253    int ret;
 254
 255    msg = qemu_get_be32(f);
 256    ret = qemu_file_get_error(f);
 257    if (ret < 0) {
 258        error_setg_errno(errp, -ret, "Can't receive COLO message");
 259        return msg;
 260    }
 261    if (msg >= COLO_MESSAGE__MAX) {
 262        error_setg(errp, "%s: Invalid message", __func__);
 263        return msg;
 264    }
 265    trace_colo_receive_message(COLOMessage_str(msg));
 266    return msg;
 267}
 268
 269static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 270                                       Error **errp)
 271{
 272    COLOMessage msg;
 273    Error *local_err = NULL;
 274
 275    msg = colo_receive_message(f, &local_err);
 276    if (local_err) {
 277        error_propagate(errp, local_err);
 278        return;
 279    }
 280    if (msg != expect_msg) {
 281        error_setg(errp, "Unexpected COLO message %d, expected %d",
 282                          msg, expect_msg);
 283    }
 284}
 285
 286static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 287                                           Error **errp)
 288{
 289    Error *local_err = NULL;
 290    uint64_t value;
 291    int ret;
 292
 293    colo_receive_check_message(f, expect_msg, &local_err);
 294    if (local_err) {
 295        error_propagate(errp, local_err);
 296        return 0;
 297    }
 298
 299    value = qemu_get_be64(f);
 300    ret = qemu_file_get_error(f);
 301    if (ret < 0) {
 302        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 303                         COLOMessage_str(expect_msg));
 304    }
 305    return value;
 306}
 307
 308static int colo_do_checkpoint_transaction(MigrationState *s,
 309                                          QIOChannelBuffer *bioc,
 310                                          QEMUFile *fb)
 311{
 312    Error *local_err = NULL;
 313    int ret = -1;
 314
 315    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 316                      &local_err);
 317    if (local_err) {
 318        goto out;
 319    }
 320
 321    colo_receive_check_message(s->rp_state.from_dst_file,
 322                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 323    if (local_err) {
 324        goto out;
 325    }
 326    /* Reset channel-buffer directly */
 327    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 328    bioc->usage = 0;
 329
 330    qemu_mutex_lock_iothread();
 331    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 332        qemu_mutex_unlock_iothread();
 333        goto out;
 334    }
 335    vm_stop_force_state(RUN_STATE_COLO);
 336    qemu_mutex_unlock_iothread();
 337    trace_colo_vm_state_change("run", "stop");
 338    /*
 339     * Failover request bh could be called after vm_stop_force_state(),
 340     * So we need check failover_request_is_active() again.
 341     */
 342    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 343        goto out;
 344    }
 345
 346    /* Disable block migration */
 347    migrate_set_block_enabled(false, &local_err);
 348    qemu_savevm_state_header(fb);
 349    qemu_savevm_state_setup(fb);
 350    qemu_mutex_lock_iothread();
 351    qemu_savevm_state_complete_precopy(fb, false, false);
 352    qemu_mutex_unlock_iothread();
 353
 354    qemu_fflush(fb);
 355
 356    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 357    if (local_err) {
 358        goto out;
 359    }
 360    /*
 361     * We need the size of the VMstate data in Secondary side,
 362     * With which we can decide how much data should be read.
 363     */
 364    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 365                            bioc->usage, &local_err);
 366    if (local_err) {
 367        goto out;
 368    }
 369
 370    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 371    qemu_fflush(s->to_dst_file);
 372    ret = qemu_file_get_error(s->to_dst_file);
 373    if (ret < 0) {
 374        goto out;
 375    }
 376
 377    colo_receive_check_message(s->rp_state.from_dst_file,
 378                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 379    if (local_err) {
 380        goto out;
 381    }
 382
 383    colo_receive_check_message(s->rp_state.from_dst_file,
 384                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 385    if (local_err) {
 386        goto out;
 387    }
 388
 389    ret = 0;
 390
 391    qemu_mutex_lock_iothread();
 392    vm_start();
 393    qemu_mutex_unlock_iothread();
 394    trace_colo_vm_state_change("stop", "run");
 395
 396out:
 397    if (local_err) {
 398        error_report_err(local_err);
 399    }
 400    return ret;
 401}
 402
 403static void colo_process_checkpoint(MigrationState *s)
 404{
 405    QIOChannelBuffer *bioc;
 406    QEMUFile *fb = NULL;
 407    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 408    Error *local_err = NULL;
 409    int ret;
 410
 411    failover_init_state();
 412
 413    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 414    if (!s->rp_state.from_dst_file) {
 415        error_report("Open QEMUFile from_dst_file failed");
 416        goto out;
 417    }
 418
 419    /*
 420     * Wait for Secondary finish loading VM states and enter COLO
 421     * restore.
 422     */
 423    colo_receive_check_message(s->rp_state.from_dst_file,
 424                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 425    if (local_err) {
 426        goto out;
 427    }
 428    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 429    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 430    object_unref(OBJECT(bioc));
 431
 432    qemu_mutex_lock_iothread();
 433    vm_start();
 434    qemu_mutex_unlock_iothread();
 435    trace_colo_vm_state_change("stop", "run");
 436
 437    timer_mod(s->colo_delay_timer,
 438            current_time + s->parameters.x_checkpoint_delay);
 439
 440    while (s->state == MIGRATION_STATUS_COLO) {
 441        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 442            error_report("failover request");
 443            goto out;
 444        }
 445
 446        qemu_sem_wait(&s->colo_checkpoint_sem);
 447
 448        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 449        if (ret < 0) {
 450            goto out;
 451        }
 452    }
 453
 454out:
 455    /* Throw the unreported error message after exited from loop */
 456    if (local_err) {
 457        error_report_err(local_err);
 458    }
 459
 460    if (fb) {
 461        qemu_fclose(fb);
 462    }
 463
 464    timer_del(s->colo_delay_timer);
 465
 466    /* Hope this not to be too long to wait here */
 467    qemu_sem_wait(&s->colo_exit_sem);
 468    qemu_sem_destroy(&s->colo_exit_sem);
 469    /*
 470     * Must be called after failover BH is completed,
 471     * Or the failover BH may shutdown the wrong fd that
 472     * re-used by other threads after we release here.
 473     */
 474    if (s->rp_state.from_dst_file) {
 475        qemu_fclose(s->rp_state.from_dst_file);
 476    }
 477}
 478
 479void colo_checkpoint_notify(void *opaque)
 480{
 481    MigrationState *s = opaque;
 482    int64_t next_notify_time;
 483
 484    qemu_sem_post(&s->colo_checkpoint_sem);
 485    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 486    next_notify_time = s->colo_checkpoint_time +
 487                    s->parameters.x_checkpoint_delay;
 488    timer_mod(s->colo_delay_timer, next_notify_time);
 489}
 490
 491void migrate_start_colo_process(MigrationState *s)
 492{
 493    qemu_mutex_unlock_iothread();
 494    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 495    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 496                                colo_checkpoint_notify, s);
 497
 498    qemu_sem_init(&s->colo_exit_sem, 0);
 499    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 500                      MIGRATION_STATUS_COLO);
 501    colo_process_checkpoint(s);
 502    qemu_mutex_lock_iothread();
 503}
 504
 505static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 506                                     Error **errp)
 507{
 508    COLOMessage msg;
 509    Error *local_err = NULL;
 510
 511    msg = colo_receive_message(f, &local_err);
 512    if (local_err) {
 513        error_propagate(errp, local_err);
 514        return;
 515    }
 516
 517    switch (msg) {
 518    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 519        *checkpoint_request = 1;
 520        break;
 521    default:
 522        *checkpoint_request = 0;
 523        error_setg(errp, "Got unknown COLO message: %d", msg);
 524        break;
 525    }
 526}
 527
 528void *colo_process_incoming_thread(void *opaque)
 529{
 530    MigrationIncomingState *mis = opaque;
 531    QEMUFile *fb = NULL;
 532    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 533    uint64_t total_size;
 534    uint64_t value;
 535    Error *local_err = NULL;
 536
 537    qemu_sem_init(&mis->colo_incoming_sem, 0);
 538
 539    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 540                      MIGRATION_STATUS_COLO);
 541
 542    failover_init_state();
 543
 544    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 545    if (!mis->to_src_file) {
 546        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 547        goto out;
 548    }
 549    /*
 550     * Note: the communication between Primary side and Secondary side
 551     * should be sequential, we set the fd to unblocked in migration incoming
 552     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 553     * set the fd back to blocked.
 554     */
 555    qemu_file_set_blocking(mis->from_src_file, true);
 556
 557    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 558    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 559    object_unref(OBJECT(bioc));
 560
 561    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 562                      &local_err);
 563    if (local_err) {
 564        goto out;
 565    }
 566
 567    while (mis->state == MIGRATION_STATUS_COLO) {
 568        int request = 0;
 569
 570        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 571        if (local_err) {
 572            goto out;
 573        }
 574        assert(request);
 575        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 576            error_report("failover request");
 577            goto out;
 578        }
 579
 580        /* FIXME: This is unnecessary for periodic checkpoint mode */
 581        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 582                     &local_err);
 583        if (local_err) {
 584            goto out;
 585        }
 586
 587        colo_receive_check_message(mis->from_src_file,
 588                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 589        if (local_err) {
 590            goto out;
 591        }
 592
 593        value = colo_receive_message_value(mis->from_src_file,
 594                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 595        if (local_err) {
 596            goto out;
 597        }
 598
 599        /*
 600         * Read VM device state data into channel buffer,
 601         * It's better to re-use the memory allocated.
 602         * Here we need to handle the channel buffer directly.
 603         */
 604        if (value > bioc->capacity) {
 605            bioc->capacity = value;
 606            bioc->data = g_realloc(bioc->data, bioc->capacity);
 607        }
 608        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 609        if (total_size != value) {
 610            error_report("Got %" PRIu64 " VMState data, less than expected"
 611                        " %" PRIu64, total_size, value);
 612            goto out;
 613        }
 614        bioc->usage = total_size;
 615        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 616
 617        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 618                     &local_err);
 619        if (local_err) {
 620            goto out;
 621        }
 622
 623        qemu_mutex_lock_iothread();
 624        qemu_system_reset(SHUTDOWN_CAUSE_NONE);
 625        vmstate_loading = true;
 626        if (qemu_loadvm_state(fb) < 0) {
 627            error_report("COLO: loadvm failed");
 628            qemu_mutex_unlock_iothread();
 629            goto out;
 630        }
 631
 632        vmstate_loading = false;
 633        qemu_mutex_unlock_iothread();
 634
 635        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 636            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 637                            FAILOVER_STATUS_NONE);
 638            failover_request_active(NULL);
 639            goto out;
 640        }
 641
 642        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 643                     &local_err);
 644        if (local_err) {
 645            goto out;
 646        }
 647    }
 648
 649out:
 650    vmstate_loading = false;
 651    /* Throw the unreported error message after exited from loop */
 652    if (local_err) {
 653        error_report_err(local_err);
 654    }
 655
 656    if (fb) {
 657        qemu_fclose(fb);
 658    }
 659
 660    /* Hope this not to be too long to loop here */
 661    qemu_sem_wait(&mis->colo_incoming_sem);
 662    qemu_sem_destroy(&mis->colo_incoming_sem);
 663    /* Must be called after failover BH is completed */
 664    if (mis->to_src_file) {
 665        qemu_fclose(mis->to_src_file);
 666    }
 667    migration_incoming_exit_colo();
 668
 669    return NULL;
 670}
 671