qemu/migration/colo.c
<<
>>
Prefs
   1/*
   2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
   3 * (a.k.a. Fault Tolerance or Continuous Replication)
   4 *
   5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
   6 * Copyright (c) 2016 FUJITSU LIMITED
   7 * Copyright (c) 2016 Intel Corporation
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or
  10 * later.  See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "qemu/timer.h"
  15#include "sysemu/sysemu.h"
  16#include "migration/colo.h"
  17#include "io/channel-buffer.h"
  18#include "trace.h"
  19#include "qemu/error-report.h"
  20#include "qapi/error.h"
  21#include "migration/failover.h"
  22
  23#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  24
  25bool colo_supported(void)
  26{
  27    return true;
  28}
  29
  30bool migration_in_colo_state(void)
  31{
  32    MigrationState *s = migrate_get_current();
  33
  34    return (s->state == MIGRATION_STATUS_COLO);
  35}
  36
  37bool migration_incoming_in_colo_state(void)
  38{
  39    MigrationIncomingState *mis = migration_incoming_get_current();
  40
  41    return mis && (mis->state == MIGRATION_STATUS_COLO);
  42}
  43
  44static bool colo_runstate_is_stopped(void)
  45{
  46    return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  47}
  48
  49static void secondary_vm_do_failover(void)
  50{
  51    int old_state;
  52    MigrationIncomingState *mis = migration_incoming_get_current();
  53
  54    migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
  55                      MIGRATION_STATUS_COMPLETED);
  56
  57    if (!autostart) {
  58        error_report("\"-S\" qemu option will be ignored in secondary side");
  59        /* recover runstate to normal migration finish state */
  60        autostart = true;
  61    }
  62
  63    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  64                                   FAILOVER_STATUS_COMPLETED);
  65    if (old_state != FAILOVER_STATUS_ACTIVE) {
  66        error_report("Incorrect state (%s) while doing failover for "
  67                     "secondary VM", FailoverStatus_lookup[old_state]);
  68        return;
  69    }
  70    /* For Secondary VM, jump to incoming co */
  71    if (mis->migration_incoming_co) {
  72        qemu_coroutine_enter(mis->migration_incoming_co);
  73    }
  74}
  75
  76static void primary_vm_do_failover(void)
  77{
  78    MigrationState *s = migrate_get_current();
  79    int old_state;
  80
  81    migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
  82                      MIGRATION_STATUS_COMPLETED);
  83
  84    old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
  85                                   FAILOVER_STATUS_COMPLETED);
  86    if (old_state != FAILOVER_STATUS_ACTIVE) {
  87        error_report("Incorrect state (%s) while doing failover for Primary VM",
  88                     FailoverStatus_lookup[old_state]);
  89        return;
  90    }
  91}
  92
  93void colo_do_failover(MigrationState *s)
  94{
  95    /* Make sure VM stopped while failover happened. */
  96    if (!colo_runstate_is_stopped()) {
  97        vm_stop_force_state(RUN_STATE_COLO);
  98    }
  99
 100    if (get_colo_mode() == COLO_MODE_PRIMARY) {
 101        primary_vm_do_failover();
 102    } else {
 103        secondary_vm_do_failover();
 104    }
 105}
 106
 107static void colo_send_message(QEMUFile *f, COLOMessage msg,
 108                              Error **errp)
 109{
 110    int ret;
 111
 112    if (msg >= COLO_MESSAGE__MAX) {
 113        error_setg(errp, "%s: Invalid message", __func__);
 114        return;
 115    }
 116    qemu_put_be32(f, msg);
 117    qemu_fflush(f);
 118
 119    ret = qemu_file_get_error(f);
 120    if (ret < 0) {
 121        error_setg_errno(errp, -ret, "Can't send COLO message");
 122    }
 123    trace_colo_send_message(COLOMessage_lookup[msg]);
 124}
 125
 126static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
 127                                    uint64_t value, Error **errp)
 128{
 129    Error *local_err = NULL;
 130    int ret;
 131
 132    colo_send_message(f, msg, &local_err);
 133    if (local_err) {
 134        error_propagate(errp, local_err);
 135        return;
 136    }
 137    qemu_put_be64(f, value);
 138    qemu_fflush(f);
 139
 140    ret = qemu_file_get_error(f);
 141    if (ret < 0) {
 142        error_setg_errno(errp, -ret, "Failed to send value for message:%s",
 143                         COLOMessage_lookup[msg]);
 144    }
 145}
 146
 147static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
 148{
 149    COLOMessage msg;
 150    int ret;
 151
 152    msg = qemu_get_be32(f);
 153    ret = qemu_file_get_error(f);
 154    if (ret < 0) {
 155        error_setg_errno(errp, -ret, "Can't receive COLO message");
 156        return msg;
 157    }
 158    if (msg >= COLO_MESSAGE__MAX) {
 159        error_setg(errp, "%s: Invalid message", __func__);
 160        return msg;
 161    }
 162    trace_colo_receive_message(COLOMessage_lookup[msg]);
 163    return msg;
 164}
 165
 166static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
 167                                       Error **errp)
 168{
 169    COLOMessage msg;
 170    Error *local_err = NULL;
 171
 172    msg = colo_receive_message(f, &local_err);
 173    if (local_err) {
 174        error_propagate(errp, local_err);
 175        return;
 176    }
 177    if (msg != expect_msg) {
 178        error_setg(errp, "Unexpected COLO message %d, expected %d",
 179                          msg, expect_msg);
 180    }
 181}
 182
 183static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
 184                                           Error **errp)
 185{
 186    Error *local_err = NULL;
 187    uint64_t value;
 188    int ret;
 189
 190    colo_receive_check_message(f, expect_msg, &local_err);
 191    if (local_err) {
 192        error_propagate(errp, local_err);
 193        return 0;
 194    }
 195
 196    value = qemu_get_be64(f);
 197    ret = qemu_file_get_error(f);
 198    if (ret < 0) {
 199        error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
 200                         COLOMessage_lookup[expect_msg]);
 201    }
 202    return value;
 203}
 204
 205static int colo_do_checkpoint_transaction(MigrationState *s,
 206                                          QIOChannelBuffer *bioc,
 207                                          QEMUFile *fb)
 208{
 209    Error *local_err = NULL;
 210    int ret = -1;
 211
 212    colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
 213                      &local_err);
 214    if (local_err) {
 215        goto out;
 216    }
 217
 218    colo_receive_check_message(s->rp_state.from_dst_file,
 219                    COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
 220    if (local_err) {
 221        goto out;
 222    }
 223    /* Reset channel-buffer directly */
 224    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 225    bioc->usage = 0;
 226
 227    qemu_mutex_lock_iothread();
 228    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 229        qemu_mutex_unlock_iothread();
 230        goto out;
 231    }
 232    vm_stop_force_state(RUN_STATE_COLO);
 233    qemu_mutex_unlock_iothread();
 234    trace_colo_vm_state_change("run", "stop");
 235    /*
 236     * Failover request bh could be called after vm_stop_force_state(),
 237     * So we need check failover_request_is_active() again.
 238     */
 239    if (failover_get_state() != FAILOVER_STATUS_NONE) {
 240        goto out;
 241    }
 242
 243    /* Disable block migration */
 244    s->params.blk = 0;
 245    s->params.shared = 0;
 246    qemu_savevm_state_header(fb);
 247    qemu_savevm_state_begin(fb, &s->params);
 248    qemu_mutex_lock_iothread();
 249    qemu_savevm_state_complete_precopy(fb, false);
 250    qemu_mutex_unlock_iothread();
 251
 252    qemu_fflush(fb);
 253
 254    colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
 255    if (local_err) {
 256        goto out;
 257    }
 258    /*
 259     * We need the size of the VMstate data in Secondary side,
 260     * With which we can decide how much data should be read.
 261     */
 262    colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
 263                            bioc->usage, &local_err);
 264    if (local_err) {
 265        goto out;
 266    }
 267
 268    qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
 269    qemu_fflush(s->to_dst_file);
 270    ret = qemu_file_get_error(s->to_dst_file);
 271    if (ret < 0) {
 272        goto out;
 273    }
 274
 275    colo_receive_check_message(s->rp_state.from_dst_file,
 276                       COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
 277    if (local_err) {
 278        goto out;
 279    }
 280
 281    colo_receive_check_message(s->rp_state.from_dst_file,
 282                       COLO_MESSAGE_VMSTATE_LOADED, &local_err);
 283    if (local_err) {
 284        goto out;
 285    }
 286
 287    ret = 0;
 288
 289    qemu_mutex_lock_iothread();
 290    vm_start();
 291    qemu_mutex_unlock_iothread();
 292    trace_colo_vm_state_change("stop", "run");
 293
 294out:
 295    if (local_err) {
 296        error_report_err(local_err);
 297    }
 298    return ret;
 299}
 300
 301static void colo_process_checkpoint(MigrationState *s)
 302{
 303    QIOChannelBuffer *bioc;
 304    QEMUFile *fb = NULL;
 305    int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 306    Error *local_err = NULL;
 307    int ret;
 308
 309    failover_init_state();
 310
 311    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
 312    if (!s->rp_state.from_dst_file) {
 313        error_report("Open QEMUFile from_dst_file failed");
 314        goto out;
 315    }
 316
 317    /*
 318     * Wait for Secondary finish loading VM states and enter COLO
 319     * restore.
 320     */
 321    colo_receive_check_message(s->rp_state.from_dst_file,
 322                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
 323    if (local_err) {
 324        goto out;
 325    }
 326    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 327    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
 328    object_unref(OBJECT(bioc));
 329
 330    qemu_mutex_lock_iothread();
 331    vm_start();
 332    qemu_mutex_unlock_iothread();
 333    trace_colo_vm_state_change("stop", "run");
 334
 335    while (s->state == MIGRATION_STATUS_COLO) {
 336        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 337            error_report("failover request");
 338            goto out;
 339        }
 340
 341        current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 342        if (current_time - checkpoint_time <
 343            s->parameters.x_checkpoint_delay) {
 344            int64_t delay_ms;
 345
 346            delay_ms = s->parameters.x_checkpoint_delay -
 347                       (current_time - checkpoint_time);
 348            g_usleep(delay_ms * 1000);
 349        }
 350        ret = colo_do_checkpoint_transaction(s, bioc, fb);
 351        if (ret < 0) {
 352            goto out;
 353        }
 354        checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 355    }
 356
 357out:
 358    /* Throw the unreported error message after exited from loop */
 359    if (local_err) {
 360        error_report_err(local_err);
 361    }
 362
 363    if (fb) {
 364        qemu_fclose(fb);
 365    }
 366
 367    if (s->rp_state.from_dst_file) {
 368        qemu_fclose(s->rp_state.from_dst_file);
 369    }
 370}
 371
 372void migrate_start_colo_process(MigrationState *s)
 373{
 374    qemu_mutex_unlock_iothread();
 375    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
 376                      MIGRATION_STATUS_COLO);
 377    colo_process_checkpoint(s);
 378    qemu_mutex_lock_iothread();
 379}
 380
 381static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
 382                                     Error **errp)
 383{
 384    COLOMessage msg;
 385    Error *local_err = NULL;
 386
 387    msg = colo_receive_message(f, &local_err);
 388    if (local_err) {
 389        error_propagate(errp, local_err);
 390        return;
 391    }
 392
 393    switch (msg) {
 394    case COLO_MESSAGE_CHECKPOINT_REQUEST:
 395        *checkpoint_request = 1;
 396        break;
 397    default:
 398        *checkpoint_request = 0;
 399        error_setg(errp, "Got unknown COLO message: %d", msg);
 400        break;
 401    }
 402}
 403
 404void *colo_process_incoming_thread(void *opaque)
 405{
 406    MigrationIncomingState *mis = opaque;
 407    QEMUFile *fb = NULL;
 408    QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
 409    uint64_t total_size;
 410    uint64_t value;
 411    Error *local_err = NULL;
 412
 413    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
 414                      MIGRATION_STATUS_COLO);
 415
 416    failover_init_state();
 417
 418    mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
 419    if (!mis->to_src_file) {
 420        error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
 421        goto out;
 422    }
 423    /*
 424     * Note: the communication between Primary side and Secondary side
 425     * should be sequential, we set the fd to unblocked in migration incoming
 426     * coroutine, and here we are in the COLO incoming thread, so it is ok to
 427     * set the fd back to blocked.
 428     */
 429    qemu_file_set_blocking(mis->from_src_file, true);
 430
 431    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
 432    fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
 433    object_unref(OBJECT(bioc));
 434
 435    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
 436                      &local_err);
 437    if (local_err) {
 438        goto out;
 439    }
 440
 441    while (mis->state == MIGRATION_STATUS_COLO) {
 442        int request = 0;
 443
 444        colo_wait_handle_message(mis->from_src_file, &request, &local_err);
 445        if (local_err) {
 446            goto out;
 447        }
 448        assert(request);
 449        if (failover_get_state() != FAILOVER_STATUS_NONE) {
 450            error_report("failover request");
 451            goto out;
 452        }
 453
 454        /* FIXME: This is unnecessary for periodic checkpoint mode */
 455        colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
 456                     &local_err);
 457        if (local_err) {
 458            goto out;
 459        }
 460
 461        colo_receive_check_message(mis->from_src_file,
 462                           COLO_MESSAGE_VMSTATE_SEND, &local_err);
 463        if (local_err) {
 464            goto out;
 465        }
 466
 467        value = colo_receive_message_value(mis->from_src_file,
 468                                 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
 469        if (local_err) {
 470            goto out;
 471        }
 472
 473        /*
 474         * Read VM device state data into channel buffer,
 475         * It's better to re-use the memory allocated.
 476         * Here we need to handle the channel buffer directly.
 477         */
 478        if (value > bioc->capacity) {
 479            bioc->capacity = value;
 480            bioc->data = g_realloc(bioc->data, bioc->capacity);
 481        }
 482        total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
 483        if (total_size != value) {
 484            error_report("Got %" PRIu64 " VMState data, less than expected"
 485                        " %" PRIu64, total_size, value);
 486            goto out;
 487        }
 488        bioc->usage = total_size;
 489        qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
 490
 491        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
 492                     &local_err);
 493        if (local_err) {
 494            goto out;
 495        }
 496
 497        qemu_mutex_lock_iothread();
 498        qemu_system_reset(VMRESET_SILENT);
 499        if (qemu_loadvm_state(fb) < 0) {
 500            error_report("COLO: loadvm failed");
 501            qemu_mutex_unlock_iothread();
 502            goto out;
 503        }
 504        qemu_mutex_unlock_iothread();
 505
 506        colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
 507                     &local_err);
 508        if (local_err) {
 509            goto out;
 510        }
 511    }
 512
 513out:
 514    /* Throw the unreported error message after exited from loop */
 515    if (local_err) {
 516        error_report_err(local_err);
 517    }
 518
 519    if (fb) {
 520        qemu_fclose(fb);
 521    }
 522
 523    if (mis->to_src_file) {
 524        qemu_fclose(mis->to_src_file);
 525    }
 526    migration_incoming_exit_colo();
 527
 528    return NULL;
 529}
 530