qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "sysemu/sysemu.h"
  15#include "qemu-file-channel.h"
  16#include "migration.h"
  17#include "qemu-file.h"
  18#include "savevm.h"
  19#include "migration/colo.h"
  20#include "block.h"
  21#include "io/channel-buffer.h"
  22#include "trace.h"
  23#include "qemu/error-report.h"
  24#include "migration/failover.h"
  25#include "replication.h"
  26#include "qmp-commands.h"
  27
  28static bool vmstate_loading;
  29
  30#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  31
  32bool migration_in_colo_state(void)
  33{
  34    MigrationState *s = migrate_get_current();
  35
  36    return (s->state == MIGRATION_STATUS_COLO);
  37}
  38
  39bool migration_incoming_in_colo_state(void)
  40{
  41    MigrationIncomingState *mis = migration_incoming_get_current();
  42
  43    return mis && (mis->state == MIGRATION_STATUS_COLO);
  44}
  45
  46static bool colo_runstate_is_stopped(void)
  47{
  48    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  49}
  50
  51static void secondary_vm_do_failover(void)
  52{
  53    int old_state;
  54    MigrationIncomingState *mis = migration_incoming_get_current();
  55
  56    /* Can not do failover during the process of VM's loading VMstate, Or
  57     * it will break the secondary VM.
  58     */
  59    if (vmstate_loading) {
  60        old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  61                        FAILOVER_STATUS_RELAUNCH);
  62        if (old_state != FAILOVER_STATUS_ACTIVE) {
  63            error_report("Unknown error while do failover for secondary VM,"
  64                         "old_state: %s", FailoverStatus_str(old_state));
  65        }
  66        return;
  67    }
  68
  69    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  70                      MIGRATION_STATUS_COMPLETED);
  71
  72    if (!autostart) {
  73        error_report("\"-S\" qemu option will be ignored in secondary side");
  74        /* recover runstate to normal migration finish state */
  75        autostart = true;
  76    }
  77    /*
  78     * Make sure COLO incoming thread not block in recv or send,
  79     * If mis->from_src_file and mis->to_src_file use the same fd,
  80     * The second shutdown() will return -1, we ignore this value,
  81     * It is harmless.
  82     */
  83    if (mis->from_src_file) {
  84        qemu_file_shutdown(mis->from_src_file);
  85    }
  86    if (mis->to_src_file) {
  87        qemu_file_shutdown(mis->to_src_file);
  88    }
  89
  90    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  91                                   FAILOVER_STATUS_COMPLETED);
  92    if (old_state != FAILOVER_STATUS_ACTIVE) {
  93        error_report("Incorrect state (%s) while doing failover for "
  94                     "secondary VM", FailoverStatus_str(old_state));
  95        return;
  96    }
  97    /* Notify COLO incoming thread that failover work is finished */
  98    qemu_sem_post(&mis->colo_incoming_sem);
  99    /* For Secondary VM, jump to incoming co */
 100    if (mis->migration_incoming_co) {
 101        qemu_coroutine_enter(mis->migration_incoming_co);
 102    }
 103}
 104
 105static void primary_vm_do_failover(void)
 106{
 107    MigrationState *s = migrate_get_current();
 108    int old_state;
 109
 110    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
 111                      MIGRATION_STATUS_COMPLETED);
 112
 113    /*
 114     * Wake up COLO thread which may blocked in recv() or send(),
 115     * The s->rp_state.from_dst_file and s->to_dst_file may use the
 116     * same fd, but we still shutdown the fd for twice, it is harmless.
 117     */
 118    if (s->to_dst_file) {
 119        qemu_file_shutdown(s->to_dst_file);
 120    }
 121    if (s->rp_state.from_dst_file) {
 122        qemu_file_shutdown(s->rp_state.from_dst_file);
 123    }
 124
 125    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
 126                                   FAILOVER_STATUS_COMPLETED);
 127    if (old_state != FAILOVER_STATUS_ACTIVE) {
 128        error_report("Incorrect state (%s) while doing failover for Primary VM",
 129                     FailoverStatus_str(old_state));
 130        return;
 131    }
 132    /* Notify COLO thread that failover work is finished */
 133    qemu_sem_post(&s->colo_exit_sem);
 134}
 135
 136void colo_do_failover(MigrationState *s)
 137{
 138    /* Make sure VM stopped while failover happened. */
 139    if (!colo_runstate_is_stopped()) {
 140        vm_stop_force_state(RUN_STATE_COLO);
 141    }
 142
 143    if (get_colo_mode() == COLO_MODE_PRIMARY) {
 144        primary_vm_do_failover();
 145    } else {
 146        secondary_vm_do_failover();
 147    }
 148}
 149
 150void qmp_xen_set_replication(bool enable, bool primary,
 151                             bool has_failover, bool failover,
 152                             Error **errp)
 153{
 154#ifdef CONFIG_REPLICATION
 155    ReplicationMode mode = primary ?
 156                           REPLICATION_MODE_PRIMARY :
 157                           REPLICATION_MODE_SECONDARY;
 158
 159    if (has_failover && enable) {
 160        error_setg(errp, "Parameter 'failover' is only for"
 161                   " stopping replication");
 162        return;
 163    }
 164
 165    if (enable) {
 166        replication_start_all(mode, errp);
 167    } else {
 168        if (!has_failover) {
 169            failover = NULL;
 170        }
 171        replication_stop_all(failover, failover ? NULL : errp);
 172    }
 173#else
 174    abort();
 175#endif
 176}
 177
 178ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
 179{
 180#ifdef CONFIG_REPLICATION
 181    Error *err = NULL;
 182    ReplicationStatus *s = g_new0(ReplicationStatus, 1);
 183
 184    replication_get_error_all(&err);
 185    if (err) {
 186        s->error = true;
 187        s->has_desc = true;
 188        s->desc = g_strdup(error_get_pretty(err));
 189    } else {
 190        s->error = false;
 191    }
 192
 193    error_free(err);
 194    return s;
 195#else
 196    abort();
 197#endif
 198}
 199
 200void qmp_xen_colo_do_checkpoint(Error **errp)
 201{
 202#ifdef CONFIG_REPLICATION
 203    replication_do_checkpoint_all(errp);
 204#else
 205    abort();
 206#endif
 207}
 208
 209static void colo_send_message(QEMUFile *f, COLOMessage msg,
 210                              Error **errp)
 211{
 212    int ret;
 213
 214    if (msg >= COLO_MESSAGE__MAX) {
 215        error_setg(errp, "%s: Invalid message", __func__);
 216        return;
 217    }
 218    qemu_put_be32(f, msg);
 219    qemu_fflush(f);
 220
 221    ret = qemu_file_get_error(f);
 222    if (ret < 0) {
 223        error_setg_errno(errp, -ret, "Can't send COLO message");
 224    }
 225    trace_colo_send_message(COLOMessage_str(msg));
 226}
 227
 228static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 229                                    uint64_t value, Error **errp)
 230{
 231    Error *local_err = NULL;
 232    int ret;
 233
 234    colo_send_message(f, msg, &local_err);
 235    if (local_err) {
 236        error_propagate(errp, local_err);
 237        return;
 238    }
 239    qemu_put_be64(f, value);
 240    qemu_fflush(f);
 241
 242    ret = qemu_file_get_error(f);
 243    if (ret < 0) {
 244        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 245                         COLOMessage_str(msg));
 246    }
 247}
 248
 249static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 250{
 251    COLOMessage msg;
 252    int ret;
 253
 254    msg = qemu_get_be32(f);
 255    ret = qemu_file_get_error(f);
 256    if (ret < 0) {
 257        error_setg_errno(errp, -ret, "Can't receive COLO message");
 258        return msg;
 259    }
 260    if (msg >= COLO_MESSAGE__MAX) {
 261        error_setg(errp, "%s: Invalid message", __func__);
 262        return msg;
 263    }
 264    trace_colo_receive_message(COLOMessage_str(msg));
 265    return msg;
 266}
 267
 268static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 269                                       Error **errp)
 270{
 271    COLOMessage msg;
 272    Error *local_err = NULL;
 273
 274    msg = colo_receive_message(f, &local_err);
 275    if (local_err) {
 276        error_propagate(errp, local_err);
 277        return;
 278    }
 279    if (msg != expect_msg) {
 280        error_setg(errp, "Unexpected COLO message %d, expected %d",
 281                          msg, expect_msg);
 282    }
 283}
 284
 285static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 286                                           Error **errp)
 287{
 288    Error *local_err = NULL;
 289    uint64_t value;
 290    int ret;
 291
 292    colo_receive_check_message(f, expect_msg, &local_err);
 293    if (local_err) {
 294        error_propagate(errp, local_err);
 295        return 0;
 296    }
 297
 298    value = qemu_get_be64(f);
 299    ret = qemu_file_get_error(f);
 300    if (ret < 0) {
 301        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 302                         COLOMessage_str(expect_msg));
 303    }
 304    return value;
 305}
 306
 307static int colo_do_checkpoint_transaction(MigrationState *s,
 308                                          QIOChannelBuffer *bioc,
 309                                          QEMUFile *fb)
 310{
 311    Error *local_err = NULL;
 312    int ret = -1;
 313
 314    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 315                      &local_err);
 316    if (local_err) {
 317        goto out;
 318    }
 319
 320    colo_receive_check_message(s->rp_state.from_dst_file,
 321                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 322    if (local_err) {
 323        goto out;
 324    }
 325    /* Reset channel-buffer directly */
 326    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 327    bioc->usage = 0;
 328
 329    qemu_mutex_lock_iothread();
 330    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 331        qemu_mutex_unlock_iothread();
 332        goto out;
 333    }
 334    vm_stop_force_state(RUN_STATE_COLO);
 335    qemu_mutex_unlock_iothread();
 336    trace_colo_vm_state_change("run", "stop");
 337    /*
 338     * Failover request bh could be called after vm_stop_force_state(),
 339     * So we need check failover_request_is_active() again.
 340     */
 341    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 342        goto out;
 343    }
 344
 345    /* Disable block migration */
 346    migrate_set_block_enabled(false, &local_err);
 347    qemu_savevm_state_header(fb);
 348    qemu_savevm_state_setup(fb);
 349    qemu_mutex_lock_iothread();
 350    qemu_savevm_state_complete_precopy(fb, false, false);
 351    qemu_mutex_unlock_iothread();
 352
 353    qemu_fflush(fb);
 354
 355    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 356    if (local_err) {
 357        goto out;
 358    }
 359    /*
 360     * We need the size of the VMstate data in Secondary side,
 361     * With which we can decide how much data should be read.
 362     */
 363    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 364                            bioc->usage, &local_err);
 365    if (local_err) {
 366        goto out;
 367    }
 368
 369    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 370    qemu_fflush(s->to_dst_file);
 371    ret = qemu_file_get_error(s->to_dst_file);
 372    if (ret < 0) {
 373        goto out;
 374    }
 375
 376    colo_receive_check_message(s->rp_state.from_dst_file,
 377                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 378    if (local_err) {
 379        goto out;
 380    }
 381
 382    colo_receive_check_message(s->rp_state.from_dst_file,
 383                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 384    if (local_err) {
 385        goto out;
 386    }
 387
 388    ret = 0;
 389
 390    qemu_mutex_lock_iothread();
 391    vm_start();
 392    qemu_mutex_unlock_iothread();
 393    trace_colo_vm_state_change("stop", "run");
 394
 395out:
 396    if (local_err) {
 397        error_report_err(local_err);
 398    }
 399    return ret;
 400}
 401
 402static void colo_process_checkpoint(MigrationState *s)
 403{
 404    QIOChannelBuffer *bioc;
 405    QEMUFile *fb = NULL;
 406    int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 407    Error *local_err = NULL;
 408    int ret;
 409
 410    failover_init_state();
 411
 412    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 413    if (!s->rp_state.from_dst_file) {
 414        error_report("Open QEMUFile from_dst_file failed");
 415        goto out;
 416    }
 417
 418    /*
 419     * Wait for Secondary finish loading VM states and enter COLO
 420     * restore.
 421     */
 422    colo_receive_check_message(s->rp_state.from_dst_file,
 423                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 424    if (local_err) {
 425        goto out;
 426    }
 427    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 428    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 429    object_unref(OBJECT(bioc));
 430
 431    qemu_mutex_lock_iothread();
 432    vm_start();
 433    qemu_mutex_unlock_iothread();
 434    trace_colo_vm_state_change("stop", "run");
 435
 436    timer_mod(s->colo_delay_timer,
 437            current_time + s->parameters.x_checkpoint_delay);
 438
 439    while (s->state == MIGRATION_STATUS_COLO) {
 440        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 441            error_report("failover request");
 442            goto out;
 443        }
 444
 445        qemu_sem_wait(&s->colo_checkpoint_sem);
 446
 447        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 448        if (ret < 0) {
 449            goto out;
 450        }
 451    }
 452
 453out:
 454    /* Throw the unreported error message after exited from loop */
 455    if (local_err) {
 456        error_report_err(local_err);
 457    }
 458
 459    if (fb) {
 460        qemu_fclose(fb);
 461    }
 462
 463    timer_del(s->colo_delay_timer);
 464
 465    /* Hope this not to be too long to wait here */
 466    qemu_sem_wait(&s->colo_exit_sem);
 467    qemu_sem_destroy(&s->colo_exit_sem);
 468    /*
 469     * Must be called after failover BH is completed,
 470     * Or the failover BH may shutdown the wrong fd that
 471     * re-used by other threads after we release here.
 472     */
 473    if (s->rp_state.from_dst_file) {
 474        qemu_fclose(s->rp_state.from_dst_file);
 475    }
 476}
 477
 478void colo_checkpoint_notify(void *opaque)
 479{
 480    MigrationState *s = opaque;
 481    int64_t next_notify_time;
 482
 483    qemu_sem_post(&s->colo_checkpoint_sem);
 484    s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 485    next_notify_time = s->colo_checkpoint_time +
 486                    s->parameters.x_checkpoint_delay;
 487    timer_mod(s->colo_delay_timer, next_notify_time);
 488}
 489
 490void migrate_start_colo_process(MigrationState *s)
 491{
 492    qemu_mutex_unlock_iothread();
 493    qemu_sem_init(&s->colo_checkpoint_sem, 0);
 494    s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
 495                                colo_checkpoint_notify, s);
 496
 497    qemu_sem_init(&s->colo_exit_sem, 0);
 498    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 499                      MIGRATION_STATUS_COLO);
 500    colo_process_checkpoint(s);
 501    qemu_mutex_lock_iothread();
 502}
 503
 504static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 505                                     Error **errp)
 506{
 507    COLOMessage msg;
 508    Error *local_err = NULL;
 509
 510    msg = colo_receive_message(f, &local_err);
 511    if (local_err) {
 512        error_propagate(errp, local_err);
 513        return;
 514    }
 515
 516    switch (msg) {
 517    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 518        *checkpoint_request = 1;
 519        break;
 520    default:
 521        *checkpoint_request = 0;
 522        error_setg(errp, "Got unknown COLO message: %d", msg);
 523        break;
 524    }
 525}
 526
 527void *colo_process_incoming_thread(void *opaque)
 528{
 529    MigrationIncomingState *mis = opaque;
 530    QEMUFile *fb = NULL;
 531    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 532    uint64_t total_size;
 533    uint64_t value;
 534    Error *local_err = NULL;
 535
 536    qemu_sem_init(&mis->colo_incoming_sem, 0);
 537
 538    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 539                      MIGRATION_STATUS_COLO);
 540
 541    failover_init_state();
 542
 543    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 544    if (!mis->to_src_file) {
 545        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 546        goto out;
 547    }
 548    /*
 549     * Note: the communication between Primary side and Secondary side
 550     * should be sequential, we set the fd to unblocked in migration incoming
 551     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 552     * set the fd back to blocked.
 553     */
 554    qemu_file_set_blocking(mis->from_src_file, true);
 555
 556    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 557    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 558    object_unref(OBJECT(bioc));
 559
 560    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 561                      &local_err);
 562    if (local_err) {
 563        goto out;
 564    }
 565
 566    while (mis->state == MIGRATION_STATUS_COLO) {
 567        int request = 0;
 568
 569        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 570        if (local_err) {
 571            goto out;
 572        }
 573        assert(request);
 574        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 575            error_report("failover request");
 576            goto out;
 577        }
 578
 579        /* FIXME: This is unnecessary for periodic checkpoint mode */
 580        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 581                     &local_err);
 582        if (local_err) {
 583            goto out;
 584        }
 585
 586        colo_receive_check_message(mis->from_src_file,
 587                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 588        if (local_err) {
 589            goto out;
 590        }
 591
 592        value = colo_receive_message_value(mis->from_src_file,
 593                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 594        if (local_err) {
 595            goto out;
 596        }
 597
 598        /*
 599         * Read VM device state data into channel buffer,
 600         * It's better to re-use the memory allocated.
 601         * Here we need to handle the channel buffer directly.
 602         */
 603        if (value > bioc->capacity) {
 604            bioc->capacity = value;
 605            bioc->data = g_realloc(bioc->data, bioc->capacity);
 606        }
 607        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 608        if (total_size != value) {
 609            error_report("Got %" PRIu64 " VMState data, less than expected"
 610                        " %" PRIu64, total_size, value);
 611            goto out;
 612        }
 613        bioc->usage = total_size;
 614        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 615
 616        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 617                     &local_err);
 618        if (local_err) {
 619            goto out;
 620        }
 621
 622        qemu_mutex_lock_iothread();
 623        qemu_system_reset(SHUTDOWN_CAUSE_NONE);
 624        vmstate_loading = true;
 625        if (qemu_loadvm_state(fb) < 0) {
 626            error_report("COLO: loadvm failed");
 627            qemu_mutex_unlock_iothread();
 628            goto out;
 629        }
 630
 631        vmstate_loading = false;
 632        qemu_mutex_unlock_iothread();
 633
 634        if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
 635            failover_set_state(FAILOVER_STATUS_RELAUNCH,
 636                            FAILOVER_STATUS_NONE);
 637            failover_request_active(NULL);
 638            goto out;
 639        }
 640
 641        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 642                     &local_err);
 643        if (local_err) {
 644            goto out;
 645        }
 646    }
 647
 648out:
 649    vmstate_loading = false;
 650    /* Throw the unreported error message after exited from loop */
 651    if (local_err) {
 652        error_report_err(local_err);
 653    }
 654
 655    if (fb) {
 656        qemu_fclose(fb);
 657    }
 658
 659    /* Hope this not to be too long to loop here */
 660    qemu_sem_wait(&mis->colo_incoming_sem);
 661    qemu_sem_destroy(&mis->colo_incoming_sem);
 662    /* Must be called after failover BH is completed */
 663    if (mis->to_src_file) {
 664        qemu_fclose(mis->to_src_file);
 665    }
 666    migration_incoming_exit_colo();
 667
 668    return NULL;
 669}
 670