qemu/migration/ram.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator
   3 *
   4 * Copyright (c) 2003-2008 Fabrice Bellard
   5 * Copyright (c) 2011-2015 Red Hat Inc
   6 *
   7 * Authors:
   8 *  Juan Quintela <quintela@redhat.com>
   9 *
  10 * Permission is hereby granted, free of charge, to any person obtaining a copy
  11 * of this software and associated documentation files (the "Software"), to deal
  12 * in the Software without restriction, including without limitation the rights
  13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14 * copies of the Software, and to permit persons to whom the Software is
  15 * furnished to do so, subject to the following conditions:
  16 *
  17 * The above copyright notice and this permission notice shall be included in
  18 * all copies or substantial portions of the Software.
  19 *
  20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26 * THE SOFTWARE.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "cpu.h"
  31#include <zlib.h>
  32#include "qemu/cutils.h"
  33#include "qemu/bitops.h"
  34#include "qemu/bitmap.h"
  35#include "qemu/main-loop.h"
  36#include "qemu/pmem.h"
  37#include "xbzrle.h"
  38#include "ram.h"
  39#include "migration.h"
  40#include "socket.h"
  41#include "migration/register.h"
  42#include "migration/misc.h"
  43#include "qemu-file.h"
  44#include "postcopy-ram.h"
  45#include "page_cache.h"
  46#include "qemu/error-report.h"
  47#include "qapi/error.h"
  48#include "qapi/qapi-events-migration.h"
  49#include "qapi/qmp/qerror.h"
  50#include "trace.h"
  51#include "exec/ram_addr.h"
  52#include "exec/target_page.h"
  53#include "qemu/rcu_queue.h"
  54#include "migration/colo.h"
  55#include "block.h"
  56#include "sysemu/sysemu.h"
  57#include "qemu/uuid.h"
  58#include "savevm.h"
  59#include "qemu/iov.h"
  60
  61/***********************************************************/
  62/* ram save/restore */
  63
  64/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
  65 * worked for pages that where filled with the same char.  We switched
  66 * it to only search for the zero value.  And to avoid confusion with
  67 * RAM_SSAVE_FLAG_COMPRESS_PAGE just rename it.
  68 */
  69
  70#define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
  71#define RAM_SAVE_FLAG_ZERO     0x02
  72#define RAM_SAVE_FLAG_MEM_SIZE 0x04
  73#define RAM_SAVE_FLAG_PAGE     0x08
  74#define RAM_SAVE_FLAG_EOS      0x10
  75#define RAM_SAVE_FLAG_CONTINUE 0x20
  76#define RAM_SAVE_FLAG_XBZRLE   0x40
  77/* 0x80 is reserved in migration.h start with 0x100 next */
  78#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
  79
  80static inline bool is_zero_range(uint8_t *p, uint64_t size)
  81{
  82    return buffer_is_zero(p, size);
  83}
  84
  85XBZRLECacheStats xbzrle_counters;
  86
  87/* struct contains XBZRLE cache and a static page
  88   used by the compression */
  89static struct {
  90    /* buffer used for XBZRLE encoding */
  91    uint8_t *encoded_buf;
  92    /* buffer for storing page content */
  93    uint8_t *current_buf;
  94    /* Cache for XBZRLE, Protected by lock. */
  95    PageCache *cache;
  96    QemuMutex lock;
  97    /* it will store a page full of zeros */
  98    uint8_t *zero_target_page;
  99    /* buffer used for XBZRLE decoding */
 100    uint8_t *decoded_buf;
 101} XBZRLE;
 102
 103static void XBZRLE_cache_lock(void)
 104{
 105    if (migrate_use_xbzrle())
 106        qemu_mutex_lock(&XBZRLE.lock);
 107}
 108
 109static void XBZRLE_cache_unlock(void)
 110{
 111    if (migrate_use_xbzrle())
 112        qemu_mutex_unlock(&XBZRLE.lock);
 113}
 114
 115/**
 116 * xbzrle_cache_resize: resize the xbzrle cache
 117 *
 118 * This function is called from qmp_migrate_set_cache_size in main
 119 * thread, possibly while a migration is in progress.  A running
 120 * migration may be using the cache and might finish during this call,
 121 * hence changes to the cache are protected by XBZRLE.lock().
 122 *
 123 * Returns 0 for success or -1 for error
 124 *
 125 * @new_size: new cache size
 126 * @errp: set *errp if the check failed, with reason
 127 */
 128int xbzrle_cache_resize(int64_t new_size, Error **errp)
 129{
 130    PageCache *new_cache;
 131    int64_t ret = 0;
 132
 133    /* Check for truncation */
 134    if (new_size != (size_t)new_size) {
 135        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
 136                   "exceeding address space");
 137        return -1;
 138    }
 139
 140    if (new_size == migrate_xbzrle_cache_size()) {
 141        /* nothing to do */
 142        return 0;
 143    }
 144
 145    XBZRLE_cache_lock();
 146
 147    if (XBZRLE.cache != NULL) {
 148        new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
 149        if (!new_cache) {
 150            ret = -1;
 151            goto out;
 152        }
 153
 154        cache_fini(XBZRLE.cache);
 155        XBZRLE.cache = new_cache;
 156    }
 157out:
 158    XBZRLE_cache_unlock();
 159    return ret;
 160}
 161
 162/* Should be holding either ram_list.mutex, or the RCU lock. */
 163#define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
 164    INTERNAL_RAMBLOCK_FOREACH(block)                   \
 165        if (!qemu_ram_is_migratable(block)) {} else
 166
 167#undef RAMBLOCK_FOREACH
 168
 169static void ramblock_recv_map_init(void)
 170{
 171    RAMBlock *rb;
 172
 173    RAMBLOCK_FOREACH_MIGRATABLE(rb) {
 174        assert(!rb->receivedmap);
 175        rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
 176    }
 177}
 178
 179int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
 180{
 181    return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
 182                    rb->receivedmap);
 183}
 184
 185bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
 186{
 187    return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
 188}
 189
 190void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
 191{
 192    set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
 193}
 194
 195void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
 196                                    size_t nr)
 197{
 198    bitmap_set_atomic(rb->receivedmap,
 199                      ramblock_recv_bitmap_offset(host_addr, rb),
 200                      nr);
 201}
 202
 203#define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
 204
 205/*
 206 * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
 207 *
 208 * Returns >0 if success with sent bytes, or <0 if error.
 209 */
 210int64_t ramblock_recv_bitmap_send(QEMUFile *file,
 211                                  const char *block_name)
 212{
 213    RAMBlock *block = qemu_ram_block_by_name(block_name);
 214    unsigned long *le_bitmap, nbits;
 215    uint64_t size;
 216
 217    if (!block) {
 218        error_report("%s: invalid block name: %s", __func__, block_name);
 219        return -1;
 220    }
 221
 222    nbits = block->used_length >> TARGET_PAGE_BITS;
 223
 224    /*
 225     * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
 226     * machines we may need 4 more bytes for padding (see below
 227     * comment). So extend it a bit before hand.
 228     */
 229    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
 230
 231    /*
 232     * Always use little endian when sending the bitmap. This is
 233     * required that when source and destination VMs are not using the
 234     * same endianess. (Note: big endian won't work.)
 235     */
 236    bitmap_to_le(le_bitmap, block->receivedmap, nbits);
 237
 238    /* Size of the bitmap, in bytes */
 239    size = DIV_ROUND_UP(nbits, 8);
 240
 241    /*
 242     * size is always aligned to 8 bytes for 64bit machines, but it
 243     * may not be true for 32bit machines. We need this padding to
 244     * make sure the migration can survive even between 32bit and
 245     * 64bit machines.
 246     */
 247    size = ROUND_UP(size, 8);
 248
 249    qemu_put_be64(file, size);
 250    qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
 251    /*
 252     * Mark as an end, in case the middle part is screwed up due to
 253     * some "misterious" reason.
 254     */
 255    qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
 256    qemu_fflush(file);
 257
 258    g_free(le_bitmap);
 259
 260    if (qemu_file_get_error(file)) {
 261        return qemu_file_get_error(file);
 262    }
 263
 264    return size + sizeof(size);
 265}
 266
 267/*
 268 * An outstanding page request, on the source, having been received
 269 * and queued
 270 */
 271struct RAMSrcPageRequest {
 272    RAMBlock *rb;
 273    hwaddr    offset;
 274    hwaddr    len;
 275
 276    QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
 277};
 278
 279/* State of RAM for migration */
 280struct RAMState {
 281    /* QEMUFile used for this migration */
 282    QEMUFile *f;
 283    /* Last block that we have visited searching for dirty pages */
 284    RAMBlock *last_seen_block;
 285    /* Last block from where we have sent data */
 286    RAMBlock *last_sent_block;
 287    /* Last dirty target page we have sent */
 288    ram_addr_t last_page;
 289    /* last ram version we have seen */
 290    uint32_t last_version;
 291    /* We are in the first round */
 292    bool ram_bulk_stage;
 293    /* How many times we have dirty too many pages */
 294    int dirty_rate_high_cnt;
 295    /* these variables are used for bitmap sync */
 296    /* last time we did a full bitmap_sync */
 297    int64_t time_last_bitmap_sync;
 298    /* bytes transferred at start_time */
 299    uint64_t bytes_xfer_prev;
 300    /* number of dirty pages since start_time */
 301    uint64_t num_dirty_pages_period;
 302    /* xbzrle misses since the beginning of the period */
 303    uint64_t xbzrle_cache_miss_prev;
 304
 305    /* compression statistics since the beginning of the period */
 306    /* amount of count that no free thread to compress data */
 307    uint64_t compress_thread_busy_prev;
 308    /* amount bytes after compression */
 309    uint64_t compressed_size_prev;
 310    /* amount of compressed pages */
 311    uint64_t compress_pages_prev;
 312
 313    /* total handled target pages at the beginning of period */
 314    uint64_t target_page_count_prev;
 315    /* total handled target pages since start */
 316    uint64_t target_page_count;
 317    /* number of dirty bits in the bitmap */
 318    uint64_t migration_dirty_pages;
 319    /* protects modification of the bitmap */
 320    QemuMutex bitmap_mutex;
 321    /* The RAMBlock used in the last src_page_requests */
 322    RAMBlock *last_req_rb;
 323    /* Queue of outstanding page requests from the destination */
 324    QemuMutex src_page_req_mutex;
 325    QSIMPLEQ_HEAD(src_page_requests, RAMSrcPageRequest) src_page_requests;
 326};
 327typedef struct RAMState RAMState;
 328
 329static RAMState *ram_state;
 330
 331uint64_t ram_bytes_remaining(void)
 332{
 333    return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
 334                       0;
 335}
 336
 337MigrationStats ram_counters;
 338
 339/* used by the search for pages to send */
 340struct PageSearchStatus {
 341    /* Current block being searched */
 342    RAMBlock    *block;
 343    /* Current page to search from */
 344    unsigned long page;
 345    /* Set once we wrap around */
 346    bool         complete_round;
 347};
 348typedef struct PageSearchStatus PageSearchStatus;
 349
 350CompressionStats compression_counters;
 351
 352struct CompressParam {
 353    bool done;
 354    bool quit;
 355    bool zero_page;
 356    QEMUFile *file;
 357    QemuMutex mutex;
 358    QemuCond cond;
 359    RAMBlock *block;
 360    ram_addr_t offset;
 361
 362    /* internally used fields */
 363    z_stream stream;
 364    uint8_t *originbuf;
 365};
 366typedef struct CompressParam CompressParam;
 367
 368struct DecompressParam {
 369    bool done;
 370    bool quit;
 371    QemuMutex mutex;
 372    QemuCond cond;
 373    void *des;
 374    uint8_t *compbuf;
 375    int len;
 376    z_stream stream;
 377};
 378typedef struct DecompressParam DecompressParam;
 379
 380static CompressParam *comp_param;
 381static QemuThread *compress_threads;
 382/* comp_done_cond is used to wake up the migration thread when
 383 * one of the compression threads has finished the compression.
 384 * comp_done_lock is used to co-work with comp_done_cond.
 385 */
 386static QemuMutex comp_done_lock;
 387static QemuCond comp_done_cond;
 388/* The empty QEMUFileOps will be used by file in CompressParam */
 389static const QEMUFileOps empty_ops = { };
 390
 391static QEMUFile *decomp_file;
 392static DecompressParam *decomp_param;
 393static QemuThread *decompress_threads;
 394static QemuMutex decomp_done_lock;
 395static QemuCond decomp_done_cond;
 396
 397static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
 398                                 ram_addr_t offset, uint8_t *source_buf);
 399
 400static void *do_data_compress(void *opaque)
 401{
 402    CompressParam *param = opaque;
 403    RAMBlock *block;
 404    ram_addr_t offset;
 405    bool zero_page;
 406
 407    qemu_mutex_lock(&param->mutex);
 408    while (!param->quit) {
 409        if (param->block) {
 410            block = param->block;
 411            offset = param->offset;
 412            param->block = NULL;
 413            qemu_mutex_unlock(&param->mutex);
 414
 415            zero_page = do_compress_ram_page(param->file, &param->stream,
 416                                             block, offset, param->originbuf);
 417
 418            qemu_mutex_lock(&comp_done_lock);
 419            param->done = true;
 420            param->zero_page = zero_page;
 421            qemu_cond_signal(&comp_done_cond);
 422            qemu_mutex_unlock(&comp_done_lock);
 423
 424            qemu_mutex_lock(&param->mutex);
 425        } else {
 426            qemu_cond_wait(&param->cond, &param->mutex);
 427        }
 428    }
 429    qemu_mutex_unlock(&param->mutex);
 430
 431    return NULL;
 432}
 433
 434static void compress_threads_save_cleanup(void)
 435{
 436    int i, thread_count;
 437
 438    if (!migrate_use_compression() || !comp_param) {
 439        return;
 440    }
 441
 442    thread_count = migrate_compress_threads();
 443    for (i = 0; i < thread_count; i++) {
 444        /*
 445         * we use it as a indicator which shows if the thread is
 446         * properly init'd or not
 447         */
 448        if (!comp_param[i].file) {
 449            break;
 450        }
 451
 452        qemu_mutex_lock(&comp_param[i].mutex);
 453        comp_param[i].quit = true;
 454        qemu_cond_signal(&comp_param[i].cond);
 455        qemu_mutex_unlock(&comp_param[i].mutex);
 456
 457        qemu_thread_join(compress_threads + i);
 458        qemu_mutex_destroy(&comp_param[i].mutex);
 459        qemu_cond_destroy(&comp_param[i].cond);
 460        deflateEnd(&comp_param[i].stream);
 461        g_free(comp_param[i].originbuf);
 462        qemu_fclose(comp_param[i].file);
 463        comp_param[i].file = NULL;
 464    }
 465    qemu_mutex_destroy(&comp_done_lock);
 466    qemu_cond_destroy(&comp_done_cond);
 467    g_free(compress_threads);
 468    g_free(comp_param);
 469    compress_threads = NULL;
 470    comp_param = NULL;
 471}
 472
 473static int compress_threads_save_setup(void)
 474{
 475    int i, thread_count;
 476
 477    if (!migrate_use_compression()) {
 478        return 0;
 479    }
 480    thread_count = migrate_compress_threads();
 481    compress_threads = g_new0(QemuThread, thread_count);
 482    comp_param = g_new0(CompressParam, thread_count);
 483    qemu_cond_init(&comp_done_cond);
 484    qemu_mutex_init(&comp_done_lock);
 485    for (i = 0; i < thread_count; i++) {
 486        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
 487        if (!comp_param[i].originbuf) {
 488            goto exit;
 489        }
 490
 491        if (deflateInit(&comp_param[i].stream,
 492                        migrate_compress_level()) != Z_OK) {
 493            g_free(comp_param[i].originbuf);
 494            goto exit;
 495        }
 496
 497        /* comp_param[i].file is just used as a dummy buffer to save data,
 498         * set its ops to empty.
 499         */
 500        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
 501        comp_param[i].done = true;
 502        comp_param[i].quit = false;
 503        qemu_mutex_init(&comp_param[i].mutex);
 504        qemu_cond_init(&comp_param[i].cond);
 505        qemu_thread_create(compress_threads + i, "compress",
 506                           do_data_compress, comp_param + i,
 507                           QEMU_THREAD_JOINABLE);
 508    }
 509    return 0;
 510
 511exit:
 512    compress_threads_save_cleanup();
 513    return -1;
 514}
 515
 516/* Multiple fd's */
 517
 518#define MULTIFD_MAGIC 0x11223344U
 519#define MULTIFD_VERSION 1
 520
 521#define MULTIFD_FLAG_SYNC (1 << 0)
 522
 523typedef struct {
 524    uint32_t magic;
 525    uint32_t version;
 526    unsigned char uuid[16]; /* QemuUUID */
 527    uint8_t id;
 528} __attribute__((packed)) MultiFDInit_t;
 529
 530typedef struct {
 531    uint32_t magic;
 532    uint32_t version;
 533    uint32_t flags;
 534    uint32_t size;
 535    uint32_t used;
 536    uint64_t packet_num;
 537    char ramblock[256];
 538    uint64_t offset[];
 539} __attribute__((packed)) MultiFDPacket_t;
 540
 541typedef struct {
 542    /* number of used pages */
 543    uint32_t used;
 544    /* number of allocated pages */
 545    uint32_t allocated;
 546    /* global number of generated multifd packets */
 547    uint64_t packet_num;
 548    /* offset of each page */
 549    ram_addr_t *offset;
 550    /* pointer to each page */
 551    struct iovec *iov;
 552    RAMBlock *block;
 553} MultiFDPages_t;
 554
 555typedef struct {
 556    /* this fields are not changed once the thread is created */
 557    /* channel number */
 558    uint8_t id;
 559    /* channel thread name */
 560    char *name;
 561    /* channel thread id */
 562    QemuThread thread;
 563    /* communication channel */
 564    QIOChannel *c;
 565    /* sem where to wait for more work */
 566    QemuSemaphore sem;
 567    /* this mutex protects the following parameters */
 568    QemuMutex mutex;
 569    /* is this channel thread running */
 570    bool running;
 571    /* should this thread finish */
 572    bool quit;
 573    /* thread has work to do */
 574    int pending_job;
 575    /* array of pages to sent */
 576    MultiFDPages_t *pages;
 577    /* packet allocated len */
 578    uint32_t packet_len;
 579    /* pointer to the packet */
 580    MultiFDPacket_t *packet;
 581    /* multifd flags for each packet */
 582    uint32_t flags;
 583    /* global number of generated multifd packets */
 584    uint64_t packet_num;
 585    /* thread local variables */
 586    /* packets sent through this channel */
 587    uint64_t num_packets;
 588    /* pages sent through this channel */
 589    uint64_t num_pages;
 590    /* syncs main thread and channels */
 591    QemuSemaphore sem_sync;
 592}  MultiFDSendParams;
 593
 594typedef struct {
 595    /* this fields are not changed once the thread is created */
 596    /* channel number */
 597    uint8_t id;
 598    /* channel thread name */
 599    char *name;
 600    /* channel thread id */
 601    QemuThread thread;
 602    /* communication channel */
 603    QIOChannel *c;
 604    /* this mutex protects the following parameters */
 605    QemuMutex mutex;
 606    /* is this channel thread running */
 607    bool running;
 608    /* array of pages to receive */
 609    MultiFDPages_t *pages;
 610    /* packet allocated len */
 611    uint32_t packet_len;
 612    /* pointer to the packet */
 613    MultiFDPacket_t *packet;
 614    /* multifd flags for each packet */
 615    uint32_t flags;
 616    /* global number of generated multifd packets */
 617    uint64_t packet_num;
 618    /* thread local variables */
 619    /* packets sent through this channel */
 620    uint64_t num_packets;
 621    /* pages sent through this channel */
 622    uint64_t num_pages;
 623    /* syncs main thread and channels */
 624    QemuSemaphore sem_sync;
 625} MultiFDRecvParams;
 626
 627static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 628{
 629    MultiFDInit_t msg;
 630    int ret;
 631
 632    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 633    msg.version = cpu_to_be32(MULTIFD_VERSION);
 634    msg.id = p->id;
 635    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 636
 637    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
 638    if (ret != 0) {
 639        return -1;
 640    }
 641    return 0;
 642}
 643
 644static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 645{
 646    MultiFDInit_t msg;
 647    int ret;
 648
 649    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 650    if (ret != 0) {
 651        return -1;
 652    }
 653
 654    msg.magic = be32_to_cpu(msg.magic);
 655    msg.version = be32_to_cpu(msg.version);
 656
 657    if (msg.magic != MULTIFD_MAGIC) {
 658        error_setg(errp, "multifd: received packet magic %x "
 659                   "expected %x", msg.magic, MULTIFD_MAGIC);
 660        return -1;
 661    }
 662
 663    if (msg.version != MULTIFD_VERSION) {
 664        error_setg(errp, "multifd: received packet version %d "
 665                   "expected %d", msg.version, MULTIFD_VERSION);
 666        return -1;
 667    }
 668
 669    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 670        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 671        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 672
 673        error_setg(errp, "multifd: received uuid '%s' and expected "
 674                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 675        g_free(uuid);
 676        g_free(msg_uuid);
 677        return -1;
 678    }
 679
 680    if (msg.id > migrate_multifd_channels()) {
 681        error_setg(errp, "multifd: received channel version %d "
 682                   "expected %d", msg.version, MULTIFD_VERSION);
 683        return -1;
 684    }
 685
 686    return msg.id;
 687}
 688
 689static MultiFDPages_t *multifd_pages_init(size_t size)
 690{
 691    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 692
 693    pages->allocated = size;
 694    pages->iov = g_new0(struct iovec, size);
 695    pages->offset = g_new0(ram_addr_t, size);
 696
 697    return pages;
 698}
 699
 700static void multifd_pages_clear(MultiFDPages_t *pages)
 701{
 702    pages->used = 0;
 703    pages->allocated = 0;
 704    pages->packet_num = 0;
 705    pages->block = NULL;
 706    g_free(pages->iov);
 707    pages->iov = NULL;
 708    g_free(pages->offset);
 709    pages->offset = NULL;
 710    g_free(pages);
 711}
 712
 713static void multifd_send_fill_packet(MultiFDSendParams *p)
 714{
 715    MultiFDPacket_t *packet = p->packet;
 716    int i;
 717
 718    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 719    packet->version = cpu_to_be32(MULTIFD_VERSION);
 720    packet->flags = cpu_to_be32(p->flags);
 721    packet->size = cpu_to_be32(migrate_multifd_page_count());
 722    packet->used = cpu_to_be32(p->pages->used);
 723    packet->packet_num = cpu_to_be64(p->packet_num);
 724
 725    if (p->pages->block) {
 726        strncpy(packet->ramblock, p->pages->block->idstr, 256);
 727    }
 728
 729    for (i = 0; i < p->pages->used; i++) {
 730        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
 731    }
 732}
 733
 734static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 735{
 736    MultiFDPacket_t *packet = p->packet;
 737    RAMBlock *block;
 738    int i;
 739
 740    packet->magic = be32_to_cpu(packet->magic);
 741    if (packet->magic != MULTIFD_MAGIC) {
 742        error_setg(errp, "multifd: received packet "
 743                   "magic %x and expected magic %x",
 744                   packet->magic, MULTIFD_MAGIC);
 745        return -1;
 746    }
 747
 748    packet->version = be32_to_cpu(packet->version);
 749    if (packet->version != MULTIFD_VERSION) {
 750        error_setg(errp, "multifd: received packet "
 751                   "version %d and expected version %d",
 752                   packet->version, MULTIFD_VERSION);
 753        return -1;
 754    }
 755
 756    p->flags = be32_to_cpu(packet->flags);
 757
 758    packet->size = be32_to_cpu(packet->size);
 759    if (packet->size > migrate_multifd_page_count()) {
 760        error_setg(errp, "multifd: received packet "
 761                   "with size %d and expected maximum size %d",
 762                   packet->size, migrate_multifd_page_count()) ;
 763        return -1;
 764    }
 765
 766    p->pages->used = be32_to_cpu(packet->used);
 767    if (p->pages->used > packet->size) {
 768        error_setg(errp, "multifd: received packet "
 769                   "with size %d and expected maximum size %d",
 770                   p->pages->used, packet->size) ;
 771        return -1;
 772    }
 773
 774    p->packet_num = be64_to_cpu(packet->packet_num);
 775
 776    if (p->pages->used) {
 777        /* make sure that ramblock is 0 terminated */
 778        packet->ramblock[255] = 0;
 779        block = qemu_ram_block_by_name(packet->ramblock);
 780        if (!block) {
 781            error_setg(errp, "multifd: unknown ram block %s",
 782                       packet->ramblock);
 783            return -1;
 784        }
 785    }
 786
 787    for (i = 0; i < p->pages->used; i++) {
 788        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
 789
 790        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
 791            error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
 792                       " (max " RAM_ADDR_FMT ")",
 793                       offset, block->max_length);
 794            return -1;
 795        }
 796        p->pages->iov[i].iov_base = block->host + offset;
 797        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
 798    }
 799
 800    return 0;
 801}
 802
 803struct {
 804    MultiFDSendParams *params;
 805    /* number of created threads */
 806    int count;
 807    /* array of pages to sent */
 808    MultiFDPages_t *pages;
 809    /* syncs main thread and channels */
 810    QemuSemaphore sem_sync;
 811    /* global number of generated multifd packets */
 812    uint64_t packet_num;
 813    /* send channels ready */
 814    QemuSemaphore channels_ready;
 815} *multifd_send_state;
 816
 817/*
 818 * How we use multifd_send_state->pages and channel->pages?
 819 *
 820 * We create a pages for each channel, and a main one.  Each time that
 821 * we need to send a batch of pages we interchange the ones between
 822 * multifd_send_state and the channel that is sending it.  There are
 823 * two reasons for that:
 824 *    - to not have to do so many mallocs during migration
 825 *    - to make easier to know what to free at the end of migration
 826 *
 827 * This way we always know who is the owner of each "pages" struct,
 828 * and we don't need any loocking.  It belongs to the migration thread
 829 * or to the channel thread.  Switching is safe because the migration
 830 * thread is using the channel mutex when changing it, and the channel
 831 * have to had finish with its own, otherwise pending_job can't be
 832 * false.
 833 */
 834
 835static void multifd_send_pages(void)
 836{
 837    int i;
 838    static int next_channel;
 839    MultiFDSendParams *p = NULL; /* make happy gcc */
 840    MultiFDPages_t *pages = multifd_send_state->pages;
 841    uint64_t transferred;
 842
 843    qemu_sem_wait(&multifd_send_state->channels_ready);
 844    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
 845        p = &multifd_send_state->params[i];
 846
 847        qemu_mutex_lock(&p->mutex);
 848        if (!p->pending_job) {
 849            p->pending_job++;
 850            next_channel = (i + 1) % migrate_multifd_channels();
 851            break;
 852        }
 853        qemu_mutex_unlock(&p->mutex);
 854    }
 855    p->pages->used = 0;
 856
 857    p->packet_num = multifd_send_state->packet_num++;
 858    p->pages->block = NULL;
 859    multifd_send_state->pages = p->pages;
 860    p->pages = pages;
 861    transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
 862    ram_counters.multifd_bytes += transferred;
 863    ram_counters.transferred += transferred;;
 864    qemu_mutex_unlock(&p->mutex);
 865    qemu_sem_post(&p->sem);
 866}
 867
 868static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
 869{
 870    MultiFDPages_t *pages = multifd_send_state->pages;
 871
 872    if (!pages->block) {
 873        pages->block = block;
 874    }
 875
 876    if (pages->block == block) {
 877        pages->offset[pages->used] = offset;
 878        pages->iov[pages->used].iov_base = block->host + offset;
 879        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
 880        pages->used++;
 881
 882        if (pages->used < pages->allocated) {
 883            return;
 884        }
 885    }
 886
 887    multifd_send_pages();
 888
 889    if (pages->block != block) {
 890        multifd_queue_page(block, offset);
 891    }
 892}
 893
 894static void multifd_send_terminate_threads(Error *err)
 895{
 896    int i;
 897
 898    if (err) {
 899        MigrationState *s = migrate_get_current();
 900        migrate_set_error(s, err);
 901        if (s->state == MIGRATION_STATUS_SETUP ||
 902            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
 903            s->state == MIGRATION_STATUS_DEVICE ||
 904            s->state == MIGRATION_STATUS_ACTIVE) {
 905            migrate_set_state(&s->state, s->state,
 906                              MIGRATION_STATUS_FAILED);
 907        }
 908    }
 909
 910    for (i = 0; i < migrate_multifd_channels(); i++) {
 911        MultiFDSendParams *p = &multifd_send_state->params[i];
 912
 913        qemu_mutex_lock(&p->mutex);
 914        p->quit = true;
 915        qemu_sem_post(&p->sem);
 916        qemu_mutex_unlock(&p->mutex);
 917    }
 918}
 919
 920int multifd_save_cleanup(Error **errp)
 921{
 922    int i;
 923    int ret = 0;
 924
 925    if (!migrate_use_multifd()) {
 926        return 0;
 927    }
 928    multifd_send_terminate_threads(NULL);
 929    for (i = 0; i < migrate_multifd_channels(); i++) {
 930        MultiFDSendParams *p = &multifd_send_state->params[i];
 931
 932        if (p->running) {
 933            qemu_thread_join(&p->thread);
 934        }
 935        socket_send_channel_destroy(p->c);
 936        p->c = NULL;
 937        qemu_mutex_destroy(&p->mutex);
 938        qemu_sem_destroy(&p->sem);
 939        qemu_sem_destroy(&p->sem_sync);
 940        g_free(p->name);
 941        p->name = NULL;
 942        multifd_pages_clear(p->pages);
 943        p->pages = NULL;
 944        p->packet_len = 0;
 945        g_free(p->packet);
 946        p->packet = NULL;
 947    }
 948    qemu_sem_destroy(&multifd_send_state->channels_ready);
 949    qemu_sem_destroy(&multifd_send_state->sem_sync);
 950    g_free(multifd_send_state->params);
 951    multifd_send_state->params = NULL;
 952    multifd_pages_clear(multifd_send_state->pages);
 953    multifd_send_state->pages = NULL;
 954    g_free(multifd_send_state);
 955    multifd_send_state = NULL;
 956    return ret;
 957}
 958
 959static void multifd_send_sync_main(void)
 960{
 961    int i;
 962
 963    if (!migrate_use_multifd()) {
 964        return;
 965    }
 966    if (multifd_send_state->pages->used) {
 967        multifd_send_pages();
 968    }
 969    for (i = 0; i < migrate_multifd_channels(); i++) {
 970        MultiFDSendParams *p = &multifd_send_state->params[i];
 971
 972        trace_multifd_send_sync_main_signal(p->id);
 973
 974        qemu_mutex_lock(&p->mutex);
 975
 976        p->packet_num = multifd_send_state->packet_num++;
 977        p->flags |= MULTIFD_FLAG_SYNC;
 978        p->pending_job++;
 979        qemu_mutex_unlock(&p->mutex);
 980        qemu_sem_post(&p->sem);
 981    }
 982    for (i = 0; i < migrate_multifd_channels(); i++) {
 983        MultiFDSendParams *p = &multifd_send_state->params[i];
 984
 985        trace_multifd_send_sync_main_wait(p->id);
 986        qemu_sem_wait(&multifd_send_state->sem_sync);
 987    }
 988    trace_multifd_send_sync_main(multifd_send_state->packet_num);
 989}
 990
 991static void *multifd_send_thread(void *opaque)
 992{
 993    MultiFDSendParams *p = opaque;
 994    Error *local_err = NULL;
 995    int ret;
 996
 997    trace_multifd_send_thread_start(p->id);
 998    rcu_register_thread();
 999
1000    if (multifd_send_initial_packet(p, &local_err) < 0) {
1001        goto out;
1002    }
1003    /* initial packet */
1004    p->num_packets = 1;
1005
1006    while (true) {
1007        qemu_sem_wait(&p->sem);
1008        qemu_mutex_lock(&p->mutex);
1009
1010        if (p->pending_job) {
1011            uint32_t used = p->pages->used;
1012            uint64_t packet_num = p->packet_num;
1013            uint32_t flags = p->flags;
1014
1015            multifd_send_fill_packet(p);
1016            p->flags = 0;
1017            p->num_packets++;
1018            p->num_pages += used;
1019            p->pages->used = 0;
1020            qemu_mutex_unlock(&p->mutex);
1021
1022            trace_multifd_send(p->id, packet_num, used, flags);
1023
1024            ret = qio_channel_write_all(p->c, (void *)p->packet,
1025                                        p->packet_len, &local_err);
1026            if (ret != 0) {
1027                break;
1028            }
1029
1030            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
1031            if (ret != 0) {
1032                break;
1033            }
1034
1035            qemu_mutex_lock(&p->mutex);
1036            p->pending_job--;
1037            qemu_mutex_unlock(&p->mutex);
1038
1039            if (flags & MULTIFD_FLAG_SYNC) {
1040                qemu_sem_post(&multifd_send_state->sem_sync);
1041            }
1042            qemu_sem_post(&multifd_send_state->channels_ready);
1043        } else if (p->quit) {
1044            qemu_mutex_unlock(&p->mutex);
1045            break;
1046        } else {
1047            qemu_mutex_unlock(&p->mutex);
1048            /* sometimes there are spurious wakeups */
1049        }
1050    }
1051
1052out:
1053    if (local_err) {
1054        multifd_send_terminate_threads(local_err);
1055    }
1056
1057    qemu_mutex_lock(&p->mutex);
1058    p->running = false;
1059    qemu_mutex_unlock(&p->mutex);
1060
1061    rcu_unregister_thread();
1062    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1063
1064    return NULL;
1065}
1066
1067static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1068{
1069    MultiFDSendParams *p = opaque;
1070    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1071    Error *local_err = NULL;
1072
1073    if (qio_task_propagate_error(task, &local_err)) {
1074        if (multifd_save_cleanup(&local_err) != 0) {
1075            migrate_set_error(migrate_get_current(), local_err);
1076        }
1077    } else {
1078        p->c = QIO_CHANNEL(sioc);
1079        qio_channel_set_delay(p->c, false);
1080        p->running = true;
1081        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1082                           QEMU_THREAD_JOINABLE);
1083
1084        atomic_inc(&multifd_send_state->count);
1085    }
1086}
1087
1088int multifd_save_setup(void)
1089{
1090    int thread_count;
1091    uint32_t page_count = migrate_multifd_page_count();
1092    uint8_t i;
1093
1094    if (!migrate_use_multifd()) {
1095        return 0;
1096    }
1097    thread_count = migrate_multifd_channels();
1098    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1099    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1100    atomic_set(&multifd_send_state->count, 0);
1101    multifd_send_state->pages = multifd_pages_init(page_count);
1102    qemu_sem_init(&multifd_send_state->sem_sync, 0);
1103    qemu_sem_init(&multifd_send_state->channels_ready, 0);
1104
1105    for (i = 0; i < thread_count; i++) {
1106        MultiFDSendParams *p = &multifd_send_state->params[i];
1107
1108        qemu_mutex_init(&p->mutex);
1109        qemu_sem_init(&p->sem, 0);
1110        qemu_sem_init(&p->sem_sync, 0);
1111        p->quit = false;
1112        p->pending_job = 0;
1113        p->id = i;
1114        p->pages = multifd_pages_init(page_count);
1115        p->packet_len = sizeof(MultiFDPacket_t)
1116                      + sizeof(ram_addr_t) * page_count;
1117        p->packet = g_malloc0(p->packet_len);
1118        p->name = g_strdup_printf("multifdsend_%d", i);
1119        socket_send_channel_create(multifd_new_send_channel_async, p);
1120    }
1121    return 0;
1122}
1123
1124struct {
1125    MultiFDRecvParams *params;
1126    /* number of created threads */
1127    int count;
1128    /* syncs main thread and channels */
1129    QemuSemaphore sem_sync;
1130    /* global number of generated multifd packets */
1131    uint64_t packet_num;
1132} *multifd_recv_state;
1133
1134static void multifd_recv_terminate_threads(Error *err)
1135{
1136    int i;
1137
1138    if (err) {
1139        MigrationState *s = migrate_get_current();
1140        migrate_set_error(s, err);
1141        if (s->state == MIGRATION_STATUS_SETUP ||
1142            s->state == MIGRATION_STATUS_ACTIVE) {
1143            migrate_set_state(&s->state, s->state,
1144                              MIGRATION_STATUS_FAILED);
1145        }
1146    }
1147
1148    for (i = 0; i < migrate_multifd_channels(); i++) {
1149        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1150
1151        qemu_mutex_lock(&p->mutex);
1152        /* We could arrive here for two reasons:
1153           - normal quit, i.e. everything went fine, just finished
1154           - error quit: We close the channels so the channel threads
1155             finish the qio_channel_read_all_eof() */
1156        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1157        qemu_mutex_unlock(&p->mutex);
1158    }
1159}
1160
1161int multifd_load_cleanup(Error **errp)
1162{
1163    int i;
1164    int ret = 0;
1165
1166    if (!migrate_use_multifd()) {
1167        return 0;
1168    }
1169    multifd_recv_terminate_threads(NULL);
1170    for (i = 0; i < migrate_multifd_channels(); i++) {
1171        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1172
1173        if (p->running) {
1174            qemu_thread_join(&p->thread);
1175        }
1176        object_unref(OBJECT(p->c));
1177        p->c = NULL;
1178        qemu_mutex_destroy(&p->mutex);
1179        qemu_sem_destroy(&p->sem_sync);
1180        g_free(p->name);
1181        p->name = NULL;
1182        multifd_pages_clear(p->pages);
1183        p->pages = NULL;
1184        p->packet_len = 0;
1185        g_free(p->packet);
1186        p->packet = NULL;
1187    }
1188    qemu_sem_destroy(&multifd_recv_state->sem_sync);
1189    g_free(multifd_recv_state->params);
1190    multifd_recv_state->params = NULL;
1191    g_free(multifd_recv_state);
1192    multifd_recv_state = NULL;
1193
1194    return ret;
1195}
1196
1197static void multifd_recv_sync_main(void)
1198{
1199    int i;
1200
1201    if (!migrate_use_multifd()) {
1202        return;
1203    }
1204    for (i = 0; i < migrate_multifd_channels(); i++) {
1205        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1206
1207        trace_multifd_recv_sync_main_wait(p->id);
1208        qemu_sem_wait(&multifd_recv_state->sem_sync);
1209        qemu_mutex_lock(&p->mutex);
1210        if (multifd_recv_state->packet_num < p->packet_num) {
1211            multifd_recv_state->packet_num = p->packet_num;
1212        }
1213        qemu_mutex_unlock(&p->mutex);
1214    }
1215    for (i = 0; i < migrate_multifd_channels(); i++) {
1216        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1217
1218        trace_multifd_recv_sync_main_signal(p->id);
1219        qemu_sem_post(&p->sem_sync);
1220    }
1221    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1222}
1223
1224static void *multifd_recv_thread(void *opaque)
1225{
1226    MultiFDRecvParams *p = opaque;
1227    Error *local_err = NULL;
1228    int ret;
1229
1230    trace_multifd_recv_thread_start(p->id);
1231    rcu_register_thread();
1232
1233    while (true) {
1234        uint32_t used;
1235        uint32_t flags;
1236
1237        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1238                                       p->packet_len, &local_err);
1239        if (ret == 0) {   /* EOF */
1240            break;
1241        }
1242        if (ret == -1) {   /* Error */
1243            break;
1244        }
1245
1246        qemu_mutex_lock(&p->mutex);
1247        ret = multifd_recv_unfill_packet(p, &local_err);
1248        if (ret) {
1249            qemu_mutex_unlock(&p->mutex);
1250            break;
1251        }
1252
1253        used = p->pages->used;
1254        flags = p->flags;
1255        trace_multifd_recv(p->id, p->packet_num, used, flags);
1256        p->num_packets++;
1257        p->num_pages += used;
1258        qemu_mutex_unlock(&p->mutex);
1259
1260        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
1261        if (ret != 0) {
1262            break;
1263        }
1264
1265        if (flags & MULTIFD_FLAG_SYNC) {
1266            qemu_sem_post(&multifd_recv_state->sem_sync);
1267            qemu_sem_wait(&p->sem_sync);
1268        }
1269    }
1270
1271    if (local_err) {
1272        multifd_recv_terminate_threads(local_err);
1273    }
1274    qemu_mutex_lock(&p->mutex);
1275    p->running = false;
1276    qemu_mutex_unlock(&p->mutex);
1277
1278    rcu_unregister_thread();
1279    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1280
1281    return NULL;
1282}
1283
1284int multifd_load_setup(void)
1285{
1286    int thread_count;
1287    uint32_t page_count = migrate_multifd_page_count();
1288    uint8_t i;
1289
1290    if (!migrate_use_multifd()) {
1291        return 0;
1292    }
1293    thread_count = migrate_multifd_channels();
1294    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1295    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1296    atomic_set(&multifd_recv_state->count, 0);
1297    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1298
1299    for (i = 0; i < thread_count; i++) {
1300        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1301
1302        qemu_mutex_init(&p->mutex);
1303        qemu_sem_init(&p->sem_sync, 0);
1304        p->id = i;
1305        p->pages = multifd_pages_init(page_count);
1306        p->packet_len = sizeof(MultiFDPacket_t)
1307                      + sizeof(ram_addr_t) * page_count;
1308        p->packet = g_malloc0(p->packet_len);
1309        p->name = g_strdup_printf("multifdrecv_%d", i);
1310    }
1311    return 0;
1312}
1313
1314bool multifd_recv_all_channels_created(void)
1315{
1316    int thread_count = migrate_multifd_channels();
1317
1318    if (!migrate_use_multifd()) {
1319        return true;
1320    }
1321
1322    return thread_count == atomic_read(&multifd_recv_state->count);
1323}
1324
1325/* Return true if multifd is ready for the migration, otherwise false */
1326bool multifd_recv_new_channel(QIOChannel *ioc)
1327{
1328    MultiFDRecvParams *p;
1329    Error *local_err = NULL;
1330    int id;
1331
1332    id = multifd_recv_initial_packet(ioc, &local_err);
1333    if (id < 0) {
1334        multifd_recv_terminate_threads(local_err);
1335        return false;
1336    }
1337
1338    p = &multifd_recv_state->params[id];
1339    if (p->c != NULL) {
1340        error_setg(&local_err, "multifd: received id '%d' already setup'",
1341                   id);
1342        multifd_recv_terminate_threads(local_err);
1343        return false;
1344    }
1345    p->c = ioc;
1346    object_ref(OBJECT(ioc));
1347    /* initial packet */
1348    p->num_packets = 1;
1349
1350    p->running = true;
1351    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1352                       QEMU_THREAD_JOINABLE);
1353    atomic_inc(&multifd_recv_state->count);
1354    return multifd_recv_state->count == migrate_multifd_channels();
1355}
1356
1357/**
1358 * save_page_header: write page header to wire
1359 *
1360 * If this is the 1st block, it also writes the block identification
1361 *
1362 * Returns the number of bytes written
1363 *
1364 * @f: QEMUFile where to send the data
1365 * @block: block that contains the page we want to send
1366 * @offset: offset inside the block for the page
1367 *          in the lower bits, it contains flags
1368 */
1369static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1370                               ram_addr_t offset)
1371{
1372    size_t size, len;
1373
1374    if (block == rs->last_sent_block) {
1375        offset |= RAM_SAVE_FLAG_CONTINUE;
1376    }
1377    qemu_put_be64(f, offset);
1378    size = 8;
1379
1380    if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1381        len = strlen(block->idstr);
1382        qemu_put_byte(f, len);
1383        qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1384        size += 1 + len;
1385        rs->last_sent_block = block;
1386    }
1387    return size;
1388}
1389
1390/**
1391 * mig_throttle_guest_down: throotle down the guest
1392 *
1393 * Reduce amount of guest cpu execution to hopefully slow down memory
1394 * writes. If guest dirty memory rate is reduced below the rate at
1395 * which we can transfer pages to the destination then we should be
1396 * able to complete migration. Some workloads dirty memory way too
1397 * fast and will not effectively converge, even with auto-converge.
1398 */
1399static void mig_throttle_guest_down(void)
1400{
1401    MigrationState *s = migrate_get_current();
1402    uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1403    uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
1404    int pct_max = s->parameters.max_cpu_throttle;
1405
1406    /* We have not started throttling yet. Let's start it. */
1407    if (!cpu_throttle_active()) {
1408        cpu_throttle_set(pct_initial);
1409    } else {
1410        /* Throttling already on, just increase the rate */
1411        cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
1412                         pct_max));
1413    }
1414}
1415
1416/**
1417 * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1418 *
1419 * @rs: current RAM state
1420 * @current_addr: address for the zero page
1421 *
1422 * Update the xbzrle cache to reflect a page that's been sent as all 0.
1423 * The important thing is that a stale (not-yet-0'd) page be replaced
1424 * by the new data.
1425 * As a bonus, if the page wasn't in the cache it gets added so that
1426 * when a small write is made into the 0'd page it gets XBZRLE sent.
1427 */
1428static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1429{
1430    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1431        return;
1432    }
1433
1434    /* We don't care if this fails to allocate a new cache page
1435     * as long as it updated an old one */
1436    cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1437                 ram_counters.dirty_sync_count);
1438}
1439
1440#define ENCODING_FLAG_XBZRLE 0x1
1441
1442/**
1443 * save_xbzrle_page: compress and send current page
1444 *
1445 * Returns: 1 means that we wrote the page
1446 *          0 means that page is identical to the one already sent
1447 *          -1 means that xbzrle would be longer than normal
1448 *
1449 * @rs: current RAM state
1450 * @current_data: pointer to the address of the page contents
1451 * @current_addr: addr of the page
1452 * @block: block that contains the page we want to send
1453 * @offset: offset inside the block for the page
1454 * @last_stage: if we are at the completion stage
1455 */
1456static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1457                            ram_addr_t current_addr, RAMBlock *block,
1458                            ram_addr_t offset, bool last_stage)
1459{
1460    int encoded_len = 0, bytes_xbzrle;
1461    uint8_t *prev_cached_page;
1462
1463    if (!cache_is_cached(XBZRLE.cache, current_addr,
1464                         ram_counters.dirty_sync_count)) {
1465        xbzrle_counters.cache_miss++;
1466        if (!last_stage) {
1467            if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1468                             ram_counters.dirty_sync_count) == -1) {
1469                return -1;
1470            } else {
1471                /* update *current_data when the page has been
1472                   inserted into cache */
1473                *current_data = get_cached_data(XBZRLE.cache, current_addr);
1474            }
1475        }
1476        return -1;
1477    }
1478
1479    prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1480
1481    /* save current buffer into memory */
1482    memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1483
1484    /* XBZRLE encoding (if there is no overflow) */
1485    encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1486                                       TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1487                                       TARGET_PAGE_SIZE);
1488    if (encoded_len == 0) {
1489        trace_save_xbzrle_page_skipping();
1490        return 0;
1491    } else if (encoded_len == -1) {
1492        trace_save_xbzrle_page_overflow();
1493        xbzrle_counters.overflow++;
1494        /* update data in the cache */
1495        if (!last_stage) {
1496            memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
1497            *current_data = prev_cached_page;
1498        }
1499        return -1;
1500    }
1501
1502    /* we need to update the data in the cache, in order to get the same data */
1503    if (!last_stage) {
1504        memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1505    }
1506
1507    /* Send XBZRLE based compressed page */
1508    bytes_xbzrle = save_page_header(rs, rs->f, block,
1509                                    offset | RAM_SAVE_FLAG_XBZRLE);
1510    qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1511    qemu_put_be16(rs->f, encoded_len);
1512    qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1513    bytes_xbzrle += encoded_len + 1 + 2;
1514    xbzrle_counters.pages++;
1515    xbzrle_counters.bytes += bytes_xbzrle;
1516    ram_counters.transferred += bytes_xbzrle;
1517
1518    return 1;
1519}
1520
1521/**
1522 * migration_bitmap_find_dirty: find the next dirty page from start
1523 *
1524 * Called with rcu_read_lock() to protect migration_bitmap
1525 *
1526 * Returns the byte offset within memory region of the start of a dirty page
1527 *
1528 * @rs: current RAM state
1529 * @rb: RAMBlock where to search for dirty pages
1530 * @start: page where we start the search
1531 */
1532static inline
1533unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1534                                          unsigned long start)
1535{
1536    unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1537    unsigned long *bitmap = rb->bmap;
1538    unsigned long next;
1539
1540    if (!qemu_ram_is_migratable(rb)) {
1541        return size;
1542    }
1543
1544    if (rs->ram_bulk_stage && start > 0) {
1545        next = start + 1;
1546    } else {
1547        next = find_next_bit(bitmap, size, start);
1548    }
1549
1550    return next;
1551}
1552
1553static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1554                                                RAMBlock *rb,
1555                                                unsigned long page)
1556{
1557    bool ret;
1558
1559    ret = test_and_clear_bit(page, rb->bmap);
1560
1561    if (ret) {
1562        rs->migration_dirty_pages--;
1563    }
1564    return ret;
1565}
1566
1567static void migration_bitmap_sync_range(RAMState *rs, RAMBlock *rb,
1568                                        ram_addr_t start, ram_addr_t length)
1569{
1570    rs->migration_dirty_pages +=
1571        cpu_physical_memory_sync_dirty_bitmap(rb, start, length,
1572                                              &rs->num_dirty_pages_period);
1573}
1574
1575/**
1576 * ram_pagesize_summary: calculate all the pagesizes of a VM
1577 *
1578 * Returns a summary bitmap of the page sizes of all RAMBlocks
1579 *
1580 * For VMs with just normal pages this is equivalent to the host page
1581 * size. If it's got some huge pages then it's the OR of all the
1582 * different page sizes.
1583 */
1584uint64_t ram_pagesize_summary(void)
1585{
1586    RAMBlock *block;
1587    uint64_t summary = 0;
1588
1589    RAMBLOCK_FOREACH_MIGRATABLE(block) {
1590        summary |= block->page_size;
1591    }
1592
1593    return summary;
1594}
1595
1596static void migration_update_rates(RAMState *rs, int64_t end_time)
1597{
1598    uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1599    double compressed_size;
1600
1601    /* calculate period counters */
1602    ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1603                / (end_time - rs->time_last_bitmap_sync);
1604
1605    if (!page_count) {
1606        return;
1607    }
1608
1609    if (migrate_use_xbzrle()) {
1610        xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1611            rs->xbzrle_cache_miss_prev) / page_count;
1612        rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1613    }
1614
1615    if (migrate_use_compression()) {
1616        compression_counters.busy_rate = (double)(compression_counters.busy -
1617            rs->compress_thread_busy_prev) / page_count;
1618        rs->compress_thread_busy_prev = compression_counters.busy;
1619
1620        compressed_size = compression_counters.compressed_size -
1621                          rs->compressed_size_prev;
1622        if (compressed_size) {
1623            double uncompressed_size = (compression_counters.pages -
1624                                    rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1625
1626            /* Compression-Ratio = Uncompressed-size / Compressed-size */
1627            compression_counters.compression_rate =
1628                                        uncompressed_size / compressed_size;
1629
1630            rs->compress_pages_prev = compression_counters.pages;
1631            rs->compressed_size_prev = compression_counters.compressed_size;
1632        }
1633    }
1634}
1635
1636static void migration_bitmap_sync(RAMState *rs)
1637{
1638    RAMBlock *block;
1639    int64_t end_time;
1640    uint64_t bytes_xfer_now;
1641
1642    ram_counters.dirty_sync_count++;
1643
1644    if (!rs->time_last_bitmap_sync) {
1645        rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1646    }
1647
1648    trace_migration_bitmap_sync_start();
1649    memory_global_dirty_log_sync();
1650
1651    qemu_mutex_lock(&rs->bitmap_mutex);
1652    rcu_read_lock();
1653    RAMBLOCK_FOREACH_MIGRATABLE(block) {
1654        migration_bitmap_sync_range(rs, block, 0, block->used_length);
1655    }
1656    ram_counters.remaining = ram_bytes_remaining();
1657    rcu_read_unlock();
1658    qemu_mutex_unlock(&rs->bitmap_mutex);
1659
1660    trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1661
1662    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1663
1664    /* more than 1 second = 1000 millisecons */
1665    if (end_time > rs->time_last_bitmap_sync + 1000) {
1666        bytes_xfer_now = ram_counters.transferred;
1667
1668        /* During block migration the auto-converge logic incorrectly detects
1669         * that ram migration makes no progress. Avoid this by disabling the
1670         * throttling logic during the bulk phase of block migration. */
1671        if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1672            /* The following detection logic can be refined later. For now:
1673               Check to see if the dirtied bytes is 50% more than the approx.
1674               amount of bytes that just got transferred since the last time we
1675               were in this routine. If that happens twice, start or increase
1676               throttling */
1677
1678            if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1679                   (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1680                (++rs->dirty_rate_high_cnt >= 2)) {
1681                    trace_migration_throttle();
1682                    rs->dirty_rate_high_cnt = 0;
1683                    mig_throttle_guest_down();
1684            }
1685        }
1686
1687        migration_update_rates(rs, end_time);
1688
1689        rs->target_page_count_prev = rs->target_page_count;
1690
1691        /* reset period counters */
1692        rs->time_last_bitmap_sync = end_time;
1693        rs->num_dirty_pages_period = 0;
1694        rs->bytes_xfer_prev = bytes_xfer_now;
1695    }
1696    if (migrate_use_events()) {
1697        qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1698    }
1699}
1700
1701/**
1702 * save_zero_page_to_file: send the zero page to the file
1703 *
1704 * Returns the size of data written to the file, 0 means the page is not
1705 * a zero page
1706 *
1707 * @rs: current RAM state
1708 * @file: the file where the data is saved
1709 * @block: block that contains the page we want to send
1710 * @offset: offset inside the block for the page
1711 */
1712static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1713                                  RAMBlock *block, ram_addr_t offset)
1714{
1715    uint8_t *p = block->host + offset;
1716    int len = 0;
1717
1718    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1719        len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1720        qemu_put_byte(file, 0);
1721        len += 1;
1722    }
1723    return len;
1724}
1725
1726/**
1727 * save_zero_page: send the zero page to the stream
1728 *
1729 * Returns the number of pages written.
1730 *
1731 * @rs: current RAM state
1732 * @block: block that contains the page we want to send
1733 * @offset: offset inside the block for the page
1734 */
1735static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1736{
1737    int len = save_zero_page_to_file(rs, rs->f, block, offset);
1738
1739    if (len) {
1740        ram_counters.duplicate++;
1741        ram_counters.transferred += len;
1742        return 1;
1743    }
1744    return -1;
1745}
1746
1747static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1748{
1749    if (!migrate_release_ram() || !migration_in_postcopy()) {
1750        return;
1751    }
1752
1753    ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1754}
1755
1756/*
1757 * @pages: the number of pages written by the control path,
1758 *        < 0 - error
1759 *        > 0 - number of pages written
1760 *
1761 * Return true if the pages has been saved, otherwise false is returned.
1762 */
1763static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1764                              int *pages)
1765{
1766    uint64_t bytes_xmit = 0;
1767    int ret;
1768
1769    *pages = -1;
1770    ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1771                                &bytes_xmit);
1772    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1773        return false;
1774    }
1775
1776    if (bytes_xmit) {
1777        ram_counters.transferred += bytes_xmit;
1778        *pages = 1;
1779    }
1780
1781    if (ret == RAM_SAVE_CONTROL_DELAYED) {
1782        return true;
1783    }
1784
1785    if (bytes_xmit > 0) {
1786        ram_counters.normal++;
1787    } else if (bytes_xmit == 0) {
1788        ram_counters.duplicate++;
1789    }
1790
1791    return true;
1792}
1793
1794/*
1795 * directly send the page to the stream
1796 *
1797 * Returns the number of pages written.
1798 *
1799 * @rs: current RAM state
1800 * @block: block that contains the page we want to send
1801 * @offset: offset inside the block for the page
1802 * @buf: the page to be sent
1803 * @async: send to page asyncly
1804 */
1805static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1806                            uint8_t *buf, bool async)
1807{
1808    ram_counters.transferred += save_page_header(rs, rs->f, block,
1809                                                 offset | RAM_SAVE_FLAG_PAGE);
1810    if (async) {
1811        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
1812                              migrate_release_ram() &
1813                              migration_in_postcopy());
1814    } else {
1815        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
1816    }
1817    ram_counters.transferred += TARGET_PAGE_SIZE;
1818    ram_counters.normal++;
1819    return 1;
1820}
1821
1822/**
1823 * ram_save_page: send the given page to the stream
1824 *
1825 * Returns the number of pages written.
1826 *          < 0 - error
1827 *          >=0 - Number of pages written - this might legally be 0
1828 *                if xbzrle noticed the page was the same.
1829 *
1830 * @rs: current RAM state
1831 * @block: block that contains the page we want to send
1832 * @offset: offset inside the block for the page
1833 * @last_stage: if we are at the completion stage
1834 */
1835static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
1836{
1837    int pages = -1;
1838    uint8_t *p;
1839    bool send_async = true;
1840    RAMBlock *block = pss->block;
1841    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
1842    ram_addr_t current_addr = block->offset + offset;
1843
1844    p = block->host + offset;
1845    trace_ram_save_page(block->idstr, (uint64_t)offset, p);
1846
1847    XBZRLE_cache_lock();
1848    if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
1849        migrate_use_xbzrle()) {
1850        pages = save_xbzrle_page(rs, &p, current_addr, block,
1851                                 offset, last_stage);
1852        if (!last_stage) {
1853            /* Can't send this cached data async, since the cache page
1854             * might get updated before it gets to the wire
1855             */
1856            send_async = false;
1857        }
1858    }
1859
1860    /* XBZRLE overflow or normal page */
1861    if (pages == -1) {
1862        pages = save_normal_page(rs, block, offset, p, send_async);
1863    }
1864
1865    XBZRLE_cache_unlock();
1866
1867    return pages;
1868}
1869
1870static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
1871                                 ram_addr_t offset)
1872{
1873    multifd_queue_page(block, offset);
1874    ram_counters.normal++;
1875
1876    return 1;
1877}
1878
1879static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
1880                                 ram_addr_t offset, uint8_t *source_buf)
1881{
1882    RAMState *rs = ram_state;
1883    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
1884    bool zero_page = false;
1885    int ret;
1886
1887    if (save_zero_page_to_file(rs, f, block, offset)) {
1888        zero_page = true;
1889        goto exit;
1890    }
1891
1892    save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
1893
1894    /*
1895     * copy it to a internal buffer to avoid it being modified by VM
1896     * so that we can catch up the error during compression and
1897     * decompression
1898     */
1899    memcpy(source_buf, p, TARGET_PAGE_SIZE);
1900    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
1901    if (ret < 0) {
1902        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
1903        error_report("compressed data failed!");
1904        return false;
1905    }
1906
1907exit:
1908    ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
1909    return zero_page;
1910}
1911
1912static void
1913update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
1914{
1915    ram_counters.transferred += bytes_xmit;
1916
1917    if (param->zero_page) {
1918        ram_counters.duplicate++;
1919        return;
1920    }
1921
1922    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
1923    compression_counters.compressed_size += bytes_xmit - 8;
1924    compression_counters.pages++;
1925}
1926
1927static bool save_page_use_compression(RAMState *rs);
1928
1929static void flush_compressed_data(RAMState *rs)
1930{
1931    int idx, len, thread_count;
1932
1933    if (!save_page_use_compression(rs)) {
1934        return;
1935    }
1936    thread_count = migrate_compress_threads();
1937
1938    qemu_mutex_lock(&comp_done_lock);
1939    for (idx = 0; idx < thread_count; idx++) {
1940        while (!comp_param[idx].done) {
1941            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
1942        }
1943    }
1944    qemu_mutex_unlock(&comp_done_lock);
1945
1946    for (idx = 0; idx < thread_count; idx++) {
1947        qemu_mutex_lock(&comp_param[idx].mutex);
1948        if (!comp_param[idx].quit) {
1949            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
1950            /*
1951             * it's safe to fetch zero_page without holding comp_done_lock
1952             * as there is no further request submitted to the thread,
1953             * i.e, the thread should be waiting for a request at this point.
1954             */
1955            update_compress_thread_counts(&comp_param[idx], len);
1956        }
1957        qemu_mutex_unlock(&comp_param[idx].mutex);
1958    }
1959}
1960
1961static inline void set_compress_params(CompressParam *param, RAMBlock *block,
1962                                       ram_addr_t offset)
1963{
1964    param->block = block;
1965    param->offset = offset;
1966}
1967
1968static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
1969                                           ram_addr_t offset)
1970{
1971    int idx, thread_count, bytes_xmit = -1, pages = -1;
1972    bool wait = migrate_compress_wait_thread();
1973
1974    thread_count = migrate_compress_threads();
1975    qemu_mutex_lock(&comp_done_lock);
1976retry:
1977    for (idx = 0; idx < thread_count; idx++) {
1978        if (comp_param[idx].done) {
1979            comp_param[idx].done = false;
1980            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
1981            qemu_mutex_lock(&comp_param[idx].mutex);
1982            set_compress_params(&comp_param[idx], block, offset);
1983            qemu_cond_signal(&comp_param[idx].cond);
1984            qemu_mutex_unlock(&comp_param[idx].mutex);
1985            pages = 1;
1986            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
1987            break;
1988        }
1989    }
1990
1991    /*
1992     * wait for the free thread if the user specifies 'compress-wait-thread',
1993     * otherwise we will post the page out in the main thread as normal page.
1994     */
1995    if (pages < 0 && wait) {
1996        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
1997        goto retry;
1998    }
1999    qemu_mutex_unlock(&comp_done_lock);
2000
2001    return pages;
2002}
2003
2004/**
2005 * find_dirty_block: find the next dirty page and update any state
2006 * associated with the search process.
2007 *
2008 * Returns if a page is found
2009 *
2010 * @rs: current RAM state
2011 * @pss: data about the state of the current dirty page scan
2012 * @again: set to false if the search has scanned the whole of RAM
2013 */
2014static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2015{
2016    pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2017    if (pss->complete_round && pss->block == rs->last_seen_block &&
2018        pss->page >= rs->last_page) {
2019        /*
2020         * We've been once around the RAM and haven't found anything.
2021         * Give up.
2022         */
2023        *again = false;
2024        return false;
2025    }
2026    if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2027        /* Didn't find anything in this RAM Block */
2028        pss->page = 0;
2029        pss->block = QLIST_NEXT_RCU(pss->block, next);
2030        if (!pss->block) {
2031            /*
2032             * If memory migration starts over, we will meet a dirtied page
2033             * which may still exists in compression threads's ring, so we
2034             * should flush the compressed data to make sure the new page
2035             * is not overwritten by the old one in the destination.
2036             *
2037             * Also If xbzrle is on, stop using the data compression at this
2038             * point. In theory, xbzrle can do better than compression.
2039             */
2040            flush_compressed_data(rs);
2041
2042            /* Hit the end of the list */
2043            pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2044            /* Flag that we've looped */
2045            pss->complete_round = true;
2046            rs->ram_bulk_stage = false;
2047        }
2048        /* Didn't find anything this time, but try again on the new block */
2049        *again = true;
2050        return false;
2051    } else {
2052        /* Can go around again, but... */
2053        *again = true;
2054        /* We've found something so probably don't need to */
2055        return true;
2056    }
2057}
2058
2059/**
2060 * unqueue_page: gets a page of the queue
2061 *
2062 * Helper for 'get_queued_page' - gets a page off the queue
2063 *
2064 * Returns the block of the page (or NULL if none available)
2065 *
2066 * @rs: current RAM state
2067 * @offset: used to return the offset within the RAMBlock
2068 */
2069static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2070{
2071    RAMBlock *block = NULL;
2072
2073    if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2074        return NULL;
2075    }
2076
2077    qemu_mutex_lock(&rs->src_page_req_mutex);
2078    if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2079        struct RAMSrcPageRequest *entry =
2080                                QSIMPLEQ_FIRST(&rs->src_page_requests);
2081        block = entry->rb;
2082        *offset = entry->offset;
2083
2084        if (entry->len > TARGET_PAGE_SIZE) {
2085            entry->len -= TARGET_PAGE_SIZE;
2086            entry->offset += TARGET_PAGE_SIZE;
2087        } else {
2088            memory_region_unref(block->mr);
2089            QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2090            g_free(entry);
2091            migration_consume_urgent_request();
2092        }
2093    }
2094    qemu_mutex_unlock(&rs->src_page_req_mutex);
2095
2096    return block;
2097}
2098
2099/**
2100 * get_queued_page: unqueue a page from the postocpy requests
2101 *
2102 * Skips pages that are already sent (!dirty)
2103 *
2104 * Returns if a queued page is found
2105 *
2106 * @rs: current RAM state
2107 * @pss: data about the state of the current dirty page scan
2108 */
2109static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2110{
2111    RAMBlock  *block;
2112    ram_addr_t offset;
2113    bool dirty;
2114
2115    do {
2116        block = unqueue_page(rs, &offset);
2117        /*
2118         * We're sending this page, and since it's postcopy nothing else
2119         * will dirty it, and we must make sure it doesn't get sent again
2120         * even if this queue request was received after the background
2121         * search already sent it.
2122         */
2123        if (block) {
2124            unsigned long page;
2125
2126            page = offset >> TARGET_PAGE_BITS;
2127            dirty = test_bit(page, block->bmap);
2128            if (!dirty) {
2129                trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2130                       page, test_bit(page, block->unsentmap));
2131            } else {
2132                trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2133            }
2134        }
2135
2136    } while (block && !dirty);
2137
2138    if (block) {
2139        /*
2140         * As soon as we start servicing pages out of order, then we have
2141         * to kill the bulk stage, since the bulk stage assumes
2142         * in (migration_bitmap_find_and_reset_dirty) that every page is
2143         * dirty, that's no longer true.
2144         */
2145        rs->ram_bulk_stage = false;
2146
2147        /*
2148         * We want the background search to continue from the queued page
2149         * since the guest is likely to want other pages near to the page
2150         * it just requested.
2151         */
2152        pss->block = block;
2153        pss->page = offset >> TARGET_PAGE_BITS;
2154    }
2155
2156    return !!block;
2157}
2158
2159/**
2160 * migration_page_queue_free: drop any remaining pages in the ram
2161 * request queue
2162 *
2163 * It should be empty at the end anyway, but in error cases there may
2164 * be some left.  in case that there is any page left, we drop it.
2165 *
2166 */
2167static void migration_page_queue_free(RAMState *rs)
2168{
2169    struct RAMSrcPageRequest *mspr, *next_mspr;
2170    /* This queue generally should be empty - but in the case of a failed
2171     * migration might have some droppings in.
2172     */
2173    rcu_read_lock();
2174    QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2175        memory_region_unref(mspr->rb->mr);
2176        QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2177        g_free(mspr);
2178    }
2179    rcu_read_unlock();
2180}
2181
2182/**
2183 * ram_save_queue_pages: queue the page for transmission
2184 *
2185 * A request from postcopy destination for example.
2186 *
2187 * Returns zero on success or negative on error
2188 *
2189 * @rbname: Name of the RAMBLock of the request. NULL means the
2190 *          same that last one.
2191 * @start: starting address from the start of the RAMBlock
2192 * @len: length (in bytes) to send
2193 */
2194int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2195{
2196    RAMBlock *ramblock;
2197    RAMState *rs = ram_state;
2198
2199    ram_counters.postcopy_requests++;
2200    rcu_read_lock();
2201    if (!rbname) {
2202        /* Reuse last RAMBlock */
2203        ramblock = rs->last_req_rb;
2204
2205        if (!ramblock) {
2206            /*
2207             * Shouldn't happen, we can't reuse the last RAMBlock if
2208             * it's the 1st request.
2209             */
2210            error_report("ram_save_queue_pages no previous block");
2211            goto err;
2212        }
2213    } else {
2214        ramblock = qemu_ram_block_by_name(rbname);
2215
2216        if (!ramblock) {
2217            /* We shouldn't be asked for a non-existent RAMBlock */
2218            error_report("ram_save_queue_pages no block '%s'", rbname);
2219            goto err;
2220        }
2221        rs->last_req_rb = ramblock;
2222    }
2223    trace_ram_save_queue_pages(ramblock->idstr, start, len);
2224    if (start+len > ramblock->used_length) {
2225        error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2226                     RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2227                     __func__, start, len, ramblock->used_length);
2228        goto err;
2229    }
2230
2231    struct RAMSrcPageRequest *new_entry =
2232        g_malloc0(sizeof(struct RAMSrcPageRequest));
2233    new_entry->rb = ramblock;
2234    new_entry->offset = start;
2235    new_entry->len = len;
2236
2237    memory_region_ref(ramblock->mr);
2238    qemu_mutex_lock(&rs->src_page_req_mutex);
2239    QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2240    migration_make_urgent_request();
2241    qemu_mutex_unlock(&rs->src_page_req_mutex);
2242    rcu_read_unlock();
2243
2244    return 0;
2245
2246err:
2247    rcu_read_unlock();
2248    return -1;
2249}
2250
2251static bool save_page_use_compression(RAMState *rs)
2252{
2253    if (!migrate_use_compression()) {
2254        return false;
2255    }
2256
2257    /*
2258     * If xbzrle is on, stop using the data compression after first
2259     * round of migration even if compression is enabled. In theory,
2260     * xbzrle can do better than compression.
2261     */
2262    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2263        return true;
2264    }
2265
2266    return false;
2267}
2268
2269/*
2270 * try to compress the page before posting it out, return true if the page
2271 * has been properly handled by compression, otherwise needs other
2272 * paths to handle it
2273 */
2274static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2275{
2276    if (!save_page_use_compression(rs)) {
2277        return false;
2278    }
2279
2280    /*
2281     * When starting the process of a new block, the first page of
2282     * the block should be sent out before other pages in the same
2283     * block, and all the pages in last block should have been sent
2284     * out, keeping this order is important, because the 'cont' flag
2285     * is used to avoid resending the block name.
2286     *
2287     * We post the fist page as normal page as compression will take
2288     * much CPU resource.
2289     */
2290    if (block != rs->last_sent_block) {
2291        flush_compressed_data(rs);
2292        return false;
2293    }
2294
2295    if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2296        return true;
2297    }
2298
2299    compression_counters.busy++;
2300    return false;
2301}
2302
2303/**
2304 * ram_save_target_page: save one target page
2305 *
2306 * Returns the number of pages written
2307 *
2308 * @rs: current RAM state
2309 * @pss: data about the page we want to send
2310 * @last_stage: if we are at the completion stage
2311 */
2312static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2313                                bool last_stage)
2314{
2315    RAMBlock *block = pss->block;
2316    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2317    int res;
2318
2319    if (control_save_page(rs, block, offset, &res)) {
2320        return res;
2321    }
2322
2323    if (save_compress_page(rs, block, offset)) {
2324        return 1;
2325    }
2326
2327    res = save_zero_page(rs, block, offset);
2328    if (res > 0) {
2329        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2330         * page would be stale
2331         */
2332        if (!save_page_use_compression(rs)) {
2333            XBZRLE_cache_lock();
2334            xbzrle_cache_zero_page(rs, block->offset + offset);
2335            XBZRLE_cache_unlock();
2336        }
2337        ram_release_pages(block->idstr, offset, res);
2338        return res;
2339    }
2340
2341    /*
2342     * do not use multifd for compression as the first page in the new
2343     * block should be posted out before sending the compressed page
2344     */
2345    if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2346        return ram_save_multifd_page(rs, block, offset);
2347    }
2348
2349    return ram_save_page(rs, pss, last_stage);
2350}
2351
2352/**
2353 * ram_save_host_page: save a whole host page
2354 *
2355 * Starting at *offset send pages up to the end of the current host
2356 * page. It's valid for the initial offset to point into the middle of
2357 * a host page in which case the remainder of the hostpage is sent.
2358 * Only dirty target pages are sent. Note that the host page size may
2359 * be a huge page for this block.
2360 * The saving stops at the boundary of the used_length of the block
2361 * if the RAMBlock isn't a multiple of the host page size.
2362 *
2363 * Returns the number of pages written or negative on error
2364 *
2365 * @rs: current RAM state
2366 * @ms: current migration state
2367 * @pss: data about the page we want to send
2368 * @last_stage: if we are at the completion stage
2369 */
2370static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2371                              bool last_stage)
2372{
2373    int tmppages, pages = 0;
2374    size_t pagesize_bits =
2375        qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2376
2377    if (!qemu_ram_is_migratable(pss->block)) {
2378        error_report("block %s should not be migrated !", pss->block->idstr);
2379        return 0;
2380    }
2381
2382    do {
2383        /* Check the pages is dirty and if it is send it */
2384        if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2385            pss->page++;
2386            continue;
2387        }
2388
2389        tmppages = ram_save_target_page(rs, pss, last_stage);
2390        if (tmppages < 0) {
2391            return tmppages;
2392        }
2393
2394        pages += tmppages;
2395        if (pss->block->unsentmap) {
2396            clear_bit(pss->page, pss->block->unsentmap);
2397        }
2398
2399        pss->page++;
2400    } while ((pss->page & (pagesize_bits - 1)) &&
2401             offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
2402
2403    /* The offset we leave with is the last one we looked at */
2404    pss->page--;
2405    return pages;
2406}
2407
2408/**
2409 * ram_find_and_save_block: finds a dirty page and sends it to f
2410 *
2411 * Called within an RCU critical section.
2412 *
2413 * Returns the number of pages written where zero means no dirty pages,
2414 * or negative on error
2415 *
2416 * @rs: current RAM state
2417 * @last_stage: if we are at the completion stage
2418 *
2419 * On systems where host-page-size > target-page-size it will send all the
2420 * pages in a host page that are dirty.
2421 */
2422
2423static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2424{
2425    PageSearchStatus pss;
2426    int pages = 0;
2427    bool again, found;
2428
2429    /* No dirty page as there is zero RAM */
2430    if (!ram_bytes_total()) {
2431        return pages;
2432    }
2433
2434    pss.block = rs->last_seen_block;
2435    pss.page = rs->last_page;
2436    pss.complete_round = false;
2437
2438    if (!pss.block) {
2439        pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2440    }
2441
2442    do {
2443        again = true;
2444        found = get_queued_page(rs, &pss);
2445
2446        if (!found) {
2447            /* priority queue empty, so just search for something dirty */
2448            found = find_dirty_block(rs, &pss, &again);
2449        }
2450
2451        if (found) {
2452            pages = ram_save_host_page(rs, &pss, last_stage);
2453        }
2454    } while (!pages && again);
2455
2456    rs->last_seen_block = pss.block;
2457    rs->last_page = pss.page;
2458
2459    return pages;
2460}
2461
2462void acct_update_position(QEMUFile *f, size_t size, bool zero)
2463{
2464    uint64_t pages = size / TARGET_PAGE_SIZE;
2465
2466    if (zero) {
2467        ram_counters.duplicate += pages;
2468    } else {
2469        ram_counters.normal += pages;
2470        ram_counters.transferred += size;
2471        qemu_update_position(f, size);
2472    }
2473}
2474
2475uint64_t ram_bytes_total(void)
2476{
2477    RAMBlock *block;
2478    uint64_t total = 0;
2479
2480    rcu_read_lock();
2481    RAMBLOCK_FOREACH_MIGRATABLE(block) {
2482        total += block->used_length;
2483    }
2484    rcu_read_unlock();
2485    return total;
2486}
2487
2488static void xbzrle_load_setup(void)
2489{
2490    XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2491}
2492
2493static void xbzrle_load_cleanup(void)
2494{
2495    g_free(XBZRLE.decoded_buf);
2496    XBZRLE.decoded_buf = NULL;
2497}
2498
2499static void ram_state_cleanup(RAMState **rsp)
2500{
2501    if (*rsp) {
2502        migration_page_queue_free(*rsp);
2503        qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2504        qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2505        g_free(*rsp);
2506        *rsp = NULL;
2507    }
2508}
2509
2510static void xbzrle_cleanup(void)
2511{
2512    XBZRLE_cache_lock();
2513    if (XBZRLE.cache) {
2514        cache_fini(XBZRLE.cache);
2515        g_free(XBZRLE.encoded_buf);
2516        g_free(XBZRLE.current_buf);
2517        g_free(XBZRLE.zero_target_page);
2518        XBZRLE.cache = NULL;
2519        XBZRLE.encoded_buf = NULL;
2520        XBZRLE.current_buf = NULL;
2521        XBZRLE.zero_target_page = NULL;
2522    }
2523    XBZRLE_cache_unlock();
2524}
2525
2526static void ram_save_cleanup(void *opaque)
2527{
2528    RAMState **rsp = opaque;
2529    RAMBlock *block;
2530
2531    /* caller have hold iothread lock or is in a bh, so there is
2532     * no writing race against this migration_bitmap
2533     */
2534    memory_global_dirty_log_stop();
2535
2536    RAMBLOCK_FOREACH_MIGRATABLE(block) {
2537        g_free(block->bmap);
2538        block->bmap = NULL;
2539        g_free(block->unsentmap);
2540        block->unsentmap = NULL;
2541    }
2542
2543    xbzrle_cleanup();
2544    compress_threads_save_cleanup();
2545    ram_state_cleanup(rsp);
2546}
2547
2548static void ram_state_reset(RAMState *rs)
2549{
2550    rs->last_seen_block = NULL;
2551    rs->last_sent_block = NULL;
2552    rs->last_page = 0;
2553    rs->last_version = ram_list.version;
2554    rs->ram_bulk_stage = true;
2555}
2556
2557#define MAX_WAIT 50 /* ms, half buffered_file limit */
2558
2559/*
2560 * 'expected' is the value you expect the bitmap mostly to be full
2561 * of; it won't bother printing lines that are all this value.
2562 * If 'todump' is null the migration bitmap is dumped.
2563 */
2564void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2565                           unsigned long pages)
2566{
2567    int64_t cur;
2568    int64_t linelen = 128;
2569    char linebuf[129];
2570
2571    for (cur = 0; cur < pages; cur += linelen) {
2572        int64_t curb;
2573        bool found = false;
2574        /*
2575         * Last line; catch the case where the line length
2576         * is longer than remaining ram
2577         */
2578        if (cur + linelen > pages) {
2579            linelen = pages - cur;
2580        }
2581        for (curb = 0; curb < linelen; curb++) {
2582            bool thisbit = test_bit(cur + curb, todump);
2583            linebuf[curb] = thisbit ? '1' : '.';
2584            found = found || (thisbit != expected);
2585        }
2586        if (found) {
2587            linebuf[curb] = '\0';
2588            fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2589        }
2590    }
2591}
2592
2593/* **** functions for postcopy ***** */
2594
2595void ram_postcopy_migrated_memory_release(MigrationState *ms)
2596{
2597    struct RAMBlock *block;
2598
2599    RAMBLOCK_FOREACH_MIGRATABLE(block) {
2600        unsigned long *bitmap = block->bmap;
2601        unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2602        unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2603
2604        while (run_start < range) {
2605            unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2606            ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2607                              (run_end - run_start) << TARGET_PAGE_BITS);
2608            run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2609        }
2610    }
2611}
2612
2613/**
2614 * postcopy_send_discard_bm_ram: discard a RAMBlock
2615 *
2616 * Returns zero on success
2617 *
2618 * Callback from postcopy_each_ram_send_discard for each RAMBlock
2619 * Note: At this point the 'unsentmap' is the processed bitmap combined
2620 *       with the dirtymap; so a '1' means it's either dirty or unsent.
2621 *
2622 * @ms: current migration state
2623 * @pds: state for postcopy
2624 * @start: RAMBlock starting page
2625 * @length: RAMBlock size
2626 */
2627static int postcopy_send_discard_bm_ram(MigrationState *ms,
2628                                        PostcopyDiscardState *pds,
2629                                        RAMBlock *block)
2630{
2631    unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2632    unsigned long current;
2633    unsigned long *unsentmap = block->unsentmap;
2634
2635    for (current = 0; current < end; ) {
2636        unsigned long one = find_next_bit(unsentmap, end, current);
2637
2638        if (one <= end) {
2639            unsigned long zero = find_next_zero_bit(unsentmap, end, one + 1);
2640            unsigned long discard_length;
2641
2642            if (zero >= end) {
2643                discard_length = end - one;
2644            } else {
2645                discard_length = zero - one;
2646            }
2647            if (discard_length) {
2648                postcopy_discard_send_range(ms, pds, one, discard_length);
2649            }
2650            current = one + discard_length;
2651        } else {
2652            current = one;
2653        }
2654    }
2655
2656    return 0;
2657}
2658
2659/**
2660 * postcopy_each_ram_send_discard: discard all RAMBlocks
2661 *
2662 * Returns 0 for success or negative for error
2663 *
2664 * Utility for the outgoing postcopy code.
2665 *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2666 *   passing it bitmap indexes and name.
2667 * (qemu_ram_foreach_block ends up passing unscaled lengths
2668 *  which would mean postcopy code would have to deal with target page)
2669 *
2670 * @ms: current migration state
2671 */
2672static int postcopy_each_ram_send_discard(MigrationState *ms)
2673{
2674    struct RAMBlock *block;
2675    int ret;
2676
2677    RAMBLOCK_FOREACH_MIGRATABLE(block) {
2678        PostcopyDiscardState *pds =
2679            postcopy_discard_send_init(ms, block->idstr);
2680
2681        /*
2682         * Postcopy sends chunks of bitmap over the wire, but it
2683         * just needs indexes at this point, avoids it having
2684         * target page specific code.
2685         */
2686        ret = postcopy_send_discard_bm_ram(ms, pds, block);
2687        postcopy_discard_send_finish(ms, pds);
2688        if (ret) {
2689            return ret;
2690        }
2691    }
2692
2693    return 0;
2694}
2695
2696/**
2697 * postcopy_chunk_hostpages_pass: canocalize bitmap in hostpages
2698 *
2699 * Helper for postcopy_chunk_hostpages; it's called twice to
2700 * canonicalize the two bitmaps, that are similar, but one is
2701 * inverted.
2702 *
2703 * Postcopy requires that all target pages in a hostpage are dirty or
2704 * clean, not a mix.  This function canonicalizes the bitmaps.
2705 *
2706 * @ms: current migration state
2707 * @unsent_pass: if true we need to canonicalize partially unsent host pages
2708 *               otherwise we need to canonicalize partially dirty host pages
2709 * @block: block that contains the page we want to canonicalize
2710 * @pds: state for postcopy
2711 */
2712static void postcopy_chunk_hostpages_pass(MigrationState *ms, bool unsent_pass,
2713                                          RAMBlock *block,
2714                                          PostcopyDiscardState *pds)
2715{
2716    RAMState *rs = ram_state;
2717    unsigned long *bitmap = block->bmap;
2718    unsigned long *unsentmap = block->unsentmap;
2719    unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2720    unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2721    unsigned long run_start;
2722
2723    if (block->page_size == TARGET_PAGE_SIZE) {
2724        /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2725        return;
2726    }
2727
2728    if (unsent_pass) {
2729        /* Find a sent page */
2730        run_start = find_next_zero_bit(unsentmap, pages, 0);
2731    } else {
2732        /* Find a dirty page */
2733        run_start = find_next_bit(bitmap, pages, 0);
2734    }
2735
2736    while (run_start < pages) {
2737        bool do_fixup = false;
2738        unsigned long fixup_start_addr;
2739        unsigned long host_offset;
2740
2741        /*
2742         * If the start of this run of pages is in the middle of a host
2743         * page, then we need to fixup this host page.
2744         */
2745        host_offset = run_start % host_ratio;
2746        if (host_offset) {
2747            do_fixup = true;
2748            run_start -= host_offset;
2749            fixup_start_addr = run_start;
2750            /* For the next pass */
2751            run_start = run_start + host_ratio;
2752        } else {
2753            /* Find the end of this run */
2754            unsigned long run_end;
2755            if (unsent_pass) {
2756                run_end = find_next_bit(unsentmap, pages, run_start + 1);
2757            } else {
2758                run_end = find_next_zero_bit(bitmap, pages, run_start + 1);
2759            }
2760            /*
2761             * If the end isn't at the start of a host page, then the
2762             * run doesn't finish at the end of a host page
2763             * and we need to discard.
2764             */
2765            host_offset = run_end % host_ratio;
2766            if (host_offset) {
2767                do_fixup = true;
2768                fixup_start_addr = run_end - host_offset;
2769                /*
2770                 * This host page has gone, the next loop iteration starts
2771                 * from after the fixup
2772                 */
2773                run_start = fixup_start_addr + host_ratio;
2774            } else {
2775                /*
2776                 * No discards on this iteration, next loop starts from
2777                 * next sent/dirty page
2778                 */
2779                run_start = run_end + 1;
2780            }
2781        }
2782
2783        if (do_fixup) {
2784            unsigned long page;
2785
2786            /* Tell the destination to discard this page */
2787            if (unsent_pass || !test_bit(fixup_start_addr, unsentmap)) {
2788                /* For the unsent_pass we:
2789                 *     discard partially sent pages
2790                 * For the !unsent_pass (dirty) we:
2791                 *     discard partially dirty pages that were sent
2792                 *     (any partially sent pages were already discarded
2793                 *     by the previous unsent_pass)
2794                 */
2795                postcopy_discard_send_range(ms, pds, fixup_start_addr,
2796                                            host_ratio);
2797            }
2798
2799            /* Clean up the bitmap */
2800            for (page = fixup_start_addr;
2801                 page < fixup_start_addr + host_ratio; page++) {
2802                /* All pages in this host page are now not sent */
2803                set_bit(page, unsentmap);
2804
2805                /*
2806                 * Remark them as dirty, updating the count for any pages
2807                 * that weren't previously dirty.
2808                 */
2809                rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
2810            }
2811        }
2812
2813        if (unsent_pass) {
2814            /* Find the next sent page for the next iteration */
2815            run_start = find_next_zero_bit(unsentmap, pages, run_start);
2816        } else {
2817            /* Find the next dirty page for the next iteration */
2818            run_start = find_next_bit(bitmap, pages, run_start);
2819        }
2820    }
2821}
2822
2823/**
2824 * postcopy_chuck_hostpages: discrad any partially sent host page
2825 *
2826 * Utility for the outgoing postcopy code.
2827 *
2828 * Discard any partially sent host-page size chunks, mark any partially
2829 * dirty host-page size chunks as all dirty.  In this case the host-page
2830 * is the host-page for the particular RAMBlock, i.e. it might be a huge page
2831 *
2832 * Returns zero on success
2833 *
2834 * @ms: current migration state
2835 * @block: block we want to work with
2836 */
2837static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
2838{
2839    PostcopyDiscardState *pds =
2840        postcopy_discard_send_init(ms, block->idstr);
2841
2842    /* First pass: Discard all partially sent host pages */
2843    postcopy_chunk_hostpages_pass(ms, true, block, pds);
2844    /*
2845     * Second pass: Ensure that all partially dirty host pages are made
2846     * fully dirty.
2847     */
2848    postcopy_chunk_hostpages_pass(ms, false, block, pds);
2849
2850    postcopy_discard_send_finish(ms, pds);
2851    return 0;
2852}
2853
2854/**
2855 * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
2856 *
2857 * Returns zero on success
2858 *
2859 * Transmit the set of pages to be discarded after precopy to the target
2860 * these are pages that:
2861 *     a) Have been previously transmitted but are now dirty again
2862 *     b) Pages that have never been transmitted, this ensures that
2863 *        any pages on the destination that have been mapped by background
2864 *        tasks get discarded (transparent huge pages is the specific concern)
2865 * Hopefully this is pretty sparse
2866 *
2867 * @ms: current migration state
2868 */
2869int ram_postcopy_send_discard_bitmap(MigrationState *ms)
2870{
2871    RAMState *rs = ram_state;
2872    RAMBlock *block;
2873    int ret;
2874
2875    rcu_read_lock();
2876
2877    /* This should be our last sync, the src is now paused */
2878    migration_bitmap_sync(rs);
2879
2880    /* Easiest way to make sure we don't resume in the middle of a host-page */
2881    rs->last_seen_block = NULL;
2882    rs->last_sent_block = NULL;
2883    rs->last_page = 0;
2884
2885    RAMBLOCK_FOREACH_MIGRATABLE(block) {
2886        unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2887        unsigned long *bitmap = block->bmap;
2888        unsigned long *unsentmap = block->unsentmap;
2889
2890        if (!unsentmap) {
2891            /* We don't have a safe way to resize the sentmap, so
2892             * if the bitmap was resized it will be NULL at this
2893             * point.
2894             */
2895            error_report("migration ram resized during precopy phase");
2896            rcu_read_unlock();
2897            return -EINVAL;
2898        }
2899        /* Deal with TPS != HPS and huge pages */
2900        ret = postcopy_chunk_hostpages(ms, block);
2901        if (ret) {
2902            rcu_read_unlock();
2903            return ret;
2904        }
2905
2906        /*
2907         * Update the unsentmap to be unsentmap = unsentmap | dirty
2908         */
2909        bitmap_or(unsentmap, unsentmap, bitmap, pages);
2910#ifdef DEBUG_POSTCOPY
2911        ram_debug_dump_bitmap(unsentmap, true, pages);
2912#endif
2913    }
2914    trace_ram_postcopy_send_discard_bitmap();
2915
2916    ret = postcopy_each_ram_send_discard(ms);
2917    rcu_read_unlock();
2918
2919    return ret;
2920}
2921
2922/**
2923 * ram_discard_range: discard dirtied pages at the beginning of postcopy
2924 *
2925 * Returns zero on success
2926 *
2927 * @rbname: name of the RAMBlock of the request. NULL means the
2928 *          same that last one.
2929 * @start: RAMBlock starting page
2930 * @length: RAMBlock size
2931 */
2932int ram_discard_range(const char *rbname, uint64_t start, size_t length)
2933{
2934    int ret = -1;
2935
2936    trace_ram_discard_range(rbname, start, length);
2937
2938    rcu_read_lock();
2939    RAMBlock *rb = qemu_ram_block_by_name(rbname);
2940
2941    if (!rb) {
2942        error_report("ram_discard_range: Failed to find block '%s'", rbname);
2943        goto err;
2944    }
2945
2946    /*
2947     * On source VM, we don't need to update the received bitmap since
2948     * we don't even have one.
2949     */
2950    if (rb->receivedmap) {
2951        bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
2952                     length >> qemu_target_page_bits());
2953    }
2954
2955    ret = ram_block_discard_range(rb, start, length);
2956
2957err:
2958    rcu_read_unlock();
2959
2960    return ret;
2961}
2962
2963/*
2964 * For every allocation, we will try not to crash the VM if the
2965 * allocation failed.
2966 */
2967static int xbzrle_init(void)
2968{
2969    Error *local_err = NULL;
2970
2971    if (!migrate_use_xbzrle()) {
2972        return 0;
2973    }
2974
2975    XBZRLE_cache_lock();
2976
2977    XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
2978    if (!XBZRLE.zero_target_page) {
2979        error_report("%s: Error allocating zero page", __func__);
2980        goto err_out;
2981    }
2982
2983    XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
2984                              TARGET_PAGE_SIZE, &local_err);
2985    if (!XBZRLE.cache) {
2986        error_report_err(local_err);
2987        goto free_zero_page;
2988    }
2989
2990    XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
2991    if (!XBZRLE.encoded_buf) {
2992        error_report("%s: Error allocating encoded_buf", __func__);
2993        goto free_cache;
2994    }
2995
2996    XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
2997    if (!XBZRLE.current_buf) {
2998        error_report("%s: Error allocating current_buf", __func__);
2999        goto free_encoded_buf;
3000    }
3001
3002    /* We are all good */
3003    XBZRLE_cache_unlock();
3004    return 0;
3005
3006free_encoded_buf:
3007    g_free(XBZRLE.encoded_buf);
3008    XBZRLE.encoded_buf = NULL;
3009free_cache:
3010    cache_fini(XBZRLE.cache);
3011    XBZRLE.cache = NULL;
3012free_zero_page:
3013    g_free(XBZRLE.zero_target_page);
3014    XBZRLE.zero_target_page = NULL;
3015err_out:
3016    XBZRLE_cache_unlock();
3017    return -ENOMEM;
3018}
3019
3020static int ram_state_init(RAMState **rsp)
3021{
3022    *rsp = g_try_new0(RAMState, 1);
3023
3024    if (!*rsp) {
3025        error_report("%s: Init ramstate fail", __func__);
3026        return -1;
3027    }
3028
3029    qemu_mutex_init(&(*rsp)->bitmap_mutex);
3030    qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3031    QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3032
3033    /*
3034     * Count the total number of pages used by ram blocks not including any
3035     * gaps due to alignment or unplugs.
3036     */
3037    (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3038
3039    ram_state_reset(*rsp);
3040
3041    return 0;
3042}
3043
3044static void ram_list_init_bitmaps(void)
3045{
3046    RAMBlock *block;
3047    unsigned long pages;
3048
3049    /* Skip setting bitmap if there is no RAM */
3050    if (ram_bytes_total()) {
3051        RAMBLOCK_FOREACH_MIGRATABLE(block) {
3052            pages = block->max_length >> TARGET_PAGE_BITS;
3053            block->bmap = bitmap_new(pages);
3054            bitmap_set(block->bmap, 0, pages);
3055            if (migrate_postcopy_ram()) {
3056                block->unsentmap = bitmap_new(pages);
3057                bitmap_set(block->unsentmap, 0, pages);
3058            }
3059        }
3060    }
3061}
3062
3063static void ram_init_bitmaps(RAMState *rs)
3064{
3065    /* For memory_global_dirty_log_start below.  */
3066    qemu_mutex_lock_iothread();
3067    qemu_mutex_lock_ramlist();
3068    rcu_read_lock();
3069
3070    ram_list_init_bitmaps();
3071    memory_global_dirty_log_start();
3072    migration_bitmap_sync(rs);
3073
3074    rcu_read_unlock();
3075    qemu_mutex_unlock_ramlist();
3076    qemu_mutex_unlock_iothread();
3077}
3078
3079static int ram_init_all(RAMState **rsp)
3080{
3081    if (ram_state_init(rsp)) {
3082        return -1;
3083    }
3084
3085    if (xbzrle_init()) {
3086        ram_state_cleanup(rsp);
3087        return -1;
3088    }
3089
3090    ram_init_bitmaps(*rsp);
3091
3092    return 0;
3093}
3094
3095static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3096{
3097    RAMBlock *block;
3098    uint64_t pages = 0;
3099
3100    /*
3101     * Postcopy is not using xbzrle/compression, so no need for that.
3102     * Also, since source are already halted, we don't need to care
3103     * about dirty page logging as well.
3104     */
3105
3106    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3107        pages += bitmap_count_one(block->bmap,
3108                                  block->used_length >> TARGET_PAGE_BITS);
3109    }
3110
3111    /* This may not be aligned with current bitmaps. Recalculate. */
3112    rs->migration_dirty_pages = pages;
3113
3114    rs->last_seen_block = NULL;
3115    rs->last_sent_block = NULL;
3116    rs->last_page = 0;
3117    rs->last_version = ram_list.version;
3118    /*
3119     * Disable the bulk stage, otherwise we'll resend the whole RAM no
3120     * matter what we have sent.
3121     */
3122    rs->ram_bulk_stage = false;
3123
3124    /* Update RAMState cache of output QEMUFile */
3125    rs->f = out;
3126
3127    trace_ram_state_resume_prepare(pages);
3128}
3129
3130/*
3131 * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3132 * long-running RCU critical section.  When rcu-reclaims in the code
3133 * start to become numerous it will be necessary to reduce the
3134 * granularity of these critical sections.
3135 */
3136
3137/**
3138 * ram_save_setup: Setup RAM for migration
3139 *
3140 * Returns zero to indicate success and negative for error
3141 *
3142 * @f: QEMUFile where to send the data
3143 * @opaque: RAMState pointer
3144 */
3145static int ram_save_setup(QEMUFile *f, void *opaque)
3146{
3147    RAMState **rsp = opaque;
3148    RAMBlock *block;
3149
3150    if (compress_threads_save_setup()) {
3151        return -1;
3152    }
3153
3154    /* migration has already setup the bitmap, reuse it. */
3155    if (!migration_in_colo_state()) {
3156        if (ram_init_all(rsp) != 0) {
3157            compress_threads_save_cleanup();
3158            return -1;
3159        }
3160    }
3161    (*rsp)->f = f;
3162
3163    rcu_read_lock();
3164
3165    qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
3166
3167    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3168        qemu_put_byte(f, strlen(block->idstr));
3169        qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3170        qemu_put_be64(f, block->used_length);
3171        if (migrate_postcopy_ram() && block->page_size != qemu_host_page_size) {
3172            qemu_put_be64(f, block->page_size);
3173        }
3174    }
3175
3176    rcu_read_unlock();
3177
3178    ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3179    ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3180
3181    multifd_send_sync_main();
3182    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3183    qemu_fflush(f);
3184
3185    return 0;
3186}
3187
3188/**
3189 * ram_save_iterate: iterative stage for migration
3190 *
3191 * Returns zero to indicate success and negative for error
3192 *
3193 * @f: QEMUFile where to send the data
3194 * @opaque: RAMState pointer
3195 */
3196static int ram_save_iterate(QEMUFile *f, void *opaque)
3197{
3198    RAMState **temp = opaque;
3199    RAMState *rs = *temp;
3200    int ret;
3201    int i;
3202    int64_t t0;
3203    int done = 0;
3204
3205    if (blk_mig_bulk_active()) {
3206        /* Avoid transferring ram during bulk phase of block migration as
3207         * the bulk phase will usually take a long time and transferring
3208         * ram updates during that time is pointless. */
3209        goto out;
3210    }
3211
3212    rcu_read_lock();
3213    if (ram_list.version != rs->last_version) {
3214        ram_state_reset(rs);
3215    }
3216
3217    /* Read version before ram_list.blocks */
3218    smp_rmb();
3219
3220    ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3221
3222    t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3223    i = 0;
3224    while ((ret = qemu_file_rate_limit(f)) == 0 ||
3225            !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3226        int pages;
3227
3228        if (qemu_file_get_error(f)) {
3229            break;
3230        }
3231
3232        pages = ram_find_and_save_block(rs, false);
3233        /* no more pages to sent */
3234        if (pages == 0) {
3235            done = 1;
3236            break;
3237        }
3238
3239        if (pages < 0) {
3240            qemu_file_set_error(f, pages);
3241            break;
3242        }
3243
3244        rs->target_page_count += pages;
3245
3246        /* we want to check in the 1st loop, just in case it was the 1st time
3247           and we had to sync the dirty bitmap.
3248           qemu_get_clock_ns() is a bit expensive, so we only check each some
3249           iterations
3250        */
3251        if ((i & 63) == 0) {
3252            uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
3253            if (t1 > MAX_WAIT) {
3254                trace_ram_save_iterate_big_wait(t1, i);
3255                break;
3256            }
3257        }
3258        i++;
3259    }
3260    rcu_read_unlock();
3261
3262    /*
3263     * Must occur before EOS (or any QEMUFile operation)
3264     * because of RDMA protocol.
3265     */
3266    ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3267
3268    multifd_send_sync_main();
3269out:
3270    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3271    qemu_fflush(f);
3272    ram_counters.transferred += 8;
3273
3274    ret = qemu_file_get_error(f);
3275    if (ret < 0) {
3276        return ret;
3277    }
3278
3279    return done;
3280}
3281
3282/**
3283 * ram_save_complete: function called to send the remaining amount of ram
3284 *
3285 * Returns zero to indicate success or negative on error
3286 *
3287 * Called with iothread lock
3288 *
3289 * @f: QEMUFile where to send the data
3290 * @opaque: RAMState pointer
3291 */
3292static int ram_save_complete(QEMUFile *f, void *opaque)
3293{
3294    RAMState **temp = opaque;
3295    RAMState *rs = *temp;
3296    int ret = 0;
3297
3298    rcu_read_lock();
3299
3300    if (!migration_in_postcopy()) {
3301        migration_bitmap_sync(rs);
3302    }
3303
3304    ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3305
3306    /* try transferring iterative blocks of memory */
3307
3308    /* flush all remaining blocks regardless of rate limiting */
3309    while (true) {
3310        int pages;
3311
3312        pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3313        /* no more blocks to sent */
3314        if (pages == 0) {
3315            break;
3316        }
3317        if (pages < 0) {
3318            ret = pages;
3319            break;
3320        }
3321    }
3322
3323    flush_compressed_data(rs);
3324    ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3325
3326    rcu_read_unlock();
3327
3328    multifd_send_sync_main();
3329    qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3330    qemu_fflush(f);
3331
3332    return ret;
3333}
3334
3335static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3336                             uint64_t *res_precopy_only,
3337                             uint64_t *res_compatible,
3338                             uint64_t *res_postcopy_only)
3339{
3340    RAMState **temp = opaque;
3341    RAMState *rs = *temp;
3342    uint64_t remaining_size;
3343
3344    remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3345
3346    if (!migration_in_postcopy() &&
3347        remaining_size < max_size) {
3348        qemu_mutex_lock_iothread();
3349        rcu_read_lock();
3350        migration_bitmap_sync(rs);
3351        rcu_read_unlock();
3352        qemu_mutex_unlock_iothread();
3353        remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3354    }
3355
3356    if (migrate_postcopy_ram()) {
3357        /* We can do postcopy, and all the data is postcopiable */
3358        *res_compatible += remaining_size;
3359    } else {
3360        *res_precopy_only += remaining_size;
3361    }
3362}
3363
3364static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3365{
3366    unsigned int xh_len;
3367    int xh_flags;
3368    uint8_t *loaded_data;
3369
3370    /* extract RLE header */
3371    xh_flags = qemu_get_byte(f);
3372    xh_len = qemu_get_be16(f);
3373
3374    if (xh_flags != ENCODING_FLAG_XBZRLE) {
3375        error_report("Failed to load XBZRLE page - wrong compression!");
3376        return -1;
3377    }
3378
3379    if (xh_len > TARGET_PAGE_SIZE) {
3380        error_report("Failed to load XBZRLE page - len overflow!");
3381        return -1;
3382    }
3383    loaded_data = XBZRLE.decoded_buf;
3384    /* load data and decode */
3385    /* it can change loaded_data to point to an internal buffer */
3386    qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3387
3388    /* decode RLE */
3389    if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3390                             TARGET_PAGE_SIZE) == -1) {
3391        error_report("Failed to load XBZRLE page - decode error!");
3392        return -1;
3393    }
3394
3395    return 0;
3396}
3397
3398/**
3399 * ram_block_from_stream: read a RAMBlock id from the migration stream
3400 *
3401 * Must be called from within a rcu critical section.
3402 *
3403 * Returns a pointer from within the RCU-protected ram_list.
3404 *
3405 * @f: QEMUFile where to read the data from
3406 * @flags: Page flags (mostly to see if it's a continuation of previous block)
3407 */
3408static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3409{
3410    static RAMBlock *block = NULL;
3411    char id[256];
3412    uint8_t len;
3413
3414    if (flags & RAM_SAVE_FLAG_CONTINUE) {
3415        if (!block) {
3416            error_report("Ack, bad migration stream!");
3417            return NULL;
3418        }
3419        return block;
3420    }
3421
3422    len = qemu_get_byte(f);
3423    qemu_get_buffer(f, (uint8_t *)id, len);
3424    id[len] = 0;
3425
3426    block = qemu_ram_block_by_name(id);
3427    if (!block) {
3428        error_report("Can't find block %s", id);
3429        return NULL;
3430    }
3431
3432    if (!qemu_ram_is_migratable(block)) {
3433        error_report("block %s should not be migrated !", id);
3434        return NULL;
3435    }
3436
3437    return block;
3438}
3439
3440static inline void *host_from_ram_block_offset(RAMBlock *block,
3441                                               ram_addr_t offset)
3442{
3443    if (!offset_in_ramblock(block, offset)) {
3444        return NULL;
3445    }
3446
3447    return block->host + offset;
3448}
3449
3450static inline void *colo_cache_from_block_offset(RAMBlock *block,
3451                                                 ram_addr_t offset)
3452{
3453    if (!offset_in_ramblock(block, offset)) {
3454        return NULL;
3455    }
3456    if (!block->colo_cache) {
3457        error_report("%s: colo_cache is NULL in block :%s",
3458                     __func__, block->idstr);
3459        return NULL;
3460    }
3461
3462    /*
3463    * During colo checkpoint, we need bitmap of these migrated pages.
3464    * It help us to decide which pages in ram cache should be flushed
3465    * into VM's RAM later.
3466    */
3467    if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3468        ram_state->migration_dirty_pages++;
3469    }
3470    return block->colo_cache + offset;
3471}
3472
3473/**
3474 * ram_handle_compressed: handle the zero page case
3475 *
3476 * If a page (or a whole RDMA chunk) has been
3477 * determined to be zero, then zap it.
3478 *
3479 * @host: host address for the zero page
3480 * @ch: what the page is filled from.  We only support zero
3481 * @size: size of the zero page
3482 */
3483void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3484{
3485    if (ch != 0 || !is_zero_range(host, size)) {
3486        memset(host, ch, size);
3487    }
3488}
3489
3490/* return the size after decompression, or negative value on error */
3491static int
3492qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3493                     const uint8_t *source, size_t source_len)
3494{
3495    int err;
3496
3497    err = inflateReset(stream);
3498    if (err != Z_OK) {
3499        return -1;
3500    }
3501
3502    stream->avail_in = source_len;
3503    stream->next_in = (uint8_t *)source;
3504    stream->avail_out = dest_len;
3505    stream->next_out = dest;
3506
3507    err = inflate(stream, Z_NO_FLUSH);
3508    if (err != Z_STREAM_END) {
3509        return -1;
3510    }
3511
3512    return stream->total_out;
3513}
3514
3515static void *do_data_decompress(void *opaque)
3516{
3517    DecompressParam *param = opaque;
3518    unsigned long pagesize;
3519    uint8_t *des;
3520    int len, ret;
3521
3522    qemu_mutex_lock(&param->mutex);
3523    while (!param->quit) {
3524        if (param->des) {
3525            des = param->des;
3526            len = param->len;
3527            param->des = 0;
3528            qemu_mutex_unlock(&param->mutex);
3529
3530            pagesize = TARGET_PAGE_SIZE;
3531
3532            ret = qemu_uncompress_data(&param->stream, des, pagesize,
3533                                       param->compbuf, len);
3534            if (ret < 0 && migrate_get_current()->decompress_error_check) {
3535                error_report("decompress data failed");
3536                qemu_file_set_error(decomp_file, ret);
3537            }
3538
3539            qemu_mutex_lock(&decomp_done_lock);
3540            param->done = true;
3541            qemu_cond_signal(&decomp_done_cond);
3542            qemu_mutex_unlock(&decomp_done_lock);
3543
3544            qemu_mutex_lock(&param->mutex);
3545        } else {
3546            qemu_cond_wait(&param->cond, &param->mutex);
3547        }
3548    }
3549    qemu_mutex_unlock(&param->mutex);
3550
3551    return NULL;
3552}
3553
3554static int wait_for_decompress_done(void)
3555{
3556    int idx, thread_count;
3557
3558    if (!migrate_use_compression()) {
3559        return 0;
3560    }
3561
3562    thread_count = migrate_decompress_threads();
3563    qemu_mutex_lock(&decomp_done_lock);
3564    for (idx = 0; idx < thread_count; idx++) {
3565        while (!decomp_param[idx].done) {
3566            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3567        }
3568    }
3569    qemu_mutex_unlock(&decomp_done_lock);
3570    return qemu_file_get_error(decomp_file);
3571}
3572
3573static void compress_threads_load_cleanup(void)
3574{
3575    int i, thread_count;
3576
3577    if (!migrate_use_compression()) {
3578        return;
3579    }
3580    thread_count = migrate_decompress_threads();
3581    for (i = 0; i < thread_count; i++) {
3582        /*
3583         * we use it as a indicator which shows if the thread is
3584         * properly init'd or not
3585         */
3586        if (!decomp_param[i].compbuf) {
3587            break;
3588        }
3589
3590        qemu_mutex_lock(&decomp_param[i].mutex);
3591        decomp_param[i].quit = true;
3592        qemu_cond_signal(&decomp_param[i].cond);
3593        qemu_mutex_unlock(&decomp_param[i].mutex);
3594    }
3595    for (i = 0; i < thread_count; i++) {
3596        if (!decomp_param[i].compbuf) {
3597            break;
3598        }
3599
3600        qemu_thread_join(decompress_threads + i);
3601        qemu_mutex_destroy(&decomp_param[i].mutex);
3602        qemu_cond_destroy(&decomp_param[i].cond);
3603        inflateEnd(&decomp_param[i].stream);
3604        g_free(decomp_param[i].compbuf);
3605        decomp_param[i].compbuf = NULL;
3606    }
3607    g_free(decompress_threads);
3608    g_free(decomp_param);
3609    decompress_threads = NULL;
3610    decomp_param = NULL;
3611    decomp_file = NULL;
3612}
3613
3614static int compress_threads_load_setup(QEMUFile *f)
3615{
3616    int i, thread_count;
3617
3618    if (!migrate_use_compression()) {
3619        return 0;
3620    }
3621
3622    thread_count = migrate_decompress_threads();
3623    decompress_threads = g_new0(QemuThread, thread_count);
3624    decomp_param = g_new0(DecompressParam, thread_count);
3625    qemu_mutex_init(&decomp_done_lock);
3626    qemu_cond_init(&decomp_done_cond);
3627    decomp_file = f;
3628    for (i = 0; i < thread_count; i++) {
3629        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3630            goto exit;
3631        }
3632
3633        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3634        qemu_mutex_init(&decomp_param[i].mutex);
3635        qemu_cond_init(&decomp_param[i].cond);
3636        decomp_param[i].done = true;
3637        decomp_param[i].quit = false;
3638        qemu_thread_create(decompress_threads + i, "decompress",
3639                           do_data_decompress, decomp_param + i,
3640                           QEMU_THREAD_JOINABLE);
3641    }
3642    return 0;
3643exit:
3644    compress_threads_load_cleanup();
3645    return -1;
3646}
3647
3648static void decompress_data_with_multi_threads(QEMUFile *f,
3649                                               void *host, int len)
3650{
3651    int idx, thread_count;
3652
3653    thread_count = migrate_decompress_threads();
3654    qemu_mutex_lock(&decomp_done_lock);
3655    while (true) {
3656        for (idx = 0; idx < thread_count; idx++) {
3657            if (decomp_param[idx].done) {
3658                decomp_param[idx].done = false;
3659                qemu_mutex_lock(&decomp_param[idx].mutex);
3660                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3661                decomp_param[idx].des = host;
3662                decomp_param[idx].len = len;
3663                qemu_cond_signal(&decomp_param[idx].cond);
3664                qemu_mutex_unlock(&decomp_param[idx].mutex);
3665                break;
3666            }
3667        }
3668        if (idx < thread_count) {
3669            break;
3670        } else {
3671            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3672        }
3673    }
3674    qemu_mutex_unlock(&decomp_done_lock);
3675}
3676
3677/*
3678 * colo cache: this is for secondary VM, we cache the whole
3679 * memory of the secondary VM, it is need to hold the global lock
3680 * to call this helper.
3681 */
3682int colo_init_ram_cache(void)
3683{
3684    RAMBlock *block;
3685
3686    rcu_read_lock();
3687    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3688        block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3689                                                NULL,
3690                                                false);
3691        if (!block->colo_cache) {
3692            error_report("%s: Can't alloc memory for COLO cache of block %s,"
3693                         "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3694                         block->used_length);
3695            goto out_locked;
3696        }
3697        memcpy(block->colo_cache, block->host, block->used_length);
3698    }
3699    rcu_read_unlock();
3700    /*
3701    * Record the dirty pages that sent by PVM, we use this dirty bitmap together
3702    * with to decide which page in cache should be flushed into SVM's RAM. Here
3703    * we use the same name 'ram_bitmap' as for migration.
3704    */
3705    if (ram_bytes_total()) {
3706        RAMBlock *block;
3707
3708        RAMBLOCK_FOREACH_MIGRATABLE(block) {
3709            unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3710
3711            block->bmap = bitmap_new(pages);
3712            bitmap_set(block->bmap, 0, pages);
3713        }
3714    }
3715    ram_state = g_new0(RAMState, 1);
3716    ram_state->migration_dirty_pages = 0;
3717    memory_global_dirty_log_start();
3718
3719    return 0;
3720
3721out_locked:
3722
3723    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3724        if (block->colo_cache) {
3725            qemu_anon_ram_free(block->colo_cache, block->used_length);
3726            block->colo_cache = NULL;
3727        }
3728    }
3729
3730    rcu_read_unlock();
3731    return -errno;
3732}
3733
3734/* It is need to hold the global lock to call this helper */
3735void colo_release_ram_cache(void)
3736{
3737    RAMBlock *block;
3738
3739    memory_global_dirty_log_stop();
3740    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3741        g_free(block->bmap);
3742        block->bmap = NULL;
3743    }
3744
3745    rcu_read_lock();
3746
3747    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3748        if (block->colo_cache) {
3749            qemu_anon_ram_free(block->colo_cache, block->used_length);
3750            block->colo_cache = NULL;
3751        }
3752    }
3753
3754    rcu_read_unlock();
3755    g_free(ram_state);
3756    ram_state = NULL;
3757}
3758
3759/**
3760 * ram_load_setup: Setup RAM for migration incoming side
3761 *
3762 * Returns zero to indicate success and negative for error
3763 *
3764 * @f: QEMUFile where to receive the data
3765 * @opaque: RAMState pointer
3766 */
3767static int ram_load_setup(QEMUFile *f, void *opaque)
3768{
3769    if (compress_threads_load_setup(f)) {
3770        return -1;
3771    }
3772
3773    xbzrle_load_setup();
3774    ramblock_recv_map_init();
3775
3776    return 0;
3777}
3778
3779static int ram_load_cleanup(void *opaque)
3780{
3781    RAMBlock *rb;
3782
3783    RAMBLOCK_FOREACH_MIGRATABLE(rb) {
3784        if (ramblock_is_pmem(rb)) {
3785            pmem_persist(rb->host, rb->used_length);
3786        }
3787    }
3788
3789    xbzrle_load_cleanup();
3790    compress_threads_load_cleanup();
3791
3792    RAMBLOCK_FOREACH_MIGRATABLE(rb) {
3793        g_free(rb->receivedmap);
3794        rb->receivedmap = NULL;
3795    }
3796
3797    return 0;
3798}
3799
3800/**
3801 * ram_postcopy_incoming_init: allocate postcopy data structures
3802 *
3803 * Returns 0 for success and negative if there was one error
3804 *
3805 * @mis: current migration incoming state
3806 *
3807 * Allocate data structures etc needed by incoming migration with
3808 * postcopy-ram. postcopy-ram's similarly names
3809 * postcopy_ram_incoming_init does the work.
3810 */
3811int ram_postcopy_incoming_init(MigrationIncomingState *mis)
3812{
3813    return postcopy_ram_incoming_init(mis);
3814}
3815
3816/**
3817 * ram_load_postcopy: load a page in postcopy case
3818 *
3819 * Returns 0 for success or -errno in case of error
3820 *
3821 * Called in postcopy mode by ram_load().
3822 * rcu_read_lock is taken prior to this being called.
3823 *
3824 * @f: QEMUFile where to send the data
3825 */
3826static int ram_load_postcopy(QEMUFile *f)
3827{
3828    int flags = 0, ret = 0;
3829    bool place_needed = false;
3830    bool matches_target_page_size = false;
3831    MigrationIncomingState *mis = migration_incoming_get_current();
3832    /* Temporary page that is later 'placed' */
3833    void *postcopy_host_page = postcopy_get_tmp_page(mis);
3834    void *last_host = NULL;
3835    bool all_zero = false;
3836
3837    while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
3838        ram_addr_t addr;
3839        void *host = NULL;
3840        void *page_buffer = NULL;
3841        void *place_source = NULL;
3842        RAMBlock *block = NULL;
3843        uint8_t ch;
3844
3845        addr = qemu_get_be64(f);
3846
3847        /*
3848         * If qemu file error, we should stop here, and then "addr"
3849         * may be invalid
3850         */
3851        ret = qemu_file_get_error(f);
3852        if (ret) {
3853            break;
3854        }
3855
3856        flags = addr & ~TARGET_PAGE_MASK;
3857        addr &= TARGET_PAGE_MASK;
3858
3859        trace_ram_load_postcopy_loop((uint64_t)addr, flags);
3860        place_needed = false;
3861        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
3862            block = ram_block_from_stream(f, flags);
3863
3864            host = host_from_ram_block_offset(block, addr);
3865            if (!host) {
3866                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
3867                ret = -EINVAL;
3868                break;
3869            }
3870            matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
3871            /*
3872             * Postcopy requires that we place whole host pages atomically;
3873             * these may be huge pages for RAMBlocks that are backed by
3874             * hugetlbfs.
3875             * To make it atomic, the data is read into a temporary page
3876             * that's moved into place later.
3877             * The migration protocol uses,  possibly smaller, target-pages
3878             * however the source ensures it always sends all the components
3879             * of a host page in order.
3880             */
3881            page_buffer = postcopy_host_page +
3882                          ((uintptr_t)host & (block->page_size - 1));
3883            /* If all TP are zero then we can optimise the place */
3884            if (!((uintptr_t)host & (block->page_size - 1))) {
3885                all_zero = true;
3886            } else {
3887                /* not the 1st TP within the HP */
3888                if (host != (last_host + TARGET_PAGE_SIZE)) {
3889                    error_report("Non-sequential target page %p/%p",
3890                                  host, last_host);
3891                    ret = -EINVAL;
3892                    break;
3893                }
3894            }
3895
3896
3897            /*
3898             * If it's the last part of a host page then we place the host
3899             * page
3900             */
3901            place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
3902                                     (block->page_size - 1)) == 0;
3903            place_source = postcopy_host_page;
3904        }
3905        last_host = host;
3906
3907        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
3908        case RAM_SAVE_FLAG_ZERO:
3909            ch = qemu_get_byte(f);
3910            memset(page_buffer, ch, TARGET_PAGE_SIZE);
3911            if (ch) {
3912                all_zero = false;
3913            }
3914            break;
3915
3916        case RAM_SAVE_FLAG_PAGE:
3917            all_zero = false;
3918            if (!matches_target_page_size) {
3919                /* For huge pages, we always use temporary buffer */
3920                qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
3921            } else {
3922                /*
3923                 * For small pages that matches target page size, we
3924                 * avoid the qemu_file copy.  Instead we directly use
3925                 * the buffer of QEMUFile to place the page.  Note: we
3926                 * cannot do any QEMUFile operation before using that
3927                 * buffer to make sure the buffer is valid when
3928                 * placing the page.
3929                 */
3930                qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
3931                                         TARGET_PAGE_SIZE);
3932            }
3933            break;
3934        case RAM_SAVE_FLAG_EOS:
3935            /* normal exit */
3936            multifd_recv_sync_main();
3937            break;
3938        default:
3939            error_report("Unknown combination of migration flags: %#x"
3940                         " (postcopy mode)", flags);
3941            ret = -EINVAL;
3942            break;
3943        }
3944
3945        /* Detect for any possible file errors */
3946        if (!ret && qemu_file_get_error(f)) {
3947            ret = qemu_file_get_error(f);
3948        }
3949
3950        if (!ret && place_needed) {
3951            /* This gets called at the last target page in the host page */
3952            void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
3953
3954            if (all_zero) {
3955                ret = postcopy_place_page_zero(mis, place_dest,
3956                                               block);
3957            } else {
3958                ret = postcopy_place_page(mis, place_dest,
3959                                          place_source, block);
3960            }
3961        }
3962    }
3963
3964    return ret;
3965}
3966
3967static bool postcopy_is_advised(void)
3968{
3969    PostcopyState ps = postcopy_state_get();
3970    return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
3971}
3972
3973static bool postcopy_is_running(void)
3974{
3975    PostcopyState ps = postcopy_state_get();
3976    return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
3977}
3978
3979/*
3980 * Flush content of RAM cache into SVM's memory.
3981 * Only flush the pages that be dirtied by PVM or SVM or both.
3982 */
3983static void colo_flush_ram_cache(void)
3984{
3985    RAMBlock *block = NULL;
3986    void *dst_host;
3987    void *src_host;
3988    unsigned long offset = 0;
3989
3990    memory_global_dirty_log_sync();
3991    rcu_read_lock();
3992    RAMBLOCK_FOREACH_MIGRATABLE(block) {
3993        migration_bitmap_sync_range(ram_state, block, 0, block->used_length);
3994    }
3995    rcu_read_unlock();
3996
3997    trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
3998    rcu_read_lock();
3999    block = QLIST_FIRST_RCU(&ram_list.blocks);
4000
4001    while (block) {
4002        offset = migration_bitmap_find_dirty(ram_state, block, offset);
4003
4004        if (offset << TARGET_PAGE_BITS >= block->used_length) {
4005            offset = 0;
4006            block = QLIST_NEXT_RCU(block, next);
4007        } else {
4008            migration_bitmap_clear_dirty(ram_state, block, offset);
4009            dst_host = block->host + (offset << TARGET_PAGE_BITS);
4010            src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4011            memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4012        }
4013    }
4014
4015    rcu_read_unlock();
4016    trace_colo_flush_ram_cache_end();
4017}
4018
4019static int ram_load(QEMUFile *f, void *opaque, int version_id)
4020{
4021    int flags = 0, ret = 0, invalid_flags = 0;
4022    static uint64_t seq_iter;
4023    int len = 0;
4024    /*
4025     * If system is running in postcopy mode, page inserts to host memory must
4026     * be atomic
4027     */
4028    bool postcopy_running = postcopy_is_running();
4029    /* ADVISE is earlier, it shows the source has the postcopy capability on */
4030    bool postcopy_advised = postcopy_is_advised();
4031
4032    seq_iter++;
4033
4034    if (version_id != 4) {
4035        ret = -EINVAL;
4036    }
4037
4038    if (!migrate_use_compression()) {
4039        invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4040    }
4041    /* This RCU critical section can be very long running.
4042     * When RCU reclaims in the code start to become numerous,
4043     * it will be necessary to reduce the granularity of this
4044     * critical section.
4045     */
4046    rcu_read_lock();
4047
4048    if (postcopy_running) {
4049        ret = ram_load_postcopy(f);
4050    }
4051
4052    while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4053        ram_addr_t addr, total_ram_bytes;
4054        void *host = NULL;
4055        uint8_t ch;
4056
4057        addr = qemu_get_be64(f);
4058        flags = addr & ~TARGET_PAGE_MASK;
4059        addr &= TARGET_PAGE_MASK;
4060
4061        if (flags & invalid_flags) {
4062            if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4063                error_report("Received an unexpected compressed page");
4064            }
4065
4066            ret = -EINVAL;
4067            break;
4068        }
4069
4070        if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4071                     RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4072            RAMBlock *block = ram_block_from_stream(f, flags);
4073
4074            /*
4075             * After going into COLO, we should load the Page into colo_cache.
4076             */
4077            if (migration_incoming_in_colo_state()) {
4078                host = colo_cache_from_block_offset(block, addr);
4079            } else {
4080                host = host_from_ram_block_offset(block, addr);
4081            }
4082            if (!host) {
4083                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4084                ret = -EINVAL;
4085                break;
4086            }
4087
4088            if (!migration_incoming_in_colo_state()) {
4089                ramblock_recv_bitmap_set(block, host);
4090            }
4091
4092            trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4093        }
4094
4095        switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4096        case RAM_SAVE_FLAG_MEM_SIZE:
4097            /* Synchronize RAM block list */
4098            total_ram_bytes = addr;
4099            while (!ret && total_ram_bytes) {
4100                RAMBlock *block;
4101                char id[256];
4102                ram_addr_t length;
4103
4104                len = qemu_get_byte(f);
4105                qemu_get_buffer(f, (uint8_t *)id, len);
4106                id[len] = 0;
4107                length = qemu_get_be64(f);
4108
4109                block = qemu_ram_block_by_name(id);
4110                if (block && !qemu_ram_is_migratable(block)) {
4111                    error_report("block %s should not be migrated !", id);
4112                    ret = -EINVAL;
4113                } else if (block) {
4114                    if (length != block->used_length) {
4115                        Error *local_err = NULL;
4116
4117                        ret = qemu_ram_resize(block, length,
4118                                              &local_err);
4119                        if (local_err) {
4120                            error_report_err(local_err);
4121                        }
4122                    }
4123                    /* For postcopy we need to check hugepage sizes match */
4124                    if (postcopy_advised &&
4125                        block->page_size != qemu_host_page_size) {
4126                        uint64_t remote_page_size = qemu_get_be64(f);
4127                        if (remote_page_size != block->page_size) {
4128                            error_report("Mismatched RAM page size %s "
4129                                         "(local) %zd != %" PRId64,
4130                                         id, block->page_size,
4131                                         remote_page_size);
4132                            ret = -EINVAL;
4133                        }
4134                    }
4135                    ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4136                                          block->idstr);
4137                } else {
4138                    error_report("Unknown ramblock \"%s\", cannot "
4139                                 "accept migration", id);
4140                    ret = -EINVAL;
4141                }
4142
4143                total_ram_bytes -= length;
4144            }
4145            break;
4146
4147        case RAM_SAVE_FLAG_ZERO:
4148            ch = qemu_get_byte(f);
4149            ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4150            break;
4151
4152        case RAM_SAVE_FLAG_PAGE:
4153            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4154            break;
4155
4156        case RAM_SAVE_FLAG_COMPRESS_PAGE:
4157            len = qemu_get_be32(f);
4158            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4159                error_report("Invalid compressed data length: %d", len);
4160                ret = -EINVAL;
4161                break;
4162            }
4163            decompress_data_with_multi_threads(f, host, len);
4164            break;
4165
4166        case RAM_SAVE_FLAG_XBZRLE:
4167            if (load_xbzrle(f, addr, host) < 0) {
4168                error_report("Failed to decompress XBZRLE page at "
4169                             RAM_ADDR_FMT, addr);
4170                ret = -EINVAL;
4171                break;
4172            }
4173            break;
4174        case RAM_SAVE_FLAG_EOS:
4175            /* normal exit */
4176            multifd_recv_sync_main();
4177            break;
4178        default:
4179            if (flags & RAM_SAVE_FLAG_HOOK) {
4180                ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4181            } else {
4182                error_report("Unknown combination of migration flags: %#x",
4183                             flags);
4184                ret = -EINVAL;
4185            }
4186        }
4187        if (!ret) {
4188            ret = qemu_file_get_error(f);
4189        }
4190    }
4191
4192    ret |= wait_for_decompress_done();
4193    rcu_read_unlock();
4194    trace_ram_load_complete(ret, seq_iter);
4195
4196    if (!ret  && migration_incoming_in_colo_state()) {
4197        colo_flush_ram_cache();
4198    }
4199    return ret;
4200}
4201
4202static bool ram_has_postcopy(void *opaque)
4203{
4204    RAMBlock *rb;
4205    RAMBLOCK_FOREACH_MIGRATABLE(rb) {
4206        if (ramblock_is_pmem(rb)) {
4207            info_report("Block: %s, host: %p is a nvdimm memory, postcopy"
4208                         "is not supported now!", rb->idstr, rb->host);
4209            return false;
4210        }
4211    }
4212
4213    return migrate_postcopy_ram();
4214}
4215
4216/* Sync all the dirty bitmap with destination VM.  */
4217static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4218{
4219    RAMBlock *block;
4220    QEMUFile *file = s->to_dst_file;
4221    int ramblock_count = 0;
4222
4223    trace_ram_dirty_bitmap_sync_start();
4224
4225    RAMBLOCK_FOREACH_MIGRATABLE(block) {
4226        qemu_savevm_send_recv_bitmap(file, block->idstr);
4227        trace_ram_dirty_bitmap_request(block->idstr);
4228        ramblock_count++;
4229    }
4230
4231    trace_ram_dirty_bitmap_sync_wait();
4232
4233    /* Wait until all the ramblocks' dirty bitmap synced */
4234    while (ramblock_count--) {
4235        qemu_sem_wait(&s->rp_state.rp_sem);
4236    }
4237
4238    trace_ram_dirty_bitmap_sync_complete();
4239
4240    return 0;
4241}
4242
4243static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4244{
4245    qemu_sem_post(&s->rp_state.rp_sem);
4246}
4247
4248/*
4249 * Read the received bitmap, revert it as the initial dirty bitmap.
4250 * This is only used when the postcopy migration is paused but wants
4251 * to resume from a middle point.
4252 */
4253int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4254{
4255    int ret = -EINVAL;
4256    QEMUFile *file = s->rp_state.from_dst_file;
4257    unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4258    uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4259    uint64_t size, end_mark;
4260
4261    trace_ram_dirty_bitmap_reload_begin(block->idstr);
4262
4263    if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4264        error_report("%s: incorrect state %s", __func__,
4265                     MigrationStatus_str(s->state));
4266        return -EINVAL;
4267    }
4268
4269    /*
4270     * Note: see comments in ramblock_recv_bitmap_send() on why we
4271     * need the endianess convertion, and the paddings.
4272     */
4273    local_size = ROUND_UP(local_size, 8);
4274
4275    /* Add paddings */
4276    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4277
4278    size = qemu_get_be64(file);
4279
4280    /* The size of the bitmap should match with our ramblock */
4281    if (size != local_size) {
4282        error_report("%s: ramblock '%s' bitmap size mismatch "
4283                     "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4284                     block->idstr, size, local_size);
4285        ret = -EINVAL;
4286        goto out;
4287    }
4288
4289    size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4290    end_mark = qemu_get_be64(file);
4291
4292    ret = qemu_file_get_error(file);
4293    if (ret || size != local_size) {
4294        error_report("%s: read bitmap failed for ramblock '%s': %d"
4295                     " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4296                     __func__, block->idstr, ret, local_size, size);
4297        ret = -EIO;
4298        goto out;
4299    }
4300
4301    if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4302        error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
4303                     __func__, block->idstr, end_mark);
4304        ret = -EINVAL;
4305        goto out;
4306    }
4307
4308    /*
4309     * Endianess convertion. We are during postcopy (though paused).
4310     * The dirty bitmap won't change. We can directly modify it.
4311     */
4312    bitmap_from_le(block->bmap, le_bitmap, nbits);
4313
4314    /*
4315     * What we received is "received bitmap". Revert it as the initial
4316     * dirty bitmap for this ramblock.
4317     */
4318    bitmap_complement(block->bmap, block->bmap, nbits);
4319
4320    trace_ram_dirty_bitmap_reload_complete(block->idstr);
4321
4322    /*
4323     * We succeeded to sync bitmap for current ramblock. If this is
4324     * the last one to sync, we need to notify the main send thread.
4325     */
4326    ram_dirty_bitmap_reload_notify(s);
4327
4328    ret = 0;
4329out:
4330    g_free(le_bitmap);
4331    return ret;
4332}
4333
4334static int ram_resume_prepare(MigrationState *s, void *opaque)
4335{
4336    RAMState *rs = *(RAMState **)opaque;
4337    int ret;
4338
4339    ret = ram_dirty_bitmap_sync_all(s, rs);
4340    if (ret) {
4341        return ret;
4342    }
4343
4344    ram_state_resume_prepare(rs, s->to_dst_file);
4345
4346    return 0;
4347}
4348
4349static SaveVMHandlers savevm_ram_handlers = {
4350    .save_setup = ram_save_setup,
4351    .save_live_iterate = ram_save_iterate,
4352    .save_live_complete_postcopy = ram_save_complete,
4353    .save_live_complete_precopy = ram_save_complete,
4354    .has_postcopy = ram_has_postcopy,
4355    .save_live_pending = ram_save_pending,
4356    .load_state = ram_load,
4357    .save_cleanup = ram_save_cleanup,
4358    .load_setup = ram_load_setup,
4359    .load_cleanup = ram_load_cleanup,
4360    .resume_prepare = ram_resume_prepare,
4361};
4362
4363void ram_mig_init(void)
4364{
4365    qemu_mutex_init(&XBZRLE.lock);
4366    register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, &ram_state);
4367}
4368