qemu/migration/ram.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator
   3 *
   4 * Copyright (c) 2003-2008 Fabrice Bellard
   5 * Copyright (c) 2011-2015 Red Hat Inc
   6 *
   7 * Authors:
   8 *  Juan Quintela <quintela@redhat.com>
   9 *
  10 * Permission is hereby granted, free of charge, to any person obtaining a copy
  11 * of this software and associated documentation files (the "Software"), to deal
  12 * in the Software without restriction, including without limitation the rights
  13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14 * copies of the Software, and to permit persons to whom the Software is
  15 * furnished to do so, subject to the following conditions:
  16 *
  17 * The above copyright notice and this permission notice shall be included in
  18 * all copies or substantial portions of the Software.
  19 *
  20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26 * THE SOFTWARE.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "cpu.h"
  31#include <zlib.h>
  32#include "qemu/cutils.h"
  33#include "qemu/bitops.h"
  34#include "qemu/bitmap.h"
  35#include "qemu/main-loop.h"
  36#include "qemu/pmem.h"
  37#include "xbzrle.h"
  38#include "ram.h"
  39#include "migration.h"
  40#include "socket.h"
  41#include "migration/register.h"
  42#include "migration/misc.h"
  43#include "qemu-file.h"
  44#include "postcopy-ram.h"
  45#include "page_cache.h"
  46#include "qemu/error-report.h"
  47#include "qapi/error.h"
  48#include "qapi/qapi-events-migration.h"
  49#include "qapi/qmp/qerror.h"
  50#include "trace.h"
  51#include "exec/ram_addr.h"
  52#include "exec/target_page.h"
  53#include "qemu/rcu_queue.h"
  54#include "migration/colo.h"
  55#include "block.h"
  56#include "sysemu/sysemu.h"
  57#include "qemu/uuid.h"
  58#include "savevm.h"
  59#include "qemu/iov.h"
  60
  61/***********************************************************/
  62/* ram save/restore */
  63
/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS; it
 * worked for pages that were filled with the same char.  We switched
 * it to only search for the zero value and, to avoid confusion with
 * RAM_SAVE_FLAG_COMPRESS_PAGE, renamed it.
 */
  69
  70#define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
  71#define RAM_SAVE_FLAG_ZERO     0x02
  72#define RAM_SAVE_FLAG_MEM_SIZE 0x04
  73#define RAM_SAVE_FLAG_PAGE     0x08
  74#define RAM_SAVE_FLAG_EOS      0x10
  75#define RAM_SAVE_FLAG_CONTINUE 0x20
  76#define RAM_SAVE_FLAG_XBZRLE   0x40
  77/* 0x80 is reserved in migration.h start with 0x100 next */
  78#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
  79
  80static inline bool is_zero_range(uint8_t *p, uint64_t size)
  81{
  82    return buffer_is_zero(p, size);
  83}
  84
  85XBZRLECacheStats xbzrle_counters;
  86
/*
 * File-local XBZRLE state: the page cache plus the scratch buffers
 * used by the compression.
 */
static struct {
    /* buffer used for XBZRLE encoding */
    uint8_t *encoded_buf;
    /* buffer for storing page content */
    uint8_t *current_buf;
    /* Cache for XBZRLE, protected by lock. */
    PageCache *cache;
    /* Taken by XBZRLE_cache_lock(), released by XBZRLE_cache_unlock() */
    QemuMutex lock;
    /* it will store a page full of zeros */
    uint8_t *zero_target_page;
    /* buffer used for XBZRLE decoding */
    uint8_t *decoded_buf;
} XBZRLE;
 102
 103static void XBZRLE_cache_lock(void)
 104{
 105    if (migrate_use_xbzrle())
 106        qemu_mutex_lock(&XBZRLE.lock);
 107}
 108
 109static void XBZRLE_cache_unlock(void)
 110{
 111    if (migrate_use_xbzrle())
 112        qemu_mutex_unlock(&XBZRLE.lock);
 113}
 114
 115/**
 116 * xbzrle_cache_resize: resize the xbzrle cache
 117 *
 118 * This function is called from qmp_migrate_set_cache_size in main
 119 * thread, possibly while a migration is in progress.  A running
 120 * migration may be using the cache and might finish during this call,
 121 * hence changes to the cache are protected by XBZRLE.lock().
 122 *
 123 * Returns 0 for success or -1 for error
 124 *
 125 * @new_size: new cache size
 126 * @errp: set *errp if the check failed, with reason
 127 */
 128int xbzrle_cache_resize(int64_t new_size, Error **errp)
 129{
 130    PageCache *new_cache;
 131    int64_t ret = 0;
 132
 133    /* Check for truncation */
 134    if (new_size != (size_t)new_size) {
 135        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
 136                   "exceeding address space");
 137        return -1;
 138    }
 139
 140    if (new_size == migrate_xbzrle_cache_size()) {
 141        /* nothing to do */
 142        return 0;
 143    }
 144
 145    XBZRLE_cache_lock();
 146
 147    if (XBZRLE.cache != NULL) {
 148        new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
 149        if (!new_cache) {
 150            ret = -1;
 151            goto out;
 152        }
 153
 154        cache_fini(XBZRLE.cache);
 155        XBZRLE.cache = new_cache;
 156    }
 157out:
 158    XBZRLE_cache_unlock();
 159    return ret;
 160}
 161
 162static bool ramblock_is_ignored(RAMBlock *block)
 163{
 164    return !qemu_ram_is_migratable(block) ||
 165           (migrate_ignore_shared() && qemu_ram_is_shared(block));
 166}
 167
/*
 * Iterate over the RAM blocks for which ramblock_is_ignored() is false.
 * The "if (...) {} else" tail makes the statement that follows the macro
 * the loop body for the blocks that pass the filter.
 *
 * Should be holding either ram_list.mutex, or the RCU lock.
 */
#define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
    INTERNAL_RAMBLOCK_FOREACH(block)                   \
        if (ramblock_is_ignored(block)) {} else

/* Same construction, filtering only on qemu_ram_is_migratable(). */
#define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
    INTERNAL_RAMBLOCK_FOREACH(block)                   \
        if (!qemu_ram_is_migratable(block)) {} else

/*
 * NOTE(review): presumably undefined so the rest of this file has to use
 * one of the filtered iterators above - confirm.
 */
#undef RAMBLOCK_FOREACH
 178
 179int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
 180{
 181    RAMBlock *block;
 182    int ret = 0;
 183
 184    RCU_READ_LOCK_GUARD();
 185
 186    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
 187        ret = func(block, opaque);
 188        if (ret) {
 189            break;
 190        }
 191    }
 192    return ret;
 193}
 194
 195static void ramblock_recv_map_init(void)
 196{
 197    RAMBlock *rb;
 198
 199    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
 200        assert(!rb->receivedmap);
 201        rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
 202    }
 203}
 204
 205int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
 206{
 207    return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
 208                    rb->receivedmap);
 209}
 210
 211bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
 212{
 213    return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
 214}
 215
 216void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
 217{
 218    set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
 219}
 220
 221void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
 222                                    size_t nr)
 223{
 224    bitmap_set_atomic(rb->receivedmap,
 225                      ramblock_recv_bitmap_offset(host_addr, rb),
 226                      nr);
 227}
 228
 229#define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
 230
 231/*
 232 * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
 233 *
 234 * Returns >0 if success with sent bytes, or <0 if error.
 235 */
 236int64_t ramblock_recv_bitmap_send(QEMUFile *file,
 237                                  const char *block_name)
 238{
 239    RAMBlock *block = qemu_ram_block_by_name(block_name);
 240    unsigned long *le_bitmap, nbits;
 241    uint64_t size;
 242
 243    if (!block) {
 244        error_report("%s: invalid block name: %s", __func__, block_name);
 245        return -1;
 246    }
 247
 248    nbits = block->used_length >> TARGET_PAGE_BITS;
 249
 250    /*
 251     * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
 252     * machines we may need 4 more bytes for padding (see below
 253     * comment). So extend it a bit before hand.
 254     */
 255    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
 256
 257    /*
 258     * Always use little endian when sending the bitmap. This is
 259     * required that when source and destination VMs are not using the
 260     * same endianess. (Note: big endian won't work.)
 261     */
 262    bitmap_to_le(le_bitmap, block->receivedmap, nbits);
 263
 264    /* Size of the bitmap, in bytes */
 265    size = DIV_ROUND_UP(nbits, 8);
 266
 267    /*
 268     * size is always aligned to 8 bytes for 64bit machines, but it
 269     * may not be true for 32bit machines. We need this padding to
 270     * make sure the migration can survive even between 32bit and
 271     * 64bit machines.
 272     */
 273    size = ROUND_UP(size, 8);
 274
 275    qemu_put_be64(file, size);
 276    qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
 277    /*
 278     * Mark as an end, in case the middle part is screwed up due to
 279     * some "misterious" reason.
 280     */
 281    qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
 282    qemu_fflush(file);
 283
 284    g_free(le_bitmap);
 285
 286    if (qemu_file_get_error(file)) {
 287        return qemu_file_get_error(file);
 288    }
 289
 290    return size + sizeof(size);
 291}
 292
/*
 * An outstanding page request, on the source, having been received
 * and queued
 */
struct RAMSrcPageRequest {
    /* RAM block the request refers to */
    RAMBlock *rb;
    /* presumably the start of the requested range within rb - confirm */
    hwaddr    offset;
    /* presumably the length of the requested range in bytes - confirm */
    hwaddr    len;

    /* Linkage for the RAMState src_page_requests queue */
    QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
};
 304
/* State of RAM for migration */
struct RAMState {
    /* QEMUFile used for this migration */
    QEMUFile *f;
    /* Last block that we have visited searching for dirty pages */
    RAMBlock *last_seen_block;
    /* Last block from where we have sent data */
    RAMBlock *last_sent_block;
    /* Last dirty target page we have sent */
    ram_addr_t last_page;
    /* last ram version we have seen */
    uint32_t last_version;
    /* We are in the first round */
    bool ram_bulk_stage;
    /* The free page optimization is enabled */
    bool fpo_enabled;
    /* How many times we have seen too many dirty pages */
    int dirty_rate_high_cnt;
    /* these variables are used for bitmap sync */
    /* last time we did a full bitmap_sync */
    int64_t time_last_bitmap_sync;
    /* bytes transferred at start_time */
    uint64_t bytes_xfer_prev;
    /* number of dirty pages since start_time */
    uint64_t num_dirty_pages_period;
    /* xbzrle misses since the beginning of the period */
    uint64_t xbzrle_cache_miss_prev;

    /* compression statistics since the beginning of the period */
    /* number of times no free thread was available to compress data */
    uint64_t compress_thread_busy_prev;
    /* number of bytes after compression */
    uint64_t compressed_size_prev;
    /* number of compressed pages */
    uint64_t compress_pages_prev;

    /* total handled target pages at the beginning of period */
    uint64_t target_page_count_prev;
    /* total handled target pages since start */
    uint64_t target_page_count;
    /* number of dirty bits in the bitmap */
    uint64_t migration_dirty_pages;
    /* Protects modification of the bitmap and migration dirty pages */
    QemuMutex bitmap_mutex;
    /* The RAMBlock used in the last src_page_requests */
    RAMBlock *last_req_rb;
    /* Queue of outstanding page requests from the destination */
    QemuMutex src_page_req_mutex;
    QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
};
typedef struct RAMState RAMState;
 356
 357static RAMState *ram_state;
 358
 359static NotifierWithReturnList precopy_notifier_list;
 360
 361void precopy_infrastructure_init(void)
 362{
 363    notifier_with_return_list_init(&precopy_notifier_list);
 364}
 365
 366void precopy_add_notifier(NotifierWithReturn *n)
 367{
 368    notifier_with_return_list_add(&precopy_notifier_list, n);
 369}
 370
 371void precopy_remove_notifier(NotifierWithReturn *n)
 372{
 373    notifier_with_return_remove(n);
 374}
 375
 376int precopy_notify(PrecopyNotifyReason reason, Error **errp)
 377{
 378    PrecopyNotifyData pnd;
 379    pnd.reason = reason;
 380    pnd.errp = errp;
 381
 382    return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
 383}
 384
 385void precopy_enable_free_page_optimization(void)
 386{
 387    if (!ram_state) {
 388        return;
 389    }
 390
 391    ram_state->fpo_enabled = true;
 392}
 393
 394uint64_t ram_bytes_remaining(void)
 395{
 396    return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
 397                       0;
 398}
 399
 400MigrationStats ram_counters;
 401
/* Cursor used by the search for pages to send */
struct PageSearchStatus {
    /* Current block being searched */
    RAMBlock    *block;
    /* Current page to search from */
    unsigned long page;
    /* Set once we wrap around */
    bool         complete_round;
};
typedef struct PageSearchStatus PageSearchStatus;
 412
 413CompressionStats compression_counters;
 414
/* Per-thread state of one compression worker (see do_data_compress()) */
struct CompressParam {
    /* set to true by the worker, under comp_done_lock, after each job */
    bool done;
    /* the worker leaves its loop once this is set */
    bool quit;
    /* value returned by do_compress_ram_page() for the last job */
    bool zero_page;
    /* file handed to do_compress_ram_page(); opened with empty_ops */
    QEMUFile *file;
    /* held by the worker while it inspects or claims block/offset */
    QemuMutex mutex;
    /* the worker waits on this (with mutex) while it has no job */
    QemuCond cond;
    /* pending job; the worker resets block to NULL when it claims it */
    RAMBlock *block;
    ram_addr_t offset;

    /* internally used fields */
    z_stream stream;
    uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;
 430
/*
 * Per-thread state of one decompression worker.
 * NOTE(review): the field notes below are inferred from the parallel
 * CompressParam layout - confirm against the decompression thread code.
 */
struct DecompressParam {
    /* presumably set once the worker has finished its current job */
    bool done;
    /* presumably asks the worker to exit */
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    /* presumably the destination of the decompressed data */
    void *des;
    /* presumably the compressed input buffer and its length */
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;
 442
 443static CompressParam *comp_param;
 444static QemuThread *compress_threads;
 445/* comp_done_cond is used to wake up the migration thread when
 446 * one of the compression threads has finished the compression.
 447 * comp_done_lock is used to co-work with comp_done_cond.
 448 */
 449static QemuMutex comp_done_lock;
 450static QemuCond comp_done_cond;
 451/* The empty QEMUFileOps will be used by file in CompressParam */
 452static const QEMUFileOps empty_ops = { };
 453
 454static QEMUFile *decomp_file;
 455static DecompressParam *decomp_param;
 456static QemuThread *decompress_threads;
 457static QemuMutex decomp_done_lock;
 458static QemuCond decomp_done_cond;
 459
 460static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
 461                                 ram_addr_t offset, uint8_t *source_buf);
 462
/*
 * do_data_compress: thread function of one compression worker
 *
 * Loops until param->quit is set.  Whenever param->block is non-NULL the
 * (block, offset) pair is claimed as a job and compressed with
 * do_compress_ram_page(); otherwise the thread sleeps on param->cond.
 *
 * @opaque: the CompressParam of this worker
 *
 * Always returns NULL.
 */
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    bool zero_page;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->block) {
            /*
             * Claim the job while holding param->mutex, then drop the
             * mutex so the compression itself runs unlocked.
             */
            block = param->block;
            offset = param->offset;
            param->block = NULL;
            qemu_mutex_unlock(&param->mutex);

            zero_page = do_compress_ram_page(param->file, &param->stream,
                                             block, offset, param->originbuf);

            /* Publish the result and signal completion under comp_done_lock */
            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->zero_page = zero_page;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            /* Re-take param->mutex before re-checking quit/block */
            qemu_mutex_lock(&param->mutex);
        } else {
            /* No job: sleep until signalled (param->mutex is released meanwhile) */
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
 496
 497static void compress_threads_save_cleanup(void)
 498{
 499    int i, thread_count;
 500
 501    if (!migrate_use_compression() || !comp_param) {
 502        return;
 503    }
 504
 505    thread_count = migrate_compress_threads();
 506    for (i = 0; i < thread_count; i++) {
 507        /*
 508         * we use it as a indicator which shows if the thread is
 509         * properly init'd or not
 510         */
 511        if (!comp_param[i].file) {
 512            break;
 513        }
 514
 515        qemu_mutex_lock(&comp_param[i].mutex);
 516        comp_param[i].quit = true;
 517        qemu_cond_signal(&comp_param[i].cond);
 518        qemu_mutex_unlock(&comp_param[i].mutex);
 519
 520        qemu_thread_join(compress_threads + i);
 521        qemu_mutex_destroy(&comp_param[i].mutex);
 522        qemu_cond_destroy(&comp_param[i].cond);
 523        deflateEnd(&comp_param[i].stream);
 524        g_free(comp_param[i].originbuf);
 525        qemu_fclose(comp_param[i].file);
 526        comp_param[i].file = NULL;
 527    }
 528    qemu_mutex_destroy(&comp_done_lock);
 529    qemu_cond_destroy(&comp_done_cond);
 530    g_free(compress_threads);
 531    g_free(comp_param);
 532    compress_threads = NULL;
 533    comp_param = NULL;
 534}
 535
 536static int compress_threads_save_setup(void)
 537{
 538    int i, thread_count;
 539
 540    if (!migrate_use_compression()) {
 541        return 0;
 542    }
 543    thread_count = migrate_compress_threads();
 544    compress_threads = g_new0(QemuThread, thread_count);
 545    comp_param = g_new0(CompressParam, thread_count);
 546    qemu_cond_init(&comp_done_cond);
 547    qemu_mutex_init(&comp_done_lock);
 548    for (i = 0; i < thread_count; i++) {
 549        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
 550        if (!comp_param[i].originbuf) {
 551            goto exit;
 552        }
 553
 554        if (deflateInit(&comp_param[i].stream,
 555                        migrate_compress_level()) != Z_OK) {
 556            g_free(comp_param[i].originbuf);
 557            goto exit;
 558        }
 559
 560        /* comp_param[i].file is just used as a dummy buffer to save data,
 561         * set its ops to empty.
 562         */
 563        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
 564        comp_param[i].done = true;
 565        comp_param[i].quit = false;
 566        qemu_mutex_init(&comp_param[i].mutex);
 567        qemu_cond_init(&comp_param[i].cond);
 568        qemu_thread_create(compress_threads + i, "compress",
 569                           do_data_compress, comp_param + i,
 570                           QEMU_THREAD_JOINABLE);
 571    }
 572    return 0;
 573
 574exit:
 575    compress_threads_save_cleanup();
 576    return -1;
 577}
 578
 579/* Multiple fd's */
 580
 581#define MULTIFD_MAGIC 0x11223344U
 582#define MULTIFD_VERSION 1
 583
 584#define MULTIFD_FLAG_SYNC (1 << 0)
 585
 586/* This value needs to be a multiple of qemu_target_page_size() */
 587#define MULTIFD_PACKET_SIZE (512 * 1024)
 588
/*
 * First message sent on each multifd channel; see
 * multifd_send_initial_packet() and multifd_recv_initial_packet().
 */
typedef struct {
    /* MULTIFD_MAGIC, sent big endian */
    uint32_t magic;
    /* MULTIFD_VERSION, sent big endian */
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    /* channel number of the sender */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
 597
/*
 * Header describing one batch of pages.  The integer fields are
 * converted to big endian by multifd_send_fill_packet() and back by
 * multifd_recv_unfill_packet().
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    uint32_t flags;
    /* maximum number of allocated pages */
    uint32_t pages_alloc;
    /* number of entries of offset[] that are in use */
    uint32_t pages_used;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    uint64_t packet_num;
    uint64_t unused[4];    /* Reserved for future use */
    /* idstr of the RAM block all the pages belong to */
    char ramblock[256];
    /* offset of each page inside that RAM block */
    uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;
 612
/* A batch of pages, all belonging to the same RAM block */
typedef struct {
    /* number of used pages */
    uint32_t used;
    /* number of allocated pages */
    uint32_t allocated;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* offset of each page */
    ram_addr_t *offset;
    /* pointer to each page */
    struct iovec *iov;
    /* RAM block the pages belong to; NULL while the batch is empty */
    RAMBlock *block;
} MultiFDPages_t;
 626
/* State of one multifd send channel */
typedef struct {
    /* these fields are not changed once the thread is created */
    /* channel number */
    uint8_t id;
    /* channel thread name */
    char *name;
    /* channel thread id */
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    /* sem where to wait for more work */
    QemuSemaphore sem;
    /* this mutex protects the following parameters */
    QemuMutex mutex;
    /* is this channel thread running */
    bool running;
    /* should this thread finish */
    bool quit;
    /* thread has work to do */
    int pending_job;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* packet allocated len */
    uint32_t packet_len;
    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* multifd flags for each packet */
    uint32_t flags;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* thread local variables */
    /* packets sent through this channel */
    uint64_t num_packets;
    /* pages sent through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
}  MultiFDSendParams;
 667
/* State of one multifd receive channel */
typedef struct {
    /* these fields are not changed once the thread is created */
    /* channel number */
    uint8_t id;
    /* channel thread name */
    char *name;
    /* channel thread id */
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    /* this mutex protects the following parameters */
    QemuMutex mutex;
    /* is this channel thread running */
    bool running;
    /* should this thread finish */
    bool quit;
    /* array of pages to receive */
    MultiFDPages_t *pages;
    /* packet allocated len */
    uint32_t packet_len;
    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* multifd flags for each packet */
    uint32_t flags;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* thread local variables */
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* packets received through this channel */
    uint64_t num_packets;
    /* pages received through this channel */
    uint64_t num_pages;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
} MultiFDRecvParams;
 704
 705static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 706{
 707    MultiFDInit_t msg;
 708    int ret;
 709
 710    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 711    msg.version = cpu_to_be32(MULTIFD_VERSION);
 712    msg.id = p->id;
 713    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 714
 715    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
 716    if (ret != 0) {
 717        return -1;
 718    }
 719    return 0;
 720}
 721
 722static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 723{
 724    MultiFDInit_t msg;
 725    int ret;
 726
 727    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 728    if (ret != 0) {
 729        return -1;
 730    }
 731
 732    msg.magic = be32_to_cpu(msg.magic);
 733    msg.version = be32_to_cpu(msg.version);
 734
 735    if (msg.magic != MULTIFD_MAGIC) {
 736        error_setg(errp, "multifd: received packet magic %x "
 737                   "expected %x", msg.magic, MULTIFD_MAGIC);
 738        return -1;
 739    }
 740
 741    if (msg.version != MULTIFD_VERSION) {
 742        error_setg(errp, "multifd: received packet version %d "
 743                   "expected %d", msg.version, MULTIFD_VERSION);
 744        return -1;
 745    }
 746
 747    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 748        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 749        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 750
 751        error_setg(errp, "multifd: received uuid '%s' and expected "
 752                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 753        g_free(uuid);
 754        g_free(msg_uuid);
 755        return -1;
 756    }
 757
 758    if (msg.id > migrate_multifd_channels()) {
 759        error_setg(errp, "multifd: received channel version %d "
 760                   "expected %d", msg.version, MULTIFD_VERSION);
 761        return -1;
 762    }
 763
 764    return msg.id;
 765}
 766
 767static MultiFDPages_t *multifd_pages_init(size_t size)
 768{
 769    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 770
 771    pages->allocated = size;
 772    pages->iov = g_new0(struct iovec, size);
 773    pages->offset = g_new0(ram_addr_t, size);
 774
 775    return pages;
 776}
 777
 778static void multifd_pages_clear(MultiFDPages_t *pages)
 779{
 780    pages->used = 0;
 781    pages->allocated = 0;
 782    pages->packet_num = 0;
 783    pages->block = NULL;
 784    g_free(pages->iov);
 785    pages->iov = NULL;
 786    g_free(pages->offset);
 787    pages->offset = NULL;
 788    g_free(pages);
 789}
 790
 791static void multifd_send_fill_packet(MultiFDSendParams *p)
 792{
 793    MultiFDPacket_t *packet = p->packet;
 794    int i;
 795
 796    packet->flags = cpu_to_be32(p->flags);
 797    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
 798    packet->pages_used = cpu_to_be32(p->pages->used);
 799    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 800    packet->packet_num = cpu_to_be64(p->packet_num);
 801
 802    if (p->pages->block) {
 803        strncpy(packet->ramblock, p->pages->block->idstr, 256);
 804    }
 805
 806    for (i = 0; i < p->pages->used; i++) {
 807        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
 808    }
 809}
 810
 811static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 812{
 813    MultiFDPacket_t *packet = p->packet;
 814    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 815    RAMBlock *block;
 816    int i;
 817
 818    packet->magic = be32_to_cpu(packet->magic);
 819    if (packet->magic != MULTIFD_MAGIC) {
 820        error_setg(errp, "multifd: received packet "
 821                   "magic %x and expected magic %x",
 822                   packet->magic, MULTIFD_MAGIC);
 823        return -1;
 824    }
 825
 826    packet->version = be32_to_cpu(packet->version);
 827    if (packet->version != MULTIFD_VERSION) {
 828        error_setg(errp, "multifd: received packet "
 829                   "version %d and expected version %d",
 830                   packet->version, MULTIFD_VERSION);
 831        return -1;
 832    }
 833
 834    p->flags = be32_to_cpu(packet->flags);
 835
 836    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
 837    /*
 838     * If we received a packet that is 100 times bigger than expected
 839     * just stop migration.  It is a magic number.
 840     */
 841    if (packet->pages_alloc > pages_max * 100) {
 842        error_setg(errp, "multifd: received packet "
 843                   "with size %d and expected a maximum size of %d",
 844                   packet->pages_alloc, pages_max * 100) ;
 845        return -1;
 846    }
 847    /*
 848     * We received a packet that is bigger than expected but inside
 849     * reasonable limits (see previous comment).  Just reallocate.
 850     */
 851    if (packet->pages_alloc > p->pages->allocated) {
 852        multifd_pages_clear(p->pages);
 853        p->pages = multifd_pages_init(packet->pages_alloc);
 854    }
 855
 856    p->pages->used = be32_to_cpu(packet->pages_used);
 857    if (p->pages->used > packet->pages_alloc) {
 858        error_setg(errp, "multifd: received packet "
 859                   "with %d pages and expected maximum pages are %d",
 860                   p->pages->used, packet->pages_alloc) ;
 861        return -1;
 862    }
 863
 864    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 865    p->packet_num = be64_to_cpu(packet->packet_num);
 866
 867    if (p->pages->used) {
 868        /* make sure that ramblock is 0 terminated */
 869        packet->ramblock[255] = 0;
 870        block = qemu_ram_block_by_name(packet->ramblock);
 871        if (!block) {
 872            error_setg(errp, "multifd: unknown ram block %s",
 873                       packet->ramblock);
 874            return -1;
 875        }
 876    }
 877
 878    for (i = 0; i < p->pages->used; i++) {
 879        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
 880
 881        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
 882            error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
 883                       " (max " RAM_ADDR_FMT ")",
 884                       offset, block->max_length);
 885            return -1;
 886        }
 887        p->pages->iov[i].iov_base = block->host + offset;
 888        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
 889    }
 890
 891    return 0;
 892}
 893
/* Global state of the multifd send side */
struct {
    /* per-channel parameters, indexed by channel number */
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
} *multifd_send_state;
 903
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a "pages" struct for each channel, and a main one.  Each
 * time that we need to send a batch of pages we swap the one in
 * multifd_send_state with the one of the channel that is sending it.
 * There are two reasons for that:
 *    - to avoid doing so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread holds the channel mutex when changing it, and the channel
 * must have finished with its own, otherwise pending_job can't be
 * false.
 */
 921
/*
 * multifd_send_pages: hand the current batch of pages to a send channel
 *
 * Waits on channels_ready, then scans the channels round-robin, starting
 * after the channel used last time, until it finds one with no pending
 * job.  The batch in multifd_send_state->pages is swapped with the
 * (reset) batch of that channel, the transfer counters are updated and
 * the channel is woken up through its semaphore.
 *
 * @rs: RAM state; rs->f is the file whose transfer count is updated
 *
 * Returns 1 on success, -1 if a channel that was inspected has already
 * quit.
 */
static int multifd_send_pages(RAMState *rs)
{
    int i;
    /* Channel at which the next scan starts; kept across calls */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    qemu_sem_wait(&multifd_send_state->channels_ready);
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            /* Leave the loop with p->mutex still held */
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    /* Reset the channel's batch: it becomes the new main batch below */
    p->pages->used = 0;

    p->packet_num = multifd_send_state->packet_num++;
    p->pages->block = NULL;
    /* Swap ownership of the two batches while holding p->mutex */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /* Account the page payload plus the packet header length */
    transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
    qemu_file_update_transfer(rs->f, transferred);
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;;
    qemu_mutex_unlock(&p->mutex);
    /* Wake the channel up */
    qemu_sem_post(&p->sem);

    return 1;
}
 962
 963static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
 964{
 965    MultiFDPages_t *pages = multifd_send_state->pages;
 966
 967    if (!pages->block) {
 968        pages->block = block;
 969    }
 970
 971    if (pages->block == block) {
 972        pages->offset[pages->used] = offset;
 973        pages->iov[pages->used].iov_base = block->host + offset;
 974        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
 975        pages->used++;
 976
 977        if (pages->used < pages->allocated) {
 978            return 1;
 979        }
 980    }
 981
 982    if (multifd_send_pages(rs) < 0) {
 983        return -1;
 984    }
 985
 986    if (pages->block != block) {
 987        return  multifd_queue_page(rs, block, offset);
 988    }
 989
 990    return 1;
 991}
 992
 993static void multifd_send_terminate_threads(Error *err)
 994{
 995    int i;
 996
 997    trace_multifd_send_terminate_threads(err != NULL);
 998
 999    if (err) {
1000        MigrationState *s = migrate_get_current();
1001        migrate_set_error(s, err);
1002        if (s->state == MIGRATION_STATUS_SETUP ||
1003            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
1004            s->state == MIGRATION_STATUS_DEVICE ||
1005            s->state == MIGRATION_STATUS_ACTIVE) {
1006            migrate_set_state(&s->state, s->state,
1007                              MIGRATION_STATUS_FAILED);
1008        }
1009    }
1010
1011    for (i = 0; i < migrate_multifd_channels(); i++) {
1012        MultiFDSendParams *p = &multifd_send_state->params[i];
1013
1014        qemu_mutex_lock(&p->mutex);
1015        p->quit = true;
1016        qemu_sem_post(&p->sem);
1017        qemu_mutex_unlock(&p->mutex);
1018    }
1019}
1020
1021void multifd_save_cleanup(void)
1022{
1023    int i;
1024
1025    if (!migrate_use_multifd()) {
1026        return;
1027    }
1028    multifd_send_terminate_threads(NULL);
1029    for (i = 0; i < migrate_multifd_channels(); i++) {
1030        MultiFDSendParams *p = &multifd_send_state->params[i];
1031
1032        if (p->running) {
1033            qemu_thread_join(&p->thread);
1034        }
1035        socket_send_channel_destroy(p->c);
1036        p->c = NULL;
1037        qemu_mutex_destroy(&p->mutex);
1038        qemu_sem_destroy(&p->sem);
1039        qemu_sem_destroy(&p->sem_sync);
1040        g_free(p->name);
1041        p->name = NULL;
1042        multifd_pages_clear(p->pages);
1043        p->pages = NULL;
1044        p->packet_len = 0;
1045        g_free(p->packet);
1046        p->packet = NULL;
1047    }
1048    qemu_sem_destroy(&multifd_send_state->channels_ready);
1049    g_free(multifd_send_state->params);
1050    multifd_send_state->params = NULL;
1051    multifd_pages_clear(multifd_send_state->pages);
1052    multifd_send_state->pages = NULL;
1053    g_free(multifd_send_state);
1054    multifd_send_state = NULL;
1055}
1056
1057static void multifd_send_sync_main(RAMState *rs)
1058{
1059    int i;
1060
1061    if (!migrate_use_multifd()) {
1062        return;
1063    }
1064    if (multifd_send_state->pages->used) {
1065        if (multifd_send_pages(rs) < 0) {
1066            error_report("%s: multifd_send_pages fail", __func__);
1067            return;
1068        }
1069    }
1070    for (i = 0; i < migrate_multifd_channels(); i++) {
1071        MultiFDSendParams *p = &multifd_send_state->params[i];
1072
1073        trace_multifd_send_sync_main_signal(p->id);
1074
1075        qemu_mutex_lock(&p->mutex);
1076
1077        if (p->quit) {
1078            error_report("%s: channel %d has already quit", __func__, i);
1079            qemu_mutex_unlock(&p->mutex);
1080            return;
1081        }
1082
1083        p->packet_num = multifd_send_state->packet_num++;
1084        p->flags |= MULTIFD_FLAG_SYNC;
1085        p->pending_job++;
1086        qemu_file_update_transfer(rs->f, p->packet_len);
1087        ram_counters.multifd_bytes += p->packet_len;
1088        ram_counters.transferred += p->packet_len;
1089        qemu_mutex_unlock(&p->mutex);
1090        qemu_sem_post(&p->sem);
1091    }
1092    for (i = 0; i < migrate_multifd_channels(); i++) {
1093        MultiFDSendParams *p = &multifd_send_state->params[i];
1094
1095        trace_multifd_send_sync_main_wait(p->id);
1096        qemu_sem_wait(&p->sem_sync);
1097    }
1098    trace_multifd_send_sync_main(multifd_send_state->packet_num);
1099}
1100
1101static void *multifd_send_thread(void *opaque)
1102{
1103    MultiFDSendParams *p = opaque;
1104    Error *local_err = NULL;
1105    int ret = 0;
1106    uint32_t flags = 0;
1107
1108    trace_multifd_send_thread_start(p->id);
1109    rcu_register_thread();
1110
1111    if (multifd_send_initial_packet(p, &local_err) < 0) {
1112        ret = -1;
1113        goto out;
1114    }
1115    /* initial packet */
1116    p->num_packets = 1;
1117
1118    while (true) {
1119        qemu_sem_wait(&p->sem);
1120        qemu_mutex_lock(&p->mutex);
1121
1122        if (p->pending_job) {
1123            uint32_t used = p->pages->used;
1124            uint64_t packet_num = p->packet_num;
1125            flags = p->flags;
1126
1127            p->next_packet_size = used * qemu_target_page_size();
1128            multifd_send_fill_packet(p);
1129            p->flags = 0;
1130            p->num_packets++;
1131            p->num_pages += used;
1132            qemu_mutex_unlock(&p->mutex);
1133
1134            trace_multifd_send(p->id, packet_num, used, flags,
1135                               p->next_packet_size);
1136
1137            ret = qio_channel_write_all(p->c, (void *)p->packet,
1138                                        p->packet_len, &local_err);
1139            if (ret != 0) {
1140                break;
1141            }
1142
1143            if (used) {
1144                ret = qio_channel_writev_all(p->c, p->pages->iov,
1145                                             used, &local_err);
1146                if (ret != 0) {
1147                    break;
1148                }
1149            }
1150
1151            qemu_mutex_lock(&p->mutex);
1152            p->pending_job--;
1153            qemu_mutex_unlock(&p->mutex);
1154
1155            if (flags & MULTIFD_FLAG_SYNC) {
1156                qemu_sem_post(&p->sem_sync);
1157            }
1158            qemu_sem_post(&multifd_send_state->channels_ready);
1159        } else if (p->quit) {
1160            qemu_mutex_unlock(&p->mutex);
1161            break;
1162        } else {
1163            qemu_mutex_unlock(&p->mutex);
1164            /* sometimes there are spurious wakeups */
1165        }
1166    }
1167
1168out:
1169    if (local_err) {
1170        trace_multifd_send_error(p->id);
1171        multifd_send_terminate_threads(local_err);
1172    }
1173
1174    /*
1175     * Error happen, I will exit, but I can't just leave, tell
1176     * who pay attention to me.
1177     */
1178    if (ret != 0) {
1179        qemu_sem_post(&p->sem_sync);
1180        qemu_sem_post(&multifd_send_state->channels_ready);
1181    }
1182
1183    qemu_mutex_lock(&p->mutex);
1184    p->running = false;
1185    qemu_mutex_unlock(&p->mutex);
1186
1187    rcu_unregister_thread();
1188    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1189
1190    return NULL;
1191}
1192
1193static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1194{
1195    MultiFDSendParams *p = opaque;
1196    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1197    Error *local_err = NULL;
1198
1199    trace_multifd_new_send_channel_async(p->id);
1200    if (qio_task_propagate_error(task, &local_err)) {
1201        migrate_set_error(migrate_get_current(), local_err);
1202        multifd_save_cleanup();
1203    } else {
1204        p->c = QIO_CHANNEL(sioc);
1205        qio_channel_set_delay(p->c, false);
1206        p->running = true;
1207        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1208                           QEMU_THREAD_JOINABLE);
1209    }
1210}
1211
1212int multifd_save_setup(void)
1213{
1214    int thread_count;
1215    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1216    uint8_t i;
1217
1218    if (!migrate_use_multifd()) {
1219        return 0;
1220    }
1221    thread_count = migrate_multifd_channels();
1222    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1223    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1224    multifd_send_state->pages = multifd_pages_init(page_count);
1225    qemu_sem_init(&multifd_send_state->channels_ready, 0);
1226
1227    for (i = 0; i < thread_count; i++) {
1228        MultiFDSendParams *p = &multifd_send_state->params[i];
1229
1230        qemu_mutex_init(&p->mutex);
1231        qemu_sem_init(&p->sem, 0);
1232        qemu_sem_init(&p->sem_sync, 0);
1233        p->quit = false;
1234        p->pending_job = 0;
1235        p->id = i;
1236        p->pages = multifd_pages_init(page_count);
1237        p->packet_len = sizeof(MultiFDPacket_t)
1238                      + sizeof(ram_addr_t) * page_count;
1239        p->packet = g_malloc0(p->packet_len);
1240        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1241        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1242        p->name = g_strdup_printf("multifdsend_%d", i);
1243        socket_send_channel_create(multifd_new_send_channel_async, p);
1244    }
1245    return 0;
1246}
1247
/* State of the receive side of multifd, shared by all receive channels */
struct {
    /* array of per-channel parameters, one entry per multifd channel */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
} *multifd_recv_state;
1257
1258static void multifd_recv_terminate_threads(Error *err)
1259{
1260    int i;
1261
1262    trace_multifd_recv_terminate_threads(err != NULL);
1263
1264    if (err) {
1265        MigrationState *s = migrate_get_current();
1266        migrate_set_error(s, err);
1267        if (s->state == MIGRATION_STATUS_SETUP ||
1268            s->state == MIGRATION_STATUS_ACTIVE) {
1269            migrate_set_state(&s->state, s->state,
1270                              MIGRATION_STATUS_FAILED);
1271        }
1272    }
1273
1274    for (i = 0; i < migrate_multifd_channels(); i++) {
1275        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1276
1277        qemu_mutex_lock(&p->mutex);
1278        p->quit = true;
1279        /* We could arrive here for two reasons:
1280           - normal quit, i.e. everything went fine, just finished
1281           - error quit: We close the channels so the channel threads
1282             finish the qio_channel_read_all_eof() */
1283        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1284        qemu_mutex_unlock(&p->mutex);
1285    }
1286}
1287
1288int multifd_load_cleanup(Error **errp)
1289{
1290    int i;
1291    int ret = 0;
1292
1293    if (!migrate_use_multifd()) {
1294        return 0;
1295    }
1296    multifd_recv_terminate_threads(NULL);
1297    for (i = 0; i < migrate_multifd_channels(); i++) {
1298        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1299
1300        if (p->running) {
1301            p->quit = true;
1302            /*
1303             * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1304             * however try to wakeup it without harm in cleanup phase.
1305             */
1306            qemu_sem_post(&p->sem_sync);
1307            qemu_thread_join(&p->thread);
1308        }
1309        object_unref(OBJECT(p->c));
1310        p->c = NULL;
1311        qemu_mutex_destroy(&p->mutex);
1312        qemu_sem_destroy(&p->sem_sync);
1313        g_free(p->name);
1314        p->name = NULL;
1315        multifd_pages_clear(p->pages);
1316        p->pages = NULL;
1317        p->packet_len = 0;
1318        g_free(p->packet);
1319        p->packet = NULL;
1320    }
1321    qemu_sem_destroy(&multifd_recv_state->sem_sync);
1322    g_free(multifd_recv_state->params);
1323    multifd_recv_state->params = NULL;
1324    g_free(multifd_recv_state);
1325    multifd_recv_state = NULL;
1326
1327    return ret;
1328}
1329
1330static void multifd_recv_sync_main(void)
1331{
1332    int i;
1333
1334    if (!migrate_use_multifd()) {
1335        return;
1336    }
1337    for (i = 0; i < migrate_multifd_channels(); i++) {
1338        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1339
1340        trace_multifd_recv_sync_main_wait(p->id);
1341        qemu_sem_wait(&multifd_recv_state->sem_sync);
1342    }
1343    for (i = 0; i < migrate_multifd_channels(); i++) {
1344        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1345
1346        qemu_mutex_lock(&p->mutex);
1347        if (multifd_recv_state->packet_num < p->packet_num) {
1348            multifd_recv_state->packet_num = p->packet_num;
1349        }
1350        qemu_mutex_unlock(&p->mutex);
1351        trace_multifd_recv_sync_main_signal(p->id);
1352        qemu_sem_post(&p->sem_sync);
1353    }
1354    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1355}
1356
1357static void *multifd_recv_thread(void *opaque)
1358{
1359    MultiFDRecvParams *p = opaque;
1360    Error *local_err = NULL;
1361    int ret;
1362
1363    trace_multifd_recv_thread_start(p->id);
1364    rcu_register_thread();
1365
1366    while (true) {
1367        uint32_t used;
1368        uint32_t flags;
1369
1370        if (p->quit) {
1371            break;
1372        }
1373
1374        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1375                                       p->packet_len, &local_err);
1376        if (ret == 0) {   /* EOF */
1377            break;
1378        }
1379        if (ret == -1) {   /* Error */
1380            break;
1381        }
1382
1383        qemu_mutex_lock(&p->mutex);
1384        ret = multifd_recv_unfill_packet(p, &local_err);
1385        if (ret) {
1386            qemu_mutex_unlock(&p->mutex);
1387            break;
1388        }
1389
1390        used = p->pages->used;
1391        flags = p->flags;
1392        trace_multifd_recv(p->id, p->packet_num, used, flags,
1393                           p->next_packet_size);
1394        p->num_packets++;
1395        p->num_pages += used;
1396        qemu_mutex_unlock(&p->mutex);
1397
1398        if (used) {
1399            ret = qio_channel_readv_all(p->c, p->pages->iov,
1400                                        used, &local_err);
1401            if (ret != 0) {
1402                break;
1403            }
1404        }
1405
1406        if (flags & MULTIFD_FLAG_SYNC) {
1407            qemu_sem_post(&multifd_recv_state->sem_sync);
1408            qemu_sem_wait(&p->sem_sync);
1409        }
1410    }
1411
1412    if (local_err) {
1413        multifd_recv_terminate_threads(local_err);
1414    }
1415    qemu_mutex_lock(&p->mutex);
1416    p->running = false;
1417    qemu_mutex_unlock(&p->mutex);
1418
1419    rcu_unregister_thread();
1420    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1421
1422    return NULL;
1423}
1424
1425int multifd_load_setup(void)
1426{
1427    int thread_count;
1428    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1429    uint8_t i;
1430
1431    if (!migrate_use_multifd()) {
1432        return 0;
1433    }
1434    thread_count = migrate_multifd_channels();
1435    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1436    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1437    atomic_set(&multifd_recv_state->count, 0);
1438    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1439
1440    for (i = 0; i < thread_count; i++) {
1441        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1442
1443        qemu_mutex_init(&p->mutex);
1444        qemu_sem_init(&p->sem_sync, 0);
1445        p->quit = false;
1446        p->id = i;
1447        p->pages = multifd_pages_init(page_count);
1448        p->packet_len = sizeof(MultiFDPacket_t)
1449                      + sizeof(ram_addr_t) * page_count;
1450        p->packet = g_malloc0(p->packet_len);
1451        p->name = g_strdup_printf("multifdrecv_%d", i);
1452    }
1453    return 0;
1454}
1455
1456bool multifd_recv_all_channels_created(void)
1457{
1458    int thread_count = migrate_multifd_channels();
1459
1460    if (!migrate_use_multifd()) {
1461        return true;
1462    }
1463
1464    return thread_count == atomic_read(&multifd_recv_state->count);
1465}
1466
1467/*
1468 * Try to receive all multifd channels to get ready for the migration.
1469 * - Return true and do not set @errp when correctly receving all channels;
1470 * - Return false and do not set @errp when correctly receiving the current one;
1471 * - Return false and set @errp when failing to receive the current channel.
1472 */
1473bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1474{
1475    MultiFDRecvParams *p;
1476    Error *local_err = NULL;
1477    int id;
1478
1479    id = multifd_recv_initial_packet(ioc, &local_err);
1480    if (id < 0) {
1481        multifd_recv_terminate_threads(local_err);
1482        error_propagate_prepend(errp, local_err,
1483                                "failed to receive packet"
1484                                " via multifd channel %d: ",
1485                                atomic_read(&multifd_recv_state->count));
1486        return false;
1487    }
1488    trace_multifd_recv_new_channel(id);
1489
1490    p = &multifd_recv_state->params[id];
1491    if (p->c != NULL) {
1492        error_setg(&local_err, "multifd: received id '%d' already setup'",
1493                   id);
1494        multifd_recv_terminate_threads(local_err);
1495        error_propagate(errp, local_err);
1496        return false;
1497    }
1498    p->c = ioc;
1499    object_ref(OBJECT(ioc));
1500    /* initial packet */
1501    p->num_packets = 1;
1502
1503    p->running = true;
1504    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1505                       QEMU_THREAD_JOINABLE);
1506    atomic_inc(&multifd_recv_state->count);
1507    return atomic_read(&multifd_recv_state->count) ==
1508           migrate_multifd_channels();
1509}
1510
1511/**
1512 * save_page_header: write page header to wire
1513 *
1514 * If this is the 1st block, it also writes the block identification
1515 *
1516 * Returns the number of bytes written
1517 *
1518 * @f: QEMUFile where to send the data
1519 * @block: block that contains the page we want to send
1520 * @offset: offset inside the block for the page
1521 *          in the lower bits, it contains flags
1522 */
1523static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1524                               ram_addr_t offset)
1525{
1526    size_t size, len;
1527
1528    if (block == rs->last_sent_block) {
1529        offset |= RAM_SAVE_FLAG_CONTINUE;
1530    }
1531    qemu_put_be64(f, offset);
1532    size = 8;
1533
1534    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1535        len = strlen(block->idstr);
1536        qemu_put_byte(f, len);
1537        qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1538        size += 1 + len;
1539        rs->last_sent_block = block;
1540    }
1541    return size;
1542}
1543
1544/**
1545 * mig_throttle_guest_down: throotle down the guest
1546 *
1547 * Reduce amount of guest cpu execution to hopefully slow down memory
1548 * writes. If guest dirty memory rate is reduced below the rate at
1549 * which we can transfer pages to the destination then we should be
1550 * able to complete migration. Some workloads dirty memory way too
1551 * fast and will not effectively converge, even with auto-converge.
1552 */
1553static void mig_throttle_guest_down(void)
1554{
1555    MigrationState *s = migrate_get_current();
1556    uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1557    uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
1558    int pct_max = s->parameters.max_cpu_throttle;
1559
1560    /* We have not started throttling yet. Let's start it. */
1561    if (!cpu_throttle_active()) {
1562        cpu_throttle_set(pct_initial);
1563    } else {
1564        /* Throttling already on, just increase the rate */
1565        cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
1566                         pct_max));
1567    }
1568}
1569
1570/**
1571 * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1572 *
1573 * @rs: current RAM state
1574 * @current_addr: address for the zero page
1575 *
1576 * Update the xbzrle cache to reflect a page that's been sent as all 0.
1577 * The important thing is that a stale (not-yet-0'd) page be replaced
1578 * by the new data.
1579 * As a bonus, if the page wasn't in the cache it gets added so that
1580 * when a small write is made into the 0'd page it gets XBZRLE sent.
1581 */
1582static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1583{
1584    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1585        return;
1586    }
1587
1588    /* We don't care if this fails to allocate a new cache page
1589     * as long as it updated an old one */
1590    cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1591                 ram_counters.dirty_sync_count);
1592}
1593
1594#define ENCODING_FLAG_XBZRLE 0x1
1595
1596/**
1597 * save_xbzrle_page: compress and send current page
1598 *
1599 * Returns: 1 means that we wrote the page
1600 *          0 means that page is identical to the one already sent
1601 *          -1 means that xbzrle would be longer than normal
1602 *
1603 * @rs: current RAM state
1604 * @current_data: pointer to the address of the page contents
1605 * @current_addr: addr of the page
1606 * @block: block that contains the page we want to send
1607 * @offset: offset inside the block for the page
1608 * @last_stage: if we are at the completion stage
1609 */
1610static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1611                            ram_addr_t current_addr, RAMBlock *block,
1612                            ram_addr_t offset, bool last_stage)
1613{
1614    int encoded_len = 0, bytes_xbzrle;
1615    uint8_t *prev_cached_page;
1616
1617    if (!cache_is_cached(XBZRLE.cache, current_addr,
1618                         ram_counters.dirty_sync_count)) {
1619        xbzrle_counters.cache_miss++;
1620        if (!last_stage) {
1621            if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1622                             ram_counters.dirty_sync_count) == -1) {
1623                return -1;
1624            } else {
1625                /* update *current_data when the page has been
1626                   inserted into cache */
1627                *current_data = get_cached_data(XBZRLE.cache, current_addr);
1628            }
1629        }
1630        return -1;
1631    }
1632
1633    prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1634
1635    /* save current buffer into memory */
1636    memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1637
1638    /* XBZRLE encoding (if there is no overflow) */
1639    encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1640                                       TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1641                                       TARGET_PAGE_SIZE);
1642
1643    /*
1644     * Update the cache contents, so that it corresponds to the data
1645     * sent, in all cases except where we skip the page.
1646     */
1647    if (!last_stage && encoded_len != 0) {
1648        memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1649        /*
1650         * In the case where we couldn't compress, ensure that the caller
1651         * sends the data from the cache, since the guest might have
1652         * changed the RAM since we copied it.
1653         */
1654        *current_data = prev_cached_page;
1655    }
1656
1657    if (encoded_len == 0) {
1658        trace_save_xbzrle_page_skipping();
1659        return 0;
1660    } else if (encoded_len == -1) {
1661        trace_save_xbzrle_page_overflow();
1662        xbzrle_counters.overflow++;
1663        return -1;
1664    }
1665
1666    /* Send XBZRLE based compressed page */
1667    bytes_xbzrle = save_page_header(rs, rs->f, block,
1668                                    offset | RAM_SAVE_FLAG_XBZRLE);
1669    qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1670    qemu_put_be16(rs->f, encoded_len);
1671    qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1672    bytes_xbzrle += encoded_len + 1 + 2;
1673    xbzrle_counters.pages++;
1674    xbzrle_counters.bytes += bytes_xbzrle;
1675    ram_counters.transferred += bytes_xbzrle;
1676
1677    return 1;
1678}
1679
1680/**
1681 * migration_bitmap_find_dirty: find the next dirty page from start
1682 *
1683 * Returns the page offset within memory region of the start of a dirty page
1684 *
1685 * @rs: current RAM state
1686 * @rb: RAMBlock where to search for dirty pages
1687 * @start: page where we start the search
1688 */
1689static inline
1690unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1691                                          unsigned long start)
1692{
1693    unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1694    unsigned long *bitmap = rb->bmap;
1695    unsigned long next;
1696
1697    if (ramblock_is_ignored(rb)) {
1698        return size;
1699    }
1700
1701    /*
1702     * When the free page optimization is enabled, we need to check the bitmap
1703     * to send the non-free pages rather than all the pages in the bulk stage.
1704     */
1705    if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1706        next = start + 1;
1707    } else {
1708        next = find_next_bit(bitmap, size, start);
1709    }
1710
1711    return next;
1712}
1713
1714static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1715                                                RAMBlock *rb,
1716                                                unsigned long page)
1717{
1718    bool ret;
1719
1720    qemu_mutex_lock(&rs->bitmap_mutex);
1721
1722    /*
1723     * Clear dirty bitmap if needed.  This _must_ be called before we
1724     * send any of the page in the chunk because we need to make sure
1725     * we can capture further page content changes when we sync dirty
1726     * log the next time.  So as long as we are going to send any of
1727     * the page in the chunk we clear the remote dirty bitmap for all.
1728     * Clearing it earlier won't be a problem, but too late will.
1729     */
1730    if (rb->clear_bmap && clear_bmap_test_and_clear(rb, page)) {
1731        uint8_t shift = rb->clear_bmap_shift;
1732        hwaddr size = 1ULL << (TARGET_PAGE_BITS + shift);
1733        hwaddr start = (page << TARGET_PAGE_BITS) & (-size);
1734
1735        /*
1736         * CLEAR_BITMAP_SHIFT_MIN should always guarantee this... this
1737         * can make things easier sometimes since then start address
1738         * of the small chunk will always be 64 pages aligned so the
1739         * bitmap will always be aligned to unsigned long.  We should
1740         * even be able to remove this restriction but I'm simply
1741         * keeping it.
1742         */
1743        assert(shift >= 6);
1744        trace_migration_bitmap_clear_dirty(rb->idstr, start, size, page);
1745        memory_region_clear_dirty_bitmap(rb->mr, start, size);
1746    }
1747
1748    ret = test_and_clear_bit(page, rb->bmap);
1749
1750    if (ret) {
1751        rs->migration_dirty_pages--;
1752    }
1753    qemu_mutex_unlock(&rs->bitmap_mutex);
1754
1755    return ret;
1756}
1757
1758/* Called with RCU critical section */
1759static void ramblock_sync_dirty_bitmap(RAMState *rs, RAMBlock *rb)
1760{
1761    rs->migration_dirty_pages +=
1762        cpu_physical_memory_sync_dirty_bitmap(rb, 0, rb->used_length,
1763                                              &rs->num_dirty_pages_period);
1764}
1765
1766/**
1767 * ram_pagesize_summary: calculate all the pagesizes of a VM
1768 *
1769 * Returns a summary bitmap of the page sizes of all RAMBlocks
1770 *
1771 * For VMs with just normal pages this is equivalent to the host page
1772 * size. If it's got some huge pages then it's the OR of all the
1773 * different page sizes.
1774 */
1775uint64_t ram_pagesize_summary(void)
1776{
1777    RAMBlock *block;
1778    uint64_t summary = 0;
1779
1780    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1781        summary |= block->page_size;
1782    }
1783
1784    return summary;
1785}
1786
1787uint64_t ram_get_total_transferred_pages(void)
1788{
1789    return  ram_counters.normal + ram_counters.duplicate +
1790                compression_counters.pages + xbzrle_counters.pages;
1791}
1792
1793static void migration_update_rates(RAMState *rs, int64_t end_time)
1794{
1795    uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1796    double compressed_size;
1797
1798    /* calculate period counters */
1799    ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1800                / (end_time - rs->time_last_bitmap_sync);
1801
1802    if (!page_count) {
1803        return;
1804    }
1805
1806    if (migrate_use_xbzrle()) {
1807        xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1808            rs->xbzrle_cache_miss_prev) / page_count;
1809        rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1810    }
1811
1812    if (migrate_use_compression()) {
1813        compression_counters.busy_rate = (double)(compression_counters.busy -
1814            rs->compress_thread_busy_prev) / page_count;
1815        rs->compress_thread_busy_prev = compression_counters.busy;
1816
1817        compressed_size = compression_counters.compressed_size -
1818                          rs->compressed_size_prev;
1819        if (compressed_size) {
1820            double uncompressed_size = (compression_counters.pages -
1821                                    rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1822
1823            /* Compression-Ratio = Uncompressed-size / Compressed-size */
1824            compression_counters.compression_rate =
1825                                        uncompressed_size / compressed_size;
1826
1827            rs->compress_pages_prev = compression_counters.pages;
1828            rs->compressed_size_prev = compression_counters.compressed_size;
1829        }
1830    }
1831}
1832
1833static void migration_bitmap_sync(RAMState *rs)
1834{
1835    RAMBlock *block;
1836    int64_t end_time;
1837    uint64_t bytes_xfer_now;
1838
1839    ram_counters.dirty_sync_count++;
1840
1841    if (!rs->time_last_bitmap_sync) {
1842        rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1843    }
1844
1845    trace_migration_bitmap_sync_start();
1846    memory_global_dirty_log_sync();
1847
1848    qemu_mutex_lock(&rs->bitmap_mutex);
1849    WITH_RCU_READ_LOCK_GUARD() {
1850        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1851            ramblock_sync_dirty_bitmap(rs, block);
1852        }
1853        ram_counters.remaining = ram_bytes_remaining();
1854    }
1855    qemu_mutex_unlock(&rs->bitmap_mutex);
1856
1857    memory_global_after_dirty_log_sync();
1858    trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1859
1860    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1861
1862    /* more than 1 second = 1000 millisecons */
1863    if (end_time > rs->time_last_bitmap_sync + 1000) {
1864        bytes_xfer_now = ram_counters.transferred;
1865
1866        /* During block migration the auto-converge logic incorrectly detects
1867         * that ram migration makes no progress. Avoid this by disabling the
1868         * throttling logic during the bulk phase of block migration. */
1869        if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1870            /* The following detection logic can be refined later. For now:
1871               Check to see if the dirtied bytes is 50% more than the approx.
1872               amount of bytes that just got transferred since the last time we
1873               were in this routine. If that happens twice, start or increase
1874               throttling */
1875
1876            if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1877                   (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1878                (++rs->dirty_rate_high_cnt >= 2)) {
1879                    trace_migration_throttle();
1880                    rs->dirty_rate_high_cnt = 0;
1881                    mig_throttle_guest_down();
1882            }
1883        }
1884
1885        migration_update_rates(rs, end_time);
1886
1887        rs->target_page_count_prev = rs->target_page_count;
1888
1889        /* reset period counters */
1890        rs->time_last_bitmap_sync = end_time;
1891        rs->num_dirty_pages_period = 0;
1892        rs->bytes_xfer_prev = bytes_xfer_now;
1893    }
1894    if (migrate_use_events()) {
1895        qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1896    }
1897}
1898
1899static void migration_bitmap_sync_precopy(RAMState *rs)
1900{
1901    Error *local_err = NULL;
1902
1903    /*
1904     * The current notifier usage is just an optimization to migration, so we
1905     * don't stop the normal migration process in the error case.
1906     */
1907    if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1908        error_report_err(local_err);
1909        local_err = NULL;
1910    }
1911
1912    migration_bitmap_sync(rs);
1913
1914    if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1915        error_report_err(local_err);
1916    }
1917}
1918
1919/**
1920 * save_zero_page_to_file: send the zero page to the file
1921 *
1922 * Returns the size of data written to the file, 0 means the page is not
1923 * a zero page
1924 *
1925 * @rs: current RAM state
1926 * @file: the file where the data is saved
1927 * @block: block that contains the page we want to send
1928 * @offset: offset inside the block for the page
1929 */
1930static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1931                                  RAMBlock *block, ram_addr_t offset)
1932{
1933    uint8_t *p = block->host + offset;
1934    int len = 0;
1935
1936    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1937        len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1938        qemu_put_byte(file, 0);
1939        len += 1;
1940    }
1941    return len;
1942}
1943
1944/**
1945 * save_zero_page: send the zero page to the stream
1946 *
1947 * Returns the number of pages written.
1948 *
1949 * @rs: current RAM state
1950 * @block: block that contains the page we want to send
1951 * @offset: offset inside the block for the page
1952 */
1953static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1954{
1955    int len = save_zero_page_to_file(rs, rs->f, block, offset);
1956
1957    if (len) {
1958        ram_counters.duplicate++;
1959        ram_counters.transferred += len;
1960        return 1;
1961    }
1962    return -1;
1963}
1964
1965static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1966{
1967    if (!migrate_release_ram() || !migration_in_postcopy()) {
1968        return;
1969    }
1970
1971    ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1972}
1973
1974/*
1975 * @pages: the number of pages written by the control path,
1976 *        < 0 - error
1977 *        > 0 - number of pages written
1978 *
1979 * Return true if the pages has been saved, otherwise false is returned.
1980 */
1981static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1982                              int *pages)
1983{
1984    uint64_t bytes_xmit = 0;
1985    int ret;
1986
1987    *pages = -1;
1988    ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1989                                &bytes_xmit);
1990    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1991        return false;
1992    }
1993
1994    if (bytes_xmit) {
1995        ram_counters.transferred += bytes_xmit;
1996        *pages = 1;
1997    }
1998
1999    if (ret == RAM_SAVE_CONTROL_DELAYED) {
2000        return true;
2001    }
2002
2003    if (bytes_xmit > 0) {
2004        ram_counters.normal++;
2005    } else if (bytes_xmit == 0) {
2006        ram_counters.duplicate++;
2007    }
2008
2009    return true;
2010}
2011
2012/*
2013 * directly send the page to the stream
2014 *
2015 * Returns the number of pages written.
2016 *
2017 * @rs: current RAM state
2018 * @block: block that contains the page we want to send
2019 * @offset: offset inside the block for the page
2020 * @buf: the page to be sent
2021 * @async: send to page asyncly
2022 */
2023static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2024                            uint8_t *buf, bool async)
2025{
2026    ram_counters.transferred += save_page_header(rs, rs->f, block,
2027                                                 offset | RAM_SAVE_FLAG_PAGE);
2028    if (async) {
2029        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
2030                              migrate_release_ram() &
2031                              migration_in_postcopy());
2032    } else {
2033        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
2034    }
2035    ram_counters.transferred += TARGET_PAGE_SIZE;
2036    ram_counters.normal++;
2037    return 1;
2038}
2039
2040/**
2041 * ram_save_page: send the given page to the stream
2042 *
2043 * Returns the number of pages written.
2044 *          < 0 - error
2045 *          >=0 - Number of pages written - this might legally be 0
2046 *                if xbzrle noticed the page was the same.
2047 *
2048 * @rs: current RAM state
2049 * @block: block that contains the page we want to send
2050 * @offset: offset inside the block for the page
2051 * @last_stage: if we are at the completion stage
2052 */
2053static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
2054{
2055    int pages = -1;
2056    uint8_t *p;
2057    bool send_async = true;
2058    RAMBlock *block = pss->block;
2059    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2060    ram_addr_t current_addr = block->offset + offset;
2061
2062    p = block->host + offset;
2063    trace_ram_save_page(block->idstr, (uint64_t)offset, p);
2064
2065    XBZRLE_cache_lock();
2066    if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
2067        migrate_use_xbzrle()) {
2068        pages = save_xbzrle_page(rs, &p, current_addr, block,
2069                                 offset, last_stage);
2070        if (!last_stage) {
2071            /* Can't send this cached data async, since the cache page
2072             * might get updated before it gets to the wire
2073             */
2074            send_async = false;
2075        }
2076    }
2077
2078    /* XBZRLE overflow or normal page */
2079    if (pages == -1) {
2080        pages = save_normal_page(rs, block, offset, p, send_async);
2081    }
2082
2083    XBZRLE_cache_unlock();
2084
2085    return pages;
2086}
2087
2088static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2089                                 ram_addr_t offset)
2090{
2091    if (multifd_queue_page(rs, block, offset) < 0) {
2092        return -1;
2093    }
2094    ram_counters.normal++;
2095
2096    return 1;
2097}
2098
2099static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2100                                 ram_addr_t offset, uint8_t *source_buf)
2101{
2102    RAMState *rs = ram_state;
2103    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2104    bool zero_page = false;
2105    int ret;
2106
2107    if (save_zero_page_to_file(rs, f, block, offset)) {
2108        zero_page = true;
2109        goto exit;
2110    }
2111
2112    save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2113
2114    /*
2115     * copy it to a internal buffer to avoid it being modified by VM
2116     * so that we can catch up the error during compression and
2117     * decompression
2118     */
2119    memcpy(source_buf, p, TARGET_PAGE_SIZE);
2120    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2121    if (ret < 0) {
2122        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2123        error_report("compressed data failed!");
2124        return false;
2125    }
2126
2127exit:
2128    ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2129    return zero_page;
2130}
2131
2132static void
2133update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2134{
2135    ram_counters.transferred += bytes_xmit;
2136
2137    if (param->zero_page) {
2138        ram_counters.duplicate++;
2139        return;
2140    }
2141
2142    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
2143    compression_counters.compressed_size += bytes_xmit - 8;
2144    compression_counters.pages++;
2145}
2146
2147static bool save_page_use_compression(RAMState *rs);
2148
2149static void flush_compressed_data(RAMState *rs)
2150{
2151    int idx, len, thread_count;
2152
2153    if (!save_page_use_compression(rs)) {
2154        return;
2155    }
2156    thread_count = migrate_compress_threads();
2157
2158    qemu_mutex_lock(&comp_done_lock);
2159    for (idx = 0; idx < thread_count; idx++) {
2160        while (!comp_param[idx].done) {
2161            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2162        }
2163    }
2164    qemu_mutex_unlock(&comp_done_lock);
2165
2166    for (idx = 0; idx < thread_count; idx++) {
2167        qemu_mutex_lock(&comp_param[idx].mutex);
2168        if (!comp_param[idx].quit) {
2169            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2170            /*
2171             * it's safe to fetch zero_page without holding comp_done_lock
2172             * as there is no further request submitted to the thread,
2173             * i.e, the thread should be waiting for a request at this point.
2174             */
2175            update_compress_thread_counts(&comp_param[idx], len);
2176        }
2177        qemu_mutex_unlock(&comp_param[idx].mutex);
2178    }
2179}
2180
2181static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2182                                       ram_addr_t offset)
2183{
2184    param->block = block;
2185    param->offset = offset;
2186}
2187
2188static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2189                                           ram_addr_t offset)
2190{
2191    int idx, thread_count, bytes_xmit = -1, pages = -1;
2192    bool wait = migrate_compress_wait_thread();
2193
2194    thread_count = migrate_compress_threads();
2195    qemu_mutex_lock(&comp_done_lock);
2196retry:
2197    for (idx = 0; idx < thread_count; idx++) {
2198        if (comp_param[idx].done) {
2199            comp_param[idx].done = false;
2200            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2201            qemu_mutex_lock(&comp_param[idx].mutex);
2202            set_compress_params(&comp_param[idx], block, offset);
2203            qemu_cond_signal(&comp_param[idx].cond);
2204            qemu_mutex_unlock(&comp_param[idx].mutex);
2205            pages = 1;
2206            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2207            break;
2208        }
2209    }
2210
2211    /*
2212     * wait for the free thread if the user specifies 'compress-wait-thread',
2213     * otherwise we will post the page out in the main thread as normal page.
2214     */
2215    if (pages < 0 && wait) {
2216        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2217        goto retry;
2218    }
2219    qemu_mutex_unlock(&comp_done_lock);
2220
2221    return pages;
2222}
2223
2224/**
2225 * find_dirty_block: find the next dirty page and update any state
2226 * associated with the search process.
2227 *
2228 * Returns true if a page is found
2229 *
2230 * @rs: current RAM state
2231 * @pss: data about the state of the current dirty page scan
2232 * @again: set to false if the search has scanned the whole of RAM
2233 */
2234static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2235{
2236    pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2237    if (pss->complete_round && pss->block == rs->last_seen_block &&
2238        pss->page >= rs->last_page) {
2239        /*
2240         * We've been once around the RAM and haven't found anything.
2241         * Give up.
2242         */
2243        *again = false;
2244        return false;
2245    }
2246    if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2247        /* Didn't find anything in this RAM Block */
2248        pss->page = 0;
2249        pss->block = QLIST_NEXT_RCU(pss->block, next);
2250        if (!pss->block) {
2251            /*
2252             * If memory migration starts over, we will meet a dirtied page
2253             * which may still exists in compression threads's ring, so we
2254             * should flush the compressed data to make sure the new page
2255             * is not overwritten by the old one in the destination.
2256             *
2257             * Also If xbzrle is on, stop using the data compression at this
2258             * point. In theory, xbzrle can do better than compression.
2259             */
2260            flush_compressed_data(rs);
2261
2262            /* Hit the end of the list */
2263            pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2264            /* Flag that we've looped */
2265            pss->complete_round = true;
2266            rs->ram_bulk_stage = false;
2267        }
2268        /* Didn't find anything this time, but try again on the new block */
2269        *again = true;
2270        return false;
2271    } else {
2272        /* Can go around again, but... */
2273        *again = true;
2274        /* We've found something so probably don't need to */
2275        return true;
2276    }
2277}
2278
2279/**
2280 * unqueue_page: gets a page of the queue
2281 *
2282 * Helper for 'get_queued_page' - gets a page off the queue
2283 *
2284 * Returns the block of the page (or NULL if none available)
2285 *
2286 * @rs: current RAM state
2287 * @offset: used to return the offset within the RAMBlock
2288 */
2289static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2290{
2291    RAMBlock *block = NULL;
2292
2293    if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2294        return NULL;
2295    }
2296
2297    qemu_mutex_lock(&rs->src_page_req_mutex);
2298    if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2299        struct RAMSrcPageRequest *entry =
2300                                QSIMPLEQ_FIRST(&rs->src_page_requests);
2301        block = entry->rb;
2302        *offset = entry->offset;
2303
2304        if (entry->len > TARGET_PAGE_SIZE) {
2305            entry->len -= TARGET_PAGE_SIZE;
2306            entry->offset += TARGET_PAGE_SIZE;
2307        } else {
2308            memory_region_unref(block->mr);
2309            QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2310            g_free(entry);
2311            migration_consume_urgent_request();
2312        }
2313    }
2314    qemu_mutex_unlock(&rs->src_page_req_mutex);
2315
2316    return block;
2317}
2318
2319/**
2320 * get_queued_page: unqueue a page from the postcopy requests
2321 *
2322 * Skips pages that are already sent (!dirty)
2323 *
2324 * Returns true if a queued page is found
2325 *
2326 * @rs: current RAM state
2327 * @pss: data about the state of the current dirty page scan
2328 */
2329static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2330{
2331    RAMBlock  *block;
2332    ram_addr_t offset;
2333    bool dirty;
2334
2335    do {
2336        block = unqueue_page(rs, &offset);
2337        /*
2338         * We're sending this page, and since it's postcopy nothing else
2339         * will dirty it, and we must make sure it doesn't get sent again
2340         * even if this queue request was received after the background
2341         * search already sent it.
2342         */
2343        if (block) {
2344            unsigned long page;
2345
2346            page = offset >> TARGET_PAGE_BITS;
2347            dirty = test_bit(page, block->bmap);
2348            if (!dirty) {
2349                trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2350                                                page);
2351            } else {
2352                trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2353            }
2354        }
2355
2356    } while (block && !dirty);
2357
2358    if (block) {
2359        /*
2360         * As soon as we start servicing pages out of order, then we have
2361         * to kill the bulk stage, since the bulk stage assumes
2362         * in (migration_bitmap_find_and_reset_dirty) that every page is
2363         * dirty, that's no longer true.
2364         */
2365        rs->ram_bulk_stage = false;
2366
2367        /*
2368         * We want the background search to continue from the queued page
2369         * since the guest is likely to want other pages near to the page
2370         * it just requested.
2371         */
2372        pss->block = block;
2373        pss->page = offset >> TARGET_PAGE_BITS;
2374
2375        /*
2376         * This unqueued page would break the "one round" check, even is
2377         * really rare.
2378         */
2379        pss->complete_round = false;
2380    }
2381
2382    return !!block;
2383}
2384
2385/**
2386 * migration_page_queue_free: drop any remaining pages in the ram
2387 * request queue
2388 *
2389 * It should be empty at the end anyway, but in error cases there may
2390 * be some left.  in case that there is any page left, we drop it.
2391 *
2392 */
2393static void migration_page_queue_free(RAMState *rs)
2394{
2395    struct RAMSrcPageRequest *mspr, *next_mspr;
2396    /* This queue generally should be empty - but in the case of a failed
2397     * migration might have some droppings in.
2398     */
2399    RCU_READ_LOCK_GUARD();
2400    QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2401        memory_region_unref(mspr->rb->mr);
2402        QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2403        g_free(mspr);
2404    }
2405}
2406
2407/**
2408 * ram_save_queue_pages: queue the page for transmission
2409 *
2410 * A request from postcopy destination for example.
2411 *
2412 * Returns zero on success or negative on error
2413 *
2414 * @rbname: Name of the RAMBLock of the request. NULL means the
2415 *          same that last one.
2416 * @start: starting address from the start of the RAMBlock
2417 * @len: length (in bytes) to send
2418 */
2419int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2420{
2421    RAMBlock *ramblock;
2422    RAMState *rs = ram_state;
2423
2424    ram_counters.postcopy_requests++;
2425    RCU_READ_LOCK_GUARD();
2426
2427    if (!rbname) {
2428        /* Reuse last RAMBlock */
2429        ramblock = rs->last_req_rb;
2430
2431        if (!ramblock) {
2432            /*
2433             * Shouldn't happen, we can't reuse the last RAMBlock if
2434             * it's the 1st request.
2435             */
2436            error_report("ram_save_queue_pages no previous block");
2437            goto err;
2438        }
2439    } else {
2440        ramblock = qemu_ram_block_by_name(rbname);
2441
2442        if (!ramblock) {
2443            /* We shouldn't be asked for a non-existent RAMBlock */
2444            error_report("ram_save_queue_pages no block '%s'", rbname);
2445            goto err;
2446        }
2447        rs->last_req_rb = ramblock;
2448    }
2449    trace_ram_save_queue_pages(ramblock->idstr, start, len);
2450    if (start+len > ramblock->used_length) {
2451        error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2452                     RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2453                     __func__, start, len, ramblock->used_length);
2454        goto err;
2455    }
2456
2457    struct RAMSrcPageRequest *new_entry =
2458        g_malloc0(sizeof(struct RAMSrcPageRequest));
2459    new_entry->rb = ramblock;
2460    new_entry->offset = start;
2461    new_entry->len = len;
2462
2463    memory_region_ref(ramblock->mr);
2464    qemu_mutex_lock(&rs->src_page_req_mutex);
2465    QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2466    migration_make_urgent_request();
2467    qemu_mutex_unlock(&rs->src_page_req_mutex);
2468
2469    return 0;
2470
2471err:
2472    return -1;
2473}
2474
2475static bool save_page_use_compression(RAMState *rs)
2476{
2477    if (!migrate_use_compression()) {
2478        return false;
2479    }
2480
2481    /*
2482     * If xbzrle is on, stop using the data compression after first
2483     * round of migration even if compression is enabled. In theory,
2484     * xbzrle can do better than compression.
2485     */
2486    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2487        return true;
2488    }
2489
2490    return false;
2491}
2492
2493/*
2494 * try to compress the page before posting it out, return true if the page
2495 * has been properly handled by compression, otherwise needs other
2496 * paths to handle it
2497 */
2498static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2499{
2500    if (!save_page_use_compression(rs)) {
2501        return false;
2502    }
2503
2504    /*
2505     * When starting the process of a new block, the first page of
2506     * the block should be sent out before other pages in the same
2507     * block, and all the pages in last block should have been sent
2508     * out, keeping this order is important, because the 'cont' flag
2509     * is used to avoid resending the block name.
2510     *
2511     * We post the fist page as normal page as compression will take
2512     * much CPU resource.
2513     */
2514    if (block != rs->last_sent_block) {
2515        flush_compressed_data(rs);
2516        return false;
2517    }
2518
2519    if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2520        return true;
2521    }
2522
2523    compression_counters.busy++;
2524    return false;
2525}
2526
2527/**
2528 * ram_save_target_page: save one target page
2529 *
2530 * Returns the number of pages written
2531 *
2532 * @rs: current RAM state
2533 * @pss: data about the page we want to send
2534 * @last_stage: if we are at the completion stage
2535 */
2536static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2537                                bool last_stage)
2538{
2539    RAMBlock *block = pss->block;
2540    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2541    int res;
2542
2543    if (control_save_page(rs, block, offset, &res)) {
2544        return res;
2545    }
2546
2547    if (save_compress_page(rs, block, offset)) {
2548        return 1;
2549    }
2550
2551    res = save_zero_page(rs, block, offset);
2552    if (res > 0) {
2553        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2554         * page would be stale
2555         */
2556        if (!save_page_use_compression(rs)) {
2557            XBZRLE_cache_lock();
2558            xbzrle_cache_zero_page(rs, block->offset + offset);
2559            XBZRLE_cache_unlock();
2560        }
2561        ram_release_pages(block->idstr, offset, res);
2562        return res;
2563    }
2564
2565    /*
2566     * do not use multifd for compression as the first page in the new
2567     * block should be posted out before sending the compressed page
2568     */
2569    if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2570        return ram_save_multifd_page(rs, block, offset);
2571    }
2572
2573    return ram_save_page(rs, pss, last_stage);
2574}
2575
2576/**
2577 * ram_save_host_page: save a whole host page
2578 *
2579 * Starting at *offset send pages up to the end of the current host
2580 * page. It's valid for the initial offset to point into the middle of
2581 * a host page in which case the remainder of the hostpage is sent.
2582 * Only dirty target pages are sent. Note that the host page size may
2583 * be a huge page for this block.
2584 * The saving stops at the boundary of the used_length of the block
2585 * if the RAMBlock isn't a multiple of the host page size.
2586 *
2587 * Returns the number of pages written or negative on error
2588 *
2589 * @rs: current RAM state
2590 * @ms: current migration state
2591 * @pss: data about the page we want to send
2592 * @last_stage: if we are at the completion stage
2593 */
2594static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2595                              bool last_stage)
2596{
2597    int tmppages, pages = 0;
2598    size_t pagesize_bits =
2599        qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2600
2601    if (ramblock_is_ignored(pss->block)) {
2602        error_report("block %s should not be migrated !", pss->block->idstr);
2603        return 0;
2604    }
2605
2606    do {
2607        /* Check the pages is dirty and if it is send it */
2608        if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2609            pss->page++;
2610            continue;
2611        }
2612
2613        tmppages = ram_save_target_page(rs, pss, last_stage);
2614        if (tmppages < 0) {
2615            return tmppages;
2616        }
2617
2618        pages += tmppages;
2619        pss->page++;
2620        /* Allow rate limiting to happen in the middle of huge pages */
2621        migration_rate_limit();
2622    } while ((pss->page & (pagesize_bits - 1)) &&
2623             offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
2624
2625    /* The offset we leave with is the last one we looked at */
2626    pss->page--;
2627    return pages;
2628}
2629
2630/**
2631 * ram_find_and_save_block: finds a dirty page and sends it to f
2632 *
2633 * Called within an RCU critical section.
2634 *
2635 * Returns the number of pages written where zero means no dirty pages,
2636 * or negative on error
2637 *
2638 * @rs: current RAM state
2639 * @last_stage: if we are at the completion stage
2640 *
2641 * On systems where host-page-size > target-page-size it will send all the
2642 * pages in a host page that are dirty.
2643 */
2644
2645static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2646{
2647    PageSearchStatus pss;
2648    int pages = 0;
2649    bool again, found;
2650
2651    /* No dirty page as there is zero RAM */
2652    if (!ram_bytes_total()) {
2653        return pages;
2654    }
2655
2656    pss.block = rs->last_seen_block;
2657    pss.page = rs->last_page;
2658    pss.complete_round = false;
2659
2660    if (!pss.block) {
2661        pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2662    }
2663
2664    do {
2665        again = true;
2666        found = get_queued_page(rs, &pss);
2667
2668        if (!found) {
2669            /* priority queue empty, so just search for something dirty */
2670            found = find_dirty_block(rs, &pss, &again);
2671        }
2672
2673        if (found) {
2674            pages = ram_save_host_page(rs, &pss, last_stage);
2675        }
2676    } while (!pages && again);
2677
2678    rs->last_seen_block = pss.block;
2679    rs->last_page = pss.page;
2680
2681    return pages;
2682}
2683
2684void acct_update_position(QEMUFile *f, size_t size, bool zero)
2685{
2686    uint64_t pages = size / TARGET_PAGE_SIZE;
2687
2688    if (zero) {
2689        ram_counters.duplicate += pages;
2690    } else {
2691        ram_counters.normal += pages;
2692        ram_counters.transferred += size;
2693        qemu_update_position(f, size);
2694    }
2695}
2696
2697static uint64_t ram_bytes_total_common(bool count_ignored)
2698{
2699    RAMBlock *block;
2700    uint64_t total = 0;
2701
2702    RCU_READ_LOCK_GUARD();
2703
2704    if (count_ignored) {
2705        RAMBLOCK_FOREACH_MIGRATABLE(block) {
2706            total += block->used_length;
2707        }
2708    } else {
2709        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2710            total += block->used_length;
2711        }
2712    }
2713    return total;
2714}
2715
/* Total used_length, in bytes, over the RAMBlocks that are not ignored */
uint64_t ram_bytes_total(void)
{
    uint64_t total = ram_bytes_total_common(false);

    return total;
}
2720
2721static void xbzrle_load_setup(void)
2722{
2723    XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2724}
2725
2726static void xbzrle_load_cleanup(void)
2727{
2728    g_free(XBZRLE.decoded_buf);
2729    XBZRLE.decoded_buf = NULL;
2730}
2731
2732static void ram_state_cleanup(RAMState **rsp)
2733{
2734    if (*rsp) {
2735        migration_page_queue_free(*rsp);
2736        qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2737        qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2738        g_free(*rsp);
2739        *rsp = NULL;
2740    }
2741}
2742
2743static void xbzrle_cleanup(void)
2744{
2745    XBZRLE_cache_lock();
2746    if (XBZRLE.cache) {
2747        cache_fini(XBZRLE.cache);
2748        g_free(XBZRLE.encoded_buf);
2749        g_free(XBZRLE.current_buf);
2750        g_free(XBZRLE.zero_target_page);
2751        XBZRLE.cache = NULL;
2752        XBZRLE.encoded_buf = NULL;
2753        XBZRLE.current_buf = NULL;
2754        XBZRLE.zero_target_page = NULL;
2755    }
2756    XBZRLE_cache_unlock();
2757}
2758
2759static void ram_save_cleanup(void *opaque)
2760{
2761    RAMState **rsp = opaque;
2762    RAMBlock *block;
2763
2764    /* caller have hold iothread lock or is in a bh, so there is
2765     * no writing race against the migration bitmap
2766     */
2767    memory_global_dirty_log_stop();
2768
2769    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2770        g_free(block->clear_bmap);
2771        block->clear_bmap = NULL;
2772        g_free(block->bmap);
2773        block->bmap = NULL;
2774    }
2775
2776    xbzrle_cleanup();
2777    compress_threads_save_cleanup();
2778    ram_state_cleanup(rsp);
2779}
2780
2781static void ram_state_reset(RAMState *rs)
2782{
2783    rs->last_seen_block = NULL;
2784    rs->last_sent_block = NULL;
2785    rs->last_page = 0;
2786    rs->last_version = ram_list.version;
2787    rs->ram_bulk_stage = true;
2788    rs->fpo_enabled = false;
2789}
2790
2791#define MAX_WAIT 50 /* ms, half buffered_file limit */
2792
/*
 * ram_debug_dump_bitmap: print a bitmap to stderr, 128 bits per line
 *
 * Set bits are printed as '1' and clear bits as '.', each line prefixed
 * with the index of its first bit.
 *
 * @todump: the bitmap to print; a NULL bitmap is ignored and nothing
 *          is printed
 * @expected: the value you expect the bitmap mostly to be full of;
 *            lines that are all this value are not printed
 * @pages: number of bits in the bitmap
 */
void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
                           unsigned long pages)
{
    int64_t cur;
    int64_t linelen = 128;
    char linebuf[129];

    /* There is no bitmap to fall back to here, so don't dereference NULL */
    if (!todump) {
        return;
    }

    for (cur = 0; cur < pages; cur += linelen) {
        int64_t curb;
        bool found = false;
        /*
         * Last line; catch the case where the line length
         * is longer than remaining ram
         */
        if (cur + linelen > pages) {
            linelen = pages - cur;
        }
        for (curb = 0; curb < linelen; curb++) {
            bool thisbit = test_bit(cur + curb, todump);
            linebuf[curb] = thisbit ? '1' : '.';
            found = found || (thisbit != expected);
        }
        if (found) {
            /* curb == linelen <= 128 here, so the terminator fits */
            linebuf[curb] = '\0';
            fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
        }
    }
}
2826
2827/* **** functions for postcopy ***** */
2828
2829void ram_postcopy_migrated_memory_release(MigrationState *ms)
2830{
2831    struct RAMBlock *block;
2832
2833    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2834        unsigned long *bitmap = block->bmap;
2835        unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2836        unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2837
2838        while (run_start < range) {
2839            unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2840            ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2841                              (run_end - run_start) << TARGET_PAGE_BITS);
2842            run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2843        }
2844    }
2845}
2846
2847/**
2848 * postcopy_send_discard_bm_ram: discard a RAMBlock
2849 *
2850 * Returns zero on success
2851 *
2852 * Callback from postcopy_each_ram_send_discard for each RAMBlock
2853 *
2854 * @ms: current migration state
2855 * @block: RAMBlock to discard
2856 */
2857static int postcopy_send_discard_bm_ram(MigrationState *ms, RAMBlock *block)
2858{
2859    unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2860    unsigned long current;
2861    unsigned long *bitmap = block->bmap;
2862
2863    for (current = 0; current < end; ) {
2864        unsigned long one = find_next_bit(bitmap, end, current);
2865        unsigned long zero, discard_length;
2866
2867        if (one >= end) {
2868            break;
2869        }
2870
2871        zero = find_next_zero_bit(bitmap, end, one + 1);
2872
2873        if (zero >= end) {
2874            discard_length = end - one;
2875        } else {
2876            discard_length = zero - one;
2877        }
2878        postcopy_discard_send_range(ms, one, discard_length);
2879        current = one + discard_length;
2880    }
2881
2882    return 0;
2883}
2884
2885/**
2886 * postcopy_each_ram_send_discard: discard all RAMBlocks
2887 *
2888 * Returns 0 for success or negative for error
2889 *
2890 * Utility for the outgoing postcopy code.
2891 *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2892 *   passing it bitmap indexes and name.
2893 * (qemu_ram_foreach_block ends up passing unscaled lengths
2894 *  which would mean postcopy code would have to deal with target page)
2895 *
2896 * @ms: current migration state
2897 */
2898static int postcopy_each_ram_send_discard(MigrationState *ms)
2899{
2900    struct RAMBlock *block;
2901    int ret;
2902
2903    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2904        postcopy_discard_send_init(ms, block->idstr);
2905
2906        /*
2907         * Postcopy sends chunks of bitmap over the wire, but it
2908         * just needs indexes at this point, avoids it having
2909         * target page specific code.
2910         */
2911        ret = postcopy_send_discard_bm_ram(ms, block);
2912        postcopy_discard_send_finish(ms);
2913        if (ret) {
2914            return ret;
2915        }
2916    }
2917
2918    return 0;
2919}
2920
2921/**
2922 * postcopy_chunk_hostpages_pass: canonicalize bitmap in hostpages
2923 *
2924 * Helper for postcopy_chunk_hostpages; it's called twice to
2925 * canonicalize the two bitmaps, that are similar, but one is
2926 * inverted.
2927 *
2928 * Postcopy requires that all target pages in a hostpage are dirty or
2929 * clean, not a mix.  This function canonicalizes the bitmaps.
2930 *
2931 * @ms: current migration state
2932 * @block: block that contains the page we want to canonicalize
2933 */
2934static void postcopy_chunk_hostpages_pass(MigrationState *ms, RAMBlock *block)
2935{
2936    RAMState *rs = ram_state;
2937    unsigned long *bitmap = block->bmap;
2938    unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2939    unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2940    unsigned long run_start;
2941
2942    if (block->page_size == TARGET_PAGE_SIZE) {
2943        /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2944        return;
2945    }
2946
2947    /* Find a dirty page */
2948    run_start = find_next_bit(bitmap, pages, 0);
2949
2950    while (run_start < pages) {
2951
2952        /*
2953         * If the start of this run of pages is in the middle of a host
2954         * page, then we need to fixup this host page.
2955         */
2956        if (QEMU_IS_ALIGNED(run_start, host_ratio)) {
2957            /* Find the end of this run */
2958            run_start = find_next_zero_bit(bitmap, pages, run_start + 1);
2959            /*
2960             * If the end isn't at the start of a host page, then the
2961             * run doesn't finish at the end of a host page
2962             * and we need to discard.
2963             */
2964        }
2965
2966        if (!QEMU_IS_ALIGNED(run_start, host_ratio)) {
2967            unsigned long page;
2968            unsigned long fixup_start_addr = QEMU_ALIGN_DOWN(run_start,
2969                                                             host_ratio);
2970            run_start = QEMU_ALIGN_UP(run_start, host_ratio);
2971
2972            /* Clean up the bitmap */
2973            for (page = fixup_start_addr;
2974                 page < fixup_start_addr + host_ratio; page++) {
2975                /*
2976                 * Remark them as dirty, updating the count for any pages
2977                 * that weren't previously dirty.
2978                 */
2979                rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
2980            }
2981        }
2982
2983        /* Find the next dirty page for the next iteration */
2984        run_start = find_next_bit(bitmap, pages, run_start);
2985    }
2986}
2987
2988/**
2989 * postcopy_chunk_hostpages: discard any partially sent host page
2990 *
2991 * Utility for the outgoing postcopy code.
2992 *
2993 * Discard any partially sent host-page size chunks, mark any partially
2994 * dirty host-page size chunks as all dirty.  In this case the host-page
2995 * is the host-page for the particular RAMBlock, i.e. it might be a huge page
2996 *
2997 * Returns zero on success
2998 *
2999 * @ms: current migration state
3000 * @block: block we want to work with
3001 */
3002static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
3003{
3004    postcopy_discard_send_init(ms, block->idstr);
3005
3006    /*
3007     * Ensure that all partially dirty host pages are made fully dirty.
3008     */
3009    postcopy_chunk_hostpages_pass(ms, block);
3010
3011    postcopy_discard_send_finish(ms);
3012    return 0;
3013}
3014
3015/**
3016 * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3017 *
3018 * Returns zero on success
3019 *
3020 * Transmit the set of pages to be discarded after precopy to the target
3021 * these are pages that:
3022 *     a) Have been previously transmitted but are now dirty again
3023 *     b) Pages that have never been transmitted, this ensures that
3024 *        any pages on the destination that have been mapped by background
3025 *        tasks get discarded (transparent huge pages is the specific concern)
3026 * Hopefully this is pretty sparse
3027 *
3028 * @ms: current migration state
3029 */
3030int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3031{
3032    RAMState *rs = ram_state;
3033    RAMBlock *block;
3034    int ret;
3035
3036    RCU_READ_LOCK_GUARD();
3037
3038    /* This should be our last sync, the src is now paused */
3039    migration_bitmap_sync(rs);
3040
3041    /* Easiest way to make sure we don't resume in the middle of a host-page */
3042    rs->last_seen_block = NULL;
3043    rs->last_sent_block = NULL;
3044    rs->last_page = 0;
3045
3046    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3047        /* Deal with TPS != HPS and huge pages */
3048        ret = postcopy_chunk_hostpages(ms, block);
3049        if (ret) {
3050            return ret;
3051        }
3052
3053#ifdef DEBUG_POSTCOPY
3054        ram_debug_dump_bitmap(block->bmap, true,
3055                              block->used_length >> TARGET_PAGE_BITS);
3056#endif
3057    }
3058    trace_ram_postcopy_send_discard_bitmap();
3059
3060    ret = postcopy_each_ram_send_discard(ms);
3061
3062    return ret;
3063}
3064
3065/**
3066 * ram_discard_range: discard dirtied pages at the beginning of postcopy
3067 *
3068 * Returns zero on success
3069 *
3070 * @rbname: name of the RAMBlock of the request. NULL means the
3071 *          same that last one.
3072 * @start: RAMBlock starting page
3073 * @length: RAMBlock size
3074 */
3075int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3076{
3077    int ret = -1;
3078
3079    trace_ram_discard_range(rbname, start, length);
3080
3081    RCU_READ_LOCK_GUARD();
3082    RAMBlock *rb = qemu_ram_block_by_name(rbname);
3083
3084    if (!rb) {
3085        error_report("ram_discard_range: Failed to find block '%s'", rbname);
3086        goto err;
3087    }
3088
3089    /*
3090     * On source VM, we don't need to update the received bitmap since
3091     * we don't even have one.
3092     */
3093    if (rb->receivedmap) {
3094        bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3095                     length >> qemu_target_page_bits());
3096    }
3097
3098    ret = ram_block_discard_range(rb, start, length);
3099
3100err:
3101    return ret;
3102}
3103
3104/*
3105 * For every allocation, we will try not to crash the VM if the
3106 * allocation failed.
3107 */
3108static int xbzrle_init(void)
3109{
3110    Error *local_err = NULL;
3111
3112    if (!migrate_use_xbzrle()) {
3113        return 0;
3114    }
3115
3116    XBZRLE_cache_lock();
3117
3118    XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3119    if (!XBZRLE.zero_target_page) {
3120        error_report("%s: Error allocating zero page", __func__);
3121        goto err_out;
3122    }
3123
3124    XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3125                              TARGET_PAGE_SIZE, &local_err);
3126    if (!XBZRLE.cache) {
3127        error_report_err(local_err);
3128        goto free_zero_page;
3129    }
3130
3131    XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3132    if (!XBZRLE.encoded_buf) {
3133        error_report("%s: Error allocating encoded_buf", __func__);
3134        goto free_cache;
3135    }
3136
3137    XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3138    if (!XBZRLE.current_buf) {
3139        error_report("%s: Error allocating current_buf", __func__);
3140        goto free_encoded_buf;
3141    }
3142
3143    /* We are all good */
3144    XBZRLE_cache_unlock();
3145    return 0;
3146
3147free_encoded_buf:
3148    g_free(XBZRLE.encoded_buf);
3149    XBZRLE.encoded_buf = NULL;
3150free_cache:
3151    cache_fini(XBZRLE.cache);
3152    XBZRLE.cache = NULL;
3153free_zero_page:
3154    g_free(XBZRLE.zero_target_page);
3155    XBZRLE.zero_target_page = NULL;
3156err_out:
3157    XBZRLE_cache_unlock();
3158    return -ENOMEM;
3159}
3160
3161static int ram_state_init(RAMState **rsp)
3162{
3163    *rsp = g_try_new0(RAMState, 1);
3164
3165    if (!*rsp) {
3166        error_report("%s: Init ramstate fail", __func__);
3167        return -1;
3168    }
3169
3170    qemu_mutex_init(&(*rsp)->bitmap_mutex);
3171    qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3172    QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3173
3174    /*
3175     * Count the total number of pages used by ram blocks not including any
3176     * gaps due to alignment or unplugs.
3177     * This must match with the initial values of dirty bitmap.
3178     */
3179    (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3180    ram_state_reset(*rsp);
3181
3182    return 0;
3183}
3184
3185static void ram_list_init_bitmaps(void)
3186{
3187    MigrationState *ms = migrate_get_current();
3188    RAMBlock *block;
3189    unsigned long pages;
3190    uint8_t shift;
3191
3192    /* Skip setting bitmap if there is no RAM */
3193    if (ram_bytes_total()) {
3194        shift = ms->clear_bitmap_shift;
3195        if (shift > CLEAR_BITMAP_SHIFT_MAX) {
3196            error_report("clear_bitmap_shift (%u) too big, using "
3197                         "max value (%u)", shift, CLEAR_BITMAP_SHIFT_MAX);
3198            shift = CLEAR_BITMAP_SHIFT_MAX;
3199        } else if (shift < CLEAR_BITMAP_SHIFT_MIN) {
3200            error_report("clear_bitmap_shift (%u) too small, using "
3201                         "min value (%u)", shift, CLEAR_BITMAP_SHIFT_MIN);
3202            shift = CLEAR_BITMAP_SHIFT_MIN;
3203        }
3204
3205        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3206            pages = block->max_length >> TARGET_PAGE_BITS;
3207            /*
3208             * The initial dirty bitmap for migration must be set with all
3209             * ones to make sure we'll migrate every guest RAM page to
3210             * destination.
3211             * Here we set RAMBlock.bmap all to 1 because when rebegin a
3212             * new migration after a failed migration, ram_list.
3213             * dirty_memory[DIRTY_MEMORY_MIGRATION] don't include the whole
3214             * guest memory.
3215             */
3216            block->bmap = bitmap_new(pages);
3217            bitmap_set(block->bmap, 0, pages);
3218            block->clear_bmap_shift = shift;
3219            block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift));
3220        }
3221    }
3222}
3223
/*
 * Create the per-RAMBlock bitmaps, start global dirty logging and run a
 * first precopy bitmap sync for @rs.
 *
 * The three steps run inside an RCU read-side critical section, with the
 * iothread lock and then the ramlist lock held; the locks are released
 * in the reverse order.
 */
static void ram_init_bitmaps(RAMState *rs)
{
    /* For memory_global_dirty_log_start below.  */
    qemu_mutex_lock_iothread();
    qemu_mutex_lock_ramlist();

    WITH_RCU_READ_LOCK_GUARD() {
        ram_list_init_bitmaps();
        memory_global_dirty_log_start();
        migration_bitmap_sync_precopy(rs);
    }
    qemu_mutex_unlock_ramlist();
    qemu_mutex_unlock_iothread();
}
3238
3239static int ram_init_all(RAMState **rsp)
3240{
3241    if (ram_state_init(rsp)) {
3242        return -1;
3243    }
3244
3245    if (xbzrle_init()) {
3246        ram_state_cleanup(rsp);
3247        return -1;
3248    }
3249
3250    ram_init_bitmaps(*rsp);
3251
3252    return 0;
3253}
3254
3255static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3256{
3257    RAMBlock *block;
3258    uint64_t pages = 0;
3259
3260    /*
3261     * Postcopy is not using xbzrle/compression, so no need for that.
3262     * Also, since source are already halted, we don't need to care
3263     * about dirty page logging as well.
3264     */
3265
3266    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3267        pages += bitmap_count_one(block->bmap,
3268                                  block->used_length >> TARGET_PAGE_BITS);
3269    }
3270
3271    /* This may not be aligned with current bitmaps. Recalculate. */
3272    rs->migration_dirty_pages = pages;
3273
3274    rs->last_seen_block = NULL;
3275    rs->last_sent_block = NULL;
3276    rs->last_page = 0;
3277    rs->last_version = ram_list.version;
3278    /*
3279     * Disable the bulk stage, otherwise we'll resend the whole RAM no
3280     * matter what we have sent.
3281     */
3282    rs->ram_bulk_stage = false;
3283
3284    /* Update RAMState cache of output QEMUFile */
3285    rs->f = out;
3286
3287    trace_ram_state_resume_prepare(pages);
3288}
3289
3290/*
3291 * This function clears bits of the free pages reported by the caller from the
3292 * migration dirty bitmap. @addr is the host address corresponding to the
3293 * start of the continuous guest free pages, and @len is the total bytes of
3294 * those pages.
3295 */
3296void qemu_guest_free_page_hint(void *addr, size_t len)
3297{
3298    RAMBlock *block;
3299    ram_addr_t offset;
3300    size_t used_len, start, npages;
3301    MigrationState *s = migrate_get_current();
3302
3303    /* This function is currently expected to be used during live migration */
3304    if (!migration_is_setup_or_active(s->state)) {
3305        return;
3306    }
3307
3308    for (; len > 0; len -= used_len, addr += used_len) {
3309        block = qemu_ram_block_from_host(addr, false, &offset);
3310        if (unlikely(!block || offset >= block->used_length)) {
3311            /*
3312             * The implementation might not support RAMBlock resize during
3313             * live migration, but it could happen in theory with future
3314             * updates. So we add a check here to capture that case.
3315             */
3316            error_report_once("%s unexpected error", __func__);
3317            return;
3318        }
3319
3320        if (len <= block->used_length - offset) {
3321            used_len = len;
3322        } else {
3323            used_len = block->used_length - offset;
3324        }
3325
3326        start = offset >> TARGET_PAGE_BITS;
3327        npages = used_len >> TARGET_PAGE_BITS;
3328
3329        qemu_mutex_lock(&ram_state->bitmap_mutex);
3330        ram_state->migration_dirty_pages -=
3331                      bitmap_count_one_with_offset(block->bmap, start, npages);
3332        bitmap_clear(block->bmap, start, npages);
3333        qemu_mutex_unlock(&ram_state->bitmap_mutex);
3334    }
3335}
3336
3337/*
3338 * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3339 * long-running RCU critical section.  When rcu-reclaims in the code
3340 * start to become numerous it will be necessary to reduce the
3341 * granularity of these critical sections.
3342 */
3343
3344/**
3345 * ram_save_setup: Setup RAM for migration
3346 *
3347 * Returns zero to indicate success and negative for error
3348 *
3349 * @f: QEMUFile where to send the data
3350 * @opaque: RAMState pointer
3351 */
3352static int ram_save_setup(QEMUFile *f, void *opaque)
3353{
3354    RAMState **rsp = opaque;
3355    RAMBlock *block;
3356
3357    if (compress_threads_save_setup()) {
3358        return -1;
3359    }
3360
3361    /* migration has already setup the bitmap, reuse it. */
3362    if (!migration_in_colo_state()) {
3363        if (ram_init_all(rsp) != 0) {
3364            compress_threads_save_cleanup();
3365            return -1;
3366        }
3367    }
3368    (*rsp)->f = f;
3369
3370    WITH_RCU_READ_LOCK_GUARD() {
3371        qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3372
3373        RAMBLOCK_FOREACH_MIGRATABLE(block) {
3374            qemu_put_byte(f, strlen(block->idstr));
3375            qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3376            qemu_put_be64(f, block->used_length);
3377            if (migrate_postcopy_ram() && block->page_size !=
3378                                          qemu_host_page_size) {
3379                qemu_put_be64(f, block->page_size);
3380            }
3381            if (migrate_ignore_shared()) {
3382                qemu_put_be64(f, block->mr->addr);
3383            }
3384        }
3385    }
3386
3387    ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3388    ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3389
3390    multifd_send_sync_main(*rsp);
3391    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3392    qemu_fflush(f);
3393
3394    return 0;
3395}
3396
/**
 * ram_save_iterate: iterative stage for migration
 *
 * Sends pages until the rate limit is hit (and no source page requests
 * are queued), the file is in error, there is nothing left to send, or
 * more than MAX_WAIT milliseconds have elapsed.  The round is always
 * terminated with RAM_SAVE_FLAG_EOS.
 *
 * Returns a negative value if the QEMUFile is in error; otherwise 1 when
 * ram_find_and_save_block() reported that no pages are left, and 0 in
 * every other case
 *
 * @f: QEMUFile where to send the data
 * @opaque: RAMState pointer
 */
static int ram_save_iterate(QEMUFile *f, void *opaque)
{
    RAMState **temp = opaque;
    RAMState *rs = *temp;
    int ret;
    int i;
    int64_t t0;
    /* Set to 1 once ram_find_and_save_block() finds nothing to send */
    int done = 0;

    if (blk_mig_bulk_active()) {
        /* Avoid transferring ram during bulk phase of block migration as
         * the bulk phase will usually take a long time and transferring
         * ram updates during that time is pointless. */
        goto out;
    }

    WITH_RCU_READ_LOCK_GUARD() {
        /* The block list changed since the last call: restart the search */
        if (ram_list.version != rs->last_version) {
            ram_state_reset(rs);
        }

        /* Read version before ram_list.blocks */
        smp_rmb();

        ram_control_before_iterate(f, RAM_CONTROL_ROUND);

        t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
        i = 0;
        /* Queued source page requests are served even when rate limited */
        while ((ret = qemu_file_rate_limit(f)) == 0 ||
                !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
            int pages;

            if (qemu_file_get_error(f)) {
                break;
            }

            pages = ram_find_and_save_block(rs, false);
            /* no more pages to sent */
            if (pages == 0) {
                done = 1;
                break;
            }

            /* Record the failure on the file; it is picked up after 'out' */
            if (pages < 0) {
                qemu_file_set_error(f, pages);
                break;
            }

            rs->target_page_count += pages;

            /*
             * we want to check in the 1st loop, just in case it was the 1st
             * time and we had to sync the dirty bitmap.
             * qemu_clock_get_ns() is a bit expensive, so we only check each
             * some iterations
             */
            if ((i & 63) == 0) {
                /* Elapsed time since t0, converted from ns to ms */
                uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) /
                              1000000;
                if (t1 > MAX_WAIT) {
                    trace_ram_save_iterate_big_wait(t1, i);
                    break;
                }
            }
            i++;
        }
    }

    /*
     * Must occur before EOS (or any QEMUFile operation)
     * because of RDMA protocol.
     */
    ram_control_after_iterate(f, RAM_CONTROL_ROUND);

out:
    multifd_send_sync_main(rs);
    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
    qemu_fflush(f);
    /* Account for the 8-byte EOS marker written above */
    ram_counters.transferred += 8;

    ret = qemu_file_get_error(f);
    if (ret < 0) {
        return ret;
    }

    return done;
}
3492
3493/**
3494 * ram_save_complete: function called to send the remaining amount of ram
3495 *
3496 * Returns zero to indicate success or negative on error
3497 *
3498 * Called with iothread lock
3499 *
3500 * @f: QEMUFile where to send the data
3501 * @opaque: RAMState pointer
3502 */
3503static int ram_save_complete(QEMUFile *f, void *opaque)
3504{
3505    RAMState **temp = opaque;
3506    RAMState *rs = *temp;
3507    int ret = 0;
3508
3509    WITH_RCU_READ_LOCK_GUARD() {
3510        if (!migration_in_postcopy()) {
3511            migration_bitmap_sync_precopy(rs);
3512        }
3513
3514        ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3515
3516        /* try transferring iterative blocks of memory */
3517
3518        /* flush all remaining blocks regardless of rate limiting */
3519        while (true) {
3520            int pages;
3521
3522            pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3523            /* no more blocks to sent */
3524            if (pages == 0) {
3525                break;
3526            }
3527            if (pages < 0) {
3528                ret = pages;
3529                break;
3530            }
3531        }
3532
3533        flush_compressed_data(rs);
3534        ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3535    }
3536
3537    multifd_send_sync_main(rs);
3538    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3539    qemu_fflush(f);
3540
3541    return ret;
3542}
3543
3544static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3545                             uint64_t *res_precopy_only,
3546                             uint64_t *res_compatible,
3547                             uint64_t *res_postcopy_only)
3548{
3549    RAMState **temp = opaque;
3550    RAMState *rs = *temp;
3551    uint64_t remaining_size;
3552
3553    remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3554
3555    if (!migration_in_postcopy() &&
3556        remaining_size < max_size) {
3557        qemu_mutex_lock_iothread();
3558        WITH_RCU_READ_LOCK_GUARD() {
3559            migration_bitmap_sync_precopy(rs);
3560        }
3561        qemu_mutex_unlock_iothread();
3562        remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3563    }
3564
3565    if (migrate_postcopy_ram()) {
3566        /* We can do postcopy, and all the data is postcopiable */
3567        *res_compatible += remaining_size;
3568    } else {
3569        *res_precopy_only += remaining_size;
3570    }
3571}
3572
3573static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3574{
3575    unsigned int xh_len;
3576    int xh_flags;
3577    uint8_t *loaded_data;
3578
3579    /* extract RLE header */
3580    xh_flags = qemu_get_byte(f);
3581    xh_len = qemu_get_be16(f);
3582
3583    if (xh_flags != ENCODING_FLAG_XBZRLE) {
3584        error_report("Failed to load XBZRLE page - wrong compression!");
3585        return -1;
3586    }
3587
3588    if (xh_len > TARGET_PAGE_SIZE) {
3589        error_report("Failed to load XBZRLE page - len overflow!");
3590        return -1;
3591    }
3592    loaded_data = XBZRLE.decoded_buf;
3593    /* load data and decode */
3594    /* it can change loaded_data to point to an internal buffer */
3595    qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3596
3597    /* decode RLE */
3598    if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3599                             TARGET_PAGE_SIZE) == -1) {
3600        error_report("Failed to load XBZRLE page - decode error!");
3601        return -1;
3602    }
3603
3604    return 0;
3605}
3606
3607/**
3608 * ram_block_from_stream: read a RAMBlock id from the migration stream
3609 *
3610 * Must be called from within a rcu critical section.
3611 *
3612 * Returns a pointer from within the RCU-protected ram_list.
3613 *
3614 * @f: QEMUFile where to read the data from
3615 * @flags: Page flags (mostly to see if it's a continuation of previous block)
3616 */
3617static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3618{
3619    static RAMBlock *block = NULL;
3620    char id[256];
3621    uint8_t len;
3622
3623    if (flags & RAM_SAVE_FLAG_CONTINUE) {
3624        if (!block) {
3625            error_report("Ack, bad migration stream!");
3626            return NULL;
3627        }
3628        return block;
3629    }
3630
3631    len = qemu_get_byte(f);
3632    qemu_get_buffer(f, (uint8_t *)id, len);
3633    id[len] = 0;
3634
3635    block = qemu_ram_block_by_name(id);
3636    if (!block) {
3637        error_report("Can't find block %s", id);
3638        return NULL;
3639    }
3640
3641    if (ramblock_is_ignored(block)) {
3642        error_report("block %s should not be migrated !", id);
3643        return NULL;
3644    }
3645
3646    return block;
3647}
3648
3649static inline void *host_from_ram_block_offset(RAMBlock *block,
3650                                               ram_addr_t offset)
3651{
3652    if (!offset_in_ramblock(block, offset)) {
3653        return NULL;
3654    }
3655
3656    return block->host + offset;
3657}
3658
3659static inline void *colo_cache_from_block_offset(RAMBlock *block,
3660                                                 ram_addr_t offset)
3661{
3662    if (!offset_in_ramblock(block, offset)) {
3663        return NULL;
3664    }
3665    if (!block->colo_cache) {
3666        error_report("%s: colo_cache is NULL in block :%s",
3667                     __func__, block->idstr);
3668        return NULL;
3669    }
3670
3671    /*
3672    * During colo checkpoint, we need bitmap of these migrated pages.
3673    * It help us to decide which pages in ram cache should be flushed
3674    * into VM's RAM later.
3675    */
3676    if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3677        ram_state->migration_dirty_pages++;
3678    }
3679    return block->colo_cache + offset;
3680}
3681
/**
 * ram_handle_compressed: handle the zero page case
 *
 * If a page (or a whole RDMA chunk) has been
 * determined to be zero, then zap it.
 *
 * The memory is left untouched when @ch is zero and the range already
 * reads as all zeroes; in every other case it is filled with @ch.
 *
 * @host: host address for the zero page
 * @ch: what the page is filled from.  We only support zero
 * @size: size of the zero page
 */
void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
{
    /* Skip the write when it would not change the contents */
    if (ch == 0 && is_zero_range(host, size)) {
        return;
    }

    memset(host, ch, size);
}
3698
3699/* return the size after decompression, or negative value on error */
3700static int
3701qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3702                     const uint8_t *source, size_t source_len)
3703{
3704    int err;
3705
3706    err = inflateReset(stream);
3707    if (err != Z_OK) {
3708        return -1;
3709    }
3710
3711    stream->avail_in = source_len;
3712    stream->next_in = (uint8_t *)source;
3713    stream->avail_out = dest_len;
3714    stream->next_out = dest;
3715
3716    err = inflate(stream, Z_NO_FLUSH);
3717    if (err != Z_STREAM_END) {
3718        return -1;
3719    }
3720
3721    return stream->total_out;
3722}
3723
/*
 * Body of a decompression worker thread.
 *
 * Waits on param->cond until param->des is set, inflates param->compbuf
 * (param->len bytes) into that destination, then marks the parameter as
 * done and signals decomp_done_cond.  The loop ends once param->quit is
 * set.
 *
 * @opaque: the DecompressParam owned by this thread
 *
 * Always returns NULL.
 */
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            /* Take the work item and clear the slot while still locked */
            des = param->des;
            len = param->len;
            param->des = 0;
            /* param->mutex is not held while inflating */
            qemu_mutex_unlock(&param->mutex);

            pagesize = TARGET_PAGE_SIZE;

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            /* Failures are only reported if decompress_error_check is set */
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            /* Publish completion under the shared "done" lock */
            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            /* Re-take param->mutex before testing param->quit again */
            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
3762
3763static int wait_for_decompress_done(void)
3764{
3765    int idx, thread_count;
3766
3767    if (!migrate_use_compression()) {
3768        return 0;
3769    }
3770
3771    thread_count = migrate_decompress_threads();
3772    qemu_mutex_lock(&decomp_done_lock);
3773    for (idx = 0; idx < thread_count; idx++) {
3774        while (!decomp_param[idx].done) {
3775            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3776        }
3777    }
3778    qemu_mutex_unlock(&decomp_done_lock);
3779    return qemu_file_get_error(decomp_file);
3780}
3781
3782static void compress_threads_load_cleanup(void)
3783{
3784    int i, thread_count;
3785
3786    if (!migrate_use_compression()) {
3787        return;
3788    }
3789    thread_count = migrate_decompress_threads();
3790    for (i = 0; i < thread_count; i++) {
3791        /*
3792         * we use it as a indicator which shows if the thread is
3793         * properly init'd or not
3794         */
3795        if (!decomp_param[i].compbuf) {
3796            break;
3797        }
3798
3799        qemu_mutex_lock(&decomp_param[i].mutex);
3800        decomp_param[i].quit = true;
3801        qemu_cond_signal(&decomp_param[i].cond);
3802        qemu_mutex_unlock(&decomp_param[i].mutex);
3803    }
3804    for (i = 0; i < thread_count; i++) {
3805        if (!decomp_param[i].compbuf) {
3806            break;
3807        }
3808
3809        qemu_thread_join(decompress_threads + i);
3810        qemu_mutex_destroy(&decomp_param[i].mutex);
3811        qemu_cond_destroy(&decomp_param[i].cond);
3812        inflateEnd(&decomp_param[i].stream);
3813        g_free(decomp_param[i].compbuf);
3814        decomp_param[i].compbuf = NULL;
3815    }
3816    g_free(decompress_threads);
3817    g_free(decomp_param);
3818    decompress_threads = NULL;
3819    decomp_param = NULL;
3820    decomp_file = NULL;
3821}
3822
3823static int compress_threads_load_setup(QEMUFile *f)
3824{
3825    int i, thread_count;
3826
3827    if (!migrate_use_compression()) {
3828        return 0;
3829    }
3830
3831    thread_count = migrate_decompress_threads();
3832    decompress_threads = g_new0(QemuThread, thread_count);
3833    decomp_param = g_new0(DecompressParam, thread_count);
3834    qemu_mutex_init(&decomp_done_lock);
3835    qemu_cond_init(&decomp_done_cond);
3836    decomp_file = f;
3837    for (i = 0; i < thread_count; i++) {
3838        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3839            goto exit;
3840        }
3841
3842        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3843        qemu_mutex_init(&decomp_param[i].mutex);
3844        qemu_cond_init(&decomp_param[i].cond);
3845        decomp_param[i].done = true;
3846        decomp_param[i].quit = false;
3847        qemu_thread_create(decompress_threads + i, "decompress",
3848                           do_data_decompress, decomp_param + i,
3849                           QEMU_THREAD_JOINABLE);
3850    }
3851    return 0;
3852exit:
3853    compress_threads_load_cleanup();
3854    return -1;
3855}
3856
3857static void decompress_data_with_multi_threads(QEMUFile *f,
3858                                               void *host, int len)
3859{
3860    int idx, thread_count;
3861
3862    thread_count = migrate_decompress_threads();
3863    qemu_mutex_lock(&decomp_done_lock);
3864    while (true) {
3865        for (idx = 0; idx < thread_count; idx++) {
3866            if (decomp_param[idx].done) {
3867                decomp_param[idx].done = false;
3868                qemu_mutex_lock(&decomp_param[idx].mutex);
3869                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3870                decomp_param[idx].des = host;
3871                decomp_param[idx].len = len;
3872                qemu_cond_signal(&decomp_param[idx].cond);
3873                qemu_mutex_unlock(&decomp_param[idx].mutex);
3874                break;
3875            }
3876        }
3877        if (idx < thread_count) {
3878            break;
3879        } else {
3880            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3881        }
3882    }
3883    qemu_mutex_unlock(&decomp_done_lock);
3884}
3885
3886/*
3887 * colo cache: this is for secondary VM, we cache the whole
3888 * memory of the secondary VM, it is need to hold the global lock
3889 * to call this helper.
3890 */
3891int colo_init_ram_cache(void)
3892{
3893    RAMBlock *block;
3894
3895    rcu_read_lock();
3896    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3897        block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3898                                                NULL,
3899                                                false);
3900        if (!block->colo_cache) {
3901            error_report("%s: Can't alloc memory for COLO cache of block %s,"
3902                         "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3903                         block->used_length);
3904            RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3905                if (block->colo_cache) {
3906                    qemu_anon_ram_free(block->colo_cache, block->used_length);
3907                    block->colo_cache = NULL;
3908                }
3909            }
3910            return -errno;
3911        }
3912        memcpy(block->colo_cache, block->host, block->used_length);
3913    }
3914    rcu_read_unlock();
3915    /*
3916    * Record the dirty pages that sent by PVM, we use this dirty bitmap together
3917    * with to decide which page in cache should be flushed into SVM's RAM. Here
3918    * we use the same name 'ram_bitmap' as for migration.
3919    */
3920    if (ram_bytes_total()) {
3921        RAMBlock *block;
3922
3923        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3924            unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3925
3926            block->bmap = bitmap_new(pages);
3927            bitmap_set(block->bmap, 0, pages);
3928        }
3929    }
3930    ram_state = g_new0(RAMState, 1);
3931    ram_state->migration_dirty_pages = 0;
3932    qemu_mutex_init(&ram_state->bitmap_mutex);
3933    memory_global_dirty_log_start();
3934
3935    return 0;
3936}
3937
3938/* It is need to hold the global lock to call this helper */
3939void colo_release_ram_cache(void)
3940{
3941    RAMBlock *block;
3942
3943    memory_global_dirty_log_stop();
3944    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3945        g_free(block->bmap);
3946        block->bmap = NULL;
3947    }
3948
3949    WITH_RCU_READ_LOCK_GUARD() {
3950        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3951            if (block->colo_cache) {
3952                qemu_anon_ram_free(block->colo_cache, block->used_length);
3953                block->colo_cache = NULL;
3954            }
3955        }
3956    }
3957    qemu_mutex_destroy(&ram_state->bitmap_mutex);
3958    g_free(ram_state);
3959    ram_state = NULL;
3960}
3961
3962/**
3963 * ram_load_setup: Setup RAM for migration incoming side
3964 *
3965 * Returns zero to indicate success and negative for error
3966 *
3967 * @f: QEMUFile where to receive the data
3968 * @opaque: RAMState pointer
3969 */
3970static int ram_load_setup(QEMUFile *f, void *opaque)
3971{
3972    if (compress_threads_load_setup(f)) {
3973        return -1;
3974    }
3975
3976    xbzrle_load_setup();
3977    ramblock_recv_map_init();
3978
3979    return 0;
3980}
3981
3982static int ram_load_cleanup(void *opaque)
3983{
3984    RAMBlock *rb;
3985
3986    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3987        if (ramblock_is_pmem(rb)) {
3988            pmem_persist(rb->host, rb->used_length);
3989        }
3990    }
3991
3992    xbzrle_load_cleanup();
3993    compress_threads_load_cleanup();
3994
3995    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3996        g_free(rb->receivedmap);
3997        rb->receivedmap = NULL;
3998    }
3999
4000    return 0;
4001}
4002
4003/**
4004 * ram_postcopy_incoming_init: allocate postcopy data structures
4005 *
4006 * Returns 0 for success and negative if there was one error
4007 *
4008 * @mis: current migration incoming state
4009 *
4010 * Allocate data structures etc needed by incoming migration with
4011 * postcopy-ram. postcopy-ram's similarly names
4012 * postcopy_ram_incoming_init does the work.
4013 */
4014int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4015{
4016    return postcopy_ram_incoming_init(mis);
4017}
4018
4019/**
4020 * ram_load_postcopy: load a page in postcopy case
4021 *
4022 * Returns 0 for success or -errno in case of error
4023 *
4024 * Called in postcopy mode by ram_load().
4025 * rcu_read_lock is taken prior to this being called.
4026 *
4027 * @f: QEMUFile where to send the data
4028 */
4029static int ram_load_postcopy(QEMUFile *f)
4030{
4031    int flags = 0, ret = 0;
4032    bool place_needed = false;
4033    bool matches_target_page_size = false;
4034    MigrationIncomingState *mis = migration_incoming_get_current();
4035    /* Temporary page that is later 'placed' */
4036    void *postcopy_host_page = mis->postcopy_tmp_page;
4037    void *last_host = NULL;
4038    bool all_zero = false;
4039
4040    while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4041        ram_addr_t addr;
4042        void *host = NULL;
4043        void *page_buffer = NULL;
4044        void *place_source = NULL;
4045        RAMBlock *block = NULL;
4046        uint8_t ch;
4047
4048        addr = qemu_get_be64(f);
4049
4050        /*
4051         * If qemu file error, we should stop here, and then "addr"
4052         * may be invalid
4053         */
4054        ret = qemu_file_get_error(f);
4055        if (ret) {
4056            break;
4057        }
4058
4059        flags = addr & ~TARGET_PAGE_MASK;
4060        addr &= TARGET_PAGE_MASK;
4061
4062        trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4063        place_needed = false;
4064        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
4065            block = ram_block_from_stream(f, flags);
4066
4067            host = host_from_ram_block_offset(block, addr);
4068            if (!host) {
4069                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4070                ret = -EINVAL;
4071                break;
4072            }
4073            matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4074            /*
4075             * Postcopy requires that we place whole host pages atomically;
4076             * these may be huge pages for RAMBlocks that are backed by
4077             * hugetlbfs.
4078             * To make it atomic, the data is read into a temporary page
4079             * that's moved into place later.
4080             * The migration protocol uses,  possibly smaller, target-pages
4081             * however the source ensures it always sends all the components
4082             * of a host page in order.
4083             */
4084            page_buffer = postcopy_host_page +
4085                          ((uintptr_t)host & (block->page_size - 1));
4086            /* If all TP are zero then we can optimise the place */
4087            if (!((uintptr_t)host & (block->page_size - 1))) {
4088                all_zero = true;
4089            } else {
4090                /* not the 1st TP within the HP */
4091                if (host != (last_host + TARGET_PAGE_SIZE)) {
4092                    error_report("Non-sequential target page %p/%p",
4093                                  host, last_host);
4094                    ret = -EINVAL;
4095                    break;
4096                }
4097            }
4098
4099
4100            /*
4101             * If it's the last part of a host page then we place the host
4102             * page
4103             */
4104            place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
4105                                     (block->page_size - 1)) == 0;
4106            place_source = postcopy_host_page;
4107        }
4108        last_host = host;
4109
4110        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4111        case RAM_SAVE_FLAG_ZERO:
4112            ch = qemu_get_byte(f);
4113            memset(page_buffer, ch, TARGET_PAGE_SIZE);
4114            if (ch) {
4115                all_zero = false;
4116            }
4117            break;
4118
4119        case RAM_SAVE_FLAG_PAGE:
4120            all_zero = false;
4121            if (!matches_target_page_size) {
4122                /* For huge pages, we always use temporary buffer */
4123                qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4124            } else {
4125                /*
4126                 * For small pages that matches target page size, we
4127                 * avoid the qemu_file copy.  Instead we directly use
4128                 * the buffer of QEMUFile to place the page.  Note: we
4129                 * cannot do any QEMUFile operation before using that
4130                 * buffer to make sure the buffer is valid when
4131                 * placing the page.
4132                 */
4133                qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4134                                         TARGET_PAGE_SIZE);
4135            }
4136            break;
4137        case RAM_SAVE_FLAG_EOS:
4138            /* normal exit */
4139            multifd_recv_sync_main();
4140            break;
4141        default:
4142            error_report("Unknown combination of migration flags: %#x"
4143                         " (postcopy mode)", flags);
4144            ret = -EINVAL;
4145            break;
4146        }
4147
4148        /* Detect for any possible file errors */
4149        if (!ret && qemu_file_get_error(f)) {
4150            ret = qemu_file_get_error(f);
4151        }
4152
4153        if (!ret && place_needed) {
4154            /* This gets called at the last target page in the host page */
4155            void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
4156
4157            if (all_zero) {
4158                ret = postcopy_place_page_zero(mis, place_dest,
4159                                               block);
4160            } else {
4161                ret = postcopy_place_page(mis, place_dest,
4162                                          place_source, block);
4163            }
4164        }
4165    }
4166
4167    return ret;
4168}
4169
4170static bool postcopy_is_advised(void)
4171{
4172    PostcopyState ps = postcopy_state_get();
4173    return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4174}
4175
4176static bool postcopy_is_running(void)
4177{
4178    PostcopyState ps = postcopy_state_get();
4179    return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4180}
4181
4182/*
4183 * Flush content of RAM cache into SVM's memory.
4184 * Only flush the pages that be dirtied by PVM or SVM or both.
4185 */
4186static void colo_flush_ram_cache(void)
4187{
4188    RAMBlock *block = NULL;
4189    void *dst_host;
4190    void *src_host;
4191    unsigned long offset = 0;
4192
4193    memory_global_dirty_log_sync();
4194    WITH_RCU_READ_LOCK_GUARD() {
4195        RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4196            ramblock_sync_dirty_bitmap(ram_state, block);
4197        }
4198    }
4199
4200    trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4201    WITH_RCU_READ_LOCK_GUARD() {
4202        block = QLIST_FIRST_RCU(&ram_list.blocks);
4203
4204        while (block) {
4205            offset = migration_bitmap_find_dirty(ram_state, block, offset);
4206
4207            if (offset << TARGET_PAGE_BITS >= block->used_length) {
4208                offset = 0;
4209                block = QLIST_NEXT_RCU(block, next);
4210            } else {
4211                migration_bitmap_clear_dirty(ram_state, block, offset);
4212                dst_host = block->host + (offset << TARGET_PAGE_BITS);
4213                src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4214                memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4215            }
4216        }
4217    }
4218    trace_colo_flush_ram_cache_end();
4219}
4220
4221/**
4222 * ram_load_precopy: load pages in precopy case
4223 *
4224 * Returns 0 for success or -errno in case of error
4225 *
4226 * Called in precopy mode by ram_load().
4227 * rcu_read_lock is taken prior to this being called.
4228 *
4229 * @f: QEMUFile where to send the data
4230 */
4231static int ram_load_precopy(QEMUFile *f)
4232{
4233    int flags = 0, ret = 0, invalid_flags = 0, len = 0;
4234    /* ADVISE is earlier, it shows the source has the postcopy capability on */
4235    bool postcopy_advised = postcopy_is_advised();
4236    if (!migrate_use_compression()) {
4237        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4238    }
4239
4240    while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4241        ram_addr_t addr, total_ram_bytes;
4242        void *host = NULL;
4243        uint8_t ch;
4244
4245        addr = qemu_get_be64(f);
4246        flags = addr & ~TARGET_PAGE_MASK;
4247        addr &= TARGET_PAGE_MASK;
4248
4249        if (flags & invalid_flags) {
4250            if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4251                error_report("Received an unexpected compressed page");
4252            }
4253
4254            ret = -EINVAL;
4255            break;
4256        }
4257
4258        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4259                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4260            RAMBlock *block = ram_block_from_stream(f, flags);
4261
4262            /*
4263             * After going into COLO, we should load the Page into colo_cache.
4264             */
4265            if (migration_incoming_in_colo_state()) {
4266                host = colo_cache_from_block_offset(block, addr);
4267            } else {
4268                host = host_from_ram_block_offset(block, addr);
4269            }
4270            if (!host) {
4271                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4272                ret = -EINVAL;
4273                break;
4274            }
4275
4276            if (!migration_incoming_in_colo_state()) {
4277                ramblock_recv_bitmap_set(block, host);
4278            }
4279
4280            trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4281        }
4282
4283        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4284        case RAM_SAVE_FLAG_MEM_SIZE:
4285            /* Synchronize RAM block list */
4286            total_ram_bytes = addr;
4287            while (!ret && total_ram_bytes) {
4288                RAMBlock *block;
4289                char id[256];
4290                ram_addr_t length;
4291
4292                len = qemu_get_byte(f);
4293                qemu_get_buffer(f, (uint8_t *)id, len);
4294                id[len] = 0;
4295                length = qemu_get_be64(f);
4296
4297                block = qemu_ram_block_by_name(id);
4298                if (block && !qemu_ram_is_migratable(block)) {
4299                    error_report("block %s should not be migrated !", id);
4300                    ret = -EINVAL;
4301                } else if (block) {
4302                    if (length != block->used_length) {
4303                        Error *local_err = NULL;
4304
4305                        ret = qemu_ram_resize(block, length,
4306                                              &local_err);
4307                        if (local_err) {
4308                            error_report_err(local_err);
4309                        }
4310                    }
4311                    /* For postcopy we need to check hugepage sizes match */
4312                    if (postcopy_advised &&
4313                        block->page_size != qemu_host_page_size) {
4314                        uint64_t remote_page_size = qemu_get_be64(f);
4315                        if (remote_page_size != block->page_size) {
4316                            error_report("Mismatched RAM page size %s "
4317                                         "(local) %zd != %" PRId64,
4318                                         id, block->page_size,
4319                                         remote_page_size);
4320                            ret = -EINVAL;
4321                        }
4322                    }
4323                    if (migrate_ignore_shared()) {
4324                        hwaddr addr = qemu_get_be64(f);
4325                        if (ramblock_is_ignored(block) &&
4326                            block->mr->addr != addr) {
4327                            error_report("Mismatched GPAs for block %s "
4328                                         "%" PRId64 "!= %" PRId64,
4329                                         id, (uint64_t)addr,
4330                                         (uint64_t)block->mr->addr);
4331                            ret = -EINVAL;
4332                        }
4333                    }
4334                    ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4335                                          block->idstr);
4336                } else {
4337                    error_report("Unknown ramblock \"%s\", cannot "
4338                                 "accept migration", id);
4339                    ret = -EINVAL;
4340                }
4341
4342                total_ram_bytes -= length;
4343            }
4344            break;
4345
4346        case RAM_SAVE_FLAG_ZERO:
4347            ch = qemu_get_byte(f);
4348            ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4349            break;
4350
4351        case RAM_SAVE_FLAG_PAGE:
4352            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4353            break;
4354
4355        case RAM_SAVE_FLAG_COMPRESS_PAGE:
4356            len = qemu_get_be32(f);
4357            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4358                error_report("Invalid compressed data length: %d", len);
4359                ret = -EINVAL;
4360                break;
4361            }
4362            decompress_data_with_multi_threads(f, host, len);
4363            break;
4364
4365        case RAM_SAVE_FLAG_XBZRLE:
4366            if (load_xbzrle(f, addr, host) < 0) {
4367                error_report("Failed to decompress XBZRLE page at "
4368                             RAM_ADDR_FMT, addr);
4369                ret = -EINVAL;
4370                break;
4371            }
4372            break;
4373        case RAM_SAVE_FLAG_EOS:
4374            /* normal exit */
4375            multifd_recv_sync_main();
4376            break;
4377        default:
4378            if (flags & RAM_SAVE_FLAG_HOOK) {
4379                ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4380            } else {
4381                error_report("Unknown combination of migration flags: %#x",
4382                             flags);
4383                ret = -EINVAL;
4384            }
4385        }
4386        if (!ret) {
4387            ret = qemu_file_get_error(f);
4388        }
4389    }
4390
4391    return ret;
4392}
4393
4394static int ram_load(QEMUFile *f, void *opaque, int version_id)
4395{
4396    int ret = 0;
4397    static uint64_t seq_iter;
4398    /*
4399     * If system is running in postcopy mode, page inserts to host memory must
4400     * be atomic
4401     */
4402    bool postcopy_running = postcopy_is_running();
4403
4404    seq_iter++;
4405
4406    if (version_id != 4) {
4407        return -EINVAL;
4408    }
4409
4410    /*
4411     * This RCU critical section can be very long running.
4412     * When RCU reclaims in the code start to become numerous,
4413     * it will be necessary to reduce the granularity of this
4414     * critical section.
4415     */
4416    WITH_RCU_READ_LOCK_GUARD() {
4417        if (postcopy_running) {
4418            ret = ram_load_postcopy(f);
4419        } else {
4420            ret = ram_load_precopy(f);
4421        }
4422
4423        ret |= wait_for_decompress_done();
4424    }
4425    trace_ram_load_complete(ret, seq_iter);
4426
4427    if (!ret  && migration_incoming_in_colo_state()) {
4428        colo_flush_ram_cache();
4429    }
4430    return ret;
4431}
4432
4433static bool ram_has_postcopy(void *opaque)
4434{
4435    RAMBlock *rb;
4436    RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4437        if (ramblock_is_pmem(rb)) {
4438            info_report("Block: %s, host: %p is a nvdimm memory, postcopy"
4439                         "is not supported now!", rb->idstr, rb->host);
4440            return false;
4441        }
4442    }
4443
4444    return migrate_postcopy_ram();
4445}
4446
4447/* Sync all the dirty bitmap with destination VM.  */
4448static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4449{
4450    RAMBlock *block;
4451    QEMUFile *file = s->to_dst_file;
4452    int ramblock_count = 0;
4453
4454    trace_ram_dirty_bitmap_sync_start();
4455
4456    RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4457        qemu_savevm_send_recv_bitmap(file, block->idstr);
4458        trace_ram_dirty_bitmap_request(block->idstr);
4459        ramblock_count++;
4460    }
4461
4462    trace_ram_dirty_bitmap_sync_wait();
4463
4464    /* Wait until all the ramblocks' dirty bitmap synced */
4465    while (ramblock_count--) {
4466        qemu_sem_wait(&s->rp_state.rp_sem);
4467    }
4468
4469    trace_ram_dirty_bitmap_sync_complete();
4470
4471    return 0;
4472}
4473
4474static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4475{
4476    qemu_sem_post(&s->rp_state.rp_sem);
4477}
4478
4479/*
4480 * Read the received bitmap, revert it as the initial dirty bitmap.
4481 * This is only used when the postcopy migration is paused but wants
4482 * to resume from a middle point.
4483 */
4484int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4485{
4486    int ret = -EINVAL;
4487    QEMUFile *file = s->rp_state.from_dst_file;
4488    unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4489    uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4490    uint64_t size, end_mark;
4491
4492    trace_ram_dirty_bitmap_reload_begin(block->idstr);
4493
4494    if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4495        error_report("%s: incorrect state %s", __func__,
4496                     MigrationStatus_str(s->state));
4497        return -EINVAL;
4498    }
4499
4500    /*
4501     * Note: see comments in ramblock_recv_bitmap_send() on why we
4502     * need the endianess convertion, and the paddings.
4503     */
4504    local_size = ROUND_UP(local_size, 8);
4505
4506    /* Add paddings */
4507    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4508
4509    size = qemu_get_be64(file);
4510
4511    /* The size of the bitmap should match with our ramblock */
4512    if (size != local_size) {
4513        error_report("%s: ramblock '%s' bitmap size mismatch "
4514                     "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4515                     block->idstr, size, local_size);
4516        ret = -EINVAL;
4517        goto out;
4518    }
4519
4520    size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4521    end_mark = qemu_get_be64(file);
4522
4523    ret = qemu_file_get_error(file);
4524    if (ret || size != local_size) {
4525        error_report("%s: read bitmap failed for ramblock '%s': %d"
4526                     " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4527                     __func__, block->idstr, ret, local_size, size);
4528        ret = -EIO;
4529        goto out;
4530    }
4531
4532    if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4533        error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
4534                     __func__, block->idstr, end_mark);
4535        ret = -EINVAL;
4536        goto out;
4537    }
4538
4539    /*
4540     * Endianess convertion. We are during postcopy (though paused).
4541     * The dirty bitmap won't change. We can directly modify it.
4542     */
4543    bitmap_from_le(block->bmap, le_bitmap, nbits);
4544
4545    /*
4546     * What we received is "received bitmap". Revert it as the initial
4547     * dirty bitmap for this ramblock.
4548     */
4549    bitmap_complement(block->bmap, block->bmap, nbits);
4550
4551    trace_ram_dirty_bitmap_reload_complete(block->idstr);
4552
4553    /*
4554     * We succeeded to sync bitmap for current ramblock. If this is
4555     * the last one to sync, we need to notify the main send thread.
4556     */
4557    ram_dirty_bitmap_reload_notify(s);
4558
4559    ret = 0;
4560out:
4561    g_free(le_bitmap);
4562    return ret;
4563}
4564
4565static int ram_resume_prepare(MigrationState *s, void *opaque)
4566{
4567    RAMState *rs = *(RAMState **)opaque;
4568    int ret;
4569
4570    ret = ram_dirty_bitmap_sync_all(s, rs);
4571    if (ret) {
4572        return ret;
4573    }
4574
4575    ram_state_resume_prepare(rs, s->to_dst_file);
4576
4577    return 0;
4578}
4579
4580static SaveVMHandlers savevm_ram_handlers = {
4581    .save_setup = ram_save_setup,
4582    .save_live_iterate = ram_save_iterate,
4583    .save_live_complete_postcopy = ram_save_complete,
4584    .save_live_complete_precopy = ram_save_complete,
4585    .has_postcopy = ram_has_postcopy,
4586    .save_live_pending = ram_save_pending,
4587    .load_state = ram_load,
4588    .save_cleanup = ram_save_cleanup,
4589    .load_setup = ram_load_setup,
4590    .load_cleanup = ram_load_cleanup,
4591    .resume_prepare = ram_resume_prepare,
4592};
4593
4594void ram_mig_init(void)
4595{
4596    qemu_mutex_init(&XBZRLE.lock);
4597    register_savevm_live("ram", 0, 4, &savevm_ram_handlers, &ram_state);
4598}
4599