qemu/migration/ram.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator
   3 *
   4 * Copyright (c) 2003-2008 Fabrice Bellard
   5 * Copyright (c) 2011-2015 Red Hat Inc
   6 *
   7 * Authors:
   8 *  Juan Quintela <quintela@redhat.com>
   9 *
  10 * Permission is hereby granted, free of charge, to any person obtaining a copy
  11 * of this software and associated documentation files (the "Software"), to deal
  12 * in the Software without restriction, including without limitation the rights
  13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14 * copies of the Software, and to permit persons to whom the Software is
  15 * furnished to do so, subject to the following conditions:
  16 *
  17 * The above copyright notice and this permission notice shall be included in
  18 * all copies or substantial portions of the Software.
  19 *
  20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26 * THE SOFTWARE.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "cpu.h"
  31#include <zlib.h>
  32#include "qemu/cutils.h"
  33#include "qemu/bitops.h"
  34#include "qemu/bitmap.h"
  35#include "qemu/main-loop.h"
  36#include "qemu/pmem.h"
  37#include "xbzrle.h"
  38#include "ram.h"
  39#include "migration.h"
  40#include "socket.h"
  41#include "migration/register.h"
  42#include "migration/misc.h"
  43#include "qemu-file.h"
  44#include "postcopy-ram.h"
  45#include "page_cache.h"
  46#include "qemu/error-report.h"
  47#include "qapi/error.h"
  48#include "qapi/qapi-events-migration.h"
  49#include "qapi/qmp/qerror.h"
  50#include "trace.h"
  51#include "exec/ram_addr.h"
  52#include "exec/target_page.h"
  53#include "qemu/rcu_queue.h"
  54#include "migration/colo.h"
  55#include "block.h"
  56#include "sysemu/sysemu.h"
  57#include "qemu/uuid.h"
  58#include "savevm.h"
  59#include "qemu/iov.h"
  60
  61/***********************************************************/
  62/* ram save/restore */
  63
/*
 * RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS; it
 * worked for pages that were filled with the same char.  We switched
 * it to only search for the zero value.  To avoid confusion with
 * RAM_SAVE_FLAG_COMPRESS_PAGE it was simply renamed.
 */

#define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
#define RAM_SAVE_FLAG_ZERO     0x02
#define RAM_SAVE_FLAG_MEM_SIZE 0x04
#define RAM_SAVE_FLAG_PAGE     0x08
#define RAM_SAVE_FLAG_EOS      0x10
#define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE   0x40
/* 0x80 is reserved in migration.h; the next free flag starts at 0x100 */
#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
  79
  80static inline bool is_zero_range(uint8_t *p, uint64_t size)
  81{
  82    return buffer_is_zero(p, size);
  83}
  84
/* XBZRLE statistics; non-static, so visible outside this file */
XBZRLECacheStats xbzrle_counters;

/*
 * File-local XBZRLE state: the page cache plus the scratch buffers
 * used by the compression.
 */
static struct {
    /* buffer used for XBZRLE encoding */
    uint8_t *encoded_buf;
    /* buffer for storing page content */
    uint8_t *current_buf;
    /* Cache for XBZRLE, protected by lock. */
    PageCache *cache;
    /* Taken by XBZRLE_cache_lock() / released by XBZRLE_cache_unlock() */
    QemuMutex lock;
    /* it will store a page full of zeros */
    uint8_t *zero_target_page;
    /* buffer used for XBZRLE decoding */
    uint8_t *decoded_buf;
} XBZRLE;
 102
 103static void XBZRLE_cache_lock(void)
 104{
 105    if (migrate_use_xbzrle())
 106        qemu_mutex_lock(&XBZRLE.lock);
 107}
 108
 109static void XBZRLE_cache_unlock(void)
 110{
 111    if (migrate_use_xbzrle())
 112        qemu_mutex_unlock(&XBZRLE.lock);
 113}
 114
 115/**
 116 * xbzrle_cache_resize: resize the xbzrle cache
 117 *
 118 * This function is called from qmp_migrate_set_cache_size in main
 119 * thread, possibly while a migration is in progress.  A running
 120 * migration may be using the cache and might finish during this call,
 121 * hence changes to the cache are protected by XBZRLE.lock().
 122 *
 123 * Returns 0 for success or -1 for error
 124 *
 125 * @new_size: new cache size
 126 * @errp: set *errp if the check failed, with reason
 127 */
 128int xbzrle_cache_resize(int64_t new_size, Error **errp)
 129{
 130    PageCache *new_cache;
 131    int64_t ret = 0;
 132
 133    /* Check for truncation */
 134    if (new_size != (size_t)new_size) {
 135        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
 136                   "exceeding address space");
 137        return -1;
 138    }
 139
 140    if (new_size == migrate_xbzrle_cache_size()) {
 141        /* nothing to do */
 142        return 0;
 143    }
 144
 145    XBZRLE_cache_lock();
 146
 147    if (XBZRLE.cache != NULL) {
 148        new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
 149        if (!new_cache) {
 150            ret = -1;
 151            goto out;
 152        }
 153
 154        cache_fini(XBZRLE.cache);
 155        XBZRLE.cache = new_cache;
 156    }
 157out:
 158    XBZRLE_cache_unlock();
 159    return ret;
 160}
 161
 162static bool ramblock_is_ignored(RAMBlock *block)
 163{
 164    return !qemu_ram_is_migratable(block) ||
 165           (migrate_ignore_shared() && qemu_ram_is_shared(block));
 166}
 167
/*
 * Iterate over every RAMBlock for which ramblock_is_ignored() is false.
 * Should be holding either ram_list.mutex, or the RCU lock.
 *
 * The "if (...) {} else" tail makes the statement that follows the
 * macro the loop body, executed only for blocks that pass the filter.
 */
#define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
    INTERNAL_RAMBLOCK_FOREACH(block)                   \
        if (ramblock_is_ignored(block)) {} else

/* Iterate over every RAMBlock for which qemu_ram_is_migratable() is true. */
#define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
    INTERNAL_RAMBLOCK_FOREACH(block)                   \
        if (!qemu_ram_is_migratable(block)) {} else

/*
 * NOTE(review): presumably undefined so that code below cannot iterate
 * without one of the filters above -- confirm.
 */
#undef RAMBLOCK_FOREACH
 178
 179int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
 180{
 181    RAMBlock *block;
 182    int ret = 0;
 183
 184    rcu_read_lock();
 185    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
 186        ret = func(block, opaque);
 187        if (ret) {
 188            break;
 189        }
 190    }
 191    rcu_read_unlock();
 192    return ret;
 193}
 194
 195static void ramblock_recv_map_init(void)
 196{
 197    RAMBlock *rb;
 198
 199    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
 200        assert(!rb->receivedmap);
 201        rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
 202    }
 203}
 204
 205int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
 206{
 207    return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
 208                    rb->receivedmap);
 209}
 210
 211bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
 212{
 213    return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
 214}
 215
 216void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
 217{
 218    set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
 219}
 220
 221void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
 222                                    size_t nr)
 223{
 224    bitmap_set_atomic(rb->receivedmap,
 225                      ramblock_recv_bitmap_offset(host_addr, rb),
 226                      nr);
 227}
 228
 229#define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
 230
 231/*
 232 * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
 233 *
 234 * Returns >0 if success with sent bytes, or <0 if error.
 235 */
 236int64_t ramblock_recv_bitmap_send(QEMUFile *file,
 237                                  const char *block_name)
 238{
 239    RAMBlock *block = qemu_ram_block_by_name(block_name);
 240    unsigned long *le_bitmap, nbits;
 241    uint64_t size;
 242
 243    if (!block) {
 244        error_report("%s: invalid block name: %s", __func__, block_name);
 245        return -1;
 246    }
 247
 248    nbits = block->used_length >> TARGET_PAGE_BITS;
 249
 250    /*
 251     * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
 252     * machines we may need 4 more bytes for padding (see below
 253     * comment). So extend it a bit before hand.
 254     */
 255    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
 256
 257    /*
 258     * Always use little endian when sending the bitmap. This is
 259     * required that when source and destination VMs are not using the
 260     * same endianess. (Note: big endian won't work.)
 261     */
 262    bitmap_to_le(le_bitmap, block->receivedmap, nbits);
 263
 264    /* Size of the bitmap, in bytes */
 265    size = DIV_ROUND_UP(nbits, 8);
 266
 267    /*
 268     * size is always aligned to 8 bytes for 64bit machines, but it
 269     * may not be true for 32bit machines. We need this padding to
 270     * make sure the migration can survive even between 32bit and
 271     * 64bit machines.
 272     */
 273    size = ROUND_UP(size, 8);
 274
 275    qemu_put_be64(file, size);
 276    qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
 277    /*
 278     * Mark as an end, in case the middle part is screwed up due to
 279     * some "misterious" reason.
 280     */
 281    qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
 282    qemu_fflush(file);
 283
 284    g_free(le_bitmap);
 285
 286    if (qemu_file_get_error(file)) {
 287        return qemu_file_get_error(file);
 288    }
 289
 290    return size + sizeof(size);
 291}
 292
/*
 * An outstanding page request, on the source, having been received
 * and queued
 */
struct RAMSrcPageRequest {
    /* RAMBlock the request refers to */
    RAMBlock *rb;
    /* NOTE(review): presumably start and length in bytes within rb -- confirm */
    hwaddr    offset;
    hwaddr    len;

    /* Link used by the src_page_requests queue in RAMState */
    QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
};
 304
/* State of RAM for migration */
struct RAMState {
    /* QEMUFile used for this migration */
    QEMUFile *f;
    /* Last block that we have visited searching for dirty pages */
    RAMBlock *last_seen_block;
    /* Last block from where we have sent data */
    RAMBlock *last_sent_block;
    /* Last dirty target page we have sent */
    ram_addr_t last_page;
    /* last ram version we have seen */
    uint32_t last_version;
    /* We are in the first round */
    bool ram_bulk_stage;
    /* The free page optimization is enabled */
    bool fpo_enabled;
    /* How many times we have dirtied too many pages */
    int dirty_rate_high_cnt;
    /* these variables are used for bitmap sync */
    /* last time we did a full bitmap_sync */
    int64_t time_last_bitmap_sync;
    /* bytes transferred at start_time */
    uint64_t bytes_xfer_prev;
    /* number of dirty pages since start_time */
    uint64_t num_dirty_pages_period;
    /* xbzrle misses since the beginning of the period */
    uint64_t xbzrle_cache_miss_prev;

    /* compression statistics since the beginning of the period */
    /* number of times no compression thread was free to compress data */
    uint64_t compress_thread_busy_prev;
    /* number of bytes after compression */
    uint64_t compressed_size_prev;
    /* number of compressed pages */
    uint64_t compress_pages_prev;

    /* total handled target pages at the beginning of period */
    uint64_t target_page_count_prev;
    /* total handled target pages since start */
    uint64_t target_page_count;
    /* number of dirty bits in the bitmap */
    uint64_t migration_dirty_pages;
    /* Protects modification of the bitmap and migration dirty pages */
    QemuMutex bitmap_mutex;
    /* The RAMBlock used in the last src_page_requests */
    RAMBlock *last_req_rb;
    /* Queue of outstanding page requests from the destination */
    QemuMutex src_page_req_mutex;
    QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
};
typedef struct RAMState RAMState;

/* The single RAMState instance; code in this file treats NULL as "no state" */
static RAMState *ram_state;
 358
 359static NotifierWithReturnList precopy_notifier_list;
 360
 361void precopy_infrastructure_init(void)
 362{
 363    notifier_with_return_list_init(&precopy_notifier_list);
 364}
 365
 366void precopy_add_notifier(NotifierWithReturn *n)
 367{
 368    notifier_with_return_list_add(&precopy_notifier_list, n);
 369}
 370
 371void precopy_remove_notifier(NotifierWithReturn *n)
 372{
 373    notifier_with_return_remove(n);
 374}
 375
 376int precopy_notify(PrecopyNotifyReason reason, Error **errp)
 377{
 378    PrecopyNotifyData pnd;
 379    pnd.reason = reason;
 380    pnd.errp = errp;
 381
 382    return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
 383}
 384
 385void precopy_enable_free_page_optimization(void)
 386{
 387    if (!ram_state) {
 388        return;
 389    }
 390
 391    ram_state->fpo_enabled = true;
 392}
 393
 394uint64_t ram_bytes_remaining(void)
 395{
 396    return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
 397                       0;
 398}
 399
/* RAM migration counters; non-static, so visible outside this file */
MigrationStats ram_counters;

/* used by the search for pages to send */
struct PageSearchStatus {
    /* Current block being searched */
    RAMBlock    *block;
    /* Current page to search from */
    unsigned long page;
    /* Set once we wrap around */
    bool         complete_round;
};
typedef struct PageSearchStatus PageSearchStatus;
 412
/* Compression counters; non-static, so visible outside this file */
CompressionStats compression_counters;

/* Per-thread state of one compression worker (see do_data_compress()) */
struct CompressParam {
    /* set by the worker, under comp_done_lock, once a page is compressed */
    bool done;
    /* tells the worker to leave its loop */
    bool quit;
    /* result of do_compress_ram_page() for the last page */
    bool zero_page;
    /* dummy QEMUFile the worker writes its output to */
    QEMUFile *file;
    /* mutex held by the worker while it reads block/offset/quit */
    QemuMutex mutex;
    /* the worker waits on this (with mutex held) while it has no work */
    QemuCond cond;
    /* page to compress: non-NULL block means work is pending */
    RAMBlock *block;
    ram_addr_t offset;

    /* internally used fields */
    z_stream stream;
    uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;
 430
/*
 * Per-thread state of one decompression worker.
 * NOTE(review): the worker code is not visible in this part of the
 * file; field meanings below are inferred from the parallel
 * CompressParam -- confirm.
 */
struct DecompressParam {
    bool done;
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    /* presumably the destination for the decompressed data */
    void *des;
    /* presumably the compressed input and its length in bytes */
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;
 442
/* One entry per compression thread; same index in both arrays */
static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished the compression.
 * comp_done_lock is used to co-work with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };

/* Decompression-side counterparts of the variables above */
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;

/* Forward declaration: used by do_data_compress() below */
static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset, uint8_t *source_buf);
 462
 463static void *do_data_compress(void *opaque)
 464{
 465    CompressParam *param = opaque;
 466    RAMBlock *block;
 467    ram_addr_t offset;
 468    bool zero_page;
 469
 470    qemu_mutex_lock(&param->mutex);
 471    while (!param->quit) {
 472        if (param->block) {
 473            block = param->block;
 474            offset = param->offset;
 475            param->block = NULL;
 476            qemu_mutex_unlock(&param->mutex);
 477
 478            zero_page = do_compress_ram_page(param->file, &param->stream,
 479                                             block, offset, param->originbuf);
 480
 481            qemu_mutex_lock(&comp_done_lock);
 482            param->done = true;
 483            param->zero_page = zero_page;
 484            qemu_cond_signal(&comp_done_cond);
 485            qemu_mutex_unlock(&comp_done_lock);
 486
 487            qemu_mutex_lock(&param->mutex);
 488        } else {
 489            qemu_cond_wait(&param->cond, &param->mutex);
 490        }
 491    }
 492    qemu_mutex_unlock(&param->mutex);
 493
 494    return NULL;
 495}
 496
 497static void compress_threads_save_cleanup(void)
 498{
 499    int i, thread_count;
 500
 501    if (!migrate_use_compression() || !comp_param) {
 502        return;
 503    }
 504
 505    thread_count = migrate_compress_threads();
 506    for (i = 0; i < thread_count; i++) {
 507        /*
 508         * we use it as a indicator which shows if the thread is
 509         * properly init'd or not
 510         */
 511        if (!comp_param[i].file) {
 512            break;
 513        }
 514
 515        qemu_mutex_lock(&comp_param[i].mutex);
 516        comp_param[i].quit = true;
 517        qemu_cond_signal(&comp_param[i].cond);
 518        qemu_mutex_unlock(&comp_param[i].mutex);
 519
 520        qemu_thread_join(compress_threads + i);
 521        qemu_mutex_destroy(&comp_param[i].mutex);
 522        qemu_cond_destroy(&comp_param[i].cond);
 523        deflateEnd(&comp_param[i].stream);
 524        g_free(comp_param[i].originbuf);
 525        qemu_fclose(comp_param[i].file);
 526        comp_param[i].file = NULL;
 527    }
 528    qemu_mutex_destroy(&comp_done_lock);
 529    qemu_cond_destroy(&comp_done_cond);
 530    g_free(compress_threads);
 531    g_free(comp_param);
 532    compress_threads = NULL;
 533    comp_param = NULL;
 534}
 535
 536static int compress_threads_save_setup(void)
 537{
 538    int i, thread_count;
 539
 540    if (!migrate_use_compression()) {
 541        return 0;
 542    }
 543    thread_count = migrate_compress_threads();
 544    compress_threads = g_new0(QemuThread, thread_count);
 545    comp_param = g_new0(CompressParam, thread_count);
 546    qemu_cond_init(&comp_done_cond);
 547    qemu_mutex_init(&comp_done_lock);
 548    for (i = 0; i < thread_count; i++) {
 549        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
 550        if (!comp_param[i].originbuf) {
 551            goto exit;
 552        }
 553
 554        if (deflateInit(&comp_param[i].stream,
 555                        migrate_compress_level()) != Z_OK) {
 556            g_free(comp_param[i].originbuf);
 557            goto exit;
 558        }
 559
 560        /* comp_param[i].file is just used as a dummy buffer to save data,
 561         * set its ops to empty.
 562         */
 563        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
 564        comp_param[i].done = true;
 565        comp_param[i].quit = false;
 566        qemu_mutex_init(&comp_param[i].mutex);
 567        qemu_cond_init(&comp_param[i].cond);
 568        qemu_thread_create(compress_threads + i, "compress",
 569                           do_data_compress, comp_param + i,
 570                           QEMU_THREAD_JOINABLE);
 571    }
 572    return 0;
 573
 574exit:
 575    compress_threads_save_cleanup();
 576    return -1;
 577}
 578
/* Multiple fd's */

/* Magic and version carried by every multifd init message and packet */
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

/* Bit in the packet "flags" field */
#define MULTIFD_FLAG_SYNC (1 << 0)

/* This value needs to be a multiple of qemu_target_page_size() */
#define MULTIFD_PACKET_SIZE (512 * 1024)
 588
/*
 * First message sent on each multifd channel (see
 * multifd_send_initial_packet() / multifd_recv_initial_packet()).
 * magic and version travel in big endian.
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* channel number */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
 597
/*
 * Header of a multifd packet.  The integer fields are converted to big
 * endian by multifd_send_fill_packet() and back by
 * multifd_recv_unfill_packet().
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    uint32_t flags;
    /* maximum number of allocated pages */
    uint32_t pages_alloc;
    /* number of valid entries in offset[] */
    uint32_t pages_used;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    uint64_t packet_num;
    uint64_t unused[4];    /* Reserved for future use */
    /* name (idstr) of the RAMBlock the offsets refer to */
    char ramblock[256];
    /* offset of each page within that RAMBlock */
    uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;
 612
/* A batch of pages, all from the same RAMBlock, handled as one packet */
typedef struct {
    /* number of used pages */
    uint32_t used;
    /* number of allocated pages */
    uint32_t allocated;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* offset of each page */
    ram_addr_t *offset;
    /* pointer to each page */
    struct iovec *iov;
    /* RAMBlock all pages of the batch belong to; NULL while empty */
    RAMBlock *block;
} MultiFDPages_t;
 626
/* Per-channel state on the sending side */
typedef struct {
    /* these fields are not changed once the thread is created */
    /* channel number */
    uint8_t id;
    /* channel thread name */
    char *name;
    /* channel thread id */
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    /* sem where to wait for more work */
    QemuSemaphore sem;
    /* this mutex protects the following parameters */
    QemuMutex mutex;
    /* is this channel thread running */
    bool running;
    /* should this thread finish */
    bool quit;
    /* thread has work to do */
    int pending_job;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* packet allocated len */
    uint32_t packet_len;
    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* multifd flags for each packet */
    uint32_t flags;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* thread local variables */
    /* packets sent through this channel */
    uint64_t num_packets;
    /* pages sent through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
}  MultiFDSendParams;
 667
/* Per-channel state on the receiving side */
typedef struct {
    /* these fields are not changed once the thread is created */
    /* channel number */
    uint8_t id;
    /* channel thread name */
    char *name;
    /* channel thread id */
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    /* this mutex protects the following parameters */
    QemuMutex mutex;
    /* is this channel thread running */
    bool running;
    /* array of pages to receive */
    MultiFDPages_t *pages;
    /* packet allocated len */
    uint32_t packet_len;
    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* multifd flags for each packet */
    uint32_t flags;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* thread local variables */
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* packets received through this channel */
    uint64_t num_packets;
    /* pages received through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
} MultiFDRecvParams;
 702
 703static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 704{
 705    MultiFDInit_t msg;
 706    int ret;
 707
 708    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 709    msg.version = cpu_to_be32(MULTIFD_VERSION);
 710    msg.id = p->id;
 711    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 712
 713    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
 714    if (ret != 0) {
 715        return -1;
 716    }
 717    return 0;
 718}
 719
 720static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 721{
 722    MultiFDInit_t msg;
 723    int ret;
 724
 725    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 726    if (ret != 0) {
 727        return -1;
 728    }
 729
 730    msg.magic = be32_to_cpu(msg.magic);
 731    msg.version = be32_to_cpu(msg.version);
 732
 733    if (msg.magic != MULTIFD_MAGIC) {
 734        error_setg(errp, "multifd: received packet magic %x "
 735                   "expected %x", msg.magic, MULTIFD_MAGIC);
 736        return -1;
 737    }
 738
 739    if (msg.version != MULTIFD_VERSION) {
 740        error_setg(errp, "multifd: received packet version %d "
 741                   "expected %d", msg.version, MULTIFD_VERSION);
 742        return -1;
 743    }
 744
 745    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 746        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 747        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 748
 749        error_setg(errp, "multifd: received uuid '%s' and expected "
 750                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 751        g_free(uuid);
 752        g_free(msg_uuid);
 753        return -1;
 754    }
 755
 756    if (msg.id > migrate_multifd_channels()) {
 757        error_setg(errp, "multifd: received channel version %d "
 758                   "expected %d", msg.version, MULTIFD_VERSION);
 759        return -1;
 760    }
 761
 762    return msg.id;
 763}
 764
 765static MultiFDPages_t *multifd_pages_init(size_t size)
 766{
 767    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 768
 769    pages->allocated = size;
 770    pages->iov = g_new0(struct iovec, size);
 771    pages->offset = g_new0(ram_addr_t, size);
 772
 773    return pages;
 774}
 775
 776static void multifd_pages_clear(MultiFDPages_t *pages)
 777{
 778    pages->used = 0;
 779    pages->allocated = 0;
 780    pages->packet_num = 0;
 781    pages->block = NULL;
 782    g_free(pages->iov);
 783    pages->iov = NULL;
 784    g_free(pages->offset);
 785    pages->offset = NULL;
 786    g_free(pages);
 787}
 788
 789static void multifd_send_fill_packet(MultiFDSendParams *p)
 790{
 791    MultiFDPacket_t *packet = p->packet;
 792    uint32_t page_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 793    int i;
 794
 795    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 796    packet->version = cpu_to_be32(MULTIFD_VERSION);
 797    packet->flags = cpu_to_be32(p->flags);
 798    packet->pages_alloc = cpu_to_be32(page_max);
 799    packet->pages_used = cpu_to_be32(p->pages->used);
 800    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 801    packet->packet_num = cpu_to_be64(p->packet_num);
 802
 803    if (p->pages->block) {
 804        strncpy(packet->ramblock, p->pages->block->idstr, 256);
 805    }
 806
 807    for (i = 0; i < p->pages->used; i++) {
 808        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
 809    }
 810}
 811
 812static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 813{
 814    MultiFDPacket_t *packet = p->packet;
 815    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 816    RAMBlock *block;
 817    int i;
 818
 819    packet->magic = be32_to_cpu(packet->magic);
 820    if (packet->magic != MULTIFD_MAGIC) {
 821        error_setg(errp, "multifd: received packet "
 822                   "magic %x and expected magic %x",
 823                   packet->magic, MULTIFD_MAGIC);
 824        return -1;
 825    }
 826
 827    packet->version = be32_to_cpu(packet->version);
 828    if (packet->version != MULTIFD_VERSION) {
 829        error_setg(errp, "multifd: received packet "
 830                   "version %d and expected version %d",
 831                   packet->version, MULTIFD_VERSION);
 832        return -1;
 833    }
 834
 835    p->flags = be32_to_cpu(packet->flags);
 836
 837    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
 838    /*
 839     * If we recevied a packet that is 100 times bigger than expected
 840     * just stop migration.  It is a magic number.
 841     */
 842    if (packet->pages_alloc > pages_max * 100) {
 843        error_setg(errp, "multifd: received packet "
 844                   "with size %d and expected a maximum size of %d",
 845                   packet->pages_alloc, pages_max * 100) ;
 846        return -1;
 847    }
 848    /*
 849     * We received a packet that is bigger than expected but inside
 850     * reasonable limits (see previous comment).  Just reallocate.
 851     */
 852    if (packet->pages_alloc > p->pages->allocated) {
 853        multifd_pages_clear(p->pages);
 854        p->pages = multifd_pages_init(packet->pages_alloc);
 855    }
 856
 857    p->pages->used = be32_to_cpu(packet->pages_used);
 858    if (p->pages->used > packet->pages_alloc) {
 859        error_setg(errp, "multifd: received packet "
 860                   "with %d pages and expected maximum pages are %d",
 861                   p->pages->used, packet->pages_alloc) ;
 862        return -1;
 863    }
 864
 865    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 866    p->packet_num = be64_to_cpu(packet->packet_num);
 867
 868    if (p->pages->used) {
 869        /* make sure that ramblock is 0 terminated */
 870        packet->ramblock[255] = 0;
 871        block = qemu_ram_block_by_name(packet->ramblock);
 872        if (!block) {
 873            error_setg(errp, "multifd: unknown ram block %s",
 874                       packet->ramblock);
 875            return -1;
 876        }
 877    }
 878
 879    for (i = 0; i < p->pages->used; i++) {
 880        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
 881
 882        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
 883            error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
 884                       " (max " RAM_ADDR_FMT ")",
 885                       offset, block->max_length);
 886            return -1;
 887        }
 888        p->pages->iov[i].iov_base = block->host + offset;
 889        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
 890    }
 891
 892    return 0;
 893}
 894
/* Global state of the multifd sending side */
struct {
    /* per-channel state, indexed by channel number */
    MultiFDSendParams *params;
    /* number of created threads */
    int count;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
} *multifd_send_state;

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages struct for each channel, and a main one.  Each time
 * that we need to send a batch of pages we swap the one in
 * multifd_send_state with the one of the channel that is sending it.
 * There are two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 */
 926
 927static void multifd_send_pages(void)
 928{
 929    int i;
 930    static int next_channel;
 931    MultiFDSendParams *p = NULL; /* make happy gcc */
 932    MultiFDPages_t *pages = multifd_send_state->pages;
 933    uint64_t transferred;
 934
 935    qemu_sem_wait(&multifd_send_state->channels_ready);
 936    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
 937        p = &multifd_send_state->params[i];
 938
 939        qemu_mutex_lock(&p->mutex);
 940        if (!p->pending_job) {
 941            p->pending_job++;
 942            next_channel = (i + 1) % migrate_multifd_channels();
 943            break;
 944        }
 945        qemu_mutex_unlock(&p->mutex);
 946    }
 947    p->pages->used = 0;
 948
 949    p->packet_num = multifd_send_state->packet_num++;
 950    p->pages->block = NULL;
 951    multifd_send_state->pages = p->pages;
 952    p->pages = pages;
 953    transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
 954    ram_counters.multifd_bytes += transferred;
 955    ram_counters.transferred += transferred;;
 956    qemu_mutex_unlock(&p->mutex);
 957    qemu_sem_post(&p->sem);
 958}
 959
 960static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
 961{
 962    MultiFDPages_t *pages = multifd_send_state->pages;
 963
 964    if (!pages->block) {
 965        pages->block = block;
 966    }
 967
 968    if (pages->block == block) {
 969        pages->offset[pages->used] = offset;
 970        pages->iov[pages->used].iov_base = block->host + offset;
 971        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
 972        pages->used++;
 973
 974        if (pages->used < pages->allocated) {
 975            return;
 976        }
 977    }
 978
 979    multifd_send_pages();
 980
 981    if (pages->block != block) {
 982        multifd_queue_page(block, offset);
 983    }
 984}
 985
 986static void multifd_send_terminate_threads(Error *err)
 987{
 988    int i;
 989
 990    if (err) {
 991        MigrationState *s = migrate_get_current();
 992        migrate_set_error(s, err);
 993        if (s->state == MIGRATION_STATUS_SETUP ||
 994            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
 995            s->state == MIGRATION_STATUS_DEVICE ||
 996            s->state == MIGRATION_STATUS_ACTIVE) {
 997            migrate_set_state(&s->state, s->state,
 998                              MIGRATION_STATUS_FAILED);
 999        }
1000    }
1001
1002    for (i = 0; i < migrate_multifd_channels(); i++) {
1003        MultiFDSendParams *p = &multifd_send_state->params[i];
1004
1005        qemu_mutex_lock(&p->mutex);
1006        p->quit = true;
1007        qemu_sem_post(&p->sem);
1008        qemu_mutex_unlock(&p->mutex);
1009    }
1010}
1011
1012void multifd_save_cleanup(void)
1013{
1014    int i;
1015
1016    if (!migrate_use_multifd()) {
1017        return;
1018    }
1019    multifd_send_terminate_threads(NULL);
1020    for (i = 0; i < migrate_multifd_channels(); i++) {
1021        MultiFDSendParams *p = &multifd_send_state->params[i];
1022
1023        if (p->running) {
1024            qemu_thread_join(&p->thread);
1025        }
1026        socket_send_channel_destroy(p->c);
1027        p->c = NULL;
1028        qemu_mutex_destroy(&p->mutex);
1029        qemu_sem_destroy(&p->sem);
1030        qemu_sem_destroy(&p->sem_sync);
1031        g_free(p->name);
1032        p->name = NULL;
1033        multifd_pages_clear(p->pages);
1034        p->pages = NULL;
1035        p->packet_len = 0;
1036        g_free(p->packet);
1037        p->packet = NULL;
1038    }
1039    qemu_sem_destroy(&multifd_send_state->channels_ready);
1040    qemu_sem_destroy(&multifd_send_state->sem_sync);
1041    g_free(multifd_send_state->params);
1042    multifd_send_state->params = NULL;
1043    multifd_pages_clear(multifd_send_state->pages);
1044    multifd_send_state->pages = NULL;
1045    g_free(multifd_send_state);
1046    multifd_send_state = NULL;
1047}
1048
1049static void multifd_send_sync_main(void)
1050{
1051    int i;
1052
1053    if (!migrate_use_multifd()) {
1054        return;
1055    }
1056    if (multifd_send_state->pages->used) {
1057        multifd_send_pages();
1058    }
1059    for (i = 0; i < migrate_multifd_channels(); i++) {
1060        MultiFDSendParams *p = &multifd_send_state->params[i];
1061
1062        trace_multifd_send_sync_main_signal(p->id);
1063
1064        qemu_mutex_lock(&p->mutex);
1065
1066        p->packet_num = multifd_send_state->packet_num++;
1067        p->flags |= MULTIFD_FLAG_SYNC;
1068        p->pending_job++;
1069        qemu_mutex_unlock(&p->mutex);
1070        qemu_sem_post(&p->sem);
1071    }
1072    for (i = 0; i < migrate_multifd_channels(); i++) {
1073        MultiFDSendParams *p = &multifd_send_state->params[i];
1074
1075        trace_multifd_send_sync_main_wait(p->id);
1076        qemu_sem_wait(&multifd_send_state->sem_sync);
1077    }
1078    trace_multifd_send_sync_main(multifd_send_state->packet_num);
1079}
1080
1081static void *multifd_send_thread(void *opaque)
1082{
1083    MultiFDSendParams *p = opaque;
1084    Error *local_err = NULL;
1085    int ret;
1086
1087    trace_multifd_send_thread_start(p->id);
1088    rcu_register_thread();
1089
1090    if (multifd_send_initial_packet(p, &local_err) < 0) {
1091        goto out;
1092    }
1093    /* initial packet */
1094    p->num_packets = 1;
1095
1096    while (true) {
1097        qemu_sem_wait(&p->sem);
1098        qemu_mutex_lock(&p->mutex);
1099
1100        if (p->pending_job) {
1101            uint32_t used = p->pages->used;
1102            uint64_t packet_num = p->packet_num;
1103            uint32_t flags = p->flags;
1104
1105            p->next_packet_size = used * qemu_target_page_size();
1106            multifd_send_fill_packet(p);
1107            p->flags = 0;
1108            p->num_packets++;
1109            p->num_pages += used;
1110            p->pages->used = 0;
1111            qemu_mutex_unlock(&p->mutex);
1112
1113            trace_multifd_send(p->id, packet_num, used, flags,
1114                               p->next_packet_size);
1115
1116            ret = qio_channel_write_all(p->c, (void *)p->packet,
1117                                        p->packet_len, &local_err);
1118            if (ret != 0) {
1119                break;
1120            }
1121
1122            if (used) {
1123                ret = qio_channel_writev_all(p->c, p->pages->iov,
1124                                             used, &local_err);
1125                if (ret != 0) {
1126                    break;
1127                }
1128            }
1129
1130            qemu_mutex_lock(&p->mutex);
1131            p->pending_job--;
1132            qemu_mutex_unlock(&p->mutex);
1133
1134            if (flags & MULTIFD_FLAG_SYNC) {
1135                qemu_sem_post(&multifd_send_state->sem_sync);
1136            }
1137            qemu_sem_post(&multifd_send_state->channels_ready);
1138        } else if (p->quit) {
1139            qemu_mutex_unlock(&p->mutex);
1140            break;
1141        } else {
1142            qemu_mutex_unlock(&p->mutex);
1143            /* sometimes there are spurious wakeups */
1144        }
1145    }
1146
1147out:
1148    if (local_err) {
1149        multifd_send_terminate_threads(local_err);
1150    }
1151
1152    qemu_mutex_lock(&p->mutex);
1153    p->running = false;
1154    qemu_mutex_unlock(&p->mutex);
1155
1156    rcu_unregister_thread();
1157    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1158
1159    return NULL;
1160}
1161
1162static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1163{
1164    MultiFDSendParams *p = opaque;
1165    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1166    Error *local_err = NULL;
1167
1168    if (qio_task_propagate_error(task, &local_err)) {
1169        migrate_set_error(migrate_get_current(), local_err);
1170        multifd_save_cleanup();
1171    } else {
1172        p->c = QIO_CHANNEL(sioc);
1173        qio_channel_set_delay(p->c, false);
1174        p->running = true;
1175        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1176                           QEMU_THREAD_JOINABLE);
1177
1178        atomic_inc(&multifd_send_state->count);
1179    }
1180}
1181
1182int multifd_save_setup(void)
1183{
1184    int thread_count;
1185    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1186    uint8_t i;
1187
1188    if (!migrate_use_multifd()) {
1189        return 0;
1190    }
1191    thread_count = migrate_multifd_channels();
1192    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1193    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1194    atomic_set(&multifd_send_state->count, 0);
1195    multifd_send_state->pages = multifd_pages_init(page_count);
1196    qemu_sem_init(&multifd_send_state->sem_sync, 0);
1197    qemu_sem_init(&multifd_send_state->channels_ready, 0);
1198
1199    for (i = 0; i < thread_count; i++) {
1200        MultiFDSendParams *p = &multifd_send_state->params[i];
1201
1202        qemu_mutex_init(&p->mutex);
1203        qemu_sem_init(&p->sem, 0);
1204        qemu_sem_init(&p->sem_sync, 0);
1205        p->quit = false;
1206        p->pending_job = 0;
1207        p->id = i;
1208        p->pages = multifd_pages_init(page_count);
1209        p->packet_len = sizeof(MultiFDPacket_t)
1210                      + sizeof(ram_addr_t) * page_count;
1211        p->packet = g_malloc0(p->packet_len);
1212        p->name = g_strdup_printf("multifdsend_%d", i);
1213        socket_send_channel_create(multifd_new_send_channel_async, p);
1214    }
1215    return 0;
1216}
1217
1218struct {
1219    MultiFDRecvParams *params;
1220    /* number of created threads */
1221    int count;
1222    /* syncs main thread and channels */
1223    QemuSemaphore sem_sync;
1224    /* global number of generated multifd packets */
1225    uint64_t packet_num;
1226} *multifd_recv_state;
1227
1228static void multifd_recv_terminate_threads(Error *err)
1229{
1230    int i;
1231
1232    if (err) {
1233        MigrationState *s = migrate_get_current();
1234        migrate_set_error(s, err);
1235        if (s->state == MIGRATION_STATUS_SETUP ||
1236            s->state == MIGRATION_STATUS_ACTIVE) {
1237            migrate_set_state(&s->state, s->state,
1238                              MIGRATION_STATUS_FAILED);
1239        }
1240    }
1241
1242    for (i = 0; i < migrate_multifd_channels(); i++) {
1243        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1244
1245        qemu_mutex_lock(&p->mutex);
1246        /* We could arrive here for two reasons:
1247           - normal quit, i.e. everything went fine, just finished
1248           - error quit: We close the channels so the channel threads
1249             finish the qio_channel_read_all_eof() */
1250        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1251        qemu_mutex_unlock(&p->mutex);
1252    }
1253}
1254
1255int multifd_load_cleanup(Error **errp)
1256{
1257    int i;
1258    int ret = 0;
1259
1260    if (!migrate_use_multifd()) {
1261        return 0;
1262    }
1263    multifd_recv_terminate_threads(NULL);
1264    for (i = 0; i < migrate_multifd_channels(); i++) {
1265        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1266
1267        if (p->running) {
1268            qemu_thread_join(&p->thread);
1269        }
1270        object_unref(OBJECT(p->c));
1271        p->c = NULL;
1272        qemu_mutex_destroy(&p->mutex);
1273        qemu_sem_destroy(&p->sem_sync);
1274        g_free(p->name);
1275        p->name = NULL;
1276        multifd_pages_clear(p->pages);
1277        p->pages = NULL;
1278        p->packet_len = 0;
1279        g_free(p->packet);
1280        p->packet = NULL;
1281    }
1282    qemu_sem_destroy(&multifd_recv_state->sem_sync);
1283    g_free(multifd_recv_state->params);
1284    multifd_recv_state->params = NULL;
1285    g_free(multifd_recv_state);
1286    multifd_recv_state = NULL;
1287
1288    return ret;
1289}
1290
1291static void multifd_recv_sync_main(void)
1292{
1293    int i;
1294
1295    if (!migrate_use_multifd()) {
1296        return;
1297    }
1298    for (i = 0; i < migrate_multifd_channels(); i++) {
1299        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1300
1301        trace_multifd_recv_sync_main_wait(p->id);
1302        qemu_sem_wait(&multifd_recv_state->sem_sync);
1303        qemu_mutex_lock(&p->mutex);
1304        if (multifd_recv_state->packet_num < p->packet_num) {
1305            multifd_recv_state->packet_num = p->packet_num;
1306        }
1307        qemu_mutex_unlock(&p->mutex);
1308    }
1309    for (i = 0; i < migrate_multifd_channels(); i++) {
1310        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1311
1312        trace_multifd_recv_sync_main_signal(p->id);
1313        qemu_sem_post(&p->sem_sync);
1314    }
1315    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1316}
1317
1318static void *multifd_recv_thread(void *opaque)
1319{
1320    MultiFDRecvParams *p = opaque;
1321    Error *local_err = NULL;
1322    int ret;
1323
1324    trace_multifd_recv_thread_start(p->id);
1325    rcu_register_thread();
1326
1327    while (true) {
1328        uint32_t used;
1329        uint32_t flags;
1330
1331        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1332                                       p->packet_len, &local_err);
1333        if (ret == 0) {   /* EOF */
1334            break;
1335        }
1336        if (ret == -1) {   /* Error */
1337            break;
1338        }
1339
1340        qemu_mutex_lock(&p->mutex);
1341        ret = multifd_recv_unfill_packet(p, &local_err);
1342        if (ret) {
1343            qemu_mutex_unlock(&p->mutex);
1344            break;
1345        }
1346
1347        used = p->pages->used;
1348        flags = p->flags;
1349        trace_multifd_recv(p->id, p->packet_num, used, flags,
1350                           p->next_packet_size);
1351        p->num_packets++;
1352        p->num_pages += used;
1353        qemu_mutex_unlock(&p->mutex);
1354
1355        if (used) {
1356            ret = qio_channel_readv_all(p->c, p->pages->iov,
1357                                        used, &local_err);
1358            if (ret != 0) {
1359                break;
1360            }
1361        }
1362
1363        if (flags & MULTIFD_FLAG_SYNC) {
1364            qemu_sem_post(&multifd_recv_state->sem_sync);
1365            qemu_sem_wait(&p->sem_sync);
1366        }
1367    }
1368
1369    if (local_err) {
1370        multifd_recv_terminate_threads(local_err);
1371    }
1372    qemu_mutex_lock(&p->mutex);
1373    p->running = false;
1374    qemu_mutex_unlock(&p->mutex);
1375
1376    rcu_unregister_thread();
1377    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1378
1379    return NULL;
1380}
1381
1382int multifd_load_setup(void)
1383{
1384    int thread_count;
1385    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1386    uint8_t i;
1387
1388    if (!migrate_use_multifd()) {
1389        return 0;
1390    }
1391    thread_count = migrate_multifd_channels();
1392    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1393    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1394    atomic_set(&multifd_recv_state->count, 0);
1395    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1396
1397    for (i = 0; i < thread_count; i++) {
1398        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1399
1400        qemu_mutex_init(&p->mutex);
1401        qemu_sem_init(&p->sem_sync, 0);
1402        p->id = i;
1403        p->pages = multifd_pages_init(page_count);
1404        p->packet_len = sizeof(MultiFDPacket_t)
1405                      + sizeof(ram_addr_t) * page_count;
1406        p->packet = g_malloc0(p->packet_len);
1407        p->name = g_strdup_printf("multifdrecv_%d", i);
1408    }
1409    return 0;
1410}
1411
1412bool multifd_recv_all_channels_created(void)
1413{
1414    int thread_count = migrate_multifd_channels();
1415
1416    if (!migrate_use_multifd()) {
1417        return true;
1418    }
1419
1420    return thread_count == atomic_read(&multifd_recv_state->count);
1421}
1422
1423/*
1424 * Try to receive all multifd channels to get ready for the migration.
1425 * - Return true and do not set @errp when correctly receving all channels;
1426 * - Return false and do not set @errp when correctly receiving the current one;
1427 * - Return false and set @errp when failing to receive the current channel.
1428 */
1429bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1430{
1431    MultiFDRecvParams *p;
1432    Error *local_err = NULL;
1433    int id;
1434
1435    id = multifd_recv_initial_packet(ioc, &local_err);
1436    if (id < 0) {
1437        multifd_recv_terminate_threads(local_err);
1438        error_propagate_prepend(errp, local_err,
1439                                "failed to receive packet"
1440                                " via multifd channel %d: ",
1441                                atomic_read(&multifd_recv_state->count));
1442        return false;
1443    }
1444
1445    p = &multifd_recv_state->params[id];
1446    if (p->c != NULL) {
1447        error_setg(&local_err, "multifd: received id '%d' already setup'",
1448                   id);
1449        multifd_recv_terminate_threads(local_err);
1450        error_propagate(errp, local_err);
1451        return false;
1452    }
1453    p->c = ioc;
1454    object_ref(OBJECT(ioc));
1455    /* initial packet */
1456    p->num_packets = 1;
1457
1458    p->running = true;
1459    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1460                       QEMU_THREAD_JOINABLE);
1461    atomic_inc(&multifd_recv_state->count);
1462    return atomic_read(&multifd_recv_state->count) ==
1463           migrate_multifd_channels();
1464}
1465
1466/**
1467 * save_page_header: write page header to wire
1468 *
1469 * If this is the 1st block, it also writes the block identification
1470 *
1471 * Returns the number of bytes written
1472 *
1473 * @f: QEMUFile where to send the data
1474 * @block: block that contains the page we want to send
1475 * @offset: offset inside the block for the page
1476 *          in the lower bits, it contains flags
1477 */
1478static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1479                               ram_addr_t offset)
1480{
1481    size_t size, len;
1482
1483    if (block == rs->last_sent_block) {
1484        offset |= RAM_SAVE_FLAG_CONTINUE;
1485    }
1486    qemu_put_be64(f, offset);
1487    size = 8;
1488
1489    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1490        len = strlen(block->idstr);
1491        qemu_put_byte(f, len);
1492        qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1493        size += 1 + len;
1494        rs->last_sent_block = block;
1495    }
1496    return size;
1497}
1498
1499/**
1500 * mig_throttle_guest_down: throotle down the guest
1501 *
1502 * Reduce amount of guest cpu execution to hopefully slow down memory
1503 * writes. If guest dirty memory rate is reduced below the rate at
1504 * which we can transfer pages to the destination then we should be
1505 * able to complete migration. Some workloads dirty memory way too
1506 * fast and will not effectively converge, even with auto-converge.
1507 */
1508static void mig_throttle_guest_down(void)
1509{
1510    MigrationState *s = migrate_get_current();
1511    uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1512    uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
1513    int pct_max = s->parameters.max_cpu_throttle;
1514
1515    /* We have not started throttling yet. Let's start it. */
1516    if (!cpu_throttle_active()) {
1517        cpu_throttle_set(pct_initial);
1518    } else {
1519        /* Throttling already on, just increase the rate */
1520        cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
1521                         pct_max));
1522    }
1523}
1524
1525/**
1526 * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1527 *
1528 * @rs: current RAM state
1529 * @current_addr: address for the zero page
1530 *
1531 * Update the xbzrle cache to reflect a page that's been sent as all 0.
1532 * The important thing is that a stale (not-yet-0'd) page be replaced
1533 * by the new data.
1534 * As a bonus, if the page wasn't in the cache it gets added so that
1535 * when a small write is made into the 0'd page it gets XBZRLE sent.
1536 */
1537static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1538{
1539    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1540        return;
1541    }
1542
1543    /* We don't care if this fails to allocate a new cache page
1544     * as long as it updated an old one */
1545    cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1546                 ram_counters.dirty_sync_count);
1547}
1548
1549#define ENCODING_FLAG_XBZRLE 0x1
1550
1551/**
1552 * save_xbzrle_page: compress and send current page
1553 *
1554 * Returns: 1 means that we wrote the page
1555 *          0 means that page is identical to the one already sent
1556 *          -1 means that xbzrle would be longer than normal
1557 *
1558 * @rs: current RAM state
1559 * @current_data: pointer to the address of the page contents
1560 * @current_addr: addr of the page
1561 * @block: block that contains the page we want to send
1562 * @offset: offset inside the block for the page
1563 * @last_stage: if we are at the completion stage
1564 */
1565static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1566                            ram_addr_t current_addr, RAMBlock *block,
1567                            ram_addr_t offset, bool last_stage)
1568{
1569    int encoded_len = 0, bytes_xbzrle;
1570    uint8_t *prev_cached_page;
1571
1572    if (!cache_is_cached(XBZRLE.cache, current_addr,
1573                         ram_counters.dirty_sync_count)) {
1574        xbzrle_counters.cache_miss++;
1575        if (!last_stage) {
1576            if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1577                             ram_counters.dirty_sync_count) == -1) {
1578                return -1;
1579            } else {
1580                /* update *current_data when the page has been
1581                   inserted into cache */
1582                *current_data = get_cached_data(XBZRLE.cache, current_addr);
1583            }
1584        }
1585        return -1;
1586    }
1587
1588    prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1589
1590    /* save current buffer into memory */
1591    memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1592
1593    /* XBZRLE encoding (if there is no overflow) */
1594    encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1595                                       TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1596                                       TARGET_PAGE_SIZE);
1597    if (encoded_len == 0) {
1598        trace_save_xbzrle_page_skipping();
1599        return 0;
1600    } else if (encoded_len == -1) {
1601        trace_save_xbzrle_page_overflow();
1602        xbzrle_counters.overflow++;
1603        /* update data in the cache */
1604        if (!last_stage) {
1605            memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
1606            *current_data = prev_cached_page;
1607        }
1608        return -1;
1609    }
1610
1611    /* we need to update the data in the cache, in order to get the same data */
1612    if (!last_stage) {
1613        memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1614    }
1615
1616    /* Send XBZRLE based compressed page */
1617    bytes_xbzrle = save_page_header(rs, rs->f, block,
1618                                    offset | RAM_SAVE_FLAG_XBZRLE);
1619    qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1620    qemu_put_be16(rs->f, encoded_len);
1621    qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1622    bytes_xbzrle += encoded_len + 1 + 2;
1623    xbzrle_counters.pages++;
1624    xbzrle_counters.bytes += bytes_xbzrle;
1625    ram_counters.transferred += bytes_xbzrle;
1626
1627    return 1;
1628}
1629
1630/**
1631 * migration_bitmap_find_dirty: find the next dirty page from start
1632 *
1633 * Called with rcu_read_lock() to protect migration_bitmap
1634 *
1635 * Returns the byte offset within memory region of the start of a dirty page
1636 *
1637 * @rs: current RAM state
1638 * @rb: RAMBlock where to search for dirty pages
1639 * @start: page where we start the search
1640 */
1641static inline
1642unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1643                                          unsigned long start)
1644{
1645    unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1646    unsigned long *bitmap = rb->bmap;
1647    unsigned long next;
1648
1649    if (ramblock_is_ignored(rb)) {
1650        return size;
1651    }
1652
1653    /*
1654     * When the free page optimization is enabled, we need to check the bitmap
1655     * to send the non-free pages rather than all the pages in the bulk stage.
1656     */
1657    if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1658        next = start + 1;
1659    } else {
1660        next = find_next_bit(bitmap, size, start);
1661    }
1662
1663    return next;
1664}
1665
1666static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1667                                                RAMBlock *rb,
1668                                                unsigned long page)
1669{
1670    bool ret;
1671
1672    qemu_mutex_lock(&rs->bitmap_mutex);
1673    ret = test_and_clear_bit(page, rb->bmap);
1674
1675    if (ret) {
1676        rs->migration_dirty_pages--;
1677    }
1678    qemu_mutex_unlock(&rs->bitmap_mutex);
1679
1680    return ret;
1681}
1682
1683static void migration_bitmap_sync_range(RAMState *rs, RAMBlock *rb,
1684                                        ram_addr_t start, ram_addr_t length)
1685{
1686    rs->migration_dirty_pages +=
1687        cpu_physical_memory_sync_dirty_bitmap(rb, start, length,
1688                                              &rs->num_dirty_pages_period);
1689}
1690
1691/**
1692 * ram_pagesize_summary: calculate all the pagesizes of a VM
1693 *
1694 * Returns a summary bitmap of the page sizes of all RAMBlocks
1695 *
1696 * For VMs with just normal pages this is equivalent to the host page
1697 * size. If it's got some huge pages then it's the OR of all the
1698 * different page sizes.
1699 */
1700uint64_t ram_pagesize_summary(void)
1701{
1702    RAMBlock *block;
1703    uint64_t summary = 0;
1704
1705    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1706        summary |= block->page_size;
1707    }
1708
1709    return summary;
1710}
1711
1712uint64_t ram_get_total_transferred_pages(void)
1713{
1714    return  ram_counters.normal + ram_counters.duplicate +
1715                compression_counters.pages + xbzrle_counters.pages;
1716}
1717
1718static void migration_update_rates(RAMState *rs, int64_t end_time)
1719{
1720    uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1721    double compressed_size;
1722
1723    /* calculate period counters */
1724    ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1725                / (end_time - rs->time_last_bitmap_sync);
1726
1727    if (!page_count) {
1728        return;
1729    }
1730
1731    if (migrate_use_xbzrle()) {
1732        xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1733            rs->xbzrle_cache_miss_prev) / page_count;
1734        rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1735    }
1736
1737    if (migrate_use_compression()) {
1738        compression_counters.busy_rate = (double)(compression_counters.busy -
1739            rs->compress_thread_busy_prev) / page_count;
1740        rs->compress_thread_busy_prev = compression_counters.busy;
1741
1742        compressed_size = compression_counters.compressed_size -
1743                          rs->compressed_size_prev;
1744        if (compressed_size) {
1745            double uncompressed_size = (compression_counters.pages -
1746                                    rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1747
1748            /* Compression-Ratio = Uncompressed-size / Compressed-size */
1749            compression_counters.compression_rate =
1750                                        uncompressed_size / compressed_size;
1751
1752            rs->compress_pages_prev = compression_counters.pages;
1753            rs->compressed_size_prev = compression_counters.compressed_size;
1754        }
1755    }
1756}
1757
1758static void migration_bitmap_sync(RAMState *rs)
1759{
1760    RAMBlock *block;
1761    int64_t end_time;
1762    uint64_t bytes_xfer_now;
1763
1764    ram_counters.dirty_sync_count++;
1765
1766    if (!rs->time_last_bitmap_sync) {
1767        rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1768    }
1769
1770    trace_migration_bitmap_sync_start();
1771    memory_global_dirty_log_sync();
1772
1773    qemu_mutex_lock(&rs->bitmap_mutex);
1774    rcu_read_lock();
1775    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1776        migration_bitmap_sync_range(rs, block, 0, block->used_length);
1777    }
1778    ram_counters.remaining = ram_bytes_remaining();
1779    rcu_read_unlock();
1780    qemu_mutex_unlock(&rs->bitmap_mutex);
1781
1782    trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1783
1784    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1785
1786    /* more than 1 second = 1000 millisecons */
1787    if (end_time > rs->time_last_bitmap_sync + 1000) {
1788        bytes_xfer_now = ram_counters.transferred;
1789
1790        /* During block migration the auto-converge logic incorrectly detects
1791         * that ram migration makes no progress. Avoid this by disabling the
1792         * throttling logic during the bulk phase of block migration. */
1793        if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1794            /* The following detection logic can be refined later. For now:
1795               Check to see if the dirtied bytes is 50% more than the approx.
1796               amount of bytes that just got transferred since the last time we
1797               were in this routine. If that happens twice, start or increase
1798               throttling */
1799
1800            if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1801                   (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1802                (++rs->dirty_rate_high_cnt >= 2)) {
1803                    trace_migration_throttle();
1804                    rs->dirty_rate_high_cnt = 0;
1805                    mig_throttle_guest_down();
1806            }
1807        }
1808
1809        migration_update_rates(rs, end_time);
1810
1811        rs->target_page_count_prev = rs->target_page_count;
1812
1813        /* reset period counters */
1814        rs->time_last_bitmap_sync = end_time;
1815        rs->num_dirty_pages_period = 0;
1816        rs->bytes_xfer_prev = bytes_xfer_now;
1817    }
1818    if (migrate_use_events()) {
1819        qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1820    }
1821}
1822
1823static void migration_bitmap_sync_precopy(RAMState *rs)
1824{
1825    Error *local_err = NULL;
1826
1827    /*
1828     * The current notifier usage is just an optimization to migration, so we
1829     * don't stop the normal migration process in the error case.
1830     */
1831    if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1832        error_report_err(local_err);
1833    }
1834
1835    migration_bitmap_sync(rs);
1836
1837    if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1838        error_report_err(local_err);
1839    }
1840}
1841
1842/**
1843 * save_zero_page_to_file: send the zero page to the file
1844 *
1845 * Returns the size of data written to the file, 0 means the page is not
1846 * a zero page
1847 *
1848 * @rs: current RAM state
1849 * @file: the file where the data is saved
1850 * @block: block that contains the page we want to send
1851 * @offset: offset inside the block for the page
1852 */
1853static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1854                                  RAMBlock *block, ram_addr_t offset)
1855{
1856    uint8_t *p = block->host + offset;
1857    int len = 0;
1858
1859    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1860        len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1861        qemu_put_byte(file, 0);
1862        len += 1;
1863    }
1864    return len;
1865}
1866
1867/**
1868 * save_zero_page: send the zero page to the stream
1869 *
1870 * Returns the number of pages written.
1871 *
1872 * @rs: current RAM state
1873 * @block: block that contains the page we want to send
1874 * @offset: offset inside the block for the page
1875 */
1876static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1877{
1878    int len = save_zero_page_to_file(rs, rs->f, block, offset);
1879
1880    if (len) {
1881        ram_counters.duplicate++;
1882        ram_counters.transferred += len;
1883        return 1;
1884    }
1885    return -1;
1886}
1887
1888static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1889{
1890    if (!migrate_release_ram() || !migration_in_postcopy()) {
1891        return;
1892    }
1893
1894    ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1895}
1896
1897/*
1898 * @pages: the number of pages written by the control path,
1899 *        < 0 - error
1900 *        > 0 - number of pages written
1901 *
1902 * Return true if the pages has been saved, otherwise false is returned.
1903 */
1904static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1905                              int *pages)
1906{
1907    uint64_t bytes_xmit = 0;
1908    int ret;
1909
1910    *pages = -1;
1911    ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1912                                &bytes_xmit);
1913    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1914        return false;
1915    }
1916
1917    if (bytes_xmit) {
1918        ram_counters.transferred += bytes_xmit;
1919        *pages = 1;
1920    }
1921
1922    if (ret == RAM_SAVE_CONTROL_DELAYED) {
1923        return true;
1924    }
1925
1926    if (bytes_xmit > 0) {
1927        ram_counters.normal++;
1928    } else if (bytes_xmit == 0) {
1929        ram_counters.duplicate++;
1930    }
1931
1932    return true;
1933}
1934
1935/*
1936 * directly send the page to the stream
1937 *
1938 * Returns the number of pages written.
1939 *
1940 * @rs: current RAM state
1941 * @block: block that contains the page we want to send
1942 * @offset: offset inside the block for the page
1943 * @buf: the page to be sent
1944 * @async: send to page asyncly
1945 */
1946static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1947                            uint8_t *buf, bool async)
1948{
1949    ram_counters.transferred += save_page_header(rs, rs->f, block,
1950                                                 offset | RAM_SAVE_FLAG_PAGE);
1951    if (async) {
1952        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
1953                              migrate_release_ram() &
1954                              migration_in_postcopy());
1955    } else {
1956        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
1957    }
1958    ram_counters.transferred += TARGET_PAGE_SIZE;
1959    ram_counters.normal++;
1960    return 1;
1961}
1962
1963/**
1964 * ram_save_page: send the given page to the stream
1965 *
1966 * Returns the number of pages written.
1967 *          < 0 - error
1968 *          >=0 - Number of pages written - this might legally be 0
1969 *                if xbzrle noticed the page was the same.
1970 *
1971 * @rs: current RAM state
1972 * @block: block that contains the page we want to send
1973 * @offset: offset inside the block for the page
1974 * @last_stage: if we are at the completion stage
1975 */
1976static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
1977{
1978    int pages = -1;
1979    uint8_t *p;
1980    bool send_async = true;
1981    RAMBlock *block = pss->block;
1982    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
1983    ram_addr_t current_addr = block->offset + offset;
1984
1985    p = block->host + offset;
1986    trace_ram_save_page(block->idstr, (uint64_t)offset, p);
1987
1988    XBZRLE_cache_lock();
1989    if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
1990        migrate_use_xbzrle()) {
1991        pages = save_xbzrle_page(rs, &p, current_addr, block,
1992                                 offset, last_stage);
1993        if (!last_stage) {
1994            /* Can't send this cached data async, since the cache page
1995             * might get updated before it gets to the wire
1996             */
1997            send_async = false;
1998        }
1999    }
2000
2001    /* XBZRLE overflow or normal page */
2002    if (pages == -1) {
2003        pages = save_normal_page(rs, block, offset, p, send_async);
2004    }
2005
2006    XBZRLE_cache_unlock();
2007
2008    return pages;
2009}
2010
2011static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2012                                 ram_addr_t offset)
2013{
2014    multifd_queue_page(block, offset);
2015    ram_counters.normal++;
2016
2017    return 1;
2018}
2019
2020static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2021                                 ram_addr_t offset, uint8_t *source_buf)
2022{
2023    RAMState *rs = ram_state;
2024    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2025    bool zero_page = false;
2026    int ret;
2027
2028    if (save_zero_page_to_file(rs, f, block, offset)) {
2029        zero_page = true;
2030        goto exit;
2031    }
2032
2033    save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2034
2035    /*
2036     * copy it to a internal buffer to avoid it being modified by VM
2037     * so that we can catch up the error during compression and
2038     * decompression
2039     */
2040    memcpy(source_buf, p, TARGET_PAGE_SIZE);
2041    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2042    if (ret < 0) {
2043        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2044        error_report("compressed data failed!");
2045        return false;
2046    }
2047
2048exit:
2049    ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2050    return zero_page;
2051}
2052
2053static void
2054update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2055{
2056    ram_counters.transferred += bytes_xmit;
2057
2058    if (param->zero_page) {
2059        ram_counters.duplicate++;
2060        return;
2061    }
2062
2063    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
2064    compression_counters.compressed_size += bytes_xmit - 8;
2065    compression_counters.pages++;
2066}
2067
2068static bool save_page_use_compression(RAMState *rs);
2069
2070static void flush_compressed_data(RAMState *rs)
2071{
2072    int idx, len, thread_count;
2073
2074    if (!save_page_use_compression(rs)) {
2075        return;
2076    }
2077    thread_count = migrate_compress_threads();
2078
2079    qemu_mutex_lock(&comp_done_lock);
2080    for (idx = 0; idx < thread_count; idx++) {
2081        while (!comp_param[idx].done) {
2082            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2083        }
2084    }
2085    qemu_mutex_unlock(&comp_done_lock);
2086
2087    for (idx = 0; idx < thread_count; idx++) {
2088        qemu_mutex_lock(&comp_param[idx].mutex);
2089        if (!comp_param[idx].quit) {
2090            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2091            /*
2092             * it's safe to fetch zero_page without holding comp_done_lock
2093             * as there is no further request submitted to the thread,
2094             * i.e, the thread should be waiting for a request at this point.
2095             */
2096            update_compress_thread_counts(&comp_param[idx], len);
2097        }
2098        qemu_mutex_unlock(&comp_param[idx].mutex);
2099    }
2100}
2101
2102static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2103                                       ram_addr_t offset)
2104{
2105    param->block = block;
2106    param->offset = offset;
2107}
2108
2109static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2110                                           ram_addr_t offset)
2111{
2112    int idx, thread_count, bytes_xmit = -1, pages = -1;
2113    bool wait = migrate_compress_wait_thread();
2114
2115    thread_count = migrate_compress_threads();
2116    qemu_mutex_lock(&comp_done_lock);
2117retry:
2118    for (idx = 0; idx < thread_count; idx++) {
2119        if (comp_param[idx].done) {
2120            comp_param[idx].done = false;
2121            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2122            qemu_mutex_lock(&comp_param[idx].mutex);
2123            set_compress_params(&comp_param[idx], block, offset);
2124            qemu_cond_signal(&comp_param[idx].cond);
2125            qemu_mutex_unlock(&comp_param[idx].mutex);
2126            pages = 1;
2127            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2128            break;
2129        }
2130    }
2131
2132    /*
2133     * wait for the free thread if the user specifies 'compress-wait-thread',
2134     * otherwise we will post the page out in the main thread as normal page.
2135     */
2136    if (pages < 0 && wait) {
2137        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2138        goto retry;
2139    }
2140    qemu_mutex_unlock(&comp_done_lock);
2141
2142    return pages;
2143}
2144
2145/**
2146 * find_dirty_block: find the next dirty page and update any state
2147 * associated with the search process.
2148 *
2149 * Returns if a page is found
2150 *
2151 * @rs: current RAM state
2152 * @pss: data about the state of the current dirty page scan
2153 * @again: set to false if the search has scanned the whole of RAM
2154 */
2155static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2156{
2157    pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2158    if (pss->complete_round && pss->block == rs->last_seen_block &&
2159        pss->page >= rs->last_page) {
2160        /*
2161         * We've been once around the RAM and haven't found anything.
2162         * Give up.
2163         */
2164        *again = false;
2165        return false;
2166    }
2167    if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2168        /* Didn't find anything in this RAM Block */
2169        pss->page = 0;
2170        pss->block = QLIST_NEXT_RCU(pss->block, next);
2171        if (!pss->block) {
2172            /*
2173             * If memory migration starts over, we will meet a dirtied page
2174             * which may still exists in compression threads's ring, so we
2175             * should flush the compressed data to make sure the new page
2176             * is not overwritten by the old one in the destination.
2177             *
2178             * Also If xbzrle is on, stop using the data compression at this
2179             * point. In theory, xbzrle can do better than compression.
2180             */
2181            flush_compressed_data(rs);
2182
2183            /* Hit the end of the list */
2184            pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2185            /* Flag that we've looped */
2186            pss->complete_round = true;
2187            rs->ram_bulk_stage = false;
2188        }
2189        /* Didn't find anything this time, but try again on the new block */
2190        *again = true;
2191        return false;
2192    } else {
2193        /* Can go around again, but... */
2194        *again = true;
2195        /* We've found something so probably don't need to */
2196        return true;
2197    }
2198}
2199
2200/**
2201 * unqueue_page: gets a page of the queue
2202 *
2203 * Helper for 'get_queued_page' - gets a page off the queue
2204 *
2205 * Returns the block of the page (or NULL if none available)
2206 *
2207 * @rs: current RAM state
2208 * @offset: used to return the offset within the RAMBlock
2209 */
2210static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2211{
2212    RAMBlock *block = NULL;
2213
2214    if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2215        return NULL;
2216    }
2217
2218    qemu_mutex_lock(&rs->src_page_req_mutex);
2219    if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2220        struct RAMSrcPageRequest *entry =
2221                                QSIMPLEQ_FIRST(&rs->src_page_requests);
2222        block = entry->rb;
2223        *offset = entry->offset;
2224
2225        if (entry->len > TARGET_PAGE_SIZE) {
2226            entry->len -= TARGET_PAGE_SIZE;
2227            entry->offset += TARGET_PAGE_SIZE;
2228        } else {
2229            memory_region_unref(block->mr);
2230            QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2231            g_free(entry);
2232            migration_consume_urgent_request();
2233        }
2234    }
2235    qemu_mutex_unlock(&rs->src_page_req_mutex);
2236
2237    return block;
2238}
2239
2240/**
2241 * get_queued_page: unqueue a page from the postocpy requests
2242 *
2243 * Skips pages that are already sent (!dirty)
2244 *
2245 * Returns if a queued page is found
2246 *
2247 * @rs: current RAM state
2248 * @pss: data about the state of the current dirty page scan
2249 */
2250static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2251{
2252    RAMBlock  *block;
2253    ram_addr_t offset;
2254    bool dirty;
2255
2256    do {
2257        block = unqueue_page(rs, &offset);
2258        /*
2259         * We're sending this page, and since it's postcopy nothing else
2260         * will dirty it, and we must make sure it doesn't get sent again
2261         * even if this queue request was received after the background
2262         * search already sent it.
2263         */
2264        if (block) {
2265            unsigned long page;
2266
2267            page = offset >> TARGET_PAGE_BITS;
2268            dirty = test_bit(page, block->bmap);
2269            if (!dirty) {
2270                trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2271                       page, test_bit(page, block->unsentmap));
2272            } else {
2273                trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2274            }
2275        }
2276
2277    } while (block && !dirty);
2278
2279    if (block) {
2280        /*
2281         * As soon as we start servicing pages out of order, then we have
2282         * to kill the bulk stage, since the bulk stage assumes
2283         * in (migration_bitmap_find_and_reset_dirty) that every page is
2284         * dirty, that's no longer true.
2285         */
2286        rs->ram_bulk_stage = false;
2287
2288        /*
2289         * We want the background search to continue from the queued page
2290         * since the guest is likely to want other pages near to the page
2291         * it just requested.
2292         */
2293        pss->block = block;
2294        pss->page = offset >> TARGET_PAGE_BITS;
2295    }
2296
2297    return !!block;
2298}
2299
2300/**
2301 * migration_page_queue_free: drop any remaining pages in the ram
2302 * request queue
2303 *
2304 * It should be empty at the end anyway, but in error cases there may
2305 * be some left.  in case that there is any page left, we drop it.
2306 *
2307 */
2308static void migration_page_queue_free(RAMState *rs)
2309{
2310    struct RAMSrcPageRequest *mspr, *next_mspr;
2311    /* This queue generally should be empty - but in the case of a failed
2312     * migration might have some droppings in.
2313     */
2314    rcu_read_lock();
2315    QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2316        memory_region_unref(mspr->rb->mr);
2317        QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2318        g_free(mspr);
2319    }
2320    rcu_read_unlock();
2321}
2322
2323/**
2324 * ram_save_queue_pages: queue the page for transmission
2325 *
2326 * A request from postcopy destination for example.
2327 *
2328 * Returns zero on success or negative on error
2329 *
2330 * @rbname: Name of the RAMBLock of the request. NULL means the
2331 *          same that last one.
2332 * @start: starting address from the start of the RAMBlock
2333 * @len: length (in bytes) to send
2334 */
2335int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2336{
2337    RAMBlock *ramblock;
2338    RAMState *rs = ram_state;
2339
2340    ram_counters.postcopy_requests++;
2341    rcu_read_lock();
2342    if (!rbname) {
2343        /* Reuse last RAMBlock */
2344        ramblock = rs->last_req_rb;
2345
2346        if (!ramblock) {
2347            /*
2348             * Shouldn't happen, we can't reuse the last RAMBlock if
2349             * it's the 1st request.
2350             */
2351            error_report("ram_save_queue_pages no previous block");
2352            goto err;
2353        }
2354    } else {
2355        ramblock = qemu_ram_block_by_name(rbname);
2356
2357        if (!ramblock) {
2358            /* We shouldn't be asked for a non-existent RAMBlock */
2359            error_report("ram_save_queue_pages no block '%s'", rbname);
2360            goto err;
2361        }
2362        rs->last_req_rb = ramblock;
2363    }
2364    trace_ram_save_queue_pages(ramblock->idstr, start, len);
2365    if (start+len > ramblock->used_length) {
2366        error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2367                     RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2368                     __func__, start, len, ramblock->used_length);
2369        goto err;
2370    }
2371
2372    struct RAMSrcPageRequest *new_entry =
2373        g_malloc0(sizeof(struct RAMSrcPageRequest));
2374    new_entry->rb = ramblock;
2375    new_entry->offset = start;
2376    new_entry->len = len;
2377
2378    memory_region_ref(ramblock->mr);
2379    qemu_mutex_lock(&rs->src_page_req_mutex);
2380    QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2381    migration_make_urgent_request();
2382    qemu_mutex_unlock(&rs->src_page_req_mutex);
2383    rcu_read_unlock();
2384
2385    return 0;
2386
2387err:
2388    rcu_read_unlock();
2389    return -1;
2390}
2391
2392static bool save_page_use_compression(RAMState *rs)
2393{
2394    if (!migrate_use_compression()) {
2395        return false;
2396    }
2397
2398    /*
2399     * If xbzrle is on, stop using the data compression after first
2400     * round of migration even if compression is enabled. In theory,
2401     * xbzrle can do better than compression.
2402     */
2403    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2404        return true;
2405    }
2406
2407    return false;
2408}
2409
2410/*
2411 * try to compress the page before posting it out, return true if the page
2412 * has been properly handled by compression, otherwise needs other
2413 * paths to handle it
2414 */
2415static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2416{
2417    if (!save_page_use_compression(rs)) {
2418        return false;
2419    }
2420
2421    /*
2422     * When starting the process of a new block, the first page of
2423     * the block should be sent out before other pages in the same
2424     * block, and all the pages in last block should have been sent
2425     * out, keeping this order is important, because the 'cont' flag
2426     * is used to avoid resending the block name.
2427     *
2428     * We post the fist page as normal page as compression will take
2429     * much CPU resource.
2430     */
2431    if (block != rs->last_sent_block) {
2432        flush_compressed_data(rs);
2433        return false;
2434    }
2435
2436    if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2437        return true;
2438    }
2439
2440    compression_counters.busy++;
2441    return false;
2442}
2443
2444/**
2445 * ram_save_target_page: save one target page
2446 *
2447 * Returns the number of pages written
2448 *
2449 * @rs: current RAM state
2450 * @pss: data about the page we want to send
2451 * @last_stage: if we are at the completion stage
2452 */
2453static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2454                                bool last_stage)
2455{
2456    RAMBlock *block = pss->block;
2457    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2458    int res;
2459
2460    if (control_save_page(rs, block, offset, &res)) {
2461        return res;
2462    }
2463
2464    if (save_compress_page(rs, block, offset)) {
2465        return 1;
2466    }
2467
2468    res = save_zero_page(rs, block, offset);
2469    if (res > 0) {
2470        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2471         * page would be stale
2472         */
2473        if (!save_page_use_compression(rs)) {
2474            XBZRLE_cache_lock();
2475            xbzrle_cache_zero_page(rs, block->offset + offset);
2476            XBZRLE_cache_unlock();
2477        }
2478        ram_release_pages(block->idstr, offset, res);
2479        return res;
2480    }
2481
2482    /*
2483     * do not use multifd for compression as the first page in the new
2484     * block should be posted out before sending the compressed page
2485     */
2486    if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2487        return ram_save_multifd_page(rs, block, offset);
2488    }
2489
2490    return ram_save_page(rs, pss, last_stage);
2491}
2492
2493/**
2494 * ram_save_host_page: save a whole host page
2495 *
2496 * Starting at *offset send pages up to the end of the current host
2497 * page. It's valid for the initial offset to point into the middle of
2498 * a host page in which case the remainder of the hostpage is sent.
2499 * Only dirty target pages are sent. Note that the host page size may
2500 * be a huge page for this block.
2501 * The saving stops at the boundary of the used_length of the block
2502 * if the RAMBlock isn't a multiple of the host page size.
2503 *
2504 * Returns the number of pages written or negative on error
2505 *
2506 * @rs: current RAM state
2507 * @ms: current migration state
2508 * @pss: data about the page we want to send
2509 * @last_stage: if we are at the completion stage
2510 */
2511static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2512                              bool last_stage)
2513{
2514    int tmppages, pages = 0;
2515    size_t pagesize_bits =
2516        qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2517
2518    if (ramblock_is_ignored(pss->block)) {
2519        error_report("block %s should not be migrated !", pss->block->idstr);
2520        return 0;
2521    }
2522
2523    do {
2524        /* Check the pages is dirty and if it is send it */
2525        if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2526            pss->page++;
2527            continue;
2528        }
2529
2530        tmppages = ram_save_target_page(rs, pss, last_stage);
2531        if (tmppages < 0) {
2532            return tmppages;
2533        }
2534
2535        pages += tmppages;
2536        if (pss->block->unsentmap) {
2537            clear_bit(pss->page, pss->block->unsentmap);
2538        }
2539
2540        pss->page++;
2541    } while ((pss->page & (pagesize_bits - 1)) &&
2542             offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
2543
2544    /* The offset we leave with is the last one we looked at */
2545    pss->page--;
2546    return pages;
2547}
2548
2549/**
2550 * ram_find_and_save_block: finds a dirty page and sends it to f
2551 *
2552 * Called within an RCU critical section.
2553 *
2554 * Returns the number of pages written where zero means no dirty pages,
2555 * or negative on error
2556 *
2557 * @rs: current RAM state
2558 * @last_stage: if we are at the completion stage
2559 *
2560 * On systems where host-page-size > target-page-size it will send all the
2561 * pages in a host page that are dirty.
2562 */
2563
2564static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2565{
2566    PageSearchStatus pss;
2567    int pages = 0;
2568    bool again, found;
2569
2570    /* No dirty page as there is zero RAM */
2571    if (!ram_bytes_total()) {
2572        return pages;
2573    }
2574
2575    pss.block = rs->last_seen_block;
2576    pss.page = rs->last_page;
2577    pss.complete_round = false;
2578
2579    if (!pss.block) {
2580        pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2581    }
2582
2583    do {
2584        again = true;
2585        found = get_queued_page(rs, &pss);
2586
2587        if (!found) {
2588            /* priority queue empty, so just search for something dirty */
2589            found = find_dirty_block(rs, &pss, &again);
2590        }
2591
2592        if (found) {
2593            pages = ram_save_host_page(rs, &pss, last_stage);
2594        }
2595    } while (!pages && again);
2596
2597    rs->last_seen_block = pss.block;
2598    rs->last_page = pss.page;
2599
2600    return pages;
2601}
2602
2603void acct_update_position(QEMUFile *f, size_t size, bool zero)
2604{
2605    uint64_t pages = size / TARGET_PAGE_SIZE;
2606
2607    if (zero) {
2608        ram_counters.duplicate += pages;
2609    } else {
2610        ram_counters.normal += pages;
2611        ram_counters.transferred += size;
2612        qemu_update_position(f, size);
2613    }
2614}
2615
2616static uint64_t ram_bytes_total_common(bool count_ignored)
2617{
2618    RAMBlock *block;
2619    uint64_t total = 0;
2620
2621    rcu_read_lock();
2622    if (count_ignored) {
2623        RAMBLOCK_FOREACH_MIGRATABLE(block) {
2624            total += block->used_length;
2625        }
2626    } else {
2627        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2628            total += block->used_length;
2629        }
2630    }
2631    rcu_read_unlock();
2632    return total;
2633}
2634
/* Total used length, in bytes, of the RAM blocks that are not ignored. */
uint64_t ram_bytes_total(void)
{
    const bool count_ignored = false;

    return ram_bytes_total_common(count_ignored);
}
2639
2640static void xbzrle_load_setup(void)
2641{
2642    XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2643}
2644
2645static void xbzrle_load_cleanup(void)
2646{
2647    g_free(XBZRLE.decoded_buf);
2648    XBZRLE.decoded_buf = NULL;
2649}
2650
2651static void ram_state_cleanup(RAMState **rsp)
2652{
2653    if (*rsp) {
2654        migration_page_queue_free(*rsp);
2655        qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2656        qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2657        g_free(*rsp);
2658        *rsp = NULL;
2659    }
2660}
2661
2662static void xbzrle_cleanup(void)
2663{
2664    XBZRLE_cache_lock();
2665    if (XBZRLE.cache) {
2666        cache_fini(XBZRLE.cache);
2667        g_free(XBZRLE.encoded_buf);
2668        g_free(XBZRLE.current_buf);
2669        g_free(XBZRLE.zero_target_page);
2670        XBZRLE.cache = NULL;
2671        XBZRLE.encoded_buf = NULL;
2672        XBZRLE.current_buf = NULL;
2673        XBZRLE.zero_target_page = NULL;
2674    }
2675    XBZRLE_cache_unlock();
2676}
2677
2678static void ram_save_cleanup(void *opaque)
2679{
2680    RAMState **rsp = opaque;
2681    RAMBlock *block;
2682
2683    /* caller have hold iothread lock or is in a bh, so there is
2684     * no writing race against this migration_bitmap
2685     */
2686    memory_global_dirty_log_stop();
2687
2688    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2689        g_free(block->bmap);
2690        block->bmap = NULL;
2691        g_free(block->unsentmap);
2692        block->unsentmap = NULL;
2693    }
2694
2695    xbzrle_cleanup();
2696    compress_threads_save_cleanup();
2697    ram_state_cleanup(rsp);
2698}
2699
2700static void ram_state_reset(RAMState *rs)
2701{
2702    rs->last_seen_block = NULL;
2703    rs->last_sent_block = NULL;
2704    rs->last_page = 0;
2705    rs->last_version = ram_list.version;
2706    rs->ram_bulk_stage = true;
2707    rs->fpo_enabled = false;
2708}
2709
#define MAX_WAIT 50 /* ms, half buffered_file limit */

/*
 * ram_debug_dump_bitmap: print a bitmap to stderr, 128 bits per line
 *
 * Each printed line starts with the hexadecimal index of its first bit,
 * followed by '1' for every set bit and '.' for every clear bit.
 *
 * @todump: the bitmap to dump; nothing is printed if it is NULL
 * @expected: the value you expect the bitmap mostly to be full of; lines
 *            whose bits are all this value are not printed
 * @pages: number of bits in the bitmap
 */
void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
                           unsigned long pages)
{
    enum { LINE_BITS = 128 };
    char linebuf[LINE_BITS + 1];
    unsigned long cur;
    unsigned long linelen;

    /* There is no bitmap to fall back to, so a NULL bitmap dumps nothing */
    if (!todump) {
        return;
    }

    for (cur = 0; cur < pages; cur += linelen) {
        unsigned long curb;
        bool found = false;

        /*
         * Last line; catch the case where the line length
         * is longer than remaining ram
         */
        linelen = pages - cur;
        if (linelen > LINE_BITS) {
            linelen = LINE_BITS;
        }

        for (curb = 0; curb < linelen; curb++) {
            bool thisbit = test_bit(cur + curb, todump);

            linebuf[curb] = thisbit ? '1' : '.';
            found = found || (thisbit != expected);
        }
        if (found) {
            linebuf[curb] = '\0';
            fprintf(stderr,  "0x%08" PRIx64 " : %s\n", (uint64_t)cur, linebuf);
        }
    }
}
2745
2746/* **** functions for postcopy ***** */
2747
2748void ram_postcopy_migrated_memory_release(MigrationState *ms)
2749{
2750    struct RAMBlock *block;
2751
2752    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2753        unsigned long *bitmap = block->bmap;
2754        unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2755        unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2756
2757        while (run_start < range) {
2758            unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2759            ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2760                              (run_end - run_start) << TARGET_PAGE_BITS);
2761            run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2762        }
2763    }
2764}
2765
2766/**
2767 * postcopy_send_discard_bm_ram: discard a RAMBlock
2768 *
2769 * Returns zero on success
2770 *
2771 * Callback from postcopy_each_ram_send_discard for each RAMBlock
2772 * Note: At this point the 'unsentmap' is the processed bitmap combined
2773 *       with the dirtymap; so a '1' means it's either dirty or unsent.
2774 *
2775 * @ms: current migration state
2776 * @pds: state for postcopy
2777 * @start: RAMBlock starting page
2778 * @length: RAMBlock size
2779 */
2780static int postcopy_send_discard_bm_ram(MigrationState *ms,
2781                                        PostcopyDiscardState *pds,
2782                                        RAMBlock *block)
2783{
2784    unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2785    unsigned long current;
2786    unsigned long *unsentmap = block->unsentmap;
2787
2788    for (current = 0; current < end; ) {
2789        unsigned long one = find_next_bit(unsentmap, end, current);
2790
2791        if (one <= end) {
2792            unsigned long zero = find_next_zero_bit(unsentmap, end, one + 1);
2793            unsigned long discard_length;
2794
2795            if (zero >= end) {
2796                discard_length = end - one;
2797            } else {
2798                discard_length = zero - one;
2799            }
2800            if (discard_length) {
2801                postcopy_discard_send_range(ms, pds, one, discard_length);
2802            }
2803            current = one + discard_length;
2804        } else {
2805            current = one;
2806        }
2807    }
2808
2809    return 0;
2810}
2811
2812/**
2813 * postcopy_each_ram_send_discard: discard all RAMBlocks
2814 *
2815 * Returns 0 for success or negative for error
2816 *
2817 * Utility for the outgoing postcopy code.
2818 *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2819 *   passing it bitmap indexes and name.
2820 * (qemu_ram_foreach_block ends up passing unscaled lengths
2821 *  which would mean postcopy code would have to deal with target page)
2822 *
2823 * @ms: current migration state
2824 */
2825static int postcopy_each_ram_send_discard(MigrationState *ms)
2826{
2827    struct RAMBlock *block;
2828    int ret;
2829
2830    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2831        PostcopyDiscardState *pds =
2832            postcopy_discard_send_init(ms, block->idstr);
2833
2834        /*
2835         * Postcopy sends chunks of bitmap over the wire, but it
2836         * just needs indexes at this point, avoids it having
2837         * target page specific code.
2838         */
2839        ret = postcopy_send_discard_bm_ram(ms, pds, block);
2840        postcopy_discard_send_finish(ms, pds);
2841        if (ret) {
2842            return ret;
2843        }
2844    }
2845
2846    return 0;
2847}
2848
2849/**
2850 * postcopy_chunk_hostpages_pass: canocalize bitmap in hostpages
2851 *
2852 * Helper for postcopy_chunk_hostpages; it's called twice to
2853 * canonicalize the two bitmaps, that are similar, but one is
2854 * inverted.
2855 *
2856 * Postcopy requires that all target pages in a hostpage are dirty or
2857 * clean, not a mix.  This function canonicalizes the bitmaps.
2858 *
2859 * @ms: current migration state
2860 * @unsent_pass: if true we need to canonicalize partially unsent host pages
2861 *               otherwise we need to canonicalize partially dirty host pages
2862 * @block: block that contains the page we want to canonicalize
2863 * @pds: state for postcopy
2864 */
2865static void postcopy_chunk_hostpages_pass(MigrationState *ms, bool unsent_pass,
2866                                          RAMBlock *block,
2867                                          PostcopyDiscardState *pds)
2868{
2869    RAMState *rs = ram_state;
2870    unsigned long *bitmap = block->bmap;
2871    unsigned long *unsentmap = block->unsentmap;
2872    unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2873    unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2874    unsigned long run_start;
2875
2876    if (block->page_size == TARGET_PAGE_SIZE) {
2877        /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2878        return;
2879    }
2880
2881    if (unsent_pass) {
2882        /* Find a sent page */
2883        run_start = find_next_zero_bit(unsentmap, pages, 0);
2884    } else {
2885        /* Find a dirty page */
2886        run_start = find_next_bit(bitmap, pages, 0);
2887    }
2888
2889    while (run_start < pages) {
2890        bool do_fixup = false;
2891        unsigned long fixup_start_addr;
2892        unsigned long host_offset;
2893
2894        /*
2895         * If the start of this run of pages is in the middle of a host
2896         * page, then we need to fixup this host page.
2897         */
2898        host_offset = run_start % host_ratio;
2899        if (host_offset) {
2900            do_fixup = true;
2901            run_start -= host_offset;
2902            fixup_start_addr = run_start;
2903            /* For the next pass */
2904            run_start = run_start + host_ratio;
2905        } else {
2906            /* Find the end of this run */
2907            unsigned long run_end;
2908            if (unsent_pass) {
2909                run_end = find_next_bit(unsentmap, pages, run_start + 1);
2910            } else {
2911                run_end = find_next_zero_bit(bitmap, pages, run_start + 1);
2912            }
2913            /*
2914             * If the end isn't at the start of a host page, then the
2915             * run doesn't finish at the end of a host page
2916             * and we need to discard.
2917             */
2918            host_offset = run_end % host_ratio;
2919            if (host_offset) {
2920                do_fixup = true;
2921                fixup_start_addr = run_end - host_offset;
2922                /*
2923                 * This host page has gone, the next loop iteration starts
2924                 * from after the fixup
2925                 */
2926                run_start = fixup_start_addr + host_ratio;
2927            } else {
2928                /*
2929                 * No discards on this iteration, next loop starts from
2930                 * next sent/dirty page
2931                 */
2932                run_start = run_end + 1;
2933            }
2934        }
2935
2936        if (do_fixup) {
2937            unsigned long page;
2938
2939            /* Tell the destination to discard this page */
2940            if (unsent_pass || !test_bit(fixup_start_addr, unsentmap)) {
2941                /* For the unsent_pass we:
2942                 *     discard partially sent pages
2943                 * For the !unsent_pass (dirty) we:
2944                 *     discard partially dirty pages that were sent
2945                 *     (any partially sent pages were already discarded
2946                 *     by the previous unsent_pass)
2947                 */
2948                postcopy_discard_send_range(ms, pds, fixup_start_addr,
2949                                            host_ratio);
2950            }
2951
2952            /* Clean up the bitmap */
2953            for (page = fixup_start_addr;
2954                 page < fixup_start_addr + host_ratio; page++) {
2955                /* All pages in this host page are now not sent */
2956                set_bit(page, unsentmap);
2957
2958                /*
2959                 * Remark them as dirty, updating the count for any pages
2960                 * that weren't previously dirty.
2961                 */
2962                rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
2963            }
2964        }
2965
2966        if (unsent_pass) {
2967            /* Find the next sent page for the next iteration */
2968            run_start = find_next_zero_bit(unsentmap, pages, run_start);
2969        } else {
2970            /* Find the next dirty page for the next iteration */
2971            run_start = find_next_bit(bitmap, pages, run_start);
2972        }
2973    }
2974}
2975
2976/**
2977 * postcopy_chuck_hostpages: discrad any partially sent host page
2978 *
2979 * Utility for the outgoing postcopy code.
2980 *
2981 * Discard any partially sent host-page size chunks, mark any partially
2982 * dirty host-page size chunks as all dirty.  In this case the host-page
2983 * is the host-page for the particular RAMBlock, i.e. it might be a huge page
2984 *
2985 * Returns zero on success
2986 *
2987 * @ms: current migration state
2988 * @block: block we want to work with
2989 */
2990static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
2991{
2992    PostcopyDiscardState *pds =
2993        postcopy_discard_send_init(ms, block->idstr);
2994
2995    /* First pass: Discard all partially sent host pages */
2996    postcopy_chunk_hostpages_pass(ms, true, block, pds);
2997    /*
2998     * Second pass: Ensure that all partially dirty host pages are made
2999     * fully dirty.
3000     */
3001    postcopy_chunk_hostpages_pass(ms, false, block, pds);
3002
3003    postcopy_discard_send_finish(ms, pds);
3004    return 0;
3005}
3006
3007/**
3008 * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3009 *
3010 * Returns zero on success
3011 *
3012 * Transmit the set of pages to be discarded after precopy to the target
3013 * these are pages that:
3014 *     a) Have been previously transmitted but are now dirty again
3015 *     b) Pages that have never been transmitted, this ensures that
3016 *        any pages on the destination that have been mapped by background
3017 *        tasks get discarded (transparent huge pages is the specific concern)
3018 * Hopefully this is pretty sparse
3019 *
3020 * @ms: current migration state
3021 */
3022int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3023{
3024    RAMState *rs = ram_state;
3025    RAMBlock *block;
3026    int ret;
3027
3028    rcu_read_lock();
3029
3030    /* This should be our last sync, the src is now paused */
3031    migration_bitmap_sync(rs);
3032
3033    /* Easiest way to make sure we don't resume in the middle of a host-page */
3034    rs->last_seen_block = NULL;
3035    rs->last_sent_block = NULL;
3036    rs->last_page = 0;
3037
3038    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3039        unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
3040        unsigned long *bitmap = block->bmap;
3041        unsigned long *unsentmap = block->unsentmap;
3042
3043        if (!unsentmap) {
3044            /* We don't have a safe way to resize the sentmap, so
3045             * if the bitmap was resized it will be NULL at this
3046             * point.
3047             */
3048            error_report("migration ram resized during precopy phase");
3049            rcu_read_unlock();
3050            return -EINVAL;
3051        }
3052        /* Deal with TPS != HPS and huge pages */
3053        ret = postcopy_chunk_hostpages(ms, block);
3054        if (ret) {
3055            rcu_read_unlock();
3056            return ret;
3057        }
3058
3059        /*
3060         * Update the unsentmap to be unsentmap = unsentmap | dirty
3061         */
3062        bitmap_or(unsentmap, unsentmap, bitmap, pages);
3063#ifdef DEBUG_POSTCOPY
3064        ram_debug_dump_bitmap(unsentmap, true, pages);
3065#endif
3066    }
3067    trace_ram_postcopy_send_discard_bitmap();
3068
3069    ret = postcopy_each_ram_send_discard(ms);
3070    rcu_read_unlock();
3071
3072    return ret;
3073}
3074
3075/**
3076 * ram_discard_range: discard dirtied pages at the beginning of postcopy
3077 *
3078 * Returns zero on success
3079 *
3080 * @rbname: name of the RAMBlock of the request. NULL means the
3081 *          same that last one.
3082 * @start: RAMBlock starting page
3083 * @length: RAMBlock size
3084 */
3085int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3086{
3087    int ret = -1;
3088
3089    trace_ram_discard_range(rbname, start, length);
3090
3091    rcu_read_lock();
3092    RAMBlock *rb = qemu_ram_block_by_name(rbname);
3093
3094    if (!rb) {
3095        error_report("ram_discard_range: Failed to find block '%s'", rbname);
3096        goto err;
3097    }
3098
3099    /*
3100     * On source VM, we don't need to update the received bitmap since
3101     * we don't even have one.
3102     */
3103    if (rb->receivedmap) {
3104        bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3105                     length >> qemu_target_page_bits());
3106    }
3107
3108    ret = ram_block_discard_range(rb, start, length);
3109
3110err:
3111    rcu_read_unlock();
3112
3113    return ret;
3114}
3115
3116/*
3117 * For every allocation, we will try not to crash the VM if the
3118 * allocation failed.
3119 */
3120static int xbzrle_init(void)
3121{
3122    Error *local_err = NULL;
3123
3124    if (!migrate_use_xbzrle()) {
3125        return 0;
3126    }
3127
3128    XBZRLE_cache_lock();
3129
3130    XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3131    if (!XBZRLE.zero_target_page) {
3132        error_report("%s: Error allocating zero page", __func__);
3133        goto err_out;
3134    }
3135
3136    XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3137                              TARGET_PAGE_SIZE, &local_err);
3138    if (!XBZRLE.cache) {
3139        error_report_err(local_err);
3140        goto free_zero_page;
3141    }
3142
3143    XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3144    if (!XBZRLE.encoded_buf) {
3145        error_report("%s: Error allocating encoded_buf", __func__);
3146        goto free_cache;
3147    }
3148
3149    XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3150    if (!XBZRLE.current_buf) {
3151        error_report("%s: Error allocating current_buf", __func__);
3152        goto free_encoded_buf;
3153    }
3154
3155    /* We are all good */
3156    XBZRLE_cache_unlock();
3157    return 0;
3158
3159free_encoded_buf:
3160    g_free(XBZRLE.encoded_buf);
3161    XBZRLE.encoded_buf = NULL;
3162free_cache:
3163    cache_fini(XBZRLE.cache);
3164    XBZRLE.cache = NULL;
3165free_zero_page:
3166    g_free(XBZRLE.zero_target_page);
3167    XBZRLE.zero_target_page = NULL;
3168err_out:
3169    XBZRLE_cache_unlock();
3170    return -ENOMEM;
3171}
3172
3173static int ram_state_init(RAMState **rsp)
3174{
3175    *rsp = g_try_new0(RAMState, 1);
3176
3177    if (!*rsp) {
3178        error_report("%s: Init ramstate fail", __func__);
3179        return -1;
3180    }
3181
3182    qemu_mutex_init(&(*rsp)->bitmap_mutex);
3183    qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3184    QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3185
3186    /*
3187     * Count the total number of pages used by ram blocks not including any
3188     * gaps due to alignment or unplugs.
3189     */
3190    (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3191
3192    ram_state_reset(*rsp);
3193
3194    return 0;
3195}
3196
3197static void ram_list_init_bitmaps(void)
3198{
3199    RAMBlock *block;
3200    unsigned long pages;
3201
3202    /* Skip setting bitmap if there is no RAM */
3203    if (ram_bytes_total()) {
3204        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3205            pages = block->max_length >> TARGET_PAGE_BITS;
3206            block->bmap = bitmap_new(pages);
3207            bitmap_set(block->bmap, 0, pages);
3208            if (migrate_postcopy_ram()) {
3209                block->unsentmap = bitmap_new(pages);
3210                bitmap_set(block->unsentmap, 0, pages);
3211            }
3212        }
3213    }
3214}
3215
3216static void ram_init_bitmaps(RAMState *rs)
3217{
3218    /* For memory_global_dirty_log_start below.  */
3219    qemu_mutex_lock_iothread();
3220    qemu_mutex_lock_ramlist();
3221    rcu_read_lock();
3222
3223    ram_list_init_bitmaps();
3224    memory_global_dirty_log_start();
3225    migration_bitmap_sync_precopy(rs);
3226
3227    rcu_read_unlock();
3228    qemu_mutex_unlock_ramlist();
3229    qemu_mutex_unlock_iothread();
3230}
3231
3232static int ram_init_all(RAMState **rsp)
3233{
3234    if (ram_state_init(rsp)) {
3235        return -1;
3236    }
3237
3238    if (xbzrle_init()) {
3239        ram_state_cleanup(rsp);
3240        return -1;
3241    }
3242
3243    ram_init_bitmaps(*rsp);
3244
3245    return 0;
3246}
3247
3248static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3249{
3250    RAMBlock *block;
3251    uint64_t pages = 0;
3252
3253    /*
3254     * Postcopy is not using xbzrle/compression, so no need for that.
3255     * Also, since source are already halted, we don't need to care
3256     * about dirty page logging as well.
3257     */
3258
3259    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3260        pages += bitmap_count_one(block->bmap,
3261                                  block->used_length >> TARGET_PAGE_BITS);
3262    }
3263
3264    /* This may not be aligned with current bitmaps. Recalculate. */
3265    rs->migration_dirty_pages = pages;
3266
3267    rs->last_seen_block = NULL;
3268    rs->last_sent_block = NULL;
3269    rs->last_page = 0;
3270    rs->last_version = ram_list.version;
3271    /*
3272     * Disable the bulk stage, otherwise we'll resend the whole RAM no
3273     * matter what we have sent.
3274     */
3275    rs->ram_bulk_stage = false;
3276
3277    /* Update RAMState cache of output QEMUFile */
3278    rs->f = out;
3279
3280    trace_ram_state_resume_prepare(pages);
3281}
3282
3283/*
3284 * This function clears bits of the free pages reported by the caller from the
3285 * migration dirty bitmap. @addr is the host address corresponding to the
3286 * start of the continuous guest free pages, and @len is the total bytes of
3287 * those pages.
3288 */
3289void qemu_guest_free_page_hint(void *addr, size_t len)
3290{
3291    RAMBlock *block;
3292    ram_addr_t offset;
3293    size_t used_len, start, npages;
3294    MigrationState *s = migrate_get_current();
3295
3296    /* This function is currently expected to be used during live migration */
3297    if (!migration_is_setup_or_active(s->state)) {
3298        return;
3299    }
3300
3301    for (; len > 0; len -= used_len, addr += used_len) {
3302        block = qemu_ram_block_from_host(addr, false, &offset);
3303        if (unlikely(!block || offset >= block->used_length)) {
3304            /*
3305             * The implementation might not support RAMBlock resize during
3306             * live migration, but it could happen in theory with future
3307             * updates. So we add a check here to capture that case.
3308             */
3309            error_report_once("%s unexpected error", __func__);
3310            return;
3311        }
3312
3313        if (len <= block->used_length - offset) {
3314            used_len = len;
3315        } else {
3316            used_len = block->used_length - offset;
3317        }
3318
3319        start = offset >> TARGET_PAGE_BITS;
3320        npages = used_len >> TARGET_PAGE_BITS;
3321
3322        qemu_mutex_lock(&ram_state->bitmap_mutex);
3323        ram_state->migration_dirty_pages -=
3324                      bitmap_count_one_with_offset(block->bmap, start, npages);
3325        bitmap_clear(block->bmap, start, npages);
3326        qemu_mutex_unlock(&ram_state->bitmap_mutex);
3327    }
3328}
3329
3330/*
3331 * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3332 * long-running RCU critical section.  When rcu-reclaims in the code
3333 * start to become numerous it will be necessary to reduce the
3334 * granularity of these critical sections.
3335 */
3336
3337/**
3338 * ram_save_setup: Setup RAM for migration
3339 *
3340 * Returns zero to indicate success and negative for error
3341 *
3342 * @f: QEMUFile where to send the data
3343 * @opaque: RAMState pointer
3344 */
3345static int ram_save_setup(QEMUFile *f, void *opaque)
3346{
3347    RAMState **rsp = opaque;
3348    RAMBlock *block;
3349
3350    if (compress_threads_save_setup()) {
3351        return -1;
3352    }
3353
3354    /* migration has already setup the bitmap, reuse it. */
3355    if (!migration_in_colo_state()) {
3356        if (ram_init_all(rsp) != 0) {
3357            compress_threads_save_cleanup();
3358            return -1;
3359        }
3360    }
3361    (*rsp)->f = f;
3362
3363    rcu_read_lock();
3364
3365    qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3366
3367    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3368        qemu_put_byte(f, strlen(block->idstr));
3369        qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3370        qemu_put_be64(f, block->used_length);
3371        if (migrate_postcopy_ram() && block->page_size != qemu_host_page_size) {
3372            qemu_put_be64(f, block->page_size);
3373        }
3374        if (migrate_ignore_shared()) {
3375            qemu_put_be64(f, block->mr->addr);
3376            qemu_put_byte(f, ramblock_is_ignored(block) ? 1 : 0);
3377        }
3378    }
3379
3380    rcu_read_unlock();
3381
3382    ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3383    ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3384
3385    multifd_send_sync_main();
3386    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3387    qemu_fflush(f);
3388
3389    return 0;
3390}
3391
3392/**
3393 * ram_save_iterate: iterative stage for migration
3394 *
3395 * Returns zero to indicate success and negative for error
3396 *
3397 * @f: QEMUFile where to send the data
3398 * @opaque: RAMState pointer
3399 */
3400static int ram_save_iterate(QEMUFile *f, void *opaque)
3401{
3402    RAMState **temp = opaque;
3403    RAMState *rs = *temp;
3404    int ret;
3405    int i;
3406    int64_t t0;
3407    int done = 0;
3408
3409    if (blk_mig_bulk_active()) {
3410        /* Avoid transferring ram during bulk phase of block migration as
3411         * the bulk phase will usually take a long time and transferring
3412         * ram updates during that time is pointless. */
3413        goto out;
3414    }
3415
3416    rcu_read_lock();
3417    if (ram_list.version != rs->last_version) {
3418        ram_state_reset(rs);
3419    }
3420
3421    /* Read version before ram_list.blocks */
3422    smp_rmb();
3423
3424    ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3425
3426    t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3427    i = 0;
3428    while ((ret = qemu_file_rate_limit(f)) == 0 ||
3429            !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3430        int pages;
3431
3432        if (qemu_file_get_error(f)) {
3433            break;
3434        }
3435
3436        pages = ram_find_and_save_block(rs, false);
3437        /* no more pages to sent */
3438        if (pages == 0) {
3439            done = 1;
3440            break;
3441        }
3442
3443        if (pages < 0) {
3444            qemu_file_set_error(f, pages);
3445            break;
3446        }
3447
3448        rs->target_page_count += pages;
3449
3450        /* we want to check in the 1st loop, just in case it was the 1st time
3451           and we had to sync the dirty bitmap.
3452           qemu_get_clock_ns() is a bit expensive, so we only check each some
3453           iterations
3454        */
3455        if ((i & 63) == 0) {
3456            uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
3457            if (t1 > MAX_WAIT) {
3458                trace_ram_save_iterate_big_wait(t1, i);
3459                break;
3460            }
3461        }
3462        i++;
3463    }
3464    rcu_read_unlock();
3465
3466    /*
3467     * Must occur before EOS (or any QEMUFile operation)
3468     * because of RDMA protocol.
3469     */
3470    ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3471
3472    multifd_send_sync_main();
3473out:
3474    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3475    qemu_fflush(f);
3476    ram_counters.transferred += 8;
3477
3478    ret = qemu_file_get_error(f);
3479    if (ret < 0) {
3480        return ret;
3481    }
3482
3483    return done;
3484}
3485
3486/**
3487 * ram_save_complete: function called to send the remaining amount of ram
3488 *
3489 * Returns zero to indicate success or negative on error
3490 *
3491 * Called with iothread lock
3492 *
3493 * @f: QEMUFile where to send the data
3494 * @opaque: RAMState pointer
3495 */
3496static int ram_save_complete(QEMUFile *f, void *opaque)
3497{
3498    RAMState **temp = opaque;
3499    RAMState *rs = *temp;
3500    int ret = 0;
3501
3502    rcu_read_lock();
3503
3504    if (!migration_in_postcopy()) {
3505        migration_bitmap_sync_precopy(rs);
3506    }
3507
3508    ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3509
3510    /* try transferring iterative blocks of memory */
3511
3512    /* flush all remaining blocks regardless of rate limiting */
3513    while (true) {
3514        int pages;
3515
3516        pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3517        /* no more blocks to sent */
3518        if (pages == 0) {
3519            break;
3520        }
3521        if (pages < 0) {
3522            ret = pages;
3523            break;
3524        }
3525    }
3526
3527    flush_compressed_data(rs);
3528    ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3529
3530    rcu_read_unlock();
3531
3532    multifd_send_sync_main();
3533    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3534    qemu_fflush(f);
3535
3536    return ret;
3537}
3538
3539static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3540                             uint64_t *res_precopy_only,
3541                             uint64_t *res_compatible,
3542                             uint64_t *res_postcopy_only)
3543{
3544    RAMState **temp = opaque;
3545    RAMState *rs = *temp;
3546    uint64_t remaining_size;
3547
3548    remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3549
3550    if (!migration_in_postcopy() &&
3551        remaining_size < max_size) {
3552        qemu_mutex_lock_iothread();
3553        rcu_read_lock();
3554        migration_bitmap_sync_precopy(rs);
3555        rcu_read_unlock();
3556        qemu_mutex_unlock_iothread();
3557        remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3558    }
3559
3560    if (migrate_postcopy_ram()) {
3561        /* We can do postcopy, and all the data is postcopiable */
3562        *res_compatible += remaining_size;
3563    } else {
3564        *res_precopy_only += remaining_size;
3565    }
3566}
3567
3568static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3569{
3570    unsigned int xh_len;
3571    int xh_flags;
3572    uint8_t *loaded_data;
3573
3574    /* extract RLE header */
3575    xh_flags = qemu_get_byte(f);
3576    xh_len = qemu_get_be16(f);
3577
3578    if (xh_flags != ENCODING_FLAG_XBZRLE) {
3579        error_report("Failed to load XBZRLE page - wrong compression!");
3580        return -1;
3581    }
3582
3583    if (xh_len > TARGET_PAGE_SIZE) {
3584        error_report("Failed to load XBZRLE page - len overflow!");
3585        return -1;
3586    }
3587    loaded_data = XBZRLE.decoded_buf;
3588    /* load data and decode */
3589    /* it can change loaded_data to point to an internal buffer */
3590    qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3591
3592    /* decode RLE */
3593    if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3594                             TARGET_PAGE_SIZE) == -1) {
3595        error_report("Failed to load XBZRLE page - decode error!");
3596        return -1;
3597    }
3598
3599    return 0;
3600}
3601
3602/**
3603 * ram_block_from_stream: read a RAMBlock id from the migration stream
3604 *
3605 * Must be called from within a rcu critical section.
3606 *
3607 * Returns a pointer from within the RCU-protected ram_list.
3608 *
3609 * @f: QEMUFile where to read the data from
3610 * @flags: Page flags (mostly to see if it's a continuation of previous block)
3611 */
3612static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3613{
3614    static RAMBlock *block = NULL;
3615    char id[256];
3616    uint8_t len;
3617
3618    if (flags & RAM_SAVE_FLAG_CONTINUE) {
3619        if (!block) {
3620            error_report("Ack, bad migration stream!");
3621            return NULL;
3622        }
3623        return block;
3624    }
3625
3626    len = qemu_get_byte(f);
3627    qemu_get_buffer(f, (uint8_t *)id, len);
3628    id[len] = 0;
3629
3630    block = qemu_ram_block_by_name(id);
3631    if (!block) {
3632        error_report("Can't find block %s", id);
3633        return NULL;
3634    }
3635
3636    if (ramblock_is_ignored(block)) {
3637        error_report("block %s should not be migrated !", id);
3638        return NULL;
3639    }
3640
3641    return block;
3642}
3643
3644static inline void *host_from_ram_block_offset(RAMBlock *block,
3645                                               ram_addr_t offset)
3646{
3647    if (!offset_in_ramblock(block, offset)) {
3648        return NULL;
3649    }
3650
3651    return block->host + offset;
3652}
3653
3654static inline void *colo_cache_from_block_offset(RAMBlock *block,
3655                                                 ram_addr_t offset)
3656{
3657    if (!offset_in_ramblock(block, offset)) {
3658        return NULL;
3659    }
3660    if (!block->colo_cache) {
3661        error_report("%s: colo_cache is NULL in block :%s",
3662                     __func__, block->idstr);
3663        return NULL;
3664    }
3665
3666    /*
3667    * During colo checkpoint, we need bitmap of these migrated pages.
3668    * It help us to decide which pages in ram cache should be flushed
3669    * into VM's RAM later.
3670    */
3671    if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3672        ram_state->migration_dirty_pages++;
3673    }
3674    return block->colo_cache + offset;
3675}
3676
/**
 * ram_handle_compressed: handle the zero page case
 *
 * If a page (or a whole RDMA chunk) has been
 * determined to be zero, then zap it.  The memory is left untouched when
 * @ch is zero and the range already reads as all zero.
 *
 * @host: host address for the zero page
 * @ch: what the page is filled from.  We only support zero
 * @size: size of the zero page
 */
void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
{
    if (ch == 0 && is_zero_range(host, size)) {
        /* Already holds the requested content: nothing to write */
        return;
    }

    memset(host, ch, size);
}
3693
3694/* return the size after decompression, or negative value on error */
3695static int
3696qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3697                     const uint8_t *source, size_t source_len)
3698{
3699    int err;
3700
3701    err = inflateReset(stream);
3702    if (err != Z_OK) {
3703        return -1;
3704    }
3705
3706    stream->avail_in = source_len;
3707    stream->next_in = (uint8_t *)source;
3708    stream->avail_out = dest_len;
3709    stream->next_out = dest;
3710
3711    err = inflate(stream, Z_NO_FLUSH);
3712    if (err != Z_STREAM_END) {
3713        return -1;
3714    }
3715
3716    return stream->total_out;
3717}
3718
3719static void *do_data_decompress(void *opaque)
3720{
3721    DecompressParam *param = opaque;
3722    unsigned long pagesize;
3723    uint8_t *des;
3724    int len, ret;
3725
3726    qemu_mutex_lock(&param->mutex);
3727    while (!param->quit) {
3728        if (param->des) {
3729            des = param->des;
3730            len = param->len;
3731            param->des = 0;
3732            qemu_mutex_unlock(&param->mutex);
3733
3734            pagesize = TARGET_PAGE_SIZE;
3735
3736            ret = qemu_uncompress_data(&param->stream, des, pagesize,
3737                                       param->compbuf, len);
3738            if (ret < 0 && migrate_get_current()->decompress_error_check) {
3739                error_report("decompress data failed");
3740                qemu_file_set_error(decomp_file, ret);
3741            }
3742
3743            qemu_mutex_lock(&decomp_done_lock);
3744            param->done = true;
3745            qemu_cond_signal(&decomp_done_cond);
3746            qemu_mutex_unlock(&decomp_done_lock);
3747
3748            qemu_mutex_lock(&param->mutex);
3749        } else {
3750            qemu_cond_wait(&param->cond, &param->mutex);
3751        }
3752    }
3753    qemu_mutex_unlock(&param->mutex);
3754
3755    return NULL;
3756}
3757
3758static int wait_for_decompress_done(void)
3759{
3760    int idx, thread_count;
3761
3762    if (!migrate_use_compression()) {
3763        return 0;
3764    }
3765
3766    thread_count = migrate_decompress_threads();
3767    qemu_mutex_lock(&decomp_done_lock);
3768    for (idx = 0; idx < thread_count; idx++) {
3769        while (!decomp_param[idx].done) {
3770            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3771        }
3772    }
3773    qemu_mutex_unlock(&decomp_done_lock);
3774    return qemu_file_get_error(decomp_file);
3775}
3776
3777static void compress_threads_load_cleanup(void)
3778{
3779    int i, thread_count;
3780
3781    if (!migrate_use_compression()) {
3782        return;
3783    }
3784    thread_count = migrate_decompress_threads();
3785    for (i = 0; i < thread_count; i++) {
3786        /*
3787         * we use it as a indicator which shows if the thread is
3788         * properly init'd or not
3789         */
3790        if (!decomp_param[i].compbuf) {
3791            break;
3792        }
3793
3794        qemu_mutex_lock(&decomp_param[i].mutex);
3795        decomp_param[i].quit = true;
3796        qemu_cond_signal(&decomp_param[i].cond);
3797        qemu_mutex_unlock(&decomp_param[i].mutex);
3798    }
3799    for (i = 0; i < thread_count; i++) {
3800        if (!decomp_param[i].compbuf) {
3801            break;
3802        }
3803
3804        qemu_thread_join(decompress_threads + i);
3805        qemu_mutex_destroy(&decomp_param[i].mutex);
3806        qemu_cond_destroy(&decomp_param[i].cond);
3807        inflateEnd(&decomp_param[i].stream);
3808        g_free(decomp_param[i].compbuf);
3809        decomp_param[i].compbuf = NULL;
3810    }
3811    g_free(decompress_threads);
3812    g_free(decomp_param);
3813    decompress_threads = NULL;
3814    decomp_param = NULL;
3815    decomp_file = NULL;
3816}
3817
3818static int compress_threads_load_setup(QEMUFile *f)
3819{
3820    int i, thread_count;
3821
3822    if (!migrate_use_compression()) {
3823        return 0;
3824    }
3825
3826    thread_count = migrate_decompress_threads();
3827    decompress_threads = g_new0(QemuThread, thread_count);
3828    decomp_param = g_new0(DecompressParam, thread_count);
3829    qemu_mutex_init(&decomp_done_lock);
3830    qemu_cond_init(&decomp_done_cond);
3831    decomp_file = f;
3832    for (i = 0; i < thread_count; i++) {
3833        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3834            goto exit;
3835        }
3836
3837        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3838        qemu_mutex_init(&decomp_param[i].mutex);
3839        qemu_cond_init(&decomp_param[i].cond);
3840        decomp_param[i].done = true;
3841        decomp_param[i].quit = false;
3842        qemu_thread_create(decompress_threads + i, "decompress",
3843                           do_data_decompress, decomp_param + i,
3844                           QEMU_THREAD_JOINABLE);
3845    }
3846    return 0;
3847exit:
3848    compress_threads_load_cleanup();
3849    return -1;
3850}
3851
3852static void decompress_data_with_multi_threads(QEMUFile *f,
3853                                               void *host, int len)
3854{
3855    int idx, thread_count;
3856
3857    thread_count = migrate_decompress_threads();
3858    qemu_mutex_lock(&decomp_done_lock);
3859    while (true) {
3860        for (idx = 0; idx < thread_count; idx++) {
3861            if (decomp_param[idx].done) {
3862                decomp_param[idx].done = false;
3863                qemu_mutex_lock(&decomp_param[idx].mutex);
3864                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3865                decomp_param[idx].des = host;
3866                decomp_param[idx].len = len;
3867                qemu_cond_signal(&decomp_param[idx].cond);
3868                qemu_mutex_unlock(&decomp_param[idx].mutex);
3869                break;
3870            }
3871        }
3872        if (idx < thread_count) {
3873            break;
3874        } else {
3875            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3876        }
3877    }
3878    qemu_mutex_unlock(&decomp_done_lock);
3879}
3880
3881/*
3882 * colo cache: this is for secondary VM, we cache the whole
3883 * memory of the secondary VM, it is need to hold the global lock
3884 * to call this helper.
3885 */
3886int colo_init_ram_cache(void)
3887{
3888    RAMBlock *block;
3889
3890    rcu_read_lock();
3891    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3892        block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3893                                                NULL,
3894                                                false);
3895        if (!block->colo_cache) {
3896            error_report("%s: Can't alloc memory for COLO cache of block %s,"
3897                         "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3898                         block->used_length);
3899            goto out_locked;
3900        }
3901        memcpy(block->colo_cache, block->host, block->used_length);
3902    }
3903    rcu_read_unlock();
3904    /*
3905    * Record the dirty pages that sent by PVM, we use this dirty bitmap together
3906    * with to decide which page in cache should be flushed into SVM's RAM. Here
3907    * we use the same name 'ram_bitmap' as for migration.
3908    */
3909    if (ram_bytes_total()) {
3910        RAMBlock *block;
3911
3912        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3913            unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3914
3915            block->bmap = bitmap_new(pages);
3916            bitmap_set(block->bmap, 0, pages);
3917        }
3918    }
3919    ram_state = g_new0(RAMState, 1);
3920    ram_state->migration_dirty_pages = 0;
3921    qemu_mutex_init(&ram_state->bitmap_mutex);
3922    memory_global_dirty_log_start();
3923
3924    return 0;
3925
3926out_locked:
3927
3928    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3929        if (block->colo_cache) {
3930            qemu_anon_ram_free(block->colo_cache, block->used_length);
3931            block->colo_cache = NULL;
3932        }
3933    }
3934
3935    rcu_read_unlock();
3936    return -errno;
3937}
3938
3939/* It is need to hold the global lock to call this helper */
3940void colo_release_ram_cache(void)
3941{
3942    RAMBlock *block;
3943
3944    memory_global_dirty_log_stop();
3945    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3946        g_free(block->bmap);
3947        block->bmap = NULL;
3948    }
3949
3950    rcu_read_lock();
3951
3952    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3953        if (block->colo_cache) {
3954            qemu_anon_ram_free(block->colo_cache, block->used_length);
3955            block->colo_cache = NULL;
3956        }
3957    }
3958
3959    rcu_read_unlock();
3960    qemu_mutex_destroy(&ram_state->bitmap_mutex);
3961    g_free(ram_state);
3962    ram_state = NULL;
3963}
3964
3965/**
3966 * ram_load_setup: Setup RAM for migration incoming side
3967 *
3968 * Returns zero to indicate success and negative for error
3969 *
3970 * @f: QEMUFile where to receive the data
3971 * @opaque: RAMState pointer
3972 */
3973static int ram_load_setup(QEMUFile *f, void *opaque)
3974{
3975    if (compress_threads_load_setup(f)) {
3976        return -1;
3977    }
3978
3979    xbzrle_load_setup();
3980    ramblock_recv_map_init();
3981
3982    return 0;
3983}
3984
3985static int ram_load_cleanup(void *opaque)
3986{
3987    RAMBlock *rb;
3988
3989    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3990        if (ramblock_is_pmem(rb)) {
3991            pmem_persist(rb->host, rb->used_length);
3992        }
3993    }
3994
3995    xbzrle_load_cleanup();
3996    compress_threads_load_cleanup();
3997
3998    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3999        g_free(rb->receivedmap);
4000        rb->receivedmap = NULL;
4001    }
4002
4003    return 0;
4004}
4005
4006/**
4007 * ram_postcopy_incoming_init: allocate postcopy data structures
4008 *
4009 * Returns 0 for success and negative if there was one error
4010 *
4011 * @mis: current migration incoming state
4012 *
4013 * Allocate data structures etc needed by incoming migration with
4014 * postcopy-ram. postcopy-ram's similarly names
4015 * postcopy_ram_incoming_init does the work.
4016 */
4017int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4018{
4019    return postcopy_ram_incoming_init(mis);
4020}
4021
4022/**
4023 * ram_load_postcopy: load a page in postcopy case
4024 *
4025 * Returns 0 for success or -errno in case of error
4026 *
4027 * Called in postcopy mode by ram_load().
4028 * rcu_read_lock is taken prior to this being called.
4029 *
4030 * @f: QEMUFile where to send the data
4031 */
4032static int ram_load_postcopy(QEMUFile *f)
4033{
4034    int flags = 0, ret = 0;
4035    bool place_needed = false;
4036    bool matches_target_page_size = false;
4037    MigrationIncomingState *mis = migration_incoming_get_current();
4038    /* Temporary page that is later 'placed' */
4039    void *postcopy_host_page = postcopy_get_tmp_page(mis);
4040    void *last_host = NULL;
4041    bool all_zero = false;
4042
4043    while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4044        ram_addr_t addr;
4045        void *host = NULL;
4046        void *page_buffer = NULL;
4047        void *place_source = NULL;
4048        RAMBlock *block = NULL;
4049        uint8_t ch;
4050
4051        addr = qemu_get_be64(f);
4052
4053        /*
4054         * If qemu file error, we should stop here, and then "addr"
4055         * may be invalid
4056         */
4057        ret = qemu_file_get_error(f);
4058        if (ret) {
4059            break;
4060        }
4061
4062        flags = addr & ~TARGET_PAGE_MASK;
4063        addr &= TARGET_PAGE_MASK;
4064
4065        trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4066        place_needed = false;
4067        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
4068            block = ram_block_from_stream(f, flags);
4069
4070            host = host_from_ram_block_offset(block, addr);
4071            if (!host) {
4072                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4073                ret = -EINVAL;
4074                break;
4075            }
4076            matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4077            /*
4078             * Postcopy requires that we place whole host pages atomically;
4079             * these may be huge pages for RAMBlocks that are backed by
4080             * hugetlbfs.
4081             * To make it atomic, the data is read into a temporary page
4082             * that's moved into place later.
4083             * The migration protocol uses,  possibly smaller, target-pages
4084             * however the source ensures it always sends all the components
4085             * of a host page in order.
4086             */
4087            page_buffer = postcopy_host_page +
4088                          ((uintptr_t)host & (block->page_size - 1));
4089            /* If all TP are zero then we can optimise the place */
4090            if (!((uintptr_t)host & (block->page_size - 1))) {
4091                all_zero = true;
4092            } else {
4093                /* not the 1st TP within the HP */
4094                if (host != (last_host + TARGET_PAGE_SIZE)) {
4095                    error_report("Non-sequential target page %p/%p",
4096                                  host, last_host);
4097                    ret = -EINVAL;
4098                    break;
4099                }
4100            }
4101
4102
4103            /*
4104             * If it's the last part of a host page then we place the host
4105             * page
4106             */
4107            place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
4108                                     (block->page_size - 1)) == 0;
4109            place_source = postcopy_host_page;
4110        }
4111        last_host = host;
4112
4113        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4114        case RAM_SAVE_FLAG_ZERO:
4115            ch = qemu_get_byte(f);
4116            memset(page_buffer, ch, TARGET_PAGE_SIZE);
4117            if (ch) {
4118                all_zero = false;
4119            }
4120            break;
4121
4122        case RAM_SAVE_FLAG_PAGE:
4123            all_zero = false;
4124            if (!matches_target_page_size) {
4125                /* For huge pages, we always use temporary buffer */
4126                qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4127            } else {
4128                /*
4129                 * For small pages that matches target page size, we
4130                 * avoid the qemu_file copy.  Instead we directly use
4131                 * the buffer of QEMUFile to place the page.  Note: we
4132                 * cannot do any QEMUFile operation before using that
4133                 * buffer to make sure the buffer is valid when
4134                 * placing the page.
4135                 */
4136                qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4137                                         TARGET_PAGE_SIZE);
4138            }
4139            break;
4140        case RAM_SAVE_FLAG_EOS:
4141            /* normal exit */
4142            multifd_recv_sync_main();
4143            break;
4144        default:
4145            error_report("Unknown combination of migration flags: %#x"
4146                         " (postcopy mode)", flags);
4147            ret = -EINVAL;
4148            break;
4149        }
4150
4151        /* Detect for any possible file errors */
4152        if (!ret && qemu_file_get_error(f)) {
4153            ret = qemu_file_get_error(f);
4154        }
4155
4156        if (!ret && place_needed) {
4157            /* This gets called at the last target page in the host page */
4158            void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
4159
4160            if (all_zero) {
4161                ret = postcopy_place_page_zero(mis, place_dest,
4162                                               block);
4163            } else {
4164                ret = postcopy_place_page(mis, place_dest,
4165                                          place_source, block);
4166            }
4167        }
4168    }
4169
4170    return ret;
4171}
4172
4173static bool postcopy_is_advised(void)
4174{
4175    PostcopyState ps = postcopy_state_get();
4176    return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4177}
4178
4179static bool postcopy_is_running(void)
4180{
4181    PostcopyState ps = postcopy_state_get();
4182    return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4183}
4184
4185/*
4186 * Flush content of RAM cache into SVM's memory.
4187 * Only flush the pages that be dirtied by PVM or SVM or both.
4188 */
4189static void colo_flush_ram_cache(void)
4190{
4191    RAMBlock *block = NULL;
4192    void *dst_host;
4193    void *src_host;
4194    unsigned long offset = 0;
4195
4196    memory_global_dirty_log_sync();
4197    rcu_read_lock();
4198    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4199        migration_bitmap_sync_range(ram_state, block, 0, block->used_length);
4200    }
4201    rcu_read_unlock();
4202
4203    trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4204    rcu_read_lock();
4205    block = QLIST_FIRST_RCU(&ram_list.blocks);
4206
4207    while (block) {
4208        offset = migration_bitmap_find_dirty(ram_state, block, offset);
4209
4210        if (offset << TARGET_PAGE_BITS >= block->used_length) {
4211            offset = 0;
4212            block = QLIST_NEXT_RCU(block, next);
4213        } else {
4214            migration_bitmap_clear_dirty(ram_state, block, offset);
4215            dst_host = block->host + (offset << TARGET_PAGE_BITS);
4216            src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4217            memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4218        }
4219    }
4220
4221    rcu_read_unlock();
4222    trace_colo_flush_ram_cache_end();
4223}
4224
4225static int ram_load(QEMUFile *f, void *opaque, int version_id)
4226{
4227    int flags = 0, ret = 0, invalid_flags = 0;
4228    static uint64_t seq_iter;
4229    int len = 0;
4230    /*
4231     * If system is running in postcopy mode, page inserts to host memory must
4232     * be atomic
4233     */
4234    bool postcopy_running = postcopy_is_running();
4235    /* ADVISE is earlier, it shows the source has the postcopy capability on */
4236    bool postcopy_advised = postcopy_is_advised();
4237
4238    seq_iter++;
4239
4240    if (version_id != 4) {
4241        ret = -EINVAL;
4242    }
4243
4244    if (!migrate_use_compression()) {
4245        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4246    }
4247    /* This RCU critical section can be very long running.
4248     * When RCU reclaims in the code start to become numerous,
4249     * it will be necessary to reduce the granularity of this
4250     * critical section.
4251     */
4252    rcu_read_lock();
4253
4254    if (postcopy_running) {
4255        ret = ram_load_postcopy(f);
4256    }
4257
4258    while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4259        ram_addr_t addr, total_ram_bytes;
4260        void *host = NULL;
4261        uint8_t ch;
4262
4263        addr = qemu_get_be64(f);
4264        flags = addr & ~TARGET_PAGE_MASK;
4265        addr &= TARGET_PAGE_MASK;
4266
4267        if (flags & invalid_flags) {
4268            if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4269                error_report("Received an unexpected compressed page");
4270            }
4271
4272            ret = -EINVAL;
4273            break;
4274        }
4275
4276        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4277                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4278            RAMBlock *block = ram_block_from_stream(f, flags);
4279
4280            /*
4281             * After going into COLO, we should load the Page into colo_cache.
4282             */
4283            if (migration_incoming_in_colo_state()) {
4284                host = colo_cache_from_block_offset(block, addr);
4285            } else {
4286                host = host_from_ram_block_offset(block, addr);
4287            }
4288            if (!host) {
4289                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4290                ret = -EINVAL;
4291                break;
4292            }
4293
4294            if (!migration_incoming_in_colo_state()) {
4295                ramblock_recv_bitmap_set(block, host);
4296            }
4297
4298            trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4299        }
4300
4301        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4302        case RAM_SAVE_FLAG_MEM_SIZE:
4303            /* Synchronize RAM block list */
4304            total_ram_bytes = addr;
4305            while (!ret && total_ram_bytes) {
4306                RAMBlock *block;
4307                char id[256];
4308                ram_addr_t length;
4309
4310                len = qemu_get_byte(f);
4311                qemu_get_buffer(f, (uint8_t *)id, len);
4312                id[len] = 0;
4313                length = qemu_get_be64(f);
4314
4315                block = qemu_ram_block_by_name(id);
4316                if (block && !qemu_ram_is_migratable(block)) {
4317                    error_report("block %s should not be migrated !", id);
4318                    ret = -EINVAL;
4319                } else if (block) {
4320                    if (length != block->used_length) {
4321                        Error *local_err = NULL;
4322
4323                        ret = qemu_ram_resize(block, length,
4324                                              &local_err);
4325                        if (local_err) {
4326                            error_report_err(local_err);
4327                        }
4328                    }
4329                    /* For postcopy we need to check hugepage sizes match */
4330                    if (postcopy_advised &&
4331                        block->page_size != qemu_host_page_size) {
4332                        uint64_t remote_page_size = qemu_get_be64(f);
4333                        if (remote_page_size != block->page_size) {
4334                            error_report("Mismatched RAM page size %s "
4335                                         "(local) %zd != %" PRId64,
4336                                         id, block->page_size,
4337                                         remote_page_size);
4338                            ret = -EINVAL;
4339                        }
4340                    }
4341                    if (migrate_ignore_shared()) {
4342                        hwaddr addr = qemu_get_be64(f);
4343                        bool ignored = qemu_get_byte(f);
4344                        if (ignored != ramblock_is_ignored(block)) {
4345                            error_report("RAM block %s should %s be migrated",
4346                                         id, ignored ? "" : "not");
4347                            ret = -EINVAL;
4348                        }
4349                        if (ramblock_is_ignored(block) &&
4350                            block->mr->addr != addr) {
4351                            error_report("Mismatched GPAs for block %s "
4352                                         "%" PRId64 "!= %" PRId64,
4353                                         id, (uint64_t)addr,
4354                                         (uint64_t)block->mr->addr);
4355                            ret = -EINVAL;
4356                        }
4357                    }
4358                    ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4359                                          block->idstr);
4360                } else {
4361                    error_report("Unknown ramblock \"%s\", cannot "
4362                                 "accept migration", id);
4363                    ret = -EINVAL;
4364                }
4365
4366                total_ram_bytes -= length;
4367            }
4368            break;
4369
4370        case RAM_SAVE_FLAG_ZERO:
4371            ch = qemu_get_byte(f);
4372            ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4373            break;
4374
4375        case RAM_SAVE_FLAG_PAGE:
4376            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4377            break;
4378
4379        case RAM_SAVE_FLAG_COMPRESS_PAGE:
4380            len = qemu_get_be32(f);
4381            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4382                error_report("Invalid compressed data length: %d", len);
4383                ret = -EINVAL;
4384                break;
4385            }
4386            decompress_data_with_multi_threads(f, host, len);
4387            break;
4388
4389        case RAM_SAVE_FLAG_XBZRLE:
4390            if (load_xbzrle(f, addr, host) < 0) {
4391                error_report("Failed to decompress XBZRLE page at "
4392                             RAM_ADDR_FMT, addr);
4393                ret = -EINVAL;
4394                break;
4395            }
4396            break;
4397        case RAM_SAVE_FLAG_EOS:
4398            /* normal exit */
4399            multifd_recv_sync_main();
4400            break;
4401        default:
4402            if (flags & RAM_SAVE_FLAG_HOOK) {
4403                ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4404            } else {
4405                error_report("Unknown combination of migration flags: %#x",
4406                             flags);
4407                ret = -EINVAL;
4408            }
4409        }
4410        if (!ret) {
4411            ret = qemu_file_get_error(f);
4412        }
4413    }
4414
4415    ret |= wait_for_decompress_done();
4416    rcu_read_unlock();
4417    trace_ram_load_complete(ret, seq_iter);
4418
4419    if (!ret  && migration_incoming_in_colo_state()) {
4420        colo_flush_ram_cache();
4421    }
4422    return ret;
4423}
4424
4425static bool ram_has_postcopy(void *opaque)
4426{
4427    RAMBlock *rb;
4428    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4429        if (ramblock_is_pmem(rb)) {
4430            info_report("Block: %s, host: %p is a nvdimm memory, postcopy"
4431                         "is not supported now!", rb->idstr, rb->host);
4432            return false;
4433        }
4434    }
4435
4436    return migrate_postcopy_ram();
4437}
4438
4439/* Sync all the dirty bitmap with destination VM.  */
4440static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4441{
4442    RAMBlock *block;
4443    QEMUFile *file = s->to_dst_file;
4444    int ramblock_count = 0;
4445
4446    trace_ram_dirty_bitmap_sync_start();
4447
4448    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4449        qemu_savevm_send_recv_bitmap(file, block->idstr);
4450        trace_ram_dirty_bitmap_request(block->idstr);
4451        ramblock_count++;
4452    }
4453
4454    trace_ram_dirty_bitmap_sync_wait();
4455
4456    /* Wait until all the ramblocks' dirty bitmap synced */
4457    while (ramblock_count--) {
4458        qemu_sem_wait(&s->rp_state.rp_sem);
4459    }
4460
4461    trace_ram_dirty_bitmap_sync_complete();
4462
4463    return 0;
4464}
4465
4466static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4467{
4468    qemu_sem_post(&s->rp_state.rp_sem);
4469}
4470
4471/*
4472 * Read the received bitmap, revert it as the initial dirty bitmap.
4473 * This is only used when the postcopy migration is paused but wants
4474 * to resume from a middle point.
4475 */
4476int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4477{
4478    int ret = -EINVAL;
4479    QEMUFile *file = s->rp_state.from_dst_file;
4480    unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4481    uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4482    uint64_t size, end_mark;
4483
4484    trace_ram_dirty_bitmap_reload_begin(block->idstr);
4485
4486    if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4487        error_report("%s: incorrect state %s", __func__,
4488                     MigrationStatus_str(s->state));
4489        return -EINVAL;
4490    }
4491
4492    /*
4493     * Note: see comments in ramblock_recv_bitmap_send() on why we
4494     * need the endianess convertion, and the paddings.
4495     */
4496    local_size = ROUND_UP(local_size, 8);
4497
4498    /* Add paddings */
4499    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4500
4501    size = qemu_get_be64(file);
4502
4503    /* The size of the bitmap should match with our ramblock */
4504    if (size != local_size) {
4505        error_report("%s: ramblock '%s' bitmap size mismatch "
4506                     "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4507                     block->idstr, size, local_size);
4508        ret = -EINVAL;
4509        goto out;
4510    }
4511
4512    size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4513    end_mark = qemu_get_be64(file);
4514
4515    ret = qemu_file_get_error(file);
4516    if (ret || size != local_size) {
4517        error_report("%s: read bitmap failed for ramblock '%s': %d"
4518                     " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4519                     __func__, block->idstr, ret, local_size, size);
4520        ret = -EIO;
4521        goto out;
4522    }
4523
4524    if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4525        error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
4526                     __func__, block->idstr, end_mark);
4527        ret = -EINVAL;
4528        goto out;
4529    }
4530
4531    /*
4532     * Endianess convertion. We are during postcopy (though paused).
4533     * The dirty bitmap won't change. We can directly modify it.
4534     */
4535    bitmap_from_le(block->bmap, le_bitmap, nbits);
4536
4537    /*
4538     * What we received is "received bitmap". Revert it as the initial
4539     * dirty bitmap for this ramblock.
4540     */
4541    bitmap_complement(block->bmap, block->bmap, nbits);
4542
4543    trace_ram_dirty_bitmap_reload_complete(block->idstr);
4544
4545    /*
4546     * We succeeded to sync bitmap for current ramblock. If this is
4547     * the last one to sync, we need to notify the main send thread.
4548     */
4549    ram_dirty_bitmap_reload_notify(s);
4550
4551    ret = 0;
4552out:
4553    g_free(le_bitmap);
4554    return ret;
4555}
4556
4557static int ram_resume_prepare(MigrationState *s, void *opaque)
4558{
4559    RAMState *rs = *(RAMState **)opaque;
4560    int ret;
4561
4562    ret = ram_dirty_bitmap_sync_all(s, rs);
4563    if (ret) {
4564        return ret;
4565    }
4566
4567    ram_state_resume_prepare(rs, s->to_dst_file);
4568
4569    return 0;
4570}
4571
4572static SaveVMHandlers savevm_ram_handlers = {
4573    .save_setup = ram_save_setup,
4574    .save_live_iterate = ram_save_iterate,
4575    .save_live_complete_postcopy = ram_save_complete,
4576    .save_live_complete_precopy = ram_save_complete,
4577    .has_postcopy = ram_has_postcopy,
4578    .save_live_pending = ram_save_pending,
4579    .load_state = ram_load,
4580    .save_cleanup = ram_save_cleanup,
4581    .load_setup = ram_load_setup,
4582    .load_cleanup = ram_load_cleanup,
4583    .resume_prepare = ram_resume_prepare,
4584};
4585
4586void ram_mig_init(void)
4587{
4588    qemu_mutex_init(&XBZRLE.lock);
4589    register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, &ram_state);
4590}
4591