qemu/migration/multifd.c
<<
>>
Prefs
   1/*
   2 * Multifd common code
   3 *
   4 * Copyright (c) 2019-2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "qemu/cutils.h"
  15#include "qemu/iov.h"
  16#include "qemu/rcu.h"
  17#include "exec/target_page.h"
  18#include "system/system.h"
  19#include "system/ramblock.h"
  20#include "qemu/error-report.h"
  21#include "qapi/error.h"
  22#include "file.h"
  23#include "migration/misc.h"
  24#include "migration.h"
  25#include "migration-stats.h"
  26#include "savevm.h"
  27#include "socket.h"
  28#include "tls.h"
  29#include "qemu-file.h"
  30#include "trace.h"
  31#include "multifd.h"
  32#include "threadinfo.h"
  33#include "options.h"
  34#include "qemu/yank.h"
  35#include "io/channel-file.h"
  36#include "io/channel-socket.h"
  37#include "yank_functions.h"
  38
/*
 * Per-channel handshake message, written once at the start of every
 * multifd channel by multifd_send_initial_packet() and validated by
 * multifd_recv_initial_packet().  It is packed because it is sent and
 * received byte-for-byte; magic and version are stored big-endian.
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* Sender-side channel index (p->id) */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
  47
/*
 * Global state of the multifd send side.  Allocated by
 * multifd_send_setup() and freed by multifd_send_cleanup_state().
 */
struct {
    /* Per-channel parameters, migrate_multifd_channels() entries */
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num easier, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add() yet.
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.  Posted once per channel-creation attempt, successful or
     * not, via multifd_send_channel_created().
     */
    QemuSemaphore channels_created;
    /*
     * Send channels ready: posted by a send thread every time it becomes
     * idle.  multifd_send() and multifd_send_sync_main() wait on it.
     */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops of the configured compression method */
    const MultiFDMethods *ops;
} *multifd_send_state;
  83
/* Global state of the multifd receive side. */
struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* Non-zero once the receive side is exiting; see multifd_recv_should_exit() */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;
 100
 101MultiFDSendData *multifd_send_data_alloc(void)
 102{
 103    MultiFDSendData *new = g_new0(MultiFDSendData, 1);
 104
 105    multifd_ram_payload_alloc(&new->u.ram);
 106    /* Device state allocates its payload on-demand */
 107
 108    return new;
 109}
 110
 111void multifd_send_data_clear(MultiFDSendData *data)
 112{
 113    if (multifd_payload_empty(data)) {
 114        return;
 115    }
 116
 117    switch (data->type) {
 118    case MULTIFD_PAYLOAD_DEVICE_STATE:
 119        multifd_send_data_clear_device_state(&data->u.device_state);
 120        break;
 121    default:
 122        /* Nothing to do */
 123        break;
 124    }
 125
 126    data->type = MULTIFD_PAYLOAD_NONE;
 127}
 128
 129void multifd_send_data_free(MultiFDSendData *data)
 130{
 131    if (!data) {
 132        return;
 133    }
 134
 135    /* This also free's device state payload */
 136    multifd_send_data_clear(data);
 137
 138    multifd_ram_payload_free(&data->u.ram);
 139
 140    g_free(data);
 141}
 142
 143static bool multifd_use_packets(void)
 144{
 145    return !migrate_mapped_ram();
 146}
 147
 148void multifd_send_channel_created(void)
 149{
 150    qemu_sem_post(&multifd_send_state->channels_created);
 151}
 152
/*
 * Registered compression methods, indexed by the value returned from
 * migrate_multifd_compression().  Unregistered slots stay NULL.
 */
static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

/*
 * Register @ops as the implementation of compression @method.
 * @method must be in range, and each method may be registered only once
 * (both are enforced with assert()).
 */
void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
 161
 162static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 163{
 164    MultiFDInit_t msg = {};
 165    size_t size = sizeof(msg);
 166    int ret;
 167
 168    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 169    msg.version = cpu_to_be32(MULTIFD_VERSION);
 170    msg.id = p->id;
 171    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 172
 173    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
 174    if (ret != 0) {
 175        return -1;
 176    }
 177    stat64_add(&mig_stats.multifd_bytes, size);
 178    return 0;
 179}
 180
 181static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 182{
 183    MultiFDInit_t msg;
 184    int ret;
 185
 186    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 187    if (ret != 0) {
 188        return -1;
 189    }
 190
 191    msg.magic = be32_to_cpu(msg.magic);
 192    msg.version = be32_to_cpu(msg.version);
 193
 194    if (msg.magic != MULTIFD_MAGIC) {
 195        error_setg(errp, "multifd: received packet magic %x "
 196                   "expected %x", msg.magic, MULTIFD_MAGIC);
 197        return -1;
 198    }
 199
 200    if (msg.version != MULTIFD_VERSION) {
 201        error_setg(errp, "multifd: received packet version %u "
 202                   "expected %u", msg.version, MULTIFD_VERSION);
 203        return -1;
 204    }
 205
 206    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 207        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 208        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 209
 210        error_setg(errp, "multifd: received uuid '%s' and expected "
 211                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 212        g_free(uuid);
 213        g_free(msg_uuid);
 214        return -1;
 215    }
 216
 217    if (msg.id > migrate_multifd_channels()) {
 218        error_setg(errp, "multifd: received channel id %u is greater than "
 219                   "number of channels %u", msg.id, migrate_multifd_channels());
 220        return -1;
 221    }
 222
 223    return msg.id;
 224}
 225
 226/* Fills a RAM multifd packet */
 227void multifd_send_fill_packet(MultiFDSendParams *p)
 228{
 229    MultiFDPacket_t *packet = p->packet;
 230    uint64_t packet_num;
 231    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
 232
 233    memset(packet, 0, p->packet_len);
 234
 235    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
 236    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
 237
 238    packet->hdr.flags = cpu_to_be32(p->flags);
 239    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 240
 241    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
 242    packet->packet_num = cpu_to_be64(packet_num);
 243
 244    p->packets_sent++;
 245
 246    if (!sync_packet) {
 247        multifd_ram_fill_packet(p);
 248    }
 249
 250    trace_multifd_send_fill(p->id, packet_num,
 251                            p->flags, p->next_packet_size);
 252}
 253
 254static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
 255                                             const MultiFDPacketHdr_t *hdr,
 256                                             Error **errp)
 257{
 258    uint32_t magic = be32_to_cpu(hdr->magic);
 259    uint32_t version = be32_to_cpu(hdr->version);
 260
 261    if (magic != MULTIFD_MAGIC) {
 262        error_setg(errp, "multifd: received packet magic %x, expected %x",
 263                   magic, MULTIFD_MAGIC);
 264        return -1;
 265    }
 266
 267    if (version != MULTIFD_VERSION) {
 268        error_setg(errp, "multifd: received packet version %u, expected %u",
 269                   version, MULTIFD_VERSION);
 270        return -1;
 271    }
 272
 273    p->flags = be32_to_cpu(hdr->flags);
 274
 275    return 0;
 276}
 277
 278static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
 279                                                   Error **errp)
 280{
 281    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
 282
 283    packet->instance_id = be32_to_cpu(packet->instance_id);
 284    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 285
 286    return 0;
 287}
 288
 289static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
 290{
 291    const MultiFDPacket_t *packet = p->packet;
 292    int ret = 0;
 293
 294    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 295    p->packet_num = be64_to_cpu(packet->packet_num);
 296
 297    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
 298    ret = multifd_ram_unfill_packet(p, errp);
 299
 300    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
 301                              p->next_packet_size);
 302
 303    return ret;
 304}
 305
 306static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 307{
 308    p->packets_recved++;
 309
 310    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
 311        return multifd_recv_unfill_packet_device_state(p, errp);
 312    }
 313
 314    return multifd_recv_unfill_packet_ram(p, errp);
 315}
 316
 317static bool multifd_send_should_exit(void)
 318{
 319    return qatomic_read(&multifd_send_state->exiting);
 320}
 321
 322static bool multifd_recv_should_exit(void)
 323{
 324    return qatomic_read(&multifd_recv_state->exiting);
 325}
 326
 327/*
 328 * The migration thread can wait on either of the two semaphores.  This
 329 * function can be used to kick the main thread out of waiting on either of
 330 * them.  Should mostly only be called when something wrong happened with
 331 * the current multifd send thread.
 332 */
 333static void multifd_send_kick_main(MultiFDSendParams *p)
 334{
 335    qemu_sem_post(&p->sem_sync);
 336    qemu_sem_post(&multifd_send_state->channels_ready);
 337}
 338
/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking of the data objects is necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Concurrent callers are serialized by multifd_send_mutex.  Channels are
 * scanned round-robin, starting after the channel used last time.
 *
 * @send_data: in: filled object to transmit; out: an empty object the
 *             caller can refill for the next call.
 *
 * Returns true if succeed, false otherwise.  False means the send side
 * is exiting; *send_data is left untouched in that case.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    /* Round-robin cursor; persists across calls and across migrations. */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
 415
 416/* Multifd send side hit an error; remember it and prepare to quit */
 417static void multifd_send_set_error(Error *err)
 418{
 419    /*
 420     * We don't want to exit each threads twice.  Depending on where
 421     * we get the error, or if there are two independent errors in two
 422     * threads at the same time, we can end calling this function
 423     * twice.
 424     */
 425    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
 426        return;
 427    }
 428
 429    if (err) {
 430        MigrationState *s = migrate_get_current();
 431        migrate_set_error(s, err);
 432        if (s->state == MIGRATION_STATUS_SETUP ||
 433            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
 434            s->state == MIGRATION_STATUS_DEVICE ||
 435            s->state == MIGRATION_STATUS_ACTIVE) {
 436            migrate_set_state(&s->state, s->state,
 437                              MIGRATION_STATUS_FAILED);
 438        }
 439    }
 440}
 441
/*
 * Stop every multifd send thread and wait for it to finish.
 *
 * Unconditionally sets the exiting flag, wakes each channel thread via
 * p->sem and shuts down its IO channel (when one is set) so that blocked
 * IO returns.  It then joins the TLS handshake thread and the send thread
 * of every channel that had them created.
 */
static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* Wakes a thread parked in qemu_sem_wait(&p->sem). */
        qemu_sem_post(&p->sem);
        if (p->c) {
            /* Best effort: shutdown errors are ignored (NULL errp). */
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}
 482
/*
 * Release all per-channel resources of @p.  This unregisters yank,
 * closes and drops the IO channel, destroys the semaphores, and frees
 * the name, data and packet buffers.  It then lets the compression
 * method release its own state via ops->send_cleanup().
 *
 * multifd_send_shutdown() calls this only after
 * multifd_send_terminate_threads() has joined the channel's threads.
 *
 * Returns true on success, false if send_cleanup() set an error in
 * @errp.  @errp must be non-NULL because *errp is read on return.
 */
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here.  See multifd_send_state.channels_created and its
         * usage.  In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    /* send_cleanup() is expected to have released the iov array. */
    assert(!p->iov);

    return *errp == NULL;
}
 527
/*
 * Tear down the global send-side state.  This covers the outgoing
 * file/socket transport state, the device-state send state, the global
 * semaphores and mutex, the params array, and finally
 * multifd_send_state itself, which is left NULL.
 *
 * multifd_send_shutdown() calls this last, after every channel has been
 * released with multifd_send_cleanup_channel().
 */
static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}
 541
 542void multifd_send_shutdown(void)
 543{
 544    int i;
 545
 546    if (!migrate_multifd()) {
 547        return;
 548    }
 549
 550    for (i = 0; i < migrate_multifd_channels(); i++) {
 551        MultiFDSendParams *p = &multifd_send_state->params[i];
 552
 553        /* thread_created implies the TLS handshake has succeeded */
 554        if (p->tls_thread_created && p->thread_created) {
 555            Error *local_err = NULL;
 556            /*
 557             * The destination expects the TLS session to always be
 558             * properly terminated. This helps to detect a premature
 559             * termination in the middle of the stream.  Note that
 560             * older QEMUs always break the connection on the source
 561             * and the destination always sees
 562             * GNUTLS_E_PREMATURE_TERMINATION.
 563             */
 564            migration_tls_channel_end(p->c, &local_err);
 565
 566            /*
 567             * The above can return an error in case the migration has
 568             * already failed. If the migration succeeded, errors are
 569             * not expected but there's no need to kill the source.
 570             */
 571            if (local_err && !migration_has_failed(migrate_get_current())) {
 572                warn_report(
 573                    "multifd_send_%d: Failed to terminate TLS connection: %s",
 574                    p->id, error_get_pretty(local_err));
 575                break;
 576            }
 577        }
 578    }
 579
 580    multifd_send_terminate_threads();
 581
 582    for (i = 0; i < migrate_multifd_channels(); i++) {
 583        MultiFDSendParams *p = &multifd_send_state->params[i];
 584        Error *local_err = NULL;
 585
 586        if (!multifd_send_cleanup_channel(p, &local_err)) {
 587            migrate_set_error(migrate_get_current(), local_err);
 588            error_free(local_err);
 589        }
 590    }
 591
 592    multifd_send_cleanup_state();
 593}
 594
 595static int multifd_zero_copy_flush(QIOChannel *c)
 596{
 597    int ret;
 598    Error *err = NULL;
 599
 600    ret = qio_channel_flush(c, &err);
 601    if (ret < 0) {
 602        error_report_err(err);
 603        return -1;
 604    }
 605    if (ret == 1) {
 606        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
 607    }
 608
 609    return ret;
 610}
 611
/*
 * Synchronize all multifd send channels with the migration thread.
 *
 * @req: kind of sync requested; must not be MULTIFD_SYNC_NONE.  With
 *       MULTIFD_SYNC_ALL each channel also sends a SYNC packet to the
 *       destination (see multifd_send_thread()).
 *
 * First posts the request to every channel.  Then, channel by channel,
 * it waits for a channels_ready token and for the channel to acknowledge
 * on p->sem_sync.  With zero-copy send enabled, each channel is also
 * flushed after its acknowledgement.
 *
 * Returns 0 on success, -1 if the send side is exiting or a zero-copy
 * flush failed.
 */
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        /* Posted by the channel thread once it has handled the sync. */
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
 657
/*
 * Body of a multifd send channel thread, started by
 * multifd_channel_connect() with @opaque pointing to the channel's
 * MultiFDSendParams.
 *
 * When packets are in use, the thread first sends the MultiFDInit_t
 * handshake.  It then loops: announce itself idle on channels_ready,
 * wait on p->sem, and either
 *  - transmit the job handed over by multifd_send() (pending_job set),
 *    then clear p->data and release pending_job; or
 *  - handle a sync request from multifd_send_sync_main() (pending_sync
 *    set): send a SYNC packet for MULTIFD_SYNC_ALL, then post p->sem_sync.
 *
 * The loop ends when the send side is exiting or on an error.  Errors
 * are recorded with multifd_send_set_error(), and the migration thread
 * is woken with multifd_send_kick_main() so it does not stay blocked on
 * a semaphore.  Always returns NULL.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        /* Tell the migration thread this channel is idle again. */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            /* Acknowledge the sync; multifd_send_sync_main() waits on this. */
            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        /* Every failing step above sets local_err. */
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}
 788
/* Defined below; used as the TLS handshake completion callback. */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

/*
 * Arguments for multifd_tls_handshake_thread().  Allocated by
 * multifd_tls_channel_connect() and freed by the thread.
 */
typedef struct {
    MultiFDSendParams *p;   /* channel being set up */
    QIOChannelTLS *tioc;    /* TLS channel to run the handshake on */
} MultiFDTLSThreadArgs;
 795
 796static void *multifd_tls_handshake_thread(void *opaque)
 797{
 798    MultiFDTLSThreadArgs *args = opaque;
 799
 800    qio_channel_tls_handshake(args->tioc,
 801                              multifd_new_send_channel_async,
 802                              args->p,
 803                              NULL,
 804                              NULL);
 805    g_free(args);
 806
 807    return NULL;
 808}
 809
 810static bool multifd_tls_channel_connect(MultiFDSendParams *p,
 811                                        QIOChannel *ioc,
 812                                        Error **errp)
 813{
 814    MigrationState *s = migrate_get_current();
 815    const char *hostname = s->hostname;
 816    MultiFDTLSThreadArgs *args;
 817    QIOChannelTLS *tioc;
 818
 819    tioc = migration_tls_client_create(ioc, hostname, errp);
 820    if (!tioc) {
 821        return false;
 822    }
 823
 824    /*
 825     * Ownership of the socket channel now transfers to the newly
 826     * created TLS channel, which has already taken a reference.
 827     */
 828    object_unref(OBJECT(ioc));
 829    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
 830    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
 831
 832    args = g_new0(MultiFDTLSThreadArgs, 1);
 833    args->tioc = tioc;
 834    args->p = p;
 835
 836    p->tls_thread_created = true;
 837    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
 838                       multifd_tls_handshake_thread, args,
 839                       QEMU_THREAD_JOINABLE);
 840    return true;
 841}
 842
/*
 * Attach the IO channel @ioc to channel @p and start its send thread.
 *
 * @ioc is stored in p->c and later released by
 * multifd_send_cleanup_channel().  Nagle-style delay is disabled and the
 * channel is registered for yank.
 */
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    /* Flagged before creation so teardown knows to join this thread. */
    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}
 855
/*
 * Completion callback for creating an outgoing multifd channel; @opaque
 * is the channel's MultiFDSendParams.
 *
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 *
 * Except when it hands off to the TLS handshake thread, every call posts
 * channels_created exactly once, whether it succeeded or not.  On
 * failure the error is recorded via multifd_send_set_error() and the IO
 * channel from @task is released here.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            /*
             * The handshake thread calls back here with the TLS channel;
             * channels_created is posted on that second call.
             */
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), IO channel is always freed here
     * rather than when cleanup multifd: since p->c is not set, multifd
     * cleanup code doesn't even know its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}
 910
 911static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
 912{
 913    if (!multifd_use_packets()) {
 914        return file_send_channel_create(opaque, errp);
 915    }
 916
 917    socket_send_channel_create(multifd_new_send_channel_async, opaque);
 918    return true;
 919}
 920
 921bool multifd_send_setup(void)
 922{
 923    MigrationState *s = migrate_get_current();
 924    int thread_count, ret = 0;
 925    uint32_t page_count = multifd_ram_page_count();
 926    bool use_packets = multifd_use_packets();
 927    uint8_t i;
 928
 929    if (!migrate_multifd()) {
 930        return true;
 931    }
 932
 933    thread_count = migrate_multifd_channels();
 934    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
 935    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
 936    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
 937    qemu_sem_init(&multifd_send_state->channels_created, 0);
 938    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 939    qatomic_set(&multifd_send_state->exiting, 0);
 940    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
 941
 942    for (i = 0; i < thread_count; i++) {
 943        MultiFDSendParams *p = &multifd_send_state->params[i];
 944        Error *local_err = NULL;
 945
 946        qemu_sem_init(&p->sem, 0);
 947        qemu_sem_init(&p->sem_sync, 0);
 948        p->id = i;
 949        p->data = multifd_send_data_alloc();
 950
 951        if (use_packets) {
 952            p->packet_len = sizeof(MultiFDPacket_t)
 953                          + sizeof(uint64_t) * page_count;
 954            p->packet = g_malloc0(p->packet_len);
 955            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
 956            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
 957            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
 958        }
 959        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
 960        p->write_flags = 0;
 961
 962        if (!multifd_new_send_channel_create(p, &local_err)) {
 963            migrate_set_error(s, local_err);
 964            ret = -1;
 965        }
 966    }
 967
 968    /*
 969     * Wait until channel creation has started for all channels. The
 970     * creation can still fail, but no more channels will be created
 971     * past this point.
 972     */
 973    for (i = 0; i < thread_count; i++) {
 974        qemu_sem_wait(&multifd_send_state->channels_created);
 975    }
 976
 977    if (ret) {
 978        goto err;
 979    }
 980
 981    for (i = 0; i < thread_count; i++) {
 982        MultiFDSendParams *p = &multifd_send_state->params[i];
 983        Error *local_err = NULL;
 984
 985        ret = multifd_send_state->ops->send_setup(p, &local_err);
 986        if (ret) {
 987            migrate_set_error(s, local_err);
 988            goto err;
 989        }
 990        assert(p->iov);
 991    }
 992
 993    multifd_device_state_send_setup();
 994
 995    return true;
 996
 997err:
 998    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
 999                      MIGRATION_STATUS_FAILED);
1000    return false;
1001}
1002
/*
 * Hand the staged receive buffer (multifd_recv_state->data) to an idle
 * receive channel.
 *
 * The scan is round-robin, starting after the channel picked last time.
 * It loops over the channels without sleeping until it finds one whose
 * pending_job is false.  The buffers are then swapped: the channel
 * receives the staged buffer, and the channel's previous buffer (asserted
 * to be empty) becomes the new staging buffer.  Finally the channel is
 * marked busy and woken through p->sem.  Channel threads only consume
 * such jobs when packets are not in use (see multifd_recv_thread()).
 *
 * Returns false if multifd_recv_should_exit() becomes true while
 * searching, true once the work has been handed off.
 */
bool multifd_recv(void)
{
    int i;
    /* Round-robin cursor, persists across calls (and across migrations). */
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    /* An idle channel must have finished with (and reset) its buffer. */
    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
1048
1049MultiFDRecvData *multifd_get_recv_data(void)
1050{
1051    return multifd_recv_state->data;
1052}
1053
1054static void multifd_recv_terminate_threads(Error *err)
1055{
1056    int i;
1057
1058    trace_multifd_recv_terminate_threads(err != NULL);
1059
1060    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1061        return;
1062    }
1063
1064    if (err) {
1065        MigrationState *s = migrate_get_current();
1066        migrate_set_error(s, err);
1067        if (s->state == MIGRATION_STATUS_SETUP ||
1068            s->state == MIGRATION_STATUS_ACTIVE) {
1069            migrate_set_state(&s->state, s->state,
1070                              MIGRATION_STATUS_FAILED);
1071        }
1072    }
1073
1074    for (i = 0; i < migrate_multifd_channels(); i++) {
1075        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1076
1077        /*
1078         * The migration thread and channels interact differently
1079         * depending on the presence of packets.
1080         */
1081        if (multifd_use_packets()) {
1082            /*
1083             * The channel receives as long as there are packets. When
1084             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1085             * channel waits for the migration thread to sync. If the
1086             * sync never happens, do it here.
1087             */
1088            qemu_sem_post(&p->sem_sync);
1089        } else {
1090            /*
1091             * The channel waits for the migration thread to give it
1092             * work. When the migration thread runs out of work, it
1093             * releases the channel and waits for any pending work to
1094             * finish. If we reach here (e.g. due to error) before the
1095             * work runs out, release the channel.
1096             */
1097            qemu_sem_post(&p->sem);
1098        }
1099
1100        /*
1101         * We could arrive here for two reasons:
1102         *  - normal quit, i.e. everything went fine, just finished
1103         *  - error quit: We close the channels so the channel threads
1104         *    finish the qio_channel_read_all_eof()
1105         */
1106        if (p->c) {
1107            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1108        }
1109    }
1110}
1111
/*
 * Stop all multifd receive threads without reporting an error.
 * Does nothing when multifd is disabled.
 */
void multifd_recv_shutdown(void)
{
    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
}
1118
1119static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1120{
1121    migration_ioc_unregister_yank(p->c);
1122    object_unref(OBJECT(p->c));
1123    p->c = NULL;
1124    qemu_mutex_destroy(&p->mutex);
1125    qemu_sem_destroy(&p->sem_sync);
1126    qemu_sem_destroy(&p->sem);
1127    g_free(p->data);
1128    p->data = NULL;
1129    g_free(p->name);
1130    p->name = NULL;
1131    p->packet_len = 0;
1132    g_free(p->packet);
1133    p->packet = NULL;
1134    g_clear_pointer(&p->packet_dev_state, g_free);
1135    g_free(p->normal);
1136    p->normal = NULL;
1137    g_free(p->zero);
1138    p->zero = NULL;
1139    multifd_recv_state->ops->recv_cleanup(p);
1140}
1141
1142static void multifd_recv_cleanup_state(void)
1143{
1144    qemu_sem_destroy(&multifd_recv_state->sem_sync);
1145    g_free(multifd_recv_state->params);
1146    multifd_recv_state->params = NULL;
1147    g_free(multifd_recv_state->data);
1148    multifd_recv_state->data = NULL;
1149    g_free(multifd_recv_state);
1150    multifd_recv_state = NULL;
1151}
1152
1153void multifd_recv_cleanup(void)
1154{
1155    int i;
1156
1157    if (!migrate_multifd()) {
1158        return;
1159    }
1160    multifd_recv_terminate_threads(NULL);
1161    for (i = 0; i < migrate_multifd_channels(); i++) {
1162        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1163
1164        if (p->thread_created) {
1165            qemu_thread_join(&p->thread);
1166        }
1167    }
1168    for (i = 0; i < migrate_multifd_channels(); i++) {
1169        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1170    }
1171    multifd_recv_cleanup_state();
1172}
1173
1174void multifd_recv_sync_main(void)
1175{
1176    int thread_count = migrate_multifd_channels();
1177    bool file_based = !multifd_use_packets();
1178    int i;
1179
1180    if (!migrate_multifd()) {
1181        return;
1182    }
1183
1184    /*
1185     * File-based channels don't use packets and therefore need to
1186     * wait for more work. Release them to start the sync.
1187     */
1188    if (file_based) {
1189        for (i = 0; i < thread_count; i++) {
1190            MultiFDRecvParams *p = &multifd_recv_state->params[i];
1191
1192            trace_multifd_recv_sync_main_signal(p->id);
1193            qemu_sem_post(&p->sem);
1194        }
1195    }
1196
1197    /*
1198     * Initiate the synchronization by waiting for all channels.
1199     *
1200     * For socket-based migration this means each channel has received
1201     * the SYNC packet on the stream.
1202     *
1203     * For file-based migration this means each channel is done with
1204     * the work (pending_job=false).
1205     */
1206    for (i = 0; i < thread_count; i++) {
1207        trace_multifd_recv_sync_main_wait(i);
1208        qemu_sem_wait(&multifd_recv_state->sem_sync);
1209    }
1210
1211    if (file_based) {
1212        /*
1213         * For file-based loading is done in one iteration. We're
1214         * done.
1215         */
1216        return;
1217    }
1218
1219    /*
1220     * Sync done. Release the channels for the next iteration.
1221     */
1222    for (i = 0; i < thread_count; i++) {
1223        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1224
1225        WITH_QEMU_LOCK_GUARD(&p->mutex) {
1226            if (multifd_recv_state->packet_num < p->packet_num) {
1227                multifd_recv_state->packet_num = p->packet_num;
1228            }
1229        }
1230        trace_multifd_recv_sync_main_signal(p->id);
1231        qemu_sem_post(&p->sem_sync);
1232    }
1233    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1234}
1235
1236static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
1237{
1238    g_autofree char *dev_state_buf = NULL;
1239    int ret;
1240
1241    dev_state_buf = g_malloc(p->next_packet_size);
1242
1243    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
1244    if (ret != 0) {
1245        return ret;
1246    }
1247
1248    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
1249        != 0) {
1250        error_setg(errp, "unterminated multifd device state idstr");
1251        return -1;
1252    }
1253
1254    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
1255                                       p->packet_dev_state->instance_id,
1256                                       dev_state_buf, p->next_packet_size,
1257                                       errp)) {
1258        ret = -1;
1259    }
1260
1261    return ret;
1262}
1263
/*
 * Body of each destination-side multifd channel thread; @opaque is the
 * channel's MultiFDRecvParams.
 *
 * With packets (socket migration), each loop iteration:
 *  - reads a packet header, then the rest of the packet, from p->c;
 *  - decodes the packet under p->mutex;
 *  - feeds RAM pages to the compression method's recv() hook, or device
 *    state to multifd_device_state_recv();
 *  - on a SYNC packet, rendezvouses with multifd_recv_sync_main() through
 *    the shared and per-channel sem_sync semaphores.
 *
 * Without packets (file migration), the thread:
 *  - waits on p->sem for multifd_recv() to publish a job via pending_job;
 *  - processes p->data, then clears pending_job;
 *  - treats a wakeup with no pending job as a sync request.
 *
 * The loop ends on a clean EOF between packets, on any error, or when
 * multifd_recv_should_exit() becomes true.  Errors are reported through
 * multifd_recv_terminate_threads().
 */
static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    /*
     * Header reads below use p->read_flags.  Unless
     * s->multifd_clean_tls_termination is set, RELAXED_EOF is requested.
     * NOTE(review): presumably this tolerates a TLS peer that closes
     * without a proper shutdown; confirm against the QIOChannel docs.
     */
    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        /* Start every iteration with no normal pages recorded. */
        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            /* Read only the common header first to learn the packet kind. */
            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF between packets: a normal end of stream. */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            /*
             * Device state and RAM packets have different bodies.  Read
             * everything after the header into the matching buffer.
             */
            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF in the middle of a packet is an error. */
                error_setg(&local_err, "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            /*
             * p->mutex guards the fields updated while decoding; for
             * example multifd_recv_sync_main() reads p->packet_num under
             * this lock.
             */
            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            /* Keep the original flags locally for the SYNC check below. */
            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * multifd thread should not be active and receive data
             * when migration is in the Postcopy phase. Two threads
             * writing the same memory area could easily corrupt
             * the guest state.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            /* A device state packet must always carry a payload. */
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                /* SYNC is only valid on RAM packets. */
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                /*
                 * Tell multifd_recv_sync_main() this channel reached the
                 * sync point, then block until it releases us.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    /*
     * Report the error and stop all channels.  This thread still owns
     * local_err afterwards and must free it.
     */
    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}
1441
1442int multifd_recv_setup(Error **errp)
1443{
1444    int thread_count;
1445    uint32_t page_count = multifd_ram_page_count();
1446    bool use_packets = multifd_use_packets();
1447    uint8_t i;
1448
1449    /*
1450     * Return successfully if multiFD recv state is already initialised
1451     * or multiFD is not enabled.
1452     */
1453    if (multifd_recv_state || !migrate_multifd()) {
1454        return 0;
1455    }
1456
1457    thread_count = migrate_multifd_channels();
1458    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1459    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1460
1461    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1462    multifd_recv_state->data->size = 0;
1463
1464    qatomic_set(&multifd_recv_state->count, 0);
1465    qatomic_set(&multifd_recv_state->exiting, 0);
1466    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1467    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1468
1469    for (i = 0; i < thread_count; i++) {
1470        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1471
1472        qemu_mutex_init(&p->mutex);
1473        qemu_sem_init(&p->sem_sync, 0);
1474        qemu_sem_init(&p->sem, 0);
1475        p->pending_job = false;
1476        p->id = i;
1477
1478        p->data = g_new0(MultiFDRecvData, 1);
1479        p->data->size = 0;
1480
1481        if (use_packets) {
1482            p->packet_len = sizeof(MultiFDPacket_t)
1483                + sizeof(uint64_t) * page_count;
1484            p->packet = g_malloc0(p->packet_len);
1485            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
1486        }
1487        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
1488        p->normal = g_new0(ram_addr_t, page_count);
1489        p->zero = g_new0(ram_addr_t, page_count);
1490    }
1491
1492    for (i = 0; i < thread_count; i++) {
1493        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1494        int ret;
1495
1496        ret = multifd_recv_state->ops->recv_setup(p, errp);
1497        if (ret) {
1498            return ret;
1499        }
1500    }
1501    return 0;
1502}
1503
1504bool multifd_recv_all_channels_created(void)
1505{
1506    int thread_count = migrate_multifd_channels();
1507
1508    if (!migrate_multifd()) {
1509        return true;
1510    }
1511
1512    if (!multifd_recv_state) {
1513        /* Called before any connections created */
1514        return false;
1515    }
1516
1517    return thread_count == qatomic_read(&multifd_recv_state->count);
1518}
1519
1520/*
1521 * Try to receive all multifd channels to get ready for the migration.
1522 * Sets @errp when failing to receive the current channel.
1523 */
1524void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1525{
1526    MultiFDRecvParams *p;
1527    Error *local_err = NULL;
1528    bool use_packets = multifd_use_packets();
1529    int id;
1530
1531    if (use_packets) {
1532        id = multifd_recv_initial_packet(ioc, &local_err);
1533        if (id < 0) {
1534            multifd_recv_terminate_threads(local_err);
1535            error_propagate_prepend(errp, local_err,
1536                                    "failed to receive packet"
1537                                    " via multifd channel %d: ",
1538                                    qatomic_read(&multifd_recv_state->count));
1539            return;
1540        }
1541        trace_multifd_recv_new_channel(id);
1542    } else {
1543        id = qatomic_read(&multifd_recv_state->count);
1544    }
1545
1546    p = &multifd_recv_state->params[id];
1547    if (p->c != NULL) {
1548        error_setg(&local_err, "multifd: received id '%d' already setup'",
1549                   id);
1550        multifd_recv_terminate_threads(local_err);
1551        error_propagate(errp, local_err);
1552        return;
1553    }
1554    p->c = ioc;
1555    object_ref(OBJECT(ioc));
1556
1557    p->thread_created = true;
1558    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1559                       QEMU_THREAD_JOINABLE);
1560    qatomic_inc(&multifd_recv_state->count);
1561}
1562