qemu/migration/multifd.c
<<
>>
Prefs
   1/*
   2 * Multifd common code
   3 *
   4 * Copyright (c) 2019-2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include "qemu/rcu.h"
  15#include "exec/target_page.h"
  16#include "sysemu/sysemu.h"
  17#include "exec/ramblock.h"
  18#include "qemu/error-report.h"
  19#include "qapi/error.h"
  20#include "ram.h"
  21#include "migration.h"
  22#include "socket.h"
  23#include "qemu-file.h"
  24#include "trace.h"
  25#include "multifd.h"
  26
  27/* Multiple fd's */
  28
/*
 * Magic value carried (big-endian) by the per-channel handshake and by
 * every multifd packet header; receivers reject anything else.
 */
#define MULTIFD_MAGIC 0x11223344U
/* Protocol version; receivers reject any other value. */
#define MULTIFD_VERSION 1

/*
 * Handshake message written once at the start of every channel by
 * multifd_send_initial_packet() and validated by
 * multifd_recv_initial_packet().  It is written to the channel
 * verbatim, hence packed; magic and version are big-endian.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC, big-endian */
    uint32_t version;       /* MULTIFD_VERSION, big-endian */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* index of the sending channel (p->id) */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
  40
  41/* Multifd without compression */
  42
  43/**
  44 * nocomp_send_setup: setup send side
  45 *
  46 * For no compression this function does nothing.
  47 *
  48 * Returns 0 for success or -1 for error
  49 *
  50 * @p: Params for the channel that we are using
  51 * @errp: pointer to an error
  52 */
  53static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
  54{
  55    return 0;
  56}
  57
  58/**
  59 * nocomp_send_cleanup: cleanup send side
  60 *
  61 * For no compression this function does nothing.
  62 *
  63 * @p: Params for the channel that we are using
  64 */
  65static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
  66{
  67    return;
  68}
  69
  70/**
  71 * nocomp_send_prepare: prepare date to be able to send
  72 *
  73 * For no compression we just have to calculate the size of the
  74 * packet.
  75 *
  76 * Returns 0 for success or -1 for error
  77 *
  78 * @p: Params for the channel that we are using
  79 * @used: number of pages used
  80 * @errp: pointer to an error
  81 */
  82static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
  83                               Error **errp)
  84{
  85    p->next_packet_size = used * qemu_target_page_size();
  86    p->flags |= MULTIFD_FLAG_NOCOMP;
  87    return 0;
  88}
  89
  90/**
  91 * nocomp_send_write: do the actual write of the data
  92 *
  93 * For no compression we just have to write the data.
  94 *
  95 * Returns 0 for success or -1 for error
  96 *
  97 * @p: Params for the channel that we are using
  98 * @used: number of pages used
  99 * @errp: pointer to an error
 100 */
 101static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
 102{
 103    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
 104}
 105
 106/**
 107 * nocomp_recv_setup: setup receive side
 108 *
 109 * For no compression this function does nothing.
 110 *
 111 * Returns 0 for success or -1 for error
 112 *
 113 * @p: Params for the channel that we are using
 114 * @errp: pointer to an error
 115 */
 116static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 117{
 118    return 0;
 119}
 120
 121/**
 122 * nocomp_recv_cleanup: setup receive side
 123 *
 124 * For no compression this function does nothing.
 125 *
 126 * @p: Params for the channel that we are using
 127 */
 128static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 129{
 130}
 131
 132/**
 133 * nocomp_recv_pages: read the data from the channel into actual pages
 134 *
 135 * For no compression we just need to read things into the correct place.
 136 *
 137 * Returns 0 for success or -1 for error
 138 *
 139 * @p: Params for the channel that we are using
 140 * @used: number of pages used
 141 * @errp: pointer to an error
 142 */
 143static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
 144{
 145    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 146
 147    if (flags != MULTIFD_FLAG_NOCOMP) {
 148        error_setg(errp, "multifd %d: flags received %x flags expected %x",
 149                   p->id, flags, MULTIFD_FLAG_NOCOMP);
 150        return -1;
 151    }
 152    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
 153}
 154
 155static MultiFDMethods multifd_nocomp_ops = {
 156    .send_setup = nocomp_send_setup,
 157    .send_cleanup = nocomp_send_cleanup,
 158    .send_prepare = nocomp_send_prepare,
 159    .send_write = nocomp_send_write,
 160    .recv_setup = nocomp_recv_setup,
 161    .recv_cleanup = nocomp_recv_cleanup,
 162    .recv_pages = nocomp_recv_pages
 163};
 164
 165static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
 166    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
 167};
 168
 169void multifd_register_ops(int method, MultiFDMethods *ops)
 170{
 171    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
 172    multifd_ops[method] = ops;
 173}
 174
 175static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 176{
 177    MultiFDInit_t msg = {};
 178    int ret;
 179
 180    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
 181    msg.version = cpu_to_be32(MULTIFD_VERSION);
 182    msg.id = p->id;
 183    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 184
 185    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
 186    if (ret != 0) {
 187        return -1;
 188    }
 189    return 0;
 190}
 191
 192static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
 193{
 194    MultiFDInit_t msg;
 195    int ret;
 196
 197    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
 198    if (ret != 0) {
 199        return -1;
 200    }
 201
 202    msg.magic = be32_to_cpu(msg.magic);
 203    msg.version = be32_to_cpu(msg.version);
 204
 205    if (msg.magic != MULTIFD_MAGIC) {
 206        error_setg(errp, "multifd: received packet magic %x "
 207                   "expected %x", msg.magic, MULTIFD_MAGIC);
 208        return -1;
 209    }
 210
 211    if (msg.version != MULTIFD_VERSION) {
 212        error_setg(errp, "multifd: received packet version %d "
 213                   "expected %d", msg.version, MULTIFD_VERSION);
 214        return -1;
 215    }
 216
 217    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
 218        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
 219        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
 220
 221        error_setg(errp, "multifd: received uuid '%s' and expected "
 222                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
 223        g_free(uuid);
 224        g_free(msg_uuid);
 225        return -1;
 226    }
 227
 228    if (msg.id > migrate_multifd_channels()) {
 229        error_setg(errp, "multifd: received channel version %d "
 230                   "expected %d", msg.version, MULTIFD_VERSION);
 231        return -1;
 232    }
 233
 234    return msg.id;
 235}
 236
 237static MultiFDPages_t *multifd_pages_init(size_t size)
 238{
 239    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 240
 241    pages->allocated = size;
 242    pages->iov = g_new0(struct iovec, size);
 243    pages->offset = g_new0(ram_addr_t, size);
 244
 245    return pages;
 246}
 247
 248static void multifd_pages_clear(MultiFDPages_t *pages)
 249{
 250    pages->used = 0;
 251    pages->allocated = 0;
 252    pages->packet_num = 0;
 253    pages->block = NULL;
 254    g_free(pages->iov);
 255    pages->iov = NULL;
 256    g_free(pages->offset);
 257    pages->offset = NULL;
 258    g_free(pages);
 259}
 260
 261static void multifd_send_fill_packet(MultiFDSendParams *p)
 262{
 263    MultiFDPacket_t *packet = p->packet;
 264    int i;
 265
 266    packet->flags = cpu_to_be32(p->flags);
 267    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
 268    packet->pages_used = cpu_to_be32(p->pages->used);
 269    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 270    packet->packet_num = cpu_to_be64(p->packet_num);
 271
 272    if (p->pages->block) {
 273        strncpy(packet->ramblock, p->pages->block->idstr, 256);
 274    }
 275
 276    for (i = 0; i < p->pages->used; i++) {
 277        /* there are architectures where ram_addr_t is 32 bit */
 278        uint64_t temp = p->pages->offset[i];
 279
 280        packet->offset[i] = cpu_to_be64(temp);
 281    }
 282}
 283
 284static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 285{
 286    MultiFDPacket_t *packet = p->packet;
 287    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 288    RAMBlock *block;
 289    int i;
 290
 291    packet->magic = be32_to_cpu(packet->magic);
 292    if (packet->magic != MULTIFD_MAGIC) {
 293        error_setg(errp, "multifd: received packet "
 294                   "magic %x and expected magic %x",
 295                   packet->magic, MULTIFD_MAGIC);
 296        return -1;
 297    }
 298
 299    packet->version = be32_to_cpu(packet->version);
 300    if (packet->version != MULTIFD_VERSION) {
 301        error_setg(errp, "multifd: received packet "
 302                   "version %d and expected version %d",
 303                   packet->version, MULTIFD_VERSION);
 304        return -1;
 305    }
 306
 307    p->flags = be32_to_cpu(packet->flags);
 308
 309    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
 310    /*
 311     * If we received a packet that is 100 times bigger than expected
 312     * just stop migration.  It is a magic number.
 313     */
 314    if (packet->pages_alloc > pages_max * 100) {
 315        error_setg(errp, "multifd: received packet "
 316                   "with size %d and expected a maximum size of %d",
 317                   packet->pages_alloc, pages_max * 100) ;
 318        return -1;
 319    }
 320    /*
 321     * We received a packet that is bigger than expected but inside
 322     * reasonable limits (see previous comment).  Just reallocate.
 323     */
 324    if (packet->pages_alloc > p->pages->allocated) {
 325        multifd_pages_clear(p->pages);
 326        p->pages = multifd_pages_init(packet->pages_alloc);
 327    }
 328
 329    p->pages->used = be32_to_cpu(packet->pages_used);
 330    if (p->pages->used > packet->pages_alloc) {
 331        error_setg(errp, "multifd: received packet "
 332                   "with %d pages and expected maximum pages are %d",
 333                   p->pages->used, packet->pages_alloc) ;
 334        return -1;
 335    }
 336
 337    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
 338    p->packet_num = be64_to_cpu(packet->packet_num);
 339
 340    if (p->pages->used == 0) {
 341        return 0;
 342    }
 343
 344    /* make sure that ramblock is 0 terminated */
 345    packet->ramblock[255] = 0;
 346    block = qemu_ram_block_by_name(packet->ramblock);
 347    if (!block) {
 348        error_setg(errp, "multifd: unknown ram block %s",
 349                   packet->ramblock);
 350        return -1;
 351    }
 352
 353    for (i = 0; i < p->pages->used; i++) {
 354        uint64_t offset = be64_to_cpu(packet->offset[i]);
 355
 356        if (offset > (block->used_length - qemu_target_page_size())) {
 357            error_setg(errp, "multifd: offset too long %" PRIu64
 358                       " (max " RAM_ADDR_FMT ")",
 359                       offset, block->max_length);
 360            return -1;
 361        }
 362        p->pages->iov[i].iov_base = block->host + offset;
 363        p->pages->iov[i].iov_len = qemu_target_page_size();
 364    }
 365
 366    return 0;
 367}
 368
 369struct {
 370    MultiFDSendParams *params;
 371    /* array of pages to sent */
 372    MultiFDPages_t *pages;
 373    /* global number of generated multifd packets */
 374    uint64_t packet_num;
 375    /* send channels ready */
 376    QemuSemaphore channels_ready;
 377    /*
 378     * Have we already run terminate threads.  There is a race when it
 379     * happens that we got one error while we are exiting.
 380     * We will use atomic operations.  Only valid values are 0 and 1.
 381     */
 382    int exiting;
 383    /* multifd ops */
 384    MultiFDMethods *ops;
 385} *multifd_send_state;
 386
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one "pages" struct for each channel, plus a main one.  Each
 * time we need to send a batch of pages, we swap the main one with the
 * one owned by the channel that is going to send it.  There are two
 * reasons for this:
 *    - to avoid doing many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct, and we don't
 * need any extra locking for it: it belongs either to the migration
 * thread or to the channel thread.  Switching is safe because the
 * migration thread holds the channel mutex while swapping, and the
 * channel must have finished with its own struct, otherwise
 * pending_job could not be false.
 */
 404
 405static int multifd_send_pages(QEMUFile *f)
 406{
 407    int i;
 408    static int next_channel;
 409    MultiFDSendParams *p = NULL; /* make happy gcc */
 410    MultiFDPages_t *pages = multifd_send_state->pages;
 411    uint64_t transferred;
 412
 413    if (atomic_read(&multifd_send_state->exiting)) {
 414        return -1;
 415    }
 416
 417    qemu_sem_wait(&multifd_send_state->channels_ready);
 418    /*
 419     * next_channel can remain from a previous migration that was
 420     * using more channels, so ensure it doesn't overflow if the
 421     * limit is lower now.
 422     */
 423    next_channel %= migrate_multifd_channels();
 424    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
 425        p = &multifd_send_state->params[i];
 426
 427        qemu_mutex_lock(&p->mutex);
 428        if (p->quit) {
 429            error_report("%s: channel %d has already quit!", __func__, i);
 430            qemu_mutex_unlock(&p->mutex);
 431            return -1;
 432        }
 433        if (!p->pending_job) {
 434            p->pending_job++;
 435            next_channel = (i + 1) % migrate_multifd_channels();
 436            break;
 437        }
 438        qemu_mutex_unlock(&p->mutex);
 439    }
 440    assert(!p->pages->used);
 441    assert(!p->pages->block);
 442
 443    p->packet_num = multifd_send_state->packet_num++;
 444    multifd_send_state->pages = p->pages;
 445    p->pages = pages;
 446    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
 447                + p->packet_len;
 448    qemu_file_update_transfer(f, transferred);
 449    ram_counters.multifd_bytes += transferred;
 450    ram_counters.transferred += transferred;;
 451    qemu_mutex_unlock(&p->mutex);
 452    qemu_sem_post(&p->sem);
 453
 454    return 1;
 455}
 456
 457int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 458{
 459    MultiFDPages_t *pages = multifd_send_state->pages;
 460
 461    if (!pages->block) {
 462        pages->block = block;
 463    }
 464
 465    if (pages->block == block) {
 466        pages->offset[pages->used] = offset;
 467        pages->iov[pages->used].iov_base = block->host + offset;
 468        pages->iov[pages->used].iov_len = qemu_target_page_size();
 469        pages->used++;
 470
 471        if (pages->used < pages->allocated) {
 472            return 1;
 473        }
 474    }
 475
 476    if (multifd_send_pages(f) < 0) {
 477        return -1;
 478    }
 479
 480    if (pages->block != block) {
 481        return  multifd_queue_page(f, block, offset);
 482    }
 483
 484    return 1;
 485}
 486
 487static void multifd_send_terminate_threads(Error *err)
 488{
 489    int i;
 490
 491    trace_multifd_send_terminate_threads(err != NULL);
 492
 493    if (err) {
 494        MigrationState *s = migrate_get_current();
 495        migrate_set_error(s, err);
 496        if (s->state == MIGRATION_STATUS_SETUP ||
 497            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
 498            s->state == MIGRATION_STATUS_DEVICE ||
 499            s->state == MIGRATION_STATUS_ACTIVE) {
 500            migrate_set_state(&s->state, s->state,
 501                              MIGRATION_STATUS_FAILED);
 502        }
 503    }
 504
 505    /*
 506     * We don't want to exit each threads twice.  Depending on where
 507     * we get the error, or if there are two independent errors in two
 508     * threads at the same time, we can end calling this function
 509     * twice.
 510     */
 511    if (atomic_xchg(&multifd_send_state->exiting, 1)) {
 512        return;
 513    }
 514
 515    for (i = 0; i < migrate_multifd_channels(); i++) {
 516        MultiFDSendParams *p = &multifd_send_state->params[i];
 517
 518        qemu_mutex_lock(&p->mutex);
 519        p->quit = true;
 520        qemu_sem_post(&p->sem);
 521        qemu_mutex_unlock(&p->mutex);
 522    }
 523}
 524
 525void multifd_save_cleanup(void)
 526{
 527    int i;
 528
 529    if (!migrate_use_multifd()) {
 530        return;
 531    }
 532    multifd_send_terminate_threads(NULL);
 533    for (i = 0; i < migrate_multifd_channels(); i++) {
 534        MultiFDSendParams *p = &multifd_send_state->params[i];
 535
 536        if (p->running) {
 537            qemu_thread_join(&p->thread);
 538        }
 539    }
 540    for (i = 0; i < migrate_multifd_channels(); i++) {
 541        MultiFDSendParams *p = &multifd_send_state->params[i];
 542        Error *local_err = NULL;
 543
 544        socket_send_channel_destroy(p->c);
 545        p->c = NULL;
 546        qemu_mutex_destroy(&p->mutex);
 547        qemu_sem_destroy(&p->sem);
 548        qemu_sem_destroy(&p->sem_sync);
 549        g_free(p->name);
 550        p->name = NULL;
 551        multifd_pages_clear(p->pages);
 552        p->pages = NULL;
 553        p->packet_len = 0;
 554        g_free(p->packet);
 555        p->packet = NULL;
 556        multifd_send_state->ops->send_cleanup(p, &local_err);
 557        if (local_err) {
 558            migrate_set_error(migrate_get_current(), local_err);
 559            error_free(local_err);
 560        }
 561    }
 562    qemu_sem_destroy(&multifd_send_state->channels_ready);
 563    g_free(multifd_send_state->params);
 564    multifd_send_state->params = NULL;
 565    multifd_pages_clear(multifd_send_state->pages);
 566    multifd_send_state->pages = NULL;
 567    g_free(multifd_send_state);
 568    multifd_send_state = NULL;
 569}
 570
 571void multifd_send_sync_main(QEMUFile *f)
 572{
 573    int i;
 574
 575    if (!migrate_use_multifd()) {
 576        return;
 577    }
 578    if (multifd_send_state->pages->used) {
 579        if (multifd_send_pages(f) < 0) {
 580            error_report("%s: multifd_send_pages fail", __func__);
 581            return;
 582        }
 583    }
 584    for (i = 0; i < migrate_multifd_channels(); i++) {
 585        MultiFDSendParams *p = &multifd_send_state->params[i];
 586
 587        trace_multifd_send_sync_main_signal(p->id);
 588
 589        qemu_mutex_lock(&p->mutex);
 590
 591        if (p->quit) {
 592            error_report("%s: channel %d has already quit", __func__, i);
 593            qemu_mutex_unlock(&p->mutex);
 594            return;
 595        }
 596
 597        p->packet_num = multifd_send_state->packet_num++;
 598        p->flags |= MULTIFD_FLAG_SYNC;
 599        p->pending_job++;
 600        qemu_file_update_transfer(f, p->packet_len);
 601        ram_counters.multifd_bytes += p->packet_len;
 602        ram_counters.transferred += p->packet_len;
 603        qemu_mutex_unlock(&p->mutex);
 604        qemu_sem_post(&p->sem);
 605    }
 606    for (i = 0; i < migrate_multifd_channels(); i++) {
 607        MultiFDSendParams *p = &multifd_send_state->params[i];
 608
 609        trace_multifd_send_sync_main_wait(p->id);
 610        qemu_sem_wait(&p->sem_sync);
 611    }
 612    trace_multifd_send_sync_main(multifd_send_state->packet_num);
 613}
 614
/*
 * multifd_send_thread: body of one multifd send channel thread
 *
 * Sends the initial handshake, then loops waiting on p->sem.  When a
 * job is pending (queued by multifd_send_pages() or
 * multifd_send_sync_main()), it encodes the packet header, writes the
 * header and the page payload to p->c, and signals completion.
 *
 * The thread exits when multifd_send_state->exiting is set, when
 * p->quit is set with no job pending, or on an error.  Errors are
 * reported through multifd_send_terminate_threads().
 *
 * @opaque: the MultiFDSendParams of this channel
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (atomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            /* Snapshot what is still needed after the mutex is dropped. */
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            /*
             * Mark the batch empty (multifd_send_pages() asserts this
             * before swapping it out again).  The iovecs themselves are
             * still read by send_write() below, using the local 'used'.
             */
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /*
             * The I/O runs without p->mutex held.  multifd_send_pages()
             * only picks channels whose pending_job is zero, so p->pages
             * and p->packet are not swapped or refilled meanwhile.
             */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /* Job done: the channel may be picked again. */
            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            /* Completes this channel's part of multifd_send_sync_main(). */
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            /* Tell multifd_send_pages() that a channel is free. */
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * On error this thread is leaving, so wake up anyone who may be
     * waiting on it.  That is multifd_send_sync_main() on p->sem_sync
     * and multifd_send_pages() on channels_ready; without the posts they
     * could block forever.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
 719
 720static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 721{
 722    MultiFDSendParams *p = opaque;
 723    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
 724    Error *local_err = NULL;
 725
 726    trace_multifd_new_send_channel_async(p->id);
 727    if (qio_task_propagate_error(task, &local_err)) {
 728        migrate_set_error(migrate_get_current(), local_err);
 729        /* Error happen, we need to tell who pay attention to me */
 730        qemu_sem_post(&multifd_send_state->channels_ready);
 731        qemu_sem_post(&p->sem_sync);
 732        /*
 733         * Although multifd_send_thread is not created, but main migration
 734         * thread neet to judge whether it is running, so we need to mark
 735         * its status.
 736         */
 737        p->quit = true;
 738        object_unref(OBJECT(sioc));
 739        error_free(local_err);
 740    } else {
 741        p->c = QIO_CHANNEL(sioc);
 742        qio_channel_set_delay(p->c, false);
 743        p->running = true;
 744        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
 745                           QEMU_THREAD_JOINABLE);
 746    }
 747}
 748
 749int multifd_save_setup(Error **errp)
 750{
 751    int thread_count;
 752    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 753    uint8_t i;
 754
 755    if (!migrate_use_multifd()) {
 756        return 0;
 757    }
 758    thread_count = migrate_multifd_channels();
 759    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
 760    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
 761    multifd_send_state->pages = multifd_pages_init(page_count);
 762    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 763    atomic_set(&multifd_send_state->exiting, 0);
 764    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
 765
 766    for (i = 0; i < thread_count; i++) {
 767        MultiFDSendParams *p = &multifd_send_state->params[i];
 768
 769        qemu_mutex_init(&p->mutex);
 770        qemu_sem_init(&p->sem, 0);
 771        qemu_sem_init(&p->sem_sync, 0);
 772        p->quit = false;
 773        p->pending_job = 0;
 774        p->id = i;
 775        p->pages = multifd_pages_init(page_count);
 776        p->packet_len = sizeof(MultiFDPacket_t)
 777                      + sizeof(uint64_t) * page_count;
 778        p->packet = g_malloc0(p->packet_len);
 779        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 780        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
 781        p->name = g_strdup_printf("multifdsend_%d", i);
 782        socket_send_channel_create(multifd_new_send_channel_async, p);
 783    }
 784
 785    for (i = 0; i < thread_count; i++) {
 786        MultiFDSendParams *p = &multifd_send_state->params[i];
 787        Error *local_err = NULL;
 788        int ret;
 789
 790        ret = multifd_send_state->ops->send_setup(p, &local_err);
 791        if (ret) {
 792            error_propagate(errp, local_err);
 793            return ret;
 794        }
 795    }
 796    return 0;
 797}
 798
 799struct {
 800    MultiFDRecvParams *params;
 801    /* number of created threads */
 802    int count;
 803    /* syncs main thread and channels */
 804    QemuSemaphore sem_sync;
 805    /* global number of generated multifd packets */
 806    uint64_t packet_num;
 807    /* multifd ops */
 808    MultiFDMethods *ops;
 809} *multifd_recv_state;
 810
 811static void multifd_recv_terminate_threads(Error *err)
 812{
 813    int i;
 814
 815    trace_multifd_recv_terminate_threads(err != NULL);
 816
 817    if (err) {
 818        MigrationState *s = migrate_get_current();
 819        migrate_set_error(s, err);
 820        if (s->state == MIGRATION_STATUS_SETUP ||
 821            s->state == MIGRATION_STATUS_ACTIVE) {
 822            migrate_set_state(&s->state, s->state,
 823                              MIGRATION_STATUS_FAILED);
 824        }
 825    }
 826
 827    for (i = 0; i < migrate_multifd_channels(); i++) {
 828        MultiFDRecvParams *p = &multifd_recv_state->params[i];
 829
 830        qemu_mutex_lock(&p->mutex);
 831        p->quit = true;
 832        /*
 833         * We could arrive here for two reasons:
 834         *  - normal quit, i.e. everything went fine, just finished
 835         *  - error quit: We close the channels so the channel threads
 836         *    finish the qio_channel_read_all_eof()
 837         */
 838        if (p->c) {
 839            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
 840        }
 841        qemu_mutex_unlock(&p->mutex);
 842    }
 843}
 844
 845int multifd_load_cleanup(Error **errp)
 846{
 847    int i;
 848
 849    if (!migrate_use_multifd()) {
 850        return 0;
 851    }
 852    multifd_recv_terminate_threads(NULL);
 853    for (i = 0; i < migrate_multifd_channels(); i++) {
 854        MultiFDRecvParams *p = &multifd_recv_state->params[i];
 855
 856        if (p->running) {
 857            p->quit = true;
 858            /*
 859             * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
 860             * however try to wakeup it without harm in cleanup phase.
 861             */
 862            qemu_sem_post(&p->sem_sync);
 863            qemu_thread_join(&p->thread);
 864        }
 865    }
 866    for (i = 0; i < migrate_multifd_channels(); i++) {
 867        MultiFDRecvParams *p = &multifd_recv_state->params[i];
 868
 869        object_unref(OBJECT(p->c));
 870        p->c = NULL;
 871        qemu_mutex_destroy(&p->mutex);
 872        qemu_sem_destroy(&p->sem_sync);
 873        g_free(p->name);
 874        p->name = NULL;
 875        multifd_pages_clear(p->pages);
 876        p->pages = NULL;
 877        p->packet_len = 0;
 878        g_free(p->packet);
 879        p->packet = NULL;
 880        multifd_recv_state->ops->recv_cleanup(p);
 881    }
 882    qemu_sem_destroy(&multifd_recv_state->sem_sync);
 883    g_free(multifd_recv_state->params);
 884    multifd_recv_state->params = NULL;
 885    g_free(multifd_recv_state);
 886    multifd_recv_state = NULL;
 887
 888    return 0;
 889}
 890
 891void multifd_recv_sync_main(void)
 892{
 893    int i;
 894
 895    if (!migrate_use_multifd()) {
 896        return;
 897    }
 898    for (i = 0; i < migrate_multifd_channels(); i++) {
 899        MultiFDRecvParams *p = &multifd_recv_state->params[i];
 900
 901        trace_multifd_recv_sync_main_wait(p->id);
 902        qemu_sem_wait(&multifd_recv_state->sem_sync);
 903    }
 904    for (i = 0; i < migrate_multifd_channels(); i++) {
 905        MultiFDRecvParams *p = &multifd_recv_state->params[i];
 906
 907        WITH_QEMU_LOCK_GUARD(&p->mutex) {
 908            if (multifd_recv_state->packet_num < p->packet_num) {
 909                multifd_recv_state->packet_num = p->packet_num;
 910            }
 911        }
 912        trace_multifd_recv_sync_main_signal(p->id);
 913        qemu_sem_post(&p->sem_sync);
 914    }
 915    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
 916}
 917
/*
 * multifd_recv_thread: body of one multifd receive channel thread
 *
 * Repeatedly reads a packet header from p->c, decodes it with
 * multifd_recv_unfill_packet(), and reads the page payload through the
 * method's recv_pages hook.  On a packet flagged MULTIFD_FLAG_SYNC it
 * performs the handshake with multifd_recv_sync_main().
 *
 * The thread exits when p->quit is set, on EOF, or on an error.  Errors
 * are reported through multifd_recv_terminate_threads().
 *
 * @opaque: the MultiFDRecvParams of this channel
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        /*
         * NOTE(review): p->quit is read here without p->mutex.  The
         * terminate path writes it under the mutex.
         */
        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {   /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /* Snapshot what is still needed after the mutex is dropped. */
        used = p->pages->used;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        /*
         * Sync point: tell multifd_recv_sync_main() this channel got
         * here, then block until it posts p->sem_sync back.
         */
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
 987
 988int multifd_load_setup(Error **errp)
 989{
 990    int thread_count;
 991    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 992    uint8_t i;
 993
 994    if (!migrate_use_multifd()) {
 995        return 0;
 996    }
 997    thread_count = migrate_multifd_channels();
 998    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
 999    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1000    atomic_set(&multifd_recv_state->count, 0);
1001    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1002    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1003
1004    for (i = 0; i < thread_count; i++) {
1005        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1006
1007        qemu_mutex_init(&p->mutex);
1008        qemu_sem_init(&p->sem_sync, 0);
1009        p->quit = false;
1010        p->id = i;
1011        p->pages = multifd_pages_init(page_count);
1012        p->packet_len = sizeof(MultiFDPacket_t)
1013                      + sizeof(uint64_t) * page_count;
1014        p->packet = g_malloc0(p->packet_len);
1015        p->name = g_strdup_printf("multifdrecv_%d", i);
1016    }
1017
1018    for (i = 0; i < thread_count; i++) {
1019        MultiFDRecvParams *p = &multifd_recv_state->params[i];
1020        Error *local_err = NULL;
1021        int ret;
1022
1023        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
1024        if (ret) {
1025            error_propagate(errp, local_err);
1026            return ret;
1027        }
1028    }
1029    return 0;
1030}
1031
1032bool multifd_recv_all_channels_created(void)
1033{
1034    int thread_count = migrate_multifd_channels();
1035
1036    if (!migrate_use_multifd()) {
1037        return true;
1038    }
1039
1040    return thread_count == atomic_read(&multifd_recv_state->count);
1041}
1042
1043/*
1044 * Try to receive all multifd channels to get ready for the migration.
1045 * - Return true and do not set @errp when correctly receving all channels;
1046 * - Return false and do not set @errp when correctly receiving the current one;
1047 * - Return false and set @errp when failing to receive the current channel.
1048 */
1049bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1050{
1051    MultiFDRecvParams *p;
1052    Error *local_err = NULL;
1053    int id;
1054
1055    id = multifd_recv_initial_packet(ioc, &local_err);
1056    if (id < 0) {
1057        multifd_recv_terminate_threads(local_err);
1058        error_propagate_prepend(errp, local_err,
1059                                "failed to receive packet"
1060                                " via multifd channel %d: ",
1061                                atomic_read(&multifd_recv_state->count));
1062        return false;
1063    }
1064    trace_multifd_recv_new_channel(id);
1065
1066    p = &multifd_recv_state->params[id];
1067    if (p->c != NULL) {
1068        error_setg(&local_err, "multifd: received id '%d' already setup'",
1069                   id);
1070        multifd_recv_terminate_threads(local_err);
1071        error_propagate(errp, local_err);
1072        return false;
1073    }
1074    p->c = ioc;
1075    object_ref(OBJECT(ioc));
1076    /* initial packet */
1077    p->num_packets = 1;
1078
1079    p->running = true;
1080    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1081                       QEMU_THREAD_JOINABLE);
1082    atomic_inc(&multifd_recv_state->count);
1083    return atomic_read(&multifd_recv_state->count) ==
1084           migrate_multifd_channels();
1085}
1086