qemu/migration/migration.c
<<
>>
Prefs
   1/*
   2 * QEMU live migration
   3 *
   4 * Copyright IBM, Corp. 2008
   5 *
   6 * Authors:
   7 *  Anthony Liguori   <aliguori@us.ibm.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2.  See
  10 * the COPYING file in the top-level directory.
  11 *
  12 * Contributions after 2012-01-13 are licensed under the terms of the
  13 * GNU GPL, version 2 or (at your option) any later version.
  14 */
  15
  16#include "qemu-common.h"
  17#include "qemu/error-report.h"
  18#include "qemu/main-loop.h"
  19#include "migration/migration.h"
  20#include "migration/qemu-file.h"
  21#include "sysemu/sysemu.h"
  22#include "block/block.h"
  23#include "qapi/qmp/qerror.h"
  24#include "qapi/util.h"
  25#include "qemu/sockets.h"
  26#include "qemu/rcu.h"
  27#include "migration/block.h"
  28#include "migration/postcopy-ram.h"
  29#include "qemu/thread.h"
  30#include "qmp-commands.h"
  31#include "trace.h"
  32#include "qapi-event.h"
  33#include "qom/cpu.h"
  34#include "exec/memory.h"
  35#include "exec/address-spaces.h"
  36
  37#define MAX_THROTTLE  (32 << 20)      /* Migration transfer speed throttling */
  38
  39/* Amount of time to allocate to each "chunk" of bandwidth-throttled
  40 * data. */
  41#define BUFFER_DELAY     100
  42#define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
  43
  44/* Default compression thread count */
  45#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
  46/* Default decompression thread count, usually decompression is at
  47 * least 4 times as fast as compression.*/
  48#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
  49/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
  50#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
  51/* Define default autoconverge cpu throttle migration parameters */
  52#define DEFAULT_MIGRATE_X_CPU_THROTTLE_INITIAL 20
  53#define DEFAULT_MIGRATE_X_CPU_THROTTLE_INCREMENT 10
  54
  55/* Migration XBZRLE default cache size */
  56#define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
  57
  58static NotifierList migration_state_notifiers =
  59    NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
  60
  61static bool deferred_incoming;
  62
  63/*
  64 * Current state of incoming postcopy; note this is not part of
  65 * MigrationIncomingState since it's state is used during cleanup
  66 * at the end as MIS is being freed.
  67 */
  68static PostcopyState incoming_postcopy_state;
  69
  70/* When we add fault tolerance, we could have several
  71   migrations at once.  For now we don't need to add
  72   dynamic creation of migration */
  73
  74/* For outgoing */
  75MigrationState *migrate_get_current(void)
  76{
  77    static bool once;
  78    static MigrationState current_migration = {
  79        .state = MIGRATION_STATUS_NONE,
  80        .bandwidth_limit = MAX_THROTTLE,
  81        .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
  82        .mbps = -1,
  83        .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
  84                DEFAULT_MIGRATE_COMPRESS_LEVEL,
  85        .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
  86                DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
  87        .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
  88                DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
  89        .parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INITIAL] =
  90                DEFAULT_MIGRATE_X_CPU_THROTTLE_INITIAL,
  91        .parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INCREMENT] =
  92                DEFAULT_MIGRATE_X_CPU_THROTTLE_INCREMENT,
  93    };
  94
  95    if (!once) {
  96        qemu_mutex_init(&current_migration.src_page_req_mutex);
  97        once = true;
  98    }
  99    return &current_migration;
 100}
 101
 102/* For incoming */
 103static MigrationIncomingState *mis_current;
 104
 105MigrationIncomingState *migration_incoming_get_current(void)
 106{
 107    return mis_current;
 108}
 109
 110MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
 111{
 112    mis_current = g_new0(MigrationIncomingState, 1);
 113    mis_current->from_src_file = f;
 114    QLIST_INIT(&mis_current->loadvm_handlers);
 115    qemu_mutex_init(&mis_current->rp_mutex);
 116    qemu_event_init(&mis_current->main_thread_load_event, false);
 117
 118    return mis_current;
 119}
 120
 121void migration_incoming_state_destroy(void)
 122{
 123    qemu_event_destroy(&mis_current->main_thread_load_event);
 124    loadvm_free_handlers(mis_current);
 125    g_free(mis_current);
 126    mis_current = NULL;
 127}
 128
 129
 130typedef struct {
 131    bool optional;
 132    uint32_t size;
 133    uint8_t runstate[100];
 134    RunState state;
 135    bool received;
 136} GlobalState;
 137
 138static GlobalState global_state;
 139
 140int global_state_store(void)
 141{
 142    if (!runstate_store((char *)global_state.runstate,
 143                        sizeof(global_state.runstate))) {
 144        error_report("runstate name too big: %s", global_state.runstate);
 145        trace_migrate_state_too_big();
 146        return -EINVAL;
 147    }
 148    return 0;
 149}
 150
 151void global_state_store_running(void)
 152{
 153    const char *state = RunState_lookup[RUN_STATE_RUNNING];
 154    strncpy((char *)global_state.runstate,
 155           state, sizeof(global_state.runstate));
 156}
 157
 158static bool global_state_received(void)
 159{
 160    return global_state.received;
 161}
 162
 163static RunState global_state_get_runstate(void)
 164{
 165    return global_state.state;
 166}
 167
 168void global_state_set_optional(void)
 169{
 170    global_state.optional = true;
 171}
 172
 173static bool global_state_needed(void *opaque)
 174{
 175    GlobalState *s = opaque;
 176    char *runstate = (char *)s->runstate;
 177
 178    /* If it is not optional, it is mandatory */
 179
 180    if (s->optional == false) {
 181        return true;
 182    }
 183
 184    /* If state is running or paused, it is not needed */
 185
 186    if (strcmp(runstate, "running") == 0 ||
 187        strcmp(runstate, "paused") == 0) {
 188        return false;
 189    }
 190
 191    /* for any other state it is needed */
 192    return true;
 193}
 194
 195static int global_state_post_load(void *opaque, int version_id)
 196{
 197    GlobalState *s = opaque;
 198    Error *local_err = NULL;
 199    int r;
 200    char *runstate = (char *)s->runstate;
 201
 202    s->received = true;
 203    trace_migrate_global_state_post_load(runstate);
 204
 205    r = qapi_enum_parse(RunState_lookup, runstate, RUN_STATE_MAX,
 206                                -1, &local_err);
 207
 208    if (r == -1) {
 209        if (local_err) {
 210            error_report_err(local_err);
 211        }
 212        return -EINVAL;
 213    }
 214    s->state = r;
 215
 216    return 0;
 217}
 218
 219static void global_state_pre_save(void *opaque)
 220{
 221    GlobalState *s = opaque;
 222
 223    trace_migrate_global_state_pre_save((char *)s->runstate);
 224    s->size = strlen((char *)s->runstate) + 1;
 225}
 226
 227static const VMStateDescription vmstate_globalstate = {
 228    .name = "globalstate",
 229    .version_id = 1,
 230    .minimum_version_id = 1,
 231    .post_load = global_state_post_load,
 232    .pre_save = global_state_pre_save,
 233    .needed = global_state_needed,
 234    .fields = (VMStateField[]) {
 235        VMSTATE_UINT32(size, GlobalState),
 236        VMSTATE_BUFFER(runstate, GlobalState),
 237        VMSTATE_END_OF_LIST()
 238    },
 239};
 240
 241void register_global_state(void)
 242{
 243    /* We would use it independently that we receive it */
 244    strcpy((char *)&global_state.runstate, "");
 245    global_state.received = false;
 246    vmstate_register(NULL, 0, &vmstate_globalstate, &global_state);
 247}
 248
 249static void migrate_generate_event(int new_state)
 250{
 251    if (migrate_use_events()) {
 252        qapi_event_send_migration(new_state, &error_abort);
 253    }
 254}
 255
 256/*
 257 * Called on -incoming with a defer: uri.
 258 * The migration can be started later after any parameters have been
 259 * changed.
 260 */
 261static void deferred_incoming_migration(Error **errp)
 262{
 263    if (deferred_incoming) {
 264        error_setg(errp, "Incoming migration already deferred");
 265    }
 266    deferred_incoming = true;
 267}
 268
 269/* Request a range of pages from the source VM at the given
 270 * start address.
 271 *   rbname: Name of the RAMBlock to request the page in, if NULL it's the same
 272 *           as the last request (a name must have been given previously)
 273 *   Start: Address offset within the RB
 274 *   Len: Length in bytes required - must be a multiple of pagesize
 275 */
 276void migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
 277                               ram_addr_t start, size_t len)
 278{
 279    uint8_t bufc[12 + 1 + 255]; /* start (8), len (4), rbname upto 256 */
 280    size_t msglen = 12; /* start + len */
 281
 282    *(uint64_t *)bufc = cpu_to_be64((uint64_t)start);
 283    *(uint32_t *)(bufc + 8) = cpu_to_be32((uint32_t)len);
 284
 285    if (rbname) {
 286        int rbname_len = strlen(rbname);
 287        assert(rbname_len < 256);
 288
 289        bufc[msglen++] = rbname_len;
 290        memcpy(bufc + msglen, rbname, rbname_len);
 291        msglen += rbname_len;
 292        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES_ID, msglen, bufc);
 293    } else {
 294        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES, msglen, bufc);
 295    }
 296}
 297
 298void qemu_start_incoming_migration(const char *uri, Error **errp)
 299{
 300    const char *p;
 301
 302    qapi_event_send_migration(MIGRATION_STATUS_SETUP, &error_abort);
 303    if (!strcmp(uri, "defer")) {
 304        deferred_incoming_migration(errp);
 305    } else if (strstart(uri, "tcp:", &p)) {
 306        tcp_start_incoming_migration(p, errp);
 307#ifdef CONFIG_RDMA
 308    } else if (strstart(uri, "rdma:", &p)) {
 309        rdma_start_incoming_migration(p, errp);
 310#endif
 311#if !defined(WIN32)
 312    } else if (strstart(uri, "exec:", &p)) {
 313        exec_start_incoming_migration(p, errp);
 314    } else if (strstart(uri, "unix:", &p)) {
 315        unix_start_incoming_migration(p, errp);
 316    } else if (strstart(uri, "fd:", &p)) {
 317        fd_start_incoming_migration(p, errp);
 318#endif
 319    } else {
 320        error_setg(errp, "unknown migration protocol: %s", uri);
 321    }
 322}
 323
 324static void process_incoming_migration_co(void *opaque)
 325{
 326    QEMUFile *f = opaque;
 327    Error *local_err = NULL;
 328    MigrationIncomingState *mis;
 329    PostcopyState ps;
 330    int ret;
 331
 332    mis = migration_incoming_state_new(f);
 333    postcopy_state_set(POSTCOPY_INCOMING_NONE);
 334    migrate_generate_event(MIGRATION_STATUS_ACTIVE);
 335
 336    ret = qemu_loadvm_state(f);
 337
 338    ps = postcopy_state_get();
 339    trace_process_incoming_migration_co_end(ret, ps);
 340    if (ps != POSTCOPY_INCOMING_NONE) {
 341        if (ps == POSTCOPY_INCOMING_ADVISE) {
 342            /*
 343             * Where a migration had postcopy enabled (and thus went to advise)
 344             * but managed to complete within the precopy period, we can use
 345             * the normal exit.
 346             */
 347            postcopy_ram_incoming_cleanup(mis);
 348        } else if (ret >= 0) {
 349            /*
 350             * Postcopy was started, cleanup should happen at the end of the
 351             * postcopy thread.
 352             */
 353            trace_process_incoming_migration_co_postcopy_end_main();
 354            return;
 355        }
 356        /* Else if something went wrong then just fall out of the normal exit */
 357    }
 358
 359    qemu_fclose(f);
 360    free_xbzrle_decoded_buf();
 361    migration_incoming_state_destroy();
 362
 363    if (ret < 0) {
 364        migrate_generate_event(MIGRATION_STATUS_FAILED);
 365        error_report("load of migration failed: %s", strerror(-ret));
 366        migrate_decompress_threads_join();
 367        exit(EXIT_FAILURE);
 368    }
 369
 370    /* Make sure all file formats flush their mutable metadata */
 371    bdrv_invalidate_cache_all(&local_err);
 372    if (local_err) {
 373        migrate_generate_event(MIGRATION_STATUS_FAILED);
 374        error_report_err(local_err);
 375        migrate_decompress_threads_join();
 376        exit(EXIT_FAILURE);
 377    }
 378
 379    /*
 380     * This must happen after all error conditions are dealt with and
 381     * we're sure the VM is going to be running on this host.
 382     */
 383    qemu_announce_self();
 384
 385    /* If global state section was not received or we are in running
 386       state, we need to obey autostart. Any other state is set with
 387       runstate_set. */
 388
 389    if (!global_state_received() ||
 390        global_state_get_runstate() == RUN_STATE_RUNNING) {
 391        if (autostart) {
 392            vm_start();
 393        } else {
 394            runstate_set(RUN_STATE_PAUSED);
 395        }
 396    } else {
 397        runstate_set(global_state_get_runstate());
 398    }
 399    migrate_decompress_threads_join();
 400    /*
 401     * This must happen after any state changes since as soon as an external
 402     * observer sees this event they might start to prod at the VM assuming
 403     * it's ready to use.
 404     */
 405    migrate_generate_event(MIGRATION_STATUS_COMPLETED);
 406}
 407
 408void process_incoming_migration(QEMUFile *f)
 409{
 410    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
 411    int fd = qemu_get_fd(f);
 412
 413    assert(fd != -1);
 414    migrate_decompress_threads_create();
 415    qemu_set_nonblock(fd);
 416    qemu_coroutine_enter(co, f);
 417}
 418
 419/*
 420 * Send a message on the return channel back to the source
 421 * of the migration.
 422 */
 423void migrate_send_rp_message(MigrationIncomingState *mis,
 424                             enum mig_rp_message_type message_type,
 425                             uint16_t len, void *data)
 426{
 427    trace_migrate_send_rp_message((int)message_type, len);
 428    qemu_mutex_lock(&mis->rp_mutex);
 429    qemu_put_be16(mis->to_src_file, (unsigned int)message_type);
 430    qemu_put_be16(mis->to_src_file, len);
 431    qemu_put_buffer(mis->to_src_file, data, len);
 432    qemu_fflush(mis->to_src_file);
 433    qemu_mutex_unlock(&mis->rp_mutex);
 434}
 435
 436/*
 437 * Send a 'SHUT' message on the return channel with the given value
 438 * to indicate that we've finished with the RP.  Non-0 value indicates
 439 * error.
 440 */
 441void migrate_send_rp_shut(MigrationIncomingState *mis,
 442                          uint32_t value)
 443{
 444    uint32_t buf;
 445
 446    buf = cpu_to_be32(value);
 447    migrate_send_rp_message(mis, MIG_RP_MSG_SHUT, sizeof(buf), &buf);
 448}
 449
 450/*
 451 * Send a 'PONG' message on the return channel with the given value
 452 * (normally in response to a 'PING')
 453 */
 454void migrate_send_rp_pong(MigrationIncomingState *mis,
 455                          uint32_t value)
 456{
 457    uint32_t buf;
 458
 459    buf = cpu_to_be32(value);
 460    migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf);
 461}
 462
 463/* amount of nanoseconds we are willing to wait for migration to be down.
 464 * the choice of nanoseconds is because it is the maximum resolution that
 465 * get_clock() can achieve. It is an internal measure. All user-visible
 466 * units must be in seconds */
 467static uint64_t max_downtime = 300000000;
 468
 469uint64_t migrate_max_downtime(void)
 470{
 471    return max_downtime;
 472}
 473
 474MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
 475{
 476    MigrationCapabilityStatusList *head = NULL;
 477    MigrationCapabilityStatusList *caps;
 478    MigrationState *s = migrate_get_current();
 479    int i;
 480
 481    caps = NULL; /* silence compiler warning */
 482    for (i = 0; i < MIGRATION_CAPABILITY_MAX; i++) {
 483        if (head == NULL) {
 484            head = g_malloc0(sizeof(*caps));
 485            caps = head;
 486        } else {
 487            caps->next = g_malloc0(sizeof(*caps));
 488            caps = caps->next;
 489        }
 490        caps->value =
 491            g_malloc(sizeof(*caps->value));
 492        caps->value->capability = i;
 493        caps->value->state = s->enabled_capabilities[i];
 494    }
 495
 496    return head;
 497}
 498
 499MigrationParameters *qmp_query_migrate_parameters(Error **errp)
 500{
 501    MigrationParameters *params;
 502    MigrationState *s = migrate_get_current();
 503
 504    params = g_malloc0(sizeof(*params));
 505    params->compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
 506    params->compress_threads =
 507            s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
 508    params->decompress_threads =
 509            s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
 510    params->x_cpu_throttle_initial =
 511            s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INITIAL];
 512    params->x_cpu_throttle_increment =
 513            s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INCREMENT];
 514
 515    return params;
 516}
 517
 518/*
 519 * Return true if we're already in the middle of a migration
 520 * (i.e. any of the active or setup states)
 521 */
 522static bool migration_is_setup_or_active(int state)
 523{
 524    switch (state) {
 525    case MIGRATION_STATUS_ACTIVE:
 526    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
 527    case MIGRATION_STATUS_SETUP:
 528        return true;
 529
 530    default:
 531        return false;
 532
 533    }
 534}
 535
 536static void get_xbzrle_cache_stats(MigrationInfo *info)
 537{
 538    if (migrate_use_xbzrle()) {
 539        info->has_xbzrle_cache = true;
 540        info->xbzrle_cache = g_malloc0(sizeof(*info->xbzrle_cache));
 541        info->xbzrle_cache->cache_size = migrate_xbzrle_cache_size();
 542        info->xbzrle_cache->bytes = xbzrle_mig_bytes_transferred();
 543        info->xbzrle_cache->pages = xbzrle_mig_pages_transferred();
 544        info->xbzrle_cache->cache_miss = xbzrle_mig_pages_cache_miss();
 545        info->xbzrle_cache->cache_miss_rate = xbzrle_mig_cache_miss_rate();
 546        info->xbzrle_cache->overflow = xbzrle_mig_pages_overflow();
 547    }
 548}
 549
 550MigrationInfo *qmp_query_migrate(Error **errp)
 551{
 552    MigrationInfo *info = g_malloc0(sizeof(*info));
 553    MigrationState *s = migrate_get_current();
 554
 555    switch (s->state) {
 556    case MIGRATION_STATUS_NONE:
 557        /* no migration has happened ever */
 558        break;
 559    case MIGRATION_STATUS_SETUP:
 560        info->has_status = true;
 561        info->has_total_time = false;
 562        break;
 563    case MIGRATION_STATUS_ACTIVE:
 564    case MIGRATION_STATUS_CANCELLING:
 565        info->has_status = true;
 566        info->has_total_time = true;
 567        info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
 568            - s->total_time;
 569        info->has_expected_downtime = true;
 570        info->expected_downtime = s->expected_downtime;
 571        info->has_setup_time = true;
 572        info->setup_time = s->setup_time;
 573
 574        info->has_ram = true;
 575        info->ram = g_malloc0(sizeof(*info->ram));
 576        info->ram->transferred = ram_bytes_transferred();
 577        info->ram->remaining = ram_bytes_remaining();
 578        info->ram->total = ram_bytes_total();
 579        info->ram->duplicate = dup_mig_pages_transferred();
 580        info->ram->skipped = skipped_mig_pages_transferred();
 581        info->ram->normal = norm_mig_pages_transferred();
 582        info->ram->normal_bytes = norm_mig_bytes_transferred();
 583        info->ram->dirty_pages_rate = s->dirty_pages_rate;
 584        info->ram->mbps = s->mbps;
 585        info->ram->dirty_sync_count = s->dirty_sync_count;
 586
 587        if (blk_mig_active()) {
 588            info->has_disk = true;
 589            info->disk = g_malloc0(sizeof(*info->disk));
 590            info->disk->transferred = blk_mig_bytes_transferred();
 591            info->disk->remaining = blk_mig_bytes_remaining();
 592            info->disk->total = blk_mig_bytes_total();
 593        }
 594
 595        if (cpu_throttle_active()) {
 596            info->has_x_cpu_throttle_percentage = true;
 597            info->x_cpu_throttle_percentage = cpu_throttle_get_percentage();
 598        }
 599
 600        get_xbzrle_cache_stats(info);
 601        break;
 602    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
 603        /* Mostly the same as active; TODO add some postcopy stats */
 604        info->has_status = true;
 605        info->has_total_time = true;
 606        info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
 607            - s->total_time;
 608        info->has_expected_downtime = true;
 609        info->expected_downtime = s->expected_downtime;
 610        info->has_setup_time = true;
 611        info->setup_time = s->setup_time;
 612
 613        info->has_ram = true;
 614        info->ram = g_malloc0(sizeof(*info->ram));
 615        info->ram->transferred = ram_bytes_transferred();
 616        info->ram->remaining = ram_bytes_remaining();
 617        info->ram->total = ram_bytes_total();
 618        info->ram->duplicate = dup_mig_pages_transferred();
 619        info->ram->skipped = skipped_mig_pages_transferred();
 620        info->ram->normal = norm_mig_pages_transferred();
 621        info->ram->normal_bytes = norm_mig_bytes_transferred();
 622        info->ram->dirty_pages_rate = s->dirty_pages_rate;
 623        info->ram->mbps = s->mbps;
 624
 625        if (blk_mig_active()) {
 626            info->has_disk = true;
 627            info->disk = g_malloc0(sizeof(*info->disk));
 628            info->disk->transferred = blk_mig_bytes_transferred();
 629            info->disk->remaining = blk_mig_bytes_remaining();
 630            info->disk->total = blk_mig_bytes_total();
 631        }
 632
 633        get_xbzrle_cache_stats(info);
 634        break;
 635    case MIGRATION_STATUS_COMPLETED:
 636        get_xbzrle_cache_stats(info);
 637
 638        info->has_status = true;
 639        info->has_total_time = true;
 640        info->total_time = s->total_time;
 641        info->has_downtime = true;
 642        info->downtime = s->downtime;
 643        info->has_setup_time = true;
 644        info->setup_time = s->setup_time;
 645
 646        info->has_ram = true;
 647        info->ram = g_malloc0(sizeof(*info->ram));
 648        info->ram->transferred = ram_bytes_transferred();
 649        info->ram->remaining = 0;
 650        info->ram->total = ram_bytes_total();
 651        info->ram->duplicate = dup_mig_pages_transferred();
 652        info->ram->skipped = skipped_mig_pages_transferred();
 653        info->ram->normal = norm_mig_pages_transferred();
 654        info->ram->normal_bytes = norm_mig_bytes_transferred();
 655        info->ram->mbps = s->mbps;
 656        info->ram->dirty_sync_count = s->dirty_sync_count;
 657        break;
 658    case MIGRATION_STATUS_FAILED:
 659        info->has_status = true;
 660        break;
 661    case MIGRATION_STATUS_CANCELLED:
 662        info->has_status = true;
 663        break;
 664    }
 665    info->status = s->state;
 666
 667    return info;
 668}
 669
 670void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
 671                                  Error **errp)
 672{
 673    MigrationState *s = migrate_get_current();
 674    MigrationCapabilityStatusList *cap;
 675
 676    if (migration_is_setup_or_active(s->state)) {
 677        error_setg(errp, QERR_MIGRATION_ACTIVE);
 678        return;
 679    }
 680
 681    for (cap = params; cap; cap = cap->next) {
 682        s->enabled_capabilities[cap->value->capability] = cap->value->state;
 683    }
 684
 685    if (migrate_postcopy_ram()) {
 686        if (migrate_use_compression()) {
 687            /* The decompression threads asynchronously write into RAM
 688             * rather than use the atomic copies needed to avoid
 689             * userfaulting.  It should be possible to fix the decompression
 690             * threads for compatibility in future.
 691             */
 692            error_report("Postcopy is not currently compatible with "
 693                         "compression");
 694            s->enabled_capabilities[MIGRATION_CAPABILITY_X_POSTCOPY_RAM] =
 695                false;
 696        }
 697    }
 698}
 699
 700void qmp_migrate_set_parameters(bool has_compress_level,
 701                                int64_t compress_level,
 702                                bool has_compress_threads,
 703                                int64_t compress_threads,
 704                                bool has_decompress_threads,
 705                                int64_t decompress_threads,
 706                                bool has_x_cpu_throttle_initial,
 707                                int64_t x_cpu_throttle_initial,
 708                                bool has_x_cpu_throttle_increment,
 709                                int64_t x_cpu_throttle_increment, Error **errp)
 710{
 711    MigrationState *s = migrate_get_current();
 712
 713    if (has_compress_level && (compress_level < 0 || compress_level > 9)) {
 714        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
 715                   "is invalid, it should be in the range of 0 to 9");
 716        return;
 717    }
 718    if (has_compress_threads &&
 719            (compress_threads < 1 || compress_threads > 255)) {
 720        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 721                   "compress_threads",
 722                   "is invalid, it should be in the range of 1 to 255");
 723        return;
 724    }
 725    if (has_decompress_threads &&
 726            (decompress_threads < 1 || decompress_threads > 255)) {
 727        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 728                   "decompress_threads",
 729                   "is invalid, it should be in the range of 1 to 255");
 730        return;
 731    }
 732    if (has_x_cpu_throttle_initial &&
 733            (x_cpu_throttle_initial < 1 || x_cpu_throttle_initial > 99)) {
 734        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 735                   "x_cpu_throttle_initial",
 736                   "an integer in the range of 1 to 99");
 737    }
 738    if (has_x_cpu_throttle_increment &&
 739            (x_cpu_throttle_increment < 1 || x_cpu_throttle_increment > 99)) {
 740        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
 741                   "x_cpu_throttle_increment",
 742                   "an integer in the range of 1 to 99");
 743    }
 744
 745    if (has_compress_level) {
 746        s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
 747    }
 748    if (has_compress_threads) {
 749        s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = compress_threads;
 750    }
 751    if (has_decompress_threads) {
 752        s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
 753                                                    decompress_threads;
 754    }
 755    if (has_x_cpu_throttle_initial) {
 756        s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INITIAL] =
 757                                                    x_cpu_throttle_initial;
 758    }
 759
 760    if (has_x_cpu_throttle_increment) {
 761        s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INCREMENT] =
 762                                                    x_cpu_throttle_increment;
 763    }
 764}
 765
 766void qmp_migrate_start_postcopy(Error **errp)
 767{
 768    MigrationState *s = migrate_get_current();
 769
 770    if (!migrate_postcopy_ram()) {
 771        error_setg(errp, "Enable postcopy with migrate_set_capability before"
 772                         " the start of migration");
 773        return;
 774    }
 775
 776    if (s->state == MIGRATION_STATUS_NONE) {
 777        error_setg(errp, "Postcopy must be started after migration has been"
 778                         " started");
 779        return;
 780    }
 781    /*
 782     * we don't error if migration has finished since that would be racy
 783     * with issuing this command.
 784     */
 785    atomic_set(&s->start_postcopy, true);
 786}
 787
 788/* shared migration helpers */
 789
 790static void migrate_set_state(MigrationState *s, int old_state, int new_state)
 791{
 792    if (atomic_cmpxchg(&s->state, old_state, new_state) == old_state) {
 793        trace_migrate_set_state(new_state);
 794        migrate_generate_event(new_state);
 795    }
 796}
 797
 798static void migrate_fd_cleanup(void *opaque)
 799{
 800    MigrationState *s = opaque;
 801
 802    qemu_bh_delete(s->cleanup_bh);
 803    s->cleanup_bh = NULL;
 804
 805    flush_page_queue(s);
 806
 807    if (s->file) {
 808        trace_migrate_fd_cleanup();
 809        qemu_mutex_unlock_iothread();
 810        if (s->migration_thread_running) {
 811            qemu_thread_join(&s->thread);
 812            s->migration_thread_running = false;
 813        }
 814        qemu_mutex_lock_iothread();
 815
 816        migrate_compress_threads_join();
 817        qemu_fclose(s->file);
 818        s->file = NULL;
 819    }
 820
 821    assert((s->state != MIGRATION_STATUS_ACTIVE) &&
 822           (s->state != MIGRATION_STATUS_POSTCOPY_ACTIVE));
 823
 824    if (s->state == MIGRATION_STATUS_CANCELLING) {
 825        migrate_set_state(s, MIGRATION_STATUS_CANCELLING,
 826                          MIGRATION_STATUS_CANCELLED);
 827    }
 828
 829    notifier_list_notify(&migration_state_notifiers, s);
 830}
 831
 832void migrate_fd_error(MigrationState *s)
 833{
 834    trace_migrate_fd_error();
 835    assert(s->file == NULL);
 836    migrate_set_state(s, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED);
 837    notifier_list_notify(&migration_state_notifiers, s);
 838}
 839
 840static void migrate_fd_cancel(MigrationState *s)
 841{
 842    int old_state ;
 843    QEMUFile *f = migrate_get_current()->file;
 844    trace_migrate_fd_cancel();
 845
 846    if (s->rp_state.from_dst_file) {
 847        /* shutdown the rp socket, so causing the rp thread to shutdown */
 848        qemu_file_shutdown(s->rp_state.from_dst_file);
 849    }
 850
 851    do {
 852        old_state = s->state;
 853        if (!migration_is_setup_or_active(old_state)) {
 854            break;
 855        }
 856        migrate_set_state(s, old_state, MIGRATION_STATUS_CANCELLING);
 857    } while (s->state != MIGRATION_STATUS_CANCELLING);
 858
 859    /*
 860     * If we're unlucky the migration code might be stuck somewhere in a
 861     * send/write while the network has failed and is waiting to timeout;
 862     * if we've got shutdown(2) available then we can force it to quit.
 863     * The outgoing qemu file gets closed in migrate_fd_cleanup that is
 864     * called in a bh, so there is no race against this cancel.
 865     */
 866    if (s->state == MIGRATION_STATUS_CANCELLING && f) {
 867        qemu_file_shutdown(f);
 868    }
 869}
 870
 871void add_migration_state_change_notifier(Notifier *notify)
 872{
 873    notifier_list_add(&migration_state_notifiers, notify);
 874}
 875
 876void remove_migration_state_change_notifier(Notifier *notify)
 877{
 878    notifier_remove(notify);
 879}
 880
 881bool migration_in_setup(MigrationState *s)
 882{
 883    return s->state == MIGRATION_STATUS_SETUP;
 884}
 885
 886bool migration_has_finished(MigrationState *s)
 887{
 888    return s->state == MIGRATION_STATUS_COMPLETED;
 889}
 890
 891bool migration_has_failed(MigrationState *s)
 892{
 893    return (s->state == MIGRATION_STATUS_CANCELLED ||
 894            s->state == MIGRATION_STATUS_FAILED);
 895}
 896
 897bool migration_in_postcopy(MigrationState *s)
 898{
 899    return (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE);
 900}
 901
 902MigrationState *migrate_init(const MigrationParams *params)
 903{
 904    MigrationState *s = migrate_get_current();
 905
 906    /*
 907     * Reinitialise all migration state, except
 908     * parameters/capabilities that the user set, and
 909     * locks.
 910     */
 911    s->bytes_xfer = 0;
 912    s->xfer_limit = 0;
 913    s->cleanup_bh = 0;
 914    s->file = NULL;
 915    s->state = MIGRATION_STATUS_NONE;
 916    s->params = *params;
 917    s->rp_state.from_dst_file = NULL;
 918    s->rp_state.error = false;
 919    s->mbps = 0.0;
 920    s->downtime = 0;
 921    s->expected_downtime = 0;
 922    s->dirty_pages_rate = 0;
 923    s->dirty_bytes_rate = 0;
 924    s->setup_time = 0;
 925    s->dirty_sync_count = 0;
 926    s->start_postcopy = false;
 927    s->migration_thread_running = false;
 928    s->last_req_rb = NULL;
 929
 930    migrate_set_state(s, MIGRATION_STATUS_NONE, MIGRATION_STATUS_SETUP);
 931
 932    QSIMPLEQ_INIT(&s->src_page_requests);
 933
 934    s->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
 935    return s;
 936}
 937
/*
 * Error objects describing reasons that currently block outgoing migration.
 * migrate_add_blocker() prepends, so the head is the most recent one;
 * qmp_migrate() refuses to start while this list is non-empty.
 */
static GSList *migration_blockers;
 939
 940void migrate_add_blocker(Error *reason)
 941{
 942    migration_blockers = g_slist_prepend(migration_blockers, reason);
 943}
 944
 945void migrate_del_blocker(Error *reason)
 946{
 947    migration_blockers = g_slist_remove(migration_blockers, reason);
 948}
 949
 950void qmp_migrate_incoming(const char *uri, Error **errp)
 951{
 952    Error *local_err = NULL;
 953    static bool once = true;
 954
 955    if (!deferred_incoming) {
 956        error_setg(errp, "For use with '-incoming defer'");
 957        return;
 958    }
 959    if (!once) {
 960        error_setg(errp, "The incoming migration has already been started");
 961    }
 962
 963    qemu_start_incoming_migration(uri, &local_err);
 964
 965    if (local_err) {
 966        error_propagate(errp, local_err);
 967        return;
 968    }
 969
 970    once = false;
 971}
 972
 973void qmp_migrate(const char *uri, bool has_blk, bool blk,
 974                 bool has_inc, bool inc, bool has_detach, bool detach,
 975                 Error **errp)
 976{
 977    Error *local_err = NULL;
 978    MigrationState *s = migrate_get_current();
 979    MigrationParams params;
 980    const char *p;
 981
 982    params.blk = has_blk && blk;
 983    params.shared = has_inc && inc;
 984
 985    if (migration_is_setup_or_active(s->state) ||
 986        s->state == MIGRATION_STATUS_CANCELLING) {
 987        error_setg(errp, QERR_MIGRATION_ACTIVE);
 988        return;
 989    }
 990    if (runstate_check(RUN_STATE_INMIGRATE)) {
 991        error_setg(errp, "Guest is waiting for an incoming migration");
 992        return;
 993    }
 994
 995    if (qemu_savevm_state_blocked(errp)) {
 996        return;
 997    }
 998
 999    if (migration_blockers) {
1000        *errp = error_copy(migration_blockers->data);
1001        return;
1002    }
1003
1004    /* We are starting a new migration, so we want to start in a clean
1005       state.  This change is only needed if previous migration
1006       failed/was cancelled.  We don't use migrate_set_state() because
1007       we are setting the initial state, not changing it. */
1008    s->state = MIGRATION_STATUS_NONE;
1009
1010    s = migrate_init(&params);
1011
1012    if (strstart(uri, "tcp:", &p)) {
1013        tcp_start_outgoing_migration(s, p, &local_err);
1014#ifdef CONFIG_RDMA
1015    } else if (strstart(uri, "rdma:", &p)) {
1016        rdma_start_outgoing_migration(s, p, &local_err);
1017#endif
1018#if !defined(WIN32)
1019    } else if (strstart(uri, "exec:", &p)) {
1020        exec_start_outgoing_migration(s, p, &local_err);
1021    } else if (strstart(uri, "unix:", &p)) {
1022        unix_start_outgoing_migration(s, p, &local_err);
1023    } else if (strstart(uri, "fd:", &p)) {
1024        fd_start_outgoing_migration(s, p, &local_err);
1025#endif
1026    } else {
1027        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "uri",
1028                   "a valid migration protocol");
1029        migrate_set_state(s, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED);
1030        return;
1031    }
1032
1033    if (local_err) {
1034        migrate_fd_error(s);
1035        error_propagate(errp, local_err);
1036        return;
1037    }
1038}
1039
1040void qmp_migrate_cancel(Error **errp)
1041{
1042    migrate_fd_cancel(migrate_get_current());
1043}
1044
1045void qmp_migrate_set_cache_size(int64_t value, Error **errp)
1046{
1047    MigrationState *s = migrate_get_current();
1048    int64_t new_size;
1049
1050    /* Check for truncation */
1051    if (value != (size_t)value) {
1052        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
1053                   "exceeding address space");
1054        return;
1055    }
1056
1057    /* Cache should not be larger than guest ram size */
1058    if (value > ram_bytes_total()) {
1059        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
1060                   "exceeds guest ram size ");
1061        return;
1062    }
1063
1064    new_size = xbzrle_cache_resize(value);
1065    if (new_size < 0) {
1066        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
1067                   "is smaller than page size");
1068        return;
1069    }
1070
1071    s->xbzrle_cache_size = new_size;
1072}
1073
1074int64_t qmp_query_migrate_cache_size(Error **errp)
1075{
1076    return migrate_xbzrle_cache_size();
1077}
1078
1079void qmp_migrate_set_speed(int64_t value, Error **errp)
1080{
1081    MigrationState *s;
1082
1083    if (value < 0) {
1084        value = 0;
1085    }
1086    if (value > SIZE_MAX) {
1087        value = SIZE_MAX;
1088    }
1089
1090    s = migrate_get_current();
1091    s->bandwidth_limit = value;
1092    if (s->file) {
1093        qemu_file_set_rate_limit(s->file, s->bandwidth_limit / XFER_LIMIT_RATIO);
1094    }
1095}
1096
1097void qmp_migrate_set_downtime(double value, Error **errp)
1098{
1099    value *= 1e9;
1100    value = MAX(0, MIN(UINT64_MAX, value));
1101    max_downtime = (uint64_t)value;
1102}
1103
1104bool migrate_postcopy_ram(void)
1105{
1106    MigrationState *s;
1107
1108    s = migrate_get_current();
1109
1110    return s->enabled_capabilities[MIGRATION_CAPABILITY_X_POSTCOPY_RAM];
1111}
1112
1113bool migrate_auto_converge(void)
1114{
1115    MigrationState *s;
1116
1117    s = migrate_get_current();
1118
1119    return s->enabled_capabilities[MIGRATION_CAPABILITY_AUTO_CONVERGE];
1120}
1121
1122bool migrate_zero_blocks(void)
1123{
1124    MigrationState *s;
1125
1126    s = migrate_get_current();
1127
1128    return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
1129}
1130
1131bool migrate_use_compression(void)
1132{
1133    MigrationState *s;
1134
1135    s = migrate_get_current();
1136
1137    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
1138}
1139
1140int migrate_compress_level(void)
1141{
1142    MigrationState *s;
1143
1144    s = migrate_get_current();
1145
1146    return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
1147}
1148
1149int migrate_compress_threads(void)
1150{
1151    MigrationState *s;
1152
1153    s = migrate_get_current();
1154
1155    return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
1156}
1157
1158int migrate_decompress_threads(void)
1159{
1160    MigrationState *s;
1161
1162    s = migrate_get_current();
1163
1164    return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
1165}
1166
1167bool migrate_use_events(void)
1168{
1169    MigrationState *s;
1170
1171    s = migrate_get_current();
1172
1173    return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
1174}
1175
1176int migrate_use_xbzrle(void)
1177{
1178    MigrationState *s;
1179
1180    s = migrate_get_current();
1181
1182    return s->enabled_capabilities[MIGRATION_CAPABILITY_XBZRLE];
1183}
1184
1185int64_t migrate_xbzrle_cache_size(void)
1186{
1187    MigrationState *s;
1188
1189    s = migrate_get_current();
1190
1191    return s->xbzrle_cache_size;
1192}
1193
1194/* migration thread support */
1195/*
1196 * Something bad happened to the RP stream, mark an error
1197 * The caller shall print or trace something to indicate why
1198 */
1199static void mark_source_rp_bad(MigrationState *s)
1200{
1201    s->rp_state.error = true;
1202}
1203
1204static struct rp_cmd_args {
1205    ssize_t     len; /* -1 = variable */
1206    const char *name;
1207} rp_cmd_args[] = {
1208    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
1209    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
1210    [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
1211    [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
1212    [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
1213    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
1214};
1215
1216/*
1217 * Process a request for pages received on the return path,
1218 * We're allowed to send more than requested (e.g. to round to our page size)
1219 * and we don't need to send pages that have already been sent.
1220 */
1221static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname,
1222                                       ram_addr_t start, size_t len)
1223{
1224    long our_host_ps = getpagesize();
1225
1226    trace_migrate_handle_rp_req_pages(rbname, start, len);
1227
1228    /*
1229     * Since we currently insist on matching page sizes, just sanity check
1230     * we're being asked for whole host pages.
1231     */
1232    if (start & (our_host_ps-1) ||
1233       (len & (our_host_ps-1))) {
1234        error_report("%s: Misaligned page request, start: " RAM_ADDR_FMT
1235                     " len: %zd", __func__, start, len);
1236        mark_source_rp_bad(ms);
1237        return;
1238    }
1239
1240    if (ram_save_queue_pages(ms, rbname, start, len)) {
1241        mark_source_rp_bad(ms);
1242    }
1243}
1244
1245/*
1246 * Handles messages sent on the return path towards the source VM
1247 *
1248 */
1249static void *source_return_path_thread(void *opaque)
1250{
1251    MigrationState *ms = opaque;
1252    QEMUFile *rp = ms->rp_state.from_dst_file;
1253    uint16_t header_len, header_type;
1254    const int max_len = 512;
1255    uint8_t buf[max_len];
1256    uint32_t tmp32, sibling_error;
1257    ram_addr_t start = 0; /* =0 to silence warning */
1258    size_t  len = 0, expected_len;
1259    int res;
1260
1261    trace_source_return_path_thread_entry();
1262    while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
1263           migration_is_setup_or_active(ms->state)) {
1264        trace_source_return_path_thread_loop_top();
1265        header_type = qemu_get_be16(rp);
1266        header_len = qemu_get_be16(rp);
1267
1268        if (header_type >= MIG_RP_MSG_MAX ||
1269            header_type == MIG_RP_MSG_INVALID) {
1270            error_report("RP: Received invalid message 0x%04x length 0x%04x",
1271                    header_type, header_len);
1272            mark_source_rp_bad(ms);
1273            goto out;
1274        }
1275
1276        if ((rp_cmd_args[header_type].len != -1 &&
1277            header_len != rp_cmd_args[header_type].len) ||
1278            header_len > max_len) {
1279            error_report("RP: Received '%s' message (0x%04x) with"
1280                    "incorrect length %d expecting %zu",
1281                    rp_cmd_args[header_type].name, header_type, header_len,
1282                    (size_t)rp_cmd_args[header_type].len);
1283            mark_source_rp_bad(ms);
1284            goto out;
1285        }
1286
1287        /* We know we've got a valid header by this point */
1288        res = qemu_get_buffer(rp, buf, header_len);
1289        if (res != header_len) {
1290            error_report("RP: Failed reading data for message 0x%04x"
1291                         " read %d expected %d",
1292                         header_type, res, header_len);
1293            mark_source_rp_bad(ms);
1294            goto out;
1295        }
1296
1297        /* OK, we have the message and the data */
1298        switch (header_type) {
1299        case MIG_RP_MSG_SHUT:
1300            sibling_error = be32_to_cpup((uint32_t *)buf);
1301            trace_source_return_path_thread_shut(sibling_error);
1302            if (sibling_error) {
1303                error_report("RP: Sibling indicated error %d", sibling_error);
1304                mark_source_rp_bad(ms);
1305            }
1306            /*
1307             * We'll let the main thread deal with closing the RP
1308             * we could do a shutdown(2) on it, but we're the only user
1309             * anyway, so there's nothing gained.
1310             */
1311            goto out;
1312
1313        case MIG_RP_MSG_PONG:
1314            tmp32 = be32_to_cpup((uint32_t *)buf);
1315            trace_source_return_path_thread_pong(tmp32);
1316            break;
1317
1318        case MIG_RP_MSG_REQ_PAGES:
1319            start = be64_to_cpup((uint64_t *)buf);
1320            len = be32_to_cpup((uint32_t *)(buf + 8));
1321            migrate_handle_rp_req_pages(ms, NULL, start, len);
1322            break;
1323
1324        case MIG_RP_MSG_REQ_PAGES_ID:
1325            expected_len = 12 + 1; /* header + termination */
1326
1327            if (header_len >= expected_len) {
1328                start = be64_to_cpup((uint64_t *)buf);
1329                len = be32_to_cpup((uint32_t *)(buf + 8));
1330                /* Now we expect an idstr */
1331                tmp32 = buf[12]; /* Length of the following idstr */
1332                buf[13 + tmp32] = '\0';
1333                expected_len += tmp32;
1334            }
1335            if (header_len != expected_len) {
1336                error_report("RP: Req_Page_id with length %d expecting %zd",
1337                        header_len, expected_len);
1338                mark_source_rp_bad(ms);
1339                goto out;
1340            }
1341            migrate_handle_rp_req_pages(ms, (char *)&buf[13], start, len);
1342            break;
1343
1344        default:
1345            break;
1346        }
1347    }
1348    if (qemu_file_get_error(rp)) {
1349        trace_source_return_path_thread_bad_end();
1350        mark_source_rp_bad(ms);
1351    }
1352
1353    trace_source_return_path_thread_end();
1354out:
1355    ms->rp_state.from_dst_file = NULL;
1356    qemu_fclose(rp);
1357    return NULL;
1358}
1359
1360static int open_return_path_on_source(MigrationState *ms)
1361{
1362
1363    ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->file);
1364    if (!ms->rp_state.from_dst_file) {
1365        return -1;
1366    }
1367
1368    trace_open_return_path_on_source();
1369    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
1370                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
1371
1372    trace_open_return_path_on_source_continue();
1373
1374    return 0;
1375}
1376
1377/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
1378static int await_return_path_close_on_source(MigrationState *ms)
1379{
1380    /*
1381     * If this is a normal exit then the destination will send a SHUT and the
1382     * rp_thread will exit, however if there's an error we need to cause
1383     * it to exit.
1384     */
1385    if (qemu_file_get_error(ms->file) && ms->rp_state.from_dst_file) {
1386        /*
1387         * shutdown(2), if we have it, will cause it to unblock if it's stuck
1388         * waiting for the destination.
1389         */
1390        qemu_file_shutdown(ms->rp_state.from_dst_file);
1391        mark_source_rp_bad(ms);
1392    }
1393    trace_await_return_path_close_on_source_joining();
1394    qemu_thread_join(&ms->rp_state.rp_thread);
1395    trace_await_return_path_close_on_source_close();
1396    return ms->rp_state.error;
1397}
1398
1399/*
1400 * Switch from normal iteration to postcopy
1401 * Returns non-0 on error
1402 */
1403static int postcopy_start(MigrationState *ms, bool *old_vm_running)
1404{
1405    int ret;
1406    const QEMUSizedBuffer *qsb;
1407    int64_t time_at_stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1408    migrate_set_state(ms, MIGRATION_STATUS_ACTIVE,
1409                      MIGRATION_STATUS_POSTCOPY_ACTIVE);
1410
1411    trace_postcopy_start();
1412    qemu_mutex_lock_iothread();
1413    trace_postcopy_start_set_run();
1414
1415    qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
1416    *old_vm_running = runstate_is_running();
1417    global_state_store();
1418    ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
1419
1420    if (ret < 0) {
1421        goto fail;
1422    }
1423
1424    /*
1425     * Cause any non-postcopiable, but iterative devices to
1426     * send out their final data.
1427     */
1428    qemu_savevm_state_complete_precopy(ms->file, true);
1429
1430    /*
1431     * in Finish migrate and with the io-lock held everything should
1432     * be quiet, but we've potentially still got dirty pages and we
1433     * need to tell the destination to throw any pages it's already received
1434     * that are dirty
1435     */
1436    if (ram_postcopy_send_discard_bitmap(ms)) {
1437        error_report("postcopy send discard bitmap failed");
1438        goto fail;
1439    }
1440
1441    /*
1442     * send rest of state - note things that are doing postcopy
1443     * will notice we're in POSTCOPY_ACTIVE and not actually
1444     * wrap their state up here
1445     */
1446    qemu_file_set_rate_limit(ms->file, INT64_MAX);
1447    /* Ping just for debugging, helps line traces up */
1448    qemu_savevm_send_ping(ms->file, 2);
1449
1450    /*
1451     * While loading the device state we may trigger page transfer
1452     * requests and the fd must be free to process those, and thus
1453     * the destination must read the whole device state off the fd before
1454     * it starts processing it.  Unfortunately the ad-hoc migration format
1455     * doesn't allow the destination to know the size to read without fully
1456     * parsing it through each devices load-state code (especially the open
1457     * coded devices that use get/put).
1458     * So we wrap the device state up in a package with a length at the start;
1459     * to do this we use a qemu_buf to hold the whole of the device state.
1460     */
1461    QEMUFile *fb = qemu_bufopen("w", NULL);
1462    if (!fb) {
1463        error_report("Failed to create buffered file");
1464        goto fail;
1465    }
1466
1467    /*
1468     * Make sure the receiver can get incoming pages before we send the rest
1469     * of the state
1470     */
1471    qemu_savevm_send_postcopy_listen(fb);
1472
1473    qemu_savevm_state_complete_precopy(fb, false);
1474    qemu_savevm_send_ping(fb, 3);
1475
1476    qemu_savevm_send_postcopy_run(fb);
1477
1478    /* <><> end of stuff going into the package */
1479    qsb = qemu_buf_get(fb);
1480
1481    /* Now send that blob */
1482    if (qemu_savevm_send_packaged(ms->file, qsb)) {
1483        goto fail_closefb;
1484    }
1485    qemu_fclose(fb);
1486    ms->downtime =  qemu_clock_get_ms(QEMU_CLOCK_REALTIME) - time_at_stop;
1487
1488    qemu_mutex_unlock_iothread();
1489
1490    /*
1491     * Although this ping is just for debug, it could potentially be
1492     * used for getting a better measurement of downtime at the source.
1493     */
1494    qemu_savevm_send_ping(ms->file, 4);
1495
1496    ret = qemu_file_get_error(ms->file);
1497    if (ret) {
1498        error_report("postcopy_start: Migration stream errored");
1499        migrate_set_state(ms, MIGRATION_STATUS_POSTCOPY_ACTIVE,
1500                              MIGRATION_STATUS_FAILED);
1501    }
1502
1503    return ret;
1504
1505fail_closefb:
1506    qemu_fclose(fb);
1507fail:
1508    migrate_set_state(ms, MIGRATION_STATUS_POSTCOPY_ACTIVE,
1509                          MIGRATION_STATUS_FAILED);
1510    qemu_mutex_unlock_iothread();
1511    return -1;
1512}
1513
/**
 * migration_completion: Used by migration_thread when there's not much left.
 *   The caller 'breaks' the loop when this returns.
 *
 * If the state is ACTIVE (precopy), it stops the VM under the iothread lock
 * and sends the final device state.  If it is POSTCOPY_ACTIVE, it completes
 * the postcopy stream.  Then, when postcopy is enabled, it joins the
 * return-path thread.  Finally it moves the state from
 * @current_active_state to COMPLETED, or to FAILED on any error.
 *
 * @s: Current migration state
 * @current_active_state: The migration state we expect to be in
 * @*old_vm_running: Pointer to old_vm_running flag; written on the ACTIVE
 *                   path only, with whether the VM was running beforehand
 * @*start_time: Pointer to time to update; written on the ACTIVE path only,
 *               just before the VM is stopped
 */
static void migration_completion(MigrationState *s, int current_active_state,
                                 bool *old_vm_running,
                                 int64_t *start_time)
{
    int ret;

    if (s->state == MIGRATION_STATUS_ACTIVE) {
        qemu_mutex_lock_iothread();
        *start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
        qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
        *old_vm_running = runstate_is_running();
        ret = global_state_store();

        if (!ret) {
            ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
            if (ret >= 0) {
                /* Lift the rate limit so the final state is not throttled */
                qemu_file_set_rate_limit(s->file, INT64_MAX);
                qemu_savevm_state_complete_precopy(s->file, false);
            }
        }
        qemu_mutex_unlock_iothread();

        if (ret < 0) {
            goto fail;
        }
    } else if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
        trace_migration_completion_postcopy_end();

        qemu_savevm_state_complete_postcopy(s->file);
        trace_migration_completion_postcopy_end_after_complete();
    }

    /*
     * If rp was opened we must clean up the thread before
     * cleaning everything else up (since if there are no failures
     * it will wait for the destination to send its status in
     * a SHUT command).
     * Postcopy opens rp if enabled (even if it's not activated)
     */
    if (migrate_postcopy_ram()) {
        int rp_error;
        trace_migration_completion_postcopy_end_before_rp();
        rp_error = await_return_path_close_on_source(s);
        trace_migration_completion_postcopy_end_after_rp(rp_error);
        if (rp_error) {
            goto fail;
        }
    }

    /* Any error left on the outgoing stream fails the migration */
    if (qemu_file_get_error(s->file)) {
        trace_migration_completion_file_err();
        goto fail;
    }

    migrate_set_state(s, current_active_state, MIGRATION_STATUS_COMPLETED);
    return;

fail:
    migrate_set_state(s, current_active_state, MIGRATION_STATUS_FAILED);
}
1583
/*
 * Master migration thread on the source VM.
 * It drives the migration and pumps the data down the outgoing channel.
 *
 * Sequence:
 * - Send the stream header.  When postcopy is enabled, also send the
 *   open-return-path, ping and postcopy-advise commands.
 * - Run the setup phase, then move SETUP -> ACTIVE.
 * - Iterate until one of three things happens: the pending data fits in
 *   max_size (migration_completion()), postcopy is requested
 *   (postcopy_start()), or the stream reports an error.
 * - Afterwards, record statistics on success.  Otherwise restart the VM if
 *   it was running and postcopy was never entered.
 * - Finally, schedule s->cleanup_bh.
 *
 * @opaque: the MigrationState being migrated.
 */
static void *migration_thread(void *opaque)
{
    MigrationState *s = opaque;
    /* Used by the bandwidth calcs, updated later */
    int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
    int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
    int64_t initial_bytes = 0;
    /* Bytes we may still have pending and finish within max downtime */
    int64_t max_size = 0;
    int64_t start_time = initial_time;
    int64_t end_time;
    bool old_vm_running = false;
    bool entered_postcopy = false;
    /* The active state we expect to be in; ACTIVE or POSTCOPY_ACTIVE */
    enum MigrationStatus current_active_state = MIGRATION_STATUS_ACTIVE;

    rcu_register_thread();

    qemu_savevm_state_header(s->file);

    if (migrate_postcopy_ram()) {
        /* Now tell the dest that it should open its end so it can reply */
        qemu_savevm_send_open_return_path(s->file);

        /* And do a ping that will make stuff easier to debug */
        qemu_savevm_send_ping(s->file, 1);

        /*
         * Tell the destination that we *might* want to do postcopy later;
         * if the other end can't do postcopy it should fail now, nice and
         * early.
         */
        qemu_savevm_send_postcopy_advise(s->file);
    }

    qemu_savevm_state_begin(s->file, &s->params);

    s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
    current_active_state = MIGRATION_STATUS_ACTIVE;
    migrate_set_state(s, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_ACTIVE);

    trace_migration_thread_setup_complete();

    while (s->state == MIGRATION_STATUS_ACTIVE ||
           s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
        int64_t current_time;
        uint64_t pending_size;

        /* Only produce more data while under the rate limit */
        if (!qemu_file_rate_limit(s->file)) {
            uint64_t pend_post, pend_nonpost;

            qemu_savevm_state_pending(s->file, max_size, &pend_nonpost,
                                      &pend_post);
            pending_size = pend_nonpost + pend_post;
            trace_migrate_pending(pending_size, max_size,
                                  pend_post, pend_nonpost);
            if (pending_size && pending_size >= max_size) {
                /* Still a significant amount to transfer */

                /*
                 * Switch to postcopy once it is requested and the data that
                 * cannot be postcopied fits within max_size.
                 */
                if (migrate_postcopy_ram() &&
                    s->state != MIGRATION_STATUS_POSTCOPY_ACTIVE &&
                    pend_nonpost <= max_size &&
                    atomic_read(&s->start_postcopy)) {

                    if (!postcopy_start(s, &old_vm_running)) {
                        current_active_state = MIGRATION_STATUS_POSTCOPY_ACTIVE;
                        entered_postcopy = true;
                    }

                    /* On failure postcopy_start() set FAILED; loop exits */
                    continue;
                }
                /* Just another iteration step */
                qemu_savevm_state_iterate(s->file, entered_postcopy);
            } else {
                trace_migration_thread_low_pending(pending_size);
                migration_completion(s, current_active_state,
                                     &old_vm_running, &start_time);
                break;
            }
        }

        if (qemu_file_get_error(s->file)) {
            migrate_set_state(s, current_active_state, MIGRATION_STATUS_FAILED);
            trace_migration_thread_file_err();
            break;
        }
        current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
        /* Once per BUFFER_DELAY ms: re-measure bandwidth and reset limit */
        if (current_time >= initial_time + BUFFER_DELAY) {
            uint64_t transferred_bytes = qemu_ftell(s->file) - initial_bytes;
            uint64_t time_spent = current_time - initial_time;
            /* bytes per ms; time_spent >= BUFFER_DELAY here, so non-zero */
            double bandwidth = (double)transferred_bytes / time_spent;
            /* assumes migrate_max_downtime() is in ns (ns / 1e6 = ms) */
            max_size = bandwidth * migrate_max_downtime() / 1000000;

            /* bits per second / 1e6, i.e. megabits per second */
            s->mbps = time_spent ? (((double) transferred_bytes * 8.0) /
                    ((double) time_spent / 1000.0)) / 1000.0 / 1000.0 : -1;

            trace_migrate_transferred(transferred_bytes, time_spent,
                                      bandwidth, max_size);
            /* if we haven't sent anything, we don't want to recalculate
               10000 is a small enough number for our purposes */
            if (s->dirty_bytes_rate && transferred_bytes > 10000) {
                s->expected_downtime = s->dirty_bytes_rate / bandwidth;
            }

            qemu_file_reset_rate_limit(s->file);
            initial_time = current_time;
            initial_bytes = qemu_ftell(s->file);
        }
        if (qemu_file_rate_limit(s->file)) {
            /* usleep expects microseconds */
            g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
        }
    }

    trace_migration_thread_after_loop();
    /* If we enabled cpu throttling for auto-converge, turn it off. */
    cpu_throttle_stop();
    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);

    qemu_mutex_lock_iothread();
    qemu_savevm_state_cleanup();
    if (s->state == MIGRATION_STATUS_COMPLETED) {
        uint64_t transferred_bytes = qemu_ftell(s->file);
        /* total_time held the start time set by migrate_init() */
        s->total_time = end_time - s->total_time;
        /* For postcopy, postcopy_start() already recorded the downtime */
        if (!entered_postcopy) {
            s->downtime = end_time - start_time;
        }
        if (s->total_time) {
            /* bits per ms / 1000, i.e. megabits per second */
            s->mbps = (((double) transferred_bytes * 8.0) /
                       ((double) s->total_time)) / 1000;
        }
        runstate_set(RUN_STATE_POSTMIGRATE);
    } else {
        /* Failed/cancelled before postcopy: give the guest back its CPU */
        if (old_vm_running && !entered_postcopy) {
            vm_start();
        }
    }
    qemu_bh_schedule(s->cleanup_bh);
    qemu_mutex_unlock_iothread();

    rcu_unregister_thread();
    return NULL;
}
1730
1731void migrate_fd_connect(MigrationState *s)
1732{
1733    /* This is a best 1st approximation. ns to ms */
1734    s->expected_downtime = max_downtime/1000000;
1735    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
1736
1737    qemu_file_set_rate_limit(s->file,
1738                             s->bandwidth_limit / XFER_LIMIT_RATIO);
1739
1740    /* Notify before starting migration thread */
1741    notifier_list_notify(&migration_state_notifiers, s);
1742
1743    /*
1744     * Open the return path; currently for postcopy but other things might
1745     * also want it.
1746     */
1747    if (migrate_postcopy_ram()) {
1748        if (open_return_path_on_source(s)) {
1749            error_report("Unable to open return-path for postcopy");
1750            migrate_set_state(s, MIGRATION_STATUS_SETUP,
1751                              MIGRATION_STATUS_FAILED);
1752            migrate_fd_cleanup(s);
1753            return;
1754        }
1755    }
1756
1757    migrate_compress_threads_create();
1758    qemu_thread_create(&s->thread, "migration", migration_thread, s,
1759                       QEMU_THREAD_JOINABLE);
1760    s->migration_thread_running = true;
1761}
1762
1763PostcopyState  postcopy_state_get(void)
1764{
1765    return atomic_mb_read(&incoming_postcopy_state);
1766}
1767
1768/* Set the state and return the old state */
1769PostcopyState postcopy_state_set(PostcopyState new_state)
1770{
1771    return atomic_xchg(&incoming_postcopy_state, new_state);
1772}
1773
1774