1
2
3
4
5
6
7
8
9
10
11
12#include "qemu/osdep.h"
13#include "qapi/error.h"
14#include "qemu/lockable.h"
15#include "block/thread-pool.h"
16#include "migration.h"
17#include "migration/misc.h"
18#include "multifd.h"
19#include "options.h"
20
21static struct {
22 QemuMutex queue_job_mutex;
23
24 MultiFDSendData *send_data;
25
26 ThreadPool *threads;
27 bool threads_abort;
28} *multifd_send_device_state;
29
30void multifd_device_state_send_setup(void)
31{
32 assert(!multifd_send_device_state);
33 multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state));
34
35 qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
36
37 multifd_send_device_state->send_data = multifd_send_data_alloc();
38
39 multifd_send_device_state->threads = thread_pool_new();
40 multifd_send_device_state->threads_abort = false;
41}
42
43void multifd_device_state_send_cleanup(void)
44{
45 g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
46 g_clear_pointer(&multifd_send_device_state->send_data,
47 multifd_send_data_free);
48
49 qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex);
50
51 g_clear_pointer(&multifd_send_device_state, g_free);
52}
53
54void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state)
55{
56 g_clear_pointer(&device_state->idstr, g_free);
57 g_clear_pointer(&device_state->buf, g_free);
58}
59
60static void multifd_device_state_fill_packet(MultiFDSendParams *p)
61{
62 MultiFDDeviceState_t *device_state = &p->data->u.device_state;
63 MultiFDPacketDeviceState_t *packet = p->packet_device_state;
64
65 packet->hdr.flags = cpu_to_be32(p->flags);
66 strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1);
67 packet->idstr[sizeof(packet->idstr) - 1] = 0;
68 packet->instance_id = cpu_to_be32(device_state->instance_id);
69 packet->next_packet_size = cpu_to_be32(p->next_packet_size);
70}
71
72static void multifd_prepare_header_device_state(MultiFDSendParams *p)
73{
74 p->iov[0].iov_len = sizeof(*p->packet_device_state);
75 p->iov[0].iov_base = p->packet_device_state;
76 p->iovs_num++;
77}
78
79void multifd_device_state_send_prepare(MultiFDSendParams *p)
80{
81 MultiFDDeviceState_t *device_state = &p->data->u.device_state;
82
83 assert(multifd_payload_device_state(p->data));
84
85 multifd_prepare_header_device_state(p);
86
87 assert(!(p->flags & MULTIFD_FLAG_SYNC));
88
89 p->next_packet_size = device_state->buf_len;
90 if (p->next_packet_size > 0) {
91 p->iov[p->iovs_num].iov_base = device_state->buf;
92 p->iov[p->iovs_num].iov_len = p->next_packet_size;
93 p->iovs_num++;
94 }
95
96 p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE;
97
98 multifd_device_state_fill_packet(p);
99}
100
101bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
102 char *data, size_t len)
103{
104
105 QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex);
106 MultiFDDeviceState_t *device_state;
107
108 assert(multifd_payload_empty(multifd_send_device_state->send_data));
109
110 multifd_set_payload_type(multifd_send_device_state->send_data,
111 MULTIFD_PAYLOAD_DEVICE_STATE);
112 device_state = &multifd_send_device_state->send_data->u.device_state;
113 device_state->idstr = g_strdup(idstr);
114 device_state->instance_id = instance_id;
115 device_state->buf = g_memdup2(data, len);
116 device_state->buf_len = len;
117
118 if (!multifd_send(&multifd_send_device_state->send_data)) {
119 multifd_send_data_clear(multifd_send_device_state->send_data);
120 return false;
121 }
122
123 return true;
124}
125
126bool multifd_device_state_supported(void)
127{
128 return migrate_multifd() && !migrate_mapped_ram() &&
129 migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
130}
131
132static void multifd_device_state_save_thread_data_free(void *opaque)
133{
134 SaveCompletePrecopyThreadData *data = opaque;
135
136 g_clear_pointer(&data->idstr, g_free);
137 g_free(data);
138}
139
140static int multifd_device_state_save_thread(void *opaque)
141{
142 SaveCompletePrecopyThreadData *data = opaque;
143 g_autoptr(Error) local_err = NULL;
144
145 if (!data->hdlr(data, &local_err)) {
146 MigrationState *s = migrate_get_current();
147
148
149
150
151
152
153
154
155 assert(local_err);
156
157
158
159
160
161 migrate_set_error(s, local_err);
162 }
163
164 return 0;
165}
166
167bool multifd_device_state_save_thread_should_exit(void)
168{
169 return qatomic_read(&multifd_send_device_state->threads_abort);
170}
171
172void
173multifd_spawn_device_state_save_thread(SaveCompletePrecopyThreadHandler hdlr,
174 char *idstr, uint32_t instance_id,
175 void *opaque)
176{
177 SaveCompletePrecopyThreadData *data;
178
179 assert(multifd_device_state_supported());
180 assert(multifd_send_device_state);
181
182 assert(!qatomic_read(&multifd_send_device_state->threads_abort));
183
184 data = g_new(SaveCompletePrecopyThreadData, 1);
185 data->hdlr = hdlr;
186 data->idstr = g_strdup(idstr);
187 data->instance_id = instance_id;
188 data->handler_opaque = opaque;
189
190 thread_pool_submit_immediate(multifd_send_device_state->threads,
191 multifd_device_state_save_thread,
192 data,
193 multifd_device_state_save_thread_data_free);
194}
195
196void multifd_abort_device_state_save_threads(void)
197{
198 assert(multifd_device_state_supported());
199
200 qatomic_set(&multifd_send_device_state->threads_abort, true);
201}
202
203bool multifd_join_device_state_save_threads(void)
204{
205 MigrationState *s = migrate_get_current();
206
207 assert(multifd_device_state_supported());
208
209 thread_pool_wait(multifd_send_device_state->threads);
210
211 return !migrate_has_error(s);
212}
213