1
2
3
4
5
6
7
8
9
10
11
12
13#include "qemu/osdep.h"
14#include "qemu/cutils.h"
15#include "qemu/iov.h"
16#include "qemu/rcu.h"
17#include "exec/target_page.h"
18#include "system/system.h"
19#include "system/ramblock.h"
20#include "qemu/error-report.h"
21#include "qapi/error.h"
22#include "file.h"
23#include "migration/misc.h"
24#include "migration.h"
25#include "migration-stats.h"
26#include "savevm.h"
27#include "socket.h"
28#include "tls.h"
29#include "qemu-file.h"
30#include "trace.h"
31#include "multifd.h"
32#include "threadinfo.h"
33#include "options.h"
34#include "qemu/yank.h"
35#include "io/channel-file.h"
36#include "io/channel-socket.h"
37#include "yank_functions.h"
38
/*
 * Handshake message sent once at the start of every multifd channel,
 * before any data packet.  Built by multifd_send_initial_packet() and
 * validated by multifd_recv_initial_packet().  The struct is packed
 * because it is written to / read from the channel byte-for-byte.
 */
typedef struct {
    uint32_t magic;          /* MULTIFD_MAGIC, big-endian on the wire */
    uint32_t version;        /* MULTIFD_VERSION, big-endian on the wire */
    unsigned char uuid[16];  /* source qemu_uuid; must equal the destination's */
    uint8_t id;              /* sender's channel index */
    uint8_t unused1[7];      /* padding, sent as zero */
    uint64_t unused2[4];     /* reserved, sent as zero */
} __attribute__((packed)) MultiFDInit_t;
47
/* Global state of the multifd send side; allocated by multifd_send_setup(). */
struct {
    /* Per-channel parameters, one entry per migrate_multifd_channels(). */
    MultiFDSendParams *params;

    /* Serializes the body of multifd_send() between callers. */
    QemuMutex multifd_send_mutex;

    /*
     * Global packet sequence number shared by all channels, incremented
     * with qatomic_fetch_inc() in multifd_send_fill_packet().
     * NOTE(review): uintptr_t is presumably chosen so the atomic increment
     * works on 32-bit hosts; confirm.
     */
    uintptr_t packet_num;

    /*
     * Posted once per channel when its creation attempt finishes (success
     * or failure); multifd_send_setup() waits for one post per channel.
     */
    QemuSemaphore channels_created;

    /*
     * Posted by each sender thread every time it becomes ready for work;
     * consumed by multifd_send() and multifd_send_sync_main().
     */
    QemuSemaphore channels_ready;

    /*
     * Non-zero once the send side is shutting down or hit an error.
     * Accessed with qatomic_* helpers.
     */
    int exiting;

    /* Compression backend selected by migrate_multifd_compression(). */
    const MultiFDMethods *ops;
} *multifd_send_state;
83
/* Global state of the multifd receive side; allocated by multifd_recv_setup(). */
struct {
    /* Per-channel parameters, one entry per migrate_multifd_channels(). */
    MultiFDRecvParams *params;
    /* Main thread's spare buffer, swapped with a channel's in multifd_recv(). */
    MultiFDRecvData *data;

    /* Number of channels whose receiver thread has been started. */
    int count;

    /*
     * Posted by receiver threads when they reach a sync point; waited on
     * by multifd_recv_sync_main().
     */
    QemuSemaphore sem_sync;

    /* Highest packet number observed at the last sync. */
    uint64_t packet_num;
    /* Non-zero once the receive side is terminating; accessed atomically. */
    int exiting;

    /* Compression backend selected by migrate_multifd_compression(). */
    const MultiFDMethods *ops;
} *multifd_recv_state;
100
101MultiFDSendData *multifd_send_data_alloc(void)
102{
103 MultiFDSendData *new = g_new0(MultiFDSendData, 1);
104
105 multifd_ram_payload_alloc(&new->u.ram);
106
107
108 return new;
109}
110
111void multifd_send_data_clear(MultiFDSendData *data)
112{
113 if (multifd_payload_empty(data)) {
114 return;
115 }
116
117 switch (data->type) {
118 case MULTIFD_PAYLOAD_DEVICE_STATE:
119 multifd_send_data_clear_device_state(&data->u.device_state);
120 break;
121 default:
122
123 break;
124 }
125
126 data->type = MULTIFD_PAYLOAD_NONE;
127}
128
129void multifd_send_data_free(MultiFDSendData *data)
130{
131 if (!data) {
132 return;
133 }
134
135
136 multifd_send_data_clear(data);
137
138 multifd_ram_payload_free(&data->u.ram);
139
140 g_free(data);
141}
142
143static bool multifd_use_packets(void)
144{
145 return !migrate_mapped_ram();
146}
147
148void multifd_send_channel_created(void)
149{
150 qemu_sem_post(&multifd_send_state->channels_created);
151}
152
153static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
154
155void multifd_register_ops(int method, const MultiFDMethods *ops)
156{
157 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
158 assert(!multifd_ops[method]);
159 multifd_ops[method] = ops;
160}
161
162static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
163{
164 MultiFDInit_t msg = {};
165 size_t size = sizeof(msg);
166 int ret;
167
168 msg.magic = cpu_to_be32(MULTIFD_MAGIC);
169 msg.version = cpu_to_be32(MULTIFD_VERSION);
170 msg.id = p->id;
171 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
172
173 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
174 if (ret != 0) {
175 return -1;
176 }
177 stat64_add(&mig_stats.multifd_bytes, size);
178 return 0;
179}
180
181static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
182{
183 MultiFDInit_t msg;
184 int ret;
185
186 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
187 if (ret != 0) {
188 return -1;
189 }
190
191 msg.magic = be32_to_cpu(msg.magic);
192 msg.version = be32_to_cpu(msg.version);
193
194 if (msg.magic != MULTIFD_MAGIC) {
195 error_setg(errp, "multifd: received packet magic %x "
196 "expected %x", msg.magic, MULTIFD_MAGIC);
197 return -1;
198 }
199
200 if (msg.version != MULTIFD_VERSION) {
201 error_setg(errp, "multifd: received packet version %u "
202 "expected %u", msg.version, MULTIFD_VERSION);
203 return -1;
204 }
205
206 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
207 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
208 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
209
210 error_setg(errp, "multifd: received uuid '%s' and expected "
211 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
212 g_free(uuid);
213 g_free(msg_uuid);
214 return -1;
215 }
216
217 if (msg.id > migrate_multifd_channels()) {
218 error_setg(errp, "multifd: received channel id %u is greater than "
219 "number of channels %u", msg.id, migrate_multifd_channels());
220 return -1;
221 }
222
223 return msg.id;
224}
225
226
227void multifd_send_fill_packet(MultiFDSendParams *p)
228{
229 MultiFDPacket_t *packet = p->packet;
230 uint64_t packet_num;
231 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
232
233 memset(packet, 0, p->packet_len);
234
235 packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
236 packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
237
238 packet->hdr.flags = cpu_to_be32(p->flags);
239 packet->next_packet_size = cpu_to_be32(p->next_packet_size);
240
241 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
242 packet->packet_num = cpu_to_be64(packet_num);
243
244 p->packets_sent++;
245
246 if (!sync_packet) {
247 multifd_ram_fill_packet(p);
248 }
249
250 trace_multifd_send_fill(p->id, packet_num,
251 p->flags, p->next_packet_size);
252}
253
254static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
255 const MultiFDPacketHdr_t *hdr,
256 Error **errp)
257{
258 uint32_t magic = be32_to_cpu(hdr->magic);
259 uint32_t version = be32_to_cpu(hdr->version);
260
261 if (magic != MULTIFD_MAGIC) {
262 error_setg(errp, "multifd: received packet magic %x, expected %x",
263 magic, MULTIFD_MAGIC);
264 return -1;
265 }
266
267 if (version != MULTIFD_VERSION) {
268 error_setg(errp, "multifd: received packet version %u, expected %u",
269 version, MULTIFD_VERSION);
270 return -1;
271 }
272
273 p->flags = be32_to_cpu(hdr->flags);
274
275 return 0;
276}
277
278static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
279 Error **errp)
280{
281 MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
282
283 packet->instance_id = be32_to_cpu(packet->instance_id);
284 p->next_packet_size = be32_to_cpu(packet->next_packet_size);
285
286 return 0;
287}
288
289static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
290{
291 const MultiFDPacket_t *packet = p->packet;
292 int ret = 0;
293
294 p->next_packet_size = be32_to_cpu(packet->next_packet_size);
295 p->packet_num = be64_to_cpu(packet->packet_num);
296
297
298 ret = multifd_ram_unfill_packet(p, errp);
299
300 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
301 p->next_packet_size);
302
303 return ret;
304}
305
306static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
307{
308 p->packets_recved++;
309
310 if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
311 return multifd_recv_unfill_packet_device_state(p, errp);
312 }
313
314 return multifd_recv_unfill_packet_ram(p, errp);
315}
316
317static bool multifd_send_should_exit(void)
318{
319 return qatomic_read(&multifd_send_state->exiting);
320}
321
322static bool multifd_recv_should_exit(void)
323{
324 return qatomic_read(&multifd_recv_state->exiting);
325}
326
327
328
329
330
331
332
333static void multifd_send_kick_main(MultiFDSendParams *p)
334{
335 qemu_sem_post(&p->sem_sync);
336 qemu_sem_post(&multifd_send_state->channels_ready);
337}
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
/*
 * Queue @send_data on an idle sender channel.
 *
 * Waits until at least one channel has posted channels_ready. It then scans
 * channels round-robin, starting after the last one used, for a channel
 * whose pending_job is false. The caller's buffer is swapped with that
 * channel's (asserted empty) p->data. On return, *send_data therefore
 * points to an empty buffer the caller can refill.
 *
 * Calls are serialized by multifd_send_mutex.
 *
 * Returns false if the send side is exiting, true once the job is queued.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* initialized only to silence the compiler */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* Block until at least one channel is ready. */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel is static and may be left over from a previous migration
     * that used more channels; keep it within the current channel count.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read of pending_job: in this file it is set only here
         * (under multifd_send_mutex) and cleared only by the channel's
         * sender thread.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Read p->data only after observing pending_job == false.  Pairs with
     * the qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap buffers: the channel takes the caller's filled buffer and the
     * caller gets the channel's empty one back.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Publish p->data before setting pending_job.  Pairs with the
     * qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
415
416
417static void multifd_send_set_error(Error *err)
418{
419
420
421
422
423
424
425 if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
426 return;
427 }
428
429 if (err) {
430 MigrationState *s = migrate_get_current();
431 migrate_set_error(s, err);
432 if (s->state == MIGRATION_STATUS_SETUP ||
433 s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
434 s->state == MIGRATION_STATUS_DEVICE ||
435 s->state == MIGRATION_STATUS_ACTIVE) {
436 migrate_set_state(&s->state, s->state,
437 MIGRATION_STATUS_FAILED);
438 }
439 }
440}
441
/*
 * Stop all sender threads and wait for them.
 *
 * Sets 'exiting', wakes every thread and shuts down every channel.  It then
 * joins each channel's TLS handshake thread (if one was created) and its
 * sender thread (if one was created).
 */
static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Set exiting before waking the threads so they see it when they wake.
     * It may already be set by multifd_send_set_error(); setting it again
     * is harmless.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Wake every thread parked on p->sem and shut its channel down.
     * Presumably the shutdown makes any blocking channel I/O return.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Join the threads only after all of them have been told to exit.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}
482
/*
 * Release every resource of sender channel @p.  It is called only after
 * the channel's threads have been joined (see multifd_send_shutdown()).
 *
 * @errp must point to a NULL Error *; it is dereferenced to report the
 * outcome.  Returns true on success, or false with *errp set by the
 * backend's send_cleanup().
 */
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * Close the channel explicitly instead of relying on object_unref():
         * we are not guaranteed to hold the last reference, so the unref
         * alone may not release the fd.  Closing should also make any I/O
         * handler still registered on the fd notice and clean up.
         * NOTE(review): ideally no handler is registered at this point,
         * because setup waits for all channels_created posts; confirm.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    /* The backend must free p->iov; the assert below checks that. */
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}
527
528static void multifd_send_cleanup_state(void)
529{
530 file_cleanup_outgoing_migration();
531 socket_cleanup_outgoing_migration();
532 multifd_device_state_send_cleanup();
533 qemu_sem_destroy(&multifd_send_state->channels_created);
534 qemu_sem_destroy(&multifd_send_state->channels_ready);
535 qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
536 g_free(multifd_send_state->params);
537 multifd_send_state->params = NULL;
538 g_free(multifd_send_state);
539 multifd_send_state = NULL;
540}
541
542void multifd_send_shutdown(void)
543{
544 int i;
545
546 if (!migrate_multifd()) {
547 return;
548 }
549
550 for (i = 0; i < migrate_multifd_channels(); i++) {
551 MultiFDSendParams *p = &multifd_send_state->params[i];
552
553
554 if (p->tls_thread_created && p->thread_created) {
555 Error *local_err = NULL;
556
557
558
559
560
561
562
563
564 migration_tls_channel_end(p->c, &local_err);
565
566
567
568
569
570
571 if (local_err && !migration_has_failed(migrate_get_current())) {
572 warn_report(
573 "multifd_send_%d: Failed to terminate TLS connection: %s",
574 p->id, error_get_pretty(local_err));
575 break;
576 }
577 }
578 }
579
580 multifd_send_terminate_threads();
581
582 for (i = 0; i < migrate_multifd_channels(); i++) {
583 MultiFDSendParams *p = &multifd_send_state->params[i];
584 Error *local_err = NULL;
585
586 if (!multifd_send_cleanup_channel(p, &local_err)) {
587 migrate_set_error(migrate_get_current(), local_err);
588 error_free(local_err);
589 }
590 }
591
592 multifd_send_cleanup_state();
593}
594
595static int multifd_zero_copy_flush(QIOChannel *c)
596{
597 int ret;
598 Error *err = NULL;
599
600 ret = qio_channel_flush(c, &err);
601 if (ret < 0) {
602 error_report_err(err);
603 return -1;
604 }
605 if (ret == 1) {
606 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
607 }
608
609 return ret;
610}
611
/*
 * Synchronise all sender channels.
 *
 * First posts a @req sync request to every channel.  Then, for each channel,
 * waits for channels_ready and for that channel's sem_sync acknowledgement.
 * With zero-copy enabled, the channel is also flushed.
 *
 * Returns 0 on success, or -1 if the send side is exiting or a zero-copy
 * flush failed.
 */
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * The caller is expected to be the only one issuing syncs, so no
         * request can be pending on this channel yet.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        /* Posted by the sender thread once it has handled the request. */
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
657
/*
 * Per-channel sender thread.
 *
 * In packet mode it first sends the MultiFDInit_t handshake.  It then loops:
 * announce readiness on channels_ready, sleep on p->sem, and either send
 * the queued job (pending_job) or handle a sync request (pending_sync).
 *
 * On error it records the error via multifd_send_set_error() and wakes the
 * main thread with multifd_send_kick_main() so it does not block forever.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        /* Tell multifd_send()/multifd_send_sync_main() we are available. */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets are never sent with zero-copy. */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /* Only the bytes described by p->iov are accounted below. */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Publish the cleared p->data before marking the channel free.
             * Pairs with the smp_mb_acquire() in multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * Woken without a job: this must be a sync request from
             * multifd_send_sync_main().
             */
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /* A wake-up with neither a job nor a sync is a bug. */
            assert(req != MULTIFD_SYNC_NONE);

            /*
             * Only MULTIFD_SYNC_ALL sends a SYNC packet to the destination;
             * other requests are only acknowledged locally via sem_sync.
             */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }

                /* Only the packet itself is written for a SYNC. */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        /* Unblock the main thread in case it waits on this channel. */
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}
788
/*
 * Completion callback for outgoing channel creation; defined below and
 * also used as the TLS handshake completion callback.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
790
/*
 * Arguments for multifd_tls_handshake_thread().  Allocated by
 * multifd_tls_channel_connect() and freed by the handshake thread.
 */
typedef struct {
    MultiFDSendParams *p;  /* channel being set up */
    QIOChannelTLS *tioc;   /* TLS channel to run the handshake on */
} MultiFDTLSThreadArgs;
795
796static void *multifd_tls_handshake_thread(void *opaque)
797{
798 MultiFDTLSThreadArgs *args = opaque;
799
800 qio_channel_tls_handshake(args->tioc,
801 multifd_new_send_channel_async,
802 args->p,
803 NULL,
804 NULL);
805 g_free(args);
806
807 return NULL;
808}
809
810static bool multifd_tls_channel_connect(MultiFDSendParams *p,
811 QIOChannel *ioc,
812 Error **errp)
813{
814 MigrationState *s = migrate_get_current();
815 const char *hostname = s->hostname;
816 MultiFDTLSThreadArgs *args;
817 QIOChannelTLS *tioc;
818
819 tioc = migration_tls_client_create(ioc, hostname, errp);
820 if (!tioc) {
821 return false;
822 }
823
824
825
826
827
828 object_unref(OBJECT(ioc));
829 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
830 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
831
832 args = g_new0(MultiFDTLSThreadArgs, 1);
833 args->tioc = tioc;
834 args->p = p;
835
836 p->tls_thread_created = true;
837 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
838 multifd_tls_handshake_thread, args,
839 QEMU_THREAD_JOINABLE);
840 return true;
841}
842
843void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
844{
845 qio_channel_set_delay(ioc, false);
846
847 migration_ioc_register_yank(ioc);
848
849 p->c = ioc;
850
851 p->thread_created = true;
852 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
853 QEMU_THREAD_JOINABLE);
854}
855
856
857
858
859
860
861
/*
 * Completion callback for creating an outgoing multifd channel.
 *
 * It is called when the transport connection finishes and again when a TLS
 * handshake finishes (multifd_tls_handshake_thread() passes this function
 * as the handshake callback).  On success the channel is attached to @opaque
 * (the channel's MultiFDSendParams) and its sender thread is started. When
 * a TLS upgrade is needed, the handshake thread is started instead and this
 * call returns without reporting anything.
 *
 * Every other outcome posts channels_created exactly once.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            /*
             * The handshake thread will call back into this function; the
             * channel's creation is reported then.
             */
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Report that creation finished, whatever the outcome:
     * multifd_send_setup() waits for one post per channel.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * On failure p->c was never set to ioc, so the multifd cleanup code
     * does not know about this channel; drop the reference here.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}
910
911static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
912{
913 if (!multifd_use_packets()) {
914 return file_send_channel_create(opaque, errp);
915 }
916
917 socket_send_channel_create(multifd_new_send_channel_async, opaque);
918 return true;
919}
920
921bool multifd_send_setup(void)
922{
923 MigrationState *s = migrate_get_current();
924 int thread_count, ret = 0;
925 uint32_t page_count = multifd_ram_page_count();
926 bool use_packets = multifd_use_packets();
927 uint8_t i;
928
929 if (!migrate_multifd()) {
930 return true;
931 }
932
933 thread_count = migrate_multifd_channels();
934 multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
935 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
936 qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
937 qemu_sem_init(&multifd_send_state->channels_created, 0);
938 qemu_sem_init(&multifd_send_state->channels_ready, 0);
939 qatomic_set(&multifd_send_state->exiting, 0);
940 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
941
942 for (i = 0; i < thread_count; i++) {
943 MultiFDSendParams *p = &multifd_send_state->params[i];
944 Error *local_err = NULL;
945
946 qemu_sem_init(&p->sem, 0);
947 qemu_sem_init(&p->sem_sync, 0);
948 p->id = i;
949 p->data = multifd_send_data_alloc();
950
951 if (use_packets) {
952 p->packet_len = sizeof(MultiFDPacket_t)
953 + sizeof(uint64_t) * page_count;
954 p->packet = g_malloc0(p->packet_len);
955 p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
956 p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
957 p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
958 }
959 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
960 p->write_flags = 0;
961
962 if (!multifd_new_send_channel_create(p, &local_err)) {
963 migrate_set_error(s, local_err);
964 ret = -1;
965 }
966 }
967
968
969
970
971
972
973 for (i = 0; i < thread_count; i++) {
974 qemu_sem_wait(&multifd_send_state->channels_created);
975 }
976
977 if (ret) {
978 goto err;
979 }
980
981 for (i = 0; i < thread_count; i++) {
982 MultiFDSendParams *p = &multifd_send_state->params[i];
983 Error *local_err = NULL;
984
985 ret = multifd_send_state->ops->send_setup(p, &local_err);
986 if (ret) {
987 migrate_set_error(s, local_err);
988 goto err;
989 }
990 assert(p->iov);
991 }
992
993 multifd_device_state_send_setup();
994
995 return true;
996
997err:
998 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
999 MIGRATION_STATUS_FAILED);
1000 return false;
1001}
1002
/*
 * Hand the main thread's MultiFDRecvData buffer to an idle receiver channel.
 *
 * This is the non-packet path: the receiver thread consumes pending_job
 * only when packets are not in use. Channels are scanned round-robin,
 * starting after the last one used, until one with pending_job == false is
 * found. Note that the scan busy-polls rather than sleeping. The buffers
 * are then swapped, so multifd_recv_state->data becomes the channel's
 * empty buffer.
 *
 * Returns false if the receive side is exiting, true once the job is queued.
 */
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel is static and may be left over from a previous
     * migration that used more channels; keep it in range.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Read p->data only after observing pending_job == false.  Pairs with
     * the qatomic_store_release() in multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Publish p->data before setting pending_job.  Pairs with the
     * qatomic_load_acquire() in multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
1048
1049MultiFDRecvData *multifd_get_recv_data(void)
1050{
1051 return multifd_recv_state->data;
1052}
1053
/*
 * Mark the receive side as exiting and unblock every receiver thread.
 *
 * If @err is given, it is recorded on the current migration, and the
 * migration moves to FAILED when it is in SETUP or ACTIVE.  Only the first
 * call (the one that flips 'exiting' from 0 to 1) does anything.  This
 * function does not join threads; see multifd_recv_cleanup().
 */
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * Wake the thread on the semaphore it may be parked on, which
         * depends on the mode.
         */
        if (multifd_use_packets()) {
            /*
             * Packet mode: the thread may be waiting on p->sem_sync after
             * reporting a SYNC packet in multifd_recv_thread().
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * Non-packet (file) mode: the thread waits on p->sem for work
             * from multifd_recv() or a sync from multifd_recv_sync_main().
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * Shut the channel down so that a thread blocked in a channel read
         * presumably gets an error or EOF and can notice 'exiting'.
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}
1111
1112void multifd_recv_shutdown(void)
1113{
1114 if (migrate_multifd()) {
1115 multifd_recv_terminate_threads(NULL);
1116 }
1117}
1118
1119static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1120{
1121 migration_ioc_unregister_yank(p->c);
1122 object_unref(OBJECT(p->c));
1123 p->c = NULL;
1124 qemu_mutex_destroy(&p->mutex);
1125 qemu_sem_destroy(&p->sem_sync);
1126 qemu_sem_destroy(&p->sem);
1127 g_free(p->data);
1128 p->data = NULL;
1129 g_free(p->name);
1130 p->name = NULL;
1131 p->packet_len = 0;
1132 g_free(p->packet);
1133 p->packet = NULL;
1134 g_clear_pointer(&p->packet_dev_state, g_free);
1135 g_free(p->normal);
1136 p->normal = NULL;
1137 g_free(p->zero);
1138 p->zero = NULL;
1139 multifd_recv_state->ops->recv_cleanup(p);
1140}
1141
1142static void multifd_recv_cleanup_state(void)
1143{
1144 qemu_sem_destroy(&multifd_recv_state->sem_sync);
1145 g_free(multifd_recv_state->params);
1146 multifd_recv_state->params = NULL;
1147 g_free(multifd_recv_state->data);
1148 multifd_recv_state->data = NULL;
1149 g_free(multifd_recv_state);
1150 multifd_recv_state = NULL;
1151}
1152
1153void multifd_recv_cleanup(void)
1154{
1155 int i;
1156
1157 if (!migrate_multifd()) {
1158 return;
1159 }
1160 multifd_recv_terminate_threads(NULL);
1161 for (i = 0; i < migrate_multifd_channels(); i++) {
1162 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1163
1164 if (p->thread_created) {
1165 qemu_thread_join(&p->thread);
1166 }
1167 }
1168 for (i = 0; i < migrate_multifd_channels(); i++) {
1169 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1170 }
1171 multifd_recv_cleanup_state();
1172}
1173
/*
 * Synchronise the main thread with every receiver channel.
 *
 * - File-based (non-packet) mode: each thread is woken so it can report
 *   the sync point through sem_sync. We then wait for all of them and
 *   return.
 * - Packet mode: we wait until every thread has received a SYNC packet.
 *   We then record the highest packet number seen and release each thread
 *   from its p->sem_sync wait.
 */
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based mode has no SYNC packets.  Posting p->sem without a
     * pending job makes the thread post multifd_recv_state->sem_sync
     * (see multifd_recv_thread()).
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Wait until every channel has reached the sync point.  In packet mode
     * each thread posts sem_sync after receiving a SYNC packet; in file mode
     * each one posts it in response to the wake-up above.
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * File-based threads do not wait on p->sem_sync after reporting,
         * so there is nothing to release.
         */
        return;
    }

    /*
     * Packet mode: record the highest packet number observed and release
     * each thread blocked in qemu_sem_wait(&p->sem_sync).
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
1235
1236static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
1237{
1238 g_autofree char *dev_state_buf = NULL;
1239 int ret;
1240
1241 dev_state_buf = g_malloc(p->next_packet_size);
1242
1243 ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
1244 if (ret != 0) {
1245 return ret;
1246 }
1247
1248 if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
1249 != 0) {
1250 error_setg(errp, "unterminated multifd device state idstr");
1251 return -1;
1252 }
1253
1254 if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
1255 p->packet_dev_state->instance_id,
1256 dev_state_buf, p->next_packet_size,
1257 errp)) {
1258 ret = -1;
1259 }
1260
1261 return ret;
1262}
1263
/*
 * Per-channel receiver thread.
 *
 * - Packet mode: reads packet headers and bodies from the channel.
 *   RAM or device state payloads are handed to the matching receive
 *   function. On a SYNC packet the thread reports to
 *   multifd_recv_sync_main() and waits to be released.
 * - Non-packet (file) mode: waits on p->sem for jobs queued by
 *   multifd_recv(), or for sync requests.
 *
 * A clean EOF between packets ends the thread quietly.  Any error calls
 * multifd_recv_terminate_threads() with that error.
 */
static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    /*
     * Without multifd_clean_tls_termination, reads use
     * QIO_CHANNEL_READ_FLAG_RELAXED_EOF.  NOTE(review): presumably this
     * tolerates sources that do not terminate TLS sessions cleanly; confirm.
     */
    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF between packets: clean end of stream. */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            /* Read the rest of the packet into the buffer for its type. */
            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF in the middle of a packet is an error. */
                error_setg(&local_err, "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            /* p->mutex protects p->packet_num, read by multifd_recv_sync_main(). */
            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            /*
             * Keep the full flags locally, then strip SYNC from p->flags
             * before the recv methods see it.
             */
            flags = p->flags;

            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Check for pages even in a SYNC packet: the RAM page list
                 * is always decoded.  NOTE(review): presumably some senders
                 * still send pages along with SYNC; confirm.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * Non-packet mode: sleep until multifd_recv() queues a job or
             * multifd_recv_sync_main() asks for a sync.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /*
             * Read pending_job before p->data.  Pairs with the
             * qatomic_store_release() in multifd_recv().
             */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Woken without a job: this is a sync request from
                 * multifd_recv_sync_main().  Report that this channel has
                 * reached the sync point and go back to waiting.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * multifd data is not expected while postcopy is running.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                /*
                 * Report the sync point, then wait for
                 * multifd_recv_sync_main() to release this channel.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Publish the emptied p->data before marking the channel free.
             * Pairs with the smp_mb_acquire() in multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}
1441
1442int multifd_recv_setup(Error **errp)
1443{
1444 int thread_count;
1445 uint32_t page_count = multifd_ram_page_count();
1446 bool use_packets = multifd_use_packets();
1447 uint8_t i;
1448
1449
1450
1451
1452
1453 if (multifd_recv_state || !migrate_multifd()) {
1454 return 0;
1455 }
1456
1457 thread_count = migrate_multifd_channels();
1458 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1459 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1460
1461 multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1462 multifd_recv_state->data->size = 0;
1463
1464 qatomic_set(&multifd_recv_state->count, 0);
1465 qatomic_set(&multifd_recv_state->exiting, 0);
1466 qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1467 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1468
1469 for (i = 0; i < thread_count; i++) {
1470 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1471
1472 qemu_mutex_init(&p->mutex);
1473 qemu_sem_init(&p->sem_sync, 0);
1474 qemu_sem_init(&p->sem, 0);
1475 p->pending_job = false;
1476 p->id = i;
1477
1478 p->data = g_new0(MultiFDRecvData, 1);
1479 p->data->size = 0;
1480
1481 if (use_packets) {
1482 p->packet_len = sizeof(MultiFDPacket_t)
1483 + sizeof(uint64_t) * page_count;
1484 p->packet = g_malloc0(p->packet_len);
1485 p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
1486 }
1487 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
1488 p->normal = g_new0(ram_addr_t, page_count);
1489 p->zero = g_new0(ram_addr_t, page_count);
1490 }
1491
1492 for (i = 0; i < thread_count; i++) {
1493 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1494 int ret;
1495
1496 ret = multifd_recv_state->ops->recv_setup(p, errp);
1497 if (ret) {
1498 return ret;
1499 }
1500 }
1501 return 0;
1502}
1503
1504bool multifd_recv_all_channels_created(void)
1505{
1506 int thread_count = migrate_multifd_channels();
1507
1508 if (!migrate_multifd()) {
1509 return true;
1510 }
1511
1512 if (!multifd_recv_state) {
1513
1514 return false;
1515 }
1516
1517 return thread_count == qatomic_read(&multifd_recv_state->count);
1518}
1519
1520
1521
1522
1523
/*
 * Register a new incoming multifd channel @ioc and start its receiver
 * thread.
 *
 * In packet mode the channel id comes from the MultiFDInit_t handshake.
 * Otherwise channels are numbered in arrival order.  On failure the
 * receive side is terminated with the error, which is also propagated to
 * @errp.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    /* A duplicated channel id is a protocol error. */
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup'",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    /* Keep our own reference; it is dropped in multifd_recv_cleanup_channel(). */
    p->c = ioc;
    object_ref(OBJECT(ioc));

    /* Flag first so multifd_recv_cleanup() knows there is a thread to join. */
    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1562