qemu/migration/rdma.c
   1/*
   2 * RDMA protocol and interfaces
   3 *
   4 * Copyright IBM, Corp. 2010-2013
   5 * Copyright Red Hat, Inc. 2015-2016
   6 *
   7 * Authors:
   8 *  Michael R. Hines <mrhines@us.ibm.com>
   9 *  Jiuxing Liu <jl@us.ibm.com>
  10 *  Daniel P. Berrange <berrange@redhat.com>
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or
  13 * later.  See the COPYING file in the top-level directory.
  14 *
  15 */
  16#include "qemu/osdep.h"
  17#include "qapi/error.h"
  18#include "qemu-common.h"
  19#include "qemu/cutils.h"
  20#include "rdma.h"
  21#include "migration.h"
  22#include "qemu-file.h"
  23#include "ram.h"
  24#include "qemu-file-channel.h"
  25#include "qemu/error-report.h"
  26#include "qemu/main-loop.h"
  27#include "qemu/sockets.h"
  28#include "qemu/bitmap.h"
  29#include "qemu/coroutine.h"
  30#include <sys/socket.h>
  31#include <netdb.h>
  32#include <arpa/inet.h>
  33#include <rdma/rdma_cma.h>
  34#include "trace.h"
  35
  36/*
   37 * Print an error on both the Monitor and the Log file.
  38 */
  39#define ERROR(errp, fmt, ...) \
  40    do { \
  41        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
  42        if (errp && (*(errp) == NULL)) { \
  43            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
  44        } \
  45    } while (0)
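
     /*
      * Typical use, as seen later in this file: report the failure on
      * stderr and, when errp is non-NULL and not already set, populate
      * it as well:
      *
      *     ERROR(errp, "could not create CM channel");
      */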
  46
  47#define RDMA_RESOLVE_TIMEOUT_MS 10000
  48
  49/* Do not merge data if larger than this. */
  50#define RDMA_MERGE_MAX (2 * 1024 * 1024)
  51#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
  52
  53#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
  54
  55/*
  56 * This is only for non-live state being migrated.
  57 * Instead of RDMA_WRITE messages, we use RDMA_SEND
  58 * messages for that state, which requires a different
  59 * delivery design than main memory.
  60 */
  61#define RDMA_SEND_INCREMENT 32768
  62
  63/*
  64 * Maximum size infiniband SEND message
  65 */
  66#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
  67#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
  68
  69#define RDMA_CONTROL_VERSION_CURRENT 1
  70/*
  71 * Capabilities for negotiation.
  72 */
  73#define RDMA_CAPABILITY_PIN_ALL 0x01
  74
  75/*
  76 * Add the other flags above to this list of known capabilities
  77 * as they are introduced.
  78 */
  79static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
  80
  81#define CHECK_ERROR_STATE() \
  82    do { \
  83        if (rdma->error_state) { \
  84            if (!rdma->error_reported) { \
   85                error_report("RDMA is in an error state waiting for" \
   86                                " migration to abort!"); \
  87                rdma->error_reported = 1; \
  88            } \
  89            return rdma->error_state; \
  90        } \
  91    } while (0)
  92
  93/*
  94 * A work request ID is 64-bits and we split up these bits
  95 * into 3 parts:
  96 *
  97 * bits 0-15 : type of control message, 2^16
  98 * bits 16-29: ram block index, 2^14
  99 * bits 30-63: ram block chunk number, 2^34
 100 *
 101 * The last two bit ranges are only used for RDMA writes,
 102 * in order to track their completion and potentially
 103 * also track unregistration status of the message.
 104 */
 105#define RDMA_WRID_TYPE_SHIFT  0UL
 106#define RDMA_WRID_BLOCK_SHIFT 16UL
 107#define RDMA_WRID_CHUNK_SHIFT 30UL
 108
 109#define RDMA_WRID_TYPE_MASK \
 110    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
 111
 112#define RDMA_WRID_BLOCK_MASK \
 113    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
 114
 115#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
 116
 117/*
 118 * RDMA migration protocol:
 119 * 1. RDMA Writes (data messages, i.e. RAM)
 120 * 2. IB Send/Recv (control channel messages)
 121 */
 122enum {
 123    RDMA_WRID_NONE = 0,
 124    RDMA_WRID_RDMA_WRITE = 1,
 125    RDMA_WRID_SEND_CONTROL = 2000,
 126    RDMA_WRID_RECV_CONTROL = 4000,
 127};
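
     /*
      * Worked example of the encoding above: for an RDMA write of chunk 5
      * in ram block 3, qemu_rdma_make_wrid() (defined later in this file)
      * produces
      *
      *     wr_id = (5ULL << RDMA_WRID_CHUNK_SHIFT) |
      *             (3ULL << RDMA_WRID_BLOCK_SHIFT) | RDMA_WRID_RDMA_WRITE;
      *
      * and the completion path (see qemu_rdma_poll()) recovers the parts:
      *
      *     wr_id & RDMA_WRID_TYPE_MASK                              == 1
      *     (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT  == 3
      *     (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT  == 5
      */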
 128
 129static const char *wrid_desc[] = {
 130    [RDMA_WRID_NONE] = "NONE",
 131    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
 132    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
 133    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 134};
 135
 136/*
 137 * Work request IDs for IB SEND messages only (not RDMA writes).
 138 * This is used by the migration protocol to transmit
 139 * control messages (such as device state and registration commands)
 140 *
 141 * We could use more WRs, but we have enough for now.
 142 */
 143enum {
 144    RDMA_WRID_READY = 0,
 145    RDMA_WRID_DATA,
 146    RDMA_WRID_CONTROL,
 147    RDMA_WRID_MAX,
 148};
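
     /*
      * Note: these values also serve as indexes into the wr_data[] array
      * in RDMAContext below; e.g. wr_data[RDMA_WRID_CONTROL] holds the
      * registered buffer used by qemu_rdma_post_send_control().
      */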
 149
 150/*
 151 * SEND/RECV IB Control Messages.
 152 */
 153enum {
 154    RDMA_CONTROL_NONE = 0,
 155    RDMA_CONTROL_ERROR,
 156    RDMA_CONTROL_READY,               /* ready to receive */
 157    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
 158    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
 159    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
 160    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
 161    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
 162    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
 163    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
 164    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
 165    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 166};
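
     /*
      * The REQUEST/RESULT and REQUEST/FINISHED values above are used as
      * simple request/response pairs over the control channel; see, for
      * example, qemu_rdma_unregister_waiting() below, which sends an
      * UNREGISTER_REQUEST and waits for the matching UNREGISTER_FINISHED.
      */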
 167
 168
 169/*
 170 * Memory and MR structures used to represent an IB Send/Recv work request.
 171 * This is *not* used for RDMA writes, only IB Send/Recv.
 172 */
 173typedef struct {
 174    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
 175    struct   ibv_mr *control_mr;               /* registration metadata */
 176    size_t   control_len;                      /* length of the message */
 177    uint8_t *control_curr;                     /* start of unconsumed bytes */
 178} RDMAWorkRequestData;
 179
 180/*
 181 * Negotiate RDMA capabilities during connection-setup time.
 182 */
 183typedef struct {
 184    uint32_t version;
 185    uint32_t flags;
 186} RDMACapabilities;
 187
 188static void caps_to_network(RDMACapabilities *cap)
 189{
 190    cap->version = htonl(cap->version);
 191    cap->flags = htonl(cap->flags);
 192}
 193
 194static void network_to_caps(RDMACapabilities *cap)
 195{
 196    cap->version = ntohl(cap->version);
 197    cap->flags = ntohl(cap->flags);
 198}
 199
 200/*
 201 * Representation of a RAMBlock from an RDMA perspective.
 202 * This is not transmitted, only local.
 203 * This and subsequent structures cannot be linked lists
 204 * because we're using a single IB message to transmit
 205 * the information. It's small anyway, so a list is overkill.
 206 */
 207typedef struct RDMALocalBlock {
 208    char          *block_name;
 209    uint8_t       *local_host_addr; /* local virtual address */
 210    uint64_t       remote_host_addr; /* remote virtual address */
 211    uint64_t       offset;
 212    uint64_t       length;
 213    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
 214    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
 215    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
 216    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
 217    int            index;           /* which block are we */
 218    unsigned int   src_index;       /* (Only used on dest) */
 219    bool           is_ram_block;
 220    int            nb_chunks;
 221    unsigned long *transit_bitmap;
 222    unsigned long *unregister_bitmap;
 223} RDMALocalBlock;
 224
 225/*
  226 * Also represents a RAMBlock, but only on the dest.
 227 * This gets transmitted by the dest during connection-time
 228 * to the source VM and then is used to populate the
 229 * corresponding RDMALocalBlock with
 230 * the information needed to perform the actual RDMA.
 231 */
 232typedef struct QEMU_PACKED RDMADestBlock {
 233    uint64_t remote_host_addr;
 234    uint64_t offset;
 235    uint64_t length;
 236    uint32_t remote_rkey;
 237    uint32_t padding;
 238} RDMADestBlock;
 239
 240static const char *control_desc(unsigned int rdma_control)
 241{
 242    static const char *strs[] = {
 243        [RDMA_CONTROL_NONE] = "NONE",
 244        [RDMA_CONTROL_ERROR] = "ERROR",
 245        [RDMA_CONTROL_READY] = "READY",
 246        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
 247        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
 248        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
 249        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
 250        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
 251        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
 252        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
 253        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
 254        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
 255    };
 256
 257    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
 258        return "??BAD CONTROL VALUE??";
 259    }
 260
 261    return strs[rdma_control];
 262}
 263
 264static uint64_t htonll(uint64_t v)
 265{
 266    union { uint32_t lv[2]; uint64_t llv; } u;
 267    u.lv[0] = htonl(v >> 32);
 268    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
 269    return u.llv;
 270}
 271
 272static uint64_t ntohll(uint64_t v) {
 273    union { uint32_t lv[2]; uint64_t llv; } u;
 274    u.llv = v;
 275    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
 276}
 277
 278static void dest_block_to_network(RDMADestBlock *db)
 279{
 280    db->remote_host_addr = htonll(db->remote_host_addr);
 281    db->offset = htonll(db->offset);
 282    db->length = htonll(db->length);
 283    db->remote_rkey = htonl(db->remote_rkey);
 284}
 285
 286static void network_to_dest_block(RDMADestBlock *db)
 287{
 288    db->remote_host_addr = ntohll(db->remote_host_addr);
 289    db->offset = ntohll(db->offset);
 290    db->length = ntohll(db->length);
 291    db->remote_rkey = ntohl(db->remote_rkey);
 292}
 293
 294/*
  295 * Local (virtual address) view of the RDMALocalBlock structures above,
  296 * used when transmitting the RAMBlock descriptions at connection time.
  297 * This structure itself is *not* transmitted.
 298 */
 299typedef struct RDMALocalBlocks {
 300    int nb_blocks;
 301    bool     init;             /* main memory init complete */
 302    RDMALocalBlock *block;
 303} RDMALocalBlocks;
 304
 305/*
 306 * Main data structure for RDMA state.
 307 * While there is only one copy of this structure being allocated right now,
  308 * this is the place to start if you want to consider
 309 * having more than one RDMA connection open at the same time.
 310 */
 311typedef struct RDMAContext {
 312    char *host;
 313    int port;
 314
 315    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
 316
 317    /*
  318     * This is used by *_exchange_send() to figure out whether
  319     * the initial "READY" message has already been received.
 320     * This is because other functions may potentially poll() and detect
 321     * the READY message before send() does, in which case we need to
 322     * know if it completed.
 323     */
 324    int control_ready_expected;
 325
 326    /* number of outstanding writes */
 327    int nb_sent;
 328
 329    /* store info about current buffer so that we can
 330       merge it with future sends */
 331    uint64_t current_addr;
 332    uint64_t current_length;
 333    /* index of ram block the current buffer belongs to */
 334    int current_index;
 335    /* index of the chunk in the current ram block */
 336    int current_chunk;
 337
 338    bool pin_all;
 339
 340    /*
 341     * infiniband-specific variables for opening the device
 342     * and maintaining connection state and so forth.
 343     *
 344     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
 345     * cm_id->verbs, cm_id->channel, and cm_id->qp.
 346     */
 347    struct rdma_cm_id *cm_id;               /* connection manager ID */
 348    struct rdma_cm_id *listen_id;
 349    bool connected;
 350
 351    struct ibv_context          *verbs;
 352    struct rdma_event_channel   *channel;
 353    struct ibv_qp *qp;                      /* queue pair */
 354    struct ibv_comp_channel *comp_channel;  /* completion channel */
 355    struct ibv_pd *pd;                      /* protection domain */
 356    struct ibv_cq *cq;                      /* completion queue */
 357
 358    /*
 359     * If a previous write failed (perhaps because of a failed
  360     * memory registration), then do not attempt any future work
 361     * and remember the error state.
 362     */
 363    int error_state;
 364    int error_reported;
 365    int received_error;
 366
 367    /*
 368     * Description of ram blocks used throughout the code.
 369     */
 370    RDMALocalBlocks local_ram_blocks;
 371    RDMADestBlock  *dest_blocks;
 372
 373    /* Index of the next RAMBlock received during block registration */
 374    unsigned int    next_src_index;
 375
 376    /*
  377     * Set once migration has started on the *destination*;
  378     * from then on we use the coroutine yield function.
  379     * The source runs in a separate thread, so it doesn't need this.
 380     */
 381    int migration_started_on_destination;
 382
 383    int total_registrations;
 384    int total_writes;
 385
 386    int unregister_current, unregister_next;
 387    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
 388
 389    GHashTable *blockmap;
 390} RDMAContext;
 391
 392#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
 393#define QIO_CHANNEL_RDMA(obj)                                     \
 394    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
 395
 396typedef struct QIOChannelRDMA QIOChannelRDMA;
 397
 398
 399struct QIOChannelRDMA {
 400    QIOChannel parent;
 401    RDMAContext *rdma;
 402    QEMUFile *file;
 403    bool blocking; /* XXX we don't actually honour this yet */
 404};
 405
 406/*
 407 * Main structure for IB Send/Recv control messages.
 408 * This gets prepended at the beginning of every Send/Recv.
 409 */
 410typedef struct QEMU_PACKED {
 411    uint32_t len;     /* Total length of data portion */
 412    uint32_t type;    /* which control command to perform */
 413    uint32_t repeat;  /* number of commands in data portion of same type */
 414    uint32_t padding;
 415} RDMAControlHeader;
 416
 417static void control_to_network(RDMAControlHeader *control)
 418{
 419    control->type = htonl(control->type);
 420    control->len = htonl(control->len);
 421    control->repeat = htonl(control->repeat);
 422}
 423
 424static void network_to_control(RDMAControlHeader *control)
 425{
 426    control->type = ntohl(control->type);
 427    control->len = ntohl(control->len);
 428    control->repeat = ntohl(control->repeat);
 429}
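
     /*
      * On the wire, every control message is therefore this 16-byte header
      * (fields converted by control_to_network()) followed by 'len' bytes
      * of payload; qemu_rdma_post_send_control() below sizes its
      * scatter/gather entry as head->len + sizeof(RDMAControlHeader).
      */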
 430
 431/*
 432 * Register a single Chunk.
 433 * Information sent by the source VM to inform the dest
  434 * to register a single chunk of memory before we can perform
 435 * the actual RDMA operation.
 436 */
 437typedef struct QEMU_PACKED {
 438    union QEMU_PACKED {
 439        uint64_t current_addr;  /* offset into the ram_addr_t space */
 440        uint64_t chunk;         /* chunk to lookup if unregistering */
 441    } key;
 442    uint32_t current_index; /* which ramblock the chunk belongs to */
 443    uint32_t padding;
 444    uint64_t chunks;            /* how many sequential chunks to register */
 445} RDMARegister;
 446
 447static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
 448{
 449    RDMALocalBlock *local_block;
 450    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
 451
 452    if (local_block->is_ram_block) {
 453        /*
 454         * current_addr as passed in is an address in the local ram_addr_t
 455         * space, we need to translate this for the destination
 456         */
 457        reg->key.current_addr -= local_block->offset;
 458        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
 459    }
 460    reg->key.current_addr = htonll(reg->key.current_addr);
 461    reg->current_index = htonl(reg->current_index);
 462    reg->chunks = htonll(reg->chunks);
 463}
 464
 465static void network_to_register(RDMARegister *reg)
 466{
 467    reg->key.current_addr = ntohll(reg->key.current_addr);
 468    reg->current_index = ntohl(reg->current_index);
 469    reg->chunks = ntohll(reg->chunks);
 470}
 471
 472typedef struct QEMU_PACKED {
 473    uint32_t value;     /* if zero, we will madvise() */
 474    uint32_t block_idx; /* which ram block index */
 475    uint64_t offset;    /* Address in remote ram_addr_t space */
 476    uint64_t length;    /* length of the chunk */
 477} RDMACompress;
 478
 479static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
 480{
 481    comp->value = htonl(comp->value);
 482    /*
 483     * comp->offset as passed in is an address in the local ram_addr_t
 484     * space, we need to translate this for the destination
 485     */
 486    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
 487    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
 488    comp->block_idx = htonl(comp->block_idx);
 489    comp->offset = htonll(comp->offset);
 490    comp->length = htonll(comp->length);
 491}
 492
 493static void network_to_compress(RDMACompress *comp)
 494{
 495    comp->value = ntohl(comp->value);
 496    comp->block_idx = ntohl(comp->block_idx);
 497    comp->offset = ntohll(comp->offset);
 498    comp->length = ntohll(comp->length);
 499}
 500
 501/*
  502 * The dest's memory registration produces an "rkey"
 503 * which the source VM must reference in order to perform
 504 * the RDMA operation.
 505 */
 506typedef struct QEMU_PACKED {
 507    uint32_t rkey;
 508    uint32_t padding;
 509    uint64_t host_addr;
 510} RDMARegisterResult;
 511
 512static void result_to_network(RDMARegisterResult *result)
 513{
 514    result->rkey = htonl(result->rkey);
 515    result->host_addr = htonll(result->host_addr);
  516}
 517
 518static void network_to_result(RDMARegisterResult *result)
 519{
 520    result->rkey = ntohl(result->rkey);
 521    result->host_addr = ntohll(result->host_addr);
  522}
 523
 524const char *print_wrid(int wrid);
 525static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
 526                                   uint8_t *data, RDMAControlHeader *resp,
 527                                   int *resp_idx,
 528                                   int (*callback)(RDMAContext *rdma));
 529
 530static inline uint64_t ram_chunk_index(const uint8_t *start,
 531                                       const uint8_t *host)
 532{
 533    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
 534}
 535
 536static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
 537                                       uint64_t i)
 538{
 539    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
 540                                  (i << RDMA_REG_CHUNK_SHIFT));
 541}
 542
 543static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
 544                                     uint64_t i)
 545{
 546    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
 547                                         (1UL << RDMA_REG_CHUNK_SHIFT);
 548
 549    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
 550        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
 551    }
 552
 553    return result;
 554}
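
     /*
      * Worked example of the chunk arithmetic above: with
      * RDMA_REG_CHUNK_SHIFT of 20 (1 MB chunks), a 2.5 MB RAMBlock gives
      * ram_chunk_index(start, start + length) == 2, so rdma_add_block()
      * below computes nb_chunks == 3, and ram_chunk_end() clamps the last
      * chunk to [2 MB, 2.5 MB) instead of running past the end of the block.
      */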
 555
 556static int rdma_add_block(RDMAContext *rdma, const char *block_name,
 557                         void *host_addr,
 558                         ram_addr_t block_offset, uint64_t length)
 559{
 560    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 561    RDMALocalBlock *block;
 562    RDMALocalBlock *old = local->block;
 563
 564    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
 565
 566    if (local->nb_blocks) {
 567        int x;
 568
 569        if (rdma->blockmap) {
 570            for (x = 0; x < local->nb_blocks; x++) {
 571                g_hash_table_remove(rdma->blockmap,
 572                                    (void *)(uintptr_t)old[x].offset);
 573                g_hash_table_insert(rdma->blockmap,
 574                                    (void *)(uintptr_t)old[x].offset,
 575                                    &local->block[x]);
 576            }
 577        }
 578        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
 579        g_free(old);
 580    }
 581
 582    block = &local->block[local->nb_blocks];
 583
 584    block->block_name = g_strdup(block_name);
 585    block->local_host_addr = host_addr;
 586    block->offset = block_offset;
 587    block->length = length;
 588    block->index = local->nb_blocks;
 589    block->src_index = ~0U; /* Filled in by the receipt of the block list */
 590    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
 591    block->transit_bitmap = bitmap_new(block->nb_chunks);
 592    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
 593    block->unregister_bitmap = bitmap_new(block->nb_chunks);
 594    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
 595    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
 596
 597    block->is_ram_block = local->init ? false : true;
 598
 599    if (rdma->blockmap) {
 600        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
 601    }
 602
 603    trace_rdma_add_block(block_name, local->nb_blocks,
 604                         (uintptr_t) block->local_host_addr,
 605                         block->offset, block->length,
 606                         (uintptr_t) (block->local_host_addr + block->length),
 607                         BITS_TO_LONGS(block->nb_chunks) *
 608                             sizeof(unsigned long) * 8,
 609                         block->nb_chunks);
 610
 611    local->nb_blocks++;
 612
 613    return 0;
 614}
 615
 616/*
 617 * Memory regions need to be registered with the device and queue pairs setup
 618 * in advanced before the migration starts. This tells us where the RAM blocks
 619 * are so that we can register them individually.
 620 */
 621static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
 622    ram_addr_t block_offset, ram_addr_t length, void *opaque)
 623{
 624    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
 625}
 626
 627/*
  628 * Identify the RAMBlocks and their quantity. They will be used to
  629 * identify chunk boundaries inside each RAMBlock and also be referenced
 630 * during dynamic page registration.
 631 */
 632static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
 633{
 634    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 635
 636    assert(rdma->blockmap == NULL);
 637    memset(local, 0, sizeof *local);
 638    qemu_ram_foreach_migratable_block(qemu_rdma_init_one_block, rdma);
 639    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
 640    rdma->dest_blocks = g_new0(RDMADestBlock,
 641                               rdma->local_ram_blocks.nb_blocks);
 642    local->init = true;
 643    return 0;
 644}
 645
 646/*
 647 * Note: If used outside of cleanup, the caller must ensure that the destination
  648 * block structures are also updated.
 649 */
 650static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
 651{
 652    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 653    RDMALocalBlock *old = local->block;
 654    int x;
 655
 656    if (rdma->blockmap) {
 657        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
 658    }
 659    if (block->pmr) {
 660        int j;
 661
 662        for (j = 0; j < block->nb_chunks; j++) {
 663            if (!block->pmr[j]) {
 664                continue;
 665            }
 666            ibv_dereg_mr(block->pmr[j]);
 667            rdma->total_registrations--;
 668        }
 669        g_free(block->pmr);
 670        block->pmr = NULL;
 671    }
 672
 673    if (block->mr) {
 674        ibv_dereg_mr(block->mr);
 675        rdma->total_registrations--;
 676        block->mr = NULL;
 677    }
 678
 679    g_free(block->transit_bitmap);
 680    block->transit_bitmap = NULL;
 681
 682    g_free(block->unregister_bitmap);
 683    block->unregister_bitmap = NULL;
 684
 685    g_free(block->remote_keys);
 686    block->remote_keys = NULL;
 687
 688    g_free(block->block_name);
 689    block->block_name = NULL;
 690
 691    if (rdma->blockmap) {
 692        for (x = 0; x < local->nb_blocks; x++) {
 693            g_hash_table_remove(rdma->blockmap,
 694                                (void *)(uintptr_t)old[x].offset);
 695        }
 696    }
 697
 698    if (local->nb_blocks > 1) {
 699
 700        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
 701
 702        if (block->index) {
 703            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
 704        }
 705
 706        if (block->index < (local->nb_blocks - 1)) {
 707            memcpy(local->block + block->index, old + (block->index + 1),
 708                sizeof(RDMALocalBlock) *
 709                    (local->nb_blocks - (block->index + 1)));
 710            for (x = block->index; x < local->nb_blocks - 1; x++) {
 711                local->block[x].index--;
 712            }
 713        }
 714    } else {
 715        assert(block == local->block);
 716        local->block = NULL;
 717    }
 718
 719    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
 720                           block->offset, block->length,
 721                            (uintptr_t)(block->local_host_addr + block->length),
 722                           BITS_TO_LONGS(block->nb_chunks) *
 723                               sizeof(unsigned long) * 8, block->nb_chunks);
 724
 725    g_free(old);
 726
 727    local->nb_blocks--;
 728
 729    if (local->nb_blocks && rdma->blockmap) {
 730        for (x = 0; x < local->nb_blocks; x++) {
 731            g_hash_table_insert(rdma->blockmap,
 732                                (void *)(uintptr_t)local->block[x].offset,
 733                                &local->block[x]);
 734        }
 735    }
 736
 737    return 0;
 738}
 739
 740/*
 741 * Put in the log file which RDMA device was opened and the details
 742 * associated with that device.
 743 */
 744static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
 745{
 746    struct ibv_port_attr port;
 747
 748    if (ibv_query_port(verbs, 1, &port)) {
 749        error_report("Failed to query port information");
 750        return;
 751    }
 752
 753    printf("%s RDMA Device opened: kernel name %s "
 754           "uverbs device name %s, "
 755           "infiniband_verbs class device path %s, "
 756           "infiniband class device path %s, "
 757           "transport: (%d) %s\n",
 758                who,
 759                verbs->device->name,
 760                verbs->device->dev_name,
 761                verbs->device->dev_path,
 762                verbs->device->ibdev_path,
 763                port.link_layer,
 764                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
 765                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
 766                    ? "Ethernet" : "Unknown"));
 767}
 768
 769/*
 770 * Put in the log file the RDMA gid addressing information,
 771 * useful for folks who have trouble understanding the
 772 * RDMA device hierarchy in the kernel.
 773 */
 774static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
 775{
 776    char sgid[33];
 777    char dgid[33];
 778    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
 779    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
 780    trace_qemu_rdma_dump_gid(who, sgid, dgid);
 781}
 782
 783/*
 784 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 785 * We will try the next addrinfo struct, and fail if there are
 786 * no other valid addresses to bind against.
 787 *
  788 * If the user is listening on '[::]', then we will not have opened a device
 789 * yet and have no way of verifying if the device is RoCE or not.
 790 *
 791 * In this case, the source VM will throw an error for ALL types of
 792 * connections (both IPv4 and IPv6) if the destination machine does not have
 793 * a regular infiniband network available for use.
 794 *
 795 * The only way to guarantee that an error is thrown for broken kernels is
 796 * for the management software to choose a *specific* interface at bind time
  797 * and validate what type of hardware it is.
 798 *
 799 * Unfortunately, this puts the user in a fix:
 800 *
 801 *  If the source VM connects with an IPv4 address without knowing that the
  802 *  destination has bound to '[::]', the migration will unconditionally fail
 803 *  unless the management software is explicitly listening on the IPv4
 804 *  address while using a RoCE-based device.
 805 *
 806 *  If the source VM connects with an IPv6 address, then we're OK because we can
 807 *  throw an error on the source (and similarly on the destination).
 808 *
 809 *  But in mixed environments, this will be broken for a while until it is fixed
 810 *  inside linux.
 811 *
 812 * We do provide a *tiny* bit of help in this function: We can list all of the
 813 * devices in the system and check to see if all the devices are RoCE or
 814 * Infiniband.
 815 *
 816 * If we detect that we have a *pure* RoCE environment, then we can safely
  817 * throw an error even if the management software has specified '[::]' as the
 818 * bind address.
 819 *
  820 * However, if there are multiple heterogeneous devices, then we cannot make
 821 * this assumption and the user just has to be sure they know what they are
 822 * doing.
 823 *
 824 * Patches are being reviewed on linux-rdma.
 825 */
 826static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
 827{
 828    struct ibv_port_attr port_attr;
 829
 830    /* This bug only exists in linux, to our knowledge. */
 831#ifdef CONFIG_LINUX
 832
 833    /*
 834     * Verbs are only NULL if management has bound to '[::]'.
 835     *
  836     * Let's iterate through all the devices and see if there are any pure IB
 837     * devices (non-ethernet).
 838     *
 839     * If not, then we can safely proceed with the migration.
 840     * Otherwise, there are no guarantees until the bug is fixed in linux.
 841     */
 842    if (!verbs) {
 843        int num_devices, x;
 844        struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
 845        bool roce_found = false;
 846        bool ib_found = false;
 847
 848        for (x = 0; x < num_devices; x++) {
 849            verbs = ibv_open_device(dev_list[x]);
 850            if (!verbs) {
 851                if (errno == EPERM) {
 852                    continue;
 853                } else {
 854                    return -EINVAL;
 855                }
 856            }
 857
 858            if (ibv_query_port(verbs, 1, &port_attr)) {
 859                ibv_close_device(verbs);
 860                ERROR(errp, "Could not query initial IB port");
 861                return -EINVAL;
 862            }
 863
 864            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
 865                ib_found = true;
 866            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 867                roce_found = true;
 868            }
 869
 870            ibv_close_device(verbs);
 871
 872        }
 873
 874        if (roce_found) {
 875            if (ib_found) {
 876                fprintf(stderr, "WARN: migrations may fail:"
 877                                " IPv6 over RoCE / iWARP in linux"
 878                                " is broken. But since you appear to have a"
 879                                " mixed RoCE / IB environment, be sure to only"
  880                                " migrate over the IB fabric until the kernel"
 881                                " fixes the bug.\n");
 882            } else {
  883                ERROR(errp, "You only have RoCE / iWARP devices in your system"
 884                            " and your management software has specified '[::]'"
 885                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
 886                return -ENONET;
 887            }
 888        }
 889
 890        return 0;
 891    }
 892
 893    /*
  894     * If we have a verbs context, that means that something other than '[::]'
  895     * was used by the management software for binding, in which case we can
  896     * actually warn the user about a potentially broken kernel.
 897     */
 898
 899    /* IB ports start with 1, not 0 */
 900    if (ibv_query_port(verbs, 1, &port_attr)) {
 901        ERROR(errp, "Could not query initial IB port");
 902        return -EINVAL;
 903    }
 904
 905    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 906        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
 907                    "(but patches on linux-rdma in progress)");
 908        return -ENONET;
 909    }
 910
 911#endif
 912
 913    return 0;
 914}
 915
 916/*
 917 * Figure out which RDMA device corresponds to the requested IP hostname
 918 * Also create the initial connection manager identifiers for opening
 919 * the connection.
 920 */
 921static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
 922{
 923    int ret;
 924    struct rdma_addrinfo *res;
 925    char port_str[16];
 926    struct rdma_cm_event *cm_event;
 927    char ip[40] = "unknown";
 928    struct rdma_addrinfo *e;
 929
 930    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
 931        ERROR(errp, "RDMA hostname has not been set");
 932        return -EINVAL;
 933    }
 934
 935    /* create CM channel */
 936    rdma->channel = rdma_create_event_channel();
 937    if (!rdma->channel) {
 938        ERROR(errp, "could not create CM channel");
 939        return -EINVAL;
 940    }
 941
 942    /* create CM id */
 943    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
 944    if (ret) {
 945        ERROR(errp, "could not create channel id");
 946        goto err_resolve_create_id;
 947    }
 948
 949    snprintf(port_str, 16, "%d", rdma->port);
 950    port_str[15] = '\0';
 951
 952    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
 953    if (ret < 0) {
 954        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
 955        goto err_resolve_get_addr;
 956    }
 957
 958    for (e = res; e != NULL; e = e->ai_next) {
 959        inet_ntop(e->ai_family,
 960            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
 961        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
 962
 963        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
 964                RDMA_RESOLVE_TIMEOUT_MS);
 965        if (!ret) {
 966            if (e->ai_family == AF_INET6) {
 967                ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
 968                if (ret) {
 969                    continue;
 970                }
 971            }
 972            goto route;
 973        }
 974    }
 975
 976    ERROR(errp, "could not resolve address %s", rdma->host);
 977    goto err_resolve_get_addr;
 978
 979route:
 980    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
 981
 982    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 983    if (ret) {
 984        ERROR(errp, "could not perform event_addr_resolved");
 985        goto err_resolve_get_addr;
 986    }
 987
 988    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
 989        ERROR(errp, "result not equal to event_addr_resolved %s",
 990                rdma_event_str(cm_event->event));
 991        perror("rdma_resolve_addr");
 992        rdma_ack_cm_event(cm_event);
 993        ret = -EINVAL;
 994        goto err_resolve_get_addr;
 995    }
 996    rdma_ack_cm_event(cm_event);
 997
 998    /* resolve route */
 999    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1000    if (ret) {
1001        ERROR(errp, "could not resolve rdma route");
1002        goto err_resolve_get_addr;
1003    }
1004
1005    ret = rdma_get_cm_event(rdma->channel, &cm_event);
1006    if (ret) {
1007        ERROR(errp, "could not perform event_route_resolved");
1008        goto err_resolve_get_addr;
1009    }
1010    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1011        ERROR(errp, "result not equal to event_route_resolved: %s",
1012                        rdma_event_str(cm_event->event));
1013        rdma_ack_cm_event(cm_event);
1014        ret = -EINVAL;
1015        goto err_resolve_get_addr;
1016    }
1017    rdma_ack_cm_event(cm_event);
1018    rdma->verbs = rdma->cm_id->verbs;
1019    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1020    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1021    return 0;
1022
1023err_resolve_get_addr:
1024    rdma_destroy_id(rdma->cm_id);
1025    rdma->cm_id = NULL;
1026err_resolve_create_id:
1027    rdma_destroy_event_channel(rdma->channel);
1028    rdma->channel = NULL;
1029    return ret;
1030}
1031
1032/*
1033 * Create protection domain and completion queues
1034 */
1035static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1036{
1037    /* allocate pd */
1038    rdma->pd = ibv_alloc_pd(rdma->verbs);
1039    if (!rdma->pd) {
1040        error_report("failed to allocate protection domain");
1041        return -1;
1042    }
1043
1044    /* create completion channel */
1045    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1046    if (!rdma->comp_channel) {
1047        error_report("failed to allocate completion channel");
1048        goto err_alloc_pd_cq;
1049    }
1050
1051    /*
1052     * Completion queue can be filled by both read and write work requests,
1053     * so must reflect the sum of both possible queue sizes.
1054     */
1055    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1056            NULL, rdma->comp_channel, 0);
1057    if (!rdma->cq) {
1058        error_report("failed to allocate completion queue");
1059        goto err_alloc_pd_cq;
1060    }
1061
1062    return 0;
1063
1064err_alloc_pd_cq:
1065    if (rdma->pd) {
1066        ibv_dealloc_pd(rdma->pd);
1067    }
1068    if (rdma->comp_channel) {
1069        ibv_destroy_comp_channel(rdma->comp_channel);
1070    }
1071    rdma->pd = NULL;
1072    rdma->comp_channel = NULL;
1073    return -1;
1074
1075}
1076
1077/*
1078 * Create queue pairs.
1079 */
1080static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1081{
1082    struct ibv_qp_init_attr attr = { 0 };
1083    int ret;
1084
1085    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1086    attr.cap.max_recv_wr = 3;
1087    attr.cap.max_send_sge = 1;
1088    attr.cap.max_recv_sge = 1;
1089    attr.send_cq = rdma->cq;
1090    attr.recv_cq = rdma->cq;
1091    attr.qp_type = IBV_QPT_RC;
1092
1093    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1094    if (ret) {
1095        return -1;
1096    }
1097
1098    rdma->qp = rdma->cm_id->qp;
1099    return 0;
1100}
1101
1102static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1103{
1104    int i;
1105    RDMALocalBlocks *local = &rdma->local_ram_blocks;
1106
1107    for (i = 0; i < local->nb_blocks; i++) {
1108        local->block[i].mr =
1109            ibv_reg_mr(rdma->pd,
1110                    local->block[i].local_host_addr,
1111                    local->block[i].length,
1112                    IBV_ACCESS_LOCAL_WRITE |
1113                    IBV_ACCESS_REMOTE_WRITE
1114                    );
1115        if (!local->block[i].mr) {
 1116            perror("Failed to register local dest ram block!");
1117            break;
1118        }
1119        rdma->total_registrations++;
1120    }
1121
1122    if (i >= local->nb_blocks) {
1123        return 0;
1124    }
1125
1126    for (i--; i >= 0; i--) {
1127        ibv_dereg_mr(local->block[i].mr);
1128        rdma->total_registrations--;
1129    }
1130
1131    return -1;
1132
1133}
1134
1135/*
1136 * Find the ram block that corresponds to the page requested to be
1137 * transmitted by QEMU.
1138 *
1139 * Once the block is found, also identify which 'chunk' within that
 1140 * block the page belongs to.
1141 *
1142 * This search cannot fail or the migration will fail.
1143 */
1144static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1145                                      uintptr_t block_offset,
1146                                      uint64_t offset,
1147                                      uint64_t length,
1148                                      uint64_t *block_index,
1149                                      uint64_t *chunk_index)
1150{
1151    uint64_t current_addr = block_offset + offset;
1152    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1153                                                (void *) block_offset);
1154    assert(block);
1155    assert(current_addr >= block->offset);
1156    assert((current_addr + length) <= (block->offset + block->length));
1157
1158    *block_index = block->index;
1159    *chunk_index = ram_chunk_index(block->local_host_addr,
1160                block->local_host_addr + (current_addr - block->offset));
1161
1162    return 0;
1163}
1164
1165/*
1166 * Register a chunk with IB. If the chunk was already registered
1167 * previously, then skip.
1168 *
1169 * Also return the keys associated with the registration needed
1170 * to perform the actual RDMA operation.
1171 */
1172static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1173        RDMALocalBlock *block, uintptr_t host_addr,
1174        uint32_t *lkey, uint32_t *rkey, int chunk,
1175        uint8_t *chunk_start, uint8_t *chunk_end)
1176{
1177    if (block->mr) {
1178        if (lkey) {
1179            *lkey = block->mr->lkey;
1180        }
1181        if (rkey) {
1182            *rkey = block->mr->rkey;
1183        }
1184        return 0;
1185    }
1186
1187    /* allocate memory to store chunk MRs */
1188    if (!block->pmr) {
1189        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1190    }
1191
1192    /*
1193     * If 'rkey', then we're the destination, so grant access to the source.
1194     *
1195     * If 'lkey', then we're the source VM, so grant access only to ourselves.
1196     */
1197    if (!block->pmr[chunk]) {
1198        uint64_t len = chunk_end - chunk_start;
1199
1200        trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1201
1202        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1203                chunk_start, len,
1204                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1205                        IBV_ACCESS_REMOTE_WRITE) : 0));
1206
1207        if (!block->pmr[chunk]) {
1208            perror("Failed to register chunk!");
1209            fprintf(stderr, "Chunk details: block: %d chunk index %d"
1210                            " start %" PRIuPTR " end %" PRIuPTR
1211                            " host %" PRIuPTR
1212                            " local %" PRIuPTR " registrations: %d\n",
1213                            block->index, chunk, (uintptr_t)chunk_start,
1214                            (uintptr_t)chunk_end, host_addr,
1215                            (uintptr_t)block->local_host_addr,
1216                            rdma->total_registrations);
1217            return -1;
1218        }
1219        rdma->total_registrations++;
1220    }
1221
1222    if (lkey) {
1223        *lkey = block->pmr[chunk]->lkey;
1224    }
1225    if (rkey) {
1226        *rkey = block->pmr[chunk]->rkey;
1227    }
1228    return 0;
1229}
1230
1231/*
1232 * Register (at connection time) the memory used for control
1233 * channel messages.
1234 */
1235static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1236{
1237    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1238            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1239            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1240    if (rdma->wr_data[idx].control_mr) {
1241        rdma->total_registrations++;
1242        return 0;
1243    }
1244    error_report("qemu_rdma_reg_control failed");
1245    return -1;
1246}
1247
1248const char *print_wrid(int wrid)
1249{
1250    if (wrid >= RDMA_WRID_RECV_CONTROL) {
1251        return wrid_desc[RDMA_WRID_RECV_CONTROL];
1252    }
1253    return wrid_desc[wrid];
1254}
1255
1256/*
1257 * RDMA requires memory registration (mlock/pinning), but this is not good for
1258 * overcommitment.
1259 *
1260 * In preparation for the future where LRU information or workload-specific
 1261 * writable working set memory access behavior is available to QEMU,
 1262 * it would be nice to have in place the ability to UN-register/UN-pin
 1263 * particular memory regions from the RDMA hardware when it is determined that
1264 * those regions of memory will likely not be accessed again in the near future.
1265 *
1266 * While we do not yet have such information right now, the following
1267 * compile-time option allows us to perform a non-optimized version of this
1268 * behavior.
1269 *
1270 * By uncommenting this option, you will cause *all* RDMA transfers to be
1271 * unregistered immediately after the transfer completes on both sides of the
1272 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1273 *
1274 * This will have a terrible impact on migration performance, so until future
1275 * workload information or LRU information is available, do not attempt to use
1276 * this feature except for basic testing.
1277 */
1278//#define RDMA_UNREGISTRATION_EXAMPLE
1279
1280/*
1281 * Perform a non-optimized memory unregistration after every transfer
1282 * for demonstration purposes, only if pin-all is not requested.
1283 *
1284 * Potential optimizations:
1285 * 1. Start a new thread to run this function continuously
 1286 *      - for bit clearing
 1287 *      - and for receipt of unregister messages
1288 * 2. Use an LRU.
1289 * 3. Use workload hints.
1290 */
1291static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1292{
1293    while (rdma->unregistrations[rdma->unregister_current]) {
1294        int ret;
1295        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1296        uint64_t chunk =
1297            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1298        uint64_t index =
1299            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1300        RDMALocalBlock *block =
1301            &(rdma->local_ram_blocks.block[index]);
1302        RDMARegister reg = { .current_index = index };
1303        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1304                                 };
1305        RDMAControlHeader head = { .len = sizeof(RDMARegister),
1306                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1307                                   .repeat = 1,
1308                                 };
1309
1310        trace_qemu_rdma_unregister_waiting_proc(chunk,
1311                                                rdma->unregister_current);
1312
1313        rdma->unregistrations[rdma->unregister_current] = 0;
1314        rdma->unregister_current++;
1315
1316        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1317            rdma->unregister_current = 0;
1318        }
1319
1320
1321        /*
1322         * Unregistration is speculative (because migration is single-threaded
 1323         * and we cannot break the protocol's infiniband message ordering).
1324         * Thus, if the memory is currently being used for transmission,
1325         * then abort the attempt to unregister and try again
1326         * later the next time a completion is received for this memory.
1327         */
1328        clear_bit(chunk, block->unregister_bitmap);
1329
1330        if (test_bit(chunk, block->transit_bitmap)) {
1331            trace_qemu_rdma_unregister_waiting_inflight(chunk);
1332            continue;
1333        }
1334
1335        trace_qemu_rdma_unregister_waiting_send(chunk);
1336
1337        ret = ibv_dereg_mr(block->pmr[chunk]);
1338        block->pmr[chunk] = NULL;
1339        block->remote_keys[chunk] = 0;
1340
1341        if (ret != 0) {
1342            perror("unregistration chunk failed");
1343            return -ret;
1344        }
1345        rdma->total_registrations--;
1346
1347        reg.key.chunk = chunk;
1348        register_to_network(rdma, &reg);
1349        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1350                                &resp, NULL, NULL);
1351        if (ret < 0) {
1352            return ret;
1353        }
1354
1355        trace_qemu_rdma_unregister_waiting_complete(chunk);
1356    }
1357
1358    return 0;
1359}
1360
1361static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1362                                         uint64_t chunk)
1363{
1364    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1365
1366    result |= (index << RDMA_WRID_BLOCK_SHIFT);
1367    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1368
1369    return result;
1370}
1371
1372/*
1373 * Set bit for unregistration in the next iteration.
1374 * We cannot transmit right here, but will unpin later.
1375 */
1376static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1377                                        uint64_t chunk, uint64_t wr_id)
1378{
1379    if (rdma->unregistrations[rdma->unregister_next] != 0) {
1380        error_report("rdma migration: queue is full");
1381    } else {
1382        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1383
1384        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1385            trace_qemu_rdma_signal_unregister_append(chunk,
1386                                                     rdma->unregister_next);
1387
1388            rdma->unregistrations[rdma->unregister_next++] =
1389                    qemu_rdma_make_wrid(wr_id, index, chunk);
1390
1391            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1392                rdma->unregister_next = 0;
1393            }
1394        } else {
1395            trace_qemu_rdma_signal_unregister_already(chunk);
1396        }
1397    }
1398}
1399
1400/*
 1401 * Consult the completion queue to see if a work request
1402 * (of any kind) has completed.
1403 * Return the work request ID that completed.
1404 */
1405static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1406                               uint32_t *byte_len)
1407{
1408    int ret;
1409    struct ibv_wc wc;
1410    uint64_t wr_id;
1411
1412    ret = ibv_poll_cq(rdma->cq, 1, &wc);
1413
1414    if (!ret) {
1415        *wr_id_out = RDMA_WRID_NONE;
1416        return 0;
1417    }
1418
1419    if (ret < 0) {
 1420        error_report("ibv_poll_cq returned %d", ret);
1421        return ret;
1422    }
1423
1424    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1425
1426    if (wc.status != IBV_WC_SUCCESS) {
1427        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1428                        wc.status, ibv_wc_status_str(wc.status));
1429        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1430
1431        return -1;
1432    }
1433
1434    if (rdma->control_ready_expected &&
1435        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1436        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1437                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1438        rdma->control_ready_expected = 0;
1439    }
1440
1441    if (wr_id == RDMA_WRID_RDMA_WRITE) {
1442        uint64_t chunk =
1443            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1444        uint64_t index =
1445            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1446        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1447
1448        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1449                                   index, chunk, block->local_host_addr,
1450                                   (void *)(uintptr_t)block->remote_host_addr);
1451
1452        clear_bit(chunk, block->transit_bitmap);
1453
1454        if (rdma->nb_sent > 0) {
1455            rdma->nb_sent--;
1456        }
1457
1458        if (!rdma->pin_all) {
1459            /*
1460             * FYI: If one wanted to signal a specific chunk to be unregistered
1461             * using LRU or workload-specific information, this is the function
1462             * you would call to do so. That chunk would then get asynchronously
1463             * unregistered later.
1464             */
1465#ifdef RDMA_UNREGISTRATION_EXAMPLE
1466            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1467#endif
1468        }
1469    } else {
1470        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1471    }
1472
1473    *wr_id_out = wc.wr_id;
1474    if (byte_len) {
1475        *byte_len = wc.byte_len;
1476    }
1477
1478    return  0;
1479}
1480
1481/* Wait for activity on the completion channel.
 1482 * Returns 0 on success, non-zero on error.
1483 */
1484static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
1485{
1486    /*
1487     * Coroutine doesn't start until migration_fd_process_incoming()
1488     * so don't yield unless we know we're running inside of a coroutine.
1489     */
1490    if (rdma->migration_started_on_destination) {
1491        yield_until_fd_readable(rdma->comp_channel->fd);
1492    } else {
1493        /* This is the source side, we're in a separate thread
1494         * or destination prior to migration_fd_process_incoming()
1495         * we can't yield; so we have to poll the fd.
1496         * But we need to be able to handle 'cancel' or an error
1497         * without hanging forever.
1498         */
1499        while (!rdma->error_state  && !rdma->received_error) {
1500            GPollFD pfds[1];
1501            pfds[0].fd = rdma->comp_channel->fd;
1502            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1503            /* 0.1s timeout, should be fine for a 'cancel' */
1504            switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
1505            case 1: /* fd active */
1506                return 0;
1507
1508            case 0: /* Timeout, go around again */
1509                break;
1510
1511            default: /* Error of some type -
1512                      * I don't trust errno from qemu_poll_ns
1513                     */
1514                error_report("%s: poll failed", __func__);
1515                return -EPIPE;
1516            }
1517
1518            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1519                /* Bail out and let the cancellation happen */
1520                return -EPIPE;
1521            }
1522        }
1523    }
1524
1525    if (rdma->received_error) {
1526        return -EPIPE;
1527    }
1528    return rdma->error_state;
1529}
1530
1531/*
1532 * Block until the next work request has completed.
1533 *
1534 * First poll to see if a work request has already completed,
1535 * otherwise block.
1536 *
1537 * If we encounter completed work requests for IDs other than
1538 * the one we're interested in, then that's generally an error.
1539 *
1540 * The only exception is actual RDMA Write completions. These
1541 * completions only need to be recorded, but do not actually
1542 * need further processing.
1543 */
1544static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1545                                    uint32_t *byte_len)
1546{
1547    int num_cq_events = 0, ret = 0;
1548    struct ibv_cq *cq;
1549    void *cq_ctx;
1550    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1551
1552    if (ibv_req_notify_cq(rdma->cq, 0)) {
1553        return -1;
1554    }
1555    /* poll cq first */
1556    while (wr_id != wrid_requested) {
1557        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1558        if (ret < 0) {
1559            return ret;
1560        }
1561
1562        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1563
1564        if (wr_id == RDMA_WRID_NONE) {
1565            break;
1566        }
1567        if (wr_id != wrid_requested) {
1568            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1569                       wrid_requested, print_wrid(wr_id), wr_id);
1570        }
1571    }
1572
1573    if (wr_id == wrid_requested) {
1574        return 0;
1575    }
1576
1577    while (1) {
1578        ret = qemu_rdma_wait_comp_channel(rdma);
1579        if (ret) {
1580            goto err_block_for_wrid;
1581        }
1582
1583        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
1584        if (ret) {
1585            perror("ibv_get_cq_event");
1586            goto err_block_for_wrid;
1587        }
1588
1589        num_cq_events++;
1590
1591        ret = -ibv_req_notify_cq(cq, 0);
1592        if (ret) {
1593            goto err_block_for_wrid;
1594        }
1595
1596        while (wr_id != wrid_requested) {
1597            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1598            if (ret < 0) {
1599                goto err_block_for_wrid;
1600            }
1601
1602            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1603
1604            if (wr_id == RDMA_WRID_NONE) {
1605                break;
1606            }
1607            if (wr_id != wrid_requested) {
1608                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1609                                   wrid_requested, print_wrid(wr_id), wr_id);
1610            }
1611        }
1612
1613        if (wr_id == wrid_requested) {
1614            goto success_block_for_wrid;
1615        }
1616    }
1617
1618success_block_for_wrid:
1619    if (num_cq_events) {
1620        ibv_ack_cq_events(cq, num_cq_events);
1621    }
1622    return 0;
1623
1624err_block_for_wrid:
1625    if (num_cq_events) {
1626        ibv_ack_cq_events(cq, num_cq_events);
1627    }
1628
1629    rdma->error_state = ret;
1630    return ret;
1631}
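
/*
 * Illustrative sketch (not compiled in): the canonical ibverbs
 * completion-channel sequence that qemu_rdma_block_for_wrid() follows -
 * arm notification, poll to catch anything that already completed, block
 * on the channel, ack the event and re-arm before polling again.  The
 * helper name and the single-completion assumption are invented for the
 * example.
 */
#if 0
static int example_wait_one_completion(struct ibv_cq *cq,
                                       struct ibv_comp_channel *ch)
{
    struct ibv_wc wc;
    struct ibv_cq *ev_cq;
    void *ev_ctx;

    if (ibv_req_notify_cq(cq, 0)) {              /* arm before polling */
        return -1;
    }
    if (ibv_poll_cq(cq, 1, &wc) > 0) {           /* already completed? */
        return wc.status == IBV_WC_SUCCESS ? 0 : -1;
    }
    if (ibv_get_cq_event(ch, &ev_cq, &ev_ctx)) { /* block for the event */
        return -1;
    }
    ibv_ack_cq_events(ev_cq, 1);
    if (ibv_req_notify_cq(ev_cq, 0)) {           /* re-arm, then drain */
        return -1;
    }
    return (ibv_poll_cq(ev_cq, 1, &wc) > 0 &&
            wc.status == IBV_WC_SUCCESS) ? 0 : -1;
}
#endif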
1632
1633/*
1634 * Post a SEND message work request for the control channel
1635 * containing some data and block until the post completes.
1636 */
1637static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1638                                       RDMAControlHeader *head)
1639{
1640    int ret = 0;
1641    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1642    struct ibv_send_wr *bad_wr;
1643    struct ibv_sge sge = {
1644                           .addr = (uintptr_t)(wr->control),
1645                           .length = head->len + sizeof(RDMAControlHeader),
1646                           .lkey = wr->control_mr->lkey,
1647                         };
1648    struct ibv_send_wr send_wr = {
1649                                   .wr_id = RDMA_WRID_SEND_CONTROL,
1650                                   .opcode = IBV_WR_SEND,
1651                                   .send_flags = IBV_SEND_SIGNALED,
1652                                   .sg_list = &sge,
1653                                   .num_sge = 1,
1654                                };
1655
1656    trace_qemu_rdma_post_send_control(control_desc(head->type));
1657
1658    /*
1659     * We don't actually need to do a memcpy() in here if we used
1660     * the "sge" properly, but since we're only sending control messages
1661     * (not RAM in a performance-critical path), it's OK for now.
1662     *
1663     * The copy makes the RDMAControlHeader simpler to manipulate
1664     * for the time being.
1665     */
1666    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1667    memcpy(wr->control, head, sizeof(RDMAControlHeader));
1668    control_to_network((void *) wr->control);
1669
1670    if (buf) {
1671        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1672    }
1673
1674
1675    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1676
1677    if (ret > 0) {
1678        error_report("Failed to use post IB SEND for control");
1679        return -ret;
1680    }
1681
1682    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1683    if (ret < 0) {
1684        error_report("rdma migration: send polling control error");
1685    }
1686
1687    return ret;
1688}
1689
1690/*
1691 * Post a RECV work request in anticipation of some future receipt
1692 * of data on the control channel.
1693 */
1694static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1695{
1696    struct ibv_recv_wr *bad_wr;
1697    struct ibv_sge sge = {
1698                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
1699                            .length = RDMA_CONTROL_MAX_BUFFER,
1700                            .lkey = rdma->wr_data[idx].control_mr->lkey,
1701                         };
1702
1703    struct ibv_recv_wr recv_wr = {
1704                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1705                                    .sg_list = &sge,
1706                                    .num_sge = 1,
1707                                 };
1708
1709
1710    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1711        return -1;
1712    }
1713
1714    return 0;
1715}
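
/*
 * Usage note (illustrative): a RECV must already be posted before the peer
 * issues the matching SEND, e.g.
 *
 *     qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
 *
 * and is re-posted after every control message consumed, which is what
 * qemu_rdma_exchange_send() and qemu_rdma_exchange_recv() below do.
 */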
1716
1717/*
1718 * Block and wait for a RECV control channel message to arrive.
1719 */
1720static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1721                RDMAControlHeader *head, int expecting, int idx)
1722{
1723    uint32_t byte_len;
1724    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1725                                       &byte_len);
1726
1727    if (ret < 0) {
1728        error_report("rdma migration: recv polling control error!");
1729        return ret;
1730    }
1731
1732    network_to_control((void *) rdma->wr_data[idx].control);
1733    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1734
1735    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1736
1737    if (expecting == RDMA_CONTROL_NONE) {
1738        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1739                                             head->type);
1740    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1741        error_report("Was expecting a %s (%d) control message"
1742                ", but got: %s (%d), length: %d",
1743                control_desc(expecting), expecting,
1744                control_desc(head->type), head->type, head->len);
1745        if (head->type == RDMA_CONTROL_ERROR) {
1746            rdma->received_error = true;
1747        }
1748        return -EIO;
1749    }
1750    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1751        error_report("too long length: %d", head->len);
1752        return -EINVAL;
1753    }
1754    if (sizeof(*head) + head->len != byte_len) {
1755        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1756        return -EINVAL;
1757    }
1758
1759    return 0;
1760}
1761
1762/*
1763 * When a RECV work request has completed, the work request's
1764 * buffer points at the header.
1765 *
1766 * This advances the pointer to the data portion of the control
1767 * message in the work request's buffer, which was populated after
1768 * the work request finished.
1769 */
1770static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1771                                  RDMAControlHeader *head)
1772{
1773    rdma->wr_data[idx].control_len = head->len;
1774    rdma->wr_data[idx].control_curr =
1775        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1776}
1777
1778/*
1779 * This is an 'atomic' high-level operation to deliver a single, unified
1780 * control-channel message.
1781 *
1782 * Additionally, if the user is expecting some kind of reply to this message,
1783 * they can request a 'resp' response message be filled in by posting an
1784 * additional work request on behalf of the user and waiting for an additional
1785 * completion.
1786 *
1787 * The extra (optional) response is used during registration to save us from
1788 * having to perform an *additional* exchange of messages just to provide a
1789 * response, by instead piggy-backing on the acknowledgement.
1790 */
1791static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1792                                   uint8_t *data, RDMAControlHeader *resp,
1793                                   int *resp_idx,
1794                                   int (*callback)(RDMAContext *rdma))
1795{
1796    int ret = 0;
1797
1798    /*
1799     * Wait until the dest is ready before attempting to deliver the message
1800     * by waiting for a READY message.
1801     */
1802    if (rdma->control_ready_expected) {
1803        RDMAControlHeader resp;
1804        ret = qemu_rdma_exchange_get_response(rdma,
1805                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1806        if (ret < 0) {
1807            return ret;
1808        }
1809    }
1810
1811    /*
1812     * If the user is expecting a response, post a WR in anticipation of it.
1813     */
1814    if (resp) {
1815        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1816        if (ret) {
1817            error_report("rdma migration: error posting"
1818                    " extra control recv for anticipated result!");
1819            return ret;
1820        }
1821    }
1822
1823    /*
1824     * Post a WR to replace the one we just consumed for the READY message.
1825     */
1826    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1827    if (ret) {
1828        error_report("rdma migration: error posting first control recv!");
1829        return ret;
1830    }
1831
1832    /*
1833     * Deliver the control message that was requested.
1834     */
1835    ret = qemu_rdma_post_send_control(rdma, data, head);
1836
1837    if (ret < 0) {
1838        error_report("Failed to send control buffer!");
1839        return ret;
1840    }
1841
1842    /*
1843     * If we're expecting a response, block and wait for it.
1844     */
1845    if (resp) {
1846        if (callback) {
1847            trace_qemu_rdma_exchange_send_issue_callback();
1848            ret = callback(rdma);
1849            if (ret < 0) {
1850                return ret;
1851            }
1852        }
1853
1854        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1855        ret = qemu_rdma_exchange_get_response(rdma, resp,
1856                                              resp->type, RDMA_WRID_DATA);
1857
1858        if (ret < 0) {
1859            return ret;
1860        }
1861
1862        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1863        if (resp_idx) {
1864            *resp_idx = RDMA_WRID_DATA;
1865        }
1866        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1867    }
1868
1869    rdma->control_ready_expected = 1;
1870
1871    return 0;
1872}
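
/*
 * Illustrative sketch (not compiled in): asking the destination to register
 * a chunk and picking up the piggy-backed result, mirroring what
 * qemu_rdma_write_one() does further down.  The helper name and variable
 * names are invented for the example.
 */
#if 0
static int example_request_registration(RDMAContext *rdma, RDMARegister *reg)
{
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(*reg),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };
    RDMARegisterResult *reg_result;
    int reg_result_idx;
    int ret;

    register_to_network(rdma, reg);
    ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) reg,
                                  &resp, &reg_result_idx, NULL);
    if (ret < 0) {
        return ret;
    }

    /* The reply's payload sits in the indicated work request buffer. */
    reg_result = (RDMARegisterResult *)
            rdma->wr_data[reg_result_idx].control_curr;
    network_to_result(reg_result);
    return 0;
}
#endif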
1873
1874/*
1875 * This is an 'atomic' high-level operation to receive a single, unified
1876 * control-channel message.
1877 */
1878static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1879                                int expecting)
1880{
1881    RDMAControlHeader ready = {
1882                                .len = 0,
1883                                .type = RDMA_CONTROL_READY,
1884                                .repeat = 1,
1885                              };
1886    int ret;
1887
1888    /*
1889     * Inform the source that we're ready to receive a message.
1890     */
1891    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1892
1893    if (ret < 0) {
1894        error_report("Failed to send control buffer!");
1895        return ret;
1896    }
1897
1898    /*
1899     * Block and wait for the message.
1900     */
1901    ret = qemu_rdma_exchange_get_response(rdma, head,
1902                                          expecting, RDMA_WRID_READY);
1903
1904    if (ret < 0) {
1905        return ret;
1906    }
1907
1908    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1909
1910    /*
1911     * Post a new RECV work request to replace the one we just consumed.
1912     */
1913    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1914    if (ret) {
1915        error_report("rdma migration: error posting second control recv!");
1916        return ret;
1917    }
1918
1919    return 0;
1920}
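
/*
 * Illustrative sketch (not compiled in): a consumer of the call above,
 * receiving one RDMA_CONTROL_QEMU_FILE message and locating its payload,
 * much like qio_channel_rdma_readv() further down.  The helper name is
 * invented for the example.
 */
#if 0
static int example_recv_qemu_file_bytes(RDMAContext *rdma)
{
    RDMAControlHeader head;
    int ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);

    if (ret < 0) {
        return ret;
    }
    /*
     * head.len bytes of payload are now reachable through
     * rdma->wr_data[RDMA_WRID_READY].control_curr, as recorded by
     * qemu_rdma_move_header().
     */
    return head.len;
}
#endif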
1921
1922/*
1923 * Write an actual chunk of memory using RDMA.
1924 *
1925 * If we're using dynamic registration on the dest-side, we have to
1926 * send a registration command first.
1927 */
1928static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1929                               int current_index, uint64_t current_addr,
1930                               uint64_t length)
1931{
1932    struct ibv_sge sge;
1933    struct ibv_send_wr send_wr = { 0 };
1934    struct ibv_send_wr *bad_wr;
1935    int reg_result_idx, ret, count = 0;
1936    uint64_t chunk, chunks;
1937    uint8_t *chunk_start, *chunk_end;
1938    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1939    RDMARegister reg;
1940    RDMARegisterResult *reg_result;
1941    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1942    RDMAControlHeader head = { .len = sizeof(RDMARegister),
1943                               .type = RDMA_CONTROL_REGISTER_REQUEST,
1944                               .repeat = 1,
1945                             };
1946
1947retry:
1948    sge.addr = (uintptr_t)(block->local_host_addr +
1949                            (current_addr - block->offset));
1950    sge.length = length;
1951
1952    chunk = ram_chunk_index(block->local_host_addr,
1953                            (uint8_t *)(uintptr_t)sge.addr);
1954    chunk_start = ram_chunk_start(block, chunk);
1955
1956    if (block->is_ram_block) {
1957        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1958
1959        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1960            chunks--;
1961        }
1962    } else {
1963        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1964
1965        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1966            chunks--;
1967        }
1968    }
1969
1970    trace_qemu_rdma_write_one_top(chunks + 1,
1971                                  (chunks + 1) *
1972                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
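    /*
     * Worked example (illustrative): with 1 MB chunks, a 2.5 MB length
     * starting at a chunk boundary gives chunks = 2 and the write spans
     * chunks [chunk, chunk + 2], i.e. the 'chunks + 1 = 3' reported above.
     * An exact 2 MB length would first give chunks = 2 and then be
     * corrected to 1, because its final byte still lies inside the second
     * chunk.
     */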
1973
1974    chunk_end = ram_chunk_end(block, chunk + chunks);
1975
1976    if (!rdma->pin_all) {
1977#ifdef RDMA_UNREGISTRATION_EXAMPLE
1978        qemu_rdma_unregister_waiting(rdma);
1979#endif
1980    }
1981
1982    while (test_bit(chunk, block->transit_bitmap)) {
1983        (void)count;
1984        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1985                sge.addr, length, rdma->nb_sent, block->nb_chunks);
1986
1987        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1988
1989        if (ret < 0) {
1990            error_report("Failed to Wait for previous write to complete "
1991                    "block %d chunk %" PRIu64
1992                    " current %" PRIu64 " len %" PRIu64 " %d",
1993                    current_index, chunk, sge.addr, length, rdma->nb_sent);
1994            return ret;
1995        }
1996    }
1997
1998    if (!rdma->pin_all || !block->is_ram_block) {
1999        if (!block->remote_keys[chunk]) {
2000            /*
2001             * This chunk has not yet been registered, so first check to see
2002             * if the entire chunk is zero. If so, tell the other side to
2003             * memset() + madvise() the entire chunk without RDMA.
2004             */
2005
2006            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2007                RDMACompress comp = {
2008                                        .offset = current_addr,
2009                                        .value = 0,
2010                                        .block_idx = current_index,
2011                                        .length = length,
2012                                    };
2013
2014                head.len = sizeof(comp);
2015                head.type = RDMA_CONTROL_COMPRESS;
2016
2017                trace_qemu_rdma_write_one_zero(chunk, sge.length,
2018                                               current_index, current_addr);
2019
2020                compress_to_network(rdma, &comp);
2021                ret = qemu_rdma_exchange_send(rdma, &head,
2022                                (uint8_t *) &comp, NULL, NULL, NULL);
2023
2024                if (ret < 0) {
2025                    return -EIO;
2026                }
2027
2028                acct_update_position(f, sge.length, true);
2029
2030                return 1;
2031            }
2032
2033            /*
2034             * Otherwise, tell other side to register.
2035             */
2036            reg.current_index = current_index;
2037            if (block->is_ram_block) {
2038                reg.key.current_addr = current_addr;
2039            } else {
2040                reg.key.chunk = chunk;
2041            }
2042            reg.chunks = chunks;
2043
2044            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2045                                              current_addr);
2046
2047            register_to_network(rdma, &reg);
2048            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2049                                    &resp, &reg_result_idx, NULL);
2050            if (ret < 0) {
2051                return ret;
2052            }
2053
2054            /* try to overlap this single registration with the one we sent. */
2055            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2056                                                &sge.lkey, NULL, chunk,
2057                                                chunk_start, chunk_end)) {
2058                error_report("cannot get lkey");
2059                return -EINVAL;
2060            }
2061
2062            reg_result = (RDMARegisterResult *)
2063                    rdma->wr_data[reg_result_idx].control_curr;
2064
2065            network_to_result(reg_result);
2066
2067            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2068                                                 reg_result->rkey, chunk);
2069
2070            block->remote_keys[chunk] = reg_result->rkey;
2071            block->remote_host_addr = reg_result->host_addr;
2072        } else {
2073            /* already registered before */
2074            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2075                                                &sge.lkey, NULL, chunk,
2076                                                chunk_start, chunk_end)) {
2077                error_report("cannot get lkey!");
2078                return -EINVAL;
2079            }
2080        }
2081
2082        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2083    } else {
2084        send_wr.wr.rdma.rkey = block->remote_rkey;
2085
2086        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2087                                                     &sge.lkey, NULL, chunk,
2088                                                     chunk_start, chunk_end)) {
2089            error_report("cannot get lkey!");
2090            return -EINVAL;
2091        }
2092    }
2093
2094    /*
2095     * Encode the ram block index and chunk within this wrid.
2096     * We will use this information at the time of completion
2097     * to figure out which bitmap to check against and then which
2098     * chunk in the bitmap to look for.
2099     */
2100    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2101                                        current_index, chunk);
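    /*
     * Illustrative decode (for reference only): the completion carries the
     * same 64-bit wr_id back, so the poller can recover
     *
     *     type  = wr_id & RDMA_WRID_TYPE_MASK;
     *     index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
     *     chunk = wr_id >> RDMA_WRID_CHUNK_SHIFT;
     *
     * and clear the matching bit in that block's transit_bitmap.
     */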
2102
2103    send_wr.opcode = IBV_WR_RDMA_WRITE;
2104    send_wr.send_flags = IBV_SEND_SIGNALED;
2105    send_wr.sg_list = &sge;
2106    send_wr.num_sge = 1;
2107    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2108                                (current_addr - block->offset);
2109
2110    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2111                                   sge.length);
2112
2113    /*
2114     * ibv_post_send() does not return negative error numbers;
2115     * per the specification they are positive - no idea why.
2116     */
2117    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2118
2119    if (ret == ENOMEM) {
2120        trace_qemu_rdma_write_one_queue_full();
2121        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2122        if (ret < 0) {
2123            error_report("rdma migration: failed to make "
2124                         "room in full send queue! %d", ret);
2125            return ret;
2126        }
2127
2128        goto retry;
2129
2130    } else if (ret > 0) {
2131        perror("rdma migration: post rdma write failed");
2132        return -ret;
2133    }
2134
2135    set_bit(chunk, block->transit_bitmap);
2136    acct_update_position(f, sge.length, false);
2137    rdma->total_writes++;
2138
2139    return 0;
2140}
2141
2142/*
2143 * Push out any unwritten RDMA operations.
2144 *
2145 * We support sending out multiple chunks at the same time.
2146 * Not all of them need to get signaled in the completion queue.
2147 */
2148static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2149{
2150    int ret;
2151
2152    if (!rdma->current_length) {
2153        return 0;
2154    }
2155
2156    ret = qemu_rdma_write_one(f, rdma,
2157            rdma->current_index, rdma->current_addr, rdma->current_length);
2158
2159    if (ret < 0) {
2160        return ret;
2161    }
2162
2163    if (ret == 0) {
2164        rdma->nb_sent++;
2165        trace_qemu_rdma_write_flush(rdma->nb_sent);
2166    }
2167
2168    rdma->current_length = 0;
2169    rdma->current_addr = 0;
2170
2171    return 0;
2172}
2173
2174static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2175                    uint64_t offset, uint64_t len)
2176{
2177    RDMALocalBlock *block;
2178    uint8_t *host_addr;
2179    uint8_t *chunk_end;
2180
2181    if (rdma->current_index < 0) {
2182        return 0;
2183    }
2184
2185    if (rdma->current_chunk < 0) {
2186        return 0;
2187    }
2188
2189    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2190    host_addr = block->local_host_addr + (offset - block->offset);
2191    chunk_end = ram_chunk_end(block, rdma->current_chunk);
2192
2193    if (rdma->current_length == 0) {
2194        return 0;
2195    }
2196
2197    /*
2198     * Only merge into the current chunk sequentially.
2199     */
2200    if (offset != (rdma->current_addr + rdma->current_length)) {
2201        return 0;
2202    }
2203
2204    if (offset < block->offset) {
2205        return 0;
2206    }
2207
2208    if ((offset + len) > (block->offset + block->length)) {
2209        return 0;
2210    }
2211
2212    if ((host_addr + len) > chunk_end) {
2213        return 0;
2214    }
2215
2216    return 1;
2217}
2218
2219/*
2220 * We're not actually writing here, but doing three things:
2221 *
2222 * 1. Identify the chunk the buffer belongs to.
2223 * 2. If the chunk is full or the buffer doesn't belong to the current
2224 *    chunk, then start a new chunk and flush() the old chunk.
2225 * 3. To keep the hardware busy, we also group chunks into batches
2226 *    and only require that a batch gets acknowledged in the completion
2227 *    queue instead of each individual chunk.
2228 */
2229static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2230                           uint64_t block_offset, uint64_t offset,
2231                           uint64_t len)
2232{
2233    uint64_t current_addr = block_offset + offset;
2234    uint64_t index = rdma->current_index;
2235    uint64_t chunk = rdma->current_chunk;
2236    int ret;
2237
2238    /* If we cannot merge it, we flush the current buffer first. */
2239    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2240        ret = qemu_rdma_write_flush(f, rdma);
2241        if (ret) {
2242            return ret;
2243        }
2244        rdma->current_length = 0;
2245        rdma->current_addr = current_addr;
2246
2247        ret = qemu_rdma_search_ram_block(rdma, block_offset,
2248                                         offset, len, &index, &chunk);
2249        if (ret) {
2250            error_report("ram block search failed");
2251            return ret;
2252        }
2253        rdma->current_index = index;
2254        rdma->current_chunk = chunk;
2255    }
2256
2257    /* merge it */
2258    rdma->current_length += len;
2259
2260    /* flush it if buffer is too large */
2261    if (rdma->current_length >= RDMA_MERGE_MAX) {
2262        return qemu_rdma_write_flush(f, rdma);
2263    }
2264
2265    return 0;
2266}
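
/*
 * Illustrative sketch (not compiled in): what the merging above means for a
 * caller pushing consecutive pages, as qemu_rdma_save_page() does below.
 * The helper name, 'npages' and 'page_size' are invented for the example.
 */
#if 0
static int example_stream_pages(QEMUFile *f, RDMAContext *rdma,
                                uint64_t block_offset, uint64_t start,
                                int npages, uint64_t page_size)
{
    int i, ret;

    for (i = 0; i < npages; i++) {
        /*
         * Consecutive offsets coalesce into rdma->current_length and only
         * hit the wire once the chunk fills up or RDMA_MERGE_MAX is reached.
         */
        ret = qemu_rdma_write(f, rdma, block_offset,
                              start + i * page_size, page_size);
        if (ret < 0) {
            return ret;
        }
    }

    /* Force out whatever is still buffered. */
    return qemu_rdma_write_flush(f, rdma);
}
#endif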
2267
2268static void qemu_rdma_cleanup(RDMAContext *rdma)
2269{
2270    int idx;
2271
2272    if (rdma->cm_id && rdma->connected) {
2273        if ((rdma->error_state ||
2274             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2275            !rdma->received_error) {
2276            RDMAControlHeader head = { .len = 0,
2277                                       .type = RDMA_CONTROL_ERROR,
2278                                       .repeat = 1,
2279                                     };
2280            error_report("Early error. Sending error.");
2281            qemu_rdma_post_send_control(rdma, NULL, &head);
2282        }
2283
2284        rdma_disconnect(rdma->cm_id);
2285        trace_qemu_rdma_cleanup_disconnect();
2286        rdma->connected = false;
2287    }
2288
2289    g_free(rdma->dest_blocks);
2290    rdma->dest_blocks = NULL;
2291
2292    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2293        if (rdma->wr_data[idx].control_mr) {
2294            rdma->total_registrations--;
2295            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2296        }
2297        rdma->wr_data[idx].control_mr = NULL;
2298    }
2299
2300    if (rdma->local_ram_blocks.block) {
2301        while (rdma->local_ram_blocks.nb_blocks) {
2302            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2303        }
2304    }
2305
2306    if (rdma->qp) {
2307        rdma_destroy_qp(rdma->cm_id);
2308        rdma->qp = NULL;
2309    }
2310    if (rdma->cq) {
2311        ibv_destroy_cq(rdma->cq);
2312        rdma->cq = NULL;
2313    }
2314    if (rdma->comp_channel) {
2315        ibv_destroy_comp_channel(rdma->comp_channel);
2316        rdma->comp_channel = NULL;
2317    }
2318    if (rdma->pd) {
2319        ibv_dealloc_pd(rdma->pd);
2320        rdma->pd = NULL;
2321    }
2322    if (rdma->cm_id) {
2323        rdma_destroy_id(rdma->cm_id);
2324        rdma->cm_id = NULL;
2325    }
2326    if (rdma->listen_id) {
2327        rdma_destroy_id(rdma->listen_id);
2328        rdma->listen_id = NULL;
2329    }
2330    if (rdma->channel) {
2331        rdma_destroy_event_channel(rdma->channel);
2332        rdma->channel = NULL;
2333    }
2334    g_free(rdma->host);
2335    rdma->host = NULL;
2336}
2337
2338
2339static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2340{
2341    int ret, idx;
2342    Error *local_err = NULL, **temp = &local_err;
2343
2344    /*
2345     * Will be validated against the destination's actual capabilities
2346     * after the connect() completes.
2347     */
2348    rdma->pin_all = pin_all;
2349
2350    ret = qemu_rdma_resolve_host(rdma, temp);
2351    if (ret) {
2352        goto err_rdma_source_init;
2353    }
2354
2355    ret = qemu_rdma_alloc_pd_cq(rdma);
2356    if (ret) {
2357        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2358                    " limits may be too low. Please check $ ulimit -a # and "
2359                    "search for 'ulimit -l' in the output");
2360        goto err_rdma_source_init;
2361    }
2362
2363    ret = qemu_rdma_alloc_qp(rdma);
2364    if (ret) {
2365        ERROR(temp, "rdma migration: error allocating qp!");
2366        goto err_rdma_source_init;
2367    }
2368
2369    ret = qemu_rdma_init_ram_blocks(rdma);
2370    if (ret) {
2371        ERROR(temp, "rdma migration: error initializing ram blocks!");
2372        goto err_rdma_source_init;
2373    }
2374
2375    /* Build the hash that maps from offset to RAMBlock */
2376    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2377    for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2378        g_hash_table_insert(rdma->blockmap,
2379                (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2380                &rdma->local_ram_blocks.block[idx]);
2381    }
2382
2383    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2384        ret = qemu_rdma_reg_control(rdma, idx);
2385        if (ret) {
2386            ERROR(temp, "rdma migration: error registering %d control!",
2387                                                            idx);
2388            goto err_rdma_source_init;
2389        }
2390    }
2391
2392    return 0;
2393
2394err_rdma_source_init:
2395    error_propagate(errp, local_err);
2396    qemu_rdma_cleanup(rdma);
2397    return -1;
2398}
2399
2400static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2401{
2402    RDMACapabilities cap = {
2403                                .version = RDMA_CONTROL_VERSION_CURRENT,
2404                                .flags = 0,
2405                           };
2406    struct rdma_conn_param conn_param = { .initiator_depth = 2,
2407                                          .retry_count = 5,
2408                                          .private_data = &cap,
2409                                          .private_data_len = sizeof(cap),
2410                                        };
2411    struct rdma_cm_event *cm_event;
2412    int ret;
2413
2414    /*
2415     * Only negotiate the capability with destination if the user
2416     * on the source first requested the capability.
2417     */
2418    if (rdma->pin_all) {
2419        trace_qemu_rdma_connect_pin_all_requested();
2420        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2421    }
2422
2423    caps_to_network(&cap);
2424
2425    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2426    if (ret) {
2427        ERROR(errp, "posting second control recv");
2428        goto err_rdma_source_connect;
2429    }
2430
2431    ret = rdma_connect(rdma->cm_id, &conn_param);
2432    if (ret) {
2433        perror("rdma_connect");
2434        ERROR(errp, "connecting to destination!");
2435        goto err_rdma_source_connect;
2436    }
2437
2438    ret = rdma_get_cm_event(rdma->channel, &cm_event);
2439    if (ret) {
2440        perror("rdma_get_cm_event after rdma_connect");
2441        ERROR(errp, "connecting to destination!");
2442        rdma_ack_cm_event(cm_event);
2443        goto err_rdma_source_connect;
2444    }
2445
2446    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2447        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2448        ERROR(errp, "connecting to destination!");
2449        rdma_ack_cm_event(cm_event);
2450        goto err_rdma_source_connect;
2451    }
2452    rdma->connected = true;
2453
2454    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2455    network_to_caps(&cap);
2456
2457    /*
2458     * Verify that the *requested* capabilities are supported by the destination
2459     * and disable them otherwise.
2460     */
2461    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2462        ERROR(errp, "Server cannot support pinning all memory. "
2463                        "Will register memory dynamically.");
2464        rdma->pin_all = false;
2465    }
2466
2467    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2468
2469    rdma_ack_cm_event(cm_event);
2470
2471    rdma->control_ready_expected = 1;
2472    rdma->nb_sent = 0;
2473    return 0;
2474
2475err_rdma_source_connect:
2476    qemu_rdma_cleanup(rdma);
2477    return -1;
2478}
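
/*
 * Capability negotiation in a nutshell (illustrative, mirroring the code
 * above and qemu_rdma_accept() below):
 *
 *     cap.flags |= RDMA_CAPABILITY_PIN_ALL;     source requests a feature
 *     cap.flags &= known_capabilities;          destination filters it
 *     if (!(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
 *         rdma->pin_all = false;                source downgrades if refused
 *     }
 *
 * Both directions travel in the rdma_cm connection's private_data.
 */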
2479
2480static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2481{
2482    int ret, idx;
2483    struct rdma_cm_id *listen_id;
2484    char ip[40] = "unknown";
2485    struct rdma_addrinfo *res, *e;
2486    char port_str[16];
2487
2488    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2489        rdma->wr_data[idx].control_len = 0;
2490        rdma->wr_data[idx].control_curr = NULL;
2491    }
2492
2493    if (!rdma->host || !rdma->host[0]) {
2494        ERROR(errp, "RDMA host is not set!");
2495        rdma->error_state = -EINVAL;
2496        return -1;
2497    }
2498    /* create CM channel */
2499    rdma->channel = rdma_create_event_channel();
2500    if (!rdma->channel) {
2501        ERROR(errp, "could not create rdma event channel");
2502        rdma->error_state = -EINVAL;
2503        return -1;
2504    }
2505
2506    /* create CM id */
2507    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2508    if (ret) {
2509        ERROR(errp, "could not create cm_id!");
2510        goto err_dest_init_create_listen_id;
2511    }
2512
2513    snprintf(port_str, 16, "%d", rdma->port);
2514    port_str[15] = '\0';
2515
2516    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2517    if (ret < 0) {
2518        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2519        goto err_dest_init_bind_addr;
2520    }
2521
2522    for (e = res; e != NULL; e = e->ai_next) {
2523        inet_ntop(e->ai_family,
2524            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2525        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2526        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2527        if (ret) {
2528            continue;
2529        }
2530        if (e->ai_family == AF_INET6) {
2531            ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2532            if (ret) {
2533                continue;
2534            }
2535        }
2536        break;
2537    }
2538
2539    if (!e) {
2540        ERROR(errp, "Error: could not rdma_bind_addr!");
2541        goto err_dest_init_bind_addr;
2542    }
2543
2544    rdma->listen_id = listen_id;
2545    qemu_rdma_dump_gid("dest_init", listen_id);
2546    return 0;
2547
2548err_dest_init_bind_addr:
2549    rdma_destroy_id(listen_id);
2550err_dest_init_create_listen_id:
2551    rdma_destroy_event_channel(rdma->channel);
2552    rdma->channel = NULL;
2553    rdma->error_state = ret;
2554    return ret;
2555
2556}
2557
2558static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2559{
2560    RDMAContext *rdma = NULL;
2561    InetSocketAddress *addr;
2562
2563    if (host_port) {
2564        rdma = g_new0(RDMAContext, 1);
2565        rdma->current_index = -1;
2566        rdma->current_chunk = -1;
2567
2568        addr = g_new(InetSocketAddress, 1);
2569        if (!inet_parse(addr, host_port, NULL)) {
2570            rdma->port = atoi(addr->port);
2571            rdma->host = g_strdup(addr->host);
2572        } else {
2573            ERROR(errp, "bad RDMA migration address '%s'", host_port);
2574            g_free(rdma);
2575            rdma = NULL;
2576        }
2577
2578        qapi_free_InetSocketAddress(addr);
2579    }
2580
2581    return rdma;
2582}
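
/*
 * Usage example (illustrative): qemu_rdma_data_init("192.168.0.10:4444", errp)
 * returns an RDMAContext with host "192.168.0.10" and port 4444; an address
 * that inet_parse() rejects yields NULL with the error set instead.
 */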
2583
2584/*
2585 * QEMUFile interface to the control channel.
2586 * SEND messages for control only.
2587 * VM's ram is handled with regular RDMA messages.
2588 */
2589static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2590                                       const struct iovec *iov,
2591                                       size_t niov,
2592                                       int *fds,
2593                                       size_t nfds,
2594                                       Error **errp)
2595{
2596    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2597    QEMUFile *f = rioc->file;
2598    RDMAContext *rdma = rioc->rdma;
2599    int ret;
2600    ssize_t done = 0;
2601    size_t i;
2602    size_t len = 0;
2603
2604    CHECK_ERROR_STATE();
2605
2606    /*
2607     * Push out any writes that
2608     * we've queued up for the VM's RAM.
2609     */
2610    ret = qemu_rdma_write_flush(f, rdma);
2611    if (ret < 0) {
2612        rdma->error_state = ret;
2613        return ret;
2614    }
2615
2616    for (i = 0; i < niov; i++) {
2617        size_t remaining = iov[i].iov_len;
2618        uint8_t * data = (void *)iov[i].iov_base;
2619        while (remaining) {
2620            RDMAControlHeader head;
2621
2622            len = MIN(remaining, RDMA_SEND_INCREMENT);
2623            remaining -= len;
2624
2625            head.len = len;
2626            head.type = RDMA_CONTROL_QEMU_FILE;
2627
2628            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2629
2630            if (ret < 0) {
2631                rdma->error_state = ret;
2632                return ret;
2633            }
2634
2635            data += len;
2636            done += len;
2637        }
2638    }
2639
2640    return done;
2641}
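
/*
 * Illustrative note: each iovec above is chopped into RDMA_CONTROL_QEMU_FILE
 * messages of at most RDMA_SEND_INCREMENT bytes; the destination hands the
 * payloads back to its QEMUFile one message at a time through
 * qemu_rdma_fill() in qio_channel_rdma_readv() below.
 */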
2642
2643static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2644                             size_t size, int idx)
2645{
2646    size_t len = 0;
2647
2648    if (rdma->wr_data[idx].control_len) {
2649        trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2650
2651        len = MIN(size, rdma->wr_data[idx].control_len);
2652        memcpy(buf, rdma->wr_data[idx].control_curr, len);
2653        rdma->wr_data[idx].control_curr += len;
2654        rdma->wr_data[idx].control_len -= len;
2655    }
2656
2657    return len;
2658}
2659
2660/*
2661 * QEMUFile interface to the control channel.
2662 * RDMA links don't use bytestreams, so we have to
2663 * return bytes to QEMUFile opportunistically.
2664 */
2665static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2666                                      const struct iovec *iov,
2667                                      size_t niov,
2668                                      int **fds,
2669                                      size_t *nfds,
2670                                      Error **errp)
2671{
2672    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2673    RDMAContext *rdma = rioc->rdma;
2674    RDMAControlHeader head;
2675    int ret = 0;
2676    ssize_t i;
2677    size_t done = 0;
2678
2679    CHECK_ERROR_STATE();
2680
2681    for (i = 0; i < niov; i++) {
2682        size_t want = iov[i].iov_len;
2683        uint8_t *data = (void *)iov[i].iov_base;
2684
2685        /*
2686         * First, we hold on to the last SEND message we
2687         * were given and dish out the bytes until we run
2688         * out of bytes.
2689         */
2690        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2691        done += ret;
2692        want -= ret;
2693        /* Got what we needed, so go to next iovec */
2694        if (want == 0) {
2695            continue;
2696        }
2697
2698        /* If we got any data so far, then don't wait
2699         * for more, just return what we have */
2700        if (done > 0) {
2701            break;
2702        }
2703
2704
2705        /* We've got nothing at all, so let's wait for
2706         * more to arrive
2707         */
2708        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2709
2710        if (ret < 0) {
2711            rdma->error_state = ret;
2712            return ret;
2713        }
2714
2715        /*
2716         * SEND was received with new bytes, now try again.
2717         */
2718        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2719        done += ret;
2720        want -= ret;
2721
2722        /* Still didn't get enough, so let's just return */
2723        if (want) {
2724            if (done == 0) {
2725                return QIO_CHANNEL_ERR_BLOCK;
2726            } else {
2727                break;
2728            }
2729        }
2730    }
2731    return done;
2732}
2733
2734/*
2735 * Block until all the outstanding chunks have been delivered by the hardware.
2736 */
2737static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2738{
2739    int ret;
2740
2741    if (qemu_rdma_write_flush(f, rdma) < 0) {
2742        return -EIO;
2743    }
2744
2745    while (rdma->nb_sent) {
2746        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2747        if (ret < 0) {
2748            error_report("rdma migration: complete polling error!");
2749            return -EIO;
2750        }
2751    }
2752
2753    qemu_rdma_unregister_waiting(rdma);
2754
2755    return 0;
2756}
2757
2758
2759static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2760                                         bool blocking,
2761                                         Error **errp)
2762{
2763    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2764    /* XXX we should make readv/writev actually honour this :-) */
2765    rioc->blocking = blocking;
2766    return 0;
2767}
2768
2769
2770typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2771struct QIOChannelRDMASource {
2772    GSource parent;
2773    QIOChannelRDMA *rioc;
2774    GIOCondition condition;
2775};
2776
2777static gboolean
2778qio_channel_rdma_source_prepare(GSource *source,
2779                                gint *timeout)
2780{
2781    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2782    RDMAContext *rdma = rsource->rioc->rdma;
2783    GIOCondition cond = 0;
2784    *timeout = -1;
2785
2786    if (rdma->wr_data[0].control_len) {
2787        cond |= G_IO_IN;
2788    }
2789    cond |= G_IO_OUT;
2790
2791    return cond & rsource->condition;
2792}
2793
2794static gboolean
2795qio_channel_rdma_source_check(GSource *source)
2796{
2797    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2798    RDMAContext *rdma = rsource->rioc->rdma;
2799    GIOCondition cond = 0;
2800
2801    if (rdma->wr_data[0].control_len) {
2802        cond |= G_IO_IN;
2803    }
2804    cond |= G_IO_OUT;
2805
2806    return cond & rsource->condition;
2807}
2808
2809static gboolean
2810qio_channel_rdma_source_dispatch(GSource *source,
2811                                 GSourceFunc callback,
2812                                 gpointer user_data)
2813{
2814    QIOChannelFunc func = (QIOChannelFunc)callback;
2815    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2816    RDMAContext *rdma = rsource->rioc->rdma;
2817    GIOCondition cond = 0;
2818
2819    if (rdma->wr_data[0].control_len) {
2820        cond |= G_IO_IN;
2821    }
2822    cond |= G_IO_OUT;
2823
2824    return (*func)(QIO_CHANNEL(rsource->rioc),
2825                   (cond & rsource->condition),
2826                   user_data);
2827}
2828
2829static void
2830qio_channel_rdma_source_finalize(GSource *source)
2831{
2832    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2833
2834    object_unref(OBJECT(ssource->rioc));
2835}
2836
2837GSourceFuncs qio_channel_rdma_source_funcs = {
2838    qio_channel_rdma_source_prepare,
2839    qio_channel_rdma_source_check,
2840    qio_channel_rdma_source_dispatch,
2841    qio_channel_rdma_source_finalize
2842};
2843
2844static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2845                                              GIOCondition condition)
2846{
2847    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2848    QIOChannelRDMASource *ssource;
2849    GSource *source;
2850
2851    source = g_source_new(&qio_channel_rdma_source_funcs,
2852                          sizeof(QIOChannelRDMASource));
2853    ssource = (QIOChannelRDMASource *)source;
2854
2855    ssource->rioc = rioc;
2856    object_ref(OBJECT(rioc));
2857
2858    ssource->condition = condition;
2859
2860    return source;
2861}
2862
2863
2864static int qio_channel_rdma_close(QIOChannel *ioc,
2865                                  Error **errp)
2866{
2867    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2868    trace_qemu_rdma_close();
2869    if (rioc->rdma) {
2870        if (!rioc->rdma->error_state) {
2871            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2872        }
2873        qemu_rdma_cleanup(rioc->rdma);
2874        g_free(rioc->rdma);
2875        rioc->rdma = NULL;
2876    }
2877    return 0;
2878}
2879
2880/*
2881 * Parameters:
2882 *    @offset == 0 :
2883 *        This means that 'block_offset' is a full virtual address that does not
2884 *        belong to a RAMBlock of the virtual machine and instead
2885 *        represents a private malloc'd memory area that the caller wishes to
2886 *        transfer.
2887 *
2888 *    @offset != 0 :
2889 *        Offset is an offset to be added to block_offset and used
2890 *        to also lookup the corresponding RAMBlock.
2891 *
2892 *    @size > 0 :
2893 *        Initiate a transfer of this size.
2894 *
2895 *    @size == 0 :
2896 *        A 'hint' or 'advice' that means that we wish to speculatively
2897 *        and asynchronously unregister this memory. In this case, there is no
2898 *        guarantee that the unregister will actually happen, for example,
2899 *        if the memory is being actively transmitted. Additionally, the memory
2900 *        may be re-registered at any future time if a write within the same
2901 *        chunk was requested again, even if you attempted to unregister it
2902 *        here.
2903 *
2904 *    @size < 0 : TODO, not yet supported
2905 *        Unregister the memory NOW. This means that the caller does not
2906 *        expect there to be any future RDMA transfers and we just want to clean
2907 *        things up. This is used in case the upper layer owns the memory and
2908 *        cannot wait for qemu_fclose() to occur.
2909 *
2910 *    @bytes_sent : User-specified pointer to indicate how many bytes were
2911 *                  sent. Usually, this will not be more than a few bytes of
2912 *                  the protocol because most transfers are sent asynchronously.
2913 */
2914static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2915                                  ram_addr_t block_offset, ram_addr_t offset,
2916                                  size_t size, uint64_t *bytes_sent)
2917{
2918    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2919    RDMAContext *rdma = rioc->rdma;
2920    int ret;
2921
2922    CHECK_ERROR_STATE();
2923
2924    qemu_fflush(f);
2925
2926    if (size > 0) {
2927        /*
2928         * Add this page to the current 'chunk'. If the chunk
2929         * is full, or the page doesn't belong to the current chunk,
2930         * an actual RDMA write will occur and a new chunk will be formed.
2931         */
2932        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2933        if (ret < 0) {
2934            error_report("rdma migration: write error! %d", ret);
2935            goto err;
2936        }
2937
2938        /*
2939         * We always return 1 byte because the RDMA
2940         * protocol is completely asynchronous. We do not yet know
2941         * whether an identified chunk is zero or not because we're
2942         * waiting for other pages to potentially be merged with
2943         * the current chunk. So, we have to call qemu_update_position()
2944         * later on when the actual write occurs.
2945         */
2946        if (bytes_sent) {
2947            *bytes_sent = 1;
2948        }
2949    } else {
2950        uint64_t index, chunk;
2951
2952        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2953        if (size < 0) {
2954            ret = qemu_rdma_drain_cq(f, rdma);
2955            if (ret < 0) {
2956                fprintf(stderr, "rdma: failed to synchronously drain"
2957                                " completion queue before unregistration.\n");
2958                goto err;
2959            }
2960        }
2961        */
2962
2963        ret = qemu_rdma_search_ram_block(rdma, block_offset,
2964                                         offset, size, &index, &chunk);
2965
2966        if (ret) {
2967            error_report("ram block search failed");
2968            goto err;
2969        }
2970
2971        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2972
2973        /*
2974         * TODO: Synchronous, guaranteed unregistration (should not occur during
2975         * fast-path). Otherwise, unregisters will process on the next call to
2976         * qemu_rdma_drain_cq()
2977        if (size < 0) {
2978            qemu_rdma_unregister_waiting(rdma);
2979        }
2980        */
2981    }
2982
2983    /*
2984     * Drain the Completion Queue if possible, but do not block,
2985     * just poll.
2986     *
2987     * If there is nothing to poll, the end of the iteration will do this
2988     * again to make sure we don't overflow the request queue.
2989     */
2990    while (1) {
2991        uint64_t wr_id, wr_id_in;
2992        int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2993        if (ret < 0) {
2994            error_report("rdma migration: polling error! %d", ret);
2995            goto err;
2996        }
2997
2998        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2999
3000        if (wr_id == RDMA_WRID_NONE) {
3001            break;
3002        }
3003    }
3004
3005    return RAM_SAVE_CONTROL_DELAYED;
3006err:
3007    rdma->error_state = ret;
3008    return ret;
3009}
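
/*
 * Illustrative sketch (not compiled in): the two ways the RAM layer drives
 * qemu_rdma_save_page(), per the parameter description above.  The wrapper
 * name and 'page_size' are invented for the example.
 */
#if 0
static void example_save_page_calls(QEMUFile *f, void *opaque,
                                    ram_addr_t block_offset, ram_addr_t offset,
                                    size_t page_size)
{
    uint64_t bytes_sent = 0;

    /* size > 0: queue the page for (possibly merged) RDMA transfer. */
    qemu_rdma_save_page(f, opaque, block_offset, offset,
                        page_size, &bytes_sent);

    /* size == 0: merely hint that this chunk may be unregistered later. */
    qemu_rdma_save_page(f, opaque, block_offset, offset, 0, NULL);
}
#endif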
3010
3011static int qemu_rdma_accept(RDMAContext *rdma)
3012{
3013    RDMACapabilities cap;
3014    struct rdma_conn_param conn_param = {
3015                                            .responder_resources = 2,
3016                                            .private_data = &cap,
3017                                            .private_data_len = sizeof(cap),
3018                                         };
3019    struct rdma_cm_event *cm_event;
3020    struct ibv_context *verbs;
3021    int ret = -EINVAL;
3022    int idx;
3023
3024    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3025    if (ret) {
3026        goto err_rdma_dest_wait;
3027    }
3028
3029    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3030        rdma_ack_cm_event(cm_event);
3031        goto err_rdma_dest_wait;
3032    }
3033
3034    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3035
3036    network_to_caps(&cap);
3037
3038    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
3039        error_report("Unknown source RDMA version: %d, bailing...",
3040                     cap.version);
3041        rdma_ack_cm_event(cm_event);
3042        goto err_rdma_dest_wait;
3043    }
3044
3045    /*
3046     * Respond with only the capabilities this version of QEMU knows about.
3047     */
3048    cap.flags &= known_capabilities;
3049
3050    /*
3051     * Enable the ones that we do know about.
3052     * Add other checks here as new ones are introduced.
3053     */
3054    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3055        rdma->pin_all = true;
3056    }
3057
3058    rdma->cm_id = cm_event->id;
3059    verbs = cm_event->id->verbs;
3060
3061    rdma_ack_cm_event(cm_event);
3062
3063    trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3064
3065    caps_to_network(&cap);
3066
3067    trace_qemu_rdma_accept_pin_verbsc(verbs);
3068
3069    if (!rdma->verbs) {
3070        rdma->verbs = verbs;
3071    } else if (rdma->verbs != verbs) {
3072        error_report("ibv context not matching %p, %p!", rdma->verbs,
3073                     verbs);
3074        goto err_rdma_dest_wait;
3075    }
3076
3077    qemu_rdma_dump_id("dest_init", verbs);
3078
3079    ret = qemu_rdma_alloc_pd_cq(rdma);
3080    if (ret) {
3081        error_report("rdma migration: error allocating pd and cq!");
3082        goto err_rdma_dest_wait;
3083    }
3084
3085    ret = qemu_rdma_alloc_qp(rdma);
3086    if (ret) {
3087        error_report("rdma migration: error allocating qp!");
3088        goto err_rdma_dest_wait;
3089    }
3090
3091    ret = qemu_rdma_init_ram_blocks(rdma);
3092    if (ret) {
3093        error_report("rdma migration: error initializing ram blocks!");
3094        goto err_rdma_dest_wait;
3095    }
3096
3097    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3098        ret = qemu_rdma_reg_control(rdma, idx);
3099        if (ret) {
3100            error_report("rdma: error registering %d control", idx);
3101            goto err_rdma_dest_wait;
3102        }
3103    }
3104
3105    qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3106
3107    ret = rdma_accept(rdma->cm_id, &conn_param);
3108    if (ret) {
3109        error_report("rdma_accept returns %d", ret);
3110        goto err_rdma_dest_wait;
3111    }
3112
3113    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3114    if (ret) {
3115        error_report("rdma_accept get_cm_event failed %d", ret);
3116        goto err_rdma_dest_wait;
3117    }
3118
3119    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3120        error_report("rdma_accept not event established");
3121        rdma_ack_cm_event(cm_event);
3122        goto err_rdma_dest_wait;
3123    }
3124
3125    rdma_ack_cm_event(cm_event);
3126    rdma->connected = true;
3127
3128    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3129    if (ret) {
3130        error_report("rdma migration: error posting second control recv");
3131        goto err_rdma_dest_wait;
3132    }
3133
3134    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3135
3136    return 0;
3137
3138err_rdma_dest_wait:
3139    rdma->error_state = ret;
3140    qemu_rdma_cleanup(rdma);
3141    return ret;
3142}
3143
3144static int dest_ram_sort_func(const void *a, const void *b)
3145{
3146    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3147    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3148
3149    return (a_index < b_index) ? -1 : (a_index != b_index);
3150}
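
/*
 * Illustrative note: the expression above is a compact three-way compare -
 * it returns -1 when a_index < b_index, 0 when they are equal and 1
 * otherwise - exactly the contract qsort() expects when the block list is
 * sorted in qemu_rdma_registration_handle() below.
 */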
3151
3152/*
3153 * During each iteration of the migration, we listen for instructions
3154 * from the source VM to perform dynamic page registrations before it
3155 * can perform RDMA operations.
3156 *
3157 * We respond with the 'rkey'.
3158 *
3159 * Keep doing this until the source tells us to stop.
3160 */
3161static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3162{
3163    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3164                               .type = RDMA_CONTROL_REGISTER_RESULT,
3165                               .repeat = 0,
3166                             };
3167    RDMAControlHeader unreg_resp = { .len = 0,
3168                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3169                               .repeat = 0,
3170                             };
3171    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3172                                 .repeat = 1 };
3173    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3174    RDMAContext *rdma = rioc->rdma;
3175    RDMALocalBlocks *local = &rdma->local_ram_blocks;
3176    RDMAControlHeader head;
3177    RDMARegister *reg, *registers;
3178    RDMACompress *comp;
3179    RDMARegisterResult *reg_result;
3180    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3181    RDMALocalBlock *block;
3182    void *host_addr;
3183    int ret = 0;
3184    int idx = 0;
3185    int count = 0;
3186    int i = 0;
3187
3188    CHECK_ERROR_STATE();
3189
3190    do {
3191        trace_qemu_rdma_registration_handle_wait();
3192
3193        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3194
3195        if (ret < 0) {
3196            break;
3197        }
3198
3199        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3200            error_report("rdma: Too many requests in this message (%d). "
3201                         "Bailing.", head.repeat);
3202            ret = -EIO;
3203            break;
3204        }
3205
3206        switch (head.type) {
3207        case RDMA_CONTROL_COMPRESS:
3208            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3209            network_to_compress(comp);
3210
3211            trace_qemu_rdma_registration_handle_compress(comp->length,
3212                                                         comp->block_idx,
3213                                                         comp->offset);
3214            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3215                error_report("rdma: 'compress' bad block index %u (vs %d)",
3216                             (unsigned int)comp->block_idx,
3217                             rdma->local_ram_blocks.nb_blocks);
3218                ret = -EIO;
3219                goto out;
3220            }
3221            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3222
3223            host_addr = block->local_host_addr +
3224                            (comp->offset - block->offset);
3225
3226            ram_handle_compressed(host_addr, comp->value, comp->length);
3227            break;
3228
3229        case RDMA_CONTROL_REGISTER_FINISHED:
3230            trace_qemu_rdma_registration_handle_finished();
3231            goto out;
3232
3233        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3234            trace_qemu_rdma_registration_handle_ram_blocks();
3235
3236            /* Sort our local RAM Block list so it's the same as the source's;
3237             * we can do this since we've filled in a src_index in the list
3238             * as we received the RAMBlock list earlier.
3239             */
3240            qsort(rdma->local_ram_blocks.block,
3241                  rdma->local_ram_blocks.nb_blocks,
3242                  sizeof(RDMALocalBlock), dest_ram_sort_func);
3243            for (i = 0; i < local->nb_blocks; i++) {
3244                local->block[i].index = i;
3245            }
3246
3247            if (rdma->pin_all) {
3248                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3249                if (ret) {
3250                    error_report("rdma migration: error dest "
3251                                    "registering ram blocks");
3252                    goto out;
3253                }
3254            }
3255
3256            /*
3257             * Dest uses this to prepare to transmit the RAMBlock descriptions
3258             * to the source VM after connection setup.
3259             * Both sides use the "remote" structure to communicate and update
3260             * their "local" descriptions with what was sent.
3261             */
3262            for (i = 0; i < local->nb_blocks; i++) {
3263                rdma->dest_blocks[i].remote_host_addr =
3264                    (uintptr_t)(local->block[i].local_host_addr);
3265
3266                if (rdma->pin_all) {
3267                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3268                }
3269
3270                rdma->dest_blocks[i].offset = local->block[i].offset;
3271                rdma->dest_blocks[i].length = local->block[i].length;
3272
3273                dest_block_to_network(&rdma->dest_blocks[i]);
3274                trace_qemu_rdma_registration_handle_ram_blocks_loop(
3275                    local->block[i].block_name,
3276                    local->block[i].offset,
3277                    local->block[i].length,
3278                    local->block[i].local_host_addr,
3279                    local->block[i].src_index);
3280            }
3281
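                /*
                 * The reply carries one RDMADestBlock per local block; the
                 * source recovers the count by dividing the reply length by
                 * sizeof(RDMADestBlock) (see qemu_rdma_registration_stop()).
                 */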
3282            blocks.len = rdma->local_ram_blocks.nb_blocks
3283                                                * sizeof(RDMADestBlock);
3284
3285
3286            ret = qemu_rdma_post_send_control(rdma,
3287                                        (uint8_t *) rdma->dest_blocks, &blocks);
3288
3289            if (ret < 0) {
3290                error_report("rdma migration: error sending remote info");
3291                goto out;
3292            }
3293
3294            break;
3295        case RDMA_CONTROL_REGISTER_REQUEST:
3296            trace_qemu_rdma_registration_handle_register(head.repeat);
3297
3298            reg_resp.repeat = head.repeat;
3299            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3300
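                /*
                 * For every chunk the source asks about, make sure the backing
                 * memory is registered with the HCA and reply with the rkey
                 * the source needs for its RDMA writes into that chunk.
                 */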
3301            for (count = 0; count < head.repeat; count++) {
3302                uint64_t chunk;
3303                uint8_t *chunk_start, *chunk_end;
3304
3305                reg = &registers[count];
3306                network_to_register(reg);
3307
3308                reg_result = &results[count];
3309
3310                trace_qemu_rdma_registration_handle_register_loop(count,
3311                         reg->current_index, reg->key.current_addr, reg->chunks);
3312
3313                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3314                    error_report("rdma: 'register' bad block index %u (vs %d)",
3315                                 (unsigned int)reg->current_index,
3316                                 rdma->local_ram_blocks.nb_blocks);
3317                    ret = -ENOENT;
3318                    goto out;
3319                }
3320                block = &(rdma->local_ram_blocks.block[reg->current_index]);
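                    /*
                     * RAM blocks are addressed by the source's ram_addr
                     * (key.current_addr), which we translate into a local host
                     * address and chunk index; other blocks are addressed by
                     * chunk number directly.
                     */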
3321                if (block->is_ram_block) {
3322                    if (block->offset > reg->key.current_addr) {
3323                        error_report("rdma: bad register address for block %s"
3324                            " offset: %" PRIx64 " current_addr: %" PRIx64,
3325                            block->block_name, block->offset,
3326                            reg->key.current_addr);
3327                        ret = -ERANGE;
3328                        goto out;
3329                    }
3330                    host_addr = (block->local_host_addr +
3331                                (reg->key.current_addr - block->offset));
3332                    chunk = ram_chunk_index(block->local_host_addr,
3333                                            (uint8_t *) host_addr);
3334                } else {
3335                    chunk = reg->key.chunk;
3336                    host_addr = block->local_host_addr +
3337                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3338                    /* Check for particularly bad chunk value */
3339                    if (host_addr < (void *)block->local_host_addr) {
3340                        error_report("rdma: bad chunk for block %s"
3341                            " chunk: %" PRIx64,
3342                            block->block_name, reg->key.chunk);
3343                        ret = -ERANGE;
3344                        goto out;
3345                    }
3346                }
3347                chunk_start = ram_chunk_start(block, chunk);
3348                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3349                if (qemu_rdma_register_and_get_keys(rdma, block,
3350                            (uintptr_t)host_addr, NULL, &reg_result->rkey,
3351                            chunk, chunk_start, chunk_end)) {
3352                    error_report("cannot get rkey");
3353                    ret = -EINVAL;
3354                    goto out;
3355                }
3356
3357                reg_result->host_addr = (uintptr_t)block->local_host_addr;
3358
3359                trace_qemu_rdma_registration_handle_register_rkey(
3360                                                           reg_result->rkey);
3361
3362                result_to_network(reg_result);
3363            }
3364
3365            ret = qemu_rdma_post_send_control(rdma,
3366                            (uint8_t *) results, &reg_resp);
3367
3368            if (ret < 0) {
3369                error_report("Failed to send control buffer");
3370                goto out;
3371            }
3372            break;
3373        case RDMA_CONTROL_UNREGISTER_REQUEST:
3374            trace_qemu_rdma_registration_handle_unregister(head.repeat);
3375            unreg_resp.repeat = head.repeat;
3376            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3377
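                /*
                 * The source no longer needs these chunks pinned: drop the
                 * memory registration for each one and acknowledge with a
                 * single UNREGISTER_FINISHED message below.
                 */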
3378            for (count = 0; count < head.repeat; count++) {
3379                reg = &registers[count];
3380                network_to_register(reg);
3381
3382                trace_qemu_rdma_registration_handle_unregister_loop(count,
3383                           reg->current_index, reg->key.chunk);
3384
3385                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3386
3387                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3388                block->pmr[reg->key.chunk] = NULL;
3389
3390                if (ret != 0) {
3391                    perror("rdma chunk unregistration failed");
3392                    ret = -ret;
3393                    goto out;
3394                }
3395
3396                rdma->total_registrations--;
3397
3398                trace_qemu_rdma_registration_handle_unregister_success(
3399                                                       reg->key.chunk);
3400            }
3401
3402            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3403
3404            if (ret < 0) {
3405                error_report("Failed to send control buffer");
3406                goto out;
3407            }
3408            break;
3409        case RDMA_CONTROL_REGISTER_RESULT:
3410            error_report("Invalid RESULT message at dest.");
3411            ret = -EIO;
3412            goto out;
3413        default:
3414            error_report("Unknown control message %s", control_desc(head.type));
3415            ret = -EIO;
3416            goto out;
3417        }
3418    } while (1);
3419out:
3420    if (ret < 0) {
3421        rdma->error_state = ret;
3422    }
3423    return ret;
3424}
3425
3426/* Destination:
3427 * Called via a ram_control_load_hook during the initial RAM load section which
3428 * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3429 * on the source.
3430 * We've already built our local RAMBlock list, but not yet sent the list to
3431 * the source.
3432 */
3433static int
3434rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3435{
3436    RDMAContext *rdma = rioc->rdma;
3437    int curr;
3438    int found = -1;
3439
3440    /* Find the matching RAMBlock in our local list */
3441    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3442        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3443            found = curr;
3444            break;
3445        }
3446    }
3447
3448    if (found == -1) {
3449        error_report("RAMBlock '%s' not found on destination", name);
3450        return -ENOENT;
3451    }
3452
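        /*
         * Remember where this block sits in the source's announcement order;
         * dest_ram_sort_func later uses src_index to put our local list into
         * the same order as the source's.
         */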
3453    rdma->local_ram_blocks.block[found].src_index = rdma->next_src_index;
3454    trace_rdma_block_notification_handle(name, rdma->next_src_index);
3455    rdma->next_src_index++;
3456
3457    return 0;
3458}
3459
3460static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3461{
3462    switch (flags) {
3463    case RAM_CONTROL_BLOCK_REG:
3464        return rdma_block_notification_handle(opaque, data);
3465
3466    case RAM_CONTROL_HOOK:
3467        return qemu_rdma_registration_handle(f, opaque);
3468
3469    default:
3470        /* Shouldn't be called with any other values */
3471        abort();
3472    }
3473}
3474
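    /*
     * Source: called before each RAM save iteration.  Writing
     * RAM_SAVE_FLAG_HOOK into the stream makes the destination drop into
     * qemu_rdma_registration_handle() at the same point, ready to service
     * our registration requests.
     */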
3475static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3476                                        uint64_t flags, void *data)
3477{
3478    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3479    RDMAContext *rdma = rioc->rdma;
3480
3481    CHECK_ERROR_STATE();
3482
3483    trace_qemu_rdma_registration_start(flags);
3484    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3485    qemu_fflush(f);
3486
3487    return 0;
3488}
3489
3490/*
3491 * Inform dest that dynamic registrations are done for now.
3492 * First, flush writes, if any.
3493 */
3494static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3495                                       uint64_t flags, void *data)
3496{
3497    Error *local_err = NULL, **errp = &local_err;
3498    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3499    RDMAContext *rdma = rioc->rdma;
3500    RDMAControlHeader head = { .len = 0, .repeat = 1 };
3501    int ret = 0;
3502
3503    CHECK_ERROR_STATE();
3504
3505    qemu_fflush(f);
3506    ret = qemu_rdma_drain_cq(f, rdma);
3507
3508    if (ret < 0) {
3509        goto err;
3510    }
3511
3512    if (flags == RAM_CONTROL_SETUP) {
3513        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3514        RDMALocalBlocks *local = &rdma->local_ram_blocks;
3515        int reg_result_idx, i, nb_dest_blocks;
3516
3517        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3518        trace_qemu_rdma_registration_stop_ram();
3519
3520        /*
3521         * Make sure that we parallelize the pinning on both sides.
3522         * For very large guests, doing this serially takes a really
3523         * long time, so we have to 'interleave' the pinning locally
3524         * with the control messages by performing the pinning on this
3525         * side before we receive the control response from the other
3526         * side that the pinning has completed.
3527         */
3528        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3529                    &reg_result_idx, rdma->pin_all ?
3530                    qemu_rdma_reg_whole_ram_blocks : NULL);
3531        if (ret < 0) {
3532            ERROR(errp, "receiving remote info!");
3533            return ret;
3534        }
3535
3536        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3537
3538        /*
3539         * The protocol uses two different sets of rkeys (mutually exclusive):
3540         * 1. One key to represent the virtual address of the entire ram block.
3541         *    (dynamic chunk registration disabled - pin everything with one rkey.)
3542         * 2. One to represent individual chunks within a ram block.
3543         *    (dynamic chunk registration enabled - pin individual chunks.)
3544         *
3545         * Once the capability is successfully negotiated, the destination
3546         * transmits the keys to use (or sends them later), including the virtual
3547         * addresses, and the source merges those remote descriptions into its local copy.
3548         */
3549
3550        if (local->nb_blocks != nb_dest_blocks) {
3551            ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d). "
3552                        "Your QEMU command line parameters are probably "
3553                        "not identical on both the source and destination.",
3554                        local->nb_blocks, nb_dest_blocks);
3555            rdma->error_state = -EINVAL;
3556            return -EINVAL;
3557        }
3558
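            /*
             * The reply payload is an array of RDMADestBlock in network byte
             * order; convert each entry and merge the remote host address and
             * rkey into our local block list.
             */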
3559        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3560        memcpy(rdma->dest_blocks,
3561            rdma->wr_data[reg_result_idx].control_curr, resp.len);
3562        for (i = 0; i < nb_dest_blocks; i++) {
3563            network_to_dest_block(&rdma->dest_blocks[i]);
3564
3565            /* We require that the blocks are in the same order */
3566            if (rdma->dest_blocks[i].length != local->block[i].length) {
3567                ERROR(errp, "Block %s/%d has a different length %" PRIu64
3568                            " vs %" PRIu64, local->block[i].block_name, i,
3569                            local->block[i].length,
3570                            rdma->dest_blocks[i].length);
3571                rdma->error_state = -EINVAL;
3572                return -EINVAL;
3573            }
3574            local->block[i].remote_host_addr =
3575                    rdma->dest_blocks[i].remote_host_addr;
3576            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3577        }
3578    }
3579
3580    trace_qemu_rdma_registration_stop(flags);
3581
3582    head.type = RDMA_CONTROL_REGISTER_FINISHED;
3583    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3584
3585    if (ret < 0) {
3586        goto err;
3587    }
3588
3589    return 0;
3590err:
3591    rdma->error_state = ret;
3592    return ret;
3593}
3594
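    /*
     * qemu_fopen_rdma() installs the read hooks on the destination ("rb")
     * and the write hooks on the source ("wb").
     */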
3595static const QEMUFileHooks rdma_read_hooks = {
3596    .hook_ram_load = rdma_load_hook,
3597};
3598
3599static const QEMUFileHooks rdma_write_hooks = {
3600    .before_ram_iterate = qemu_rdma_registration_start,
3601    .after_ram_iterate  = qemu_rdma_registration_stop,
3602    .save_page          = qemu_rdma_save_page,
3603};
3604
3605
3606static void qio_channel_rdma_finalize(Object *obj)
3607{
3608    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3609    if (rioc->rdma) {
3610        qemu_rdma_cleanup(rioc->rdma);
3611        g_free(rioc->rdma);
3612        rioc->rdma = NULL;
3613    }
3614}
3615
3616static void qio_channel_rdma_class_init(ObjectClass *klass,
3617                                        void *class_data G_GNUC_UNUSED)
3618{
3619    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3620
3621    ioc_klass->io_writev = qio_channel_rdma_writev;
3622    ioc_klass->io_readv = qio_channel_rdma_readv;
3623    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3624    ioc_klass->io_close = qio_channel_rdma_close;
3625    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3626}
3627
3628static const TypeInfo qio_channel_rdma_info = {
3629    .parent = TYPE_QIO_CHANNEL,
3630    .name = TYPE_QIO_CHANNEL_RDMA,
3631    .instance_size = sizeof(QIOChannelRDMA),
3632    .instance_finalize = qio_channel_rdma_finalize,
3633    .class_init = qio_channel_rdma_class_init,
3634};
3635
3636static void qio_channel_rdma_register_types(void)
3637{
3638    type_register_static(&qio_channel_rdma_info);
3639}
3640
3641type_init(qio_channel_rdma_register_types);
3642
3643static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3644{
3645    QIOChannelRDMA *rioc;
3646
3647    if (qemu_file_mode_is_not_valid(mode)) {
3648        return NULL;
3649    }
3650
3651    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3652    rioc->rdma = rdma;
3653
3654    if (mode[0] == 'w') {
3655        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3656        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3657    } else {
3658        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3659        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3660    }
3661
3662    return rioc->file;
3663}
3664
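    /*
     * Destination: fd handler armed by rdma_start_incoming_migration().
     * Runs when a connection request arrives on the rdma_cm event channel:
     * complete the accept, wrap the connection in a QEMUFile and start the
     * incoming migration.
     */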
3665static void rdma_accept_incoming_migration(void *opaque)
3666{
3667    RDMAContext *rdma = opaque;
3668    int ret;
3669    QEMUFile *f;
3670    Error *local_err = NULL, **errp = &local_err;
3671
3672    trace_qemu_rdma_accept_incoming_migration();
3673    ret = qemu_rdma_accept(rdma);
3674
3675    if (ret) {
3676        ERROR(errp, "RDMA Migration initialization failed!");
3677        return;
3678    }
3679
3680    trace_qemu_rdma_accept_incoming_migration_accepted();
3681
3682    f = qemu_fopen_rdma(rdma, "rb");
3683    if (f == NULL) {
3684        ERROR(errp, "qemu_fopen_rdma() failed!");
3685        qemu_rdma_cleanup(rdma);
3686        return;
3687    }
3688
3689    rdma->migration_started_on_destination = 1;
3690    migration_fd_process_incoming(f);
3691}
3692
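    /*
     * Destination entry point: parse and bind the address in host_port,
     * listen on the rdma_cm id and register rdma_accept_incoming_migration()
     * to run when a connection request arrives.
     */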
3693void rdma_start_incoming_migration(const char *host_port, Error **errp)
3694{
3695    int ret;
3696    RDMAContext *rdma;
3697    Error *local_err = NULL;
3698
3699    trace_rdma_start_incoming_migration();
3700    rdma = qemu_rdma_data_init(host_port, &local_err);
3701
3702    if (rdma == NULL) {
3703        goto err;
3704    }
3705
3706    ret = qemu_rdma_dest_init(rdma, &local_err);
3707
3708    if (ret) {
3709        goto err;
3710    }
3711
3712    trace_rdma_start_incoming_migration_after_dest_init();
3713
3714    ret = rdma_listen(rdma->listen_id, 5);
3715
3716    if (ret) {
3717        ERROR(errp, "listening on socket!");
3718        goto err;
3719    }
3720
3721    trace_rdma_start_incoming_migration_after_rdma_listen();
3722
3723    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3724                        NULL, (void *)(intptr_t)rdma);
3725    return;
3726err:
3727    error_propagate(errp, local_err);
3728    g_free(rdma);
3729}
3730
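    /*
     * Source entry point: create the RDMA context, resolve and connect to
     * the destination, then hand the migration core a QEMUFile opened in
     * "wb" mode so the write hooks above take effect.
     */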
3731void rdma_start_outgoing_migration(void *opaque,
3732                            const char *host_port, Error **errp)
3733{
3734    MigrationState *s = opaque;
3735    RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3736    int ret = 0;
3737
3738    if (rdma == NULL) {
3739        goto err;
3740    }
3741
3742    ret = qemu_rdma_source_init(rdma,
3743        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
3744
3745    if (ret) {
3746        goto err;
3747    }
3748
3749    trace_rdma_start_outgoing_migration_after_rdma_source_init();
3750    ret = qemu_rdma_connect(rdma, errp);
3751
3752    if (ret) {
3753        goto err;
3754    }
3755
3756    trace_rdma_start_outgoing_migration_after_rdma_connect();
3757
3758    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3759    migrate_fd_connect(s, NULL);
3760    return;
3761err:
3762    g_free(rdma);
3763}
3764