qemu/migration/rdma.c
   1/*
   2 * RDMA protocol and interfaces
   3 *
   4 * Copyright IBM, Corp. 2010-2013
   5 * Copyright Red Hat, Inc. 2015-2016
   6 *
   7 * Authors:
   8 *  Michael R. Hines <mrhines@us.ibm.com>
   9 *  Jiuxing Liu <jl@us.ibm.com>
  10 *  Daniel P. Berrange <berrange@redhat.com>
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or
  13 * later.  See the COPYING file in the top-level directory.
  14 *
  15 */
  16#include "qemu/osdep.h"
  17#include "qapi/error.h"
  18#include "qemu-common.h"
  19#include "qemu/cutils.h"
  20#include "rdma.h"
  21#include "migration.h"
  22#include "qemu-file.h"
  23#include "ram.h"
  24#include "qemu-file-channel.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/sockets.h"
  28#include "qemu/bitmap.h"
  29#include "qemu/coroutine.h"
  30#include <sys/socket.h>
  31#include <netdb.h>
  32#include <arpa/inet.h>
  33#include <rdma/rdma_cma.h>
  34#include "trace.h"
  35
  36/*
  37 * Print an error on both the Monitor and the Log file.
  38 */
  39#define ERROR(errp, fmt, ...) \
  40    do { \
  41        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
  42        if (errp && (*(errp) == NULL)) { \
  43            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
  44        } \
  45    } while (0)
  46
  47#define RDMA_RESOLVE_TIMEOUT_MS 10000
  48
  49/* Do not merge data if larger than this. */
  50#define RDMA_MERGE_MAX (2 * 1024 * 1024)
  51#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
  52
  53#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
  54
  55/*
  56 * This is only for non-live state being migrated.
  57 * Instead of RDMA_WRITE messages, we use RDMA_SEND
  58 * messages for that state, which requires a different
  59 * delivery design than main memory.
  60 */
  61#define RDMA_SEND_INCREMENT 32768
  62
  63/*
  64 * Maximum size of an infiniband SEND message
  65 */
  66#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
  67#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
  68
  69#define RDMA_CONTROL_VERSION_CURRENT 1
  70/*
  71 * Capabilities for negotiation.
  72 */
  73#define RDMA_CAPABILITY_PIN_ALL 0x01
  74
  75/*
  76 * Add the other flags above to this list of known capabilities
  77 * as they are introduced.
  78 */
  79static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
  80
  81#define CHECK_ERROR_STATE() \
  82    do { \
  83        if (rdma->error_state) { \
  84            if (!rdma->error_reported) { \
  85                error_report("RDMA is in an error state waiting for" \
  86                                " migration to abort!"); \
  87                rdma->error_reported = 1; \
  88            } \
  89            rcu_read_unlock(); \
  90            return rdma->error_state; \
  91        } \
  92    } while (0)
  93
  94/*
  95 * A work request ID is 64-bits and we split up these bits
  96 * into 3 parts:
  97 *
  98 * bits 0-15 : type of control message, 2^16
  99 * bits 16-29: ram block index, 2^14
 100 * bits 30-63: ram block chunk number, 2^34
 101 *
 102 * The last two bit ranges are only used for RDMA writes,
 103 * in order to track their completion and potentially
 104 * also track unregistration status of the message.
 105 */
 106#define RDMA_WRID_TYPE_SHIFT  0UL
 107#define RDMA_WRID_BLOCK_SHIFT 16UL
 108#define RDMA_WRID_CHUNK_SHIFT 30UL
 109
 110#define RDMA_WRID_TYPE_MASK \
 111    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
 112
 113#define RDMA_WRID_BLOCK_MASK \
 114    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
 115
 116#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
 117
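/*
 * A worked example of the split above, assuming an RDMA write to
 * RAM block index 3, chunk number 5:
 *
 *     uint64_t wr_id = RDMA_WRID_RDMA_WRITE |
 *                      (3ULL << RDMA_WRID_BLOCK_SHIFT) |
 *                      (5ULL << RDMA_WRID_CHUNK_SHIFT);          => 0x140030001
 *
 *     wr_id & RDMA_WRID_TYPE_MASK                                => 1 (RDMA_WRID_RDMA_WRITE)
 *     (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT    => 3
 *     (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT    => 5
 *
 * qemu_rdma_make_wrid() below composes wr_ids this way, and
 * qemu_rdma_poll() decomposes them again on the completion path.
 */
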
 118/*
 119 * RDMA migration protocol:
 120 * 1. RDMA Writes (data messages, i.e. RAM)
 121 * 2. IB Send/Recv (control channel messages)
 122 */
 123enum {
 124    RDMA_WRID_NONE = 0,
 125    RDMA_WRID_RDMA_WRITE = 1,
 126    RDMA_WRID_SEND_CONTROL = 2000,
 127    RDMA_WRID_RECV_CONTROL = 4000,
 128};
 129
 130static const char *wrid_desc[] = {
 131    [RDMA_WRID_NONE] = "NONE",
 132    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
 133    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
 134    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 135};
 136
 137/*
 138 * Work request IDs for IB SEND messages only (not RDMA writes).
 139 * This is used by the migration protocol to transmit
 140 * control messages (such as device state and registration commands)
 141 *
 142 * We could use more WRs, but we have enough for now.
 143 */
 144enum {
 145    RDMA_WRID_READY = 0,
 146    RDMA_WRID_DATA,
 147    RDMA_WRID_CONTROL,
 148    RDMA_WRID_MAX,
 149};
 150
 151/*
 152 * SEND/RECV IB Control Messages.
 153 */
 154enum {
 155    RDMA_CONTROL_NONE = 0,
 156    RDMA_CONTROL_ERROR,
 157    RDMA_CONTROL_READY,               /* ready to receive */
 158    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
 159    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
 160    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
 161    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
 162    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
 163    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
 164    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
 165    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
 166    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 167};
 168
 169
 170/*
 171 * Memory and MR structures used to represent an IB Send/Recv work request.
 172 * This is *not* used for RDMA writes, only IB Send/Recv.
 173 */
 174typedef struct {
 175    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
 176    struct   ibv_mr *control_mr;               /* registration metadata */
 177    size_t   control_len;                      /* length of the message */
 178    uint8_t *control_curr;                     /* start of unconsumed bytes */
 179} RDMAWorkRequestData;
 180
 181/*
 182 * Negotiate RDMA capabilities during connection-setup time.
 183 */
 184typedef struct {
 185    uint32_t version;
 186    uint32_t flags;
 187} RDMACapabilities;
 188
 189static void caps_to_network(RDMACapabilities *cap)
 190{
 191    cap->version = htonl(cap->version);
 192    cap->flags = htonl(cap->flags);
 193}
 194
 195static void network_to_caps(RDMACapabilities *cap)
 196{
 197    cap->version = ntohl(cap->version);
 198    cap->flags = ntohl(cap->flags);
 199}
 200
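/*
 * A minimal sketch of how the two helpers above are meant to be used
 * during connection setup (the real exchange happens in the connect and
 * accept paths further down in this file):
 *
 *     RDMACapabilities cap = { .version = RDMA_CONTROL_VERSION_CURRENT };
 *
 *     if (rdma->pin_all) {
 *         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
 *     }
 *     caps_to_network(&cap);      // convert before sending
 *
 * The peer would then call network_to_caps() on the received copy and
 * check cap.flags against known_capabilities before accepting.
 */
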
 201/*
 202 * Representation of a RAMBlock from an RDMA perspective.
 203 * This is not transmitted, only local.
 204 * This and subsequent structures cannot be linked lists
 205 * because we're using a single IB message to transmit
 206 * the information. It's small anyway, so a list is overkill.
 207 */
 208typedef struct RDMALocalBlock {
 209    char          *block_name;
 210    uint8_t       *local_host_addr; /* local virtual address */
 211    uint64_t       remote_host_addr; /* remote virtual address */
 212    uint64_t       offset;
 213    uint64_t       length;
 214    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
 215    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
 216    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
 217    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
 218    int            index;           /* which block are we */
 219    unsigned int   src_index;       /* (Only used on dest) */
 220    bool           is_ram_block;
 221    int            nb_chunks;
 222    unsigned long *transit_bitmap;
 223    unsigned long *unregister_bitmap;
 224} RDMALocalBlock;
 225
 226/*
  227 * Also represents a RAMBlock, but only on the dest.
 228 * This gets transmitted by the dest during connection-time
 229 * to the source VM and then is used to populate the
 230 * corresponding RDMALocalBlock with
 231 * the information needed to perform the actual RDMA.
 232 */
 233typedef struct QEMU_PACKED RDMADestBlock {
 234    uint64_t remote_host_addr;
 235    uint64_t offset;
 236    uint64_t length;
 237    uint32_t remote_rkey;
 238    uint32_t padding;
 239} RDMADestBlock;
 240
 241static const char *control_desc(unsigned int rdma_control)
 242{
 243    static const char *strs[] = {
 244        [RDMA_CONTROL_NONE] = "NONE",
 245        [RDMA_CONTROL_ERROR] = "ERROR",
 246        [RDMA_CONTROL_READY] = "READY",
 247        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
 248        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
 249        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
 250        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
 251        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
 252        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
 253        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
 254        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
 255        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
 256    };
 257
 258    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
 259        return "??BAD CONTROL VALUE??";
 260    }
 261
 262    return strs[rdma_control];
 263}
 264
 265static uint64_t htonll(uint64_t v)
 266{
 267    union { uint32_t lv[2]; uint64_t llv; } u;
 268    u.lv[0] = htonl(v >> 32);
 269    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
 270    return u.llv;
 271}
 272
 273static uint64_t ntohll(uint64_t v) {
 274    union { uint32_t lv[2]; uint64_t llv; } u;
 275    u.llv = v;
 276    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
 277}
 278
 279static void dest_block_to_network(RDMADestBlock *db)
 280{
 281    db->remote_host_addr = htonll(db->remote_host_addr);
 282    db->offset = htonll(db->offset);
 283    db->length = htonll(db->length);
 284    db->remote_rkey = htonl(db->remote_rkey);
 285}
 286
 287static void network_to_dest_block(RDMADestBlock *db)
 288{
 289    db->remote_host_addr = ntohll(db->remote_host_addr);
 290    db->offset = ntohll(db->offset);
 291    db->length = ntohll(db->length);
 292    db->remote_rkey = ntohl(db->remote_rkey);
 293}
 294
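/*
 * Roughly how the source uses a received RDMADestBlock once it has been
 * byte-swapped with network_to_dest_block(): the remote address and rkey
 * are copied into the matching RDMALocalBlock (a sketch; the real
 * bookkeeping matches blocks up during registration further down):
 *
 *     local_block->remote_host_addr = dest_block->remote_host_addr;
 *     local_block->remote_rkey      = dest_block->remote_rkey;
 *
 * which is everything an RDMA write needs to address the destination's
 * copy of that RAM block.
 */
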
 295/*
 296 * Virtual address of the above structures used for transmitting
 297 * the RAMBlock descriptions at connection-time.
 298 * This structure is *not* transmitted.
 299 */
 300typedef struct RDMALocalBlocks {
 301    int nb_blocks;
 302    bool     init;             /* main memory init complete */
 303    RDMALocalBlock *block;
 304} RDMALocalBlocks;
 305
 306/*
 307 * Main data structure for RDMA state.
 308 * While there is only one copy of this structure being allocated right now,
  309 * this is the place to start if you wanted to consider
 310 * having more than one RDMA connection open at the same time.
 311 */
 312typedef struct RDMAContext {
 313    char *host;
 314    int port;
 315
 316    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
 317
 318    /*
 319     * This is used by *_exchange_send() to figure out whether or not
  320     * the initial "READY" message has already been received.
 321     * This is because other functions may potentially poll() and detect
 322     * the READY message before send() does, in which case we need to
 323     * know if it completed.
 324     */
 325    int control_ready_expected;
 326
 327    /* number of outstanding writes */
 328    int nb_sent;
 329
 330    /* store info about current buffer so that we can
 331       merge it with future sends */
 332    uint64_t current_addr;
 333    uint64_t current_length;
 334    /* index of ram block the current buffer belongs to */
 335    int current_index;
 336    /* index of the chunk in the current ram block */
 337    int current_chunk;
 338
 339    bool pin_all;
 340
 341    /*
 342     * infiniband-specific variables for opening the device
 343     * and maintaining connection state and so forth.
 344     *
 345     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
 346     * cm_id->verbs, cm_id->channel, and cm_id->qp.
 347     */
 348    struct rdma_cm_id *cm_id;               /* connection manager ID */
 349    struct rdma_cm_id *listen_id;
 350    bool connected;
 351
 352    struct ibv_context          *verbs;
 353    struct rdma_event_channel   *channel;
 354    struct ibv_qp *qp;                      /* queue pair */
 355    struct ibv_comp_channel *comp_channel;  /* completion channel */
 356    struct ibv_pd *pd;                      /* protection domain */
 357    struct ibv_cq *cq;                      /* completion queue */
 358
 359    /*
 360     * If a previous write failed (perhaps because of a failed
  361     * memory registration), then do not attempt any future work
 362     * and remember the error state.
 363     */
 364    int error_state;
 365    int error_reported;
 366    int received_error;
 367
 368    /*
 369     * Description of ram blocks used throughout the code.
 370     */
 371    RDMALocalBlocks local_ram_blocks;
 372    RDMADestBlock  *dest_blocks;
 373
 374    /* Index of the next RAMBlock received during block registration */
 375    unsigned int    next_src_index;
 376
 377    /*
  378     * Once migration on the *destination* has started, we use the
  379     * coroutine yield function.
  380     * The source runs in a thread, so we don't care.
 381     */
 382    int migration_started_on_destination;
 383
 384    int total_registrations;
 385    int total_writes;
 386
 387    int unregister_current, unregister_next;
 388    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
 389
 390    GHashTable *blockmap;
 391
 392    /* the RDMAContext for return path */
 393    struct RDMAContext *return_path;
 394    bool is_return_path;
 395} RDMAContext;
 396
 397#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
 398#define QIO_CHANNEL_RDMA(obj)                                     \
 399    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
 400
 401typedef struct QIOChannelRDMA QIOChannelRDMA;
 402
 403
 404struct QIOChannelRDMA {
 405    QIOChannel parent;
 406    RDMAContext *rdmain;
 407    RDMAContext *rdmaout;
 408    QEMUFile *file;
 409    bool blocking; /* XXX we don't actually honour this yet */
 410};
 411
 412/*
 413 * Main structure for IB Send/Recv control messages.
 414 * This gets prepended at the beginning of every Send/Recv.
 415 */
 416typedef struct QEMU_PACKED {
 417    uint32_t len;     /* Total length of data portion */
 418    uint32_t type;    /* which control command to perform */
 419    uint32_t repeat;  /* number of commands in data portion of same type */
 420    uint32_t padding;
 421} RDMAControlHeader;
 422
 423static void control_to_network(RDMAControlHeader *control)
 424{
 425    control->type = htonl(control->type);
 426    control->len = htonl(control->len);
 427    control->repeat = htonl(control->repeat);
 428}
 429
 430static void network_to_control(RDMAControlHeader *control)
 431{
 432    control->type = ntohl(control->type);
 433    control->len = ntohl(control->len);
 434    control->repeat = ntohl(control->repeat);
 435}
 436
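/*
 * A sketch of how a control message is assembled before being posted,
 * assuming a filled-in header 'head' and 'head->len' bytes of payload in
 * 'buf' (qemu_rdma_post_send_control() below does the real work):
 *
 *     uint8_t *wire = rdma->wr_data[RDMA_WRID_CONTROL].control;
 *
 *     memcpy(wire, head, sizeof(RDMAControlHeader));
 *     control_to_network((RDMAControlHeader *)wire);              // header first
 *     memcpy(wire + sizeof(RDMAControlHeader), buf, head->len);   // then payload
 *
 * so the length posted to the send queue is always
 * head->len + sizeof(RDMAControlHeader).
 */
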
 437/*
 438 * Register a single Chunk.
 439 * Information sent by the source VM to inform the dest
  440 * to register a single chunk of memory before we can perform
 441 * the actual RDMA operation.
 442 */
 443typedef struct QEMU_PACKED {
 444    union QEMU_PACKED {
 445        uint64_t current_addr;  /* offset into the ram_addr_t space */
 446        uint64_t chunk;         /* chunk to lookup if unregistering */
 447    } key;
 448    uint32_t current_index; /* which ramblock the chunk belongs to */
 449    uint32_t padding;
 450    uint64_t chunks;            /* how many sequential chunks to register */
 451} RDMARegister;
 452
 453static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
 454{
 455    RDMALocalBlock *local_block;
 456    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
 457
 458    if (local_block->is_ram_block) {
 459        /*
 460         * current_addr as passed in is an address in the local ram_addr_t
 461         * space, we need to translate this for the destination
 462         */
 463        reg->key.current_addr -= local_block->offset;
 464        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
 465    }
 466    reg->key.current_addr = htonll(reg->key.current_addr);
 467    reg->current_index = htonl(reg->current_index);
 468    reg->chunks = htonll(reg->chunks);
 469}
 470
 471static void network_to_register(RDMARegister *reg)
 472{
 473    reg->key.current_addr = ntohll(reg->key.current_addr);
 474    reg->current_index = ntohl(reg->current_index);
 475    reg->chunks = ntohll(reg->chunks);
 476}
 477
 478typedef struct QEMU_PACKED {
 479    uint32_t value;     /* if zero, we will madvise() */
 480    uint32_t block_idx; /* which ram block index */
 481    uint64_t offset;    /* Address in remote ram_addr_t space */
 482    uint64_t length;    /* length of the chunk */
 483} RDMACompress;
 484
 485static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
 486{
 487    comp->value = htonl(comp->value);
 488    /*
 489     * comp->offset as passed in is an address in the local ram_addr_t
 490     * space, we need to translate this for the destination
 491     */
 492    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
 493    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
 494    comp->block_idx = htonl(comp->block_idx);
 495    comp->offset = htonll(comp->offset);
 496    comp->length = htonll(comp->length);
 497}
 498
 499static void network_to_compress(RDMACompress *comp)
 500{
 501    comp->value = ntohl(comp->value);
 502    comp->block_idx = ntohl(comp->block_idx);
 503    comp->offset = ntohll(comp->offset);
 504    comp->length = ntohll(comp->length);
 505}
 506
 507/*
 508 * The result of the dest's memory registration produces an "rkey"
 509 * which the source VM must reference in order to perform
 510 * the RDMA operation.
 511 */
 512typedef struct QEMU_PACKED {
 513    uint32_t rkey;
 514    uint32_t padding;
 515    uint64_t host_addr;
 516} RDMARegisterResult;
 517
 518static void result_to_network(RDMARegisterResult *result)
 519{
 520    result->rkey = htonl(result->rkey);
 521    result->host_addr = htonll(result->host_addr);
  522}
 523
 524static void network_to_result(RDMARegisterResult *result)
 525{
 526    result->rkey = ntohl(result->rkey);
 527    result->host_addr = ntohll(result->host_addr);
  528}
 529
 530const char *print_wrid(int wrid);
 531static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
 532                                   uint8_t *data, RDMAControlHeader *resp,
 533                                   int *resp_idx,
 534                                   int (*callback)(RDMAContext *rdma));
 535
 536static inline uint64_t ram_chunk_index(const uint8_t *start,
 537                                       const uint8_t *host)
 538{
 539    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
 540}
 541
 542static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
 543                                       uint64_t i)
 544{
 545    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
 546                                  (i << RDMA_REG_CHUNK_SHIFT));
 547}
 548
 549static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
 550                                     uint64_t i)
 551{
 552    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
 553                                         (1UL << RDMA_REG_CHUNK_SHIFT);
 554
 555    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
 556        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
 557    }
 558
 559    return result;
 560}
 561
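/*
 * A worked example of the chunk arithmetic above: with
 * RDMA_REG_CHUNK_SHIFT == 20, chunks are 1 MiB each. A page at byte
 * offset 0x2A3000 into a block therefore lives in chunk
 *
 *     ram_chunk_index(start, start + 0x2A3000) == 0x2A3000 >> 20 == 2
 *
 * whose boundaries are [start + 0x200000, start + 0x300000), with
 * ram_chunk_end() clamping the final chunk to the end of the block.
 */
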
 562static int rdma_add_block(RDMAContext *rdma, const char *block_name,
 563                         void *host_addr,
 564                         ram_addr_t block_offset, uint64_t length)
 565{
 566    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 567    RDMALocalBlock *block;
 568    RDMALocalBlock *old = local->block;
 569
 570    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
 571
 572    if (local->nb_blocks) {
 573        int x;
 574
 575        if (rdma->blockmap) {
 576            for (x = 0; x < local->nb_blocks; x++) {
 577                g_hash_table_remove(rdma->blockmap,
 578                                    (void *)(uintptr_t)old[x].offset);
 579                g_hash_table_insert(rdma->blockmap,
 580                                    (void *)(uintptr_t)old[x].offset,
 581                                    &local->block[x]);
 582            }
 583        }
 584        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
 585        g_free(old);
 586    }
 587
 588    block = &local->block[local->nb_blocks];
 589
 590    block->block_name = g_strdup(block_name);
 591    block->local_host_addr = host_addr;
 592    block->offset = block_offset;
 593    block->length = length;
 594    block->index = local->nb_blocks;
 595    block->src_index = ~0U; /* Filled in by the receipt of the block list */
 596    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
 597    block->transit_bitmap = bitmap_new(block->nb_chunks);
 598    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
 599    block->unregister_bitmap = bitmap_new(block->nb_chunks);
 600    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
 601    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
 602
 603    block->is_ram_block = local->init ? false : true;
 604
 605    if (rdma->blockmap) {
 606        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
 607    }
 608
 609    trace_rdma_add_block(block_name, local->nb_blocks,
 610                         (uintptr_t) block->local_host_addr,
 611                         block->offset, block->length,
 612                         (uintptr_t) (block->local_host_addr + block->length),
 613                         BITS_TO_LONGS(block->nb_chunks) *
 614                             sizeof(unsigned long) * 8,
 615                         block->nb_chunks);
 616
 617    local->nb_blocks++;
 618
 619    return 0;
 620}
 621
 622/*
  623 * Memory regions need to be registered with the device and queue pairs set up
  624 * in advance before the migration starts. This tells us where the RAM blocks
 625 * are so that we can register them individually.
 626 */
 627static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
 628    ram_addr_t block_offset, ram_addr_t length, void *opaque)
 629{
 630    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
 631}
 632
 633/*
  634 * Identify the RAMBlocks and their quantity. They will be used as references to
 635 * identify chunk boundaries inside each RAMBlock and also be referenced
 636 * during dynamic page registration.
 637 */
 638static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
 639{
 640    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 641
 642    assert(rdma->blockmap == NULL);
 643    memset(local, 0, sizeof *local);
 644    qemu_ram_foreach_migratable_block(qemu_rdma_init_one_block, rdma);
 645    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
 646    rdma->dest_blocks = g_new0(RDMADestBlock,
 647                               rdma->local_ram_blocks.nb_blocks);
 648    local->init = true;
 649    return 0;
 650}
 651
 652/*
 653 * Note: If used outside of cleanup, the caller must ensure that the destination
 654 * block structures are also updated
 655 */
 656static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
 657{
 658    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 659    RDMALocalBlock *old = local->block;
 660    int x;
 661
 662    if (rdma->blockmap) {
 663        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
 664    }
 665    if (block->pmr) {
 666        int j;
 667
 668        for (j = 0; j < block->nb_chunks; j++) {
 669            if (!block->pmr[j]) {
 670                continue;
 671            }
 672            ibv_dereg_mr(block->pmr[j]);
 673            rdma->total_registrations--;
 674        }
 675        g_free(block->pmr);
 676        block->pmr = NULL;
 677    }
 678
 679    if (block->mr) {
 680        ibv_dereg_mr(block->mr);
 681        rdma->total_registrations--;
 682        block->mr = NULL;
 683    }
 684
 685    g_free(block->transit_bitmap);
 686    block->transit_bitmap = NULL;
 687
 688    g_free(block->unregister_bitmap);
 689    block->unregister_bitmap = NULL;
 690
 691    g_free(block->remote_keys);
 692    block->remote_keys = NULL;
 693
 694    g_free(block->block_name);
 695    block->block_name = NULL;
 696
 697    if (rdma->blockmap) {
 698        for (x = 0; x < local->nb_blocks; x++) {
 699            g_hash_table_remove(rdma->blockmap,
 700                                (void *)(uintptr_t)old[x].offset);
 701        }
 702    }
 703
 704    if (local->nb_blocks > 1) {
 705
 706        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
 707
 708        if (block->index) {
 709            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
 710        }
 711
 712        if (block->index < (local->nb_blocks - 1)) {
 713            memcpy(local->block + block->index, old + (block->index + 1),
 714                sizeof(RDMALocalBlock) *
 715                    (local->nb_blocks - (block->index + 1)));
 716            for (x = block->index; x < local->nb_blocks - 1; x++) {
 717                local->block[x].index--;
 718            }
 719        }
 720    } else {
 721        assert(block == local->block);
 722        local->block = NULL;
 723    }
 724
 725    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
 726                           block->offset, block->length,
 727                            (uintptr_t)(block->local_host_addr + block->length),
 728                           BITS_TO_LONGS(block->nb_chunks) *
 729                               sizeof(unsigned long) * 8, block->nb_chunks);
 730
 731    g_free(old);
 732
 733    local->nb_blocks--;
 734
 735    if (local->nb_blocks && rdma->blockmap) {
 736        for (x = 0; x < local->nb_blocks; x++) {
 737            g_hash_table_insert(rdma->blockmap,
 738                                (void *)(uintptr_t)local->block[x].offset,
 739                                &local->block[x]);
 740        }
 741    }
 742
 743    return 0;
 744}
 745
 746/*
 747 * Put in the log file which RDMA device was opened and the details
 748 * associated with that device.
 749 */
 750static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
 751{
 752    struct ibv_port_attr port;
 753
 754    if (ibv_query_port(verbs, 1, &port)) {
 755        error_report("Failed to query port information");
 756        return;
 757    }
 758
 759    printf("%s RDMA Device opened: kernel name %s "
 760           "uverbs device name %s, "
 761           "infiniband_verbs class device path %s, "
 762           "infiniband class device path %s, "
 763           "transport: (%d) %s\n",
 764                who,
 765                verbs->device->name,
 766                verbs->device->dev_name,
 767                verbs->device->dev_path,
 768                verbs->device->ibdev_path,
 769                port.link_layer,
 770                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
 771                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
 772                    ? "Ethernet" : "Unknown"));
 773}
 774
 775/*
 776 * Put in the log file the RDMA gid addressing information,
 777 * useful for folks who have trouble understanding the
 778 * RDMA device hierarchy in the kernel.
 779 */
 780static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
 781{
 782    char sgid[33];
 783    char dgid[33];
 784    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
 785    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
 786    trace_qemu_rdma_dump_gid(who, sgid, dgid);
 787}
 788
 789/*
 790 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 791 * We will try the next addrinfo struct, and fail if there are
 792 * no other valid addresses to bind against.
 793 *
  794 * If the user is listening on '[::]', then we will not have opened a device
 795 * yet and have no way of verifying if the device is RoCE or not.
 796 *
 797 * In this case, the source VM will throw an error for ALL types of
 798 * connections (both IPv4 and IPv6) if the destination machine does not have
 799 * a regular infiniband network available for use.
 800 *
 801 * The only way to guarantee that an error is thrown for broken kernels is
 802 * for the management software to choose a *specific* interface at bind time
  803 * and validate what type of hardware it is.
 804 *
 805 * Unfortunately, this puts the user in a fix:
 806 *
 807 *  If the source VM connects with an IPv4 address without knowing that the
 808 *  destination has bound to '[::]' the migration will unconditionally fail
 809 *  unless the management software is explicitly listening on the IPv4
 810 *  address while using a RoCE-based device.
 811 *
 812 *  If the source VM connects with an IPv6 address, then we're OK because we can
 813 *  throw an error on the source (and similarly on the destination).
 814 *
 815 *  But in mixed environments, this will be broken for a while until it is fixed
 816 *  inside linux.
 817 *
 818 * We do provide a *tiny* bit of help in this function: We can list all of the
 819 * devices in the system and check to see if all the devices are RoCE or
 820 * Infiniband.
 821 *
 822 * If we detect that we have a *pure* RoCE environment, then we can safely
  823 * throw an error even if the management software has specified '[::]' as the
 824 * bind address.
 825 *
  826 * However, if there are multiple heterogeneous devices, then we cannot make
 827 * this assumption and the user just has to be sure they know what they are
 828 * doing.
 829 *
 830 * Patches are being reviewed on linux-rdma.
 831 */
 832static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
 833{
 834    struct ibv_port_attr port_attr;
 835
 836    /* This bug only exists in linux, to our knowledge. */
 837#ifdef CONFIG_LINUX
 838
 839    /*
 840     * Verbs are only NULL if management has bound to '[::]'.
 841     *
  842     * Let's iterate through all the devices and see if there are any pure IB
 843     * devices (non-ethernet).
 844     *
 845     * If not, then we can safely proceed with the migration.
 846     * Otherwise, there are no guarantees until the bug is fixed in linux.
 847     */
 848    if (!verbs) {
 849        int num_devices, x;
  850        struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
 851        bool roce_found = false;
 852        bool ib_found = false;
 853
 854        for (x = 0; x < num_devices; x++) {
 855            verbs = ibv_open_device(dev_list[x]);
 856            if (!verbs) {
 857                if (errno == EPERM) {
 858                    continue;
 859                } else {
 860                    return -EINVAL;
 861                }
 862            }
 863
 864            if (ibv_query_port(verbs, 1, &port_attr)) {
 865                ibv_close_device(verbs);
 866                ERROR(errp, "Could not query initial IB port");
 867                return -EINVAL;
 868            }
 869
 870            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
 871                ib_found = true;
 872            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 873                roce_found = true;
 874            }
 875
 876            ibv_close_device(verbs);
 877
 878        }
 879
 880        if (roce_found) {
 881            if (ib_found) {
 882                fprintf(stderr, "WARN: migrations may fail:"
 883                                " IPv6 over RoCE / iWARP in linux"
 884                                " is broken. But since you appear to have a"
 885                                " mixed RoCE / IB environment, be sure to only"
 886                                " migrate over the IB fabric until the kernel "
  887                                "fixes the bug.\n");
 888            } else {
  889                ERROR(errp, "You only have RoCE / iWARP devices in your system"
 890                            " and your management software has specified '[::]'"
 891                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
 892                return -ENONET;
 893            }
 894        }
 895
 896        return 0;
 897    }
 898
 899    /*
  900     * If we have a verbs context, that means that something other than '[::]' was
 901     * used by the management software for binding. In which case we can
 902     * actually warn the user about a potentially broken kernel.
 903     */
 904
 905    /* IB ports start with 1, not 0 */
 906    if (ibv_query_port(verbs, 1, &port_attr)) {
 907        ERROR(errp, "Could not query initial IB port");
 908        return -EINVAL;
 909    }
 910
 911    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 912        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
 913                    "(but patches on linux-rdma in progress)");
 914        return -ENONET;
 915    }
 916
 917#endif
 918
 919    return 0;
 920}
 921
 922/*
  923 * Figure out which RDMA device corresponds to the requested IP hostname.
 924 * Also create the initial connection manager identifiers for opening
 925 * the connection.
 926 */
 927static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
 928{
 929    int ret;
 930    struct rdma_addrinfo *res;
 931    char port_str[16];
 932    struct rdma_cm_event *cm_event;
 933    char ip[40] = "unknown";
 934    struct rdma_addrinfo *e;
 935
 936    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
 937        ERROR(errp, "RDMA hostname has not been set");
 938        return -EINVAL;
 939    }
 940
 941    /* create CM channel */
 942    rdma->channel = rdma_create_event_channel();
 943    if (!rdma->channel) {
 944        ERROR(errp, "could not create CM channel");
 945        return -EINVAL;
 946    }
 947
 948    /* create CM id */
 949    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
 950    if (ret) {
 951        ERROR(errp, "could not create channel id");
 952        goto err_resolve_create_id;
 953    }
 954
 955    snprintf(port_str, 16, "%d", rdma->port);
 956    port_str[15] = '\0';
 957
 958    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
 959    if (ret < 0) {
 960        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
 961        goto err_resolve_get_addr;
 962    }
 963
 964    for (e = res; e != NULL; e = e->ai_next) {
 965        inet_ntop(e->ai_family,
 966            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
 967        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
 968
 969        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
 970                RDMA_RESOLVE_TIMEOUT_MS);
 971        if (!ret) {
 972            if (e->ai_family == AF_INET6) {
 973                ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
 974                if (ret) {
 975                    continue;
 976                }
 977            }
 978            goto route;
 979        }
 980    }
 981
 982    ERROR(errp, "could not resolve address %s", rdma->host);
 983    goto err_resolve_get_addr;
 984
 985route:
 986    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
 987
 988    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 989    if (ret) {
 990        ERROR(errp, "could not perform event_addr_resolved");
 991        goto err_resolve_get_addr;
 992    }
 993
 994    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
 995        ERROR(errp, "result not equal to event_addr_resolved %s",
 996                rdma_event_str(cm_event->event));
 997        perror("rdma_resolve_addr");
 998        rdma_ack_cm_event(cm_event);
 999        ret = -EINVAL;
1000        goto err_resolve_get_addr;
1001    }
1002    rdma_ack_cm_event(cm_event);
1003
1004    /* resolve route */
1005    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1006    if (ret) {
1007        ERROR(errp, "could not resolve rdma route");
1008        goto err_resolve_get_addr;
1009    }
1010
1011    ret = rdma_get_cm_event(rdma->channel, &cm_event);
1012    if (ret) {
1013        ERROR(errp, "could not perform event_route_resolved");
1014        goto err_resolve_get_addr;
1015    }
1016    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1017        ERROR(errp, "result not equal to event_route_resolved: %s",
1018                        rdma_event_str(cm_event->event));
1019        rdma_ack_cm_event(cm_event);
1020        ret = -EINVAL;
1021        goto err_resolve_get_addr;
1022    }
1023    rdma_ack_cm_event(cm_event);
1024    rdma->verbs = rdma->cm_id->verbs;
1025    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1026    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1027    return 0;
1028
1029err_resolve_get_addr:
1030    rdma_destroy_id(rdma->cm_id);
1031    rdma->cm_id = NULL;
1032err_resolve_create_id:
1033    rdma_destroy_event_channel(rdma->channel);
1034    rdma->channel = NULL;
1035    return ret;
1036}
1037
1038/*
1039 * Create protection domain and completion queues
1040 */
1041static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1042{
1043    /* allocate pd */
1044    rdma->pd = ibv_alloc_pd(rdma->verbs);
1045    if (!rdma->pd) {
1046        error_report("failed to allocate protection domain");
1047        return -1;
1048    }
1049
1050    /* create completion channel */
1051    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1052    if (!rdma->comp_channel) {
1053        error_report("failed to allocate completion channel");
1054        goto err_alloc_pd_cq;
1055    }
1056
1057    /*
1058     * Completion queue can be filled by both read and write work requests,
1059     * so must reflect the sum of both possible queue sizes.
1060     */
1061    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1062            NULL, rdma->comp_channel, 0);
1063    if (!rdma->cq) {
1064        error_report("failed to allocate completion queue");
1065        goto err_alloc_pd_cq;
1066    }
1067
1068    return 0;
1069
1070err_alloc_pd_cq:
1071    if (rdma->pd) {
1072        ibv_dealloc_pd(rdma->pd);
1073    }
1074    if (rdma->comp_channel) {
1075        ibv_destroy_comp_channel(rdma->comp_channel);
1076    }
1077    rdma->pd = NULL;
1078    rdma->comp_channel = NULL;
1079    return -1;
1080
1081}
1082
1083/*
1084 * Create queue pairs.
1085 */
1086static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1087{
1088    struct ibv_qp_init_attr attr = { 0 };
1089    int ret;
1090
1091    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1092    attr.cap.max_recv_wr = 3;
1093    attr.cap.max_send_sge = 1;
1094    attr.cap.max_recv_sge = 1;
1095    attr.send_cq = rdma->cq;
1096    attr.recv_cq = rdma->cq;
1097    attr.qp_type = IBV_QPT_RC;
1098
1099    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1100    if (ret) {
1101        return -1;
1102    }
1103
1104    rdma->qp = rdma->cm_id->qp;
1105    return 0;
1106}
1107
1108static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1109{
1110    int i;
1111    RDMALocalBlocks *local = &rdma->local_ram_blocks;
1112
1113    for (i = 0; i < local->nb_blocks; i++) {
1114        local->block[i].mr =
1115            ibv_reg_mr(rdma->pd,
1116                    local->block[i].local_host_addr,
1117                    local->block[i].length,
1118                    IBV_ACCESS_LOCAL_WRITE |
1119                    IBV_ACCESS_REMOTE_WRITE
1120                    );
1121        if (!local->block[i].mr) {
 1122            perror("Failed to register local dest ram block!");
1123            break;
1124        }
1125        rdma->total_registrations++;
1126    }
1127
1128    if (i >= local->nb_blocks) {
1129        return 0;
1130    }
1131
1132    for (i--; i >= 0; i--) {
1133        ibv_dereg_mr(local->block[i].mr);
1134        rdma->total_registrations--;
1135    }
1136
1137    return -1;
1138
1139}
1140
1141/*
1142 * Find the ram block that corresponds to the page requested to be
1143 * transmitted by QEMU.
1144 *
1145 * Once the block is found, also identify which 'chunk' within that
 1146 * block the page belongs to.
1147 *
1148 * This search cannot fail or the migration will fail.
1149 */
1150static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1151                                      uintptr_t block_offset,
1152                                      uint64_t offset,
1153                                      uint64_t length,
1154                                      uint64_t *block_index,
1155                                      uint64_t *chunk_index)
1156{
1157    uint64_t current_addr = block_offset + offset;
1158    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1159                                                (void *) block_offset);
1160    assert(block);
1161    assert(current_addr >= block->offset);
1162    assert((current_addr + length) <= (block->offset + block->length));
1163
1164    *block_index = block->index;
1165    *chunk_index = ram_chunk_index(block->local_host_addr,
1166                block->local_host_addr + (current_addr - block->offset));
1167
1168    return 0;
1169}
1170
1171/*
1172 * Register a chunk with IB. If the chunk was already registered
1173 * previously, then skip.
1174 *
1175 * Also return the keys associated with the registration needed
1176 * to perform the actual RDMA operation.
1177 */
1178static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1179        RDMALocalBlock *block, uintptr_t host_addr,
1180        uint32_t *lkey, uint32_t *rkey, int chunk,
1181        uint8_t *chunk_start, uint8_t *chunk_end)
1182{
1183    if (block->mr) {
1184        if (lkey) {
1185            *lkey = block->mr->lkey;
1186        }
1187        if (rkey) {
1188            *rkey = block->mr->rkey;
1189        }
1190        return 0;
1191    }
1192
1193    /* allocate memory to store chunk MRs */
1194    if (!block->pmr) {
1195        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1196    }
1197
1198    /*
1199     * If 'rkey', then we're the destination, so grant access to the source.
1200     *
1201     * If 'lkey', then we're the source VM, so grant access only to ourselves.
1202     */
1203    if (!block->pmr[chunk]) {
1204        uint64_t len = chunk_end - chunk_start;
1205
1206        trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1207
1208        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1209                chunk_start, len,
1210                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1211                        IBV_ACCESS_REMOTE_WRITE) : 0));
1212
1213        if (!block->pmr[chunk]) {
1214            perror("Failed to register chunk!");
1215            fprintf(stderr, "Chunk details: block: %d chunk index %d"
1216                            " start %" PRIuPTR " end %" PRIuPTR
1217                            " host %" PRIuPTR
1218                            " local %" PRIuPTR " registrations: %d\n",
1219                            block->index, chunk, (uintptr_t)chunk_start,
1220                            (uintptr_t)chunk_end, host_addr,
1221                            (uintptr_t)block->local_host_addr,
1222                            rdma->total_registrations);
1223            return -1;
1224        }
1225        rdma->total_registrations++;
1226    }
1227
1228    if (lkey) {
1229        *lkey = block->pmr[chunk]->lkey;
1230    }
1231    if (rkey) {
1232        *rkey = block->pmr[chunk]->rkey;
1233    }
1234    return 0;
1235}
1236
1237/*
1238 * Register (at connection time) the memory used for control
1239 * channel messages.
1240 */
1241static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1242{
1243    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1244            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1245            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1246    if (rdma->wr_data[idx].control_mr) {
1247        rdma->total_registrations++;
1248        return 0;
1249    }
1250    error_report("qemu_rdma_reg_control failed");
1251    return -1;
1252}
1253
1254const char *print_wrid(int wrid)
1255{
1256    if (wrid >= RDMA_WRID_RECV_CONTROL) {
1257        return wrid_desc[RDMA_WRID_RECV_CONTROL];
1258    }
1259    return wrid_desc[wrid];
1260}
1261
1262/*
1263 * RDMA requires memory registration (mlock/pinning), but this is not good for
1264 * overcommitment.
1265 *
1266 * In preparation for the future where LRU information or workload-specific
 1267 * writable working set memory access behavior is available to QEMU
1268 * it would be nice to have in place the ability to UN-register/UN-pin
 1269 * particular memory regions from the RDMA hardware when it is determined that
1270 * those regions of memory will likely not be accessed again in the near future.
1271 *
1272 * While we do not yet have such information right now, the following
1273 * compile-time option allows us to perform a non-optimized version of this
1274 * behavior.
1275 *
1276 * By uncommenting this option, you will cause *all* RDMA transfers to be
1277 * unregistered immediately after the transfer completes on both sides of the
1278 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1279 *
1280 * This will have a terrible impact on migration performance, so until future
1281 * workload information or LRU information is available, do not attempt to use
1282 * this feature except for basic testing.
1283 */
1284//#define RDMA_UNREGISTRATION_EXAMPLE
1285
1286/*
1287 * Perform a non-optimized memory unregistration after every transfer
1288 * for demonstration purposes, only if pin-all is not requested.
1289 *
1290 * Potential optimizations:
1291 * 1. Start a new thread to run this function continuously
 1292 *      - for bit clearing
 1293 *      - and for receipt of unregister messages
1294 * 2. Use an LRU.
1295 * 3. Use workload hints.
1296 */
1297static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1298{
1299    while (rdma->unregistrations[rdma->unregister_current]) {
1300        int ret;
1301        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1302        uint64_t chunk =
1303            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1304        uint64_t index =
1305            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1306        RDMALocalBlock *block =
1307            &(rdma->local_ram_blocks.block[index]);
1308        RDMARegister reg = { .current_index = index };
1309        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1310                                 };
1311        RDMAControlHeader head = { .len = sizeof(RDMARegister),
1312                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1313                                   .repeat = 1,
1314                                 };
1315
1316        trace_qemu_rdma_unregister_waiting_proc(chunk,
1317                                                rdma->unregister_current);
1318
1319        rdma->unregistrations[rdma->unregister_current] = 0;
1320        rdma->unregister_current++;
1321
1322        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1323            rdma->unregister_current = 0;
1324        }
1325
1326
1327        /*
1328         * Unregistration is speculative (because migration is single-threaded
 1329         * and we cannot break the protocol's infiniband message ordering).
1330         * Thus, if the memory is currently being used for transmission,
1331         * then abort the attempt to unregister and try again
1332         * later the next time a completion is received for this memory.
1333         */
1334        clear_bit(chunk, block->unregister_bitmap);
1335
1336        if (test_bit(chunk, block->transit_bitmap)) {
1337            trace_qemu_rdma_unregister_waiting_inflight(chunk);
1338            continue;
1339        }
1340
1341        trace_qemu_rdma_unregister_waiting_send(chunk);
1342
1343        ret = ibv_dereg_mr(block->pmr[chunk]);
1344        block->pmr[chunk] = NULL;
1345        block->remote_keys[chunk] = 0;
1346
1347        if (ret != 0) {
1348            perror("unregistration chunk failed");
1349            return -ret;
1350        }
1351        rdma->total_registrations--;
1352
1353        reg.key.chunk = chunk;
1354        register_to_network(rdma, &reg);
1355        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1356                                &resp, NULL, NULL);
1357        if (ret < 0) {
1358            return ret;
1359        }
1360
1361        trace_qemu_rdma_unregister_waiting_complete(chunk);
1362    }
1363
1364    return 0;
1365}
1366
1367static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1368                                         uint64_t chunk)
1369{
1370    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1371
1372    result |= (index << RDMA_WRID_BLOCK_SHIFT);
1373    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1374
1375    return result;
1376}
1377
1378/*
1379 * Set bit for unregistration in the next iteration.
1380 * We cannot transmit right here, but will unpin later.
1381 */
1382static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1383                                        uint64_t chunk, uint64_t wr_id)
1384{
1385    if (rdma->unregistrations[rdma->unregister_next] != 0) {
1386        error_report("rdma migration: queue is full");
1387    } else {
1388        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1389
1390        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1391            trace_qemu_rdma_signal_unregister_append(chunk,
1392                                                     rdma->unregister_next);
1393
1394            rdma->unregistrations[rdma->unregister_next++] =
1395                    qemu_rdma_make_wrid(wr_id, index, chunk);
1396
1397            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1398                rdma->unregister_next = 0;
1399            }
1400        } else {
1401            trace_qemu_rdma_signal_unregister_already(chunk);
1402        }
1403    }
1404}
1405
1406/*
 1407 * Consult the completion queue to see if a work request
1408 * (of any kind) has completed.
1409 * Return the work request ID that completed.
1410 */
1411static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1412                               uint32_t *byte_len)
1413{
1414    int ret;
1415    struct ibv_wc wc;
1416    uint64_t wr_id;
1417
1418    ret = ibv_poll_cq(rdma->cq, 1, &wc);
1419
1420    if (!ret) {
1421        *wr_id_out = RDMA_WRID_NONE;
1422        return 0;
1423    }
1424
1425    if (ret < 0) {
1426        error_report("ibv_poll_cq return %d", ret);
1427        return ret;
1428    }
1429
1430    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1431
1432    if (wc.status != IBV_WC_SUCCESS) {
1433        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1434                        wc.status, ibv_wc_status_str(wc.status));
1435        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1436
1437        return -1;
1438    }
1439
1440    if (rdma->control_ready_expected &&
1441        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1442        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1443                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1444        rdma->control_ready_expected = 0;
1445    }
1446
1447    if (wr_id == RDMA_WRID_RDMA_WRITE) {
1448        uint64_t chunk =
1449            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1450        uint64_t index =
1451            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1452        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1453
1454        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1455                                   index, chunk, block->local_host_addr,
1456                                   (void *)(uintptr_t)block->remote_host_addr);
1457
1458        clear_bit(chunk, block->transit_bitmap);
1459
1460        if (rdma->nb_sent > 0) {
1461            rdma->nb_sent--;
1462        }
1463
1464        if (!rdma->pin_all) {
1465            /*
1466             * FYI: If one wanted to signal a specific chunk to be unregistered
1467             * using LRU or workload-specific information, this is the function
1468             * you would call to do so. That chunk would then get asynchronously
1469             * unregistered later.
1470             */
1471#ifdef RDMA_UNREGISTRATION_EXAMPLE
1472            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1473#endif
1474        }
1475    } else {
1476        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1477    }
1478
1479    *wr_id_out = wc.wr_id;
1480    if (byte_len) {
1481        *byte_len = wc.byte_len;
1482    }
1483
1484    return  0;
1485}
1486
1487/* Wait for activity on the completion channel.
 1488 * Returns 0 on success, non-zero on error.
1489 */
1490static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1491{
1492    struct rdma_cm_event *cm_event;
1493    int ret = -1;
1494
1495    /*
1496     * Coroutine doesn't start until migration_fd_process_incoming()
1497     * so don't yield unless we know we're running inside of a coroutine.
1498     */
1499    if (rdma->migration_started_on_destination &&
1500        migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1501        yield_until_fd_readable(rdma->comp_channel->fd);
1502    } else {
 1503        /* This is the source side; we're in a separate thread
 1504         * or the destination prior to migration_fd_process_incoming()
 1505         * (after postcopy, the destination is also in a separate thread).
 1506         * We can't yield, so we have to poll the fd.
1507         * But we need to be able to handle 'cancel' or an error
1508         * without hanging forever.
1509         */
1510        while (!rdma->error_state  && !rdma->received_error) {
1511            GPollFD pfds[2];
1512            pfds[0].fd = rdma->comp_channel->fd;
1513            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1514            pfds[0].revents = 0;
1515
1516            pfds[1].fd = rdma->channel->fd;
1517            pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1518            pfds[1].revents = 0;
1519
1520            /* 0.1s timeout, should be fine for a 'cancel' */
1521            switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1522            case 2:
1523            case 1: /* fd active */
1524                if (pfds[0].revents) {
1525                    return 0;
1526                }
1527
1528                if (pfds[1].revents) {
1529                    ret = rdma_get_cm_event(rdma->channel, &cm_event);
1530                    if (!ret) {
1531                        error_report("receive cm event while wait comp channel,"
1532                                     "cm event is %d", cm_event->event);
1533                        if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1534                            cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1535                            rdma_ack_cm_event(cm_event);
1536                            return -EPIPE;
1537                        }
1538                        rdma_ack_cm_event(cm_event);
1539                    }
1540                }
1541                break;
1542
1543            case 0: /* Timeout, go around again */
1544                break;
1545
1546            default: /* Error of some type -
1547                      * I don't trust errno from qemu_poll_ns
1548                     */
1549                error_report("%s: poll failed", __func__);
1550                return -EPIPE;
1551            }
1552
1553            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1554                /* Bail out and let the cancellation happen */
1555                return -EPIPE;
1556            }
1557        }
1558    }
1559
1560    if (rdma->received_error) {
1561        return -EPIPE;
1562    }
1563    return rdma->error_state;
1564}
1565
1566/*
1567 * Block until the next work request has completed.
1568 *
1569 * First poll to see if a work request has already completed,
1570 * otherwise block.
1571 *
1572 * If we encounter completed work requests for IDs other than
1573 * the one we're interested in, then that's generally an error.
1574 *
1575 * The only exception is actual RDMA Write completions. These
1576 * completions only need to be recorded, but do not actually
1577 * need further processing.
1578 */
1579static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1580                                    uint32_t *byte_len)
1581{
1582    int num_cq_events = 0, ret = 0;
1583    struct ibv_cq *cq;
1584    void *cq_ctx;
1585    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1586
1587    if (ibv_req_notify_cq(rdma->cq, 0)) {
1588        return -1;
1589    }
1590    /* poll cq first */
1591    while (wr_id != wrid_requested) {
1592        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1593        if (ret < 0) {
1594            return ret;
1595        }
1596
1597        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1598
1599        if (wr_id == RDMA_WRID_NONE) {
1600            break;
1601        }
1602        if (wr_id != wrid_requested) {
1603            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1604                       wrid_requested, print_wrid(wr_id), wr_id);
1605        }
1606    }
1607
1608    if (wr_id == wrid_requested) {
1609        return 0;
1610    }
1611
1612    while (1) {
1613        ret = qemu_rdma_wait_comp_channel(rdma);
1614        if (ret) {
1615            goto err_block_for_wrid;
1616        }
1617
1618        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1619        if (ret) {
1620            perror("ibv_get_cq_event");
1621            goto err_block_for_wrid;
1622        }
1623
1624        num_cq_events++;
1625
1626        ret = -ibv_req_notify_cq(cq, 0);
1627        if (ret) {
1628            goto err_block_for_wrid;
1629        }
1630
1631        while (wr_id != wrid_requested) {
1632            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1633            if (ret < 0) {
1634                goto err_block_for_wrid;
1635            }
1636
1637            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1638
1639            if (wr_id == RDMA_WRID_NONE) {
1640                break;
1641            }
1642            if (wr_id != wrid_requested) {
1643                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1644                                   wrid_requested, print_wrid(wr_id), wr_id);
1645            }
1646        }
1647
1648        if (wr_id == wrid_requested) {
1649            goto success_block_for_wrid;
1650        }
1651    }
1652
1653success_block_for_wrid:
1654    if (num_cq_events) {
1655        ibv_ack_cq_events(cq, num_cq_events);
1656    }
1657    return 0;
1658
1659err_block_for_wrid:
1660    if (num_cq_events) {
1661        ibv_ack_cq_events(cq, num_cq_events);
1662    }
1663
1664    rdma->error_state = ret;
1665    return ret;
1666}
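
/*
 * Illustrative sketch, not part of the original file: how a completion's
 * 64-bit wr_id is unpacked with the RDMA_WRID_* masks and shifts defined at
 * the top of this file.  The helper name is invented; it mirrors how the
 * polling code above filters completions by type.
 */
static G_GNUC_UNUSED void example_decode_wrid(uint64_t wr_id, uint64_t *type,
                                              uint64_t *block, uint64_t *chunk)
{
    *type = wr_id & RDMA_WRID_TYPE_MASK;
    *block = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
    *chunk = wr_id >> RDMA_WRID_CHUNK_SHIFT;  /* chunk occupies the top bits */
}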
1667
1668/*
1669 * Post a SEND message work request for the control channel
1670 * containing some data and block until the post completes.
1671 */
1672static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1673                                       RDMAControlHeader *head)
1674{
1675    int ret = 0;
1676    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1677    struct ibv_send_wr *bad_wr;
1678    struct ibv_sge sge = {
1679                           .addr = (uintptr_t)(wr->control),
1680                           .length = head->len + sizeof(RDMAControlHeader),
1681                           .lkey = wr->control_mr->lkey,
1682                         };
1683    struct ibv_send_wr send_wr = {
1684                                   .wr_id = RDMA_WRID_SEND_CONTROL,
1685                                   .opcode = IBV_WR_SEND,
1686                                   .send_flags = IBV_SEND_SIGNALED,
1687                                   .sg_list = &sge,
1688                                   .num_sge = 1,
1689                                };
1690
1691    trace_qemu_rdma_post_send_control(control_desc(head->type));
1692
1693    /*
1694     * We don't actually need to do a memcpy() in here if we used
1695     * the "sge" properly, but since we're only sending control messages
1696     * (not RAM in a performance-critical path), then it's OK for now.
1697     *
1698     * The copy makes the RDMAControlHeader simpler to manipulate
1699     * for the time being.
1700     */
1701    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1702    memcpy(wr->control, head, sizeof(RDMAControlHeader));
1703    control_to_network((void *) wr->control);
1704
1705    if (buf) {
1706        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1707    }
1708
1709
1710    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1711
1712    if (ret > 0) {
1713        error_report("Failed to use post IB SEND for control");
1714        return -ret;
1715    }
1716
1717    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1718    if (ret < 0) {
1719        error_report("rdma migration: send polling control error");
1720    }
1721
1722    return ret;
1723}
1724
1725/*
1726 * Post a RECV work request in anticipation of some future receipt
1727 * of data on the control channel.
1728 */
1729static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1730{
1731    struct ibv_recv_wr *bad_wr;
1732    struct ibv_sge sge = {
1733                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
1734                            .length = RDMA_CONTROL_MAX_BUFFER,
1735                            .lkey = rdma->wr_data[idx].control_mr->lkey,
1736                         };
1737
1738    struct ibv_recv_wr recv_wr = {
1739                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1740                                    .sg_list = &sge,
1741                                    .num_sge = 1,
1742                                 };
1743
1744
1745    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1746        return -1;
1747    }
1748
1749    return 0;
1750}
1751
1752/*
1753 * Block and wait for a RECV control channel message to arrive.
1754 */
1755static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1756                RDMAControlHeader *head, int expecting, int idx)
1757{
1758    uint32_t byte_len;
1759    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1760                                       &byte_len);
1761
1762    if (ret < 0) {
1763        error_report("rdma migration: recv polling control error!");
1764        return ret;
1765    }
1766
1767    network_to_control((void *) rdma->wr_data[idx].control);
1768    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1769
1770    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1771
1772    if (expecting == RDMA_CONTROL_NONE) {
1773        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1774                                             head->type);
1775    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1776        error_report("Was expecting a %s (%d) control message"
1777                ", but got: %s (%d), length: %d",
1778                control_desc(expecting), expecting,
1779                control_desc(head->type), head->type, head->len);
1780        if (head->type == RDMA_CONTROL_ERROR) {
1781            rdma->received_error = true;
1782        }
1783        return -EIO;
1784    }
1785    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1786        error_report("too long length: %d", head->len);
1787        return -EINVAL;
1788    }
1789    if (sizeof(*head) + head->len != byte_len) {
1790        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1791        return -EINVAL;
1792    }
1793
1794    return 0;
1795}
1796
1797/*
1798 * When a RECV work request has completed, the work request's
1799 * buffer is pointed at the header.
1800 *
1801 * This will advance the pointer to the data portion
1802 * of the control message of the work request's buffer that
1803 * was populated after the work request finished.
1804 */
1805static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1806                                  RDMAControlHeader *head)
1807{
1808    rdma->wr_data[idx].control_len = head->len;
1809    rdma->wr_data[idx].control_curr =
1810        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1811}
1812
1813/*
1814 * This is an 'atomic' high-level operation to deliver a single, unified
1815 * control-channel message.
1816 *
1817 * Additionally, if the user is expecting some kind of reply to this message,
1818 * they can request a 'resp' response message be filled in by posting an
1819 * additional work request on behalf of the user and waiting for an additional
1820 * completion.
1821 *
1822 * The extra (optional) response is used during registration to save us from
1823 * having to perform an *additional* exchange of messages just to provide a
1824 * response, by instead piggy-backing on the acknowledgement.
1825 */
1826static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1827                                   uint8_t *data, RDMAControlHeader *resp,
1828                                   int *resp_idx,
1829                                   int (*callback)(RDMAContext *rdma))
1830{
1831    int ret = 0;
1832
1833    /*
1834     * Wait until the dest is ready before attempting to deliver the message
1835     * by waiting for a READY message.
1836     */
1837    if (rdma->control_ready_expected) {
1838        RDMAControlHeader resp;
1839        ret = qemu_rdma_exchange_get_response(rdma,
1840                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1841        if (ret < 0) {
1842            return ret;
1843        }
1844    }
1845
1846    /*
1847     * If the user is expecting a response, post a WR in anticipation of it.
1848     */
1849    if (resp) {
1850        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1851        if (ret) {
1852            error_report("rdma migration: error posting"
1853                    " extra control recv for anticipated result!");
1854            return ret;
1855        }
1856    }
1857
1858    /*
1859     * Post a WR to replace the one we just consumed for the READY message.
1860     */
1861    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1862    if (ret) {
1863        error_report("rdma migration: error posting first control recv!");
1864        return ret;
1865    }
1866
1867    /*
1868     * Deliver the control message that was requested.
1869     */
1870    ret = qemu_rdma_post_send_control(rdma, data, head);
1871
1872    if (ret < 0) {
1873        error_report("Failed to send control buffer!");
1874        return ret;
1875    }
1876
1877    /*
1878     * If we're expecting a response, block and wait for it.
1879     */
1880    if (resp) {
1881        if (callback) {
1882            trace_qemu_rdma_exchange_send_issue_callback();
1883            ret = callback(rdma);
1884            if (ret < 0) {
1885                return ret;
1886            }
1887        }
1888
1889        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1890        ret = qemu_rdma_exchange_get_response(rdma, resp,
1891                                              resp->type, RDMA_WRID_DATA);
1892
1893        if (ret < 0) {
1894            return ret;
1895        }
1896
1897        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1898        if (resp_idx) {
1899            *resp_idx = RDMA_WRID_DATA;
1900        }
1901        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1902    }
1903
1904    rdma->control_ready_expected = 1;
1905
1906    return 0;
1907}
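
/*
 * Illustrative sketch, not part of the original file: how a caller pushes one
 * opaque control message with no reply expected.  This mirrors what
 * qio_channel_rdma_writev() does further down; the helper name is invented.
 */
static G_GNUC_UNUSED int example_send_file_chunk(RDMAContext *rdma,
                                                 uint8_t *data, uint32_t len)
{
    RDMAControlHeader head = {
        .len = len,
        .type = RDMA_CONTROL_QEMU_FILE,
        .repeat = 1,
    };

    /* No resp/resp_idx/callback: only the READY handshake is exchanged. */
    return qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
}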
1908
1909/*
1910 * This is an 'atomic' high-level operation to receive a single, unified
1911 * control-channel message.
1912 */
1913static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1914                                int expecting)
1915{
1916    RDMAControlHeader ready = {
1917                                .len = 0,
1918                                .type = RDMA_CONTROL_READY,
1919                                .repeat = 1,
1920                              };
1921    int ret;
1922
1923    /*
1924     * Inform the source that we're ready to receive a message.
1925     */
1926    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1927
1928    if (ret < 0) {
1929        error_report("Failed to send control buffer!");
1930        return ret;
1931    }
1932
1933    /*
1934     * Block and wait for the message.
1935     */
1936    ret = qemu_rdma_exchange_get_response(rdma, head,
1937                                          expecting, RDMA_WRID_READY);
1938
1939    if (ret < 0) {
1940        return ret;
1941    }
1942
1943    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1944
1945    /*
1946     * Post a new RECV work request to replace the one we just consumed.
1947     */
1948    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1949    if (ret) {
1950        error_report("rdma migration: error posting second control recv!");
1951        return ret;
1952    }
1953
1954    return 0;
1955}
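
/*
 * Illustrative sketch, not part of the original file: the destination-side
 * counterpart of the sender sketch above.  It receives one QEMU_FILE control
 * message and copies out as much of the payload as fits; the helper name is
 * invented and buffer management is deliberately simplified.
 */
static G_GNUC_UNUSED ssize_t example_recv_file_chunk(RDMAContext *rdma,
                                                     uint8_t *buf, size_t size)
{
    RDMAControlHeader head;
    size_t len;
    int ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);

    if (ret < 0) {
        return ret;
    }

    /* The payload now starts at wr_data[RDMA_WRID_READY].control_curr. */
    len = MIN(head.len, size);
    memcpy(buf, rdma->wr_data[RDMA_WRID_READY].control_curr, len);
    return len;
}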
1956
1957/*
1958 * Write an actual chunk of memory using RDMA.
1959 *
1960 * If we're using dynamic registration on the dest-side, we have to
1961 * send a registration command first.
1962 */
1963static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1964                               int current_index, uint64_t current_addr,
1965                               uint64_t length)
1966{
1967    struct ibv_sge sge;
1968    struct ibv_send_wr send_wr = { 0 };
1969    struct ibv_send_wr *bad_wr;
1970    int reg_result_idx, ret, count = 0;
1971    uint64_t chunk, chunks;
1972    uint8_t *chunk_start, *chunk_end;
1973    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1974    RDMARegister reg;
1975    RDMARegisterResult *reg_result;
1976    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1977    RDMAControlHeader head = { .len = sizeof(RDMARegister),
1978                               .type = RDMA_CONTROL_REGISTER_REQUEST,
1979                               .repeat = 1,
1980                             };
1981
1982retry:
1983    sge.addr = (uintptr_t)(block->local_host_addr +
1984                            (current_addr - block->offset));
1985    sge.length = length;
1986
1987    chunk = ram_chunk_index(block->local_host_addr,
1988                            (uint8_t *)(uintptr_t)sge.addr);
1989    chunk_start = ram_chunk_start(block, chunk);
1990
1991    if (block->is_ram_block) {
1992        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1993
1994        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1995            chunks--;
1996        }
1997    } else {
1998        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1999
2000        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2001            chunks--;
2002        }
2003    }
2004
2005    trace_qemu_rdma_write_one_top(chunks + 1,
2006                                  (chunks + 1) *
2007                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2008
2009    chunk_end = ram_chunk_end(block, chunk + chunks);
2010
2011    if (!rdma->pin_all) {
2012#ifdef RDMA_UNREGISTRATION_EXAMPLE
2013        qemu_rdma_unregister_waiting(rdma);
2014#endif
2015    }
2016
2017    while (test_bit(chunk, block->transit_bitmap)) {
2018        (void)count;
2019        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2020                sge.addr, length, rdma->nb_sent, block->nb_chunks);
2021
2022        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2023
2024        if (ret < 0) {
2025            error_report("Failed to wait for previous write to complete "
2026                    "block %d chunk %" PRIu64
2027                    " current %" PRIu64 " len %" PRIu64 " %d",
2028                    current_index, chunk, sge.addr, length, rdma->nb_sent);
2029            return ret;
2030        }
2031    }
2032
2033    if (!rdma->pin_all || !block->is_ram_block) {
2034        if (!block->remote_keys[chunk]) {
2035            /*
2036             * This chunk has not yet been registered, so first check to see
2037             * if the entire chunk is zero. If so, tell the other side to
2038             * memset() + madvise() the entire chunk without RDMA.
2039             */
2040
2041            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2042                RDMACompress comp = {
2043                                        .offset = current_addr,
2044                                        .value = 0,
2045                                        .block_idx = current_index,
2046                                        .length = length,
2047                                    };
2048
2049                head.len = sizeof(comp);
2050                head.type = RDMA_CONTROL_COMPRESS;
2051
2052                trace_qemu_rdma_write_one_zero(chunk, sge.length,
2053                                               current_index, current_addr);
2054
2055                compress_to_network(rdma, &comp);
2056                ret = qemu_rdma_exchange_send(rdma, &head,
2057                                (uint8_t *) &comp, NULL, NULL, NULL);
2058
2059                if (ret < 0) {
2060                    return -EIO;
2061                }
2062
2063                acct_update_position(f, sge.length, true);
2064
2065                return 1;
2066            }
2067
2068            /*
2069             * Otherwise, tell other side to register.
2070             */
2071            reg.current_index = current_index;
2072            if (block->is_ram_block) {
2073                reg.key.current_addr = current_addr;
2074            } else {
2075                reg.key.chunk = chunk;
2076            }
2077            reg.chunks = chunks;
2078
2079            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2080                                              current_addr);
2081
2082            register_to_network(rdma, &reg);
2083            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2084                                    &resp, &reg_result_idx, NULL);
2085            if (ret < 0) {
2086                return ret;
2087            }
2088
2089            /* try to overlap this single registration with the one we sent. */
2090            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2091                                                &sge.lkey, NULL, chunk,
2092                                                chunk_start, chunk_end)) {
2093                error_report("cannot get lkey");
2094                return -EINVAL;
2095            }
2096
2097            reg_result = (RDMARegisterResult *)
2098                    rdma->wr_data[reg_result_idx].control_curr;
2099
2100            network_to_result(reg_result);
2101
2102            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2103                                                 reg_result->rkey, chunk);
2104
2105            block->remote_keys[chunk] = reg_result->rkey;
2106            block->remote_host_addr = reg_result->host_addr;
2107        } else {
2108            /* already registered before */
2109            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2110                                                &sge.lkey, NULL, chunk,
2111                                                chunk_start, chunk_end)) {
2112                error_report("cannot get lkey!");
2113                return -EINVAL;
2114            }
2115        }
2116
2117        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2118    } else {
2119        send_wr.wr.rdma.rkey = block->remote_rkey;
2120
2121        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2122                                                     &sge.lkey, NULL, chunk,
2123                                                     chunk_start, chunk_end)) {
2124            error_report("cannot get lkey!");
2125            return -EINVAL;
2126        }
2127    }
2128
2129    /*
2130     * Encode the ram block index and chunk within this wrid.
2131     * We will use this information at the time of completion
2132     * to figure out which bitmap to check against and then which
2133     * chunk in the bitmap to look for.
2134     */
2135    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2136                                        current_index, chunk);
2137
2138    send_wr.opcode = IBV_WR_RDMA_WRITE;
2139    send_wr.send_flags = IBV_SEND_SIGNALED;
2140    send_wr.sg_list = &sge;
2141    send_wr.num_sge = 1;
2142    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2143                                (current_addr - block->offset);
2144
2145    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2146                                   sge.length);
2147
2148    /*
2149     * ibv_post_send() does not return negative error numbers,
2150     * per the specification they are positive - no idea why.
2151     */
2152    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2153
2154    if (ret == ENOMEM) {
2155        trace_qemu_rdma_write_one_queue_full();
2156        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2157        if (ret < 0) {
2158            error_report("rdma migration: failed to make "
2159                         "room in full send queue! %d", ret);
2160            return ret;
2161        }
2162
2163        goto retry;
2164
2165    } else if (ret > 0) {
2166        perror("rdma migration: post rdma write failed");
2167        return -ret;
2168    }
2169
2170    set_bit(chunk, block->transit_bitmap);
2171    acct_update_position(f, sge.length, false);
2172    rdma->total_writes++;
2173
2174    return 0;
2175}
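
/*
 * Illustrative sketch, not part of the original file: the chunk arithmetic
 * behind the registration logic above, i.e. how many chunks of
 * (1UL << RDMA_REG_CHUNK_SHIFT) bytes a transfer of 'length' bytes starting
 * at host address 'addr' touches.  The helper name is invented;
 * qemu_rdma_write_one() tracks the related quantity 'chunks' (chunks beyond
 * the first) against the block layout instead.
 */
static G_GNUC_UNUSED uint64_t example_chunks_spanned(uint64_t addr,
                                                     uint64_t length)
{
    uint64_t first, last;

    if (!length) {
        return 0;
    }
    first = addr >> RDMA_REG_CHUNK_SHIFT;
    last = (addr + length - 1) >> RDMA_REG_CHUNK_SHIFT;
    return last - first + 1;
}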
2176
2177/*
2178 * Push out any unwritten RDMA operations.
2179 *
2180 * We support sending out multiple chunks at the same time.
2181 * Not all of them need to get signaled in the completion queue.
2182 */
2183static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2184{
2185    int ret;
2186
2187    if (!rdma->current_length) {
2188        return 0;
2189    }
2190
2191    ret = qemu_rdma_write_one(f, rdma,
2192            rdma->current_index, rdma->current_addr, rdma->current_length);
2193
2194    if (ret < 0) {
2195        return ret;
2196    }
2197
2198    if (ret == 0) {
2199        rdma->nb_sent++;
2200        trace_qemu_rdma_write_flush(rdma->nb_sent);
2201    }
2202
2203    rdma->current_length = 0;
2204    rdma->current_addr = 0;
2205
2206    return 0;
2207}
2208
2209static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2210                    uint64_t offset, uint64_t len)
2211{
2212    RDMALocalBlock *block;
2213    uint8_t *host_addr;
2214    uint8_t *chunk_end;
2215
2216    if (rdma->current_index < 0) {
2217        return 0;
2218    }
2219
2220    if (rdma->current_chunk < 0) {
2221        return 0;
2222    }
2223
2224    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2225    host_addr = block->local_host_addr + (offset - block->offset);
2226    chunk_end = ram_chunk_end(block, rdma->current_chunk);
2227
2228    if (rdma->current_length == 0) {
2229        return 0;
2230    }
2231
2232    /*
2233     * Only merge into chunk sequentially.
2234     */
2235    if (offset != (rdma->current_addr + rdma->current_length)) {
2236        return 0;
2237    }
2238
2239    if (offset < block->offset) {
2240        return 0;
2241    }
2242
2243    if ((offset + len) > (block->offset + block->length)) {
2244        return 0;
2245    }
2246
2247    if ((host_addr + len) > chunk_end) {
2248        return 0;
2249    }
2250
2251    return 1;
2252}
2253
2254/*
2255 * We're not actually writing here, but doing three things:
2256 *
2257 * 1. Identify the chunk the buffer belongs to.
2258 * 2. If the chunk is full or the buffer doesn't belong to the current
2259 *    chunk, then start a new chunk and flush() the old chunk.
2260 * 3. To keep the hardware busy, we also group chunks into batches
2261 *    and only require that a batch gets acknowledged in the completion
2262 *    queue instead of each individual chunk.
2263 */
2264static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2265                           uint64_t block_offset, uint64_t offset,
2266                           uint64_t len)
2267{
2268    uint64_t current_addr = block_offset + offset;
2269    uint64_t index = rdma->current_index;
2270    uint64_t chunk = rdma->current_chunk;
2271    int ret;
2272
2273    /* If we cannot merge it, we flush the current buffer first. */
2274    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2275        ret = qemu_rdma_write_flush(f, rdma);
2276        if (ret) {
2277            return ret;
2278        }
2279        rdma->current_length = 0;
2280        rdma->current_addr = current_addr;
2281
2282        ret = qemu_rdma_search_ram_block(rdma, block_offset,
2283                                         offset, len, &index, &chunk);
2284        if (ret) {
2285            error_report("ram block search failed");
2286            return ret;
2287        }
2288        rdma->current_index = index;
2289        rdma->current_chunk = chunk;
2290    }
2291
2292    /* merge it */
2293    rdma->current_length += len;
2294
2295    /* flush it if buffer is too large */
2296    if (rdma->current_length >= RDMA_MERGE_MAX) {
2297        return qemu_rdma_write_flush(f, rdma);
2298    }
2299
2300    return 0;
2301}
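
/*
 * Illustrative sketch, not part of the original file: two sequential writes
 * that land in the same chunk are merged into one pending RDMA operation, and
 * the explicit flush pushes whatever is still buffered.  The helper name and
 * the 4096-byte example page size are invented; 'f', 'rdma' and
 * 'block_offset' are assumed to describe an established migration stream.
 */
static G_GNUC_UNUSED int example_write_two_pages(QEMUFile *f,
                                                 RDMAContext *rdma,
                                                 uint64_t block_offset)
{
    const uint64_t page = 4096;
    int ret;

    ret = qemu_rdma_write(f, rdma, block_offset, 0, page);
    if (ret < 0) {
        return ret;
    }

    /* Sequential offset within the same chunk: merged, nothing posted yet. */
    ret = qemu_rdma_write(f, rdma, block_offset, page, page);
    if (ret < 0) {
        return ret;
    }

    /* Force the merged buffer out as a single qemu_rdma_write_one() call. */
    return qemu_rdma_write_flush(f, rdma);
}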
2302
2303static void qemu_rdma_cleanup(RDMAContext *rdma)
2304{
2305    int idx;
2306
2307    if (rdma->cm_id && rdma->connected) {
2308        if ((rdma->error_state ||
2309             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2310            !rdma->received_error) {
2311            RDMAControlHeader head = { .len = 0,
2312                                       .type = RDMA_CONTROL_ERROR,
2313                                       .repeat = 1,
2314                                     };
2315            error_report("Early error. Sending error.");
2316            qemu_rdma_post_send_control(rdma, NULL, &head);
2317        }
2318
2319        rdma_disconnect(rdma->cm_id);
2320        trace_qemu_rdma_cleanup_disconnect();
2321        rdma->connected = false;
2322    }
2323
2324    g_free(rdma->dest_blocks);
2325    rdma->dest_blocks = NULL;
2326
2327    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2328        if (rdma->wr_data[idx].control_mr) {
2329            rdma->total_registrations--;
2330            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2331        }
2332        rdma->wr_data[idx].control_mr = NULL;
2333    }
2334
2335    if (rdma->local_ram_blocks.block) {
2336        while (rdma->local_ram_blocks.nb_blocks) {
2337            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2338        }
2339    }
2340
2341    if (rdma->qp) {
2342        rdma_destroy_qp(rdma->cm_id);
2343        rdma->qp = NULL;
2344    }
2345    if (rdma->cq) {
2346        ibv_destroy_cq(rdma->cq);
2347        rdma->cq = NULL;
2348    }
2349    if (rdma->comp_channel) {
2350        ibv_destroy_comp_channel(rdma->comp_channel);
2351        rdma->comp_channel = NULL;
2352    }
2353    if (rdma->pd) {
2354        ibv_dealloc_pd(rdma->pd);
2355        rdma->pd = NULL;
2356    }
2357    if (rdma->cm_id) {
2358        rdma_destroy_id(rdma->cm_id);
2359        rdma->cm_id = NULL;
2360    }
2361
2362    /* On the destination side, listen_id and channel are shared */
2363    if (rdma->listen_id) {
2364        if (!rdma->is_return_path) {
2365            rdma_destroy_id(rdma->listen_id);
2366        }
2367        rdma->listen_id = NULL;
2368
2369        if (rdma->channel) {
2370            if (!rdma->is_return_path) {
2371                rdma_destroy_event_channel(rdma->channel);
2372            }
2373            rdma->channel = NULL;
2374        }
2375    }
2376
2377    if (rdma->channel) {
2378        rdma_destroy_event_channel(rdma->channel);
2379        rdma->channel = NULL;
2380    }
2381    g_free(rdma->host);
2382    rdma->host = NULL;
2383}
2384
2385
2386static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2387{
2388    int ret, idx;
2389    Error *local_err = NULL, **temp = &local_err;
2390
2391    /*
2392     * Will be validated against destination's actual capabilities
2393     * after the connect() completes.
2394     */
2395    rdma->pin_all = pin_all;
2396
2397    ret = qemu_rdma_resolve_host(rdma, temp);
2398    if (ret) {
2399        goto err_rdma_source_init;
2400    }
2401
2402    ret = qemu_rdma_alloc_pd_cq(rdma);
2403    if (ret) {
2404        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2405                    " limits may be too low. Please check $ ulimit -a # and "
2406                    "search for 'ulimit -l' in the output");
2407        goto err_rdma_source_init;
2408    }
2409
2410    ret = qemu_rdma_alloc_qp(rdma);
2411    if (ret) {
2412        ERROR(temp, "rdma migration: error allocating qp!");
2413        goto err_rdma_source_init;
2414    }
2415
2416    ret = qemu_rdma_init_ram_blocks(rdma);
2417    if (ret) {
2418        ERROR(temp, "rdma migration: error initializing ram blocks!");
2419        goto err_rdma_source_init;
2420    }
2421
2422    /* Build the hash that maps from offset to RAMBlock */
2423    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2424    for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2425        g_hash_table_insert(rdma->blockmap,
2426                (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2427                &rdma->local_ram_blocks.block[idx]);
2428    }
2429
2430    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2431        ret = qemu_rdma_reg_control(rdma, idx);
2432        if (ret) {
2433            ERROR(temp, "rdma migration: error registering %d control!",
2434                                                            idx);
2435            goto err_rdma_source_init;
2436        }
2437    }
2438
2439    return 0;
2440
2441err_rdma_source_init:
2442    error_propagate(errp, local_err);
2443    qemu_rdma_cleanup(rdma);
2444    return -1;
2445}
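
/*
 * Illustrative sketch, not part of the original file: looking a local RAM
 * block back up by its offset in the blockmap hash built above.  The helper
 * name is invented; it mirrors the lookup done by
 * qemu_rdma_search_ram_block().
 */
static G_GNUC_UNUSED RDMALocalBlock *example_lookup_block(RDMAContext *rdma,
                                                          uint64_t offset)
{
    return g_hash_table_lookup(rdma->blockmap, (void *)(uintptr_t)offset);
}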
2446
2447static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2448{
2449    RDMACapabilities cap = {
2450                                .version = RDMA_CONTROL_VERSION_CURRENT,
2451                                .flags = 0,
2452                           };
2453    struct rdma_conn_param conn_param = { .initiator_depth = 2,
2454                                          .retry_count = 5,
2455                                          .private_data = &cap,
2456                                          .private_data_len = sizeof(cap),
2457                                        };
2458    struct rdma_cm_event *cm_event;
2459    int ret;
2460
2461    /*
2462     * Only negotiate the capability with destination if the user
2463     * on the source first requested the capability.
2464     */
2465    if (rdma->pin_all) {
2466        trace_qemu_rdma_connect_pin_all_requested();
2467        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2468    }
2469
2470    caps_to_network(&cap);
2471
2472    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2473    if (ret) {
2474        ERROR(errp, "posting second control recv");
2475        goto err_rdma_source_connect;
2476    }
2477
2478    ret = rdma_connect(rdma->cm_id, &conn_param);
2479    if (ret) {
2480        perror("rdma_connect");
2481        ERROR(errp, "connecting to destination!");
2482        goto err_rdma_source_connect;
2483    }
2484
2485    ret = rdma_get_cm_event(rdma->channel, &cm_event);
2486    if (ret) {
2487        perror("rdma_get_cm_event after rdma_connect");
2488        ERROR(errp, "connecting to destination!");
2489        rdma_ack_cm_event(cm_event);
2490        goto err_rdma_source_connect;
2491    }
2492
2493    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2494        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2495        ERROR(errp, "connecting to destination!");
2496        rdma_ack_cm_event(cm_event);
2497        goto err_rdma_source_connect;
2498    }
2499    rdma->connected = true;
2500
2501    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2502    network_to_caps(&cap);
2503
2504    /*
2505     * Verify that the *requested* capabilities are supported by the destination
2506     * and disable them otherwise.
2507     */
2508    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2509        ERROR(errp, "Server cannot support pinning all memory. "
2510                        "Will register memory dynamically.");
2511        rdma->pin_all = false;
2512    }
2513
2514    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2515
2516    rdma_ack_cm_event(cm_event);
2517
2518    rdma->control_ready_expected = 1;
2519    rdma->nb_sent = 0;
2520    return 0;
2521
2522err_rdma_source_connect:
2523    qemu_rdma_cleanup(rdma);
2524    return -1;
2525}
2526
2527static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2528{
2529    int ret, idx;
2530    struct rdma_cm_id *listen_id;
2531    char ip[40] = "unknown";
2532    struct rdma_addrinfo *res, *e;
2533    char port_str[16];
2534
2535    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2536        rdma->wr_data[idx].control_len = 0;
2537        rdma->wr_data[idx].control_curr = NULL;
2538    }
2539
2540    if (!rdma->host || !rdma->host[0]) {
2541        ERROR(errp, "RDMA host is not set!");
2542        rdma->error_state = -EINVAL;
2543        return -1;
2544    }
2545    /* create CM channel */
2546    rdma->channel = rdma_create_event_channel();
2547    if (!rdma->channel) {
2548        ERROR(errp, "could not create rdma event channel");
2549        rdma->error_state = -EINVAL;
2550        return -1;
2551    }
2552
2553    /* create CM id */
2554    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2555    if (ret) {
2556        ERROR(errp, "could not create cm_id!");
2557        goto err_dest_init_create_listen_id;
2558    }
2559
2560    snprintf(port_str, 16, "%d", rdma->port);
2561    port_str[15] = '\0';
2562
2563    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2564    if (ret < 0) {
2565        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2566        goto err_dest_init_bind_addr;
2567    }
2568
2569    for (e = res; e != NULL; e = e->ai_next) {
2570        inet_ntop(e->ai_family,
2571            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2572        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2573        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2574        if (ret) {
2575            continue;
2576        }
2577        if (e->ai_family == AF_INET6) {
2578            ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2579            if (ret) {
2580                continue;
2581            }
2582        }
2583        break;
2584    }
2585
2586    if (!e) {
2587        ERROR(errp, "could not rdma_bind_addr!");
2588        goto err_dest_init_bind_addr;
2589    }
2590
2591    rdma->listen_id = listen_id;
2592    qemu_rdma_dump_gid("dest_init", listen_id);
2593    return 0;
2594
2595err_dest_init_bind_addr:
2596    rdma_destroy_id(listen_id);
2597err_dest_init_create_listen_id:
2598    rdma_destroy_event_channel(rdma->channel);
2599    rdma->channel = NULL;
2600    rdma->error_state = ret;
2601    return ret;
2602
2603}
2604
2605static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2606                                            RDMAContext *rdma)
2607{
2608    int idx;
2609
2610    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2611        rdma_return_path->wr_data[idx].control_len = 0;
2612        rdma_return_path->wr_data[idx].control_curr = NULL;
2613    }
2614
2615    /* the CM channel and CM id are shared */
2616    rdma_return_path->channel = rdma->channel;
2617    rdma_return_path->listen_id = rdma->listen_id;
2618
2619    rdma->return_path = rdma_return_path;
2620    rdma_return_path->return_path = rdma;
2621    rdma_return_path->is_return_path = true;
2622}
2623
2624static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2625{
2626    RDMAContext *rdma = NULL;
2627    InetSocketAddress *addr;
2628
2629    if (host_port) {
2630        rdma = g_new0(RDMAContext, 1);
2631        rdma->current_index = -1;
2632        rdma->current_chunk = -1;
2633
2634        addr = g_new(InetSocketAddress, 1);
2635        if (!inet_parse(addr, host_port, NULL)) {
2636            rdma->port = atoi(addr->port);
2637            rdma->host = g_strdup(addr->host);
2638        } else {
2639            ERROR(errp, "bad RDMA migration address '%s'", host_port);
2640            g_free(rdma);
2641            rdma = NULL;
2642        }
2643
2644        qapi_free_InetSocketAddress(addr);
2645    }
2646
2647    return rdma;
2648}
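
/*
 * Illustrative sketch, not part of the original file: typical use of
 * qemu_rdma_data_init() with a "host:port" string.  The address and port are
 * made-up example values; errors are reported through 'errp' as above.
 */
static G_GNUC_UNUSED RDMAContext *example_data_init(Error **errp)
{
    return qemu_rdma_data_init("192.168.0.1:4444", errp);
}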
2649
2650/*
2651 * QEMUFile interface to the control channel.
2652 * SEND messages for control only.
2653 * VM's ram is handled with regular RDMA messages.
2654 */
2655static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2656                                       const struct iovec *iov,
2657                                       size_t niov,
2658                                       int *fds,
2659                                       size_t nfds,
2660                                       Error **errp)
2661{
2662    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2663    QEMUFile *f = rioc->file;
2664    RDMAContext *rdma;
2665    int ret;
2666    ssize_t done = 0;
2667    size_t i;
2668    size_t len = 0;
2669
2670    rcu_read_lock();
2671    rdma = atomic_rcu_read(&rioc->rdmaout);
2672
2673    if (!rdma) {
2674        rcu_read_unlock();
2675        return -EIO;
2676    }
2677
2678    CHECK_ERROR_STATE();
2679
2680    /*
2681     * Push out any writes that
2682     * we've queued up for the VM's ram.
2683     */
2684    ret = qemu_rdma_write_flush(f, rdma);
2685    if (ret < 0) {
2686        rdma->error_state = ret;
2687        rcu_read_unlock();
2688        return ret;
2689    }
2690
2691    for (i = 0; i < niov; i++) {
2692        size_t remaining = iov[i].iov_len;
2693        uint8_t * data = (void *)iov[i].iov_base;
2694        while (remaining) {
2695            RDMAControlHeader head;
2696
2697            len = MIN(remaining, RDMA_SEND_INCREMENT);
2698            remaining -= len;
2699
2700            head.len = len;
2701            head.type = RDMA_CONTROL_QEMU_FILE;
2702
2703            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2704
2705            if (ret < 0) {
2706                rdma->error_state = ret;
2707                rcu_read_unlock();
2708                return ret;
2709            }
2710
2711            data += len;
2712            done += len;
2713        }
2714    }
2715
2716    rcu_read_unlock();
2717    return done;
2718}
2719
2720static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2721                             size_t size, int idx)
2722{
2723    size_t len = 0;
2724
2725    if (rdma->wr_data[idx].control_len) {
2726        trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2727
2728        len = MIN(size, rdma->wr_data[idx].control_len);
2729        memcpy(buf, rdma->wr_data[idx].control_curr, len);
2730        rdma->wr_data[idx].control_curr += len;
2731        rdma->wr_data[idx].control_len -= len;
2732    }
2733
2734    return len;
2735}
2736
2737/*
2738 * QEMUFile interface to the control channel.
2739 * RDMA links don't use bytestreams, so we have to
2740 * return bytes to QEMUFile opportunistically.
2741 */
2742static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2743                                      const struct iovec *iov,
2744                                      size_t niov,
2745                                      int **fds,
2746                                      size_t *nfds,
2747                                      Error **errp)
2748{
2749    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2750    RDMAContext *rdma;
2751    RDMAControlHeader head;
2752    int ret = 0;
2753    ssize_t i;
2754    size_t done = 0;
2755
2756    rcu_read_lock();
2757    rdma = atomic_rcu_read(&rioc->rdmain);
2758
2759    if (!rdma) {
2760        rcu_read_unlock();
2761        return -EIO;
2762    }
2763
2764    CHECK_ERROR_STATE();
2765
2766    for (i = 0; i < niov; i++) {
2767        size_t want = iov[i].iov_len;
2768        uint8_t *data = (void *)iov[i].iov_base;
2769
2770        /*
2771         * First, we hold on to the last SEND message we
2772         * were given and dish out the bytes until we run
2773         * out of bytes.
2774         */
2775        ret = qemu_rdma_fill(rdma, data, want, 0);
2776        done += ret;
2777        want -= ret;
2778        /* Got what we needed, so go to next iovec */
2779        if (want == 0) {
2780            continue;
2781        }
2782
2783        /* If we got any data so far, then don't wait
2784         * for more, just return what we have */
2785        if (done > 0) {
2786            break;
2787        }
2788
2789
2790        /* We've got nothing at all, so let's wait for
2791         * more to arrive
2792         */
2793        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2794
2795        if (ret < 0) {
2796            rdma->error_state = ret;
2797            rcu_read_unlock();
2798            return ret;
2799        }
2800
2801        /*
2802         * SEND was received with new bytes, now try again.
2803         */
2804        ret = qemu_rdma_fill(rdma, data, want, 0);
2805        done += ret;
2806        want -= ret;
2807
2808        /* Still didn't get enough, so let's just return */
2809        if (want) {
2810            if (done == 0) {
2811                rcu_read_unlock();
2812                return QIO_CHANNEL_ERR_BLOCK;
2813            } else {
2814                break;
2815            }
2816        }
2817    }
2818    rcu_read_unlock();
2819    return done;
2820}
2821
2822/*
2823 * Block until all the outstanding chunks have been delivered by the hardware.
2824 */
2825static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2826{
2827    int ret;
2828
2829    if (qemu_rdma_write_flush(f, rdma) < 0) {
2830        return -EIO;
2831    }
2832
2833    while (rdma->nb_sent) {
2834        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2835        if (ret < 0) {
2836            error_report("rdma migration: complete polling error!");
2837            return -EIO;
2838        }
2839    }
2840
2841    qemu_rdma_unregister_waiting(rdma);
2842
2843    return 0;
2844}
2845
2846
2847static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2848                                         bool blocking,
2849                                         Error **errp)
2850{
2851    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2852    /* XXX we should make readv/writev actually honour this :-) */
2853    rioc->blocking = blocking;
2854    return 0;
2855}
2856
2857
2858typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2859struct QIOChannelRDMASource {
2860    GSource parent;
2861    QIOChannelRDMA *rioc;
2862    GIOCondition condition;
2863};
2864
2865static gboolean
2866qio_channel_rdma_source_prepare(GSource *source,
2867                                gint *timeout)
2868{
2869    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2870    RDMAContext *rdma;
2871    GIOCondition cond = 0;
2872    *timeout = -1;
2873
2874    rcu_read_lock();
2875    if (rsource->condition == G_IO_IN) {
2876        rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2877    } else {
2878        rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2879    }
2880
2881    if (!rdma) {
2882        error_report("RDMAContext is NULL when preparing GSource");
2883        rcu_read_unlock();
2884        return FALSE;
2885    }
2886
2887    if (rdma->wr_data[0].control_len) {
2888        cond |= G_IO_IN;
2889    }
2890    cond |= G_IO_OUT;
2891
2892    rcu_read_unlock();
2893    return cond & rsource->condition;
2894}
2895
2896static gboolean
2897qio_channel_rdma_source_check(GSource *source)
2898{
2899    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2900    RDMAContext *rdma;
2901    GIOCondition cond = 0;
2902
2903    rcu_read_lock();
2904    if (rsource->condition == G_IO_IN) {
2905        rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2906    } else {
2907        rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2908    }
2909
2910    if (!rdma) {
2911        error_report("RDMAContext is NULL when checking GSource");
2912        rcu_read_unlock();
2913        return FALSE;
2914    }
2915
2916    if (rdma->wr_data[0].control_len) {
2917        cond |= G_IO_IN;
2918    }
2919    cond |= G_IO_OUT;
2920
2921    rcu_read_unlock();
2922    return cond & rsource->condition;
2923}
2924
2925static gboolean
2926qio_channel_rdma_source_dispatch(GSource *source,
2927                                 GSourceFunc callback,
2928                                 gpointer user_data)
2929{
2930    QIOChannelFunc func = (QIOChannelFunc)callback;
2931    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2932    RDMAContext *rdma;
2933    GIOCondition cond = 0;
2934
2935    rcu_read_lock();
2936    if (rsource->condition == G_IO_IN) {
2937        rdma = atomic_rcu_read(&rsource->rioc->rdmain);
2938    } else {
2939        rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
2940    }
2941
2942    if (!rdma) {
2943        error_report("RDMAContext is NULL when dispatching GSource");
2944        rcu_read_unlock();
2945        return FALSE;
2946    }
2947
2948    if (rdma->wr_data[0].control_len) {
2949        cond |= G_IO_IN;
2950    }
2951    cond |= G_IO_OUT;
2952
2953    rcu_read_unlock();
2954    return (*func)(QIO_CHANNEL(rsource->rioc),
2955                   (cond & rsource->condition),
2956                   user_data);
2957}
2958
2959static void
2960qio_channel_rdma_source_finalize(GSource *source)
2961{
2962    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2963
2964    object_unref(OBJECT(ssource->rioc));
2965}
2966
2967GSourceFuncs qio_channel_rdma_source_funcs = {
2968    qio_channel_rdma_source_prepare,
2969    qio_channel_rdma_source_check,
2970    qio_channel_rdma_source_dispatch,
2971    qio_channel_rdma_source_finalize
2972};
2973
2974static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2975                                              GIOCondition condition)
2976{
2977    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2978    QIOChannelRDMASource *ssource;
2979    GSource *source;
2980
2981    source = g_source_new(&qio_channel_rdma_source_funcs,
2982                          sizeof(QIOChannelRDMASource));
2983    ssource = (QIOChannelRDMASource *)source;
2984
2985    ssource->rioc = rioc;
2986    object_ref(OBJECT(rioc));
2987
2988    ssource->condition = condition;
2989
2990    return source;
2991}
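
/*
 * Illustrative sketch, not part of the original file: attaching the watch
 * created above to the default GLib main context.  The callback and helper
 * names are invented for the example.
 */
static gboolean example_rdma_in_ready(QIOChannel *ioc, GIOCondition condition,
                                      gpointer opaque)
{
    /* Buffered control data is available; a real handler would read it. */
    return G_SOURCE_CONTINUE;
}

static G_GNUC_UNUSED guint example_add_rdma_watch(QIOChannel *ioc)
{
    GSource *source = qio_channel_rdma_create_watch(ioc, G_IO_IN);
    guint id;

    g_source_set_callback(source, (GSourceFunc)example_rdma_in_ready,
                          NULL, NULL);
    id = g_source_attach(source, NULL);
    g_source_unref(source);
    return id;
}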
2992
2993static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
2994                                                  AioContext *ctx,
2995                                                  IOHandler *io_read,
2996                                                  IOHandler *io_write,
2997                                                  void *opaque)
2998{
2999    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3000    if (io_read) {
3001        aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
3002                           false, io_read, io_write, NULL, opaque);
3003    } else {
3004        aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
3005                           false, io_read, io_write, NULL, opaque);
3006    }
3007}
3008
3009static int qio_channel_rdma_close(QIOChannel *ioc,
3010                                  Error **errp)
3011{
3012    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3013    RDMAContext *rdmain, *rdmaout;
3014    trace_qemu_rdma_close();
3015
3016    rdmain = rioc->rdmain;
3017    if (rdmain) {
3018        atomic_rcu_set(&rioc->rdmain, NULL);
3019    }
3020
3021    rdmaout = rioc->rdmaout;
3022    if (rdmaout) {
3023        atomic_rcu_set(&rioc->rdmaout, NULL);
3024    }
3025
3026    synchronize_rcu();
3027
3028    if (rdmain) {
3029        qemu_rdma_cleanup(rdmain);
3030    }
3031
3032    if (rdmaout) {
3033        qemu_rdma_cleanup(rdmaout);
3034    }
3035
3036    g_free(rdmain);
3037    g_free(rdmaout);
3038
3039    return 0;
3040}
3041
3042static int
3043qio_channel_rdma_shutdown(QIOChannel *ioc,
3044                            QIOChannelShutdown how,
3045                            Error **errp)
3046{
3047    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3048    RDMAContext *rdmain, *rdmaout;
3049
3050    rcu_read_lock();
3051
3052    rdmain = atomic_rcu_read(&rioc->rdmain);
3053    rdmaout = atomic_rcu_read(&rioc->rdmaout);
3054
3055    switch (how) {
3056    case QIO_CHANNEL_SHUTDOWN_READ:
3057        if (rdmain) {
3058            rdmain->error_state = -1;
3059        }
3060        break;
3061    case QIO_CHANNEL_SHUTDOWN_WRITE:
3062        if (rdmaout) {
3063            rdmaout->error_state = -1;
3064        }
3065        break;
3066    case QIO_CHANNEL_SHUTDOWN_BOTH:
3067    default:
3068        if (rdmain) {
3069            rdmain->error_state = -1;
3070        }
3071        if (rdmaout) {
3072            rdmaout->error_state = -1;
3073        }
3074        break;
3075    }
3076
3077    rcu_read_unlock();
3078    return 0;
3079}
3080
3081/*
3082 * Parameters:
3083 *    @offset == 0 :
3084 *        This means that 'block_offset' is a full virtual address that does not
3085 *        belong to a RAMBlock of the virtual machine and instead
3086 *        represents a private malloc'd memory area that the caller wishes to
3087 *        transfer.
3088 *
3089 *    @offset != 0 :
3090 *        Offset is an offset to be added to block_offset and used
3091 *        to also lookup the corresponding RAMBlock.
3092 *
3093 *    @size > 0 :
3094 *        Initiate a transfer of this size.
3095 *
3096 *    @size == 0 :
3097 *        A 'hint' or 'advice' that means that we wish to speculatively
3098 *        and asynchronously unregister this memory. In this case, there is no
3099 *        guarantee that the unregister will actually happen, for example,
3100 *        if the memory is being actively transmitted. Additionally, the memory
3101 *        may be re-registered at any future time if a write within the same
3102 *        chunk was requested again, even if you attempted to unregister it
3103 *        here.
3104 *
3105 *    @size < 0 : TODO, not yet supported
3106 *        Unregister the memory NOW. This means that the caller does not
3107 *        expect there to be any future RDMA transfers and we just want to clean
3108 *        things up. This is used in case the upper layer owns the memory and
3109 *        cannot wait for qemu_fclose() to occur.
3110 *
3111 *    @bytes_sent : User-specified pointer to indicate how many bytes were
3112 *                  sent. Usually, this will not be more than a few bytes of
3113 *                  the protocol because most transfers are sent asynchronously.
3114 */
3115static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
3116                                  ram_addr_t block_offset, ram_addr_t offset,
3117                                  size_t size, uint64_t *bytes_sent)
3118{
3119    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3120    RDMAContext *rdma;
3121    int ret;
3122
3123    rcu_read_lock();
3124    rdma = atomic_rcu_read(&rioc->rdmaout);
3125
3126    if (!rdma) {
3127        rcu_read_unlock();
3128        return -EIO;
3129    }
3130
3131    CHECK_ERROR_STATE();
3132
3133    if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3134        rcu_read_unlock();
3135        return RAM_SAVE_CONTROL_NOT_SUPP;
3136    }
3137
3138    qemu_fflush(f);
3139
3140    if (size > 0) {
3141        /*
3142         * Add this page to the current 'chunk'. If the chunk
3143         * is full, or the page doesn't belong to the current chunk,
3144         * an actual RDMA write will occur and a new chunk will be formed.
3145         */
3146        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3147        if (ret < 0) {
3148            error_report("rdma migration: write error! %d", ret);
3149            goto err;
3150        }
3151
3152        /*
3153         * We always return 1 byte because the RDMA
3154         * protocol is completely asynchronous. We do not yet know
3155         * whether an identified chunk is zero or not because we're
3156         * waiting for other pages to potentially be merged with
3157         * the current chunk. So, we have to call qemu_update_position()
3158         * later on when the actual write occurs.
3159         */
3160        if (bytes_sent) {
3161            *bytes_sent = 1;
3162        }
3163    } else {
3164        uint64_t index, chunk;
3165
3166        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
3167        if (size < 0) {
3168            ret = qemu_rdma_drain_cq(f, rdma);
3169            if (ret < 0) {
3170                fprintf(stderr, "rdma: failed to synchronously drain"
3171                                " completion queue before unregistration.\n");
3172                goto err;
3173            }
3174        }
3175        */
3176
3177        ret = qemu_rdma_search_ram_block(rdma, block_offset,
3178                                         offset, size, &index, &chunk);
3179
3180        if (ret) {
3181            error_report("ram block search failed");
3182            goto err;
3183        }
3184
3185        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
3186
3187        /*
3188         * TODO: Synchronous, guaranteed unregistration (should not occur during
3189         * fast-path). Otherwise, unregisters will process on the next call to
3190         * qemu_rdma_drain_cq()
3191        if (size < 0) {
3192            qemu_rdma_unregister_waiting(rdma);
3193        }
3194        */
3195    }
3196
3197    /*
3198     * Drain the Completion Queue if possible, but do not block,
3199     * just poll.
3200     *
3201     * If nothing to poll, the end of the iteration will do this
3202     * again to make sure we don't overflow the request queue.
3203     */
3204    while (1) {
3205        uint64_t wr_id, wr_id_in;
3206        int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
3207        if (ret < 0) {
3208            error_report("rdma migration: polling error! %d", ret);
3209            goto err;
3210        }
3211
3212        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3213
3214        if (wr_id == RDMA_WRID_NONE) {
3215            break;
3216        }
3217    }
3218
3219    rcu_read_unlock();
3220    return RAM_SAVE_CONTROL_DELAYED;
3221err:
3222    rdma->error_state = ret;
3223    rcu_read_unlock();
3224    return ret;
3225}
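
/*
 * Illustrative sketch, not part of the original file: how a RAM-save caller
 * is expected to interpret the hook's return values.  The helper name and the
 * 4096-byte example page size are invented; the real caller goes through
 * ram_control_save_page().
 */
static G_GNUC_UNUSED int example_save_page_caller(QEMUFile *f, void *opaque,
                                                  ram_addr_t block_offset,
                                                  ram_addr_t offset)
{
    uint64_t bytes_sent = 0;
    size_t ret = qemu_rdma_save_page(f, opaque, block_offset, offset,
                                     4096 /* example page size */,
                                     &bytes_sent);

    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
        return 0;   /* e.g. postcopy is active: use the normal stream path */
    }
    if (ret == RAM_SAVE_CONTROL_DELAYED) {
        return 1;   /* accepted: the RDMA write completes asynchronously */
    }
    return -1;      /* error: the hook has set rdma->error_state */
}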
3226
3227static void rdma_accept_incoming_migration(void *opaque);
3228
3229static void rdma_cm_poll_handler(void *opaque)
3230{
3231    RDMAContext *rdma = opaque;
3232    int ret;
3233    struct rdma_cm_event *cm_event;
3234    MigrationIncomingState *mis = migration_incoming_get_current();
3235
3236    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3237    if (ret) {
3238        error_report("get_cm_event failed %d", errno);
3239        return;
3240    }
3241    rdma_ack_cm_event(cm_event);
3242
3243    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3244        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3245        error_report("receive cm event, cm event is %d", cm_event->event);
3246        rdma->error_state = -EPIPE;
3247        if (rdma->return_path) {
3248            rdma->return_path->error_state = -EPIPE;
3249        }
3250
3251        if (mis->migration_incoming_co) {
3252            qemu_coroutine_enter(mis->migration_incoming_co);
3253        }
3254        return;
3255    }
3256}
3257
3258static int qemu_rdma_accept(RDMAContext *rdma)
3259{
3260    RDMACapabilities cap;
3261    struct rdma_conn_param conn_param = {
3262                                            .responder_resources = 2,
3263                                            .private_data = &cap,
3264                                            .private_data_len = sizeof(cap),
3265                                         };
3266    struct rdma_cm_event *cm_event;
3267    struct ibv_context *verbs;
3268    int ret = -EINVAL;
3269    int idx;
3270
3271    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3272    if (ret) {
3273        goto err_rdma_dest_wait;
3274    }
3275
3276    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3277        rdma_ack_cm_event(cm_event);
3278        goto err_rdma_dest_wait;
3279    }
3280
3281    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3282
3283    network_to_caps(&cap);
3284
3285    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3286        error_report("Unknown source RDMA version: %d, bailing...",
3287                     cap.version);
3288        rdma_ack_cm_event(cm_event);
3289        goto err_rdma_dest_wait;
3290    }
3291
3292    /*
3293     * Respond with only the capabilities this version of QEMU knows about.
3294     */
3295    cap.flags &= known_capabilities;
3296
3297    /*
3298     * Enable the ones that we do know about.
3299     * Add other checks here as new ones are introduced.
3300     */
3301    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3302        rdma->pin_all = true;
3303    }
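    /*
     * A future capability bit would be enabled the same way; a hypothetical
     * sketch (RDMA_CAPABILITY_FOO and rdma->foo do not exist today):
     *
     *     if (cap.flags & RDMA_CAPABILITY_FOO) {
     *         rdma->foo = true;
     *     }
     */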
3304
3305    rdma->cm_id = cm_event->id;
3306    verbs = cm_event->id->verbs;
3307
3308    rdma_ack_cm_event(cm_event);
3309
3310    trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3311
3312    caps_to_network(&cap);
3313
3314    trace_qemu_rdma_accept_pin_verbsc(verbs);
3315
3316    if (!rdma->verbs) {
3317        rdma->verbs = verbs;
3318    } else if (rdma->verbs != verbs) {
3319        error_report("ibv context not matching %p, %p!", rdma->verbs,
3320                     verbs);
3321        goto err_rdma_dest_wait;
3322    }
3323
3324    qemu_rdma_dump_id("dest_init", verbs);
3325
3326    ret = qemu_rdma_alloc_pd_cq(rdma);
3327    if (ret) {
3328        error_report("rdma migration: error allocating pd and cq!");
3329        goto err_rdma_dest_wait;
3330    }
3331
3332    ret = qemu_rdma_alloc_qp(rdma);
3333    if (ret) {
3334        error_report("rdma migration: error allocating qp!");
3335        goto err_rdma_dest_wait;
3336    }
3337
3338    ret = qemu_rdma_init_ram_blocks(rdma);
3339    if (ret) {
3340        error_report("rdma migration: error initializing ram blocks!");
3341        goto err_rdma_dest_wait;
3342    }
3343
3344    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3345        ret = qemu_rdma_reg_control(rdma, idx);
3346        if (ret) {
3347            error_report("rdma: error registering %d control", idx);
3348            goto err_rdma_dest_wait;
3349        }
3350    }
3351
3352    /* Accept the second connection request for return path */
3353    if (migrate_postcopy() && !rdma->is_return_path) {
3354        qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3355                            NULL,
3356                            (void *)(intptr_t)rdma->return_path);
3357    } else {
3358        qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3359                            NULL, rdma);
3360    }
3361
3362    ret = rdma_accept(rdma->cm_id, &conn_param);
3363    if (ret) {
3364        error_report("rdma_accept returns %d", ret);
3365        goto err_rdma_dest_wait;
3366    }
3367
3368    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3369    if (ret) {
3370        error_report("rdma_accept get_cm_event failed %d", ret);
3371        goto err_rdma_dest_wait;
3372    }
3373
3374    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3375        error_report("rdma_accept: expected RDMA_CM_EVENT_ESTABLISHED");
3376        rdma_ack_cm_event(cm_event);
3377        goto err_rdma_dest_wait;
3378    }
3379
3380    rdma_ack_cm_event(cm_event);
3381    rdma->connected = true;
3382
3383    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3384    if (ret) {
3385        error_report("rdma migration: error posting second control recv");
3386        goto err_rdma_dest_wait;
3387    }
3388
3389    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3390
3391    return 0;
3392
3393err_rdma_dest_wait:
3394    rdma->error_state = ret;
3395    qemu_rdma_cleanup(rdma);
3396    return ret;
3397}
3398
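/*
 * qsort() comparator: order the destination's local RAM block list by the
 * source-assigned index (src_index), returning -1, 0 or 1.
 */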
3399static int dest_ram_sort_func(const void *a, const void *b)
3400{
3401    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3402    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3403
3404    return (a_index < b_index) ? -1 : (a_index != b_index);
3405}
3406
3407/*
3408 * During each iteration of the migration, we listen for instructions
3409 * by the source VM to perform dynamic page registrations before they
3410 * can perform RDMA operations.
3411 *
3412 * We respond with the 'rkey'.
3413 *
3414 * Keep doing this until the source tells us to stop.
3415 */
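/*
 * The loop below is driven entirely by control messages from the source:
 * RDMA_CONTROL_COMPRESS fills a page range with a constant byte locally,
 * RDMA_CONTROL_RAM_BLOCKS_REQUEST answers with our sorted RAMBlock
 * descriptions, RDMA_CONTROL_REGISTER_REQUEST pins chunks and replies with
 * their rkeys, RDMA_CONTROL_UNREGISTER_REQUEST drops a chunk's memory
 * region, and RDMA_CONTROL_REGISTER_FINISHED ends the loop for this
 * iteration.
 */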
3416static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3417{
3418    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3419                               .type = RDMA_CONTROL_REGISTER_RESULT,
3420                               .repeat = 0,
3421                             };
3422    RDMAControlHeader unreg_resp = { .len = 0,
3423                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3424                               .repeat = 0,
3425                             };
3426    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3427                                 .repeat = 1 };
3428    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3429    RDMAContext *rdma;
3430    RDMALocalBlocks *local;
3431    RDMAControlHeader head;
3432    RDMARegister *reg, *registers;
3433    RDMACompress *comp;
3434    RDMARegisterResult *reg_result;
3435    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3436    RDMALocalBlock *block;
3437    void *host_addr;
3438    int ret = 0;
3439    int idx = 0;
3440    int count = 0;
3441    int i = 0;
3442
3443    rcu_read_lock();
3444    rdma = atomic_rcu_read(&rioc->rdmain);
3445
3446    if (!rdma) {
3447        rcu_read_unlock();
3448        return -EIO;
3449    }
3450
3451    CHECK_ERROR_STATE();
3452
3453    local = &rdma->local_ram_blocks;
3454    do {
3455        trace_qemu_rdma_registration_handle_wait();
3456
3457        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3458
3459        if (ret < 0) {
3460            break;
3461        }
3462
3463        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3464            error_report("rdma: Too many requests in this message (%d). "
3465                         "Bailing.", head.repeat);
3466            ret = -EIO;
3467            break;
3468        }
3469
3470        switch (head.type) {
3471        case RDMA_CONTROL_COMPRESS:
3472            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3473            network_to_compress(comp);
3474
3475            trace_qemu_rdma_registration_handle_compress(comp->length,
3476                                                         comp->block_idx,
3477                                                         comp->offset);
3478            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3479                error_report("rdma: 'compress' bad block index %u (vs %d)",
3480                             (unsigned int)comp->block_idx,
3481                             rdma->local_ram_blocks.nb_blocks);
3482                ret = -EIO;
3483                goto out;
3484            }
3485            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3486
3487            host_addr = block->local_host_addr +
3488                            (comp->offset - block->offset);
3489
3490            ram_handle_compressed(host_addr, comp->value, comp->length);
3491            break;
3492
3493        case RDMA_CONTROL_REGISTER_FINISHED:
3494            trace_qemu_rdma_registration_handle_finished();
3495            goto out;
3496
3497        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3498            trace_qemu_rdma_registration_handle_ram_blocks();
3499
3500            /* Sort our local RAM Block list into the same order as the
3501             * source's.  We can do this because we filled in src_index for
3502             * each entry as we received the RAMBlock list earlier.
3503             */
3504            qsort(rdma->local_ram_blocks.block,
3505                  rdma->local_ram_blocks.nb_blocks,
3506                  sizeof(RDMALocalBlock), dest_ram_sort_func);
3507            for (i = 0; i < local->nb_blocks; i++) {
3508                local->block[i].index = i;
3509            }
3510
3511            if (rdma->pin_all) {
3512                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3513                if (ret) {
3514                    error_report("rdma migration: error dest "
3515                                    "registering ram blocks");
3516                    goto out;
3517                }
3518            }
3519
3520            /*
3521             * Dest uses this to prepare to transmit the RAMBlock descriptions
3522             * to the source VM after connection setup.
3523             * Both sides use the "remote" structure to communicate and update
3524             * their "local" descriptions with what was sent.
3525             */
3526            for (i = 0; i < local->nb_blocks; i++) {
3527                rdma->dest_blocks[i].remote_host_addr =
3528                    (uintptr_t)(local->block[i].local_host_addr);
3529
3530                if (rdma->pin_all) {
3531                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3532                }
3533
3534                rdma->dest_blocks[i].offset = local->block[i].offset;
3535                rdma->dest_blocks[i].length = local->block[i].length;
3536
3537                dest_block_to_network(&rdma->dest_blocks[i]);
3538                trace_qemu_rdma_registration_handle_ram_blocks_loop(
3539                    local->block[i].block_name,
3540                    local->block[i].offset,
3541                    local->block[i].length,
3542                    local->block[i].local_host_addr,
3543                    local->block[i].src_index);
3544            }
3545
3546            blocks.len = rdma->local_ram_blocks.nb_blocks
3547                                                * sizeof(RDMADestBlock);
3548
3549
3550            ret = qemu_rdma_post_send_control(rdma,
3551                                        (uint8_t *) rdma->dest_blocks, &blocks);
3552
3553            if (ret < 0) {
3554                error_report("rdma migration: error sending remote info");
3555                goto out;
3556            }
3557
3558            break;
3559        case RDMA_CONTROL_REGISTER_REQUEST:
3560            trace_qemu_rdma_registration_handle_register(head.repeat);
3561
3562            reg_resp.repeat = head.repeat;
3563            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3564
3565            for (count = 0; count < head.repeat; count++) {
3566                uint64_t chunk;
3567                uint8_t *chunk_start, *chunk_end;
3568
3569                reg = &registers[count];
3570                network_to_register(reg);
3571
3572                reg_result = &results[count];
3573
3574                trace_qemu_rdma_registration_handle_register_loop(count,
3575                         reg->current_index, reg->key.current_addr, reg->chunks);
3576
3577                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3578                    error_report("rdma: 'register' bad block index %u (vs %d)",
3579                                 (unsigned int)reg->current_index,
3580                                 rdma->local_ram_blocks.nb_blocks);
3581                    ret = -ENOENT;
3582                    goto out;
3583                }
3584                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3585                if (block->is_ram_block) {
3586                    if (block->offset > reg->key.current_addr) {
3587                        error_report("rdma: bad register address for block %s"
3588                            " offset: %" PRIx64 " current_addr: %" PRIx64,
3589                            block->block_name, block->offset,
3590                            reg->key.current_addr);
3591                        ret = -ERANGE;
3592                        goto out;
3593                    }
3594                    host_addr = (block->local_host_addr +
3595                                (reg->key.current_addr - block->offset));
3596                    chunk = ram_chunk_index(block->local_host_addr,
3597                                            (uint8_t *) host_addr);
3598                } else {
3599                    chunk = reg->key.chunk;
3600                    host_addr = block->local_host_addr +
3601                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3602                    /* Check for particularly bad chunk value */
3603                    if (host_addr < (void *)block->local_host_addr) {
3604                        error_report("rdma: bad chunk for block %s"
3605                            " chunk: %" PRIx64,
3606                            block->block_name, reg->key.chunk);
3607                        ret = -ERANGE;
3608                        goto out;
3609                    }
3610                }
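                /*
                 * Pin the chunk range covering this request (it may span
                 * reg->chunks contiguous chunks) and hand the resulting rkey
                 * back to the source via reg_result.
                 */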
3611                chunk_start = ram_chunk_start(block, chunk);
3612                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3613                if (qemu_rdma_register_and_get_keys(rdma, block,
3614                            (uintptr_t)host_addr, NULL, &reg_result->rkey,
3615                            chunk, chunk_start, chunk_end)) {
3616                    error_report("cannot get rkey");
3617                    ret = -EINVAL;
3618                    goto out;
3619                }
3620
3621                reg_result->host_addr = (uintptr_t)block->local_host_addr;
3622
3623                trace_qemu_rdma_registration_handle_register_rkey(
3624                                                           reg_result->rkey);
3625
3626                result_to_network(reg_result);
3627            }
3628
3629            ret = qemu_rdma_post_send_control(rdma,
3630                            (uint8_t *) results, &reg_resp);
3631
3632            if (ret < 0) {
3633                error_report("Failed to send control buffer");
3634                goto out;
3635            }
3636            break;
3637        case RDMA_CONTROL_UNREGISTER_REQUEST:
3638            trace_qemu_rdma_registration_handle_unregister(head.repeat);
3639            unreg_resp.repeat = head.repeat;
3640            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3641
3642            for (count = 0; count < head.repeat; count++) {
3643                reg = &registers[count];
3644                network_to_register(reg);
3645
3646                trace_qemu_rdma_registration_handle_unregister_loop(count,
3647                           reg->current_index, reg->key.chunk);
3648
3649                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3650
3651                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3652                block->pmr[reg->key.chunk] = NULL;
3653
3654                if (ret != 0) {
3655                    perror("rdma unregistration chunk failed");
3656                    ret = -ret;
3657                    goto out;
3658                }
3659
3660                rdma->total_registrations--;
3661
3662                trace_qemu_rdma_registration_handle_unregister_success(
3663                                                       reg->key.chunk);
3664            }
3665
3666            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3667
3668            if (ret < 0) {
3669                error_report("Failed to send control buffer");
3670                goto out;
3671            }
3672            break;
3673        case RDMA_CONTROL_REGISTER_RESULT:
3674            error_report("Invalid RESULT message at dest.");
3675            ret = -EIO;
3676            goto out;
3677        default:
3678            error_report("Unknown control message %s", control_desc(head.type));
3679            ret = -EIO;
3680            goto out;
3681        }
3682    } while (1);
3683out:
3684    if (ret < 0) {
3685        rdma->error_state = ret;
3686    }
3687    rcu_read_unlock();
3688    return ret;
3689}
3690
3691/* Destination:
3692 * Called via a ram_control_load_hook during the initial RAM load section which
3693 * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3694 * on the source.
3695 * We've already built our local RAMBlock list, but not yet sent the list to
3696 * the source.
3697 */
3698static int
3699rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3700{
3701    RDMAContext *rdma;
3702    int curr;
3703    int found = -1;
3704
3705    rcu_read_lock();
3706    rdma = atomic_rcu_read(&rioc->rdmain);
3707
3708    if (!rdma) {
3709        rcu_read_unlock();
3710        return -EIO;
3711    }
3712
3713    /* Find the matching RAMBlock in our local list */
3714    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3715        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3716            found = curr;
3717            break;
3718        }
3719    }
3720
3721    if (found == -1) {
3722        error_report("RAMBlock '%s' not found on destination", name);
3723        rcu_read_unlock();
3724        return -ENOENT;
3725    }
3726
3727    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3728    trace_rdma_block_notification_handle(name, rdma->next_src_index);
3729    rdma->next_src_index++;
3730
3731    rcu_read_unlock();
3732    return 0;
3733}
3734
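/*
 * Dispatch for the destination's ram_control_load_hook(): RAMBlock
 * registration notifications go to rdma_block_notification_handle(), and
 * the per-iteration RAM_CONTROL_HOOK enters the registration handler loop
 * above.
 */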
3735static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3736{
3737    switch (flags) {
3738    case RAM_CONTROL_BLOCK_REG:
3739        return rdma_block_notification_handle(opaque, data);
3740
3741    case RAM_CONTROL_HOOK:
3742        return qemu_rdma_registration_handle(f, opaque);
3743
3744    default:
3745        /* Shouldn't be called with any other values */
3746        abort();
3747    }
3748}
3749
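/*
 * Source side, before each RAM iteration: emit RAM_SAVE_FLAG_HOOK so that
 * the destination drops into qemu_rdma_registration_handle() and is ready
 * to service registration requests while pages are being written.
 */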
3750static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3751                                        uint64_t flags, void *data)
3752{
3753    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3754    RDMAContext *rdma;
3755
3756    rcu_read_lock();
3757    rdma = atomic_rcu_read(&rioc->rdmaout);
3758    if (!rdma) {
3759        rcu_read_unlock();
3760        return -EIO;
3761    }
3762
3763    CHECK_ERROR_STATE();
3764
3765    if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3766        rcu_read_unlock();
3767        return 0;
3768    }
3769
3770    trace_qemu_rdma_registration_start(flags);
3771    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3772    qemu_fflush(f);
3773
3774    rcu_read_unlock();
3775    return 0;
3776}
3777
3778/*
3779 * Inform dest that dynamic registrations are done for now.
3780 * First, flush writes, if any.
3781 */
3782static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3783                                       uint64_t flags, void *data)
3784{
3785    Error *local_err = NULL, **errp = &local_err;
3786    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3787    RDMAContext *rdma;
3788    RDMAControlHeader head = { .len = 0, .repeat = 1 };
3789    int ret = 0;
3790
3791    rcu_read_lock();
3792    rdma = atomic_rcu_read(&rioc->rdmaout);
3793    if (!rdma) {
3794        rcu_read_unlock();
3795        return -EIO;
3796    }
3797
3798    CHECK_ERROR_STATE();
3799
3800    if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
3801        rcu_read_unlock();
3802        return 0;
3803    }
3804
3805    qemu_fflush(f);
3806    ret = qemu_rdma_drain_cq(f, rdma);
3807
3808    if (ret < 0) {
3809        goto err;
3810    }
3811
3812    if (flags == RAM_CONTROL_SETUP) {
3813        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3814        RDMALocalBlocks *local = &rdma->local_ram_blocks;
3815        int reg_result_idx, i, nb_dest_blocks;
3816
3817        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3818        trace_qemu_rdma_registration_stop_ram();
3819
3820        /*
3821         * Make sure that we parallelize the pinning on both sides.
3822         * For very large guests, doing this serially takes a really
3823         * long time, so we have to 'interleave' the pinning locally
3824         * with the control messages by performing the pinning on this
3825         * side before we receive the control response from the other
3826         * side that the pinning has completed.
3827         */
3828        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3829                    &reg_result_idx, rdma->pin_all ?
3830                    qemu_rdma_reg_whole_ram_blocks : NULL);
3831        if (ret < 0) {
3832            ERROR(errp, "receiving remote info!");
3833            rcu_read_unlock();
3834            return ret;
3835        }
3836
3837        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3838
3839        /*
3840         * The protocol uses two different sets of rkeys (mutually exclusive):
3841         * 1. One key to represent the virtual address of the entire ram block.
3842         *    (dynamic chunk registration disabled - pin everything with one rkey.)
3843         * 2. One to represent individual chunks within a ram block.
3844         *    (dynamic chunk registration enabled - pin individual chunks.)
3845         *
3846         * Once the capability is successfully negotiated, the destination transmits
3847         * the keys to use (or sends them later), including the virtual addresses,
3848         * and then propagates the remote ram block descriptions to its local copy.
3849         */
3850
3851        if (local->nb_blocks != nb_dest_blocks) {
3852            ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d). "
3853                        "Your QEMU command line parameters are probably "
3854                        "not identical on both the source and destination.",
3855                        local->nb_blocks, nb_dest_blocks);
3856            rdma->error_state = -EINVAL;
3857            rcu_read_unlock();
3858            return -EINVAL;
3859        }
3860
3861        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3862        memcpy(rdma->dest_blocks,
3863            rdma->wr_data[reg_result_idx].control_curr, resp.len);
3864        for (i = 0; i < nb_dest_blocks; i++) {
3865            network_to_dest_block(&rdma->dest_blocks[i]);
3866
3867            /* We require that the blocks are in the same order */
3868            if (rdma->dest_blocks[i].length != local->block[i].length) {
3869                ERROR(errp, "Block %s/%d has a different length %" PRIu64
3870                            " vs %" PRIu64, local->block[i].block_name, i,
3871                            local->block[i].length,
3872                            rdma->dest_blocks[i].length);
3873                rdma->error_state = -EINVAL;
3874                rcu_read_unlock();
3875                return -EINVAL;
3876            }
3877            local->block[i].remote_host_addr =
3878                    rdma->dest_blocks[i].remote_host_addr;
3879            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3880        }
3881    }
3882
3883    trace_qemu_rdma_registration_stop(flags);
3884
3885    head.type = RDMA_CONTROL_REGISTER_FINISHED;
3886    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3887
3888    if (ret < 0) {
3889        goto err;
3890    }
3891
3892    rcu_read_unlock();
3893    return 0;
3894err:
3895    rdma->error_state = ret;
3896    rcu_read_unlock();
3897    return ret;
3898}
3899
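/*
 * The load hooks are attached to the incoming ("r") QEMUFile and the
 * iterate/save hooks to the outgoing ("w") QEMUFile in qemu_fopen_rdma()
 * below.
 */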
3900static const QEMUFileHooks rdma_read_hooks = {
3901    .hook_ram_load = rdma_load_hook,
3902};
3903
3904static const QEMUFileHooks rdma_write_hooks = {
3905    .before_ram_iterate = qemu_rdma_registration_start,
3906    .after_ram_iterate  = qemu_rdma_registration_stop,
3907    .save_page          = qemu_rdma_save_page,
3908};
3909
3910
3911static void qio_channel_rdma_finalize(Object *obj)
3912{
3913    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3914    if (rioc->rdmain) {
3915        qemu_rdma_cleanup(rioc->rdmain);
3916        g_free(rioc->rdmain);
3917        rioc->rdmain = NULL;
3918    }
3919    if (rioc->rdmaout) {
3920        qemu_rdma_cleanup(rioc->rdmaout);
3921        g_free(rioc->rdmaout);
3922        rioc->rdmaout = NULL;
3923    }
3924}
3925
3926static void qio_channel_rdma_class_init(ObjectClass *klass,
3927                                        void *class_data G_GNUC_UNUSED)
3928{
3929    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3930
3931    ioc_klass->io_writev = qio_channel_rdma_writev;
3932    ioc_klass->io_readv = qio_channel_rdma_readv;
3933    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3934    ioc_klass->io_close = qio_channel_rdma_close;
3935    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3936    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3937    ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3938}
3939
3940static const TypeInfo qio_channel_rdma_info = {
3941    .parent = TYPE_QIO_CHANNEL,
3942    .name = TYPE_QIO_CHANNEL_RDMA,
3943    .instance_size = sizeof(QIOChannelRDMA),
3944    .instance_finalize = qio_channel_rdma_finalize,
3945    .class_init = qio_channel_rdma_class_init,
3946};
3947
3948static void qio_channel_rdma_register_types(void)
3949{
3950    type_register_static(&qio_channel_rdma_info);
3951}
3952
3953type_init(qio_channel_rdma_register_types);
3954
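/*
 * Wrap an RDMAContext in a QIOChannelRDMA-backed QEMUFile.  Mode "w"
 * produces the outgoing file (rdmaout is the primary context, rdmain the
 * return path); mode "r" produces the incoming file with the roles
 * reversed.
 */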
3955static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3956{
3957    QIOChannelRDMA *rioc;
3958
3959    if (qemu_file_mode_is_not_valid(mode)) {
3960        return NULL;
3961    }
3962
3963    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3964
3965    if (mode[0] == 'w') {
3966        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3967        rioc->rdmaout = rdma;
3968        rioc->rdmain = rdma->return_path;
3969        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3970    } else {
3971        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3972        rioc->rdmain = rdma;
3973        rioc->rdmaout = rdma->return_path;
3974        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3975    }
3976
3977    return rioc->file;
3978}
3979
3980static void rdma_accept_incoming_migration(void *opaque)
3981{
3982    RDMAContext *rdma = opaque;
3983    int ret;
3984    QEMUFile *f;
3985    Error *local_err = NULL, **errp = &local_err;
3986
3987    trace_qemu_rdma_accept_incoming_migration();
3988    ret = qemu_rdma_accept(rdma);
3989
3990    if (ret) {
3991        ERROR(errp, "RDMA Migration initialization failed!");
3992        return;
3993    }
3994
3995    trace_qemu_rdma_accept_incoming_migration_accepted();
3996
3997    if (rdma->is_return_path) {
3998        return;
3999    }
4000
4001    f = qemu_fopen_rdma(rdma, "rb");
4002    if (f == NULL) {
4003        ERROR(errp, "could not qemu_fopen_rdma!");
4004        qemu_rdma_cleanup(rdma);
4005        return;
4006    }
4007
4008    rdma->migration_started_on_destination = 1;
4009    migration_fd_process_incoming(f);
4010}
4011
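/*
 * Entry point for "-incoming rdma:host:port": initialise the destination
 * RDMAContext, start listening for connection requests, optionally prepare
 * a return-path context for postcopy, and let
 * rdma_accept_incoming_migration() take over when a request arrives.
 */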
4012void rdma_start_incoming_migration(const char *host_port, Error **errp)
4013{
4014    int ret;
4015    RDMAContext *rdma, *rdma_return_path = NULL;
4016    Error *local_err = NULL;
4017
4018    trace_rdma_start_incoming_migration();
4019    rdma = qemu_rdma_data_init(host_port, &local_err);
4020
4021    if (rdma == NULL) {
4022        goto err;
4023    }
4024
4025    ret = qemu_rdma_dest_init(rdma, &local_err);
4026
4027    if (ret) {
4028        goto err;
4029    }
4030
4031    trace_rdma_start_incoming_migration_after_dest_init();
4032
4033    ret = rdma_listen(rdma->listen_id, 5);
4034
4035    if (ret) {
4036        ERROR(errp, "listening on socket!");
4037        goto err;
4038    }
4039
4040    trace_rdma_start_incoming_migration_after_rdma_listen();
4041
4042    /* initialize the RDMAContext for return path */
4043    if (migrate_postcopy()) {
4044        rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
4045
4046        if (rdma_return_path == NULL) {
4047            goto err;
4048        }
4049
4050        qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
4051    }
4052
4053    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4054                        NULL, (void *)(intptr_t)rdma);
4055    return;
4056err:
4057    error_propagate(errp, local_err);
4058    g_free(rdma);
4059    g_free(rdma_return_path);
4060}
4061
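/*
 * Entry point for an outgoing "rdma:" migration (reached, for example, via
 * the HMP command "migrate -d rdma:destination-host:port").  Set up and
 * connect the outgoing RDMAContext (plus a second one for the postcopy
 * return path, if enabled) and hand the resulting QEMUFile to
 * migrate_fd_connect().
 */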
4062void rdma_start_outgoing_migration(void *opaque,
4063                            const char *host_port, Error **errp)
4064{
4065    MigrationState *s = opaque;
4066    RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
4067    RDMAContext *rdma_return_path = NULL;
4068    int ret = 0;
4069
4070    if (rdma == NULL) {
4071        goto err;
4072    }
4073
4074    ret = qemu_rdma_source_init(rdma,
4075        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4076
4077    if (ret) {
4078        goto err;
4079    }
4080
4081    trace_rdma_start_outgoing_migration_after_rdma_source_init();
4082    ret = qemu_rdma_connect(rdma, errp);
4083
4084    if (ret) {
4085        goto err;
4086    }
4087
4088    /* RDMA postcopy needs a separate queue pair for the return path */
4089    if (migrate_postcopy()) {
4090        rdma_return_path = qemu_rdma_data_init(host_port, errp);
4091
4092        if (rdma_return_path == NULL) {
4093            goto err;
4094        }
4095
4096        ret = qemu_rdma_source_init(rdma_return_path,
4097            s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4098
4099        if (ret) {
4100            goto err;
4101        }
4102
4103        ret = qemu_rdma_connect(rdma_return_path, errp);
4104
4105        if (ret) {
4106            goto err;
4107        }
4108
4109        rdma->return_path = rdma_return_path;
4110        rdma_return_path->return_path = rdma;
4111        rdma_return_path->is_return_path = true;
4112    }
4113
4114    trace_rdma_start_outgoing_migration_after_rdma_connect();
4115
4116    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4117    migrate_fd_connect(s, NULL);
4118    return;
4119err:
4120    g_free(rdma);
4121    g_free(rdma_return_path);
4122}
4123