qemu/migration/multifd.c
<<
>>
Prefs
   1/*
   2 * Multifd common code
   3 *
   4 * Copyright (c) 2019-2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "qemu/rcu.h"
  15#include "exec/target_page.h"
  16#include "sysemu/sysemu.h"
  17#include "exec/ramblock.h"
  18#include "qemu/error-report.h"
  19#include "qapi/error.h"
  20#include "ram.h"
  21#include "migration.h"
  22#include "migration-stats.h"
  23#include "socket.h"
  24#include "tls.h"
  25#include "qemu-file.h"
  26#include "trace.h"
  27#include "multifd.h"
  28#include "threadinfo.h"
  29#include "options.h"
  30#include "qemu/yank.h"
  31#include "io/channel-socket.h"
  32#include "yank_functions.h"
  33
/* Multiple fd's */

/*
 * Magic and version used both by the initial per-channel message below
 * and by every page packet header (stored big-endian on the wire).
 */
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

/*
 * First message sent on each multifd channel, before any page packet.
 * Packed so the on-the-wire layout has no padding.  magic and version are
 * big-endian; uuid is the source's qemu_uuid and is compared by the
 * receiver against its own; id is the sending channel's index.
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
  47
  48/* Multifd without compression */
  49
  50/**
  51 * nocomp_send_setup: setup send side
  52 *
  53 * For no compression this function does nothing.
  54 *
  55 * Returns 0 for success or -1 for error
  56 *
  57 * @p: Params for the channel that we are using
  58 * @errp: pointer to an error
  59 */
  60static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
  61{
  62    return 0;
  63}
  64
  65/**
  66 * nocomp_send_cleanup: cleanup send side
  67 *
  68 * For no compression this function does nothing.
  69 *
  70 * @p: Params for the channel that we are using
  71 * @errp: pointer to an error
  72 */
  73static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
  74{
  75    return;
  76}
  77
  78/**
  79 * nocomp_send_prepare: prepare date to be able to send
  80 *
  81 * For no compression we just have to calculate the size of the
  82 * packet.
  83 *
  84 * Returns 0 for success or -1 for error
  85 *
  86 * @p: Params for the channel that we are using
  87 * @errp: pointer to an error
  88 */
  89static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
  90{
  91    MultiFDPages_t *pages = p->pages;
  92
  93    for (int i = 0; i < p->normal_num; i++) {
  94        p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
  95        p->iov[p->iovs_num].iov_len = p->page_size;
  96        p->iovs_num++;
  97    }
  98
  99    p->next_packet_size = p->normal_num * p->page_size;
 100    p->flags |= MULTIFD_FLAG_NOCOMP;
 101    return 0;
 102}
 103
 104/**
 105 * nocomp_recv_setup: setup receive side
 106 *
 107 * For no compression this function does nothing.
 108 *
 109 * Returns 0 for success or -1 for error
 110 *
 111 * @p: Params for the channel that we are using
 112 * @errp: pointer to an error
 113 */
 114static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 115{
 116    return 0;
 117}
 118
 119/**
 120 * nocomp_recv_cleanup: setup receive side
 121 *
 122 * For no compression this function does nothing.
 123 *
 124 * @p: Params for the channel that we are using
 125 */
 126static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 127{
 128}
 129
 130/**
 131 * nocomp_recv_pages: read the data from the channel into actual pages
 132 *
 133 * For no compression we just need to read things into the correct place.
 134 *
 135 * Returns 0 for success or -1 for error
 136 *
 137 * @p: Params for the channel that we are using
 138 * @errp: pointer to an error
 139 */
 140static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
 141{
 142    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 143
 144    if (flags != MULTIFD_FLAG_NOCOMP) {
 145        error_setg(errp, "multifd %u: flags received %x flags expected %x",
 146                   p->id, flags, MULTIFD_FLAG_NOCOMP);
 147        return -1;
 148    }
 149    for (int i = 0; i < p->normal_num; i++) {
 150        p->iov[i].iov_base = p->host + p->normal[i];
 151        p->iov[i].iov_len = p->page_size;
 152    }
 153    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
 154}
 155
 156static MultiFDMethods multifd_nocomp_ops = {
 157    .send_setup = nocomp_send_setup,
 158    .send_cleanup = nocomp_send_cleanup,
 159    .send_prepare = nocomp_send_prepare,
 160    .recv_setup = nocomp_recv_setup,
 161    .recv_cleanup = nocomp_recv_cleanup,
 162    .recv_pages = nocomp_recv_pages
 163};
 164
 165static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
 166    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
 167};
 168
 169void multifd_register_ops(int method, MultiFDMethods *ops)
 170{
 171    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
 172    multifd_ops[method] = ops;
 173}
 174
 175static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 176{
 177    MultiFDInit_t msg = {};
 178    size_t size = sizeof(msg);
 179    int ret;
 180
 181    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 182    msg.version = cpu_to_be32(MULTIFD_VERSION);
 183    msg.id = p->id;
 184    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 185
 186    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
 187    if (ret != 0) {
 188        return -1;
 189    }
 190    stat64_add(&mig_stats.multifd_bytes, size);
 191    stat64_add(&mig_stats.transferred, size);
 192    return 0;
 193}
 194
 195static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 196{
 197    MultiFDInit_t msg;
 198    int ret;
 199
 200    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 201    if (ret != 0) {
 202        return -1;
 203    }
 204
 205    msg.magic = be32_to_cpu(msg.magic);
 206    msg.version = be32_to_cpu(msg.version);
 207
 208    if (msg.magic != MULTIFD_MAGIC) {
 209        error_setg(errp, "multifd: received packet magic %x "
 210                   "expected %x", msg.magic, MULTIFD_MAGIC);
 211        return -1;
 212    }
 213
 214    if (msg.version != MULTIFD_VERSION) {
 215        error_setg(errp, "multifd: received packet version %u "
 216                   "expected %u", msg.version, MULTIFD_VERSION);
 217        return -1;
 218    }
 219
 220    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 221        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 222        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 223
 224        error_setg(errp, "multifd: received uuid '%s' and expected "
 225                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 226        g_free(uuid);
 227        g_free(msg_uuid);
 228        return -1;
 229    }
 230
 231    if (msg.id > migrate_multifd_channels()) {
 232        error_setg(errp, "multifd: received channel version %u "
 233                   "expected %u", msg.version, MULTIFD_VERSION);
 234        return -1;
 235    }
 236
 237    return msg.id;
 238}
 239
 240static MultiFDPages_t *multifd_pages_init(size_t size)
 241{
 242    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 243
 244    pages->allocated = size;
 245    pages->offset = g_new0(ram_addr_t, size);
 246
 247    return pages;
 248}
 249
 250static void multifd_pages_clear(MultiFDPages_t *pages)
 251{
 252    pages->num = 0;
 253    pages->allocated = 0;
 254    pages->packet_num = 0;
 255    pages->block = NULL;
 256    g_free(pages->offset);
 257    pages->offset = NULL;
 258    g_free(pages);
 259}
 260
 261static void multifd_send_fill_packet(MultiFDSendParams *p)
 262{
 263    MultiFDPacket_t *packet = p->packet;
 264    int i;
 265
 266    packet->flags = cpu_to_be32(p->flags);
 267    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
 268    packet->normal_pages = cpu_to_be32(p->normal_num);
 269    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 270    packet->packet_num = cpu_to_be64(p->packet_num);
 271
 272    if (p->pages->block) {
 273        strncpy(packet->ramblock, p->pages->block->idstr, 256);
 274    }
 275
 276    for (i = 0; i < p->normal_num; i++) {
 277        /* there are architectures where ram_addr_t is 32 bit */
 278        uint64_t temp = p->normal[i];
 279
 280        packet->offset[i] = cpu_to_be64(temp);
 281    }
 282}
 283
 284static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 285{
 286    MultiFDPacket_t *packet = p->packet;
 287    int i;
 288
 289    packet->magic = be32_to_cpu(packet->magic);
 290    if (packet->magic != MULTIFD_MAGIC) {
 291        error_setg(errp, "multifd: received packet "
 292                   "magic %x and expected magic %x",
 293                   packet->magic, MULTIFD_MAGIC);
 294        return -1;
 295    }
 296
 297    packet->version = be32_to_cpu(packet->version);
 298    if (packet->version != MULTIFD_VERSION) {
 299        error_setg(errp, "multifd: received packet "
 300                   "version %u and expected version %u",
 301                   packet->version, MULTIFD_VERSION);
 302        return -1;
 303    }
 304
 305    p->flags = be32_to_cpu(packet->flags);
 306
 307    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
 308    /*
 309     * If we received a packet that is 100 times bigger than expected
 310     * just stop migration.  It is a magic number.
 311     */
 312    if (packet->pages_alloc > p->page_count) {
 313        error_setg(errp, "multifd: received packet "
 314                   "with size %u and expected a size of %u",
 315                   packet->pages_alloc, p->page_count) ;
 316        return -1;
 317    }
 318
 319    p->normal_num = be32_to_cpu(packet->normal_pages);
 320    if (p->normal_num > packet->pages_alloc) {
 321        error_setg(errp, "multifd: received packet "
 322                   "with %u pages and expected maximum pages are %u",
 323                   p->normal_num, packet->pages_alloc) ;
 324        return -1;
 325    }
 326
 327    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 328    p->packet_num = be64_to_cpu(packet->packet_num);
 329
 330    if (p->normal_num == 0) {
 331        return 0;
 332    }
 333
 334    /* make sure that ramblock is 0 terminated */
 335    packet->ramblock[255] = 0;
 336    p->block = qemu_ram_block_by_name(packet->ramblock);
 337    if (!p->block) {
 338        error_setg(errp, "multifd: unknown ram block %s",
 339                   packet->ramblock);
 340        return -1;
 341    }
 342
 343    p->host = p->block->host;
 344    for (i = 0; i < p->normal_num; i++) {
 345        uint64_t offset = be64_to_cpu(packet->offset[i]);
 346
 347        if (offset > (p->block->used_length - p->page_size)) {
 348            error_setg(errp, "multifd: offset too long %" PRIu64
 349                       " (max " RAM_ADDR_FMT ")",
 350                       offset, p->block->used_length);
 351            return -1;
 352        }
 353        p->normal[i] = offset;
 354    }
 355
 356    return 0;
 357}
 358
 359struct {
 360    MultiFDSendParams *params;
 361    /* array of pages to sent */
 362    MultiFDPages_t *pages;
 363    /* global number of generated multifd packets */
 364    uint64_t packet_num;
 365    /* send channels ready */
 366    QemuSemaphore channels_ready;
 367    /*
 368     * Have we already run terminate threads.  There is a race when it
 369     * happens that we got one error while we are exiting.
 370     * We will use atomic operations.  Only valid values are 0 and 1.
 371     */
 372    int exiting;
 373    /* multifd ops */
 374    MultiFDMethods *ops;
 375} *multifd_send_state;
 376
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages struct for each channel, plus a main one.  Each time
 * we need to send a batch of pages, we swap the main one with the pages
 * struct of the channel that is going to send it.  There are two reasons
 * for that:
 *    - to avoid doing many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct, and we don't
 * need any extra locking: it belongs either to the migration thread or
 * to the channel thread.  Switching is safe because the migration thread
 * holds the channel mutex while swapping, and the channel must have
 * finished with its own pages struct, otherwise pending_job couldn't be
 * zero.
 */
 394
/**
 * multifd_send_pages: hand the queued batch of pages to an idle channel
 *
 * Waits on channels_ready, then scans the channels round-robin, starting
 * after the one picked last time, for a channel with no pending job.  The
 * global pages struct is swapped with that channel's empty one, the
 * channel gets the next global packet number, and its thread is woken
 * through p->sem.
 *
 * Returns 1 on success, -1 if multifd is exiting or a scanned channel has
 * already quit
 *
 * @f: not used here
 */
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    /* Block until some channel thread has posted that it is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            /* Claim this channel; p->mutex stays held after the break */
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    /* The channel thread resets its batch after sending it */
    assert(!p->pages->num);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    /* Swap: the channel takes the filled batch, we keep its empty one */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);

    return 1;
}
 440
 441int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 442{
 443    MultiFDPages_t *pages = multifd_send_state->pages;
 444    bool changed = false;
 445
 446    if (!pages->block) {
 447        pages->block = block;
 448    }
 449
 450    if (pages->block == block) {
 451        pages->offset[pages->num] = offset;
 452        pages->num++;
 453
 454        if (pages->num < pages->allocated) {
 455            return 1;
 456        }
 457    } else {
 458        changed = true;
 459    }
 460
 461    if (multifd_send_pages(f) < 0) {
 462        return -1;
 463    }
 464
 465    if (changed) {
 466        return multifd_queue_page(f, block, offset);
 467    }
 468
 469    return 1;
 470}
 471
 472static void multifd_send_terminate_threads(Error *err)
 473{
 474    int i;
 475
 476    trace_multifd_send_terminate_threads(err != NULL);
 477
 478    if (err) {
 479        MigrationState *s = migrate_get_current();
 480        migrate_set_error(s, err);
 481        if (s->state == MIGRATION_STATUS_SETUP ||
 482            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
 483            s->state == MIGRATION_STATUS_DEVICE ||
 484            s->state == MIGRATION_STATUS_ACTIVE) {
 485            migrate_set_state(&s->state, s->state,
 486                              MIGRATION_STATUS_FAILED);
 487        }
 488    }
 489
 490    /*
 491     * We don't want to exit each threads twice.  Depending on where
 492     * we get the error, or if there are two independent errors in two
 493     * threads at the same time, we can end calling this function
 494     * twice.
 495     */
 496    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
 497        return;
 498    }
 499
 500    for (i = 0; i < migrate_multifd_channels(); i++) {
 501        MultiFDSendParams *p = &multifd_send_state->params[i];
 502
 503        qemu_mutex_lock(&p->mutex);
 504        p->quit = true;
 505        qemu_sem_post(&p->sem);
 506        if (p->c) {
 507            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
 508        }
 509        qemu_mutex_unlock(&p->mutex);
 510    }
 511}
 512
 513void multifd_save_cleanup(void)
 514{
 515    int i;
 516
 517    if (!migrate_multifd()) {
 518        return;
 519    }
 520    multifd_send_terminate_threads(NULL);
 521    for (i = 0; i < migrate_multifd_channels(); i++) {
 522        MultiFDSendParams *p = &multifd_send_state->params[i];
 523
 524        if (p->running) {
 525            qemu_thread_join(&p->thread);
 526        }
 527    }
 528    for (i = 0; i < migrate_multifd_channels(); i++) {
 529        MultiFDSendParams *p = &multifd_send_state->params[i];
 530        Error *local_err = NULL;
 531
 532        if (p->registered_yank) {
 533            migration_ioc_unregister_yank(p->c);
 534        }
 535        socket_send_channel_destroy(p->c);
 536        p->c = NULL;
 537        qemu_mutex_destroy(&p->mutex);
 538        qemu_sem_destroy(&p->sem);
 539        qemu_sem_destroy(&p->sem_sync);
 540        g_free(p->name);
 541        p->name = NULL;
 542        multifd_pages_clear(p->pages);
 543        p->pages = NULL;
 544        p->packet_len = 0;
 545        g_free(p->packet);
 546        p->packet = NULL;
 547        g_free(p->iov);
 548        p->iov = NULL;
 549        g_free(p->normal);
 550        p->normal = NULL;
 551        multifd_send_state->ops->send_cleanup(p, &local_err);
 552        if (local_err) {
 553            migrate_set_error(migrate_get_current(), local_err);
 554            error_free(local_err);
 555        }
 556    }
 557    qemu_sem_destroy(&multifd_send_state->channels_ready);
 558    g_free(multifd_send_state->params);
 559    multifd_send_state->params = NULL;
 560    multifd_pages_clear(multifd_send_state->pages);
 561    multifd_send_state->pages = NULL;
 562    g_free(multifd_send_state);
 563    multifd_send_state = NULL;
 564}
 565
 566static int multifd_zero_copy_flush(QIOChannel *c)
 567{
 568    int ret;
 569    Error *err = NULL;
 570
 571    ret = qio_channel_flush(c, &err);
 572    if (ret < 0) {
 573        error_report_err(err);
 574        return -1;
 575    }
 576    if (ret == 1) {
 577        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
 578    }
 579
 580    return ret;
 581}
 582
/**
 * multifd_send_sync_main: flush queued pages and sync all send channels
 *
 * Sends any partially filled batch, then queues a packet carrying
 * MULTIFD_FLAG_SYNC on every channel.  It then waits, for each channel,
 * on channels_ready and on that channel's sem_sync.  The channel thread
 * posts sem_sync after sending a SYNC packet, or when it exits with an
 * error.  With zero-copy send, each channel's zero-copy writes are
 * flushed after its wait.
 *
 * Returns 0 on success (or when multifd is disabled), -1 on error
 *
 * @f: passed through to multifd_send_pages()
 */
int multifd_send_sync_main(QEMUFile *f)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    /* Send whatever is still queued in the current batch */
    if (multifd_send_state->pages->num) {
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    /*
     * When using zero-copy, it's necessary to flush the pages before any of
     * the pages can be sent again, so we'll make sure the new version of the
     * pages will always arrive _later_ than the old pages.
     *
     * Currently we achieve this by flushing the zero-copy requested writes
     * per ram iteration, but in the future we could potentially optimize it
     * to be less frequent, e.g. only after we finished one whole scanning of
     * all the dirty bitmaps.
     */

    flush_zero_copy = migrate_zero_copy_send();

    /* Queue a SYNC packet on every channel */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
    /* Wait until every channel has sent its SYNC packet (or failed) */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
 645
/**
 * multifd_send_thread: main loop of one multifd send channel
 *
 * Sends the initial MultiFDInit_t message.  Then, in a loop, it posts
 * channels_ready, waits on p->sem and, when a job is pending, builds the
 * packet header plus one iovec per page and writes them to p->c.  Packets
 * flagged MULTIFD_FLAG_SYNC are acknowledged on p->sem_sync.
 *
 * The loop ends when multifd is exiting, when p->quit is set with no job
 * pending, or on an error.  Errors go through
 * multifd_send_terminate_threads(), and waiters on sem_sync and
 * channels_ready are woken so they do not block forever.
 *
 * @opaque: the MultiFDSendParams of this channel
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_zero_copy_send = migrate_zero_copy_send();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        /* Tell multifd_send_pages()/sync that this channel can take a job */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags;
            p->normal_num = 0;

            /*
             * Without zero-copy, iov[0] is reserved for the packet header
             * so header and pages go out in one writev; with zero-copy the
             * header is written separately below.
             */
            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            if (p->normal_num) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            flags = p->flags;
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            /* Leave an empty batch behind for the next swap */
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                stat64_add(&mig_stats.transferred, p->packet_len);
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            /*
             * NOTE(review): only next_packet_size is accounted here.  In
             * the non-zero-copy case the header bytes sent in iov[0] are
             * not added to the stats; confirm whether that is intended.
             */
            stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
            stat64_add(&mig_stats.transferred, p->next_packet_size);
            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * On error we exit, but first wake up whoever may be waiting for
     * this channel so they don't block forever.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}
 775
 776static bool multifd_channel_connect(MultiFDSendParams *p,
 777                                    QIOChannel *ioc,
 778                                    Error *error);
 779
 780static void multifd_tls_outgoing_handshake(QIOTask *task,
 781                                           gpointer opaque)
 782{
 783    MultiFDSendParams *p = opaque;
 784    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
 785    Error *err = NULL;
 786
 787    if (qio_task_propagate_error(task, &err)) {
 788        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
 789    } else {
 790        trace_multifd_tls_outgoing_handshake_complete(ioc);
 791    }
 792
 793    if (!multifd_channel_connect(p, ioc, err)) {
 794        /*
 795         * Error happen, mark multifd_send_thread status as 'quit' although it
 796         * is not created, and then tell who pay attention to me.
 797         */
 798        p->quit = true;
 799        qemu_sem_post(&multifd_send_state->channels_ready);
 800        qemu_sem_post(&p->sem_sync);
 801    }
 802}
 803
 804static void *multifd_tls_handshake_thread(void *opaque)
 805{
 806    MultiFDSendParams *p = opaque;
 807    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
 808
 809    qio_channel_tls_handshake(tioc,
 810                              multifd_tls_outgoing_handshake,
 811                              p,
 812                              NULL,
 813                              NULL);
 814    return NULL;
 815}
 816
 817static void multifd_tls_channel_connect(MultiFDSendParams *p,
 818                                        QIOChannel *ioc,
 819                                        Error **errp)
 820{
 821    MigrationState *s = migrate_get_current();
 822    const char *hostname = s->hostname;
 823    QIOChannelTLS *tioc;
 824
 825    tioc = migration_tls_client_create(ioc, hostname, errp);
 826    if (!tioc) {
 827        return;
 828    }
 829
 830    object_unref(OBJECT(ioc));
 831    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
 832    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
 833    p->c = QIO_CHANNEL(tioc);
 834    qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
 835                       multifd_tls_handshake_thread, p,
 836                       QEMU_THREAD_JOINABLE);
 837}
 838
 839static bool multifd_channel_connect(MultiFDSendParams *p,
 840                                    QIOChannel *ioc,
 841                                    Error *error)
 842{
 843    trace_multifd_set_outgoing_channel(
 844        ioc, object_get_typename(OBJECT(ioc)),
 845        migrate_get_current()->hostname, error);
 846
 847    if (error) {
 848        return false;
 849    }
 850    if (migrate_channel_requires_tls_upgrade(ioc)) {
 851        multifd_tls_channel_connect(p, ioc, &error);
 852        if (!error) {
 853            /*
 854             * tls_channel_connect will call back to this
 855             * function after the TLS handshake,
 856             * so we mustn't call multifd_send_thread until then
 857             */
 858            return true;
 859        } else {
 860            return false;
 861        }
 862    } else {
 863        migration_ioc_register_yank(ioc);
 864        p->registered_yank = true;
 865        p->c = ioc;
 866        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
 867                           QEMU_THREAD_JOINABLE);
 868    }
 869    return true;
 870}
 871
 872static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
 873                                             QIOChannel *ioc, Error *err)
 874{
 875     migrate_set_error(migrate_get_current(), err);
 876     /* Error happen, we need to tell who pay attention to me */
 877     qemu_sem_post(&multifd_send_state->channels_ready);
 878     qemu_sem_post(&p->sem_sync);
 879     /*
 880      * Although multifd_send_thread is not created, but main migration
 881      * thread need to judge whether it is running, so we need to mark
 882      * its status.
 883      */
 884     p->quit = true;
 885     object_unref(OBJECT(ioc));
 886     error_free(err);
 887}
 888
 889static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 890{
 891    MultiFDSendParams *p = opaque;
 892    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
 893    Error *local_err = NULL;
 894
 895    trace_multifd_new_send_channel_async(p->id);
 896    if (!qio_task_propagate_error(task, &local_err)) {
 897        p->c = sioc;
 898        qio_channel_set_delay(p->c, false);
 899        p->running = true;
 900        if (multifd_channel_connect(p, sioc, local_err)) {
 901            return;
 902        }
 903    }
 904
 905    multifd_new_send_channel_cleanup(p, sioc, local_err);
 906}
 907
 908int multifd_save_setup(Error **errp)
 909{
 910    int thread_count;
 911    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 912    uint8_t i;
 913
 914    if (!migrate_multifd()) {
 915        return 0;
 916    }
 917
 918    thread_count = migrate_multifd_channels();
 919    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
 920    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
 921    multifd_send_state->pages = multifd_pages_init(page_count);
 922    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 923    qatomic_set(&multifd_send_state->exiting, 0);
 924    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
 925
 926    for (i = 0; i < thread_count; i++) {
 927        MultiFDSendParams *p = &multifd_send_state->params[i];
 928
 929        qemu_mutex_init(&p->mutex);
 930        qemu_sem_init(&p->sem, 0);
 931        qemu_sem_init(&p->sem_sync, 0);
 932        p->quit = false;
 933        p->pending_job = 0;
 934        p->id = i;
 935        p->pages = multifd_pages_init(page_count);
 936        p->packet_len = sizeof(MultiFDPacket_t)
 937                      + sizeof(uint64_t) * page_count;
 938        p->packet = g_malloc0(p->packet_len);
 939        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 940        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
 941        p->name = g_strdup_printf("multifdsend_%d", i);
 942        /* We need one extra place for the packet header */
 943        p->iov = g_new0(struct iovec, page_count + 1);
 944        p->normal = g_new0(ram_addr_t, page_count);
 945        p->page_size = qemu_target_page_size();
 946        p->page_count = page_count;
 947
 948        if (migrate_zero_copy_send()) {
 949            p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
 950        } else {
 951            p->write_flags = 0;
 952        }
 953
 954        socket_send_channel_create(multifd_new_send_channel_async, p);
 955    }
 956
 957    for (i = 0; i < thread_count; i++) {
 958        MultiFDSendParams *p = &multifd_send_state->params[i];
 959        Error *local_err = NULL;
 960        int ret;
 961
 962        ret = multifd_send_state->ops->send_setup(p, &local_err);
 963        if (ret) {
 964            error_propagate(errp, local_err);
 965            return ret;
 966        }
 967    }
 968    return 0;
 969}
 970
/*
 * Global state of the receive side of multifd migration.
 * Allocated by multifd_load_setup() and freed by multifd_load_cleanup().
 */
struct {
    /* per-channel parameters, one entry per migrate_multifd_channels() */
    MultiFDRecvParams *params;
    /*
     * number of created threads, i.e. channels accepted so far;
     * incremented by multifd_recv_new_channel() after it starts a thread
     */
    int count;
    /*
     * syncs main thread and channels: each channel thread posts it when it
     * receives a MULTIFD_FLAG_SYNC packet, multifd_recv_sync_main() waits
     * on it once per channel
     */
    QemuSemaphore sem_sync;
    /*
     * global number of generated multifd packets: the highest
     * p->packet_num observed across channels by multifd_recv_sync_main()
     */
    uint64_t packet_num;
    /* multifd ops, chosen by migrate_multifd_compression() at setup */
    MultiFDMethods *ops;
} *multifd_recv_state;
 982
 983static void multifd_recv_terminate_threads(Error *err)
 984{
 985    int i;
 986
 987    trace_multifd_recv_terminate_threads(err != NULL);
 988
 989    if (err) {
 990        MigrationState *s = migrate_get_current();
 991        migrate_set_error(s, err);
 992        if (s->state == MIGRATION_STATUS_SETUP ||
 993            s->state == MIGRATION_STATUS_ACTIVE) {
 994            migrate_set_state(&s->state, s->state,
 995                              MIGRATION_STATUS_FAILED);
 996        }
 997    }
 998
 999    for (i = 0; i < migrate_multifd_channels(); i++) {
1000        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1001
1002        qemu_mutex_lock(&p->mutex);
1003        p->quit = true;
1004        /*
1005         * We could arrive here for two reasons:
1006         *  - normal quit, i.e. everything went fine, just finished
1007         *  - error quit: We close the channels so the channel threads
1008         *    finish the qio_channel_read_all_eof()
1009         */
1010        if (p->c) {
1011            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1012        }
1013        qemu_mutex_unlock(&p->mutex);
1014    }
1015}
1016
1017void multifd_load_shutdown(void)
1018{
1019    if (migrate_multifd()) {
1020        multifd_recv_terminate_threads(NULL);
1021    }
1022}
1023
1024void multifd_load_cleanup(void)
1025{
1026    int i;
1027
1028    if (!migrate_multifd()) {
1029        return;
1030    }
1031    multifd_recv_terminate_threads(NULL);
1032    for (i = 0; i < migrate_multifd_channels(); i++) {
1033        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1034
1035        if (p->running) {
1036            /*
1037             * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1038             * however try to wakeup it without harm in cleanup phase.
1039             */
1040            qemu_sem_post(&p->sem_sync);
1041        }
1042
1043        qemu_thread_join(&p->thread);
1044    }
1045    for (i = 0; i < migrate_multifd_channels(); i++) {
1046        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1047
1048        migration_ioc_unregister_yank(p->c);
1049        object_unref(OBJECT(p->c));
1050        p->c = NULL;
1051        qemu_mutex_destroy(&p->mutex);
1052        qemu_sem_destroy(&p->sem_sync);
1053        g_free(p->name);
1054        p->name = NULL;
1055        p->packet_len = 0;
1056        g_free(p->packet);
1057        p->packet = NULL;
1058        g_free(p->iov);
1059        p->iov = NULL;
1060        g_free(p->normal);
1061        p->normal = NULL;
1062        multifd_recv_state->ops->recv_cleanup(p);
1063    }
1064    qemu_sem_destroy(&multifd_recv_state->sem_sync);
1065    g_free(multifd_recv_state->params);
1066    multifd_recv_state->params = NULL;
1067    g_free(multifd_recv_state);
1068    multifd_recv_state = NULL;
1069}
1070
1071void multifd_recv_sync_main(void)
1072{
1073    int i;
1074
1075    if (!migrate_multifd()) {
1076        return;
1077    }
1078    for (i = 0; i < migrate_multifd_channels(); i++) {
1079        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1080
1081        trace_multifd_recv_sync_main_wait(p->id);
1082        qemu_sem_wait(&multifd_recv_state->sem_sync);
1083    }
1084    for (i = 0; i < migrate_multifd_channels(); i++) {
1085        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1086
1087        WITH_QEMU_LOCK_GUARD(&p->mutex) {
1088            if (multifd_recv_state->packet_num < p->packet_num) {
1089                multifd_recv_state->packet_num = p->packet_num;
1090            }
1091        }
1092        trace_multifd_recv_sync_main_signal(p->id);
1093        qemu_sem_post(&p->sem_sync);
1094    }
1095    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1096}
1097
/*
 * multifd_recv_thread: body of one multifd receive channel thread
 *
 * Each loop iteration does the following:
 *  - read one packet of p->packet_len bytes from p->c;
 *  - decode it with multifd_recv_unfill_packet() while holding p->mutex;
 *  - hand any pages to the compression method's recv_pages() hook;
 *  - if the packet carries MULTIFD_FLAG_SYNC, rendezvous with
 *    multifd_recv_sync_main().
 *
 * The loop ends when p->quit is set, on EOF, or on any error.  An error
 * is passed to multifd_recv_terminate_threads(), which also stops the other
 * channels.
 *
 * Returns NULL; the thread is joined by multifd_load_cleanup().
 *
 * @opaque: the MultiFDRecvParams of this channel
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags;

        /* set by multifd_recv_terminate_threads(); checked once per packet */
        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
            break;
        }

        /*
         * Decode while holding p->mutex; multifd_recv_sync_main() reads
         * p->packet_num under this same lock.
         */
        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /*
         * Keep a local copy so SYNC can still be acted on below after it
         * has been cleared from p->flags.
         */
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->total_normal_pages += p->normal_num;
        qemu_mutex_unlock(&p->mutex);

        /* the method's recv_pages() hook is invoked without p->mutex held */
        if (p->normal_num) {
            ret = multifd_recv_state->ops->recv_pages(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        /*
         * Tell multifd_recv_sync_main() this channel reached the sync point,
         * then block until it posts p->sem_sync back to release us.
         */
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    /* an error ended the loop: fail the migration and stop all channels */
    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    /* multifd_load_cleanup() consults p->running to decide whether to wake us */
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}
1162
1163int multifd_load_setup(Error **errp)
1164{
1165    int thread_count;
1166    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1167    uint8_t i;
1168
1169    /*
1170     * Return successfully if multiFD recv state is already initialised
1171     * or multiFD is not enabled.
1172     */
1173    if (multifd_recv_state || !migrate_multifd()) {
1174        return 0;
1175    }
1176
1177    thread_count = migrate_multifd_channels();
1178    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1179    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1180    qatomic_set(&multifd_recv_state->count, 0);
1181    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1182    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1183
1184    for (i = 0; i < thread_count; i++) {
1185        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1186
1187        qemu_mutex_init(&p->mutex);
1188        qemu_sem_init(&p->sem_sync, 0);
1189        p->quit = false;
1190        p->id = i;
1191        p->packet_len = sizeof(MultiFDPacket_t)
1192                      + sizeof(uint64_t) * page_count;
1193        p->packet = g_malloc0(p->packet_len);
1194        p->name = g_strdup_printf("multifdrecv_%d", i);
1195        p->iov = g_new0(struct iovec, page_count);
1196        p->normal = g_new0(ram_addr_t, page_count);
1197        p->page_count = page_count;
1198        p->page_size = qemu_target_page_size();
1199    }
1200
1201    for (i = 0; i < thread_count; i++) {
1202        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1203        Error *local_err = NULL;
1204        int ret;
1205
1206        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
1207        if (ret) {
1208            error_propagate(errp, local_err);
1209            return ret;
1210        }
1211    }
1212    return 0;
1213}
1214
1215bool multifd_recv_all_channels_created(void)
1216{
1217    int thread_count = migrate_multifd_channels();
1218
1219    if (!migrate_multifd()) {
1220        return true;
1221    }
1222
1223    if (!multifd_recv_state) {
1224        /* Called before any connections created */
1225        return false;
1226    }
1227
1228    return thread_count == qatomic_read(&multifd_recv_state->count);
1229}
1230
1231/*
1232 * Try to receive all multifd channels to get ready for the migration.
1233 * Sets @errp when failing to receive the current channel.
1234 */
1235void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1236{
1237    MultiFDRecvParams *p;
1238    Error *local_err = NULL;
1239    int id;
1240
1241    id = multifd_recv_initial_packet(ioc, &local_err);
1242    if (id < 0) {
1243        multifd_recv_terminate_threads(local_err);
1244        error_propagate_prepend(errp, local_err,
1245                                "failed to receive packet"
1246                                " via multifd channel %d: ",
1247                                qatomic_read(&multifd_recv_state->count));
1248        return;
1249    }
1250    trace_multifd_recv_new_channel(id);
1251
1252    p = &multifd_recv_state->params[id];
1253    if (p->c != NULL) {
1254        error_setg(&local_err, "multifd: received id '%d' already setup'",
1255                   id);
1256        multifd_recv_terminate_threads(local_err);
1257        error_propagate(errp, local_err);
1258        return;
1259    }
1260    p->c = ioc;
1261    object_ref(OBJECT(ioc));
1262    /* initial packet */
1263    p->num_packets = 1;
1264
1265    p->running = true;
1266    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1267                       QEMU_THREAD_JOINABLE);
1268    qatomic_inc(&multifd_recv_state->count);
1269}
1270