qemu/migration/rdma.c
   1/*
   2 * RDMA protocol and interfaces
   3 *
   4 * Copyright IBM, Corp. 2010-2013
   5 * Copyright Red Hat, Inc. 2015-2016
   6 *
   7 * Authors:
   8 *  Michael R. Hines <mrhines@us.ibm.com>
   9 *  Jiuxing Liu <jl@us.ibm.com>
  10 *  Daniel P. Berrange <berrange@redhat.com>
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or
  13 * later.  See the COPYING file in the top-level directory.
  14 *
  15 */
  16
  17#include "qemu/osdep.h"
  18#include "qapi/error.h"
  19#include "qemu/cutils.h"
  20#include "exec/target_page.h"
  21#include "rdma.h"
  22#include "migration.h"
  23#include "migration-stats.h"
  24#include "qemu-file.h"
  25#include "ram.h"
  26#include "qemu/error-report.h"
  27#include "qemu/main-loop.h"
  28#include "qemu/module.h"
  29#include "qemu/rcu.h"
  30#include "qemu/sockets.h"
  31#include "qemu/bitmap.h"
  32#include "qemu/coroutine.h"
  33#include "system/memory.h"
  34#include <sys/socket.h>
  35#include <netdb.h>
  36#include <arpa/inet.h>
  37#include <rdma/rdma_cma.h>
  38#include "trace.h"
  39#include "qom/object.h"
  40#include "options.h"
  41#include <poll.h>
  42
  43#define RDMA_RESOLVE_TIMEOUT_MS 10000
  44
  45/* Do not merge data if larger than this. */
  46#define RDMA_MERGE_MAX (2 * 1024 * 1024)
  47#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
  48
  49#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
  50
  51/*
  52 * This is only for non-live state being migrated.
  53 * Instead of RDMA_WRITE messages, we use RDMA_SEND
  54 * messages for that state, which requires a different
  55 * delivery design than main memory.
  56 */
  57#define RDMA_SEND_INCREMENT 32768
  58
  59/*
  60 * Maximum size infiniband SEND message
  61 */
  62#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
  63#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
  64
  65#define RDMA_CONTROL_VERSION_CURRENT 1
  66/*
  67 * Capabilities for negotiation.
  68 */
  69#define RDMA_CAPABILITY_PIN_ALL 0x01
  70
  71/*
  72 * Add the other flags above to this list of known capabilities
  73 * as they are introduced.
  74 */
  75static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
  76
  77/*
  78 * A work request ID is 64-bits and we split up these bits
  79 * into 3 parts:
  80 *
  81 * bits 0-15 : type of control message, 2^16
  82 * bits 16-29: ram block index, 2^14
  83 * bits 30-63: ram block chunk number, 2^34
  84 *
  85 * The last two bit ranges are only used for RDMA writes,
  86 * in order to track their completion and potentially
  87 * also track unregistration status of the message.
  88 */
  89#define RDMA_WRID_TYPE_SHIFT  0UL
  90#define RDMA_WRID_BLOCK_SHIFT 16UL
  91#define RDMA_WRID_CHUNK_SHIFT 30UL
  92
  93#define RDMA_WRID_TYPE_MASK \
  94    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
  95
  96#define RDMA_WRID_BLOCK_MASK \
  97    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
  98
  99#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
 100
 101/*
 102 * RDMA migration protocol:
 103 * 1. RDMA Writes (data messages, i.e. RAM)
 104 * 2. IB Send/Recv (control channel messages)
 105 */
 106enum {
 107    RDMA_WRID_NONE = 0,
 108    RDMA_WRID_RDMA_WRITE = 1,
 109    RDMA_WRID_SEND_CONTROL = 2000,
 110    RDMA_WRID_RECV_CONTROL = 4000,
 111};
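
/*
 * Illustrative sketch only, not used by the code: with the shifts and
 * masks above, a hypothetical wr_id for an RDMA write of chunk 5 in RAM
 * block 3 would be packed and decoded like this:
 *
 *   uint64_t wr_id = RDMA_WRID_RDMA_WRITE |
 *                    (3ULL << RDMA_WRID_BLOCK_SHIFT) |
 *                    (5ULL << RDMA_WRID_CHUNK_SHIFT);
 *   uint64_t type  = wr_id & RDMA_WRID_TYPE_MASK;
 *   uint64_t block = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
 *   uint64_t chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
 *
 * which decodes back to type == RDMA_WRID_RDMA_WRITE, block == 3 and
 * chunk == 5.  The real packing and unpacking happens in
 * qemu_rdma_make_wrid() and qemu_rdma_poll() further down.
 */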
 112
 113/*
 114 * Work request IDs for IB SEND messages only (not RDMA writes).
 115 * This is used by the migration protocol to transmit
 116 * control messages (such as device state and registration commands)
 117 *
 118 * We could use more WRs, but we have enough for now.
 119 */
 120enum {
 121    RDMA_WRID_READY = 0,
 122    RDMA_WRID_DATA,
 123    RDMA_WRID_CONTROL,
 124    RDMA_WRID_MAX,
 125};
 126
 127/*
 128 * SEND/RECV IB Control Messages.
 129 */
 130enum {
 131    RDMA_CONTROL_NONE = 0,
 132    RDMA_CONTROL_ERROR,
 133    RDMA_CONTROL_READY,               /* ready to receive */
 134    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
 135    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
 136    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
 137    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
 138    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
 139    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
 140    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
 141    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
 142    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 143};
 144
 145
 146/*
 147 * Memory and MR structures used to represent an IB Send/Recv work request.
 148 * This is *not* used for RDMA writes, only IB Send/Recv.
 149 */
 150typedef struct {
 151    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
 152    struct   ibv_mr *control_mr;               /* registration metadata */
 153    size_t   control_len;                      /* length of the message */
 154    uint8_t *control_curr;                     /* start of unconsumed bytes */
 155} RDMAWorkRequestData;
 156
 157/*
 158 * Negotiate RDMA capabilities during connection-setup time.
 159 */
 160typedef struct {
 161    uint32_t version;
 162    uint32_t flags;
 163} RDMACapabilities;
 164
 165static void caps_to_network(RDMACapabilities *cap)
 166{
 167    cap->version = htonl(cap->version);
 168    cap->flags = htonl(cap->flags);
 169}
 170
 171static void network_to_caps(RDMACapabilities *cap)
 172{
 173    cap->version = ntohl(cap->version);
 174    cap->flags = ntohl(cap->flags);
 175}
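
/*
 * Connection-setup sketch, for illustration only (the real code lives in
 * the connect/accept paths further down): the source would fill in
 * something like
 *
 *   RDMACapabilities cap = {
 *       .version = RDMA_CONTROL_VERSION_CURRENT,
 *       .flags   = pin_all ? RDMA_CAPABILITY_PIN_ALL : 0,
 *   };
 *   caps_to_network(&cap);
 *
 * before handing the structure to the connection manager as private data,
 * and the peer undoes the byte-swapping with network_to_caps() and checks
 * the received flags against known_capabilities.
 */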
 176
 177/*
 178 * Representation of a RAMBlock from an RDMA perspective.
 179 * This is not transmitted, only local.
 180 * This and subsequent structures cannot be linked lists
 181 * because we're using a single IB message to transmit
 182 * the information. It's small anyway, so a list is overkill.
 183 */
 184typedef struct RDMALocalBlock {
 185    char          *block_name;
 186    uint8_t       *local_host_addr; /* local virtual address */
 187    uint64_t       remote_host_addr; /* remote virtual address */
 188    uint64_t       offset;
 189    uint64_t       length;
 190    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
 191    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
 192    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
 193    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
 194    int            index;           /* which block are we */
 195    unsigned int   src_index;       /* (Only used on dest) */
 196    bool           is_ram_block;
 197    int            nb_chunks;
 198    unsigned long *transit_bitmap;
 199    unsigned long *unregister_bitmap;
 200} RDMALocalBlock;
 201
 202/*
 203 * Also represents a RAMblock, but only on the dest.
 204 * This gets transmitted by the dest during connection-time
 205 * to the source VM and then is used to populate the
 206 * corresponding RDMALocalBlock with
 207 * the information needed to perform the actual RDMA.
 208 */
 209typedef struct QEMU_PACKED RDMADestBlock {
 210    uint64_t remote_host_addr;
 211    uint64_t offset;
 212    uint64_t length;
 213    uint32_t remote_rkey;
 214    uint32_t padding;
 215} RDMADestBlock;
 216
 217static const char *control_desc(unsigned int rdma_control)
 218{
 219    static const char *strs[] = {
 220        [RDMA_CONTROL_NONE] = "NONE",
 221        [RDMA_CONTROL_ERROR] = "ERROR",
 222        [RDMA_CONTROL_READY] = "READY",
 223        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
 224        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
 225        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
 226        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
 227        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
 228        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
 229        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
 230        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
 231        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
 232    };
 233
 234    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
 235        return "??BAD CONTROL VALUE??";
 236    }
 237
 238    return strs[rdma_control];
 239}
 240
 241#if !defined(htonll)
 242static uint64_t htonll(uint64_t v)
 243{
 244    union { uint32_t lv[2]; uint64_t llv; } u;
 245    u.lv[0] = htonl(v >> 32);
 246    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
 247    return u.llv;
 248}
 249#endif
 250
 251#if !defined(ntohll)
 252static uint64_t ntohll(uint64_t v)
 253{
 254    union { uint32_t lv[2]; uint64_t llv; } u;
 255    u.llv = v;
 256    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
 257}
 258#endif
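
/*
 * For illustration: on a little-endian host, htonll(0x0123456789abcdefULL)
 * produces a value whose in-memory bytes are 01 23 45 67 89 ab cd ef
 * (network byte order), and ntohll() of that value recovers
 * 0x0123456789abcdef again.
 */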
 259
 260static void dest_block_to_network(RDMADestBlock *db)
 261{
 262    db->remote_host_addr = htonll(db->remote_host_addr);
 263    db->offset = htonll(db->offset);
 264    db->length = htonll(db->length);
 265    db->remote_rkey = htonl(db->remote_rkey);
 266}
 267
 268static void network_to_dest_block(RDMADestBlock *db)
 269{
 270    db->remote_host_addr = ntohll(db->remote_host_addr);
 271    db->offset = ntohll(db->offset);
 272    db->length = ntohll(db->length);
 273    db->remote_rkey = ntohl(db->remote_rkey);
 274}
 275
 276/*
 277 * Virtual address of the above structures used for transmitting
 278 * the RAMBlock descriptions at connection-time.
 279 * This structure is *not* transmitted.
 280 */
 281typedef struct RDMALocalBlocks {
 282    int nb_blocks;
 283    bool     init;             /* main memory init complete */
 284    RDMALocalBlock *block;
 285} RDMALocalBlocks;
 286
 287/*
 288 * Main data structure for RDMA state.
 289 * While there is only one copy of this structure being allocated right now,
  290 * this is the place to start if you wanted to consider
 291 * having more than one RDMA connection open at the same time.
 292 */
 293typedef struct RDMAContext {
 294    char *host;
 295    int port;
 296
 297    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
 298
 299    /*
  300     * This is used by *_exchange_send() to figure out whether
  301     * the initial "READY" message has already been received.
 302     * This is because other functions may potentially poll() and detect
 303     * the READY message before send() does, in which case we need to
 304     * know if it completed.
 305     */
 306    int control_ready_expected;
 307
 308    /* number of outstanding writes */
 309    int nb_sent;
 310
 311    /* store info about current buffer so that we can
 312       merge it with future sends */
 313    uint64_t current_addr;
 314    uint64_t current_length;
 315    /* index of ram block the current buffer belongs to */
 316    int current_index;
 317    /* index of the chunk in the current ram block */
 318    int current_chunk;
 319
 320    bool pin_all;
 321
 322    /*
 323     * infiniband-specific variables for opening the device
 324     * and maintaining connection state and so forth.
 325     *
 326     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
 327     * cm_id->verbs, cm_id->channel, and cm_id->qp.
 328     */
 329    struct rdma_cm_id *cm_id;               /* connection manager ID */
 330    struct rdma_cm_id *listen_id;
 331    bool connected;
 332
 333    struct ibv_context          *verbs;
 334    struct rdma_event_channel   *channel;
 335    struct ibv_qp *qp;                      /* queue pair */
 336    struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
 337    struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
 338    struct ibv_pd *pd;                      /* protection domain */
  339    struct ibv_cq *recv_cq;                 /* receive completion queue */
 340    struct ibv_cq *send_cq;                 /* send completion queue */
 341
 342    /*
 343     * If a previous write failed (perhaps because of a failed
  344     * memory registration), then do not attempt any future work
 345     * and remember the error state.
 346     */
 347    bool errored;
 348    bool error_reported;
 349    bool received_error;
 350
 351    /*
 352     * Description of ram blocks used throughout the code.
 353     */
 354    RDMALocalBlocks local_ram_blocks;
 355    RDMADestBlock  *dest_blocks;
 356
 357    /* Index of the next RAMBlock received during block registration */
 358    unsigned int    next_src_index;
 359
 360    /*
  361     * Migration on the *destination* has started.
  362     * Once it has, we use the coroutine yield function.
  363     * The source runs in a thread, so this does not apply there.
 364     */
 365    int migration_started_on_destination;
 366
 367    int total_registrations;
 368    int total_writes;
 369
 370    int unregister_current, unregister_next;
 371    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
 372
 373    GHashTable *blockmap;
 374
 375    /* the RDMAContext for return path */
 376    struct RDMAContext *return_path;
 377    bool is_return_path;
 378} RDMAContext;
 379
 380#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
 381OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
 382
 383
 384
 385struct QIOChannelRDMA {
 386    QIOChannel parent;
 387    RDMAContext *rdmain;
 388    RDMAContext *rdmaout;
 389    QEMUFile *file;
 390    bool blocking; /* XXX we don't actually honour this yet */
 391};
 392
 393/*
 394 * Main structure for IB Send/Recv control messages.
 395 * This gets prepended at the beginning of every Send/Recv.
 396 */
 397typedef struct QEMU_PACKED {
 398    uint32_t len;     /* Total length of data portion */
 399    uint32_t type;    /* which control command to perform */
 400    uint32_t repeat;  /* number of commands in data portion of same type */
 401    uint32_t padding;
 402} RDMAControlHeader;
 403
 404static void control_to_network(RDMAControlHeader *control)
 405{
 406    control->type = htonl(control->type);
 407    control->len = htonl(control->len);
 408    control->repeat = htonl(control->repeat);
 409}
 410
 411static void network_to_control(RDMAControlHeader *control)
 412{
 413    control->type = ntohl(control->type);
 414    control->len = ntohl(control->len);
 415    control->repeat = ntohl(control->repeat);
 416}
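
/*
 * Wire-format sketch, for illustration only: a control message carrying
 * two registration requests occupies the first
 * sizeof(RDMAControlHeader) + 2 * sizeof(RDMARegister) bytes of the
 * registered control buffer:
 *
 *   RDMAControlHeader head = {
 *       .len    = 2 * sizeof(RDMARegister),
 *       .type   = RDMA_CONTROL_REGISTER_REQUEST,
 *       .repeat = 2,
 *   };
 *
 * followed immediately by the two RDMARegister structures (defined below),
 * with every field converted to network byte order before posting the SEND.
 */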
 417
 418/*
 419 * Register a single Chunk.
 420 * Information sent by the source VM to inform the dest
  421 * to register a single chunk of memory before we can perform
 422 * the actual RDMA operation.
 423 */
 424typedef struct QEMU_PACKED {
 425    union QEMU_PACKED {
 426        uint64_t current_addr;  /* offset into the ram_addr_t space */
 427        uint64_t chunk;         /* chunk to lookup if unregistering */
 428    } key;
 429    uint32_t current_index; /* which ramblock the chunk belongs to */
 430    uint32_t padding;
 431    uint64_t chunks;            /* how many sequential chunks to register */
 432} RDMARegister;
 433
 434static bool rdma_errored(RDMAContext *rdma)
 435{
 436    if (rdma->errored && !rdma->error_reported) {
 437        error_report("RDMA is in an error state waiting migration"
 438                     " to abort!");
 439        rdma->error_reported = true;
 440    }
 441    return rdma->errored;
 442}
 443
 444static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
 445{
 446    RDMALocalBlock *local_block;
 447    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
 448
 449    if (local_block->is_ram_block) {
 450        /*
 451         * current_addr as passed in is an address in the local ram_addr_t
 452         * space, we need to translate this for the destination
 453         */
 454        reg->key.current_addr -= local_block->offset;
 455        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
 456    }
 457    reg->key.current_addr = htonll(reg->key.current_addr);
 458    reg->current_index = htonl(reg->current_index);
 459    reg->chunks = htonll(reg->chunks);
 460}
 461
 462static void network_to_register(RDMARegister *reg)
 463{
 464    reg->key.current_addr = ntohll(reg->key.current_addr);
 465    reg->current_index = ntohl(reg->current_index);
 466    reg->chunks = ntohll(reg->chunks);
 467}
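
/*
 * Illustration of the translation above, with made-up numbers: if a RAM
 * block sits at local ram_addr_t offset 0x40000000 but the destination
 * reported it at offset 0x80000000, a current_addr of 0x40123000 is
 * rewritten by register_to_network() to 0x80123000 before being
 * byte-swapped, so the destination can interpret it in its own
 * ram_addr_t space.
 */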
 468
 469typedef struct QEMU_PACKED {
 470    uint32_t value;     /* if zero, we will madvise() */
 471    uint32_t block_idx; /* which ram block index */
 472    uint64_t offset;    /* Address in remote ram_addr_t space */
 473    uint64_t length;    /* length of the chunk */
 474} RDMACompress;
 475
 476static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
 477{
 478    comp->value = htonl(comp->value);
 479    /*
 480     * comp->offset as passed in is an address in the local ram_addr_t
 481     * space, we need to translate this for the destination
 482     */
 483    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
 484    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
 485    comp->block_idx = htonl(comp->block_idx);
 486    comp->offset = htonll(comp->offset);
 487    comp->length = htonll(comp->length);
 488}
 489
 490static void network_to_compress(RDMACompress *comp)
 491{
 492    comp->value = ntohl(comp->value);
 493    comp->block_idx = ntohl(comp->block_idx);
 494    comp->offset = ntohll(comp->offset);
 495    comp->length = ntohll(comp->length);
 496}
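
/*
 * Illustration only: a chunk consisting entirely of zero pages is not
 * transferred over RDMA at all.  The source instead sends an
 * RDMA_CONTROL_COMPRESS message with .value = 0 describing the block
 * index, offset and length, and the destination recreates the range
 * locally (see the "if zero, we will madvise()" note above).
 */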
 497
 498/*
 499 * The result of the dest's memory registration produces an "rkey"
 500 * which the source VM must reference in order to perform
 501 * the RDMA operation.
 502 */
 503typedef struct QEMU_PACKED {
 504    uint32_t rkey;
 505    uint32_t padding;
 506    uint64_t host_addr;
 507} RDMARegisterResult;
 508
 509static void result_to_network(RDMARegisterResult *result)
 510{
 511    result->rkey = htonl(result->rkey);
 512    result->host_addr = htonll(result->host_addr);
  513}
 514
 515static void network_to_result(RDMARegisterResult *result)
 516{
 517    result->rkey = ntohl(result->rkey);
 518    result->host_addr = ntohll(result->host_addr);
  519}
 520
 521static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
 522                                   uint8_t *data, RDMAControlHeader *resp,
 523                                   int *resp_idx,
 524                                   int (*callback)(RDMAContext *rdma,
 525                                                   Error **errp),
 526                                   Error **errp);
 527
 528static inline uint64_t ram_chunk_index(const uint8_t *start,
 529                                       const uint8_t *host)
 530{
 531    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
 532}
 533
 534static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
 535                                       uint64_t i)
 536{
 537    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
 538                                  (i << RDMA_REG_CHUNK_SHIFT));
 539}
 540
 541static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
 542                                     uint64_t i)
 543{
 544    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
 545                                         (1UL << RDMA_REG_CHUNK_SHIFT);
 546
 547    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
 548        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
 549    }
 550
 551    return result;
 552}
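
/*
 * Worked example of the chunk arithmetic above, for illustration only:
 * with RDMA_REG_CHUNK_SHIFT of 20 (1 MB chunks), a host address 0x150000
 * bytes into a block falls into chunk 1; ram_chunk_start() of that chunk
 * is local_host_addr + 0x100000 and ram_chunk_end() is either
 * local_host_addr + 0x200000 or the end of the block, whichever is
 * smaller.
 */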
 553
 554static void rdma_add_block(RDMAContext *rdma, const char *block_name,
 555                           void *host_addr,
 556                           ram_addr_t block_offset, uint64_t length)
 557{
 558    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 559    RDMALocalBlock *block;
 560    RDMALocalBlock *old = local->block;
 561
 562    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
 563
 564    if (local->nb_blocks) {
 565        if (rdma->blockmap) {
 566            for (int x = 0; x < local->nb_blocks; x++) {
 567                g_hash_table_remove(rdma->blockmap,
 568                                    (void *)(uintptr_t)old[x].offset);
 569                g_hash_table_insert(rdma->blockmap,
 570                                    (void *)(uintptr_t)old[x].offset,
 571                                    &local->block[x]);
 572            }
 573        }
 574        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
 575        g_free(old);
 576    }
 577
 578    block = &local->block[local->nb_blocks];
 579
 580    block->block_name = g_strdup(block_name);
 581    block->local_host_addr = host_addr;
 582    block->offset = block_offset;
 583    block->length = length;
 584    block->index = local->nb_blocks;
 585    block->src_index = ~0U; /* Filled in by the receipt of the block list */
 586    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
 587    block->transit_bitmap = bitmap_new(block->nb_chunks);
 588    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
 589    block->unregister_bitmap = bitmap_new(block->nb_chunks);
 590    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
 591    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
 592
 593    block->is_ram_block = local->init ? false : true;
 594
 595    if (rdma->blockmap) {
 596        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
 597    }
 598
 599    trace_rdma_add_block(block_name, local->nb_blocks,
 600                         (uintptr_t) block->local_host_addr,
 601                         block->offset, block->length,
 602                         (uintptr_t) (block->local_host_addr + block->length),
 603                         BITS_TO_LONGS(block->nb_chunks) *
 604                             sizeof(unsigned long) * 8,
 605                         block->nb_chunks);
 606
 607    local->nb_blocks++;
 608}
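
/*
 * For illustration: a RAM block of 0x280000 bytes (2.5 MB) gets
 * nb_chunks = ram_chunk_index(host, host + 0x280000) + 1 = 3, i.e. two
 * full 1 MB chunks plus a trailing partial chunk, and the transit and
 * unregister bitmaps above are sized for those three chunks.
 */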
 609
 610/*
  611 * Memory regions need to be registered with the device and queue pairs set up
  612 * in advance before the migration starts. This tells us where the RAM blocks
 613 * are so that we can register them individually.
 614 */
 615static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
 616{
 617    const char *block_name = qemu_ram_get_idstr(rb);
 618    void *host_addr = qemu_ram_get_host_addr(rb);
 619    ram_addr_t block_offset = qemu_ram_get_offset(rb);
 620    ram_addr_t length = qemu_ram_get_used_length(rb);
 621    rdma_add_block(opaque, block_name, host_addr, block_offset, length);
 622    return 0;
 623}
 624
 625/*
  626 * Identify the RAMBlocks and their quantity. They will be used to
  627 * identify chunk boundaries inside each RAMBlock and also be referenced
 628 * during dynamic page registration.
 629 */
 630static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
 631{
 632    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 633    int ret;
 634
 635    assert(rdma->blockmap == NULL);
 636    memset(local, 0, sizeof *local);
 637    ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
 638    assert(!ret);
 639    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
 640    rdma->dest_blocks = g_new0(RDMADestBlock,
 641                               rdma->local_ram_blocks.nb_blocks);
 642    local->init = true;
 643}
 644
 645/*
 646 * Note: If used outside of cleanup, the caller must ensure that the destination
 647 * block structures are also updated
 648 */
 649static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
 650{
 651    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 652    RDMALocalBlock *old = local->block;
 653
 654    if (rdma->blockmap) {
 655        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
 656    }
 657    if (block->pmr) {
 658        for (int j = 0; j < block->nb_chunks; j++) {
 659            if (!block->pmr[j]) {
 660                continue;
 661            }
 662            ibv_dereg_mr(block->pmr[j]);
 663            rdma->total_registrations--;
 664        }
 665        g_free(block->pmr);
 666        block->pmr = NULL;
 667    }
 668
 669    if (block->mr) {
 670        ibv_dereg_mr(block->mr);
 671        rdma->total_registrations--;
 672        block->mr = NULL;
 673    }
 674
 675    g_free(block->transit_bitmap);
 676    block->transit_bitmap = NULL;
 677
 678    g_free(block->unregister_bitmap);
 679    block->unregister_bitmap = NULL;
 680
 681    g_free(block->remote_keys);
 682    block->remote_keys = NULL;
 683
 684    g_free(block->block_name);
 685    block->block_name = NULL;
 686
 687    if (rdma->blockmap) {
 688        for (int x = 0; x < local->nb_blocks; x++) {
 689            g_hash_table_remove(rdma->blockmap,
 690                                (void *)(uintptr_t)old[x].offset);
 691        }
 692    }
 693
 694    if (local->nb_blocks > 1) {
 695
 696        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
 697
 698        if (block->index) {
 699            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
 700        }
 701
 702        if (block->index < (local->nb_blocks - 1)) {
 703            memcpy(local->block + block->index, old + (block->index + 1),
 704                sizeof(RDMALocalBlock) *
 705                    (local->nb_blocks - (block->index + 1)));
 706            for (int x = block->index; x < local->nb_blocks - 1; x++) {
 707                local->block[x].index--;
 708            }
 709        }
 710    } else {
 711        assert(block == local->block);
 712        local->block = NULL;
 713    }
 714
 715    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
 716                           block->offset, block->length,
 717                            (uintptr_t)(block->local_host_addr + block->length),
 718                           BITS_TO_LONGS(block->nb_chunks) *
 719                               sizeof(unsigned long) * 8, block->nb_chunks);
 720
 721    g_free(old);
 722
 723    local->nb_blocks--;
 724
 725    if (local->nb_blocks && rdma->blockmap) {
 726        for (int x = 0; x < local->nb_blocks; x++) {
 727            g_hash_table_insert(rdma->blockmap,
 728                                (void *)(uintptr_t)local->block[x].offset,
 729                                &local->block[x]);
 730        }
 731    }
 732}
 733
 734/*
 735 * Trace RDMA device open, with device details.
 736 */
 737static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
 738{
 739    struct ibv_port_attr port;
 740
 741    if (ibv_query_port(verbs, 1, &port)) {
 742        trace_qemu_rdma_dump_id_failed(who);
 743        return;
 744    }
 745
 746    trace_qemu_rdma_dump_id(who,
 747                verbs->device->name,
 748                verbs->device->dev_name,
 749                verbs->device->dev_path,
 750                verbs->device->ibdev_path,
 751                port.link_layer,
 752                port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
 753                : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
 754                : "Unknown");
 755}
 756
 757/*
 758 * Trace RDMA gid addressing information.
 759 * Useful for understanding the RDMA device hierarchy in the kernel.
 760 */
 761static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
 762{
 763    char sgid[33];
 764    char dgid[33];
 765    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
 766    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
 767    trace_qemu_rdma_dump_gid(who, sgid, dgid);
 768}
 769
 770/*
  771 * Figure out which RDMA device corresponds to the requested IP hostname.
 772 * Also create the initial connection manager identifiers for opening
 773 * the connection.
 774 */
 775static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
 776{
 777    int ret;
 778    struct rdma_addrinfo *res;
 779    char port_str[16];
 780    struct rdma_cm_event *cm_event;
 781    char ip[40] = "unknown";
 782
 783    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
 784        error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
 785        return -1;
 786    }
 787
 788    /* create CM channel */
 789    rdma->channel = rdma_create_event_channel();
 790    if (!rdma->channel) {
 791        error_setg(errp, "RDMA ERROR: could not create CM channel");
 792        return -1;
 793    }
 794
 795    /* create CM id */
 796    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
 797    if (ret < 0) {
 798        error_setg(errp, "RDMA ERROR: could not create channel id");
 799        goto err_resolve_create_id;
 800    }
 801
 802    snprintf(port_str, 16, "%d", rdma->port);
 803    port_str[15] = '\0';
 804
 805    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
 806    if (ret) {
 807        error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
 808                   rdma->host);
 809        goto err_resolve_get_addr;
 810    }
 811
  812    /* Try all addresses; exit the loop on the first successfully resolved address */
 813    for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
 814
 815        inet_ntop(e->ai_family,
 816            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
 817        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
 818
 819        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
 820                RDMA_RESOLVE_TIMEOUT_MS);
 821        if (ret >= 0) {
 822            goto route;
 823        }
 824    }
 825
 826    rdma_freeaddrinfo(res);
 827    error_setg(errp, "RDMA ERROR: could not resolve address %s", rdma->host);
 828    goto err_resolve_get_addr;
 829
 830route:
 831    rdma_freeaddrinfo(res);
 832    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
 833
 834    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 835    if (ret < 0) {
 836        error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
 837        goto err_resolve_get_addr;
 838    }
 839
 840    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
 841        error_setg(errp,
 842                   "RDMA ERROR: result not equal to event_addr_resolved %s",
 843                   rdma_event_str(cm_event->event));
 844        rdma_ack_cm_event(cm_event);
 845        goto err_resolve_get_addr;
 846    }
 847    rdma_ack_cm_event(cm_event);
 848
 849    /* resolve route */
 850    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
 851    if (ret < 0) {
 852        error_setg(errp, "RDMA ERROR: could not resolve rdma route");
 853        goto err_resolve_get_addr;
 854    }
 855
 856    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 857    if (ret < 0) {
 858        error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
 859        goto err_resolve_get_addr;
 860    }
 861    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
 862        error_setg(errp, "RDMA ERROR: "
 863                   "result not equal to event_route_resolved: %s",
 864                   rdma_event_str(cm_event->event));
 865        rdma_ack_cm_event(cm_event);
 866        goto err_resolve_get_addr;
 867    }
 868    rdma_ack_cm_event(cm_event);
 869    rdma->verbs = rdma->cm_id->verbs;
 870    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
 871    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
 872    return 0;
 873
 874err_resolve_get_addr:
 875    rdma_destroy_id(rdma->cm_id);
 876    rdma->cm_id = NULL;
 877err_resolve_create_id:
 878    rdma_destroy_event_channel(rdma->channel);
 879    rdma->channel = NULL;
 880    return -1;
 881}
 882
 883/*
 884 * Create protection domain and completion queues
 885 */
 886static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
 887{
 888    /* allocate pd */
 889    rdma->pd = ibv_alloc_pd(rdma->verbs);
 890    if (!rdma->pd) {
 891        error_setg(errp, "failed to allocate protection domain");
 892        return -1;
 893    }
 894
 895    /* create receive completion channel */
 896    rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
 897    if (!rdma->recv_comp_channel) {
 898        error_setg(errp, "failed to allocate receive completion channel");
 899        goto err_alloc_pd_cq;
 900    }
 901
 902    /*
 903     * Completion queue can be filled by read work requests.
 904     */
 905    rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
 906                                  NULL, rdma->recv_comp_channel, 0);
 907    if (!rdma->recv_cq) {
 908        error_setg(errp, "failed to allocate receive completion queue");
 909        goto err_alloc_pd_cq;
 910    }
 911
 912    /* create send completion channel */
 913    rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
 914    if (!rdma->send_comp_channel) {
 915        error_setg(errp, "failed to allocate send completion channel");
 916        goto err_alloc_pd_cq;
 917    }
 918
 919    rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
 920                                  NULL, rdma->send_comp_channel, 0);
 921    if (!rdma->send_cq) {
 922        error_setg(errp, "failed to allocate send completion queue");
 923        goto err_alloc_pd_cq;
 924    }
 925
 926    return 0;
 927
 928err_alloc_pd_cq:
 929    if (rdma->pd) {
 930        ibv_dealloc_pd(rdma->pd);
 931    }
 932    if (rdma->recv_comp_channel) {
 933        ibv_destroy_comp_channel(rdma->recv_comp_channel);
 934    }
 935    if (rdma->send_comp_channel) {
 936        ibv_destroy_comp_channel(rdma->send_comp_channel);
 937    }
 938    if (rdma->recv_cq) {
 939        ibv_destroy_cq(rdma->recv_cq);
 940        rdma->recv_cq = NULL;
 941    }
 942    rdma->pd = NULL;
 943    rdma->recv_comp_channel = NULL;
 944    rdma->send_comp_channel = NULL;
 945    return -1;
 946
 947}
 948
 949/*
  950 * Create the queue pair.
 951 */
 952static int qemu_rdma_alloc_qp(RDMAContext *rdma)
 953{
 954    struct ibv_qp_init_attr attr = { 0 };
 955
 956    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
 957    attr.cap.max_recv_wr = 3;
 958    attr.cap.max_send_sge = 1;
 959    attr.cap.max_recv_sge = 1;
 960    attr.send_cq = rdma->send_cq;
 961    attr.recv_cq = rdma->recv_cq;
 962    attr.qp_type = IBV_QPT_RC;
 963
 964    if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
 965        return -1;
 966    }
 967
 968    rdma->qp = rdma->cm_id->qp;
 969    return 0;
 970}
 971
  972/* Check whether On-Demand Paging is supported by the RDMA device */
 973static bool rdma_support_odp(struct ibv_context *dev)
 974{
 975    struct ibv_device_attr_ex attr = {0};
 976
 977    if (ibv_query_device_ex(dev, NULL, &attr)) {
 978        return false;
 979    }
 980
 981    if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
 982        return true;
 983    }
 984
 985    return false;
 986}
 987
 988/*
  989 * Use ibv_advise_mr to avoid RNR NAK errors as far as possible.
  990 * A responder MR registered with ODP will send an RNR NAK back to
  991 * the requester when it hits a page fault.
 992 */
 993static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
 994                                         uint32_t len,  uint32_t lkey,
 995                                         const char *name, bool wr)
 996{
 997#ifdef HAVE_IBV_ADVISE_MR
 998    int ret;
 999    int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1000                 IBV_ADVISE_MR_ADVICE_PREFETCH;
1001    struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1002
1003    ret = ibv_advise_mr(pd, advice,
1004                        IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1005    /* ignore the error */
1006    trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret));
1007#endif
1008}
1009
1010static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1011{
1012    int i;
1013    RDMALocalBlocks *local = &rdma->local_ram_blocks;
1014
1015    for (i = 0; i < local->nb_blocks; i++) {
1016        int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1017
1018        local->block[i].mr =
1019            ibv_reg_mr(rdma->pd,
1020                    local->block[i].local_host_addr,
1021                    local->block[i].length, access
1022                    );
1023        /*
1024         * ibv_reg_mr() is not documented to set errno.  If it does,
1025         * it's somebody else's doc bug.  If it doesn't, the use of
1026         * errno below is wrong.
1027         * TODO Find out whether ibv_reg_mr() sets errno.
1028         */
1029        if (!local->block[i].mr &&
1030            errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1031                access |= IBV_ACCESS_ON_DEMAND;
1032                /* register ODP mr */
1033                local->block[i].mr =
1034                    ibv_reg_mr(rdma->pd,
1035                               local->block[i].local_host_addr,
1036                               local->block[i].length, access);
1037                trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1038
1039                if (local->block[i].mr) {
1040                    qemu_rdma_advise_prefetch_mr(rdma->pd,
1041                                    (uintptr_t)local->block[i].local_host_addr,
1042                                    local->block[i].length,
1043                                    local->block[i].mr->lkey,
1044                                    local->block[i].block_name,
1045                                    true);
1046                }
1047        }
1048
1049        if (!local->block[i].mr) {
1050            error_setg_errno(errp, errno,
1051                             "Failed to register local dest ram block!");
1052            goto err;
1053        }
1054        rdma->total_registrations++;
1055    }
1056
1057    return 0;
1058
1059err:
1060    for (i--; i >= 0; i--) {
1061        ibv_dereg_mr(local->block[i].mr);
1062        local->block[i].mr = NULL;
1063        rdma->total_registrations--;
1064    }
1065
1066    return -1;
1067
1068}
1069
1070/*
1071 * Find the ram block that corresponds to the page requested to be
1072 * transmitted by QEMU.
1073 *
1074 * Once the block is found, also identify which 'chunk' within that
1075 * block that the page belongs to.
1076 */
1077static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1078                                       uintptr_t block_offset,
1079                                       uint64_t offset,
1080                                       uint64_t length,
1081                                       uint64_t *block_index,
1082                                       uint64_t *chunk_index)
1083{
1084    uint64_t current_addr = block_offset + offset;
1085    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1086                                                (void *) block_offset);
1087    assert(block);
1088    assert(current_addr >= block->offset);
1089    assert((current_addr + length) <= (block->offset + block->length));
1090
1091    *block_index = block->index;
1092    *chunk_index = ram_chunk_index(block->local_host_addr,
1093                block->local_host_addr + (current_addr - block->offset));
1094}
1095
1096/*
1097 * Register a chunk with IB. If the chunk was already registered
1098 * previously, then skip.
1099 *
1100 * Also return the keys associated with the registration needed
1101 * to perform the actual RDMA operation.
1102 */
1103static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1104        RDMALocalBlock *block, uintptr_t host_addr,
1105        uint32_t *lkey, uint32_t *rkey, int chunk,
1106        uint8_t *chunk_start, uint8_t *chunk_end)
1107{
1108    if (block->mr) {
1109        if (lkey) {
1110            *lkey = block->mr->lkey;
1111        }
1112        if (rkey) {
1113            *rkey = block->mr->rkey;
1114        }
1115        return 0;
1116    }
1117
1118    /* allocate memory to store chunk MRs */
1119    if (!block->pmr) {
1120        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1121    }
1122
1123    /*
1124     * If 'rkey', then we're the destination, so grant access to the source.
1125     *
1126     * If 'lkey', then we're the source VM, so grant access only to ourselves.
1127     */
1128    if (!block->pmr[chunk]) {
1129        uint64_t len = chunk_end - chunk_start;
1130        int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1131                     0;
1132
1133        trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1134
1135        block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1136        /*
1137         * ibv_reg_mr() is not documented to set errno.  If it does,
1138         * it's somebody else's doc bug.  If it doesn't, the use of
1139         * errno below is wrong.
1140         * TODO Find out whether ibv_reg_mr() sets errno.
1141         */
1142        if (!block->pmr[chunk] &&
1143            errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1144            access |= IBV_ACCESS_ON_DEMAND;
1145            /* register ODP mr */
1146            block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1147            trace_qemu_rdma_register_odp_mr(block->block_name);
1148
1149            if (block->pmr[chunk]) {
1150                qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1151                                            len, block->pmr[chunk]->lkey,
1152                                            block->block_name, rkey);
1153
1154            }
1155        }
1156    }
1157    if (!block->pmr[chunk]) {
1158        return -1;
1159    }
1160    rdma->total_registrations++;
1161
1162    if (lkey) {
1163        *lkey = block->pmr[chunk]->lkey;
1164    }
1165    if (rkey) {
1166        *rkey = block->pmr[chunk]->rkey;
1167    }
1168    return 0;
1169}
1170
1171/*
1172 * Register (at connection time) the memory used for control
1173 * channel messages.
1174 */
1175static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1176{
1177    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1178            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1179            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1180    if (rdma->wr_data[idx].control_mr) {
1181        rdma->total_registrations++;
1182        return 0;
1183    }
1184    return -1;
1185}
1186
1187/*
1188 * Perform a non-optimized memory unregistration after every transfer
1189 * for demonstration purposes, only if pin-all is not requested.
1190 *
1191 * Potential optimizations:
 1192 * 1. Start a new thread to run this function continuously
 1193 *      - for bit clearing
 1194 *      - and for receipt of unregister messages
1195 * 2. Use an LRU.
1196 * 3. Use workload hints.
1197 */
1198static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1199{
1200    Error *err = NULL;
1201
1202    while (rdma->unregistrations[rdma->unregister_current]) {
1203        int ret;
1204        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1205        uint64_t chunk =
1206            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1207        uint64_t index =
1208            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1209        RDMALocalBlock *block =
1210            &(rdma->local_ram_blocks.block[index]);
1211        RDMARegister reg = { .current_index = index };
1212        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1213                                 };
1214        RDMAControlHeader head = { .len = sizeof(RDMARegister),
1215                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1216                                   .repeat = 1,
1217                                 };
1218
1219        trace_qemu_rdma_unregister_waiting_proc(chunk,
1220                                                rdma->unregister_current);
1221
1222        rdma->unregistrations[rdma->unregister_current] = 0;
1223        rdma->unregister_current++;
1224
1225        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1226            rdma->unregister_current = 0;
1227        }
1228
1229
1230        /*
1231         * Unregistration is speculative (because migration is single-threaded
 1232         * and we cannot break the protocol's InfiniBand message ordering).
1233         * Thus, if the memory is currently being used for transmission,
1234         * then abort the attempt to unregister and try again
1235         * later the next time a completion is received for this memory.
1236         */
1237        clear_bit(chunk, block->unregister_bitmap);
1238
1239        if (test_bit(chunk, block->transit_bitmap)) {
1240            trace_qemu_rdma_unregister_waiting_inflight(chunk);
1241            continue;
1242        }
1243
1244        trace_qemu_rdma_unregister_waiting_send(chunk);
1245
1246        ret = ibv_dereg_mr(block->pmr[chunk]);
1247        block->pmr[chunk] = NULL;
1248        block->remote_keys[chunk] = 0;
1249
1250        if (ret != 0) {
1251            error_report("unregistration chunk failed: %s",
1252                         strerror(ret));
1253            return -1;
1254        }
1255        rdma->total_registrations--;
1256
1257        reg.key.chunk = chunk;
1258        register_to_network(rdma, &reg);
1259        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1260                                      &resp, NULL, NULL, &err);
1261        if (ret < 0) {
1262            error_report_err(err);
1263            return -1;
1264        }
1265
1266        trace_qemu_rdma_unregister_waiting_complete(chunk);
1267    }
1268
1269    return 0;
1270}
1271
1272static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1273                                         uint64_t chunk)
1274{
1275    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1276
1277    result |= (index << RDMA_WRID_BLOCK_SHIFT);
1278    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1279
1280    return result;
1281}
1282
1283/*
 1284 * Poll the completion queue to see if a work request
 1285 * (of any kind) has completed.
1286 * Return the work request ID that completed.
1287 */
1288static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1289                          uint64_t *wr_id_out, uint32_t *byte_len)
1290{
1291    int ret;
1292    struct ibv_wc wc;
1293    uint64_t wr_id;
1294
1295    ret = ibv_poll_cq(cq, 1, &wc);
1296
1297    if (!ret) {
1298        *wr_id_out = RDMA_WRID_NONE;
1299        return 0;
1300    }
1301
1302    if (ret < 0) {
1303        return -1;
1304    }
1305
1306    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1307
1308    if (wc.status != IBV_WC_SUCCESS) {
1309        return -1;
1310    }
1311
1312    if (rdma->control_ready_expected &&
1313        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1314        trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1315                                  rdma->nb_sent);
1316        rdma->control_ready_expected = 0;
1317    }
1318
1319    if (wr_id == RDMA_WRID_RDMA_WRITE) {
1320        uint64_t chunk =
1321            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1322        uint64_t index =
1323            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1324        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1325
1326        trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1327                                   index, chunk, block->local_host_addr,
1328                                   (void *)(uintptr_t)block->remote_host_addr);
1329
1330        clear_bit(chunk, block->transit_bitmap);
1331
1332        if (rdma->nb_sent > 0) {
1333            rdma->nb_sent--;
1334        }
1335    } else {
1336        trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1337    }
1338
1339    *wr_id_out = wc.wr_id;
1340    if (byte_len) {
1341        *byte_len = wc.byte_len;
1342    }
1343
1344    return  0;
1345}
1346
1347/* Wait for activity on the completion channel.
 1348 * Returns 0 on success, non-zero on error.
1349 */
1350static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1351                                       struct ibv_comp_channel *comp_channel)
1352{
1353    struct rdma_cm_event *cm_event;
1354
1355    /*
 1356     * The coroutine doesn't start until migration_fd_process_incoming(),
 1357     * so don't yield unless we know we're running inside of a coroutine.
1358     */
1359    if (rdma->migration_started_on_destination &&
1360        migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1361        yield_until_fd_readable(comp_channel->fd);
1362    } else {
 1363        /* This is the source side (which runs in a separate thread),
 1364         * or the destination prior to migration_fd_process_incoming(),
 1365         * or the destination after postcopy (also a separate thread).
 1366         * We can't yield, so we have to poll the fd.
1367         * But we need to be able to handle 'cancel' or an error
1368         * without hanging forever.
1369         */
1370        while (!rdma->errored && !rdma->received_error) {
1371            GPollFD pfds[2];
1372            pfds[0].fd = comp_channel->fd;
1373            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1374            pfds[0].revents = 0;
1375
1376            pfds[1].fd = rdma->channel->fd;
1377            pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1378            pfds[1].revents = 0;
1379
1380            /* 0.1s timeout, should be fine for a 'cancel' */
1381            switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1382            case 2:
1383            case 1: /* fd active */
1384                if (pfds[0].revents) {
1385                    return 0;
1386                }
1387
1388                if (pfds[1].revents) {
1389                    if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
1390                        return -1;
1391                    }
1392
1393                    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1394                        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1395                        rdma_ack_cm_event(cm_event);
1396                        return -1;
1397                    }
1398                    rdma_ack_cm_event(cm_event);
1399                }
1400                break;
1401
1402            case 0: /* Timeout, go around again */
1403                break;
1404
1405            default: /* Error of some type -
1406                      * I don't trust errno from qemu_poll_ns
1407                     */
1408                return -1;
1409            }
1410
1411            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1412                /* Bail out and let the cancellation happen */
1413                return -1;
1414            }
1415        }
1416    }
1417
1418    if (rdma->received_error) {
1419        return -1;
1420    }
1421    return -rdma->errored;
1422}
1423
1424static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
1425{
1426    return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1427           rdma->recv_comp_channel;
1428}
1429
1430static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
1431{
1432    return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1433}
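
/*
 * Illustration only: RDMA_WRID_RDMA_WRITE (1) and RDMA_WRID_SEND_CONTROL
 * (2000) completions are below RDMA_WRID_RECV_CONTROL (4000), so they are
 * waited for on the send completion queue/channel, while
 * RDMA_WRID_RECV_CONTROL + idx completions are waited for on the receive
 * completion queue/channel created in qemu_rdma_alloc_pd_cq().
 */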
1434
1435/*
1436 * Block until the next work request has completed.
1437 *
1438 * First poll to see if a work request has already completed,
1439 * otherwise block.
1440 *
1441 * If we encounter completed work requests for IDs other than
1442 * the one we're interested in, then that's generally an error.
1443 *
1444 * The only exception is actual RDMA Write completions. These
1445 * completions only need to be recorded, but do not actually
1446 * need further processing.
1447 */
1448static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1449                                    uint64_t wrid_requested,
1450                                    uint32_t *byte_len)
1451{
1452    int num_cq_events = 0, ret;
1453    struct ibv_cq *cq;
1454    void *cq_ctx;
1455    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1456    struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1457    struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1458
1459    if (ibv_req_notify_cq(poll_cq, 0)) {
1460        return -1;
1461    }
1462    /* poll cq first */
1463    while (wr_id != wrid_requested) {
1464        ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1465        if (ret < 0) {
1466            return -1;
1467        }
1468
1469        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1470
1471        if (wr_id == RDMA_WRID_NONE) {
1472            break;
1473        }
1474        if (wr_id != wrid_requested) {
1475            trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1476        }
1477    }
1478
1479    if (wr_id == wrid_requested) {
1480        return 0;
1481    }
1482
1483    while (1) {
1484        ret = qemu_rdma_wait_comp_channel(rdma, ch);
1485        if (ret < 0) {
1486            goto err_block_for_wrid;
1487        }
1488
1489        ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1490        if (ret < 0) {
1491            goto err_block_for_wrid;
1492        }
1493
1494        num_cq_events++;
1495
1496        if (ibv_req_notify_cq(cq, 0)) {
1497            goto err_block_for_wrid;
1498        }
1499
1500        while (wr_id != wrid_requested) {
1501            ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1502            if (ret < 0) {
1503                goto err_block_for_wrid;
1504            }
1505
1506            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1507
1508            if (wr_id == RDMA_WRID_NONE) {
1509                break;
1510            }
1511            if (wr_id != wrid_requested) {
1512                trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1513            }
1514        }
1515
1516        if (wr_id == wrid_requested) {
1517            goto success_block_for_wrid;
1518        }
1519    }
1520
1521success_block_for_wrid:
1522    if (num_cq_events) {
1523        ibv_ack_cq_events(cq, num_cq_events);
1524    }
1525    return 0;
1526
1527err_block_for_wrid:
1528    if (num_cq_events) {
1529        ibv_ack_cq_events(cq, num_cq_events);
1530    }
1531
1532    rdma->errored = true;
1533    return -1;
1534}
1535
1536/*
1537 * Post a SEND message work request for the control channel
1538 * containing some data and block until the post completes.
1539 */
1540static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1541                                       RDMAControlHeader *head,
1542                                       Error **errp)
1543{
1544    int ret;
1545    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1546    struct ibv_send_wr *bad_wr;
1547    struct ibv_sge sge = {
1548                           .addr = (uintptr_t)(wr->control),
1549                           .length = head->len + sizeof(RDMAControlHeader),
1550                           .lkey = wr->control_mr->lkey,
1551                         };
1552    struct ibv_send_wr send_wr = {
1553                                   .wr_id = RDMA_WRID_SEND_CONTROL,
1554                                   .opcode = IBV_WR_SEND,
1555                                   .send_flags = IBV_SEND_SIGNALED,
1556                                   .sg_list = &sge,
1557                                   .num_sge = 1,
1558                                };
1559
1560    trace_qemu_rdma_post_send_control(control_desc(head->type));
1561
1562    /*
1563     * We don't actually need to do a memcpy() in here if we used
1564     * the "sge" properly, but since we're only sending control messages
 1565     * (not RAM in a performance-critical path), then it's OK for now.
1566     *
1567     * The copy makes the RDMAControlHeader simpler to manipulate
1568     * for the time being.
1569     */
1570    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1571    memcpy(wr->control, head, sizeof(RDMAControlHeader));
1572    control_to_network((void *) wr->control);
1573
1574    if (buf) {
1575        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1576    }
1577
1578
1579    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1580
1581    if (ret > 0) {
1582        error_setg(errp, "Failed to use post IB SEND for control");
1583        return -1;
1584    }
1585
1586    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1587    if (ret < 0) {
1588        error_setg(errp, "rdma migration: send polling control error");
1589        return -1;
1590    }
1591
1592    return 0;
1593}
1594
1595/*
1596 * Post a RECV work request in anticipation of some future receipt
1597 * of data on the control channel.
1598 */
1599static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1600                                       Error **errp)
1601{
1602    struct ibv_recv_wr *bad_wr;
1603    struct ibv_sge sge = {
1604                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
1605                            .length = RDMA_CONTROL_MAX_BUFFER,
1606                            .lkey = rdma->wr_data[idx].control_mr->lkey,
1607                         };
1608
1609    struct ibv_recv_wr recv_wr = {
1610                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1611                                    .sg_list = &sge,
1612                                    .num_sge = 1,
1613                                 };
1614
1615
1616    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1617        error_setg(errp, "error posting control recv");
1618        return -1;
1619    }
1620
1621    return 0;
1622}
1623
1624/*
1625 * Block and wait for a RECV control channel message to arrive.
1626 */
1627static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1628                RDMAControlHeader *head, uint32_t expecting, int idx,
1629                Error **errp)
1630{
1631    uint32_t byte_len;
1632    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1633                                       &byte_len);
1634
1635    if (ret < 0) {
1636        error_setg(errp, "rdma migration: recv polling control error!");
1637        return -1;
1638    }
1639
1640    network_to_control((void *) rdma->wr_data[idx].control);
1641    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1642
1643    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1644
1645    if (expecting == RDMA_CONTROL_NONE) {
1646        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1647                                             head->type);
1648    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1649        error_setg(errp, "Was expecting a %s (%d) control message"
1650                ", but got: %s (%d), length: %d",
1651                control_desc(expecting), expecting,
1652                control_desc(head->type), head->type, head->len);
1653        if (head->type == RDMA_CONTROL_ERROR) {
1654            rdma->received_error = true;
1655        }
1656        return -1;
1657    }
1658    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1659        error_setg(errp, "too long length: %d", head->len);
1660        return -1;
1661    }
1662    if (sizeof(*head) + head->len != byte_len) {
1663        error_setg(errp, "Malformed length: %d byte_len %d",
1664                   head->len, byte_len);
1665        return -1;
1666    }
1667
1668    return 0;
1669}
1670
1671/*
1672 * When a RECV work request has completed, the work request's
1673 * buffer points at the control header.
1674 *
1675 * This advances the pointer past the header to the data portion
1676 * of the control message that was populated once the work
1677 * request finished.
1678 */
1679static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1680                                  RDMAControlHeader *head)
1681{
1682    rdma->wr_data[idx].control_len = head->len;
1683    rdma->wr_data[idx].control_curr =
1684        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1685}
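
/*
 * Editorial illustration (not used by the code): after a RECV completes,
 * the slot's receive buffer holds the header immediately followed by the
 * payload, so the helper above only has to skip past the header:
 *
 *     uint8_t *buf = rdma->wr_data[idx].control;
 *     RDMAControlHeader *hdr = (RDMAControlHeader *)buf;   // first bytes
 *     uint8_t *payload = buf + sizeof(RDMAControlHeader);  // hdr->len bytes
 */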
1686
1687/*
1688 * This is an 'atomic' high-level operation to deliver a single, unified
1689 * control-channel message.
1690 *
1691 * Additionally, if the user is expecting some kind of reply to this message,
1692 * they can request a 'resp' response message be filled in by posting an
1693 * additional work request on behalf of the user and waiting for an additional
1694 * completion.
1695 *
1696 * The extra (optional) response is used during registration to save us from
1697 * having to perform an *additional* exchange of messages just to provide a
1698 * response, by instead piggy-backing on the acknowledgement.
1699 */
1700static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1701                                   uint8_t *data, RDMAControlHeader *resp,
1702                                   int *resp_idx,
1703                                   int (*callback)(RDMAContext *rdma,
1704                                                   Error **errp),
1705                                   Error **errp)
1706{
1707    int ret;
1708
1709    /*
1710     * Wait until the destination is ready to receive before delivering the
1711     * message, i.e. block until a READY message arrives.
1712     */
1713    if (rdma->control_ready_expected) {
1714        RDMAControlHeader resp_ignored;
1715
1716        ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1717                                              RDMA_CONTROL_READY,
1718                                              RDMA_WRID_READY, errp);
1719        if (ret < 0) {
1720            return -1;
1721        }
1722    }
1723
1724    /*
1725     * If the user is expecting a response, post a WR in anticipation of it.
1726     */
1727    if (resp) {
1728        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1729        if (ret < 0) {
1730            return -1;
1731        }
1732    }
1733
1734    /*
1735     * Post a WR to replace the one we just consumed for the READY message.
1736     */
1737    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1738    if (ret < 0) {
1739        return -1;
1740    }
1741
1742    /*
1743     * Deliver the control message that was requested.
1744     */
1745    ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1746
1747    if (ret < 0) {
1748        return -1;
1749    }
1750
1751    /*
1752     * If we're expecting a response, block and wait for it.
1753     */
1754    if (resp) {
1755        if (callback) {
1756            trace_qemu_rdma_exchange_send_issue_callback();
1757            ret = callback(rdma, errp);
1758            if (ret < 0) {
1759                return -1;
1760            }
1761        }
1762
1763        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1764        ret = qemu_rdma_exchange_get_response(rdma, resp,
1765                                              resp->type, RDMA_WRID_DATA,
1766                                              errp);
1767
1768        if (ret < 0) {
1769            return -1;
1770        }
1771
1772        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1773        if (resp_idx) {
1774            *resp_idx = RDMA_WRID_DATA;
1775        }
1776        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1777    }
1778
1779    rdma->control_ready_expected = 1;
1780
1781    return 0;
1782}
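
/*
 * Minimal usage sketch (editorial; 'reg' and 'local_err' are assumed to be
 * declared by the caller): send a registration request and wait for the
 * piggy-backed RDMA_CONTROL_REGISTER_RESULT reply, much as
 * qemu_rdma_write_one() does further down:
 *
 *     RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                                .type = RDMA_CONTROL_REGISTER_REQUEST,
 *                                .repeat = 1 };
 *     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
 *     int resp_idx;
 *
 *     register_to_network(rdma, &reg);
 *     if (qemu_rdma_exchange_send(rdma, &head, (uint8_t *)&reg,
 *                                 &resp, &resp_idx, NULL, &local_err) < 0) {
 *         return -1;
 *     }
 *     // wr_data[resp_idx].control_curr now points at the RDMARegisterResult
 */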
1783
1784/*
1785 * This is an 'atomic' high-level operation to receive a single, unified
1786 * control-channel message.
1787 */
1788static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1789                                   uint32_t expecting, Error **errp)
1790{
1791    RDMAControlHeader ready = {
1792                                .len = 0,
1793                                .type = RDMA_CONTROL_READY,
1794                                .repeat = 1,
1795                              };
1796    int ret;
1797
1798    /*
1799     * Inform the source that we're ready to receive a message.
1800     */
1801    ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1802
1803    if (ret < 0) {
1804        return -1;
1805    }
1806
1807    /*
1808     * Block and wait for the message.
1809     */
1810    ret = qemu_rdma_exchange_get_response(rdma, head,
1811                                          expecting, RDMA_WRID_READY, errp);
1812
1813    if (ret < 0) {
1814        return -1;
1815    }
1816
1817    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1818
1819    /*
1820     * Post a new RECV work request to replace the one we just consumed.
1821     */
1822    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1823    if (ret < 0) {
1824        return -1;
1825    }
1826
1827    return 0;
1828}
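
/*
 * Editorial summary of the READY handshake implemented by the two helpers
 * above (the source uses exchange_send(), the destination exchange_recv()):
 *
 *     source                              destination
 *     ------                              -----------
 *     wait for READY            <-----    post_send_control(READY)
 *     post_recv_control(READY)            block in exchange_get_response()
 *     post_send_control(msg)     ----->   receive msg, move_header()
 *                                         post_recv_control(READY) again
 */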
1829
1830/*
1831 * Write an actual chunk of memory using RDMA.
1832 *
1833 * If we're using dynamic registration on the dest-side, we have to
1834 * send a registration command first.
1835 */
1836static int qemu_rdma_write_one(RDMAContext *rdma,
1837                               int current_index, uint64_t current_addr,
1838                               uint64_t length, Error **errp)
1839{
1840    struct ibv_sge sge;
1841    struct ibv_send_wr send_wr = { 0 };
1842    struct ibv_send_wr *bad_wr;
1843    int reg_result_idx, ret, count = 0;
1844    uint64_t chunk, chunks;
1845    uint8_t *chunk_start, *chunk_end;
1846    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1847    RDMARegister reg;
1848    RDMARegisterResult *reg_result;
1849    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1850    RDMAControlHeader head = { .len = sizeof(RDMARegister),
1851                               .type = RDMA_CONTROL_REGISTER_REQUEST,
1852                               .repeat = 1,
1853                             };
1854
1855retry:
1856    sge.addr = (uintptr_t)(block->local_host_addr +
1857                            (current_addr - block->offset));
1858    sge.length = length;
1859
1860    chunk = ram_chunk_index(block->local_host_addr,
1861                            (uint8_t *)(uintptr_t)sge.addr);
1862    chunk_start = ram_chunk_start(block, chunk);
1863
1864    if (block->is_ram_block) {
1865        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1866
1867        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1868            chunks--;
1869        }
1870    } else {
1871        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1872
1873        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1874            chunks--;
1875        }
1876    }
1877
1878    trace_qemu_rdma_write_one_top(chunks + 1,
1879                                  (chunks + 1) *
1880                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1881
1882    chunk_end = ram_chunk_end(block, chunk + chunks);
1883
1884
1885    while (test_bit(chunk, block->transit_bitmap)) {
1886        (void)count;
1887        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1888                sge.addr, length, rdma->nb_sent, block->nb_chunks);
1889
1890        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1891
1892        if (ret < 0) {
1893            error_setg(errp, "Failed to wait for previous write to complete "
1894                    "block %d chunk %" PRIu64
1895                    " current %" PRIu64 " len %" PRIu64 " %d",
1896                    current_index, chunk, sge.addr, length, rdma->nb_sent);
1897            return -1;
1898        }
1899    }
1900
1901    if (!rdma->pin_all || !block->is_ram_block) {
1902        if (!block->remote_keys[chunk]) {
1903            /*
1904             * This chunk has not yet been registered, so first check to see
1905             * if the entire chunk is zero. If so, tell the other side to
1906             * memset() + madvise() the entire chunk without RDMA.
1907             */
1908
1909            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1910                RDMACompress comp = {
1911                                        .offset = current_addr,
1912                                        .value = 0,
1913                                        .block_idx = current_index,
1914                                        .length = length,
1915                                    };
1916
1917                head.len = sizeof(comp);
1918                head.type = RDMA_CONTROL_COMPRESS;
1919
1920                trace_qemu_rdma_write_one_zero(chunk, sge.length,
1921                                               current_index, current_addr);
1922
1923                compress_to_network(rdma, &comp);
1924                ret = qemu_rdma_exchange_send(rdma, &head,
1925                                (uint8_t *) &comp, NULL, NULL, NULL, errp);
1926
1927                if (ret < 0) {
1928                    return -1;
1929                }
1930
1931                /*
1932                 * TODO: Here we are sending something, but we are not
1933                 * accounting for anything transferred.  The following is wrong:
1934                 *
1935                 * stat64_add(&mig_stats.rdma_bytes, sge.length);
1936                 *
1937                 * because we are using some kind of compression.  head.len
1938                 * would probably be closer to a correct value for what
1939                 * actually goes over the wire.
1940                 */
1941                stat64_add(&mig_stats.zero_pages,
1942                           sge.length / qemu_target_page_size());
1943                return 1;
1944            }
1945
1946            /*
1947             * Otherwise, tell other side to register.
1948             */
1949            reg.current_index = current_index;
1950            if (block->is_ram_block) {
1951                reg.key.current_addr = current_addr;
1952            } else {
1953                reg.key.chunk = chunk;
1954            }
1955            reg.chunks = chunks;
1956
1957            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1958                                              current_addr);
1959
1960            register_to_network(rdma, &reg);
1961            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1962                                    &resp, &reg_result_idx, NULL, errp);
1963            if (ret < 0) {
1964                return -1;
1965            }
1966
1967            /* try to overlap this single registration with the one we sent. */
1968            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1969                                                &sge.lkey, NULL, chunk,
1970                                                chunk_start, chunk_end)) {
1971                error_setg(errp, "cannot get lkey");
1972                return -1;
1973            }
1974
1975            reg_result = (RDMARegisterResult *)
1976                    rdma->wr_data[reg_result_idx].control_curr;
1977
1978            network_to_result(reg_result);
1979
1980            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
1981                                                 reg_result->rkey, chunk);
1982
1983            block->remote_keys[chunk] = reg_result->rkey;
1984            block->remote_host_addr = reg_result->host_addr;
1985        } else {
1986            /* already registered before */
1987            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1988                                                &sge.lkey, NULL, chunk,
1989                                                chunk_start, chunk_end)) {
1990                error_setg(errp, "cannot get lkey!");
1991                return -1;
1992            }
1993        }
1994
1995        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
1996    } else {
1997        send_wr.wr.rdma.rkey = block->remote_rkey;
1998
1999        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2000                                                     &sge.lkey, NULL, chunk,
2001                                                     chunk_start, chunk_end)) {
2002            error_setg(errp, "cannot get lkey!");
2003            return -1;
2004        }
2005    }
2006
2007    /*
2008     * Encode the ram block index and chunk within this wrid.
2009     * We will use this information at the time of completion
2010     * to figure out which bitmap to check against and then which
2011     * chunk in the bitmap to look for.
2012     */
2013    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2014                                        current_index, chunk);
2015
2016    send_wr.opcode = IBV_WR_RDMA_WRITE;
2017    send_wr.send_flags = IBV_SEND_SIGNALED;
2018    send_wr.sg_list = &sge;
2019    send_wr.num_sge = 1;
2020    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2021                                (current_addr - block->offset);
2022
2023    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2024                                   sge.length);
2025
2026    /*
2027     * ibv_post_send() does not return negative error numbers;
2028     * per the specification they are positive errno values - no idea why.
2029     */
2030    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2031
2032    if (ret == ENOMEM) {
2033        trace_qemu_rdma_write_one_queue_full();
2034        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2035        if (ret < 0) {
2036            error_setg(errp, "rdma migration: failed to make "
2037                         "room in full send queue!");
2038            return -1;
2039        }
2040
2041        goto retry;
2042
2043    } else if (ret > 0) {
2044        error_setg_errno(errp, ret,
2045                         "rdma migration: post rdma write failed");
2046        return -1;
2047    }
2048
2049    set_bit(chunk, block->transit_bitmap);
2050    stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2051    /*
2052     * We are adding the amount of data written to the transferred total,
2053     * but no overhead at all.  This assumes that RDMA is magical and does
2054     * not need to transfer (at least) the addresses where it wants to
2055     * write the pages.  It looks like it should really be something
2056     * like:
2057     *     sizeof(send_wr) + sge.length
2058     * but this being RDMA, who knows.
2059     */
2060    stat64_add(&mig_stats.rdma_bytes, sge.length);
2061    ram_transferred_add(sge.length);
2062    rdma->total_writes++;
2063
2064    return 0;
2065}
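
/*
 * Editorial sketch of how the wr_id posted above is packed and later
 * unpacked on completion; this is roughly what qemu_rdma_make_wrid() and
 * the RDMA_WRID_*_SHIFT/MASK macros do:
 *
 *     uint64_t wrid = (chunk << RDMA_WRID_CHUNK_SHIFT) |
 *                     ((uint64_t)current_index << RDMA_WRID_BLOCK_SHIFT) |
 *                     RDMA_WRID_RDMA_WRITE;
 *
 *     uint64_t type  =  wrid & RDMA_WRID_TYPE_MASK;
 *     uint64_t block = (wrid & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
 *     uint64_t chnk  = (wrid & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
 */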
2066
2067/*
2068 * Push out any unwritten RDMA operations.
2069 *
2070 * We support sending out multiple chunks at the same time.
2071 * Not all of them need to get signaled in the completion queue.
2072 */
2073static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2074{
2075    int ret;
2076
2077    if (!rdma->current_length) {
2078        return 0;
2079    }
2080
2081    ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2082                              rdma->current_length, errp);
2083
2084    if (ret < 0) {
2085        return -1;
2086    }
2087
2088    if (ret == 0) {
2089        rdma->nb_sent++;
2090        trace_qemu_rdma_write_flush(rdma->nb_sent);
2091    }
2092
2093    rdma->current_length = 0;
2094    rdma->current_addr = 0;
2095
2096    return 0;
2097}
2098
2099static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
2100                    uint64_t offset, uint64_t len)
2101{
2102    RDMALocalBlock *block;
2103    uint8_t *host_addr;
2104    uint8_t *chunk_end;
2105
2106    if (rdma->current_index < 0) {
2107        return false;
2108    }
2109
2110    if (rdma->current_chunk < 0) {
2111        return false;
2112    }
2113
2114    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2115    host_addr = block->local_host_addr + (offset - block->offset);
2116    chunk_end = ram_chunk_end(block, rdma->current_chunk);
2117
2118    if (rdma->current_length == 0) {
2119        return false;
2120    }
2121
2122    /*
2123     * Only merge into chunk sequentially.
2124     */
2125    if (offset != (rdma->current_addr + rdma->current_length)) {
2126        return false;
2127    }
2128
2129    if (offset < block->offset) {
2130        return false;
2131    }
2132
2133    if ((offset + len) > (block->offset + block->length)) {
2134        return false;
2135    }
2136
2137    if ((host_addr + len) > chunk_end) {
2138        return false;
2139    }
2140
2141    return true;
2142}
2143
2144/*
2145 * We're not actually writing here, but doing three things:
2146 *
2147 * 1. Identify the chunk the buffer belongs to.
2148 * 2. If the chunk is full or the buffer doesn't belong to the current
2149 *    chunk, then start a new chunk and flush() the old chunk.
2150 * 3. To keep the hardware busy, we also group chunks into batches
2151 *    and only require that a batch gets acknowledged in the completion
2152 *    queue instead of each individual chunk.
2153 */
2154static int qemu_rdma_write(RDMAContext *rdma,
2155                           uint64_t block_offset, uint64_t offset,
2156                           uint64_t len, Error **errp)
2157{
2158    uint64_t current_addr = block_offset + offset;
2159    uint64_t index = rdma->current_index;
2160    uint64_t chunk = rdma->current_chunk;
2161
2162    /* If we cannot merge it, we flush the current buffer first. */
2163    if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2164        if (qemu_rdma_write_flush(rdma, errp) < 0) {
2165            return -1;
2166        }
2167        rdma->current_length = 0;
2168        rdma->current_addr = current_addr;
2169
2170        qemu_rdma_search_ram_block(rdma, block_offset,
2171                                   offset, len, &index, &chunk);
2172        rdma->current_index = index;
2173        rdma->current_chunk = chunk;
2174    }
2175
2176    /* merge it */
2177    rdma->current_length += len;
2178
2179    /* flush it if buffer is too large */
2180    if (rdma->current_length >= RDMA_MERGE_MAX) {
2181        return qemu_rdma_write_flush(rdma, errp);
2182    }
2183
2184    return 0;
2185}
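
/*
 * Worked example (editorial; 'block_off' and 'errp' are caller context):
 * writing three 4 KiB pages of the same RAM block.  Contiguous pages are
 * merged into one pending write; a gap (or a chunk/block boundary, or
 * reaching RDMA_MERGE_MAX) forces a flush first:
 *
 *     qemu_rdma_write(rdma, block_off, 0x0000, 4096, errp); // starts a merge
 *     qemu_rdma_write(rdma, block_off, 0x1000, 4096, errp); // contiguous: merged
 *     qemu_rdma_write(rdma, block_off, 0x9000, 4096, errp); // gap: flush, new merge
 */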
2186
2187static void qemu_rdma_cleanup(RDMAContext *rdma)
2188{
2189    Error *err = NULL;
2190
2191    if (rdma->cm_id && rdma->connected) {
2192        if ((rdma->errored ||
2193             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2194            !rdma->received_error) {
2195            RDMAControlHeader head = { .len = 0,
2196                                       .type = RDMA_CONTROL_ERROR,
2197                                       .repeat = 1,
2198                                     };
2199            warn_report("Early error. Sending error.");
2200            if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) {
2201                warn_report_err(err);
2202            }
2203        }
2204
2205        rdma_disconnect(rdma->cm_id);
2206        trace_qemu_rdma_cleanup_disconnect();
2207        rdma->connected = false;
2208    }
2209
2210    if (rdma->channel) {
2211        qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2212    }
2213    g_free(rdma->dest_blocks);
2214    rdma->dest_blocks = NULL;
2215
2216    for (int i = 0; i < RDMA_WRID_MAX; i++) {
2217        if (rdma->wr_data[i].control_mr) {
2218            rdma->total_registrations--;
2219            ibv_dereg_mr(rdma->wr_data[i].control_mr);
2220        }
2221        rdma->wr_data[i].control_mr = NULL;
2222    }
2223
2224    if (rdma->local_ram_blocks.block) {
2225        while (rdma->local_ram_blocks.nb_blocks) {
2226            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2227        }
2228    }
2229
2230    if (rdma->qp) {
2231        rdma_destroy_qp(rdma->cm_id);
2232        rdma->qp = NULL;
2233    }
2234    if (rdma->recv_cq) {
2235        ibv_destroy_cq(rdma->recv_cq);
2236        rdma->recv_cq = NULL;
2237    }
2238    if (rdma->send_cq) {
2239        ibv_destroy_cq(rdma->send_cq);
2240        rdma->send_cq = NULL;
2241    }
2242    if (rdma->recv_comp_channel) {
2243        ibv_destroy_comp_channel(rdma->recv_comp_channel);
2244        rdma->recv_comp_channel = NULL;
2245    }
2246    if (rdma->send_comp_channel) {
2247        ibv_destroy_comp_channel(rdma->send_comp_channel);
2248        rdma->send_comp_channel = NULL;
2249    }
2250    if (rdma->pd) {
2251        ibv_dealloc_pd(rdma->pd);
2252        rdma->pd = NULL;
2253    }
2254    if (rdma->cm_id) {
2255        rdma_destroy_id(rdma->cm_id);
2256        rdma->cm_id = NULL;
2257    }
2258
2259    /* On the destination side, listen_id and channel are shared */
2260    if (rdma->listen_id) {
2261        if (!rdma->is_return_path) {
2262            rdma_destroy_id(rdma->listen_id);
2263        }
2264        rdma->listen_id = NULL;
2265
2266        if (rdma->channel) {
2267            if (!rdma->is_return_path) {
2268                rdma_destroy_event_channel(rdma->channel);
2269            }
2270            rdma->channel = NULL;
2271        }
2272    }
2273
2274    if (rdma->channel) {
2275        rdma_destroy_event_channel(rdma->channel);
2276        rdma->channel = NULL;
2277    }
2278    g_free(rdma->host);
2279    rdma->host = NULL;
2280}
2281
2282
2283static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2284{
2285    int ret;
2286
2287    /*
2288     * Will be validated against destination's actual capabilities
2289     * after the connect() completes.
2290     */
2291    rdma->pin_all = pin_all;
2292
2293    ret = qemu_rdma_resolve_host(rdma, errp);
2294    if (ret < 0) {
2295        goto err_rdma_source_init;
2296    }
2297
2298    ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2299    if (ret < 0) {
2300        goto err_rdma_source_init;
2301    }
2302
2303    ret = qemu_rdma_alloc_qp(rdma);
2304    if (ret < 0) {
2305        error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2306        goto err_rdma_source_init;
2307    }
2308
2309    qemu_rdma_init_ram_blocks(rdma);
2310
2311    /* Build the hash that maps from offset to RAMBlock */
2312    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2313    for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
2314        g_hash_table_insert(rdma->blockmap,
2315                (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
2316                &rdma->local_ram_blocks.block[i]);
2317    }
2318
2319    for (int i = 0; i < RDMA_WRID_MAX; i++) {
2320        ret = qemu_rdma_reg_control(rdma, i);
2321        if (ret < 0) {
2322            error_setg(errp, "RDMA ERROR: rdma migration: error "
2323                       "registering %d control!", i);
2324            goto err_rdma_source_init;
2325        }
2326    }
2327
2328    return 0;
2329
2330err_rdma_source_init:
2331    qemu_rdma_cleanup(rdma);
2332    return -1;
2333}
2334
2335static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2336                                     struct rdma_cm_event **cm_event,
2337                                     long msec, Error **errp)
2338{
2339    int ret;
2340    struct pollfd poll_fd = {
2341                                .fd = rdma->channel->fd,
2342                                .events = POLLIN,
2343                                .revents = 0
2344                            };
2345
2346    do {
2347        ret = poll(&poll_fd, 1, msec);
2348    } while (ret < 0 && errno == EINTR);
2349
2350    if (ret == 0) {
2351        error_setg(errp, "RDMA ERROR: poll cm event timeout");
2352        return -1;
2353    } else if (ret < 0) {
2354        error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
2355                   errno);
2356        return -1;
2357    } else if (poll_fd.revents & POLLIN) {
2358        if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
2359            error_setg(errp, "RDMA ERROR: failed to get cm event");
2360            return -1;
2361        }
2362        return 0;
2363    } else {
2364        error_setg(errp, "RDMA ERROR: no POLLIN event, revents=%x",
2365                   poll_fd.revents);
2366        return -1;
2367    }
2368}
2369
2370static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
2371                             Error **errp)
2372{
2373    RDMACapabilities cap = {
2374                                .version = RDMA_CONTROL_VERSION_CURRENT,
2375                                .flags = 0,
2376                           };
2377    struct rdma_conn_param conn_param = { .initiator_depth = 2,
2378                                          .retry_count = 5,
2379                                          .private_data = &cap,
2380                                          .private_data_len = sizeof(cap),
2381                                        };
2382    struct rdma_cm_event *cm_event;
2383    int ret;
2384
2385    /*
2386     * Only negotiate the capability with destination if the user
2387     * on the source first requested the capability.
2388     */
2389    if (rdma->pin_all) {
2390        trace_qemu_rdma_connect_pin_all_requested();
2391        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2392    }
2393
2394    caps_to_network(&cap);
2395
2396    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
2397    if (ret < 0) {
2398        goto err_rdma_source_connect;
2399    }
2400
2401    ret = rdma_connect(rdma->cm_id, &conn_param);
2402    if (ret < 0) {
2403        error_setg_errno(errp, errno,
2404                         "RDMA ERROR: connecting to destination!");
2405        goto err_rdma_source_connect;
2406    }
2407
2408    if (return_path) {
2409        ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2410    } else {
2411        ret = rdma_get_cm_event(rdma->channel, &cm_event);
2412        if (ret < 0) {
2413            error_setg_errno(errp, errno,
2414                             "RDMA ERROR: failed to get cm event");
2415        }
2416    }
2417    if (ret < 0) {
2418        goto err_rdma_source_connect;
2419    }
2420
2421    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2422        error_setg(errp, "RDMA ERROR: connecting to destination!");
2423        rdma_ack_cm_event(cm_event);
2424        goto err_rdma_source_connect;
2425    }
2426    rdma->connected = true;
2427
2428    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2429    network_to_caps(&cap);
2430
2431    /*
2432     * Verify that the *requested* capabilities are supported by the destination
2433     * and disable them otherwise.
2434     */
2435    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2436        warn_report("RDMA: Server cannot support pinning all memory. "
2437                    "Will register memory dynamically.");
2438        rdma->pin_all = false;
2439    }
2440
2441    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2442
2443    rdma_ack_cm_event(cm_event);
2444
2445    rdma->control_ready_expected = 1;
2446    rdma->nb_sent = 0;
2447    return 0;
2448
2449err_rdma_source_connect:
2450    qemu_rdma_cleanup(rdma);
2451    return -1;
2452}
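
/*
 * Editorial note on the capability negotiation above: it is a plain bitmask
 * intersection.  The source sets RDMA_CAPABILITY_PIN_ALL only if requested,
 * the destination masks the received flags with known_capabilities (see
 * qemu_rdma_accept() below), and the source downgrades pin_all when the bit
 * did not survive the round trip.  Equivalent one-liner:
 *
 *     rdma->pin_all = rdma->pin_all && (cap.flags & RDMA_CAPABILITY_PIN_ALL);
 */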
2453
2454static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2455{
2456    int ret;
2457    struct rdma_cm_id *listen_id;
2458    char ip[40] = "unknown";
2459    struct rdma_addrinfo *res, *e;
2460    char port_str[16];
2461    int reuse = 1;
2462
2463    for (int i = 0; i < RDMA_WRID_MAX; i++) {
2464        rdma->wr_data[i].control_len = 0;
2465        rdma->wr_data[i].control_curr = NULL;
2466    }
2467
2468    if (!rdma->host || !rdma->host[0]) {
2469        error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2470        rdma->errored = true;
2471        return -1;
2472    }
2473    /* create CM channel */
2474    rdma->channel = rdma_create_event_channel();
2475    if (!rdma->channel) {
2476        error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2477        rdma->errored = true;
2478        return -1;
2479    }
2480
2481    /* create CM id */
2482    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2483    if (ret < 0) {
2484        error_setg(errp, "RDMA ERROR: could not create cm_id!");
2485        goto err_dest_init_create_listen_id;
2486    }
2487
2488    snprintf(port_str, 16, "%d", rdma->port);
2489    port_str[15] = '\0';
2490
2491    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2492    if (ret) {
2493        error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2494                   rdma->host);
2495        goto err_dest_init_bind_addr;
2496    }
2497
2498    ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2499                          &reuse, sizeof reuse);
2500    if (ret < 0) {
2501        error_setg(errp, "RDMA ERROR: could not set REUSEADDR option");
2502        goto err_dest_init_bind_addr;
2503    }
2504
2505    /* Try all addresses */
2506    for (e = res; e != NULL; e = e->ai_next) {
2507
2508        inet_ntop(e->ai_family,
2509            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2510        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2511        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2512        if (ret < 0) {
2513            continue;
2514        }
2515        break;
2516    }
2517
2518    rdma_freeaddrinfo(res);
2519    if (!e) {
2520        error_setg(errp, "RDMA ERROR: could not rdma_bind_addr!");
2521        goto err_dest_init_bind_addr;
2522    }
2523
2524    rdma->listen_id = listen_id;
2525    qemu_rdma_dump_gid("dest_init", listen_id);
2526    return 0;
2527
2528err_dest_init_bind_addr:
2529    rdma_destroy_id(listen_id);
2530err_dest_init_create_listen_id:
2531    rdma_destroy_event_channel(rdma->channel);
2532    rdma->channel = NULL;
2533    rdma->errored = true;
2534    return -1;
2535
2536}
2537
2538static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2539                                            RDMAContext *rdma)
2540{
2541    for (int i = 0; i < RDMA_WRID_MAX; i++) {
2542        rdma_return_path->wr_data[i].control_len = 0;
2543        rdma_return_path->wr_data[i].control_curr = NULL;
2544    }
2545
2546    /* the CM channel and CM id are shared */
2547    rdma_return_path->channel = rdma->channel;
2548    rdma_return_path->listen_id = rdma->listen_id;
2549
2550    rdma->return_path = rdma_return_path;
2551    rdma_return_path->return_path = rdma;
2552    rdma_return_path->is_return_path = true;
2553}
2554
2555static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp)
2556{
2557    RDMAContext *rdma = NULL;
2558
2559    rdma = g_new0(RDMAContext, 1);
2560    rdma->current_index = -1;
2561    rdma->current_chunk = -1;
2562
2563    rdma->host = g_strdup(saddr->host);
2564    rdma->port = atoi(saddr->port);
2565    return rdma;
2566}
2567
2568/*
2569 * QEMUFile interface to the control channel.
2570 * SEND messages for control only.
2571 * VM's ram is handled with regular RDMA messages.
2572 */
2573static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2574                                       const struct iovec *iov,
2575                                       size_t niov,
2576                                       int *fds,
2577                                       size_t nfds,
2578                                       int flags,
2579                                       Error **errp)
2580{
2581    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2582    RDMAContext *rdma;
2583    int ret;
2584    ssize_t done = 0;
2585    size_t len;
2586
2587    RCU_READ_LOCK_GUARD();
2588    rdma = qatomic_rcu_read(&rioc->rdmaout);
2589
2590    if (!rdma) {
2591        error_setg(errp, "RDMA control channel output is not set");
2592        return -1;
2593    }
2594
2595    if (rdma->errored) {
2596        error_setg(errp,
2597                   "RDMA is in an error state waiting migration to abort!");
2598        return -1;
2599    }
2600
2601    /*
2602     * Push out any writes that
2603     * we've queued up for the VM's RAM.
2604     */
2605    ret = qemu_rdma_write_flush(rdma, errp);
2606    if (ret < 0) {
2607        rdma->errored = true;
2608        return -1;
2609    }
2610
2611    for (int i = 0; i < niov; i++) {
2612        size_t remaining = iov[i].iov_len;
2613        uint8_t *data = (void *)iov[i].iov_base;
2614        while (remaining) {
2615            RDMAControlHeader head = {};
2616
2617            len = MIN(remaining, RDMA_SEND_INCREMENT);
2618            remaining -= len;
2619
2620            head.len = len;
2621            head.type = RDMA_CONTROL_QEMU_FILE;
2622
2623            ret = qemu_rdma_exchange_send(rdma, &head,
2624                                          data, NULL, NULL, NULL, errp);
2625
2626            if (ret < 0) {
2627                rdma->errored = true;
2628                return -1;
2629            }
2630
2631            data += len;
2632            done += len;
2633        }
2634    }
2635
2636    return done;
2637}
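
/*
 * Editorial illustration: control-channel writes are cut into
 * RDMA_SEND_INCREMENT (32 KiB) pieces, each carried by its own
 * RDMA_CONTROL_QEMU_FILE message and exchange_send() round trip.  E.g. a
 * 100000-byte iovec element becomes 4 messages: 3 x 32768 + 1 x 1696 bytes.
 */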
2638
2639static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2640                             size_t size, int idx)
2641{
2642    size_t len = 0;
2643
2644    if (rdma->wr_data[idx].control_len) {
2645        trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2646
2647        len = MIN(size, rdma->wr_data[idx].control_len);
2648        memcpy(buf, rdma->wr_data[idx].control_curr, len);
2649        rdma->wr_data[idx].control_curr += len;
2650        rdma->wr_data[idx].control_len -= len;
2651    }
2652
2653    return len;
2654}
2655
2656/*
2657 * QEMUFile interface to the control channel.
2658 * RDMA links don't use bytestreams, so we have to
2659 * return bytes to QEMUFile opportunistically.
2660 */
2661static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2662                                      const struct iovec *iov,
2663                                      size_t niov,
2664                                      int **fds,
2665                                      size_t *nfds,
2666                                      int flags,
2667                                      Error **errp)
2668{
2669    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2670    RDMAContext *rdma;
2671    RDMAControlHeader head;
2672    int ret;
2673    ssize_t done = 0;
2674    size_t len;
2675
2676    RCU_READ_LOCK_GUARD();
2677    rdma = qatomic_rcu_read(&rioc->rdmain);
2678
2679    if (!rdma) {
2680        error_setg(errp, "RDMA control channel input is not set");
2681        return -1;
2682    }
2683
2684    if (rdma->errored) {
2685        error_setg(errp,
2686                   "RDMA is in an error state waiting migration to abort!");
2687        return -1;
2688    }
2689
2690    for (int i = 0; i < niov; i++) {
2691        size_t want = iov[i].iov_len;
2692        uint8_t *data = (void *)iov[i].iov_base;
2693
2694        /*
2695         * First, we hold on to the last SEND message we
2696         * were given and dish out the bytes until we run
2697         * out of bytes.
2698         */
2699        len = qemu_rdma_fill(rdma, data, want, 0);
2700        done += len;
2701        want -= len;
2702        /* Got what we needed, so go to next iovec */
2703        if (want == 0) {
2704            continue;
2705        }
2706
2707        /* If we got any data so far, then don't wait
2708         * for more, just return what we have */
2709        if (done > 0) {
2710            break;
2711        }
2712
2713
2714        /* We've got nothing at all, so let's wait for
2715         * more to arrive
2716         */
2717        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2718                                      errp);
2719
2720        if (ret < 0) {
2721            rdma->errored = true;
2722            return -1;
2723        }
2724
2725        /*
2726         * SEND was received with new bytes, now try again.
2727         */
2728        len = qemu_rdma_fill(rdma, data, want, 0);
2729        done += len;
2730        want -= len;
2731
2732        /* Still didn't get enough, so let's just return */
2733        if (want) {
2734            if (done == 0) {
2735                return QIO_CHANNEL_ERR_BLOCK;
2736            } else {
2737                break;
2738            }
2739        }
2740    }
2741    return done;
2742}
2743
2744/*
2745 * Block until all the outstanding chunks have been delivered by the hardware.
2746 */
2747static int qemu_rdma_drain_cq(RDMAContext *rdma)
2748{
2749    Error *err = NULL;
2750
2751    if (qemu_rdma_write_flush(rdma, &err) < 0) {
2752        error_report_err(err);
2753        return -1;
2754    }
2755
2756    while (rdma->nb_sent) {
2757        if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
2758            error_report("rdma migration: complete polling error!");
2759            return -1;
2760        }
2761    }
2762
2763    qemu_rdma_unregister_waiting(rdma);
2764
2765    return 0;
2766}
2767
2768
2769static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2770                                         bool blocking,
2771                                         Error **errp)
2772{
2773    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2774    /* XXX we should make readv/writev actually honour this :-) */
2775    rioc->blocking = blocking;
2776    return 0;
2777}
2778
2779
2780typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2781struct QIOChannelRDMASource {
2782    GSource parent;
2783    QIOChannelRDMA *rioc;
2784    GIOCondition condition;
2785};
2786
2787static gboolean
2788qio_channel_rdma_source_prepare(GSource *source,
2789                                gint *timeout)
2790{
2791    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2792    RDMAContext *rdma;
2793    GIOCondition cond = 0;
2794    *timeout = -1;
2795
2796    RCU_READ_LOCK_GUARD();
2797    if (rsource->condition == G_IO_IN) {
2798        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2799    } else {
2800        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2801    }
2802
2803    if (!rdma) {
2804        error_report("RDMAContext is NULL when preparing GSource");
2805        return FALSE;
2806    }
2807
2808    if (rdma->wr_data[0].control_len) {
2809        cond |= G_IO_IN;
2810    }
2811    cond |= G_IO_OUT;
2812
2813    return cond & rsource->condition;
2814}
2815
2816static gboolean
2817qio_channel_rdma_source_check(GSource *source)
2818{
2819    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2820    RDMAContext *rdma;
2821    GIOCondition cond = 0;
2822
2823    RCU_READ_LOCK_GUARD();
2824    if (rsource->condition == G_IO_IN) {
2825        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2826    } else {
2827        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2828    }
2829
2830    if (!rdma) {
2831        error_report("RDMAContext is NULL when checking GSource");
2832        return FALSE;
2833    }
2834
2835    if (rdma->wr_data[0].control_len) {
2836        cond |= G_IO_IN;
2837    }
2838    cond |= G_IO_OUT;
2839
2840    return cond & rsource->condition;
2841}
2842
2843static gboolean
2844qio_channel_rdma_source_dispatch(GSource *source,
2845                                 GSourceFunc callback,
2846                                 gpointer user_data)
2847{
2848    QIOChannelFunc func = (QIOChannelFunc)callback;
2849    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2850    RDMAContext *rdma;
2851    GIOCondition cond = 0;
2852
2853    RCU_READ_LOCK_GUARD();
2854    if (rsource->condition == G_IO_IN) {
2855        rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2856    } else {
2857        rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2858    }
2859
2860    if (!rdma) {
2861        error_report("RDMAContext is NULL when dispatching GSource");
2862        return FALSE;
2863    }
2864
2865    if (rdma->wr_data[0].control_len) {
2866        cond |= G_IO_IN;
2867    }
2868    cond |= G_IO_OUT;
2869
2870    return (*func)(QIO_CHANNEL(rsource->rioc),
2871                   (cond & rsource->condition),
2872                   user_data);
2873}
2874
2875static void
2876qio_channel_rdma_source_finalize(GSource *source)
2877{
2878    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2879
2880    object_unref(OBJECT(ssource->rioc));
2881}
2882
2883static GSourceFuncs qio_channel_rdma_source_funcs = {
2884    qio_channel_rdma_source_prepare,
2885    qio_channel_rdma_source_check,
2886    qio_channel_rdma_source_dispatch,
2887    qio_channel_rdma_source_finalize
2888};
2889
2890static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2891                                              GIOCondition condition)
2892{
2893    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2894    QIOChannelRDMASource *ssource;
2895    GSource *source;
2896
2897    source = g_source_new(&qio_channel_rdma_source_funcs,
2898                          sizeof(QIOChannelRDMASource));
2899    ssource = (QIOChannelRDMASource *)source;
2900
2901    ssource->rioc = rioc;
2902    object_ref(OBJECT(rioc));
2903
2904    ssource->condition = condition;
2905
2906    return source;
2907}
2908
2909static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
2910                                                AioContext *read_ctx,
2911                                                IOHandler *io_read,
2912                                                AioContext *write_ctx,
2913                                                IOHandler *io_write,
2914                                                void *opaque)
2915{
2916    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2917    if (io_read) {
2918        aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
2919                           io_read, io_write, NULL, NULL, opaque);
2920        aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
2921                           io_read, io_write, NULL, NULL, opaque);
2922    } else {
2923        aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
2924                           io_read, io_write, NULL, NULL, opaque);
2925        aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
2926                           io_read, io_write, NULL, NULL, opaque);
2927    }
2928}
2929
2930struct rdma_close_rcu {
2931    struct rcu_head rcu;
2932    RDMAContext *rdmain;
2933    RDMAContext *rdmaout;
2934};
2935
2936/* callback from qio_channel_rdma_close via call_rcu */
2937static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
2938{
2939    if (rcu->rdmain) {
2940        qemu_rdma_cleanup(rcu->rdmain);
2941    }
2942
2943    if (rcu->rdmaout) {
2944        qemu_rdma_cleanup(rcu->rdmaout);
2945    }
2946
2947    g_free(rcu->rdmain);
2948    g_free(rcu->rdmaout);
2949    g_free(rcu);
2950}
2951
2952static int qio_channel_rdma_close(QIOChannel *ioc,
2953                                  Error **errp)
2954{
2955    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2956    RDMAContext *rdmain, *rdmaout;
2957    struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
2958
2959    trace_qemu_rdma_close();
2960
2961    rdmain = rioc->rdmain;
2962    if (rdmain) {
2963        qatomic_rcu_set(&rioc->rdmain, NULL);
2964    }
2965
2966    rdmaout = rioc->rdmaout;
2967    if (rdmaout) {
2968        qatomic_rcu_set(&rioc->rdmaout, NULL);
2969    }
2970
2971    rcu->rdmain = rdmain;
2972    rcu->rdmaout = rdmaout;
2973    call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
2974
2975    return 0;
2976}
2977
2978static int
2979qio_channel_rdma_shutdown(QIOChannel *ioc,
2980                            QIOChannelShutdown how,
2981                            Error **errp)
2982{
2983    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2984    RDMAContext *rdmain, *rdmaout;
2985
2986    RCU_READ_LOCK_GUARD();
2987
2988    rdmain = qatomic_rcu_read(&rioc->rdmain);
2989    rdmaout = qatomic_rcu_read(&rioc->rdmaout);
2990
2991    switch (how) {
2992    case QIO_CHANNEL_SHUTDOWN_READ:
2993        if (rdmain) {
2994            rdmain->errored = true;
2995        }
2996        break;
2997    case QIO_CHANNEL_SHUTDOWN_WRITE:
2998        if (rdmaout) {
2999            rdmaout->errored = true;
3000        }
3001        break;
3002    case QIO_CHANNEL_SHUTDOWN_BOTH:
3003    default:
3004        if (rdmain) {
3005            rdmain->errored = true;
3006        }
3007        if (rdmaout) {
3008            rdmaout->errored = true;
3009        }
3010        break;
3011    }
3012
3013    return 0;
3014}
3015
3016/*
3017 * Parameters:
3018 *    @offset == 0 :
3019 *        This means that 'block_offset' is a full virtual address that does not
3020 *        belong to a RAMBlock of the virtual machine and instead
3021 *        represents a private malloc'd memory area that the caller wishes to
3022 *        transfer.
3023 *
3024 *    @offset != 0 :
3025 *        Offset is an offset to be added to block_offset and used
3026 *        to also lookup the corresponding RAMBlock.
3027 *
3028 *    @size : Number of bytes to transfer
3029 *
3030 *    @pages_sent : User-specified pointer to indicate how many pages were
3031 *                  sent. Usually, this will not be more than a few bytes of
3032 *                  the protocol because most transfers are sent asynchronously.
3033 */
3034static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3035                               ram_addr_t offset, size_t size)
3036{
3037    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3038    Error *err = NULL;
3039    RDMAContext *rdma;
3040    int ret;
3041
3042    RCU_READ_LOCK_GUARD();
3043    rdma = qatomic_rcu_read(&rioc->rdmaout);
3044
3045    if (!rdma) {
3046        return -1;
3047    }
3048
3049    if (rdma_errored(rdma)) {
3050        return -1;
3051    }
3052
3053    qemu_fflush(f);
3054
3055    /*
3056     * Add this page to the current 'chunk'. If the chunk
3057     * is full, or the page doesn't belong to the current chunk,
3058     * an actual RDMA write will occur and a new chunk will be formed.
3059     */
3060    ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3061    if (ret < 0) {
3062        error_report_err(err);
3063        goto err;
3064    }
3065
3066    /*
3067     * Drain the Completion Queue if possible, but do not block,
3068     * just poll.
3069     *
3070     * If nothing to poll, the end of the iteration will do this
3071     * again to make sure we don't overflow the request queue.
3072     */
3073    while (1) {
3074        uint64_t wr_id, wr_id_in;
3075        ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3076
3077        if (ret < 0) {
3078            error_report("rdma migration: polling error");
3079            goto err;
3080        }
3081
3082        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3083
3084        if (wr_id == RDMA_WRID_NONE) {
3085            break;
3086        }
3087    }
3088
3089    while (1) {
3090        uint64_t wr_id, wr_id_in;
3091        ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3092
3093        if (ret < 0) {
3094            error_report("rdma migration: polling error");
3095            goto err;
3096        }
3097
3098        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3099
3100        if (wr_id == RDMA_WRID_NONE) {
3101            break;
3102        }
3103    }
3104
3105    return RAM_SAVE_CONTROL_DELAYED;
3106
3107err:
3108    rdma->errored = true;
3109    return -1;
3110}
3111
3112int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
3113                           ram_addr_t offset, size_t size)
3114{
3115    assert(migrate_rdma());
3116
3117    int ret = qemu_rdma_save_page(f, block_offset, offset, size);
3118
3119    if (ret != RAM_SAVE_CONTROL_DELAYED) {
3120        if (ret < 0) {
3121            qemu_file_set_error(f, ret);
3122        }
3123    }
3124    return ret;
3125}
3126
3127static void rdma_accept_incoming_migration(void *opaque);
3128
3129static void rdma_cm_poll_handler(void *opaque)
3130{
3131    RDMAContext *rdma = opaque;
3132    struct rdma_cm_event *cm_event;
3133    MigrationIncomingState *mis = migration_incoming_get_current();
3134
3135    if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
3136        error_report("get_cm_event failed %d", errno);
3137        return;
3138    }
3139
3140    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3141        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3142        if (!rdma->errored &&
3143            migration_incoming_get_current()->state !=
3144              MIGRATION_STATUS_COMPLETED) {
3145            error_report("received cm event %d", cm_event->event);
3146            rdma->errored = true;
3147            if (rdma->return_path) {
3148                rdma->return_path->errored = true;
3149            }
3150        }
3151        rdma_ack_cm_event(cm_event);
3152        if (mis->loadvm_co) {
3153            qemu_coroutine_enter(mis->loadvm_co);
3154        }
3155        return;
3156    }
3157    rdma_ack_cm_event(cm_event);
3158}
3159
3160static int qemu_rdma_accept(RDMAContext *rdma)
3161{
3162    Error *err = NULL;
3163    RDMACapabilities cap;
3164    struct rdma_conn_param conn_param = {
3165                                            .responder_resources = 2,
3166                                            .private_data = &cap,
3167                                            .private_data_len = sizeof(cap),
3168                                         };
3169    RDMAContext *rdma_return_path = NULL;
3170    g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1);
3171    struct rdma_cm_event *cm_event;
3172    struct ibv_context *verbs;
3173    int ret;
3174
3175    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3176    if (ret < 0) {
3177        goto err_rdma_dest_wait;
3178    }
3179
3180    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3181        rdma_ack_cm_event(cm_event);
3182        goto err_rdma_dest_wait;
3183    }
3184
3185    isock->host = g_strdup(rdma->host);
3186    isock->port = g_strdup_printf("%d", rdma->port);
3187
3188    /*
3189     * Initialize the RDMAContext for the return path (used by postcopy)
3190     * once the first connection request has arrived.
3191     */
3192    if ((migrate_postcopy() || migrate_return_path())
3193        && !rdma->is_return_path) {
3194        rdma_return_path = qemu_rdma_data_init(isock, NULL);
3195        if (rdma_return_path == NULL) {
3196            rdma_ack_cm_event(cm_event);
3197            goto err_rdma_dest_wait;
3198        }
3199
3200        qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3201    }
3202
3203    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3204
3205    network_to_caps(&cap);
3206
3207    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3208        error_report("Unknown source RDMA version: %d, bailing...",
3209                     cap.version);
3210        rdma_ack_cm_event(cm_event);
3211        goto err_rdma_dest_wait;
3212    }
3213
3214    /*
3215     * Respond with only the capabilities this version of QEMU knows about.
3216     */
3217    cap.flags &= known_capabilities;
3218
3219    /*
3220     * Enable the ones that we do know about.
3221     * Add other checks here as new ones are introduced.
3222     */
3223    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3224        rdma->pin_all = true;
3225    }
3226
3227    rdma->cm_id = cm_event->id;
3228    verbs = cm_event->id->verbs;
3229
3230    rdma_ack_cm_event(cm_event);
3231
3232    trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3233
3234    caps_to_network(&cap);
3235
3236    trace_qemu_rdma_accept_pin_verbsc(verbs);
3237
3238    if (!rdma->verbs) {
3239        rdma->verbs = verbs;
3240    } else if (rdma->verbs != verbs) {
3241        error_report("ibv context not matching %p, %p!", rdma->verbs,
3242                     verbs);
3243        goto err_rdma_dest_wait;
3244    }
3245
3246    qemu_rdma_dump_id("dest_init", verbs);
3247
3248    ret = qemu_rdma_alloc_pd_cq(rdma, &err);
3249    if (ret < 0) {
3250        error_report_err(err);
3251        goto err_rdma_dest_wait;
3252    }
3253
3254    ret = qemu_rdma_alloc_qp(rdma);
3255    if (ret < 0) {
3256        error_report("rdma migration: error allocating qp!");
3257        goto err_rdma_dest_wait;
3258    }
3259
3260    qemu_rdma_init_ram_blocks(rdma);
3261
3262    for (int i = 0; i < RDMA_WRID_MAX; i++) {
3263        ret = qemu_rdma_reg_control(rdma, i);
3264        if (ret < 0) {
3265            error_report("rdma: error registering %d control", i);
3266            goto err_rdma_dest_wait;
3267        }
3268    }
3269
3270    /* Accept the second connection request for return path */
3271    if ((migrate_postcopy() || migrate_return_path())
3272        && !rdma->is_return_path) {
3273        qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3274                            NULL,
3275                            (void *)(intptr_t)rdma->return_path);
3276    } else {
3277        qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3278                            NULL, rdma);
3279    }
3280
3281    ret = rdma_accept(rdma->cm_id, &conn_param);
3282    if (ret < 0) {
3283        error_report("rdma_accept failed");
3284        goto err_rdma_dest_wait;
3285    }
3286
3287    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3288    if (ret < 0) {
3289        error_report("rdma_accept get_cm_event failed");
3290        goto err_rdma_dest_wait;
3291    }
3292
3293    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3294        error_report("rdma_accept: cm event is not RDMA_CM_EVENT_ESTABLISHED");
3295        rdma_ack_cm_event(cm_event);
3296        goto err_rdma_dest_wait;
3297    }
3298
3299    rdma_ack_cm_event(cm_event);
3300    rdma->connected = true;
3301
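        /* Pre-post a receive so the source's first control message has a
         * buffer waiting for it. */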
3302    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
3303    if (ret < 0) {
3304        error_report_err(err);
3305        goto err_rdma_dest_wait;
3306    }
3307
3308    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3309
3310    return 0;
3311
3312err_rdma_dest_wait:
3313    rdma->errored = true;
3314    qemu_rdma_cleanup(rdma);
3315    g_free(rdma_return_path);
3316    return -1;
3317}
3318
3319static int dest_ram_sort_func(const void *a, const void *b)
3320{
3321    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3322    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3323
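        /*
         * qsort() comparator: order RDMALocalBlocks by the source-side index
         * recorded in src_index (returns -1, 0 or 1).
         */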
3324    return (a_index < b_index) ? -1 : (a_index != b_index);
3325}
3326
3327/*
3328 * During each iteration of the migration, we listen for instructions
3329 * from the source VM to perform dynamic page registrations before it
3330 * can perform RDMA operations.
3331 *
3332 * We respond with the 'rkey'.
3333 *
3334 * Keep doing this until the source tells us to stop.
3335 */
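    /*
     * Rough sketch of the control-message loop below (the switch statement
     * is authoritative):
     *
     *   receive a control header, then dispatch on head.type:
     *     RDMA_CONTROL_COMPRESS           -> zero-fill a range of memory locally
     *     RDMA_CONTROL_RAM_BLOCKS_REQUEST -> sort blocks, reply with
     *                                        RAM_BLOCKS_RESULT descriptions
     *     RDMA_CONTROL_REGISTER_REQUEST   -> register chunk(s), reply with rkeys
     *     RDMA_CONTROL_UNREGISTER_REQUEST -> deregister chunk(s), reply FINISHED
     *     RDMA_CONTROL_REGISTER_FINISHED  -> done for this iteration, return
     */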
3336int rdma_registration_handle(QEMUFile *f)
3337{
3338    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3339                               .type = RDMA_CONTROL_REGISTER_RESULT,
3340                               .repeat = 0,
3341                             };
3342    RDMAControlHeader unreg_resp = { .len = 0,
3343                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3344                               .repeat = 0,
3345                             };
3346    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3347                                 .repeat = 1 };
3348    QIOChannelRDMA *rioc;
3349    Error *err = NULL;
3350    RDMAContext *rdma;
3351    RDMALocalBlocks *local;
3352    RDMAControlHeader head;
3353    RDMARegister *reg, *registers;
3354    RDMACompress *comp;
3355    RDMARegisterResult *reg_result;
3356    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3357    RDMALocalBlock *block;
3358    void *host_addr;
3359    int ret;
3360    int idx = 0;
3361
3362    if (!migrate_rdma()) {
3363        return 0;
3364    }
3365
3366    RCU_READ_LOCK_GUARD();
3367    rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3368    rdma = qatomic_rcu_read(&rioc->rdmain);
3369
3370    if (!rdma) {
3371        return -1;
3372    }
3373
3374    if (rdma_errored(rdma)) {
3375        return -1;
3376    }
3377
3378    local = &rdma->local_ram_blocks;
3379    do {
3380        trace_rdma_registration_handle_wait();
3381
3382        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3383
3384        if (ret < 0) {
3385            error_report_err(err);
3386            break;
3387        }
3388
3389        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3390            error_report("rdma: Too many requests in this message (%d). "
3391                            "Bailing.", head.repeat);
3392            break;
3393        }
3394
3395        switch (head.type) {
3396        case RDMA_CONTROL_COMPRESS:
3397            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3398            network_to_compress(comp);
3399
3400            trace_rdma_registration_handle_compress(comp->length,
3401                                                    comp->block_idx,
3402                                                    comp->offset);
3403            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3404                error_report("rdma: 'compress' bad block index %u (vs %d)",
3405                             (unsigned int)comp->block_idx,
3406                             rdma->local_ram_blocks.nb_blocks);
3407                goto err;
3408            }
3409            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3410
3411            host_addr = block->local_host_addr +
3412                            (comp->offset - block->offset);
3413            if (comp->value) {
3414                error_report("rdma: Zero page with non-zero (%d) value",
3415                             comp->value);
3416                goto err;
3417            }
3418            ram_handle_zero(host_addr, comp->length);
3419            break;
3420
3421        case RDMA_CONTROL_REGISTER_FINISHED:
3422            trace_rdma_registration_handle_finished();
3423            return 0;
3424
3425        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3426            trace_rdma_registration_handle_ram_blocks();
3427
3428            /* Sort our local RAM Block list into the same order as the
3429             * source's.  We can do this because we filled in src_index for
3430             * each block as we received the RAMBlock list earlier.
3431             */
3432            qsort(rdma->local_ram_blocks.block,
3433                  rdma->local_ram_blocks.nb_blocks,
3434                  sizeof(RDMALocalBlock), dest_ram_sort_func);
3435            for (int i = 0; i < local->nb_blocks; i++) {
3436                local->block[i].index = i;
3437            }
3438
3439            if (rdma->pin_all) {
3440                ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3441                if (ret < 0) {
3442                    error_report_err(err);
3443                    goto err;
3444                }
3445            }
3446
3447            /*
3448             * The destination uses this loop to prepare the RAMBlock
3449             * descriptions it transmits to the source VM after connection
3450             * setup.  Both sides use the "remote" structure to communicate
3451             * and update their "local" descriptions with what was sent.
3452             */
3453            for (int i = 0; i < local->nb_blocks; i++) {
3454                rdma->dest_blocks[i].remote_host_addr =
3455                    (uintptr_t)(local->block[i].local_host_addr);
3456
3457                if (rdma->pin_all) {
3458                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3459                }
3460
3461                rdma->dest_blocks[i].offset = local->block[i].offset;
3462                rdma->dest_blocks[i].length = local->block[i].length;
3463
3464                dest_block_to_network(&rdma->dest_blocks[i]);
3465                trace_rdma_registration_handle_ram_blocks_loop(
3466                    local->block[i].block_name,
3467                    local->block[i].offset,
3468                    local->block[i].length,
3469                    local->block[i].local_host_addr,
3470                    local->block[i].src_index);
3471            }
3472
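                /*
                 * Reply with one RDMADestBlock per local block; since both
                 * sides now hold the blocks in source order, the source
                 * matches the descriptions up purely by position.
                 */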
3473            blocks.len = rdma->local_ram_blocks.nb_blocks
3474                                                * sizeof(RDMADestBlock);
3475
3476
3477            ret = qemu_rdma_post_send_control(rdma,
3478                                    (uint8_t *) rdma->dest_blocks, &blocks,
3479                                    &err);
3480
3481            if (ret < 0) {
3482                error_report_err(err);
3483                goto err;
3484            }
3485
3486            break;
3487        case RDMA_CONTROL_REGISTER_REQUEST:
3488            trace_rdma_registration_handle_register(head.repeat);
3489
3490            reg_resp.repeat = head.repeat;
3491            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3492
3493            for (int count = 0; count < head.repeat; count++) {
3494                uint64_t chunk;
3495                uint8_t *chunk_start, *chunk_end;
3496
3497                reg = &registers[count];
3498                network_to_register(reg);
3499
3500                reg_result = &results[count];
3501
3502                trace_rdma_registration_handle_register_loop(count,
3503                         reg->current_index, reg->key.current_addr, reg->chunks);
3504
3505                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3506                    error_report("rdma: 'register' bad block index %u (vs %d)",
3507                                 (unsigned int)reg->current_index,
3508                                 rdma->local_ram_blocks.nb_blocks);
3509                    goto err;
3510                }
3511                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3512                if (block->is_ram_block) {
3513                    if (block->offset > reg->key.current_addr) {
3514                        error_report("rdma: bad register address for block %s"
3515                            " offset: %" PRIx64 " current_addr: %" PRIx64,
3516                            block->block_name, block->offset,
3517                            reg->key.current_addr);
3518                        goto err;
3519                    }
3520                    host_addr = (block->local_host_addr +
3521                                (reg->key.current_addr - block->offset));
3522                    chunk = ram_chunk_index(block->local_host_addr,
3523                                            (uint8_t *) host_addr);
3524                } else {
3525                    chunk = reg->key.chunk;
3526                    host_addr = block->local_host_addr +
3527                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3528                    /* Check for particularly bad chunk value */
3529                    if (host_addr < (void *)block->local_host_addr) {
3530                        error_report("rdma: bad chunk for block %s"
3531                            " chunk: %" PRIx64,
3532                            block->block_name, reg->key.chunk);
3533                        goto err;
3534                    }
3535                }
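                    /*
                     * Register a memory region covering chunks
                     * [chunk, chunk + reg->chunks] and return its rkey so
                     * the source can RDMA-write into this range.
                     */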
3536                chunk_start = ram_chunk_start(block, chunk);
3537                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3538                /* avoid "-Waddress-of-packed-member" warning */
3539                uint32_t tmp_rkey = 0;
3540                if (qemu_rdma_register_and_get_keys(rdma, block,
3541                            (uintptr_t)host_addr, NULL, &tmp_rkey,
3542                            chunk, chunk_start, chunk_end)) {
3543                    error_report("cannot get rkey");
3544                    goto err;
3545                }
3546                reg_result->rkey = tmp_rkey;
3547
3548                reg_result->host_addr = (uintptr_t)block->local_host_addr;
3549
3550                trace_rdma_registration_handle_register_rkey(reg_result->rkey);
3551
3552                result_to_network(reg_result);
3553            }
3554
3555            ret = qemu_rdma_post_send_control(rdma,
3556                            (uint8_t *) results, &reg_resp, &err);
3557
3558            if (ret < 0) {
3559                error_report_err(err);
3560                goto err;
3561            }
3562            break;
3563        case RDMA_CONTROL_UNREGISTER_REQUEST:
3564            trace_rdma_registration_handle_unregister(head.repeat);
3565            unreg_resp.repeat = head.repeat;
3566            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3567
3568            for (int count = 0; count < head.repeat; count++) {
3569                reg = &registers[count];
3570                network_to_register(reg);
3571
3572                trace_rdma_registration_handle_unregister_loop(count,
3573                           reg->current_index, reg->key.chunk);
3574
3575                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3576
3577                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3578                block->pmr[reg->key.chunk] = NULL;
3579
3580                if (ret != 0) {
3581                    error_report("rdma unregistration chunk failed: %s",
3582                                 strerror(ret));
3583                    goto err;
3584                }
3585
3586                rdma->total_registrations--;
3587
3588                trace_rdma_registration_handle_unregister_success(reg->key.chunk);
3589            }
3590
3591            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3592
3593            if (ret < 0) {
3594                error_report_err(err);
3595                goto err;
3596            }
3597            break;
3598        case RDMA_CONTROL_REGISTER_RESULT:
3599            error_report("Invalid RESULT message at dest.");
3600            goto err;
3601        default:
3602            error_report("Unknown control message %s", control_desc(head.type));
3603            goto err;
3604        }
3605    } while (1);
3606
3607err:
3608    rdma->errored = true;
3609    return -1;
3610}
3611
3612/* Destination:
3613 * Called during the initial RAM load section which lists the
3614 * RAMBlocks by name.  This lets us know the order of the RAMBlocks on
3615 * the source.  We've already built our local RAMBlock list, but not
3616 * yet sent the list to the source.
3617 */
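    /*
     * For example, if the source announces "pc.ram" first and "vga.vram"
     * second, "pc.ram" is recorded with src_index 0 and "vga.vram" with
     * src_index 1, regardless of their position in our local list.
     */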
3618int rdma_block_notification_handle(QEMUFile *f, const char *name)
3619{
3620    int curr;
3621    int found = -1;
3622
3623    if (!migrate_rdma()) {
3624        return 0;
3625    }
3626
3627    RCU_READ_LOCK_GUARD();
3628    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3629    RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
3630
3631    if (!rdma) {
3632        return -1;
3633    }
3634
3635    /* Find the matching RAMBlock in our local list */
3636    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3637        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3638            found = curr;
3639            break;
3640        }
3641    }
3642
3643    if (found == -1) {
3644        error_report("RAMBlock '%s' not found on destination", name);
3645        return -1;
3646    }
3647
3648    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3649    trace_rdma_block_notification_handle(name, rdma->next_src_index);
3650    rdma->next_src_index++;
3651
3652    return 0;
3653}
3654
3655int rdma_registration_start(QEMUFile *f, uint64_t flags)
3656{
3657    if (!migrate_rdma()) {
3658        return 0;
3659    }
3660
3661    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3662    RCU_READ_LOCK_GUARD();
3663    RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);
3664    if (!rdma) {
3665        return -1;
3666    }
3667
3668    if (rdma_errored(rdma)) {
3669        return -1;
3670    }
3671
3672    trace_rdma_registration_start(flags);
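        /*
         * The destination reacts to this hook flag in the RAM stream by
         * running rdma_registration_handle() at the corresponding point
         * of its load.
         */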
3673    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3674    return qemu_fflush(f);
3675}
3676
3677/*
3678 * Inform dest that dynamic registrations are done for now.
3679 * First, flush writes, if any.
3680 */
3681int rdma_registration_stop(QEMUFile *f, uint64_t flags)
3682{
3683    QIOChannelRDMA *rioc;
3684    Error *err = NULL;
3685    RDMAContext *rdma;
3686    RDMAControlHeader head = { .len = 0, .repeat = 1 };
3687    int ret;
3688
3689    if (!migrate_rdma()) {
3690        return 0;
3691    }
3692
3693    RCU_READ_LOCK_GUARD();
3694    rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3695    rdma = qatomic_rcu_read(&rioc->rdmaout);
3696    if (!rdma) {
3697        return -1;
3698    }
3699
3700    if (rdma_errored(rdma)) {
3701        return -1;
3702    }
3703
3704    qemu_fflush(f);
3705    ret = qemu_rdma_drain_cq(rdma);
3706
3707    if (ret < 0) {
3708        goto err;
3709    }
3710
3711    if (flags == RAM_CONTROL_SETUP) {
3712        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3713        RDMALocalBlocks *local = &rdma->local_ram_blocks;
3714        int reg_result_idx, nb_dest_blocks;
3715
3716        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3717        trace_rdma_registration_stop_ram();
3718
3719        /*
3720         * Make sure that we parallelize the pinning on both sides.
3721         * For very large guests, doing this serially takes a really
3722         * long time, so we have to 'interleave' the pinning locally
3723         * with the control messages by performing the pinning on this
3724         * side before we receive the control response from the other
3725         * side that the pinning has completed.
3726         */
3727        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3728                    &reg_result_idx, rdma->pin_all ?
3729                    qemu_rdma_reg_whole_ram_blocks : NULL,
3730                    &err);
3731        if (ret < 0) {
3732            error_report_err(err);
3733            return -1;
3734        }
3735
3736        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3737
3738        /*
3739         * The protocol uses two different sets of rkeys (mutually exclusive):
3740         * 1. One key to represent the virtual address of the entire ram block.
3741         *    (dynamic chunk registration disabled - pin everything with one rkey.)
3742         * 2. One to represent individual chunks within a ram block.
3743         *    (dynamic chunk registration enabled - pin individual chunks.)
3744         *
3745         * Once the capability is successfully negotiated, the destination transmits
3746         * the keys to use (or sends them later), including the virtual addresses,
3747         * and then propagates the remote ram block descriptions to its local copy.
3748         */
3749
3750        if (local->nb_blocks != nb_dest_blocks) {
3751            error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3752                         local->nb_blocks, nb_dest_blocks);
3753            error_printf("Your QEMU command line parameters are probably "
3754                         "not identical on both the source and destination.");
3755            rdma->errored = true;
3756            return -1;
3757        }
3758
3759        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3760        memcpy(rdma->dest_blocks,
3761            rdma->wr_data[reg_result_idx].control_curr, resp.len);
3762        for (int i = 0; i < nb_dest_blocks; i++) {
3763            network_to_dest_block(&rdma->dest_blocks[i]);
3764
3765            /* We require that the blocks are in the same order */
3766            if (rdma->dest_blocks[i].length != local->block[i].length) {
3767                error_report("Block %s/%d has a different length %" PRIu64
3768                             "vs %" PRIu64,
3769                             local->block[i].block_name, i,
3770                             local->block[i].length,
3771                             rdma->dest_blocks[i].length);
3772                rdma->errored = true;
3773                return -1;
3774            }
3775            local->block[i].remote_host_addr =
3776                    rdma->dest_blocks[i].remote_host_addr;
3777            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3778        }
3779    }
3780
3781    trace_rdma_registration_stop(flags);
3782
3783    head.type = RDMA_CONTROL_REGISTER_FINISHED;
3784    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3785
3786    if (ret < 0) {
3787        error_report_err(err);
3788        goto err;
3789    }
3790
3791    return 0;
3792err:
3793    rdma->errored = true;
3794    return -1;
3795}
3796
3797static void qio_channel_rdma_finalize(Object *obj)
3798{
3799    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3800    if (rioc->rdmain) {
3801        qemu_rdma_cleanup(rioc->rdmain);
3802        g_free(rioc->rdmain);
3803        rioc->rdmain = NULL;
3804    }
3805    if (rioc->rdmaout) {
3806        qemu_rdma_cleanup(rioc->rdmaout);
3807        g_free(rioc->rdmaout);
3808        rioc->rdmaout = NULL;
3809    }
3810}
3811
3812static void qio_channel_rdma_class_init(ObjectClass *klass,
3813                                        const void *class_data G_GNUC_UNUSED)
3814{
3815    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3816
3817    ioc_klass->io_writev = qio_channel_rdma_writev;
3818    ioc_klass->io_readv = qio_channel_rdma_readv;
3819    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3820    ioc_klass->io_close = qio_channel_rdma_close;
3821    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3822    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3823    ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3824}
3825
3826static const TypeInfo qio_channel_rdma_info = {
3827    .parent = TYPE_QIO_CHANNEL,
3828    .name = TYPE_QIO_CHANNEL_RDMA,
3829    .instance_size = sizeof(QIOChannelRDMA),
3830    .instance_finalize = qio_channel_rdma_finalize,
3831    .class_init = qio_channel_rdma_class_init,
3832};
3833
3834static void qio_channel_rdma_register_types(void)
3835{
3836    type_register_static(&qio_channel_rdma_info);
3837}
3838
3839type_init(qio_channel_rdma_register_types);
3840
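    /*
     * Wrap an RDMAContext in a QEMUFile.  The "input" wrapper below reads
     * from this context and, when a return path exists, writes through its
     * peer context; rdma_new_output() is the mirror image.
     */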
3841static QEMUFile *rdma_new_input(RDMAContext *rdma)
3842{
3843    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3844
3845    rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
3846    rioc->rdmain = rdma;
3847    rioc->rdmaout = rdma->return_path;
3848
3849    return rioc->file;
3850}
3851
3852static QEMUFile *rdma_new_output(RDMAContext *rdma)
3853{
3854    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3855
3856    rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
3857    rioc->rdmaout = rdma;
3858    rioc->rdmain = rdma->return_path;
3859
3860    return rioc->file;
3861}
3862
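    /*
     * Main-loop fd handler for the rdma_cm event channel: accept the
     * incoming connection and, unless this context is the return path,
     * wrap it in a QEMUFile and start processing the incoming migration.
     */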
3863static void rdma_accept_incoming_migration(void *opaque)
3864{
3865    RDMAContext *rdma = opaque;
3866    QEMUFile *f;
3867
3868    trace_qemu_rdma_accept_incoming_migration();
3869    if (qemu_rdma_accept(rdma) < 0) {
3870        error_report("RDMA ERROR: Migration initialization failed");
3871        return;
3872    }
3873
3874    trace_qemu_rdma_accept_incoming_migration_accepted();
3875
3876    if (rdma->is_return_path) {
3877        return;
3878    }
3879
3880    f = rdma_new_input(rdma);
3881    if (f == NULL) {
3882        error_report("RDMA ERROR: could not open RDMA for input");
3883        qemu_rdma_cleanup(rdma);
3884        return;
3885    }
3886
3887    rdma->migration_started_on_destination = 1;
3888    migration_fd_process_incoming(f);
3889}
3890
3891void rdma_start_incoming_migration(InetSocketAddress *host_port,
3892                                   Error **errp)
3893{
3894    MigrationState *s = migrate_get_current();
3895    int ret;
3896    RDMAContext *rdma;
3897
3898    trace_rdma_start_incoming_migration();
3899
3900    /* Avoid ram_block_discard_disable(), cannot change during migration. */
3901    if (ram_block_discard_is_required()) {
3902        error_setg(errp, "RDMA: cannot disable RAM discard");
3903        return;
3904    }
3905
3906    rdma = qemu_rdma_data_init(host_port, errp);
3907    if (rdma == NULL) {
3908        goto err;
3909    }
3910
3911    ret = qemu_rdma_dest_init(rdma, errp);
3912    if (ret < 0) {
3913        goto err;
3914    }
3915
3916    trace_rdma_start_incoming_migration_after_dest_init();
3917
3918    ret = rdma_listen(rdma->listen_id, 5);
3919
3920    if (ret < 0) {
3921        error_setg(errp, "RDMA ERROR: listening on socket!");
3922        goto cleanup_rdma;
3923    }
3924
3925    trace_rdma_start_incoming_migration_after_rdma_listen();
3926    s->rdma_migration = true;
3927    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3928                        NULL, (void *)(intptr_t)rdma);
3929    return;
3930
3931cleanup_rdma:
3932    qemu_rdma_cleanup(rdma);
3933err:
3934    if (rdma) {
3935        g_free(rdma->host);
3936    }
3937    g_free(rdma);
3938}
3939
3940void rdma_start_outgoing_migration(void *opaque,
3941                            InetSocketAddress *host_port, Error **errp)
3942{
3943    MigrationState *s = opaque;
3944    RDMAContext *rdma_return_path = NULL;
3945    RDMAContext *rdma;
3946    int ret;
3947
3948    /* Avoid ram_block_discard_disable(), cannot change during migration. */
3949    if (ram_block_discard_is_required()) {
3950        error_setg(errp, "RDMA: cannot disable RAM discard");
3951        return;
3952    }
3953
3954    rdma = qemu_rdma_data_init(host_port, errp);
3955    if (rdma == NULL) {
3956        goto err;
3957    }
3958
3959    ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp);
3960
3961    if (ret < 0) {
3962        goto err;
3963    }
3964
3965    trace_rdma_start_outgoing_migration_after_rdma_source_init();
3966    ret = qemu_rdma_connect(rdma, false, errp);
3967
3968    if (ret < 0) {
3969        goto err;
3970    }
3971
3972    /* RDMA postcopy needs a separate queue pair for the return path */
3973    if (migrate_postcopy() || migrate_return_path()) {
3974        rdma_return_path = qemu_rdma_data_init(host_port, errp);
3975
3976        if (rdma_return_path == NULL) {
3977            goto return_path_err;
3978        }
3979
3980        ret = qemu_rdma_source_init(rdma_return_path,
3981                                    migrate_rdma_pin_all(), errp);
3982
3983        if (ret < 0) {
3984            goto return_path_err;
3985        }
3986
3987        ret = qemu_rdma_connect(rdma_return_path, true, errp);
3988
3989        if (ret < 0) {
3990            goto return_path_err;
3991        }
3992
3993        rdma->return_path = rdma_return_path;
3994        rdma_return_path->return_path = rdma;
3995        rdma_return_path->is_return_path = true;
3996    }
3997
3998    trace_rdma_start_outgoing_migration_after_rdma_connect();
3999
4000    s->to_dst_file = rdma_new_output(rdma);
4001    s->rdma_migration = true;
4002    migration_connect(s, NULL);
4003    return;
4004return_path_err:
4005    qemu_rdma_cleanup(rdma);
4006err:
4007    g_free(rdma);
4008    g_free(rdma_return_path);
4009}
4010