qemu/migration/multifd-device-state.c
<<
>>
Prefs
   1/*
   2 * Multifd device state migration
   3 *
   4 * Copyright (C) 2024,2025 Oracle and/or its affiliates.
   5 *
   6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
   7 * See the COPYING file in the top-level directory.
   8 *
   9 * SPDX-License-Identifier: GPL-2.0-or-later
  10 */
  11
  12#include "qemu/osdep.h"
  13#include "qapi/error.h"
  14#include "qemu/lockable.h"
  15#include "block/thread-pool.h"
  16#include "migration.h"
  17#include "migration/misc.h"
  18#include "multifd.h"
  19#include "options.h"
  20
  21static struct {
  22    QemuMutex queue_job_mutex;
  23
  24    MultiFDSendData *send_data;
  25
  26    ThreadPool *threads;
  27    bool threads_abort;
  28} *multifd_send_device_state;
  29
  30void multifd_device_state_send_setup(void)
  31{
  32    assert(!multifd_send_device_state);
  33    multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state));
  34
  35    qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
  36
  37    multifd_send_device_state->send_data = multifd_send_data_alloc();
  38
  39    multifd_send_device_state->threads = thread_pool_new();
  40    multifd_send_device_state->threads_abort = false;
  41}
  42
  43void multifd_device_state_send_cleanup(void)
  44{
  45    g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
  46    g_clear_pointer(&multifd_send_device_state->send_data,
  47                    multifd_send_data_free);
  48
  49    qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex);
  50
  51    g_clear_pointer(&multifd_send_device_state, g_free);
  52}
  53
  54void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state)
  55{
  56    g_clear_pointer(&device_state->idstr, g_free);
  57    g_clear_pointer(&device_state->buf, g_free);
  58}
  59
  60static void multifd_device_state_fill_packet(MultiFDSendParams *p)
  61{
  62    MultiFDDeviceState_t *device_state = &p->data->u.device_state;
  63    MultiFDPacketDeviceState_t *packet = p->packet_device_state;
  64
  65    packet->hdr.flags = cpu_to_be32(p->flags);
  66    strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1);
  67    packet->idstr[sizeof(packet->idstr) - 1] = 0;
  68    packet->instance_id = cpu_to_be32(device_state->instance_id);
  69    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
  70}
  71
  72static void multifd_prepare_header_device_state(MultiFDSendParams *p)
  73{
  74    p->iov[0].iov_len = sizeof(*p->packet_device_state);
  75    p->iov[0].iov_base = p->packet_device_state;
  76    p->iovs_num++;
  77}
  78
  79void multifd_device_state_send_prepare(MultiFDSendParams *p)
  80{
  81    MultiFDDeviceState_t *device_state = &p->data->u.device_state;
  82
  83    assert(multifd_payload_device_state(p->data));
  84
  85    multifd_prepare_header_device_state(p);
  86
  87    assert(!(p->flags & MULTIFD_FLAG_SYNC));
  88
  89    p->next_packet_size = device_state->buf_len;
  90    if (p->next_packet_size > 0) {
  91        p->iov[p->iovs_num].iov_base = device_state->buf;
  92        p->iov[p->iovs_num].iov_len = p->next_packet_size;
  93        p->iovs_num++;
  94    }
  95
  96    p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE;
  97
  98    multifd_device_state_fill_packet(p);
  99}
 100
 101bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
 102                                char *data, size_t len)
 103{
 104    /* Device state submissions can come from multiple threads */
 105    QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex);
 106    MultiFDDeviceState_t *device_state;
 107
 108    assert(multifd_payload_empty(multifd_send_device_state->send_data));
 109
 110    multifd_set_payload_type(multifd_send_device_state->send_data,
 111                             MULTIFD_PAYLOAD_DEVICE_STATE);
 112    device_state = &multifd_send_device_state->send_data->u.device_state;
 113    device_state->idstr = g_strdup(idstr);
 114    device_state->instance_id = instance_id;
 115    device_state->buf = g_memdup2(data, len);
 116    device_state->buf_len = len;
 117
 118    if (!multifd_send(&multifd_send_device_state->send_data)) {
 119        multifd_send_data_clear(multifd_send_device_state->send_data);
 120        return false;
 121    }
 122
 123    return true;
 124}
 125
 126bool multifd_device_state_supported(void)
 127{
 128    return migrate_multifd() && !migrate_mapped_ram() &&
 129        migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
 130}
 131
 132static void multifd_device_state_save_thread_data_free(void *opaque)
 133{
 134    SaveCompletePrecopyThreadData *data = opaque;
 135
 136    g_clear_pointer(&data->idstr, g_free);
 137    g_free(data);
 138}
 139
 140static int multifd_device_state_save_thread(void *opaque)
 141{
 142    SaveCompletePrecopyThreadData *data = opaque;
 143    g_autoptr(Error) local_err = NULL;
 144
 145    if (!data->hdlr(data, &local_err)) {
 146        MigrationState *s = migrate_get_current();
 147
 148        /*
 149         * Can't call abort_device_state_save_threads() here since new
 150         * save threads could still be in process of being launched
 151         * (if, for example, the very first save thread launched exited
 152         * with an error very quickly).
 153         */
 154
 155        assert(local_err);
 156
 157        /*
 158         * In case of multiple save threads failing which thread error
 159         * return we end setting is purely arbitrary.
 160         */
 161        migrate_set_error(s, local_err);
 162    }
 163
 164    return 0;
 165}
 166
 167bool multifd_device_state_save_thread_should_exit(void)
 168{
 169    return qatomic_read(&multifd_send_device_state->threads_abort);
 170}
 171
 172void
 173multifd_spawn_device_state_save_thread(SaveCompletePrecopyThreadHandler hdlr,
 174                                       char *idstr, uint32_t instance_id,
 175                                       void *opaque)
 176{
 177    SaveCompletePrecopyThreadData *data;
 178
 179    assert(multifd_device_state_supported());
 180    assert(multifd_send_device_state);
 181
 182    assert(!qatomic_read(&multifd_send_device_state->threads_abort));
 183
 184    data = g_new(SaveCompletePrecopyThreadData, 1);
 185    data->hdlr = hdlr;
 186    data->idstr = g_strdup(idstr);
 187    data->instance_id = instance_id;
 188    data->handler_opaque = opaque;
 189
 190    thread_pool_submit_immediate(multifd_send_device_state->threads,
 191                                 multifd_device_state_save_thread,
 192                                 data,
 193                                 multifd_device_state_save_thread_data_free);
 194}
 195
 196void multifd_abort_device_state_save_threads(void)
 197{
 198    assert(multifd_device_state_supported());
 199
 200    qatomic_set(&multifd_send_device_state->threads_abort, true);
 201}
 202
 203bool multifd_join_device_state_save_threads(void)
 204{
 205    MigrationState *s = migrate_get_current();
 206
 207    assert(multifd_device_state_supported());
 208
 209    thread_pool_wait(multifd_send_device_state->threads);
 210
 211    return !migrate_has_error(s);
 212}
 213