1
2
3
4
5
6
7
8
9
10
11
12
13
14#include "qemu-common.h"
15#include "migration/migration.h"
16#include "migration/qemu-file.h"
17#include "exec/cpu-common.h"
18#include "qemu/main-loop.h"
19#include "qemu/sockets.h"
20#include "qemu/bitmap.h"
21#include "block/coroutine.h"
22#include <stdio.h>
23#include <sys/types.h>
24#include <sys/socket.h>
25#include <netdb.h>
26#include <arpa/inet.h>
27#include <string.h>
28#include <rdma/rdma_cma.h>
29
30
31
32
33
/*
 * Three levels of debug tracing, each switched on by its own DEBUG_RDMA*
 * symbol.  When a level is disabled the macro expands to an empty statement,
 * so its arguments are neither evaluated nor type-checked.
 */
#ifdef DEBUG_RDMA
#define DPRINTF(fmt, ...) \
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
#else
#define DPRINTF(fmt, ...) \
    do { } while (0)
#endif

#ifdef DEBUG_RDMA_VERBOSE
#define DDPRINTF(fmt, ...) \
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
#else
#define DDPRINTF(fmt, ...) \
    do { } while (0)
#endif

#ifdef DEBUG_RDMA_REALLY_VERBOSE
#define DDDPRINTF(fmt, ...) \
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
#else
#define DDDPRINTF(fmt, ...) \
    do { } while (0)
#endif
57
58
59
60
/*
 * Report an RDMA error: always print it to stderr and, when errp is non-NULL
 * and does not already hold an error, also record it through error_setg().
 * Note that errp is evaluated more than once.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)
68
/* Timeout handed to rdma_resolve_addr() and rdma_resolve_route(). */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* 2 MiB.  NOTE(review): presumably the largest merged RDMA write - confirm. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/*
 * 512 with the values above.  Used as the send work-request limit of the
 * queue pair, as the base for the completion queue size, and as the length
 * of the RDMAContext unregistrations ring.
 */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the registration chunk size: chunks are 1 << 20 bytes (1 MiB). */
#define RDMA_REG_CHUNK_SHIFT 20

/*
 * NOTE(review): not referenced in the visible part of the file; looks like a
 * send-buffer step size - confirm against its users.
 */
#define RDMA_SEND_INCREMENT 32768

/* Size in bytes of each control buffer in RDMAWorkRequestData. */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
/* NOTE(review): presumably caps RDMAControlHeader.repeat - confirm. */
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

/* NOTE(review): presumably the value sent in RDMACapabilities.version. */
#define RDMA_CONTROL_VERSION_CURRENT 1

/* Capability flag bit.  NOTE(review): presumably "pin all memory" - confirm. */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/* Bitmask of every capability flag this file defines. */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
102
/*
 * Bail out of the calling function when the connection is already in an
 * error state.  Expects a variable named 'rdma' (pointer to a structure with
 * error_state and error_reported members) in the caller's scope and makes
 * the caller return rdma->error_state.  The diagnostic is printed only once.
 *
 * The expansion deliberately has no trailing semicolon so that
 * "CHECK_ERROR_STATE();" is exactly one statement and remains usable in an
 * unbraced if/else.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                fprintf(stderr, "RDMA is in an error state waiting migration" \
                                " to abort!\n"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
 * Layout of a 64-bit work request id as built by qemu_rdma_make_wrid():
 *   bits  0..15  request type (one of the RDMA_WRID_* values below)
 *   bits 16..29  index of the local RAM block
 *   bits 30..    chunk index inside that block
 */
#define RDMA_WRID_TYPE_SHIFT 0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

/* Everything that is neither type nor block bits. */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
138
139
140
141
142
143
/*
 * Work request types stored in the type bits of a work request id.
 * Values at or above RDMA_WRID_RECV_CONTROL are all treated as control
 * receives (see print_wrid() and qemu_rdma_poll()).
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

/*
 * Printable names, indexed directly by the enum value above.  The array is
 * sparse: every index that is not listed holds NULL, and the last valid
 * index is RDMA_WRID_RECV_CONTROL.
 */
const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
157
158
159
160
161
162
163
164
/*
 * Slots of RDMAContext.wr_data[]; RDMA_WRID_MAX is the number of slots.
 * NOTE(review): the slot names suggest one buffer each for READY, DATA and
 * CONTROL messages - confirm against the send/receive code.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};
171
172
173
174
/* Message types carried in RDMAControlHeader.type. */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,
    RDMA_CONTROL_QEMU_FILE,
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,
    RDMA_CONTROL_RAM_BLOCKS_RESULT,
    RDMA_CONTROL_COMPRESS,
    RDMA_CONTROL_REGISTER_REQUEST,
    RDMA_CONTROL_REGISTER_RESULT,
    RDMA_CONTROL_REGISTER_FINISHED,
    RDMA_CONTROL_UNREGISTER_REQUEST,
    RDMA_CONTROL_UNREGISTER_FINISHED,
};

/* Printable names, indexed by the control message type above. */
const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};
204
205
206
207
208
/* One control-channel buffer together with its memory registration. */
typedef struct {
    uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* the buffer itself */
    struct ibv_mr *control_mr;  /* set by qemu_rdma_reg_control() */
    size_t control_len;         /* NOTE(review): presumably bytes pending */
    uint8_t *control_curr;      /* NOTE(review): presumably read cursor */
} RDMAWorkRequestData;
215
216
217
218
/*
 * Version and capability flags; converted to and from network byte order by
 * caps_to_network() / network_to_caps().
 */
typedef struct {
    uint32_t version;
    uint32_t flags;     /* bitmask of RDMA_CAPABILITY_* */
} RDMACapabilities;
223
224static void caps_to_network(RDMACapabilities *cap)
225{
226 cap->version = htonl(cap->version);
227 cap->flags = htonl(cap->flags);
228}
229
230static void network_to_caps(RDMACapabilities *cap)
231{
232 cap->version = ntohl(cap->version);
233 cap->flags = ntohl(cap->flags);
234}
235
236
237
238
239
240
241
242
/* Local bookkeeping for one registered memory block. */
typedef struct RDMALocalBlock {
    uint8_t *local_host_addr;   /* base host address of the block */
    uint64_t remote_host_addr;  /* NOTE(review): presumably peer's address */
    uint64_t offset;            /* block offset; also the blockmap key */
    uint64_t length;            /* block length in bytes */
    struct ibv_mr **pmr;        /* per-chunk registrations, allocated lazily */
    struct ibv_mr *mr;          /* whole-block registration, if any */
    uint32_t *remote_keys;      /* one entry per chunk */
    uint32_t remote_rkey;       /* NOTE(review): presumably peer's block rkey */
    int index;                  /* position in RDMALocalBlocks.block[] */
    bool is_ram_block;          /* true if added before RDMALocalBlocks.init */
    int nb_chunks;              /* number of chunks covering the block */
    unsigned long *transit_bitmap;    /* per chunk; cleared on write completion */
    unsigned long *unregister_bitmap; /* per chunk; set while queued for unregister */
} RDMALocalBlock;
258
259
260
261
262
263
264
265
/*
 * Packed description of a memory block; byte-swapped field by field by
 * remote_block_to_network() / network_to_remote_block().  The padding member
 * is not touched by those helpers.
 */
typedef struct QEMU_PACKED RDMARemoteBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMARemoteBlock;
273
/*
 * Convert a 64-bit value from host to network (big-endian) byte order by
 * swapping each 32-bit half with htonl() and storing the high half first.
 */
static uint64_t htonll(uint64_t v)
{
    const uint32_t high_half = htonl(v >> 32);
    const uint32_t low_half = htonl(v & 0xFFFFFFFFULL);
    union { uint32_t lv[2]; uint64_t llv; } u = { .lv = { high_half, low_half } };

    return u.llv;
}
281
/*
 * Convert a 64-bit value from network (big-endian) to host byte order: the
 * first 32-bit word in memory becomes the high half of the result.
 */
static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u = { .llv = v };
    const uint64_t high_half = ntohl(u.lv[0]);
    const uint64_t low_half = ntohl(u.lv[1]);

    return (high_half << 32) | low_half;
}
287
288static void remote_block_to_network(RDMARemoteBlock *rb)
289{
290 rb->remote_host_addr = htonll(rb->remote_host_addr);
291 rb->offset = htonll(rb->offset);
292 rb->length = htonll(rb->length);
293 rb->remote_rkey = htonl(rb->remote_rkey);
294}
295
296static void network_to_remote_block(RDMARemoteBlock *rb)
297{
298 rb->remote_host_addr = ntohll(rb->remote_host_addr);
299 rb->offset = ntohll(rb->offset);
300 rb->length = ntohll(rb->length);
301 rb->remote_rkey = ntohl(rb->remote_rkey);
302}
303
304
305
306
307
308
/* The array of local blocks; reallocated on every add and delete. */
typedef struct RDMALocalBlocks {
    int nb_blocks;          /* number of entries in block[] */
    bool init;              /* set once qemu_rdma_init_ram_blocks() is done */
    RDMALocalBlock *block;  /* nb_blocks entries, NULL when empty */
} RDMALocalBlocks;
314
315
316
317
318
319
320
/* All state of one RDMA migration connection. */
typedef struct RDMAContext {
    char *host;     /* peer host name, resolved by qemu_rdma_resolve_host() */
    int port;       /* peer port */

    /* One control buffer per RDMA_WRID_READY/DATA/CONTROL slot. */
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * Cleared by qemu_rdma_poll() when a control receive completes.
     * NOTE(review): presumably set while a READY message is awaited.
     */
    int control_ready_expected;

    /* Outstanding RDMA writes; decremented on each write completion. */
    int nb_sent;

    /* NOTE(review): presumably the pending merged write - confirm. */
    uint64_t current_addr;
    uint64_t current_length;
    /* NOTE(review): presumably block index of the pending write. */
    int current_index;
    /* NOTE(review): presumably chunk index of the pending write. */
    int current_chunk;

    /* When false, completed writes may be queued for unregistration. */
    bool pin_all;

    /* Connection manager and verbs objects. */
    struct rdma_cm_id *cm_id;
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context *verbs;
    struct rdma_event_channel *channel;
    struct ibv_qp *qp;
    struct ibv_comp_channel *comp_channel;
    struct ibv_pd *pd;
    struct ibv_cq *cq;

    /*
     * Non-zero error_state makes CHECK_ERROR_STATE() return it from the
     * caller; error_reported ensures the diagnostic is printed only once.
     */
    int error_state;
    int error_reported;

    /* Local blocks, plus one RDMARemoteBlock slot per local block. */
    RDMALocalBlocks local_ram_blocks;
    RDMARemoteBlock *block;

    /*
     * When set, qemu_rdma_block_for_wrid() yields until the completion
     * channel is readable before fetching a completion event.
     */
    int migration_started_on_destination;

    int total_registrations;    /* live ibv_reg_mr() registrations */
    int total_writes;

    /* Ring of work request ids awaiting unregistration; 0 marks a free slot. */
    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    /* Maps a block offset to its RDMALocalBlock. */
    GHashTable *blockmap;
} RDMAContext;
397
398
399
400
/*
 * Pairs an RDMAContext with a file handle.
 * NOTE(review): presumably the opaque state behind a QEMUFile - confirm.
 */
typedef struct QEMUFileRDMA {
    RDMAContext *rdma;
    size_t len;
    void *file;
} QEMUFileRDMA;
406
407
408
409
410
/*
 * Packed header of a control message; byte-swapped by control_to_network()
 * and network_to_control().  The padding member is not converted.
 */
typedef struct QEMU_PACKED {
    uint32_t len;       /* payload length, e.g. sizeof(RDMARegister) */
    uint32_t type;      /* one of RDMA_CONTROL_* */
    uint32_t repeat;    /* NOTE(review): presumably number of commands */
    uint32_t padding;
} RDMAControlHeader;
417
418static void control_to_network(RDMAControlHeader *control)
419{
420 control->type = htonl(control->type);
421 control->len = htonl(control->len);
422 control->repeat = htonl(control->repeat);
423}
424
425static void network_to_control(RDMAControlHeader *control)
426{
427 control->type = ntohl(control->type);
428 control->len = ntohl(control->len);
429 control->repeat = ntohl(control->repeat);
430}
431
432
433
434
435
436
437
/*
 * Packed payload of a (un)registration request.  The key is either an
 * address or a chunk number; qemu_rdma_unregister_waiting() fills in
 * key.chunk.  The padding member is not converted by the byte-order helpers.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;
        uint64_t chunk;
    } key;
    uint32_t current_index;     /* index of the block the key refers to */
    uint32_t padding;
    uint64_t chunks;            /* NOTE(review): presumably a chunk count */
} RDMARegister;
447
448static void register_to_network(RDMARegister *reg)
449{
450 reg->key.current_addr = htonll(reg->key.current_addr);
451 reg->current_index = htonl(reg->current_index);
452 reg->chunks = htonll(reg->chunks);
453}
454
455static void network_to_register(RDMARegister *reg)
456{
457 reg->key.current_addr = ntohll(reg->key.current_addr);
458 reg->current_index = ntohl(reg->current_index);
459 reg->chunks = ntohll(reg->chunks);
460}
461
/*
 * Packed payload byte-swapped by compress_to_network() and
 * network_to_compress().
 * NOTE(review): presumably describes a run of identical bytes ('value') of
 * 'length' bytes at 'offset' within block 'block_idx' - confirm.
 */
typedef struct QEMU_PACKED {
    uint32_t value;
    uint32_t block_idx;
    uint64_t offset;
    uint64_t length;
} RDMACompress;
468
469static void compress_to_network(RDMACompress *comp)
470{
471 comp->value = htonl(comp->value);
472 comp->block_idx = htonl(comp->block_idx);
473 comp->offset = htonll(comp->offset);
474 comp->length = htonll(comp->length);
475}
476
477static void network_to_compress(RDMACompress *comp)
478{
479 comp->value = ntohl(comp->value);
480 comp->block_idx = ntohl(comp->block_idx);
481 comp->offset = ntohll(comp->offset);
482 comp->length = ntohll(comp->length);
483}
484
485
486
487
488
489
/*
 * Packed payload byte-swapped by result_to_network() and
 * network_to_result().  The padding member is not converted.
 * NOTE(review): presumably the reply to a registration request - confirm.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
495
496static void result_to_network(RDMARegisterResult *result)
497{
498 result->rkey = htonl(result->rkey);
499 result->host_addr = htonll(result->host_addr);
500};
501
502static void network_to_result(RDMARegisterResult *result)
503{
504 result->rkey = ntohl(result->rkey);
505 result->host_addr = ntohll(result->host_addr);
506};
507
/* Forward declarations for functions used before their definition. */
const char *print_wrid(int wrid);
/*
 * NOTE(review): defined outside the visible part of the file; from its use
 * in qemu_rdma_unregister_waiting() it sends 'head' plus 'data' and, when
 * 'resp' is given, waits for a reply of resp->type.  A negative return is
 * treated as failure by that caller.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));
513
514static inline uint64_t ram_chunk_index(const uint8_t *start,
515 const uint8_t *host)
516{
517 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
518}
519
520static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
521 uint64_t i)
522{
523 return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
524 + (i << RDMA_REG_CHUNK_SHIFT));
525}
526
527static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
528 uint64_t i)
529{
530 uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
531 (1UL << RDMA_REG_CHUNK_SHIFT);
532
533 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
534 result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
535 }
536
537 return result;
538}
539
540static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
541 ram_addr_t block_offset, uint64_t length)
542{
543 RDMALocalBlocks *local = &rdma->local_ram_blocks;
544 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
545 (void *) block_offset);
546 RDMALocalBlock *old = local->block;
547
548 assert(block == NULL);
549
550 local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
551
552 if (local->nb_blocks) {
553 int x;
554
555 for (x = 0; x < local->nb_blocks; x++) {
556 g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
557 g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
558 &local->block[x]);
559 }
560 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
561 g_free(old);
562 }
563
564 block = &local->block[local->nb_blocks];
565
566 block->local_host_addr = host_addr;
567 block->offset = block_offset;
568 block->length = length;
569 block->index = local->nb_blocks;
570 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
571 block->transit_bitmap = bitmap_new(block->nb_chunks);
572 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
573 block->unregister_bitmap = bitmap_new(block->nb_chunks);
574 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
575 block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
576
577 block->is_ram_block = local->init ? false : true;
578
579 g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
580
581 DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
582 " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
583 local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
584 block->length, (uint64_t) (block->local_host_addr + block->length),
585 BITS_TO_LONGS(block->nb_chunks) *
586 sizeof(unsigned long) * 8, block->nb_chunks);
587
588 local->nb_blocks++;
589
590 return 0;
591}
592
593
594
595
596
597
598static void qemu_rdma_init_one_block(void *host_addr,
599 ram_addr_t block_offset, ram_addr_t length, void *opaque)
600{
601 __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
602}
603
604
605
606
607
608
609static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
610{
611 RDMALocalBlocks *local = &rdma->local_ram_blocks;
612
613 assert(rdma->blockmap == NULL);
614 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
615 memset(local, 0, sizeof *local);
616 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
617 DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
618 rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
619 rdma->local_ram_blocks.nb_blocks);
620 local->init = true;
621 return 0;
622}
623
624static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
625{
626 RDMALocalBlocks *local = &rdma->local_ram_blocks;
627 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
628 (void *) block_offset);
629 RDMALocalBlock *old = local->block;
630 int x;
631
632 assert(block);
633
634 if (block->pmr) {
635 int j;
636
637 for (j = 0; j < block->nb_chunks; j++) {
638 if (!block->pmr[j]) {
639 continue;
640 }
641 ibv_dereg_mr(block->pmr[j]);
642 rdma->total_registrations--;
643 }
644 g_free(block->pmr);
645 block->pmr = NULL;
646 }
647
648 if (block->mr) {
649 ibv_dereg_mr(block->mr);
650 rdma->total_registrations--;
651 block->mr = NULL;
652 }
653
654 g_free(block->transit_bitmap);
655 block->transit_bitmap = NULL;
656
657 g_free(block->unregister_bitmap);
658 block->unregister_bitmap = NULL;
659
660 g_free(block->remote_keys);
661 block->remote_keys = NULL;
662
663 for (x = 0; x < local->nb_blocks; x++) {
664 g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
665 }
666
667 if (local->nb_blocks > 1) {
668
669 local->block = g_malloc0(sizeof(RDMALocalBlock) *
670 (local->nb_blocks - 1));
671
672 if (block->index) {
673 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
674 }
675
676 if (block->index < (local->nb_blocks - 1)) {
677 memcpy(local->block + block->index, old + (block->index + 1),
678 sizeof(RDMALocalBlock) *
679 (local->nb_blocks - (block->index + 1)));
680 }
681 } else {
682 assert(block == local->block);
683 local->block = NULL;
684 }
685
686 DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
687 " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
688 local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
689 block->length, (uint64_t) (block->local_host_addr + block->length),
690 BITS_TO_LONGS(block->nb_chunks) *
691 sizeof(unsigned long) * 8, block->nb_chunks);
692
693 g_free(old);
694
695 local->nb_blocks--;
696
697 if (local->nb_blocks) {
698 for (x = 0; x < local->nb_blocks; x++) {
699 g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
700 &local->block[x]);
701 }
702 }
703
704 return 0;
705}
706
707
708
709
710
711static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
712{
713 struct ibv_port_attr port;
714
715 if (ibv_query_port(verbs, 1, &port)) {
716 fprintf(stderr, "FAILED TO QUERY PORT INFORMATION!\n");
717 return;
718 }
719
720 printf("%s RDMA Device opened: kernel name %s "
721 "uverbs device name %s, "
722 "infiniband_verbs class device path %s, "
723 "infiniband class device path %s, "
724 "transport: (%d) %s\n",
725 who,
726 verbs->device->name,
727 verbs->device->dev_name,
728 verbs->device->dev_path,
729 verbs->device->ibdev_path,
730 port.link_layer,
731 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
732 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
733 ? "Ethernet" : "Unknown"));
734}
735
736
737
738
739
740
741static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
742{
743 char sgid[33];
744 char dgid[33];
745 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
746 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
747 DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
748}
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
794{
795 struct ibv_port_attr port_attr;
796
797
798#ifdef CONFIG_LINUX
799
800
801
802
803
804
805
806
807
808
809 if (!verbs) {
810 int num_devices, x;
811 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
812 bool roce_found = false;
813 bool ib_found = false;
814
815 for (x = 0; x < num_devices; x++) {
816 verbs = ibv_open_device(dev_list[x]);
817
818 if (ibv_query_port(verbs, 1, &port_attr)) {
819 ibv_close_device(verbs);
820 ERROR(errp, "Could not query initial IB port");
821 return -EINVAL;
822 }
823
824 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
825 ib_found = true;
826 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
827 roce_found = true;
828 }
829
830 ibv_close_device(verbs);
831
832 }
833
834 if (roce_found) {
835 if (ib_found) {
836 fprintf(stderr, "WARN: migrations may fail:"
837 " IPv6 over RoCE / iWARP in linux"
838 " is broken. But since you appear to have a"
839 " mixed RoCE / IB environment, be sure to only"
840 " migrate over the IB fabric until the kernel "
841 " fixes the bug.\n");
842 } else {
843 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
844 " and your management software has specified '[::]'"
845 ", but IPv6 over RoCE / iWARP is not supported in Linux.");
846 return -ENONET;
847 }
848 }
849
850 return 0;
851 }
852
853
854
855
856
857
858
859
860 if (ibv_query_port(verbs, 1, &port_attr)) {
861 ERROR(errp, "Could not query initial IB port");
862 return -EINVAL;
863 }
864
865 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
866 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
867 "(but patches on linux-rdma in progress)");
868 return -ENONET;
869 }
870
871#endif
872
873 return 0;
874}
875
876
877
878
879
880
881static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
882{
883 int ret;
884 struct rdma_addrinfo *res;
885 char port_str[16];
886 struct rdma_cm_event *cm_event;
887 char ip[40] = "unknown";
888 struct rdma_addrinfo *e;
889
890 if (rdma->host == NULL || !strcmp(rdma->host, "")) {
891 ERROR(errp, "RDMA hostname has not been set");
892 return -EINVAL;
893 }
894
895
896 rdma->channel = rdma_create_event_channel();
897 if (!rdma->channel) {
898 ERROR(errp, "could not create CM channel");
899 return -EINVAL;
900 }
901
902
903 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
904 if (ret) {
905 ERROR(errp, "could not create channel id");
906 goto err_resolve_create_id;
907 }
908
909 snprintf(port_str, 16, "%d", rdma->port);
910 port_str[15] = '\0';
911
912 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
913 if (ret < 0) {
914 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
915 goto err_resolve_get_addr;
916 }
917
918 for (e = res; e != NULL; e = e->ai_next) {
919 inet_ntop(e->ai_family,
920 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
921 DPRINTF("Trying %s => %s\n", rdma->host, ip);
922
923 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
924 RDMA_RESOLVE_TIMEOUT_MS);
925 if (!ret) {
926 if (e->ai_family == AF_INET6) {
927 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
928 if (ret) {
929 continue;
930 }
931 }
932 goto route;
933 }
934 }
935
936 ERROR(errp, "could not resolve address %s", rdma->host);
937 goto err_resolve_get_addr;
938
939route:
940 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
941
942 ret = rdma_get_cm_event(rdma->channel, &cm_event);
943 if (ret) {
944 ERROR(errp, "could not perform event_addr_resolved");
945 goto err_resolve_get_addr;
946 }
947
948 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
949 ERROR(errp, "result not equal to event_addr_resolved %s",
950 rdma_event_str(cm_event->event));
951 perror("rdma_resolve_addr");
952 rdma_ack_cm_event(cm_event);
953 ret = -EINVAL;
954 goto err_resolve_get_addr;
955 }
956 rdma_ack_cm_event(cm_event);
957
958
959 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
960 if (ret) {
961 ERROR(errp, "could not resolve rdma route");
962 goto err_resolve_get_addr;
963 }
964
965 ret = rdma_get_cm_event(rdma->channel, &cm_event);
966 if (ret) {
967 ERROR(errp, "could not perform event_route_resolved");
968 goto err_resolve_get_addr;
969 }
970 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
971 ERROR(errp, "result not equal to event_route_resolved: %s",
972 rdma_event_str(cm_event->event));
973 rdma_ack_cm_event(cm_event);
974 ret = -EINVAL;
975 goto err_resolve_get_addr;
976 }
977 rdma_ack_cm_event(cm_event);
978 rdma->verbs = rdma->cm_id->verbs;
979 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
980 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
981 return 0;
982
983err_resolve_get_addr:
984 rdma_destroy_id(rdma->cm_id);
985 rdma->cm_id = NULL;
986err_resolve_create_id:
987 rdma_destroy_event_channel(rdma->channel);
988 rdma->channel = NULL;
989 return ret;
990}
991
992
993
994
995static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
996{
997
998 rdma->pd = ibv_alloc_pd(rdma->verbs);
999 if (!rdma->pd) {
1000 fprintf(stderr, "failed to allocate protection domain\n");
1001 return -1;
1002 }
1003
1004
1005 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1006 if (!rdma->comp_channel) {
1007 fprintf(stderr, "failed to allocate completion channel\n");
1008 goto err_alloc_pd_cq;
1009 }
1010
1011
1012
1013
1014
1015 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1016 NULL, rdma->comp_channel, 0);
1017 if (!rdma->cq) {
1018 fprintf(stderr, "failed to allocate completion queue\n");
1019 goto err_alloc_pd_cq;
1020 }
1021
1022 return 0;
1023
1024err_alloc_pd_cq:
1025 if (rdma->pd) {
1026 ibv_dealloc_pd(rdma->pd);
1027 }
1028 if (rdma->comp_channel) {
1029 ibv_destroy_comp_channel(rdma->comp_channel);
1030 }
1031 rdma->pd = NULL;
1032 rdma->comp_channel = NULL;
1033 return -1;
1034
1035}
1036
1037
1038
1039
1040static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1041{
1042 struct ibv_qp_init_attr attr = { 0 };
1043 int ret;
1044
1045 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1046 attr.cap.max_recv_wr = 3;
1047 attr.cap.max_send_sge = 1;
1048 attr.cap.max_recv_sge = 1;
1049 attr.send_cq = rdma->cq;
1050 attr.recv_cq = rdma->cq;
1051 attr.qp_type = IBV_QPT_RC;
1052
1053 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1054 if (ret) {
1055 return -1;
1056 }
1057
1058 rdma->qp = rdma->cm_id->qp;
1059 return 0;
1060}
1061
1062static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1063{
1064 int i;
1065 RDMALocalBlocks *local = &rdma->local_ram_blocks;
1066
1067 for (i = 0; i < local->nb_blocks; i++) {
1068 local->block[i].mr =
1069 ibv_reg_mr(rdma->pd,
1070 local->block[i].local_host_addr,
1071 local->block[i].length,
1072 IBV_ACCESS_LOCAL_WRITE |
1073 IBV_ACCESS_REMOTE_WRITE
1074 );
1075 if (!local->block[i].mr) {
1076 perror("Failed to register local dest ram block!\n");
1077 break;
1078 }
1079 rdma->total_registrations++;
1080 }
1081
1082 if (i >= local->nb_blocks) {
1083 return 0;
1084 }
1085
1086 for (i--; i >= 0; i--) {
1087 ibv_dereg_mr(local->block[i].mr);
1088 rdma->total_registrations--;
1089 }
1090
1091 return -1;
1092
1093}
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1105 uint64_t block_offset,
1106 uint64_t offset,
1107 uint64_t length,
1108 uint64_t *block_index,
1109 uint64_t *chunk_index)
1110{
1111 uint64_t current_addr = block_offset + offset;
1112 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1113 (void *) block_offset);
1114 assert(block);
1115 assert(current_addr >= block->offset);
1116 assert((current_addr + length) <= (block->offset + block->length));
1117
1118 *block_index = block->index;
1119 *chunk_index = ram_chunk_index(block->local_host_addr,
1120 block->local_host_addr + (current_addr - block->offset));
1121
1122 return 0;
1123}
1124
1125
1126
1127
1128
1129
1130
1131
1132static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1133 RDMALocalBlock *block, uint8_t *host_addr,
1134 uint32_t *lkey, uint32_t *rkey, int chunk,
1135 uint8_t *chunk_start, uint8_t *chunk_end)
1136{
1137 if (block->mr) {
1138 if (lkey) {
1139 *lkey = block->mr->lkey;
1140 }
1141 if (rkey) {
1142 *rkey = block->mr->rkey;
1143 }
1144 return 0;
1145 }
1146
1147
1148 if (!block->pmr) {
1149 block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
1150 if (!block->pmr) {
1151 return -1;
1152 }
1153 }
1154
1155
1156
1157
1158
1159
1160 if (!block->pmr[chunk]) {
1161 uint64_t len = chunk_end - chunk_start;
1162
1163 DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
1164 len, chunk_start);
1165
1166 block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1167 chunk_start, len,
1168 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1169 IBV_ACCESS_REMOTE_WRITE) : 0));
1170
1171 if (!block->pmr[chunk]) {
1172 perror("Failed to register chunk!");
1173 fprintf(stderr, "Chunk details: block: %d chunk index %d"
1174 " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
1175 " local %" PRIu64 " registrations: %d\n",
1176 block->index, chunk, (uint64_t) chunk_start,
1177 (uint64_t) chunk_end, (uint64_t) host_addr,
1178 (uint64_t) block->local_host_addr,
1179 rdma->total_registrations);
1180 return -1;
1181 }
1182 rdma->total_registrations++;
1183 }
1184
1185 if (lkey) {
1186 *lkey = block->pmr[chunk]->lkey;
1187 }
1188 if (rkey) {
1189 *rkey = block->pmr[chunk]->rkey;
1190 }
1191 return 0;
1192}
1193
1194
1195
1196
1197
1198static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1199{
1200 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1201 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1202 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1203 if (rdma->wr_data[idx].control_mr) {
1204 rdma->total_registrations++;
1205 return 0;
1206 }
1207 fprintf(stderr, "qemu_rdma_reg_control failed!\n");
1208 return -1;
1209}
1210
1211const char *print_wrid(int wrid)
1212{
1213 if (wrid >= RDMA_WRID_RECV_CONTROL) {
1214 return wrid_desc[RDMA_WRID_RECV_CONTROL];
1215 }
1216 return wrid_desc[wrid];
1217}
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1255{
1256 while (rdma->unregistrations[rdma->unregister_current]) {
1257 int ret;
1258 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1259 uint64_t chunk =
1260 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1261 uint64_t index =
1262 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1263 RDMALocalBlock *block =
1264 &(rdma->local_ram_blocks.block[index]);
1265 RDMARegister reg = { .current_index = index };
1266 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1267 };
1268 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1269 .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1270 .repeat = 1,
1271 };
1272
1273 DDPRINTF("Processing unregister for chunk: %" PRIu64
1274 " at position %d\n", chunk, rdma->unregister_current);
1275
1276 rdma->unregistrations[rdma->unregister_current] = 0;
1277 rdma->unregister_current++;
1278
1279 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1280 rdma->unregister_current = 0;
1281 }
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291 clear_bit(chunk, block->unregister_bitmap);
1292
1293 if (test_bit(chunk, block->transit_bitmap)) {
1294 DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
1295 continue;
1296 }
1297
1298 DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
1299
1300 ret = ibv_dereg_mr(block->pmr[chunk]);
1301 block->pmr[chunk] = NULL;
1302 block->remote_keys[chunk] = 0;
1303
1304 if (ret != 0) {
1305 perror("unregistration chunk failed");
1306 return -ret;
1307 }
1308 rdma->total_registrations--;
1309
1310 reg.key.chunk = chunk;
1311 register_to_network(®);
1312 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
1313 &resp, NULL, NULL);
1314 if (ret < 0) {
1315 return ret;
1316 }
1317
1318 DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
1319 }
1320
1321 return 0;
1322}
1323
1324static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1325 uint64_t chunk)
1326{
1327 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1328
1329 result |= (index << RDMA_WRID_BLOCK_SHIFT);
1330 result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1331
1332 return result;
1333}
1334
1335
1336
1337
1338
1339static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1340 uint64_t chunk, uint64_t wr_id)
1341{
1342 if (rdma->unregistrations[rdma->unregister_next] != 0) {
1343 fprintf(stderr, "rdma migration: queue is full!\n");
1344 } else {
1345 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1346
1347 if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1348 DDPRINTF("Appending unregister chunk %" PRIu64
1349 " at position %d\n", chunk, rdma->unregister_next);
1350
1351 rdma->unregistrations[rdma->unregister_next++] =
1352 qemu_rdma_make_wrid(wr_id, index, chunk);
1353
1354 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1355 rdma->unregister_next = 0;
1356 }
1357 } else {
1358 DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
1359 chunk);
1360 }
1361 }
1362}
1363
1364
1365
1366
1367
1368
1369static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1370 uint32_t *byte_len)
1371{
1372 int ret;
1373 struct ibv_wc wc;
1374 uint64_t wr_id;
1375
1376 ret = ibv_poll_cq(rdma->cq, 1, &wc);
1377
1378 if (!ret) {
1379 *wr_id_out = RDMA_WRID_NONE;
1380 return 0;
1381 }
1382
1383 if (ret < 0) {
1384 fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
1385 return ret;
1386 }
1387
1388 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1389
1390 if (wc.status != IBV_WC_SUCCESS) {
1391 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1392 wc.status, ibv_wc_status_str(wc.status));
1393 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1394
1395 return -1;
1396 }
1397
1398 if (rdma->control_ready_expected &&
1399 (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1400 DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
1401 " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
1402 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1403 rdma->control_ready_expected = 0;
1404 }
1405
1406 if (wr_id == RDMA_WRID_RDMA_WRITE) {
1407 uint64_t chunk =
1408 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1409 uint64_t index =
1410 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1411 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1412
1413 DDDPRINTF("completions %s (%" PRId64 ") left %d, "
1414 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
1415 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
1416 block->local_host_addr, (void *)block->remote_host_addr);
1417
1418 clear_bit(chunk, block->transit_bitmap);
1419
1420 if (rdma->nb_sent > 0) {
1421 rdma->nb_sent--;
1422 }
1423
1424 if (!rdma->pin_all) {
1425
1426
1427
1428
1429
1430
1431#ifdef RDMA_UNREGISTRATION_EXAMPLE
1432 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1433#endif
1434 }
1435 } else {
1436 DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
1437 print_wrid(wr_id), wr_id, rdma->nb_sent);
1438 }
1439
1440 *wr_id_out = wc.wr_id;
1441 if (byte_len) {
1442 *byte_len = wc.byte_len;
1443 }
1444
1445 return 0;
1446}
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1462 uint32_t *byte_len)
1463{
1464 int num_cq_events = 0, ret = 0;
1465 struct ibv_cq *cq;
1466 void *cq_ctx;
1467 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1468
1469 if (ibv_req_notify_cq(rdma->cq, 0)) {
1470 return -1;
1471 }
1472
1473 while (wr_id != wrid_requested) {
1474 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1475 if (ret < 0) {
1476 return ret;
1477 }
1478
1479 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1480
1481 if (wr_id == RDMA_WRID_NONE) {
1482 break;
1483 }
1484 if (wr_id != wrid_requested) {
1485 DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1486 print_wrid(wrid_requested),
1487 wrid_requested, print_wrid(wr_id), wr_id);
1488 }
1489 }
1490
1491 if (wr_id == wrid_requested) {
1492 return 0;
1493 }
1494
1495 while (1) {
1496
1497
1498
1499
1500 if (rdma->migration_started_on_destination) {
1501 yield_until_fd_readable(rdma->comp_channel->fd);
1502 }
1503
1504 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1505 perror("ibv_get_cq_event");
1506 goto err_block_for_wrid;
1507 }
1508
1509 num_cq_events++;
1510
1511 if (ibv_req_notify_cq(cq, 0)) {
1512 goto err_block_for_wrid;
1513 }
1514
1515 while (wr_id != wrid_requested) {
1516 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1517 if (ret < 0) {
1518 goto err_block_for_wrid;
1519 }
1520
1521 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1522
1523 if (wr_id == RDMA_WRID_NONE) {
1524 break;
1525 }
1526 if (wr_id != wrid_requested) {
1527 DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1528 print_wrid(wrid_requested), wrid_requested,
1529 print_wrid(wr_id), wr_id);
1530 }
1531 }
1532
1533 if (wr_id == wrid_requested) {
1534 goto success_block_for_wrid;
1535 }
1536 }
1537
1538success_block_for_wrid:
1539 if (num_cq_events) {
1540 ibv_ack_cq_events(cq, num_cq_events);
1541 }
1542 return 0;
1543
1544err_block_for_wrid:
1545 if (num_cq_events) {
1546 ibv_ack_cq_events(cq, num_cq_events);
1547 }
1548 return ret;
1549}
1550
1551
1552
1553
1554
1555static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1556 RDMAControlHeader *head)
1557{
1558 int ret = 0;
1559 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1560 struct ibv_send_wr *bad_wr;
1561 struct ibv_sge sge = {
1562 .addr = (uint64_t)(wr->control),
1563 .length = head->len + sizeof(RDMAControlHeader),
1564 .lkey = wr->control_mr->lkey,
1565 };
1566 struct ibv_send_wr send_wr = {
1567 .wr_id = RDMA_WRID_SEND_CONTROL,
1568 .opcode = IBV_WR_SEND,
1569 .send_flags = IBV_SEND_SIGNALED,
1570 .sg_list = &sge,
1571 .num_sge = 1,
1572 };
1573
1574 DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1585 memcpy(wr->control, head, sizeof(RDMAControlHeader));
1586 control_to_network((void *) wr->control);
1587
1588 if (buf) {
1589 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1590 }
1591
1592
1593 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1594
1595 if (ret > 0) {
1596 fprintf(stderr, "Failed to use post IB SEND for control!\n");
1597 return -ret;
1598 }
1599
1600 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1601 if (ret < 0) {
1602 fprintf(stderr, "rdma migration: send polling control error!\n");
1603 }
1604
1605 return ret;
1606}
1607
1608
1609
1610
1611
1612static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1613{
1614 struct ibv_recv_wr *bad_wr;
1615 struct ibv_sge sge = {
1616 .addr = (uint64_t)(rdma->wr_data[idx].control),
1617 .length = RDMA_CONTROL_MAX_BUFFER,
1618 .lkey = rdma->wr_data[idx].control_mr->lkey,
1619 };
1620
1621 struct ibv_recv_wr recv_wr = {
1622 .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1623 .sg_list = &sge,
1624 .num_sge = 1,
1625 };
1626
1627
1628 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1629 return -1;
1630 }
1631
1632 return 0;
1633}
1634
1635
1636
1637
1638static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1639 RDMAControlHeader *head, int expecting, int idx)
1640{
1641 uint32_t byte_len;
1642 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1643 &byte_len);
1644
1645 if (ret < 0) {
1646 fprintf(stderr, "rdma migration: recv polling control error!\n");
1647 return ret;
1648 }
1649
1650 network_to_control((void *) rdma->wr_data[idx].control);
1651 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1652
1653 DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
1654
1655 if (expecting == RDMA_CONTROL_NONE) {
1656 DDDPRINTF("Surprise: got %s (%d)\n",
1657 control_desc[head->type], head->type);
1658 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1659 fprintf(stderr, "Was expecting a %s (%d) control message"
1660 ", but got: %s (%d), length: %d\n",
1661 control_desc[expecting], expecting,
1662 control_desc[head->type], head->type, head->len);
1663 return -EIO;
1664 }
1665 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1666 fprintf(stderr, "too long length: %d\n", head->len);
1667 return -EINVAL;
1668 }
1669 if (sizeof(*head) + head->len != byte_len) {
1670 fprintf(stderr, "Malformed length: %d byte_len %d\n",
1671 head->len, byte_len);
1672 return -EINVAL;
1673 }
1674
1675 return 0;
1676}
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1687 RDMAControlHeader *head)
1688{
1689 rdma->wr_data[idx].control_len = head->len;
1690 rdma->wr_data[idx].control_curr =
1691 rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1692}
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1708 uint8_t *data, RDMAControlHeader *resp,
1709 int *resp_idx,
1710 int (*callback)(RDMAContext *rdma))
1711{
1712 int ret = 0;
1713
1714
1715
1716
1717
1718 if (rdma->control_ready_expected) {
1719 RDMAControlHeader resp;
1720 ret = qemu_rdma_exchange_get_response(rdma,
1721 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1722 if (ret < 0) {
1723 return ret;
1724 }
1725 }
1726
1727
1728
1729
1730 if (resp) {
1731 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1732 if (ret) {
1733 fprintf(stderr, "rdma migration: error posting"
1734 " extra control recv for anticipated result!");
1735 return ret;
1736 }
1737 }
1738
1739
1740
1741
1742 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1743 if (ret) {
1744 fprintf(stderr, "rdma migration: error posting first control recv!");
1745 return ret;
1746 }
1747
1748
1749
1750
1751 ret = qemu_rdma_post_send_control(rdma, data, head);
1752
1753 if (ret < 0) {
1754 fprintf(stderr, "Failed to send control buffer!\n");
1755 return ret;
1756 }
1757
1758
1759
1760
1761 if (resp) {
1762 if (callback) {
1763 DDPRINTF("Issuing callback before receiving response...\n");
1764 ret = callback(rdma);
1765 if (ret < 0) {
1766 return ret;
1767 }
1768 }
1769
1770 DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
1771 ret = qemu_rdma_exchange_get_response(rdma, resp,
1772 resp->type, RDMA_WRID_DATA);
1773
1774 if (ret < 0) {
1775 return ret;
1776 }
1777
1778 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1779 if (resp_idx) {
1780 *resp_idx = RDMA_WRID_DATA;
1781 }
1782 DDPRINTF("Response %s received.\n", control_desc[resp->type]);
1783 }
1784
1785 rdma->control_ready_expected = 1;
1786
1787 return 0;
1788}
1789
1790
1791
1792
1793
1794static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1795 int expecting)
1796{
1797 RDMAControlHeader ready = {
1798 .len = 0,
1799 .type = RDMA_CONTROL_READY,
1800 .repeat = 1,
1801 };
1802 int ret;
1803
1804
1805
1806
1807 ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1808
1809 if (ret < 0) {
1810 fprintf(stderr, "Failed to send control buffer!\n");
1811 return ret;
1812 }
1813
1814
1815
1816
1817 ret = qemu_rdma_exchange_get_response(rdma, head,
1818 expecting, RDMA_WRID_READY);
1819
1820 if (ret < 0) {
1821 return ret;
1822 }
1823
1824 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1825
1826
1827
1828
1829 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1830 if (ret) {
1831 fprintf(stderr, "rdma migration: error posting second control recv!");
1832 return ret;
1833 }
1834
1835 return 0;
1836}
1837
1838
1839
1840
1841
1842
1843
1844static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1845 int current_index, uint64_t current_addr,
1846 uint64_t length)
1847{
1848 struct ibv_sge sge;
1849 struct ibv_send_wr send_wr = { 0 };
1850 struct ibv_send_wr *bad_wr;
1851 int reg_result_idx, ret, count = 0;
1852 uint64_t chunk, chunks;
1853 uint8_t *chunk_start, *chunk_end;
1854 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1855 RDMARegister reg;
1856 RDMARegisterResult *reg_result;
1857 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1858 RDMAControlHeader head = { .len = sizeof(RDMARegister),
1859 .type = RDMA_CONTROL_REGISTER_REQUEST,
1860 .repeat = 1,
1861 };
1862
1863retry:
1864 sge.addr = (uint64_t)(block->local_host_addr +
1865 (current_addr - block->offset));
1866 sge.length = length;
1867
1868 chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
1869 chunk_start = ram_chunk_start(block, chunk);
1870
1871 if (block->is_ram_block) {
1872 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1873
1874 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1875 chunks--;
1876 }
1877 } else {
1878 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1879
1880 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1881 chunks--;
1882 }
1883 }
1884
1885 DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
1886 chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1887
1888 chunk_end = ram_chunk_end(block, chunk + chunks);
1889
1890 if (!rdma->pin_all) {
1891#ifdef RDMA_UNREGISTRATION_EXAMPLE
1892 qemu_rdma_unregister_waiting(rdma);
1893#endif
1894 }
1895
1896 while (test_bit(chunk, block->transit_bitmap)) {
1897 (void)count;
1898 DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
1899 " current %" PRIu64 " len %" PRIu64 " %d %d\n",
1900 count++, current_index, chunk,
1901 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1902
1903 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1904
1905 if (ret < 0) {
1906 fprintf(stderr, "Failed to Wait for previous write to complete "
1907 "block %d chunk %" PRIu64
1908 " current %" PRIu64 " len %" PRIu64 " %d\n",
1909 current_index, chunk, sge.addr, length, rdma->nb_sent);
1910 return ret;
1911 }
1912 }
1913
1914 if (!rdma->pin_all || !block->is_ram_block) {
1915 if (!block->remote_keys[chunk]) {
1916
1917
1918
1919
1920
1921
1922 if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
1923 && buffer_find_nonzero_offset((void *)sge.addr,
1924 length) == length) {
1925 RDMACompress comp = {
1926 .offset = current_addr,
1927 .value = 0,
1928 .block_idx = current_index,
1929 .length = length,
1930 };
1931
1932 head.len = sizeof(comp);
1933 head.type = RDMA_CONTROL_COMPRESS;
1934
1935 DDPRINTF("Entire chunk is zero, sending compress: %"
1936 PRIu64 " for %d "
1937 "bytes, index: %d, offset: %" PRId64 "...\n",
1938 chunk, sge.length, current_index, current_addr);
1939
1940 compress_to_network(&comp);
1941 ret = qemu_rdma_exchange_send(rdma, &head,
1942 (uint8_t *) &comp, NULL, NULL, NULL);
1943
1944 if (ret < 0) {
1945 return -EIO;
1946 }
1947
1948 acct_update_position(f, sge.length, true);
1949
1950 return 1;
1951 }
1952
1953
1954
1955
1956 reg.current_index = current_index;
1957 if (block->is_ram_block) {
1958 reg.key.current_addr = current_addr;
1959 } else {
1960 reg.key.chunk = chunk;
1961 }
1962 reg.chunks = chunks;
1963
1964 DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
1965 "bytes, index: %d, offset: %" PRId64 "...\n",
1966 chunk, sge.length, current_index, current_addr);
1967
1968 register_to_network(®);
1969 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
1970 &resp, ®_result_idx, NULL);
1971 if (ret < 0) {
1972 return ret;
1973 }
1974
1975
1976 if (qemu_rdma_register_and_get_keys(rdma, block,
1977 (uint8_t *) sge.addr,
1978 &sge.lkey, NULL, chunk,
1979 chunk_start, chunk_end)) {
1980 fprintf(stderr, "cannot get lkey!\n");
1981 return -EINVAL;
1982 }
1983
1984 reg_result = (RDMARegisterResult *)
1985 rdma->wr_data[reg_result_idx].control_curr;
1986
1987 network_to_result(reg_result);
1988
1989 DDPRINTF("Received registration result:"
1990 " my key: %x their key %x, chunk %" PRIu64 "\n",
1991 block->remote_keys[chunk], reg_result->rkey, chunk);
1992
1993 block->remote_keys[chunk] = reg_result->rkey;
1994 block->remote_host_addr = reg_result->host_addr;
1995 } else {
1996
1997 if (qemu_rdma_register_and_get_keys(rdma, block,
1998 (uint8_t *)sge.addr,
1999 &sge.lkey, NULL, chunk,
2000 chunk_start, chunk_end)) {
2001 fprintf(stderr, "cannot get lkey!\n");
2002 return -EINVAL;
2003 }
2004 }
2005
2006 send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2007 } else {
2008 send_wr.wr.rdma.rkey = block->remote_rkey;
2009
2010 if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
2011 &sge.lkey, NULL, chunk,
2012 chunk_start, chunk_end)) {
2013 fprintf(stderr, "cannot get lkey!\n");
2014 return -EINVAL;
2015 }
2016 }
2017
2018
2019
2020
2021
2022
2023
2024 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2025 current_index, chunk);
2026
2027 send_wr.opcode = IBV_WR_RDMA_WRITE;
2028 send_wr.send_flags = IBV_SEND_SIGNALED;
2029 send_wr.sg_list = &sge;
2030 send_wr.num_sge = 1;
2031 send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2032 (current_addr - block->offset);
2033
2034 DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
2035 " remote: %lx, bytes %" PRIu32 "\n",
2036 chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2037 sge.length);
2038
2039
2040
2041
2042
2043 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2044
2045 if (ret == ENOMEM) {
2046 DDPRINTF("send queue is full. wait a little....\n");
2047 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2048 if (ret < 0) {
2049 fprintf(stderr, "rdma migration: failed to make "
2050 "room in full send queue! %d\n", ret);
2051 return ret;
2052 }
2053
2054 goto retry;
2055
2056 } else if (ret > 0) {
2057 perror("rdma migration: post rdma write failed");
2058 return -ret;
2059 }
2060
2061 set_bit(chunk, block->transit_bitmap);
2062 acct_update_position(f, sge.length, false);
2063 rdma->total_writes++;
2064
2065 return 0;
2066}
2067
2068
2069
2070
2071
2072
2073
2074static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2075{
2076 int ret;
2077
2078 if (!rdma->current_length) {
2079 return 0;
2080 }
2081
2082 ret = qemu_rdma_write_one(f, rdma,
2083 rdma->current_index, rdma->current_addr, rdma->current_length);
2084
2085 if (ret < 0) {
2086 return ret;
2087 }
2088
2089 if (ret == 0) {
2090 rdma->nb_sent++;
2091 DDDPRINTF("sent total: %d\n", rdma->nb_sent);
2092 }
2093
2094 rdma->current_length = 0;
2095 rdma->current_addr = 0;
2096
2097 return 0;
2098}
2099
2100static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2101 uint64_t offset, uint64_t len)
2102{
2103 RDMALocalBlock *block;
2104 uint8_t *host_addr;
2105 uint8_t *chunk_end;
2106
2107 if (rdma->current_index < 0) {
2108 return 0;
2109 }
2110
2111 if (rdma->current_chunk < 0) {
2112 return 0;
2113 }
2114
2115 block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2116 host_addr = block->local_host_addr + (offset - block->offset);
2117 chunk_end = ram_chunk_end(block, rdma->current_chunk);
2118
2119 if (rdma->current_length == 0) {
2120 return 0;
2121 }
2122
2123
2124
2125
2126 if (offset != (rdma->current_addr + rdma->current_length)) {
2127 return 0;
2128 }
2129
2130 if (offset < block->offset) {
2131 return 0;
2132 }
2133
2134 if ((offset + len) > (block->offset + block->length)) {
2135 return 0;
2136 }
2137
2138 if ((host_addr + len) > chunk_end) {
2139 return 0;
2140 }
2141
2142 return 1;
2143}
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2156 uint64_t block_offset, uint64_t offset,
2157 uint64_t len)
2158{
2159 uint64_t current_addr = block_offset + offset;
2160 uint64_t index = rdma->current_index;
2161 uint64_t chunk = rdma->current_chunk;
2162 int ret;
2163
2164
2165 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2166 ret = qemu_rdma_write_flush(f, rdma);
2167 if (ret) {
2168 return ret;
2169 }
2170 rdma->current_length = 0;
2171 rdma->current_addr = current_addr;
2172
2173 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2174 offset, len, &index, &chunk);
2175 if (ret) {
2176 fprintf(stderr, "ram block search failed\n");
2177 return ret;
2178 }
2179 rdma->current_index = index;
2180 rdma->current_chunk = chunk;
2181 }
2182
2183
2184 rdma->current_length += len;
2185
2186
2187 if (rdma->current_length >= RDMA_MERGE_MAX) {
2188 return qemu_rdma_write_flush(f, rdma);
2189 }
2190
2191 return 0;
2192}
2193
2194static void qemu_rdma_cleanup(RDMAContext *rdma)
2195{
2196 struct rdma_cm_event *cm_event;
2197 int ret, idx;
2198
2199 if (rdma->cm_id && rdma->connected) {
2200 if (rdma->error_state) {
2201 RDMAControlHeader head = { .len = 0,
2202 .type = RDMA_CONTROL_ERROR,
2203 .repeat = 1,
2204 };
2205 fprintf(stderr, "Early error. Sending error.\n");
2206 qemu_rdma_post_send_control(rdma, NULL, &head);
2207 }
2208
2209 ret = rdma_disconnect(rdma->cm_id);
2210 if (!ret) {
2211 DDPRINTF("waiting for disconnect\n");
2212 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2213 if (!ret) {
2214 rdma_ack_cm_event(cm_event);
2215 }
2216 }
2217 DDPRINTF("Disconnected.\n");
2218 rdma->connected = false;
2219 }
2220
2221 g_free(rdma->block);
2222 rdma->block = NULL;
2223
2224 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2225 if (rdma->wr_data[idx].control_mr) {
2226 rdma->total_registrations--;
2227 ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2228 }
2229 rdma->wr_data[idx].control_mr = NULL;
2230 }
2231
2232 if (rdma->local_ram_blocks.block) {
2233 while (rdma->local_ram_blocks.nb_blocks) {
2234 __qemu_rdma_delete_block(rdma,
2235 rdma->local_ram_blocks.block->offset);
2236 }
2237 }
2238
2239 if (rdma->cq) {
2240 ibv_destroy_cq(rdma->cq);
2241 rdma->cq = NULL;
2242 }
2243 if (rdma->comp_channel) {
2244 ibv_destroy_comp_channel(rdma->comp_channel);
2245 rdma->comp_channel = NULL;
2246 }
2247 if (rdma->pd) {
2248 ibv_dealloc_pd(rdma->pd);
2249 rdma->pd = NULL;
2250 }
2251 if (rdma->listen_id) {
2252 rdma_destroy_id(rdma->listen_id);
2253 rdma->listen_id = NULL;
2254 }
2255 if (rdma->cm_id) {
2256 if (rdma->qp) {
2257 rdma_destroy_qp(rdma->cm_id);
2258 rdma->qp = NULL;
2259 }
2260 rdma_destroy_id(rdma->cm_id);
2261 rdma->cm_id = NULL;
2262 }
2263 if (rdma->channel) {
2264 rdma_destroy_event_channel(rdma->channel);
2265 rdma->channel = NULL;
2266 }
2267 g_free(rdma->host);
2268 rdma->host = NULL;
2269}
2270
2271
2272static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2273{
2274 int ret, idx;
2275 Error *local_err = NULL, **temp = &local_err;
2276
2277
2278
2279
2280
2281 rdma->pin_all = pin_all;
2282
2283 ret = qemu_rdma_resolve_host(rdma, temp);
2284 if (ret) {
2285 goto err_rdma_source_init;
2286 }
2287
2288 ret = qemu_rdma_alloc_pd_cq(rdma);
2289 if (ret) {
2290 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2291 " limits may be too low. Please check $ ulimit -a # and "
2292 "search for 'ulimit -l' in the output");
2293 goto err_rdma_source_init;
2294 }
2295
2296 ret = qemu_rdma_alloc_qp(rdma);
2297 if (ret) {
2298 ERROR(temp, "rdma migration: error allocating qp!");
2299 goto err_rdma_source_init;
2300 }
2301
2302 ret = qemu_rdma_init_ram_blocks(rdma);
2303 if (ret) {
2304 ERROR(temp, "rdma migration: error initializing ram blocks!");
2305 goto err_rdma_source_init;
2306 }
2307
2308 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2309 ret = qemu_rdma_reg_control(rdma, idx);
2310 if (ret) {
2311 ERROR(temp, "rdma migration: error registering %d control!",
2312 idx);
2313 goto err_rdma_source_init;
2314 }
2315 }
2316
2317 return 0;
2318
2319err_rdma_source_init:
2320 error_propagate(errp, local_err);
2321 qemu_rdma_cleanup(rdma);
2322 return -1;
2323}
2324
2325static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2326{
2327 RDMACapabilities cap = {
2328 .version = RDMA_CONTROL_VERSION_CURRENT,
2329 .flags = 0,
2330 };
2331 struct rdma_conn_param conn_param = { .initiator_depth = 2,
2332 .retry_count = 5,
2333 .private_data = &cap,
2334 .private_data_len = sizeof(cap),
2335 };
2336 struct rdma_cm_event *cm_event;
2337 int ret;
2338
2339
2340
2341
2342
2343 if (rdma->pin_all) {
2344 DPRINTF("Server pin-all memory requested.\n");
2345 cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2346 }
2347
2348 caps_to_network(&cap);
2349
2350 ret = rdma_connect(rdma->cm_id, &conn_param);
2351 if (ret) {
2352 perror("rdma_connect");
2353 ERROR(errp, "connecting to destination!");
2354 rdma_destroy_id(rdma->cm_id);
2355 rdma->cm_id = NULL;
2356 goto err_rdma_source_connect;
2357 }
2358
2359 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2360 if (ret) {
2361 perror("rdma_get_cm_event after rdma_connect");
2362 ERROR(errp, "connecting to destination!");
2363 rdma_ack_cm_event(cm_event);
2364 rdma_destroy_id(rdma->cm_id);
2365 rdma->cm_id = NULL;
2366 goto err_rdma_source_connect;
2367 }
2368
2369 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2370 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2371 ERROR(errp, "connecting to destination!");
2372 rdma_ack_cm_event(cm_event);
2373 rdma_destroy_id(rdma->cm_id);
2374 rdma->cm_id = NULL;
2375 goto err_rdma_source_connect;
2376 }
2377 rdma->connected = true;
2378
2379 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2380 network_to_caps(&cap);
2381
2382
2383
2384
2385
2386 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2387 ERROR(errp, "Server cannot support pinning all memory. "
2388 "Will register memory dynamically.");
2389 rdma->pin_all = false;
2390 }
2391
2392 DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
2393
2394 rdma_ack_cm_event(cm_event);
2395
2396 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2397 if (ret) {
2398 ERROR(errp, "posting second control recv!");
2399 goto err_rdma_source_connect;
2400 }
2401
2402 rdma->control_ready_expected = 1;
2403 rdma->nb_sent = 0;
2404 return 0;
2405
2406err_rdma_source_connect:
2407 qemu_rdma_cleanup(rdma);
2408 return -1;
2409}
2410
2411static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2412{
2413 int ret = -EINVAL, idx;
2414 struct rdma_cm_id *listen_id;
2415 char ip[40] = "unknown";
2416 struct rdma_addrinfo *res;
2417 char port_str[16];
2418
2419 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2420 rdma->wr_data[idx].control_len = 0;
2421 rdma->wr_data[idx].control_curr = NULL;
2422 }
2423
2424 if (rdma->host == NULL) {
2425 ERROR(errp, "RDMA host is not set!");
2426 rdma->error_state = -EINVAL;
2427 return -1;
2428 }
2429
2430 rdma->channel = rdma_create_event_channel();
2431 if (!rdma->channel) {
2432 ERROR(errp, "could not create rdma event channel");
2433 rdma->error_state = -EINVAL;
2434 return -1;
2435 }
2436
2437
2438 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2439 if (ret) {
2440 ERROR(errp, "could not create cm_id!");
2441 goto err_dest_init_create_listen_id;
2442 }
2443
2444 snprintf(port_str, 16, "%d", rdma->port);
2445 port_str[15] = '\0';
2446
2447 if (rdma->host && strcmp("", rdma->host)) {
2448 struct rdma_addrinfo *e;
2449
2450 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2451 if (ret < 0) {
2452 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2453 goto err_dest_init_bind_addr;
2454 }
2455
2456 for (e = res; e != NULL; e = e->ai_next) {
2457 inet_ntop(e->ai_family,
2458 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2459 DPRINTF("Trying %s => %s\n", rdma->host, ip);
2460 ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2461 if (!ret) {
2462 if (e->ai_family == AF_INET6) {
2463 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
2464 if (ret) {
2465 continue;
2466 }
2467 }
2468
2469 goto listen;
2470 }
2471 }
2472
2473 ERROR(errp, "Error: could not rdma_bind_addr!");
2474 goto err_dest_init_bind_addr;
2475 } else {
2476 ERROR(errp, "migration host and port not specified!");
2477 ret = -EINVAL;
2478 goto err_dest_init_bind_addr;
2479 }
2480listen:
2481
2482 rdma->listen_id = listen_id;
2483 qemu_rdma_dump_gid("dest_init", listen_id);
2484 return 0;
2485
2486err_dest_init_bind_addr:
2487 rdma_destroy_id(listen_id);
2488err_dest_init_create_listen_id:
2489 rdma_destroy_event_channel(rdma->channel);
2490 rdma->channel = NULL;
2491 rdma->error_state = ret;
2492 return ret;
2493
2494}
2495
2496static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2497{
2498 RDMAContext *rdma = NULL;
2499 InetSocketAddress *addr;
2500
2501 if (host_port) {
2502 rdma = g_malloc0(sizeof(RDMAContext));
2503 memset(rdma, 0, sizeof(RDMAContext));
2504 rdma->current_index = -1;
2505 rdma->current_chunk = -1;
2506
2507 addr = inet_parse(host_port, NULL);
2508 if (addr != NULL) {
2509 rdma->port = atoi(addr->port);
2510 rdma->host = g_strdup(addr->host);
2511 } else {
2512 ERROR(errp, "bad RDMA migration address '%s'", host_port);
2513 g_free(rdma);
2514 rdma = NULL;
2515 }
2516
2517 qapi_free_InetSocketAddress(addr);
2518 }
2519
2520 return rdma;
2521}
2522
2523
2524
2525
2526
2527
2528static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
2529 int64_t pos, int size)
2530{
2531 QEMUFileRDMA *r = opaque;
2532 QEMUFile *f = r->file;
2533 RDMAContext *rdma = r->rdma;
2534 size_t remaining = size;
2535 uint8_t * data = (void *) buf;
2536 int ret;
2537
2538 CHECK_ERROR_STATE();
2539
2540
2541
2542
2543
2544 ret = qemu_rdma_write_flush(f, rdma);
2545 if (ret < 0) {
2546 rdma->error_state = ret;
2547 return ret;
2548 }
2549
2550 while (remaining) {
2551 RDMAControlHeader head;
2552
2553 r->len = MIN(remaining, RDMA_SEND_INCREMENT);
2554 remaining -= r->len;
2555
2556 head.len = r->len;
2557 head.type = RDMA_CONTROL_QEMU_FILE;
2558
2559 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2560
2561 if (ret < 0) {
2562 rdma->error_state = ret;
2563 return ret;
2564 }
2565
2566 data += r->len;
2567 }
2568
2569 return size;
2570}
2571
2572static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2573 int size, int idx)
2574{
2575 size_t len = 0;
2576
2577 if (rdma->wr_data[idx].control_len) {
2578 DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
2579 rdma->wr_data[idx].control_len, size);
2580
2581 len = MIN(size, rdma->wr_data[idx].control_len);
2582 memcpy(buf, rdma->wr_data[idx].control_curr, len);
2583 rdma->wr_data[idx].control_curr += len;
2584 rdma->wr_data[idx].control_len -= len;
2585 }
2586
2587 return len;
2588}
2589
2590
2591
2592
2593
2594
2595static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
2596 int64_t pos, int size)
2597{
2598 QEMUFileRDMA *r = opaque;
2599 RDMAContext *rdma = r->rdma;
2600 RDMAControlHeader head;
2601 int ret = 0;
2602
2603 CHECK_ERROR_STATE();
2604
2605
2606
2607
2608
2609
2610 r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
2611 if (r->len) {
2612 return r->len;
2613 }
2614
2615
2616
2617
2618
2619 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2620
2621 if (ret < 0) {
2622 rdma->error_state = ret;
2623 return ret;
2624 }
2625
2626
2627
2628
2629 return qemu_rdma_fill(r->rdma, buf, size, 0);
2630}
2631
2632
2633
2634
2635static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2636{
2637 int ret;
2638
2639 if (qemu_rdma_write_flush(f, rdma) < 0) {
2640 return -EIO;
2641 }
2642
2643 while (rdma->nb_sent) {
2644 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2645 if (ret < 0) {
2646 fprintf(stderr, "rdma migration: complete polling error!\n");
2647 return -EIO;
2648 }
2649 }
2650
2651 qemu_rdma_unregister_waiting(rdma);
2652
2653 return 0;
2654}
2655
2656static int qemu_rdma_close(void *opaque)
2657{
2658 DPRINTF("Shutting down connection.\n");
2659 QEMUFileRDMA *r = opaque;
2660 if (r->rdma) {
2661 qemu_rdma_cleanup(r->rdma);
2662 g_free(r->rdma);
2663 }
2664 g_free(r);
2665 return 0;
2666}
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2703 ram_addr_t block_offset, ram_addr_t offset,
2704 size_t size, int *bytes_sent)
2705{
2706 QEMUFileRDMA *rfile = opaque;
2707 RDMAContext *rdma = rfile->rdma;
2708 int ret;
2709
2710 CHECK_ERROR_STATE();
2711
2712 qemu_fflush(f);
2713
2714 if (size > 0) {
2715
2716
2717
2718
2719
2720 ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2721 if (ret < 0) {
2722 fprintf(stderr, "rdma migration: write error! %d\n", ret);
2723 goto err;
2724 }
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734 if (bytes_sent) {
2735 *bytes_sent = 1;
2736 }
2737 } else {
2738 uint64_t index, chunk;
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751 ret = qemu_rdma_search_ram_block(rdma, block_offset,
2752 offset, size, &index, &chunk);
2753
2754 if (ret) {
2755 fprintf(stderr, "ram block search failed\n");
2756 goto err;
2757 }
2758
2759 qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769 }
2770
2771
2772
2773
2774
2775
2776
2777
2778 while (1) {
2779 uint64_t wr_id, wr_id_in;
2780 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2781 if (ret < 0) {
2782 fprintf(stderr, "rdma migration: polling error! %d\n", ret);
2783 goto err;
2784 }
2785
2786 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2787
2788 if (wr_id == RDMA_WRID_NONE) {
2789 break;
2790 }
2791 }
2792
2793 return RAM_SAVE_CONTROL_DELAYED;
2794err:
2795 rdma->error_state = ret;
2796 return ret;
2797}
2798
2799static int qemu_rdma_accept(RDMAContext *rdma)
2800{
2801 RDMACapabilities cap;
2802 struct rdma_conn_param conn_param = {
2803 .responder_resources = 2,
2804 .private_data = &cap,
2805 .private_data_len = sizeof(cap),
2806 };
2807 struct rdma_cm_event *cm_event;
2808 struct ibv_context *verbs;
2809 int ret = -EINVAL;
2810 int idx;
2811
2812 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2813 if (ret) {
2814 goto err_rdma_dest_wait;
2815 }
2816
2817 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2818 rdma_ack_cm_event(cm_event);
2819 goto err_rdma_dest_wait;
2820 }
2821
2822 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2823
2824 network_to_caps(&cap);
2825
2826 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2827 fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
2828 cap.version);
2829 rdma_ack_cm_event(cm_event);
2830 goto err_rdma_dest_wait;
2831 }
2832
2833
2834
2835
2836 cap.flags &= known_capabilities;
2837
2838
2839
2840
2841
2842 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2843 rdma->pin_all = true;
2844 }
2845
2846 rdma->cm_id = cm_event->id;
2847 verbs = cm_event->id->verbs;
2848
2849 rdma_ack_cm_event(cm_event);
2850
2851 DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
2852
2853 caps_to_network(&cap);
2854
2855 DPRINTF("verbs context after listen: %p\n", verbs);
2856
2857 if (!rdma->verbs) {
2858 rdma->verbs = verbs;
2859 } else if (rdma->verbs != verbs) {
2860 fprintf(stderr, "ibv context not matching %p, %p!\n",
2861 rdma->verbs, verbs);
2862 goto err_rdma_dest_wait;
2863 }
2864
2865 qemu_rdma_dump_id("dest_init", verbs);
2866
2867 ret = qemu_rdma_alloc_pd_cq(rdma);
2868 if (ret) {
2869 fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
2870 goto err_rdma_dest_wait;
2871 }
2872
2873 ret = qemu_rdma_alloc_qp(rdma);
2874 if (ret) {
2875 fprintf(stderr, "rdma migration: error allocating qp!\n");
2876 goto err_rdma_dest_wait;
2877 }
2878
2879 ret = qemu_rdma_init_ram_blocks(rdma);
2880 if (ret) {
2881 fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
2882 goto err_rdma_dest_wait;
2883 }
2884
2885 for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2886 ret = qemu_rdma_reg_control(rdma, idx);
2887 if (ret) {
2888 fprintf(stderr, "rdma: error registering %d control!\n", idx);
2889 goto err_rdma_dest_wait;
2890 }
2891 }
2892
2893 qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
2894
2895 ret = rdma_accept(rdma->cm_id, &conn_param);
2896 if (ret) {
2897 fprintf(stderr, "rdma_accept returns %d!\n", ret);
2898 goto err_rdma_dest_wait;
2899 }
2900
2901 ret = rdma_get_cm_event(rdma->channel, &cm_event);
2902 if (ret) {
2903 fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
2904 goto err_rdma_dest_wait;
2905 }
2906
2907 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2908 fprintf(stderr, "rdma_accept not event established!\n");
2909 rdma_ack_cm_event(cm_event);
2910 goto err_rdma_dest_wait;
2911 }
2912
2913 rdma_ack_cm_event(cm_event);
2914 rdma->connected = true;
2915
2916 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2917 if (ret) {
2918 fprintf(stderr, "rdma migration: error posting second control recv!\n");
2919 goto err_rdma_dest_wait;
2920 }
2921
2922 qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
2923
2924 return 0;
2925
2926err_rdma_dest_wait:
2927 rdma->error_state = ret;
2928 qemu_rdma_cleanup(rdma);
2929 return ret;
2930}
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
2942 uint64_t flags)
2943{
2944 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
2945 .type = RDMA_CONTROL_REGISTER_RESULT,
2946 .repeat = 0,
2947 };
2948 RDMAControlHeader unreg_resp = { .len = 0,
2949 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
2950 .repeat = 0,
2951 };
2952 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
2953 .repeat = 1 };
2954 QEMUFileRDMA *rfile = opaque;
2955 RDMAContext *rdma = rfile->rdma;
2956 RDMALocalBlocks *local = &rdma->local_ram_blocks;
2957 RDMAControlHeader head;
2958 RDMARegister *reg, *registers;
2959 RDMACompress *comp;
2960 RDMARegisterResult *reg_result;
2961 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
2962 RDMALocalBlock *block;
2963 void *host_addr;
2964 int ret = 0;
2965 int idx = 0;
2966 int count = 0;
2967 int i = 0;
2968
2969 CHECK_ERROR_STATE();
2970
2971 do {
2972 DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);
2973
2974 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
2975
2976 if (ret < 0) {
2977 break;
2978 }
2979
2980 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
2981 fprintf(stderr, "rdma: Too many requests in this message (%d)."
2982 "Bailing.\n", head.repeat);
2983 ret = -EIO;
2984 break;
2985 }
2986
2987 switch (head.type) {
2988 case RDMA_CONTROL_COMPRESS:
2989 comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
2990 network_to_compress(comp);
2991
2992 DDPRINTF("Zapping zero chunk: %" PRId64
2993 " bytes, index %d, offset %" PRId64 "\n",
2994 comp->length, comp->block_idx, comp->offset);
2995 block = &(rdma->local_ram_blocks.block[comp->block_idx]);
2996
2997 host_addr = block->local_host_addr +
2998 (comp->offset - block->offset);
2999
3000 ram_handle_compressed(host_addr, comp->value, comp->length);
3001 break;
3002
3003 case RDMA_CONTROL_REGISTER_FINISHED:
3004 DDDPRINTF("Current registrations complete.\n");
3005 goto out;
3006
3007 case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3008 DPRINTF("Initial setup info requested.\n");
3009
3010 if (rdma->pin_all) {
3011 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3012 if (ret) {
3013 fprintf(stderr, "rdma migration: error dest "
3014 "registering ram blocks!\n");
3015 goto out;
3016 }
3017 }
3018
3019
3020
3021
3022
3023
3024
3025 for (i = 0; i < local->nb_blocks; i++) {
3026 rdma->block[i].remote_host_addr =
3027 (uint64_t)(local->block[i].local_host_addr);
3028
3029 if (rdma->pin_all) {
3030 rdma->block[i].remote_rkey = local->block[i].mr->rkey;
3031 }
3032
3033 rdma->block[i].offset = local->block[i].offset;
3034 rdma->block[i].length = local->block[i].length;
3035
3036 remote_block_to_network(&rdma->block[i]);
3037 }
3038
3039 blocks.len = rdma->local_ram_blocks.nb_blocks
3040 * sizeof(RDMARemoteBlock);
3041
3042
3043 ret = qemu_rdma_post_send_control(rdma,
3044 (uint8_t *) rdma->block, &blocks);
3045
3046 if (ret < 0) {
3047 fprintf(stderr, "rdma migration: error sending remote info!\n");
3048 goto out;
3049 }
3050
3051 break;
3052 case RDMA_CONTROL_REGISTER_REQUEST:
3053 DDPRINTF("There are %d registration requests\n", head.repeat);
3054
3055 reg_resp.repeat = head.repeat;
3056 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3057
3058 for (count = 0; count < head.repeat; count++) {
3059 uint64_t chunk;
3060 uint8_t *chunk_start, *chunk_end;
3061
3062 reg = ®isters[count];
3063 network_to_register(reg);
3064
3065 reg_result = &results[count];
3066
3067 DDPRINTF("Registration request (%d): index %d, current_addr %"
3068 PRIu64 " chunks: %" PRIu64 "\n", count,
3069 reg->current_index, reg->key.current_addr, reg->chunks);
3070
3071 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3072 if (block->is_ram_block) {
3073 host_addr = (block->local_host_addr +
3074 (reg->key.current_addr - block->offset));
3075 chunk = ram_chunk_index(block->local_host_addr,
3076 (uint8_t *) host_addr);
3077 } else {
3078 chunk = reg->key.chunk;
3079 host_addr = block->local_host_addr +
3080 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3081 }
3082 chunk_start = ram_chunk_start(block, chunk);
3083 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3084 if (qemu_rdma_register_and_get_keys(rdma, block,
3085 (uint8_t *)host_addr, NULL, ®_result->rkey,
3086 chunk, chunk_start, chunk_end)) {
3087 fprintf(stderr, "cannot get rkey!\n");
3088 ret = -EINVAL;
3089 goto out;
3090 }
3091
3092 reg_result->host_addr = (uint64_t) block->local_host_addr;
3093
3094 DDPRINTF("Registered rkey for this request: %x\n",
3095 reg_result->rkey);
3096
3097 result_to_network(reg_result);
3098 }
3099
3100 ret = qemu_rdma_post_send_control(rdma,
3101 (uint8_t *) results, ®_resp);
3102
3103 if (ret < 0) {
3104 fprintf(stderr, "Failed to send control buffer!\n");
3105 goto out;
3106 }
3107 break;
3108 case RDMA_CONTROL_UNREGISTER_REQUEST:
3109 DDPRINTF("There are %d unregistration requests\n", head.repeat);
3110 unreg_resp.repeat = head.repeat;
3111 registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3112
3113 for (count = 0; count < head.repeat; count++) {
3114 reg = ®isters[count];
3115 network_to_register(reg);
3116
3117 DDPRINTF("Unregistration request (%d): "
3118 " index %d, chunk %" PRIu64 "\n",
3119 count, reg->current_index, reg->key.chunk);
3120
3121 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3122
3123 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3124 block->pmr[reg->key.chunk] = NULL;
3125
3126 if (ret != 0) {
3127 perror("rdma unregistration chunk failed");
3128 ret = -ret;
3129 goto out;
3130 }
3131
3132 rdma->total_registrations--;
3133
3134 DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
3135 reg->key.chunk);
3136 }
3137
3138 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3139
3140 if (ret < 0) {
3141 fprintf(stderr, "Failed to send control buffer!\n");
3142 goto out;
3143 }
3144 break;
3145 case RDMA_CONTROL_REGISTER_RESULT:
3146 fprintf(stderr, "Invalid RESULT message at dest.\n");
3147 ret = -EIO;
3148 goto out;
3149 default:
3150 fprintf(stderr, "Unknown control message %s\n",
3151 control_desc[head.type]);
3152 ret = -EIO;
3153 goto out;
3154 }
3155 } while (1);
3156out:
3157 if (ret < 0) {
3158 rdma->error_state = ret;
3159 }
3160 return ret;
3161}
3162
3163static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3164 uint64_t flags)
3165{
3166 QEMUFileRDMA *rfile = opaque;
3167 RDMAContext *rdma = rfile->rdma;
3168
3169 CHECK_ERROR_STATE();
3170
3171 DDDPRINTF("start section: %" PRIu64 "\n", flags);
3172 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3173 qemu_fflush(f);
3174
3175 return 0;
3176}
3177
3178
3179
3180
3181
3182static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3183 uint64_t flags)
3184{
3185 Error *local_err = NULL, **errp = &local_err;
3186 QEMUFileRDMA *rfile = opaque;
3187 RDMAContext *rdma = rfile->rdma;
3188 RDMAControlHeader head = { .len = 0, .repeat = 1 };
3189 int ret = 0;
3190
3191 CHECK_ERROR_STATE();
3192
3193 qemu_fflush(f);
3194 ret = qemu_rdma_drain_cq(f, rdma);
3195
3196 if (ret < 0) {
3197 goto err;
3198 }
3199
3200 if (flags == RAM_CONTROL_SETUP) {
3201 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3202 RDMALocalBlocks *local = &rdma->local_ram_blocks;
3203 int reg_result_idx, i, j, nb_remote_blocks;
3204
3205 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3206 DPRINTF("Sending registration setup for ram blocks...\n");
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3217 ®_result_idx, rdma->pin_all ?
3218 qemu_rdma_reg_whole_ram_blocks : NULL);
3219 if (ret < 0) {
3220 ERROR(errp, "receiving remote info!");
3221 return ret;
3222 }
3223
3224 nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238 if (local->nb_blocks != nb_remote_blocks) {
3239 ERROR(errp, "ram blocks mismatch #1! "
3240 "Your QEMU command line parameters are probably "
3241 "not identical on both the source and destination.");
3242 return -EINVAL;
3243 }
3244
3245 qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3246 memcpy(rdma->block,
3247 rdma->wr_data[reg_result_idx].control_curr, resp.len);
3248 for (i = 0; i < nb_remote_blocks; i++) {
3249 network_to_remote_block(&rdma->block[i]);
3250
3251
3252 for (j = 0; j < local->nb_blocks; j++) {
3253 if (rdma->block[i].offset != local->block[j].offset) {
3254 continue;
3255 }
3256
3257 if (rdma->block[i].length != local->block[j].length) {
3258 ERROR(errp, "ram blocks mismatch #2! "
3259 "Your QEMU command line parameters are probably "
3260 "not identical on both the source and destination.");
3261 return -EINVAL;
3262 }
3263 local->block[j].remote_host_addr =
3264 rdma->block[i].remote_host_addr;
3265 local->block[j].remote_rkey = rdma->block[i].remote_rkey;
3266 break;
3267 }
3268
3269 if (j >= local->nb_blocks) {
3270 ERROR(errp, "ram blocks mismatch #3! "
3271 "Your QEMU command line parameters are probably "
3272 "not identical on both the source and destination.");
3273 return -EINVAL;
3274 }
3275 }
3276 }
3277
3278 DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
3279
3280 head.type = RDMA_CONTROL_REGISTER_FINISHED;
3281 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3282
3283 if (ret < 0) {
3284 goto err;
3285 }
3286
3287 return 0;
3288err:
3289 rdma->error_state = ret;
3290 return ret;
3291}
3292
3293static int qemu_rdma_get_fd(void *opaque)
3294{
3295 QEMUFileRDMA *rfile = opaque;
3296 RDMAContext *rdma = rfile->rdma;
3297
3298 return rdma->comp_channel->fd;
3299}
3300
3301const QEMUFileOps rdma_read_ops = {
3302 .get_buffer = qemu_rdma_get_buffer,
3303 .get_fd = qemu_rdma_get_fd,
3304 .close = qemu_rdma_close,
3305 .hook_ram_load = qemu_rdma_registration_handle,
3306};
3307
3308const QEMUFileOps rdma_write_ops = {
3309 .put_buffer = qemu_rdma_put_buffer,
3310 .close = qemu_rdma_close,
3311 .before_ram_iterate = qemu_rdma_registration_start,
3312 .after_ram_iterate = qemu_rdma_registration_stop,
3313 .save_page = qemu_rdma_save_page,
3314};
3315
3316static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3317{
3318 QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
3319
3320 if (qemu_file_mode_is_not_valid(mode)) {
3321 return NULL;
3322 }
3323
3324 r->rdma = rdma;
3325
3326 if (mode[0] == 'w') {
3327 r->file = qemu_fopen_ops(r, &rdma_write_ops);
3328 } else {
3329 r->file = qemu_fopen_ops(r, &rdma_read_ops);
3330 }
3331
3332 return r->file;
3333}
3334
3335static void rdma_accept_incoming_migration(void *opaque)
3336{
3337 RDMAContext *rdma = opaque;
3338 int ret;
3339 QEMUFile *f;
3340 Error *local_err = NULL, **errp = &local_err;
3341
3342 DPRINTF("Accepting rdma connection...\n");
3343 ret = qemu_rdma_accept(rdma);
3344
3345 if (ret) {
3346 ERROR(errp, "RDMA Migration initialization failed!");
3347 return;
3348 }
3349
3350 DPRINTF("Accepted migration\n");
3351
3352 f = qemu_fopen_rdma(rdma, "rb");
3353 if (f == NULL) {
3354 ERROR(errp, "could not qemu_fopen_rdma!");
3355 qemu_rdma_cleanup(rdma);
3356 return;
3357 }
3358
3359 rdma->migration_started_on_destination = 1;
3360 process_incoming_migration(f);
3361}
3362
3363void rdma_start_incoming_migration(const char *host_port, Error **errp)
3364{
3365 int ret;
3366 RDMAContext *rdma;
3367 Error *local_err = NULL;
3368
3369 DPRINTF("Starting RDMA-based incoming migration\n");
3370 rdma = qemu_rdma_data_init(host_port, &local_err);
3371
3372 if (rdma == NULL) {
3373 goto err;
3374 }
3375
3376 ret = qemu_rdma_dest_init(rdma, &local_err);
3377
3378 if (ret) {
3379 goto err;
3380 }
3381
3382 DPRINTF("qemu_rdma_dest_init success\n");
3383
3384 ret = rdma_listen(rdma->listen_id, 5);
3385
3386 if (ret) {
3387 ERROR(errp, "listening on socket!");
3388 goto err;
3389 }
3390
3391 DPRINTF("rdma_listen success\n");
3392
3393 qemu_set_fd_handler2(rdma->channel->fd, NULL,
3394 rdma_accept_incoming_migration, NULL,
3395 (void *)(intptr_t) rdma);
3396 return;
3397err:
3398 error_propagate(errp, local_err);
3399 g_free(rdma);
3400}
3401
3402void rdma_start_outgoing_migration(void *opaque,
3403 const char *host_port, Error **errp)
3404{
3405 MigrationState *s = opaque;
3406 Error *local_err = NULL, **temp = &local_err;
3407 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
3408 int ret = 0;
3409
3410 if (rdma == NULL) {
3411 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret);
3412 goto err;
3413 }
3414
3415 ret = qemu_rdma_source_init(rdma, &local_err,
3416 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
3417
3418 if (ret) {
3419 goto err;
3420 }
3421
3422 DPRINTF("qemu_rdma_source_init success\n");
3423 ret = qemu_rdma_connect(rdma, &local_err);
3424
3425 if (ret) {
3426 goto err;
3427 }
3428
3429 DPRINTF("qemu_rdma_source_connect success\n");
3430
3431 s->file = qemu_fopen_rdma(rdma, "wb");
3432 migrate_fd_connect(s);
3433 return;
3434err:
3435 error_propagate(errp, local_err);
3436 g_free(rdma);
3437 migrate_fd_error(s);
3438}
3439