qemu/migration/rdma.c
   1/*
   2 * RDMA protocol and interfaces
   3 *
   4 * Copyright IBM, Corp. 2010-2013
   5 * Copyright Red Hat, Inc. 2015-2016
   6 *
   7 * Authors:
   8 *  Michael R. Hines <mrhines@us.ibm.com>
   9 *  Jiuxing Liu <jl@us.ibm.com>
  10 *  Daniel P. Berrange <berrange@redhat.com>
  11 *
  12 * This work is licensed under the terms of the GNU GPL, version 2 or
  13 * later.  See the COPYING file in the top-level directory.
  14 *
  15 */
  16#include "qemu/osdep.h"
  17#include "qapi/error.h"
  18#include "qemu-common.h"
  19#include "qemu/cutils.h"
  20#include "migration/migration.h"
  21#include "migration/qemu-file.h"
  22#include "exec/cpu-common.h"
  23#include "qemu/error-report.h"
  24#include "qemu/main-loop.h"
  25#include "qemu/sockets.h"
  26#include "qemu/bitmap.h"
  27#include "qemu/coroutine.h"
  28#include <sys/socket.h>
  29#include <netdb.h>
  30#include <arpa/inet.h>
  31#include <rdma/rdma_cma.h>
  32#include "trace.h"
  33
  34/*
  35 * Print an error on both the Monitor and the Log file.
  36 */
  37#define ERROR(errp, fmt, ...) \
  38    do { \
  39        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
  40        if (errp && (*(errp) == NULL)) { \
  41            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
  42        } \
  43    } while (0)
  44
  45#define RDMA_RESOLVE_TIMEOUT_MS 10000
  46
  47/* Do not merge data if larger than this. */
  48#define RDMA_MERGE_MAX (2 * 1024 * 1024)
  49#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
  50
  51#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
  52
  53/*
  54 * This is only for non-live state being migrated.
  55 * Instead of RDMA_WRITE messages, we use RDMA_SEND
  56 * messages for that state, which requires a different
  57 * delivery design than main memory.
  58 */
  59#define RDMA_SEND_INCREMENT 32768
  60
  61/*
  62 * Maximum size infiniband SEND message
  63 */
  64#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
  65#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
  66
  67#define RDMA_CONTROL_VERSION_CURRENT 1
  68/*
  69 * Capabilities for negotiation.
  70 */
  71#define RDMA_CAPABILITY_PIN_ALL 0x01
  72
  73/*
  74 * Add the other flags above to this list of known capabilities
  75 * as they are introduced.
  76 */
  77static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
  78
  79#define CHECK_ERROR_STATE() \
  80    do { \
  81        if (rdma->error_state) { \
  82            if (!rdma->error_reported) { \
  83                error_report("RDMA is in an error state waiting migration" \
  84                                " to abort!"); \
  85                rdma->error_reported = 1; \
  86            } \
  87            return rdma->error_state; \
  88        } \
  89    } while (0)
  90
  91/*
  92 * A work request ID is 64-bits and we split up these bits
  93 * into 3 parts:
  94 *
  95 * bits 0-15 : type of control message, 2^16
  96 * bits 16-29: ram block index, 2^14
  97 * bits 30-63: ram block chunk number, 2^34
  98 *
  99 * The last two bit ranges are only used for RDMA writes,
 100 * in order to track their completion and potentially
 101 * also track unregistration status of the message.
 102 */
 103#define RDMA_WRID_TYPE_SHIFT  0UL
 104#define RDMA_WRID_BLOCK_SHIFT 16UL
 105#define RDMA_WRID_CHUNK_SHIFT 30UL
 106
 107#define RDMA_WRID_TYPE_MASK \
 108    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
 109
 110#define RDMA_WRID_BLOCK_MASK \
 111    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
 112
 113#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
 114
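/*
 * Worked example of the layout above: an RDMA write of chunk 5 in ram
 * block 3 is posted with
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE
 *           | (3UL << RDMA_WRID_BLOCK_SHIFT)
 *           | (5UL << RDMA_WRID_CHUNK_SHIFT)       == 0x140030001
 *
 * and the completion path recovers the fields with
 *
 *     wr_id & RDMA_WRID_TYPE_MASK                              -> 1
 *     (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT  -> 3
 *     (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT  -> 5
 *
 * qemu_rdma_make_wrid() below does the packing and qemu_rdma_poll()
 * the unpacking.
 */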
 115/*
 116 * RDMA migration protocol:
 117 * 1. RDMA Writes (data messages, i.e. RAM)
 118 * 2. IB Send/Recv (control channel messages)
 119 */
 120enum {
 121    RDMA_WRID_NONE = 0,
 122    RDMA_WRID_RDMA_WRITE = 1,
 123    RDMA_WRID_SEND_CONTROL = 2000,
 124    RDMA_WRID_RECV_CONTROL = 4000,
 125};
 126
 127static const char *wrid_desc[] = {
 128    [RDMA_WRID_NONE] = "NONE",
 129    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
 130    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
 131    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
 132};
 133
 134/*
 135 * Work request IDs for IB SEND messages only (not RDMA writes).
 136 * This is used by the migration protocol to transmit
 137 * control messages (such as device state and registration commands)
 138 *
 139 * We could use more WRs, but we have enough for now.
 140 */
 141enum {
 142    RDMA_WRID_READY = 0,
 143    RDMA_WRID_DATA,
 144    RDMA_WRID_CONTROL,
 145    RDMA_WRID_MAX,
 146};
 147
 148/*
 149 * SEND/RECV IB Control Messages.
 150 */
 151enum {
 152    RDMA_CONTROL_NONE = 0,
 153    RDMA_CONTROL_ERROR,
 154    RDMA_CONTROL_READY,               /* ready to receive */
 155    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
 156    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
 157    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
 158    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
 159    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
 160    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
 161    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
 162    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
 163    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 164};
 165
 166static const char *control_desc[] = {
 167    [RDMA_CONTROL_NONE] = "NONE",
 168    [RDMA_CONTROL_ERROR] = "ERROR",
 169    [RDMA_CONTROL_READY] = "READY",
 170    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
 171    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
 172    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
 173    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
 174    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
 175    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
 176    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
 177    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
 178    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
 179};
 180
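/*
 * A typical exchange during dynamic page registration, for example,
 * looks like this: the destination posts RDMA_CONTROL_READY, the
 * source answers with an RDMA_CONTROL_REGISTER_REQUEST describing the
 * chunk it is about to write, and the destination replies with an
 * RDMA_CONTROL_REGISTER_RESULT carrying the rkey the source needs for
 * the actual RDMA write.
 */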
 181/*
 182 * Memory and MR structures used to represent an IB Send/Recv work request.
 183 * This is *not* used for RDMA writes, only IB Send/Recv.
 184 */
 185typedef struct {
 186    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
 187    struct   ibv_mr *control_mr;               /* registration metadata */
 188    size_t   control_len;                      /* length of the message */
 189    uint8_t *control_curr;                     /* start of unconsumed bytes */
 190} RDMAWorkRequestData;
 191
 192/*
 193 * Negotiate RDMA capabilities during connection-setup time.
 194 */
 195typedef struct {
 196    uint32_t version;
 197    uint32_t flags;
 198} RDMACapabilities;
 199
 200static void caps_to_network(RDMACapabilities *cap)
 201{
 202    cap->version = htonl(cap->version);
 203    cap->flags = htonl(cap->flags);
 204}
 205
 206static void network_to_caps(RDMACapabilities *cap)
 207{
 208    cap->version = ntohl(cap->version);
 209    cap->flags = ntohl(cap->flags);
 210}
 211
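/*
 * For example, a source started with the pin-all capability would
 * advertise
 *
 *     RDMACapabilities cap = {
 *         .version = RDMA_CONTROL_VERSION_CURRENT,
 *         .flags   = RDMA_CAPABILITY_PIN_ALL,
 *     };
 *     caps_to_network(&cap);
 *
 * at connection-setup time; the peer converts the fields back with
 *  network_to_caps() and checks the flags against known_capabilities.
 */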
 212/*
 213 * Representation of a RAMBlock from an RDMA perspective.
 214 * This is not transmitted, only local.
 215 * This and subsequent structures cannot be linked lists
 216 * because we're using a single IB message to transmit
 217 * the information. It's small anyway, so a list is overkill.
 218 */
 219typedef struct RDMALocalBlock {
 220    char          *block_name;
 221    uint8_t       *local_host_addr; /* local virtual address */
 222    uint64_t       remote_host_addr; /* remote virtual address */
 223    uint64_t       offset;
 224    uint64_t       length;
 225    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
 226    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
 227    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
 228    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
 229    int            index;           /* which block are we */
 230    unsigned int   src_index;       /* (Only used on dest) */
 231    bool           is_ram_block;
 232    int            nb_chunks;
 233    unsigned long *transit_bitmap;
 234    unsigned long *unregister_bitmap;
 235} RDMALocalBlock;
 236
 237/*
 238 * Also represents a RAMblock, but only on the dest.
 239 * This gets transmitted by the dest during connection-time
 240 * to the source VM and then is used to populate the
 241 * corresponding RDMALocalBlock with
 242 * the information needed to perform the actual RDMA.
 243 */
 244typedef struct QEMU_PACKED RDMADestBlock {
 245    uint64_t remote_host_addr;
 246    uint64_t offset;
 247    uint64_t length;
 248    uint32_t remote_rkey;
 249    uint32_t padding;
 250} RDMADestBlock;
 251
 252static uint64_t htonll(uint64_t v)
 253{
 254    union { uint32_t lv[2]; uint64_t llv; } u;
 255    u.lv[0] = htonl(v >> 32);
 256    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
 257    return u.llv;
 258}
 259
 260static uint64_t ntohll(uint64_t v) {
 261    union { uint32_t lv[2]; uint64_t llv; } u;
 262    u.llv = v;
 263    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
 264}
 265
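/*
 * Example: on a little-endian host, htonll(0x0102030405060708ULL)
 * returns 0x0807060504030201ULL, i.e. the value is laid out in memory
 * as the bytes 01 02 ... 08, which is network (big-endian) order;
 * ntohll() is the inverse. On a big-endian host both are identity
 * operations.
 */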
 266static void dest_block_to_network(RDMADestBlock *db)
 267{
 268    db->remote_host_addr = htonll(db->remote_host_addr);
 269    db->offset = htonll(db->offset);
 270    db->length = htonll(db->length);
 271    db->remote_rkey = htonl(db->remote_rkey);
 272}
 273
 274static void network_to_dest_block(RDMADestBlock *db)
 275{
 276    db->remote_host_addr = ntohll(db->remote_host_addr);
 277    db->offset = ntohll(db->offset);
 278    db->length = ntohll(db->length);
 279    db->remote_rkey = ntohl(db->remote_rkey);
 280}
 281
 282/*
 283 * Virtual address of the above structures used for transmitting
 284 * the RAMBlock descriptions at connection-time.
 285 * This structure is *not* transmitted.
 286 */
 287typedef struct RDMALocalBlocks {
 288    int nb_blocks;
 289    bool     init;             /* main memory init complete */
 290    RDMALocalBlock *block;
 291} RDMALocalBlocks;
 292
 293/*
 294 * Main data structure for RDMA state.
 295 * While there is only one copy of this structure being allocated right now,
 296 * this is the place where you would start if you wanted to consider
 297 * having more than one RDMA connection open at the same time.
 298 */
 299typedef struct RDMAContext {
 300    char *host;
 301    int port;
 302
 303    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
 304
 305    /*
 306     * This is used by *_exchange_send() to figure out whether or not
 307     * the initial "READY" message has already been received or not.
 308     * This is because other functions may potentially poll() and detect
 309     * the READY message before send() does, in which case we need to
 310     * know if it completed.
 311     */
 312    int control_ready_expected;
 313
 314    /* number of outstanding writes */
 315    int nb_sent;
 316
 317    /* store info about current buffer so that we can
 318       merge it with future sends */
 319    uint64_t current_addr;
 320    uint64_t current_length;
 321    /* index of ram block the current buffer belongs to */
 322    int current_index;
 323    /* index of the chunk in the current ram block */
 324    int current_chunk;
 325
 326    bool pin_all;
 327
 328    /*
 329     * infiniband-specific variables for opening the device
 330     * and maintaining connection state and so forth.
 331     *
 332     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
 333     * cm_id->verbs, cm_id->channel, and cm_id->qp.
 334     */
 335    struct rdma_cm_id *cm_id;               /* connection manager ID */
 336    struct rdma_cm_id *listen_id;
 337    bool connected;
 338
 339    struct ibv_context          *verbs;
 340    struct rdma_event_channel   *channel;
 341    struct ibv_qp *qp;                      /* queue pair */
 342    struct ibv_comp_channel *comp_channel;  /* completion channel */
 343    struct ibv_pd *pd;                      /* protection domain */
 344    struct ibv_cq *cq;                      /* completion queue */
 345
 346    /*
 347     * If a previous write failed (perhaps because of a failed
 348     * memory registration), then do not attempt any future work
 349     * and remember the error state.
 350     */
 351    int error_state;
 352    int error_reported;
 353    int received_error;
 354
 355    /*
 356     * Description of ram blocks used throughout the code.
 357     */
 358    RDMALocalBlocks local_ram_blocks;
 359    RDMADestBlock  *dest_blocks;
 360
 361    /* Index of the next RAMBlock received during block registration */
 362    unsigned int    next_src_index;
 363
 364    /*
 365     * Migration on *destination* started.
 366     * Then use coroutine yield function.
 367     * Source runs in a thread, so we don't care.
 368     */
 369    int migration_started_on_destination;
 370
 371    int total_registrations;
 372    int total_writes;
 373
 374    int unregister_current, unregister_next;
 375    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
 376
 377    GHashTable *blockmap;
 378} RDMAContext;
 379
 380#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
 381#define QIO_CHANNEL_RDMA(obj)                                     \
 382    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
 383
 384typedef struct QIOChannelRDMA QIOChannelRDMA;
 385
 386
 387struct QIOChannelRDMA {
 388    QIOChannel parent;
 389    RDMAContext *rdma;
 390    QEMUFile *file;
 391    size_t len;
 392    bool blocking; /* XXX we don't actually honour this yet */
 393};
 394
 395/*
 396 * Main structure for IB Send/Recv control messages.
 397 * This gets prepended at the beginning of every Send/Recv.
 398 */
 399typedef struct QEMU_PACKED {
 400    uint32_t len;     /* Total length of data portion */
 401    uint32_t type;    /* which control command to perform */
 402    uint32_t repeat;  /* number of commands in data portion of same type */
 403    uint32_t padding;
 404} RDMAControlHeader;
 405
 406static void control_to_network(RDMAControlHeader *control)
 407{
 408    control->type = htonl(control->type);
 409    control->len = htonl(control->len);
 410    control->repeat = htonl(control->repeat);
 411}
 412
 413static void network_to_control(RDMAControlHeader *control)
 414{
 415    control->type = ntohl(control->type);
 416    control->len = ntohl(control->len);
 417    control->repeat = ntohl(control->repeat);
 418}
 419
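/*
 * On the wire, every control message therefore consists of this
 * 16-byte header in network byte order immediately followed by 'len'
 * bytes of payload. For instance, the unregister request built in
 * qemu_rdma_unregister_waiting() below goes out as
 *
 *     { .len = sizeof(RDMARegister),
 *       .type = RDMA_CONTROL_UNREGISTER_REQUEST,
 *       .repeat = 1 }
 *
 * followed by one byte-swapped RDMARegister.
 */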
 420/*
 421 * Register a single Chunk.
 422 * Information sent by the source VM to inform the dest
 423 * to register an single chunk of memory before we can perform
 424 * the actual RDMA operation.
 425 */
 426typedef struct QEMU_PACKED {
 427    union QEMU_PACKED {
 428        uint64_t current_addr;  /* offset into the ram_addr_t space */
 429        uint64_t chunk;         /* chunk to lookup if unregistering */
 430    } key;
 431    uint32_t current_index; /* which ramblock the chunk belongs to */
 432    uint32_t padding;
 433    uint64_t chunks;            /* how many sequential chunks to register */
 434} RDMARegister;
 435
 436static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
 437{
 438    RDMALocalBlock *local_block;
 439    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
 440
 441    if (local_block->is_ram_block) {
 442        /*
 443         * current_addr as passed in is an address in the local ram_addr_t
 444         * space, we need to translate this for the destination
 445         */
 446        reg->key.current_addr -= local_block->offset;
 447        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
 448    }
 449    reg->key.current_addr = htonll(reg->key.current_addr);
 450    reg->current_index = htonl(reg->current_index);
 451    reg->chunks = htonll(reg->chunks);
 452}
 453
 454static void network_to_register(RDMARegister *reg)
 455{
 456    reg->key.current_addr = ntohll(reg->key.current_addr);
 457    reg->current_index = ntohl(reg->current_index);
 458    reg->chunks = ntohll(reg->chunks);
 459}
 460
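/*
 * Example of the translation above (addresses are illustrative): if a
 * RAM block sits at local offset 0x40000000 but the destination
 * advertised offset 0x80000000 for the same block, then a current_addr
 * of 0x40001000 is rewritten to 0x80001000 before being byte-swapped,
 * so the destination can resolve it directly in its own ram_addr_t
 * space.
 */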
 461typedef struct QEMU_PACKED {
 462    uint32_t value;     /* if zero, we will madvise() */
 463    uint32_t block_idx; /* which ram block index */
 464    uint64_t offset;    /* Address in remote ram_addr_t space */
 465    uint64_t length;    /* length of the chunk */
 466} RDMACompress;
 467
 468static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
 469{
 470    comp->value = htonl(comp->value);
 471    /*
 472     * comp->offset as passed in is an address in the local ram_addr_t
 473     * space, we need to translate this for the destination
 474     */
 475    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
 476    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
 477    comp->block_idx = htonl(comp->block_idx);
 478    comp->offset = htonll(comp->offset);
 479    comp->length = htonll(comp->length);
 480}
 481
 482static void network_to_compress(RDMACompress *comp)
 483{
 484    comp->value = ntohl(comp->value);
 485    comp->block_idx = ntohl(comp->block_idx);
 486    comp->offset = ntohll(comp->offset);
 487    comp->length = ntohll(comp->length);
 488}
 489
 490/*
 491 * The result of the dest's memory registration produces an "rkey"
 492 * which the source VM must reference in order to perform
 493 * the RDMA operation.
 494 */
 495typedef struct QEMU_PACKED {
 496    uint32_t rkey;
 497    uint32_t padding;
 498    uint64_t host_addr;
 499} RDMARegisterResult;
 500
 501static void result_to_network(RDMARegisterResult *result)
 502{
 503    result->rkey = htonl(result->rkey);
 504    result->host_addr = htonll(result->host_addr);
 505}
 506
 507static void network_to_result(RDMARegisterResult *result)
 508{
 509    result->rkey = ntohl(result->rkey);
 510    result->host_addr = ntohll(result->host_addr);
 511}
 512
 513const char *print_wrid(int wrid);
 514static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
 515                                   uint8_t *data, RDMAControlHeader *resp,
 516                                   int *resp_idx,
 517                                   int (*callback)(RDMAContext *rdma));
 518
 519static inline uint64_t ram_chunk_index(const uint8_t *start,
 520                                       const uint8_t *host)
 521{
 522    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
 523}
 524
 525static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
 526                                       uint64_t i)
 527{
 528    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
 529                                  (i << RDMA_REG_CHUNK_SHIFT));
 530}
 531
 532static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
 533                                     uint64_t i)
 534{
 535    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
 536                                         (1UL << RDMA_REG_CHUNK_SHIFT);
 537
 538    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
 539        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
 540    }
 541
 542    return result;
 543}
 544
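/*
 * With RDMA_REG_CHUNK_SHIFT at 20, chunks are 1 MiB. For example, a
 * host pointer 2.5 MiB past local_host_addr gives a ram_chunk_index()
 * of 2, a ram_chunk_start() of local_host_addr + 2 MiB and a
 * ram_chunk_end() of local_host_addr + 3 MiB, clamped to the end of
 * the block when the block is shorter. rdma_add_block() below uses the
 * same arithmetic to size its bitmaps: a 2.5 MiB block ends up with
 * nb_chunks = 2 + 1 = 3.
 */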
 545static int rdma_add_block(RDMAContext *rdma, const char *block_name,
 546                         void *host_addr,
 547                         ram_addr_t block_offset, uint64_t length)
 548{
 549    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 550    RDMALocalBlock *block;
 551    RDMALocalBlock *old = local->block;
 552
 553    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
 554
 555    if (local->nb_blocks) {
 556        int x;
 557
 558        if (rdma->blockmap) {
 559            for (x = 0; x < local->nb_blocks; x++) {
 560                g_hash_table_remove(rdma->blockmap,
 561                                    (void *)(uintptr_t)old[x].offset);
 562                g_hash_table_insert(rdma->blockmap,
 563                                    (void *)(uintptr_t)old[x].offset,
 564                                    &local->block[x]);
 565            }
 566        }
 567        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
 568        g_free(old);
 569    }
 570
 571    block = &local->block[local->nb_blocks];
 572
 573    block->block_name = g_strdup(block_name);
 574    block->local_host_addr = host_addr;
 575    block->offset = block_offset;
 576    block->length = length;
 577    block->index = local->nb_blocks;
 578    block->src_index = ~0U; /* Filled in by the receipt of the block list */
 579    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
 580    block->transit_bitmap = bitmap_new(block->nb_chunks);
 581    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
 582    block->unregister_bitmap = bitmap_new(block->nb_chunks);
 583    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
 584    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
 585
 586    block->is_ram_block = local->init ? false : true;
 587
 588    if (rdma->blockmap) {
 589        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
 590    }
 591
 592    trace_rdma_add_block(block_name, local->nb_blocks,
 593                         (uintptr_t) block->local_host_addr,
 594                         block->offset, block->length,
 595                         (uintptr_t) (block->local_host_addr + block->length),
 596                         BITS_TO_LONGS(block->nb_chunks) *
 597                             sizeof(unsigned long) * 8,
 598                         block->nb_chunks);
 599
 600    local->nb_blocks++;
 601
 602    return 0;
 603}
 604
 605/*
 606 * Memory regions need to be registered with the device and queue pairs set up
 607 * in advance before the migration starts. This tells us where the RAM blocks
 608 * are so that we can register them individually.
 609 */
 610static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
 611    ram_addr_t block_offset, ram_addr_t length, void *opaque)
 612{
 613    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
 614}
 615
 616/*
 617 * Identify the RAMBlocks and their quantity. They will be used to
 618 * identify chunk boundaries inside each RAMBlock and will also be referenced
 619 * during dynamic page registration.
 620 */
 621static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
 622{
 623    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 624
 625    assert(rdma->blockmap == NULL);
 626    memset(local, 0, sizeof *local);
 627    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
 628    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
 629    rdma->dest_blocks = g_new0(RDMADestBlock,
 630                               rdma->local_ram_blocks.nb_blocks);
 631    local->init = true;
 632    return 0;
 633}
 634
 635/*
 636 * Note: If used outside of cleanup, the caller must ensure that the destination
 637 * block structures are also updated.
 638 */
 639static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
 640{
 641    RDMALocalBlocks *local = &rdma->local_ram_blocks;
 642    RDMALocalBlock *old = local->block;
 643    int x;
 644
 645    if (rdma->blockmap) {
 646        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
 647    }
 648    if (block->pmr) {
 649        int j;
 650
 651        for (j = 0; j < block->nb_chunks; j++) {
 652            if (!block->pmr[j]) {
 653                continue;
 654            }
 655            ibv_dereg_mr(block->pmr[j]);
 656            rdma->total_registrations--;
 657        }
 658        g_free(block->pmr);
 659        block->pmr = NULL;
 660    }
 661
 662    if (block->mr) {
 663        ibv_dereg_mr(block->mr);
 664        rdma->total_registrations--;
 665        block->mr = NULL;
 666    }
 667
 668    g_free(block->transit_bitmap);
 669    block->transit_bitmap = NULL;
 670
 671    g_free(block->unregister_bitmap);
 672    block->unregister_bitmap = NULL;
 673
 674    g_free(block->remote_keys);
 675    block->remote_keys = NULL;
 676
 677    g_free(block->block_name);
 678    block->block_name = NULL;
 679
 680    if (rdma->blockmap) {
 681        for (x = 0; x < local->nb_blocks; x++) {
 682            g_hash_table_remove(rdma->blockmap,
 683                                (void *)(uintptr_t)old[x].offset);
 684        }
 685    }
 686
 687    if (local->nb_blocks > 1) {
 688
 689        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
 690
 691        if (block->index) {
 692            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
 693        }
 694
 695        if (block->index < (local->nb_blocks - 1)) {
 696            memcpy(local->block + block->index, old + (block->index + 1),
 697                sizeof(RDMALocalBlock) *
 698                    (local->nb_blocks - (block->index + 1)));
 699        }
 700    } else {
 701        assert(block == local->block);
 702        local->block = NULL;
 703    }
 704
 705    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
 706                           block->offset, block->length,
 707                            (uintptr_t)(block->local_host_addr + block->length),
 708                           BITS_TO_LONGS(block->nb_chunks) *
 709                               sizeof(unsigned long) * 8, block->nb_chunks);
 710
 711    g_free(old);
 712
 713    local->nb_blocks--;
 714
 715    if (local->nb_blocks && rdma->blockmap) {
 716        for (x = 0; x < local->nb_blocks; x++) {
 717            g_hash_table_insert(rdma->blockmap,
 718                                (void *)(uintptr_t)local->block[x].offset,
 719                                &local->block[x]);
 720        }
 721    }
 722
 723    return 0;
 724}
 725
 726/*
 727 * Put in the log file which RDMA device was opened and the details
 728 * associated with that device.
 729 */
 730static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
 731{
 732    struct ibv_port_attr port;
 733
 734    if (ibv_query_port(verbs, 1, &port)) {
 735        error_report("Failed to query port information");
 736        return;
 737    }
 738
 739    printf("%s RDMA Device opened: kernel name %s "
 740           "uverbs device name %s, "
 741           "infiniband_verbs class device path %s, "
 742           "infiniband class device path %s, "
 743           "transport: (%d) %s\n",
 744                who,
 745                verbs->device->name,
 746                verbs->device->dev_name,
 747                verbs->device->dev_path,
 748                verbs->device->ibdev_path,
 749                port.link_layer,
 750                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
 751                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
 752                    ? "Ethernet" : "Unknown"));
 753}
 754
 755/*
 756 * Put in the log file the RDMA gid addressing information,
 757 * useful for folks who have trouble understanding the
 758 * RDMA device hierarchy in the kernel.
 759 */
 760static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
 761{
 762    char sgid[33];
 763    char dgid[33];
 764    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
 765    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
 766    trace_qemu_rdma_dump_gid(who, sgid, dgid);
 767}
 768
 769/*
 770 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 771 * We will try the next addrinfo struct, and fail if there are
 772 * no other valid addresses to bind against.
 773 *
 774 * If the user is listening on '[::]', then we will not have opened a device
 775 * yet and have no way of verifying if the device is RoCE or not.
 776 *
 777 * In this case, the source VM will throw an error for ALL types of
 778 * connections (both IPv4 and IPv6) if the destination machine does not have
 779 * a regular infiniband network available for use.
 780 *
 781 * The only way to guarantee that an error is thrown for broken kernels is
 782 * for the management software to choose a *specific* interface at bind time
 783 * and validate what type of hardware it is.
 784 *
 785 * Unfortunately, this puts the user in a fix:
 786 *
 787 *  If the source VM connects with an IPv4 address without knowing that the
 788 *  destination has bound to '[::]' the migration will unconditionally fail
 789 *  unless the management software is explicitly listening on the IPv4
 790 *  address while using a RoCE-based device.
 791 *
 792 *  If the source VM connects with an IPv6 address, then we're OK because we can
 793 *  throw an error on the source (and similarly on the destination).
 794 *
 795 *  But in mixed environments, this will be broken for a while until it is fixed
 796 *  inside linux.
 797 *
 798 * We do provide a *tiny* bit of help in this function: We can list all of the
 799 * devices in the system and check to see if all the devices are RoCE or
 800 * Infiniband.
 801 *
 802 * If we detect that we have a *pure* RoCE environment, then we can safely
 803 * throw an error even if the management software has specified '[::]' as the
 804 * bind address.
 805 *
 806 * However, if there are multiple heterogeneous devices, then we cannot make
 807 * this assumption and the user just has to be sure they know what they are
 808 * doing.
 809 *
 810 * Patches are being reviewed on linux-rdma.
 811 */
 812static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
 813{
 814    struct ibv_port_attr port_attr;
 815
 816    /* This bug only exists in linux, to our knowledge. */
 817#ifdef CONFIG_LINUX
 818
 819    /*
 820     * Verbs are only NULL if management has bound to '[::]'.
 821     *
 822     * Let's iterate through all the devices and see if there are any pure IB
 823     * devices (non-ethernet).
 824     *
 825     * If not, then we can safely proceed with the migration.
 826     * Otherwise, there are no guarantees until the bug is fixed in linux.
 827     */
 828    if (!verbs) {
 829        int num_devices, x;
 830        struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
 831        bool roce_found = false;
 832        bool ib_found = false;
 833
 834        for (x = 0; x < num_devices; x++) {
 835            verbs = ibv_open_device(dev_list[x]);
 836            if (!verbs) {
 837                if (errno == EPERM) {
 838                    continue;
 839                } else {
 840                    return -EINVAL;
 841                }
 842            }
 843
 844            if (ibv_query_port(verbs, 1, &port_attr)) {
 845                ibv_close_device(verbs);
 846                ERROR(errp, "Could not query initial IB port");
 847                return -EINVAL;
 848            }
 849
 850            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
 851                ib_found = true;
 852            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 853                roce_found = true;
 854            }
 855
 856            ibv_close_device(verbs);
 857
 858        }
 859
 860        if (roce_found) {
 861            if (ib_found) {
 862                fprintf(stderr, "WARN: migrations may fail:"
 863                                " IPv6 over RoCE / iWARP in linux"
 864                                " is broken. But since you appear to have a"
 865                                " mixed RoCE / IB environment, be sure to only"
 866                                " migrate over the IB fabric until the kernel "
 867                                " fixes the bug.\n");
 868            } else {
 869                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
 870                            " and your management software has specified '[::]'"
 871                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
 872                return -ENONET;
 873            }
 874        }
 875
 876        return 0;
 877    }
 878
 879    /*
 880     * If we have a verbs context, that means that something other than '[::]' was
 881     * used by the management software for binding. In which case we can
 882     * actually warn the user about a potentially broken kernel.
 883     */
 884
 885    /* IB ports start with 1, not 0 */
 886    if (ibv_query_port(verbs, 1, &port_attr)) {
 887        ERROR(errp, "Could not query initial IB port");
 888        return -EINVAL;
 889    }
 890
 891    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
 892        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
 893                    "(but patches on linux-rdma in progress)");
 894        return -ENONET;
 895    }
 896
 897#endif
 898
 899    return 0;
 900}
 901
 902/*
 903 * Figure out which RDMA device corresponds to the requested IP hostname
 904 * Also create the initial connection manager identifiers for opening
 905 * the connection.
 906 */
 907static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
 908{
 909    int ret;
 910    struct rdma_addrinfo *res;
 911    char port_str[16];
 912    struct rdma_cm_event *cm_event;
 913    char ip[40] = "unknown";
 914    struct rdma_addrinfo *e;
 915
 916    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
 917        ERROR(errp, "RDMA hostname has not been set");
 918        return -EINVAL;
 919    }
 920
 921    /* create CM channel */
 922    rdma->channel = rdma_create_event_channel();
 923    if (!rdma->channel) {
 924        ERROR(errp, "could not create CM channel");
 925        return -EINVAL;
 926    }
 927
 928    /* create CM id */
 929    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
 930    if (ret) {
 931        ERROR(errp, "could not create channel id");
 932        goto err_resolve_create_id;
 933    }
 934
 935    snprintf(port_str, 16, "%d", rdma->port);
 936    port_str[15] = '\0';
 937
 938    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
 939    if (ret < 0) {
 940        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
 941        goto err_resolve_get_addr;
 942    }
 943
 944    for (e = res; e != NULL; e = e->ai_next) {
 945        inet_ntop(e->ai_family,
 946            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
 947        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
 948
 949        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
 950                RDMA_RESOLVE_TIMEOUT_MS);
 951        if (!ret) {
 952            if (e->ai_family == AF_INET6) {
 953                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
 954                if (ret) {
 955                    continue;
 956                }
 957            }
 958            goto route;
 959        }
 960    }
 961
 962    ERROR(errp, "could not resolve address %s", rdma->host);
 963    goto err_resolve_get_addr;
 964
 965route:
 966    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
 967
 968    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 969    if (ret) {
 970        ERROR(errp, "could not perform event_addr_resolved");
 971        goto err_resolve_get_addr;
 972    }
 973
 974    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
 975        ERROR(errp, "result not equal to event_addr_resolved %s",
 976                rdma_event_str(cm_event->event));
 977        perror("rdma_resolve_addr");
 978        rdma_ack_cm_event(cm_event);
 979        ret = -EINVAL;
 980        goto err_resolve_get_addr;
 981    }
 982    rdma_ack_cm_event(cm_event);
 983
 984    /* resolve route */
 985    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
 986    if (ret) {
 987        ERROR(errp, "could not resolve rdma route");
 988        goto err_resolve_get_addr;
 989    }
 990
 991    ret = rdma_get_cm_event(rdma->channel, &cm_event);
 992    if (ret) {
 993        ERROR(errp, "could not perform event_route_resolved");
 994        goto err_resolve_get_addr;
 995    }
 996    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
 997        ERROR(errp, "result not equal to event_route_resolved: %s",
 998                        rdma_event_str(cm_event->event));
 999        rdma_ack_cm_event(cm_event);
1000        ret = -EINVAL;
1001        goto err_resolve_get_addr;
1002    }
1003    rdma_ack_cm_event(cm_event);
1004    rdma->verbs = rdma->cm_id->verbs;
1005    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1006    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1007    return 0;
1008
1009err_resolve_get_addr:
1010    rdma_destroy_id(rdma->cm_id);
1011    rdma->cm_id = NULL;
1012err_resolve_create_id:
1013    rdma_destroy_event_channel(rdma->channel);
1014    rdma->channel = NULL;
1015    return ret;
1016}
1017
1018/*
1019 * Create protection domain and completion queues
1020 */
1021static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1022{
1023    /* allocate pd */
1024    rdma->pd = ibv_alloc_pd(rdma->verbs);
1025    if (!rdma->pd) {
1026        error_report("failed to allocate protection domain");
1027        return -1;
1028    }
1029
1030    /* create completion channel */
1031    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1032    if (!rdma->comp_channel) {
1033        error_report("failed to allocate completion channel");
1034        goto err_alloc_pd_cq;
1035    }
1036
1037    /*
1038     * Completion queue can be filled by both read and write work requests,
1039     * so must reflect the sum of both possible queue sizes.
1040     */
1041    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1042            NULL, rdma->comp_channel, 0);
1043    if (!rdma->cq) {
1044        error_report("failed to allocate completion queue");
1045        goto err_alloc_pd_cq;
1046    }
1047
1048    return 0;
1049
1050err_alloc_pd_cq:
1051    if (rdma->pd) {
1052        ibv_dealloc_pd(rdma->pd);
1053    }
1054    if (rdma->comp_channel) {
1055        ibv_destroy_comp_channel(rdma->comp_channel);
1056    }
1057    rdma->pd = NULL;
1058    rdma->comp_channel = NULL;
1059    return -1;
1060
1061}
1062
1063/*
1064 * Create queue pairs.
1065 */
1066static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1067{
1068    struct ibv_qp_init_attr attr = { 0 };
1069    int ret;
1070
1071    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1072    attr.cap.max_recv_wr = 3;
1073    attr.cap.max_send_sge = 1;
1074    attr.cap.max_recv_sge = 1;
1075    attr.send_cq = rdma->cq;
1076    attr.recv_cq = rdma->cq;
1077    attr.qp_type = IBV_QPT_RC;
1078
1079    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1080    if (ret) {
1081        return -1;
1082    }
1083
1084    rdma->qp = rdma->cm_id->qp;
1085    return 0;
1086}
1087
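/*
 * Sizing note: RDMA_SIGNALED_SEND_MAX is (2 MiB / 4096) = 512, so up
 * to 512 signaled sends may be outstanding, while max_recv_wr = 3
 * lines up with the three control-channel receive slots
 * (RDMA_WRID_READY, RDMA_WRID_DATA, RDMA_WRID_CONTROL). The completion
 * queue created in qemu_rdma_alloc_pd_cq() is three times deeper
 * (1536 entries) so that completions from both directions fit.
 */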
1088static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1089{
1090    int i;
1091    RDMALocalBlocks *local = &rdma->local_ram_blocks;
1092
1093    for (i = 0; i < local->nb_blocks; i++) {
1094        local->block[i].mr =
1095            ibv_reg_mr(rdma->pd,
1096                    local->block[i].local_host_addr,
1097                    local->block[i].length,
1098                    IBV_ACCESS_LOCAL_WRITE |
1099                    IBV_ACCESS_REMOTE_WRITE
1100                    );
1101        if (!local->block[i].mr) {
1102            perror("Failed to register local dest ram block!");
1103            break;
1104        }
1105        rdma->total_registrations++;
1106    }
1107
1108    if (i >= local->nb_blocks) {
1109        return 0;
1110    }
1111
1112    for (i--; i >= 0; i--) {
1113        ibv_dereg_mr(local->block[i].mr);
1114        rdma->total_registrations--;
1115    }
1116
1117    return -1;
1118
1119}
1120
1121/*
1122 * Find the ram block that corresponds to the page requested to be
1123 * transmitted by QEMU.
1124 *
1125 * Once the block is found, also identify which 'chunk' within that
1126 * block that the page belongs to.
1127 *
1128 * This search cannot fail or the migration will fail.
1129 */
1130static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1131                                      uintptr_t block_offset,
1132                                      uint64_t offset,
1133                                      uint64_t length,
1134                                      uint64_t *block_index,
1135                                      uint64_t *chunk_index)
1136{
1137    uint64_t current_addr = block_offset + offset;
1138    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1139                                                (void *) block_offset);
1140    assert(block);
1141    assert(current_addr >= block->offset);
1142    assert((current_addr + length) <= (block->offset + block->length));
1143
1144    *block_index = block->index;
1145    *chunk_index = ram_chunk_index(block->local_host_addr,
1146                block->local_host_addr + (current_addr - block->offset));
1147
1148    return 0;
1149}
1150
1151/*
1152 * Register a chunk with IB. If the chunk was already registered
1153 * previously, then skip.
1154 *
1155 * Also return the keys associated with the registration needed
1156 * to perform the actual RDMA operation.
1157 */
1158static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1159        RDMALocalBlock *block, uintptr_t host_addr,
1160        uint32_t *lkey, uint32_t *rkey, int chunk,
1161        uint8_t *chunk_start, uint8_t *chunk_end)
1162{
1163    if (block->mr) {
1164        if (lkey) {
1165            *lkey = block->mr->lkey;
1166        }
1167        if (rkey) {
1168            *rkey = block->mr->rkey;
1169        }
1170        return 0;
1171    }
1172
1173    /* allocate memory to store chunk MRs */
1174    if (!block->pmr) {
1175        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1176    }
1177
1178    /*
1179     * If 'rkey', then we're the destination, so grant access to the source.
1180     *
1181     * If 'lkey', then we're the source VM, so grant access only to ourselves.
1182     */
1183    if (!block->pmr[chunk]) {
1184        uint64_t len = chunk_end - chunk_start;
1185
1186        trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1187
1188        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1189                chunk_start, len,
1190                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1191                        IBV_ACCESS_REMOTE_WRITE) : 0));
1192
1193        if (!block->pmr[chunk]) {
1194            perror("Failed to register chunk!");
1195            fprintf(stderr, "Chunk details: block: %d chunk index %d"
1196                            " start %" PRIuPTR " end %" PRIuPTR
1197                            " host %" PRIuPTR
1198                            " local %" PRIuPTR " registrations: %d\n",
1199                            block->index, chunk, (uintptr_t)chunk_start,
1200                            (uintptr_t)chunk_end, host_addr,
1201                            (uintptr_t)block->local_host_addr,
1202                            rdma->total_registrations);
1203            return -1;
1204        }
1205        rdma->total_registrations++;
1206    }
1207
1208    if (lkey) {
1209        *lkey = block->pmr[chunk]->lkey;
1210    }
1211    if (rkey) {
1212        *rkey = block->pmr[chunk]->rkey;
1213    }
1214    return 0;
1215}
1216
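/*
 * In practice the source asks only for the lkey, which goes into the
 * scatter/gather entry of its RDMA write, whereas the destination asks
 * for the rkey while answering a REGISTER_REQUEST and sends it back in
 * an RDMARegisterResult so the source can target the chunk remotely.
 */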
1217/*
1218 * Register (at connection time) the memory used for control
1219 * channel messages.
1220 */
1221static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1222{
1223    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1224            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1225            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1226    if (rdma->wr_data[idx].control_mr) {
1227        rdma->total_registrations++;
1228        return 0;
1229    }
1230    error_report("qemu_rdma_reg_control failed");
1231    return -1;
1232}
1233
1234const char *print_wrid(int wrid)
1235{
1236    if (wrid >= RDMA_WRID_RECV_CONTROL) {
1237        return wrid_desc[RDMA_WRID_RECV_CONTROL];
1238    }
1239    return wrid_desc[wrid];
1240}
1241
1242/*
1243 * RDMA requires memory registration (mlock/pinning), but this is not good for
1244 * overcommitment.
1245 *
1246 * In preparation for the future where LRU information or workload-specific
1247 * writable working set memory access behavior is available to QEMU,
1248 * it would be nice to have in place the ability to UN-register/UN-pin
1249 * particular memory regions from the RDMA hardware when it is determined that
1250 * those regions of memory will likely not be accessed again in the near future.
1251 *
1252 * While we do not yet have such information right now, the following
1253 * compile-time option allows us to perform a non-optimized version of this
1254 * behavior.
1255 *
1256 * By uncommenting this option, you will cause *all* RDMA transfers to be
1257 * unregistered immediately after the transfer completes on both sides of the
1258 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1259 *
1260 * This will have a terrible impact on migration performance, so until future
1261 * workload information or LRU information is available, do not attempt to use
1262 * this feature except for basic testing.
1263 */
1264//#define RDMA_UNREGISTRATION_EXAMPLE
1265
1266/*
1267 * Perform a non-optimized memory unregistration after every transfer
1268 * for demonstration purposes, only if pin-all is not requested.
1269 *
1270 * Potential optimizations:
1271 * 1. Start a new thread to run this function continuously
1272 *      - for bit clearing
1273 *      - and for receipt of unregister messages
1274 * 2. Use an LRU.
1275 * 3. Use workload hints.
1276 */
1277static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1278{
1279    while (rdma->unregistrations[rdma->unregister_current]) {
1280        int ret;
1281        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1282        uint64_t chunk =
1283            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1284        uint64_t index =
1285            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1286        RDMALocalBlock *block =
1287            &(rdma->local_ram_blocks.block[index]);
1288        RDMARegister reg = { .current_index = index };
1289        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1290                                 };
1291        RDMAControlHeader head = { .len = sizeof(RDMARegister),
1292                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1293                                   .repeat = 1,
1294                                 };
1295
1296        trace_qemu_rdma_unregister_waiting_proc(chunk,
1297                                                rdma->unregister_current);
1298
1299        rdma->unregistrations[rdma->unregister_current] = 0;
1300        rdma->unregister_current++;
1301
1302        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1303            rdma->unregister_current = 0;
1304        }
1305
1306
1307        /*
1308         * Unregistration is speculative (because migration is single-threaded
1309         * and we cannot break the protocol's infiniband message ordering).
1310         * Thus, if the memory is currently being used for transmission,
1311         * then abort the attempt to unregister and try again
1312         * later the next time a completion is received for this memory.
1313         */
1314        clear_bit(chunk, block->unregister_bitmap);
1315
1316        if (test_bit(chunk, block->transit_bitmap)) {
1317            trace_qemu_rdma_unregister_waiting_inflight(chunk);
1318            continue;
1319        }
1320
1321        trace_qemu_rdma_unregister_waiting_send(chunk);
1322
1323        ret = ibv_dereg_mr(block->pmr[chunk]);
1324        block->pmr[chunk] = NULL;
1325        block->remote_keys[chunk] = 0;
1326
1327        if (ret != 0) {
1328            perror("unregistration chunk failed");
1329            return -ret;
1330        }
1331        rdma->total_registrations--;
1332
1333        reg.key.chunk = chunk;
1334        register_to_network(rdma, &reg);
1335        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1336                                &resp, NULL, NULL);
1337        if (ret < 0) {
1338            return ret;
1339        }
1340
1341        trace_qemu_rdma_unregister_waiting_complete(chunk);
1342    }
1343
1344    return 0;
1345}
1346
1347static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1348                                         uint64_t chunk)
1349{
1350    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1351
1352    result |= (index << RDMA_WRID_BLOCK_SHIFT);
1353    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1354
1355    return result;
1356}
1357
1358/*
1359 * Set bit for unregistration in the next iteration.
1360 * We cannot transmit right here, but will unpin later.
1361 */
1362static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1363                                        uint64_t chunk, uint64_t wr_id)
1364{
1365    if (rdma->unregistrations[rdma->unregister_next] != 0) {
1366        error_report("rdma migration: queue is full");
1367    } else {
1368        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1369
1370        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1371            trace_qemu_rdma_signal_unregister_append(chunk,
1372                                                     rdma->unregister_next);
1373
1374            rdma->unregistrations[rdma->unregister_next++] =
1375                    qemu_rdma_make_wrid(wr_id, index, chunk);
1376
1377            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1378                rdma->unregister_next = 0;
1379            }
1380        } else {
1381            trace_qemu_rdma_signal_unregister_already(chunk);
1382        }
1383    }
1384}
1385
1386/*
1387 * Poll the completion queue to see if a work request
1388 * (of any kind) has completed.
1389 * Return the work request ID that completed.
1390 */
1391static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1392                               uint32_t *byte_len)
1393{
1394    int ret;
1395    struct ibv_wc wc;
1396    uint64_t wr_id;
1397
1398    ret = ibv_poll_cq(rdma->cq, 1, &wc);
1399
1400    if (!ret) {
1401        *wr_id_out = RDMA_WRID_NONE;
1402        return 0;
1403    }
1404
1405    if (ret < 0) {
1406        error_report("ibv_poll_cq return %d", ret);
1407        return ret;
1408    }
1409
1410    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1411
1412    if (wc.status != IBV_WC_SUCCESS) {
1413        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1414                        wc.status, ibv_wc_status_str(wc.status));
1415        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1416
1417        return -1;
1418    }
1419
1420    if (rdma->control_ready_expected &&
1421        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1422        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1423                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1424        rdma->control_ready_expected = 0;
1425    }
1426
1427    if (wr_id == RDMA_WRID_RDMA_WRITE) {
1428        uint64_t chunk =
1429            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1430        uint64_t index =
1431            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1432        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1433
1434        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1435                                   index, chunk, block->local_host_addr,
1436                                   (void *)(uintptr_t)block->remote_host_addr);
1437
1438        clear_bit(chunk, block->transit_bitmap);
1439
1440        if (rdma->nb_sent > 0) {
1441            rdma->nb_sent--;
1442        }
1443
1444        if (!rdma->pin_all) {
1445            /*
1446             * FYI: If one wanted to signal a specific chunk to be unregistered
1447             * using LRU or workload-specific information, this is the function
1448             * you would call to do so. That chunk would then get asynchronously
1449             * unregistered later.
1450             */
1451#ifdef RDMA_UNREGISTRATION_EXAMPLE
1452            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1453#endif
1454        }
1455    } else {
1456        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1457    }
1458
1459    *wr_id_out = wc.wr_id;
1460    if (byte_len) {
1461        *byte_len = wc.byte_len;
1462    }
1463
1464    return  0;
1465}
1466
1467/*
1468 * Block until the next work request has completed.
1469 *
1470 * First poll to see if a work request has already completed,
1471 * otherwise block.
1472 *
1473 * If we encounter completed work requests for IDs other than
1474 * the one we're interested in, then that's generally an error.
1475 *
1476 * The only exception is actual RDMA Write completions. These
1477 * completions only need to be recorded, but do not actually
1478 * need further processing.
1479 */
1480static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1481                                    uint32_t *byte_len)
1482{
1483    int num_cq_events = 0, ret = 0;
1484    struct ibv_cq *cq;
1485    void *cq_ctx;
1486    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1487
1488    if (ibv_req_notify_cq(rdma->cq, 0)) {
1489        return -1;
1490    }
1491    /* poll cq first */
1492    while (wr_id != wrid_requested) {
1493        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1494        if (ret < 0) {
1495            return ret;
1496        }
1497
1498        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1499
1500        if (wr_id == RDMA_WRID_NONE) {
1501            break;
1502        }
1503        if (wr_id != wrid_requested) {
1504            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1505                       wrid_requested, print_wrid(wr_id), wr_id);
1506        }
1507    }
1508
1509    if (wr_id == wrid_requested) {
1510        return 0;
1511    }
1512
1513    while (1) {
1514        /*
1515         * Coroutine doesn't start until migration_fd_process_incoming()
1516         * so don't yield unless we know we're running inside of a coroutine.
1517         */
1518        if (rdma->migration_started_on_destination) {
1519            yield_until_fd_readable(rdma->comp_channel->fd);
1520        }
1521
1522        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1523            perror("ibv_get_cq_event");
1524            goto err_block_for_wrid;
1525        }
1526
1527        num_cq_events++;
1528
1529        if (ibv_req_notify_cq(cq, 0)) {
1530            goto err_block_for_wrid;
1531        }
1532
1533        while (wr_id != wrid_requested) {
1534            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1535            if (ret < 0) {
1536                goto err_block_for_wrid;
1537            }
1538
1539            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1540
1541            if (wr_id == RDMA_WRID_NONE) {
1542                break;
1543            }
1544            if (wr_id != wrid_requested) {
1545                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1546                                   wrid_requested, print_wrid(wr_id), wr_id);
1547            }
1548        }
1549
1550        if (wr_id == wrid_requested) {
1551            goto success_block_for_wrid;
1552        }
1553    }
1554
1555success_block_for_wrid:
1556    if (num_cq_events) {
1557        ibv_ack_cq_events(cq, num_cq_events);
1558    }
1559    return 0;
1560
1561err_block_for_wrid:
1562    if (num_cq_events) {
1563        ibv_ack_cq_events(cq, num_cq_events);
1564    }
1565    return ret;
1566}
1567
1568/*
1569 * Post a SEND message work request for the control channel
1570 * containing some data and block until the post completes.
1571 */
1572static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1573                                       RDMAControlHeader *head)
1574{
1575    int ret = 0;
1576    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1577    struct ibv_send_wr *bad_wr;
1578    struct ibv_sge sge = {
1579                           .addr = (uintptr_t)(wr->control),
1580                           .length = head->len + sizeof(RDMAControlHeader),
1581                           .lkey = wr->control_mr->lkey,
1582                         };
1583    struct ibv_send_wr send_wr = {
1584                                   .wr_id = RDMA_WRID_SEND_CONTROL,
1585                                   .opcode = IBV_WR_SEND,
1586                                   .send_flags = IBV_SEND_SIGNALED,
1587                                   .sg_list = &sge,
1588                                   .num_sge = 1,
1589                                };
1590
1591    trace_qemu_rdma_post_send_control(control_desc[head->type]);
1592
1593    /*
1594     * We don't actually need to do a memcpy() in here if we used
1595     * the "sge" properly, but since we're only sending control messages
1596     * (not RAM in a performance-critical path), then it's OK for now.
1597     *
1598     * The copy makes the RDMAControlHeader simpler to manipulate
1599     * for the time being.
1600     */
1601    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1602    memcpy(wr->control, head, sizeof(RDMAControlHeader));
1603    control_to_network((void *) wr->control);
1604
1605    if (buf) {
1606        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1607    }
1608
1609
1610    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1611
1612    if (ret > 0) {
1613        error_report("Failed to post IB SEND for control");
1614        return -ret;
1615    }
1616
1617    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1618    if (ret < 0) {
1619        error_report("rdma migration: send polling control error");
1620    }
1621
1622    return ret;
1623}
1624
1625/*
1626 * Post a RECV work request in anticipation of some future receipt
1627 * of data on the control channel.
1628 */
1629static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1630{
1631    struct ibv_recv_wr *bad_wr;
1632    struct ibv_sge sge = {
1633                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
1634                            .length = RDMA_CONTROL_MAX_BUFFER,
1635                            .lkey = rdma->wr_data[idx].control_mr->lkey,
1636                         };
1637
1638    struct ibv_recv_wr recv_wr = {
1639                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1640                                    .sg_list = &sge,
1641                                    .num_sge = 1,
1642                                 };
1643
1644
1645    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1646        return -1;
1647    }
1648
1649    return 0;
1650}
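/*
 * For reference, each control-channel message shares one fixed-size buffer
 * per work request slot, laid out (after byte-order conversion) as:
 *
 *     [ RDMAControlHeader | head->len bytes of payload ]
 *
 * which is why the send path asserts, and the receive path checks, that
 * head->len never exceeds RDMA_CONTROL_MAX_BUFFER - sizeof(RDMAControlHeader).
 */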
1651
1652/*
1653 * Block and wait for a RECV control channel message to arrive.
1654 */
1655static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1656                RDMAControlHeader *head, int expecting, int idx)
1657{
1658    uint32_t byte_len;
1659    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1660                                       &byte_len);
1661
1662    if (ret < 0) {
1663        error_report("rdma migration: recv polling control error!");
1664        return ret;
1665    }
1666
1667    network_to_control((void *) rdma->wr_data[idx].control);
1668    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1669
1670    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1671
1672    if (expecting == RDMA_CONTROL_NONE) {
1673        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1674                                             head->type);
1675    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1676        error_report("Was expecting a %s (%d) control message"
1677                ", but got: %s (%d), length: %d",
1678                control_desc[expecting], expecting,
1679                control_desc[head->type], head->type, head->len);
1680        if (head->type == RDMA_CONTROL_ERROR) {
1681            rdma->received_error = true;
1682        }
1683        return -EIO;
1684    }
1685    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1686        error_report("too long length: %d", head->len);
1687        return -EINVAL;
1688    }
1689    if (sizeof(*head) + head->len != byte_len) {
1690        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1691        return -EINVAL;
1692    }
1693
1694    return 0;
1695}
1696
1697/*
1698 * When a RECV work request has completed, the work request's
1699 * buffer is pointed at the header.
1700 *
1701 * This advances the pointer past the header to the data portion
1702 * of the control message in the work request's buffer, which
1703 * was populated after the work request finished.
1704 */
1705static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1706                                  RDMAControlHeader *head)
1707{
1708    rdma->wr_data[idx].control_len = head->len;
1709    rdma->wr_data[idx].control_curr =
1710        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1711}
1712
1713/*
1714 * This is an 'atomic' high-level operation to deliver a single, unified
1715 * control-channel message.
1716 *
1717 * Additionally, if the user is expecting some kind of reply to this message,
1718 * they can request a 'resp' response message be filled in by posting an
1719 * additional work request on behalf of the user and waiting for an additional
1720 * completion.
1721 *
1722 * The extra (optional) response is used during registration to save us from
1723 * having to perform an *additional* exchange of messages just to provide a
1724 * response, by instead piggy-backing on the acknowledgement.
1725 */
1726static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1727                                   uint8_t *data, RDMAControlHeader *resp,
1728                                   int *resp_idx,
1729                                   int (*callback)(RDMAContext *rdma))
1730{
1731    int ret = 0;
1732
1733    /*
1734     * Wait until the dest is ready before attempting to deliver the message
1735     * by waiting for a READY message.
1736     */
1737    if (rdma->control_ready_expected) {
1738        RDMAControlHeader resp;
1739        ret = qemu_rdma_exchange_get_response(rdma,
1740                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1741        if (ret < 0) {
1742            return ret;
1743        }
1744    }
1745
1746    /*
1747     * If the user is expecting a response, post a WR in anticipation of it.
1748     */
1749    if (resp) {
1750        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1751        if (ret) {
1752            error_report("rdma migration: error posting"
1753                    " extra control recv for anticipated result!");
1754            return ret;
1755        }
1756    }
1757
1758    /*
1759     * Post a WR to replace the one we just consumed for the READY message.
1760     */
1761    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1762    if (ret) {
1763        error_report("rdma migration: error posting first control recv!");
1764        return ret;
1765    }
1766
1767    /*
1768     * Deliver the control message that was requested.
1769     */
1770    ret = qemu_rdma_post_send_control(rdma, data, head);
1771
1772    if (ret < 0) {
1773        error_report("Failed to send control buffer!");
1774        return ret;
1775    }
1776
1777    /*
1778     * If we're expecting a response, block and wait for it.
1779     */
1780    if (resp) {
1781        if (callback) {
1782            trace_qemu_rdma_exchange_send_issue_callback();
1783            ret = callback(rdma);
1784            if (ret < 0) {
1785                return ret;
1786            }
1787        }
1788
1789        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1790        ret = qemu_rdma_exchange_get_response(rdma, resp,
1791                                              resp->type, RDMA_WRID_DATA);
1792
1793        if (ret < 0) {
1794            return ret;
1795        }
1796
1797        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1798        if (resp_idx) {
1799            *resp_idx = RDMA_WRID_DATA;
1800        }
1801        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1802    }
1803
1804    rdma->control_ready_expected = 1;
1805
1806    return 0;
1807}
1808
1809/*
1810 * This is an 'atomic' high-level operation to receive a single, unified
1811 * control-channel message.
1812 */
1813static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1814                                int expecting)
1815{
1816    RDMAControlHeader ready = {
1817                                .len = 0,
1818                                .type = RDMA_CONTROL_READY,
1819                                .repeat = 1,
1820                              };
1821    int ret;
1822
1823    /*
1824     * Inform the source that we're ready to receive a message.
1825     */
1826    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1827
1828    if (ret < 0) {
1829        error_report("Failed to send control buffer!");
1830        return ret;
1831    }
1832
1833    /*
1834     * Block and wait for the message.
1835     */
1836    ret = qemu_rdma_exchange_get_response(rdma, head,
1837                                          expecting, RDMA_WRID_READY);
1838
1839    if (ret < 0) {
1840        return ret;
1841    }
1842
1843    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1844
1845    /*
1846     * Post a new RECV work request to replace the one we just consumed.
1847     */
1848    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1849    if (ret) {
1850        error_report("rdma migration: error posting second control recv!");
1851        return ret;
1852    }
1853
1854    return 0;
1855}
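/*
 * Informal sketch of the handshake implemented by the two exchange helpers
 * above (message names are the RDMA_CONTROL_* types used in this file):
 *
 *     sender (exchange_send)              receiver (exchange_recv)
 *     ----------------------              ------------------------
 *     wait for READY            <-------  post SEND of READY
 *     post replacement RECV
 *     post SEND of 'head'+data  ------->  wait for the expected message
 *     [optionally wait for a    <-------  [optionally reply via a later
 *      piggy-backed response]              qemu_rdma_post_send_control()]
 */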
1856
1857/*
1858 * Write an actual chunk of memory using RDMA.
1859 *
1860 * If we're using dynamic registration on the dest-side, we have to
1861 * send a registration command first.
1862 */
1863static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1864                               int current_index, uint64_t current_addr,
1865                               uint64_t length)
1866{
1867    struct ibv_sge sge;
1868    struct ibv_send_wr send_wr = { 0 };
1869    struct ibv_send_wr *bad_wr;
1870    int reg_result_idx, ret, count = 0;
1871    uint64_t chunk, chunks;
1872    uint8_t *chunk_start, *chunk_end;
1873    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1874    RDMARegister reg;
1875    RDMARegisterResult *reg_result;
1876    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1877    RDMAControlHeader head = { .len = sizeof(RDMARegister),
1878                               .type = RDMA_CONTROL_REGISTER_REQUEST,
1879                               .repeat = 1,
1880                             };
1881
1882retry:
1883    sge.addr = (uintptr_t)(block->local_host_addr +
1884                            (current_addr - block->offset));
1885    sge.length = length;
1886
1887    chunk = ram_chunk_index(block->local_host_addr,
1888                            (uint8_t *)(uintptr_t)sge.addr);
1889    chunk_start = ram_chunk_start(block, chunk);
1890
1891    if (block->is_ram_block) {
1892        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1893
1894        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1895            chunks--;
1896        }
1897    } else {
1898        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1899
1900        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1901            chunks--;
1902        }
1903    }
1904
1905    trace_qemu_rdma_write_one_top(chunks + 1,
1906                                  (chunks + 1) *
1907                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1908
1909    chunk_end = ram_chunk_end(block, chunk + chunks);
1910
1911    if (!rdma->pin_all) {
1912#ifdef RDMA_UNREGISTRATION_EXAMPLE
1913        qemu_rdma_unregister_waiting(rdma);
1914#endif
1915    }
1916
1917    while (test_bit(chunk, block->transit_bitmap)) {
1918        (void)count;
1919        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1920                sge.addr, length, rdma->nb_sent, block->nb_chunks);
1921
1922        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1923
1924        if (ret < 0) {
1925            error_report("Failed to wait for previous write to complete "
1926                    "block %d chunk %" PRIu64
1927                    " current %" PRIu64 " len %" PRIu64 " %d",
1928                    current_index, chunk, sge.addr, length, rdma->nb_sent);
1929            return ret;
1930        }
1931    }
1932
1933    if (!rdma->pin_all || !block->is_ram_block) {
1934        if (!block->remote_keys[chunk]) {
1935            /*
1936             * This chunk has not yet been registered, so first check to see
1937             * if the entire chunk is zero. If so, tell the other side to
1938             * memset() + madvise() the entire chunk without RDMA.
1939             */
1940
1941            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1942                RDMACompress comp = {
1943                                        .offset = current_addr,
1944                                        .value = 0,
1945                                        .block_idx = current_index,
1946                                        .length = length,
1947                                    };
1948
1949                head.len = sizeof(comp);
1950                head.type = RDMA_CONTROL_COMPRESS;
1951
1952                trace_qemu_rdma_write_one_zero(chunk, sge.length,
1953                                               current_index, current_addr);
1954
1955                compress_to_network(rdma, &comp);
1956                ret = qemu_rdma_exchange_send(rdma, &head,
1957                                (uint8_t *) &comp, NULL, NULL, NULL);
1958
1959                if (ret < 0) {
1960                    return -EIO;
1961                }
1962
1963                acct_update_position(f, sge.length, true);
1964
1965                return 1;
1966            }
1967
1968            /*
1969             * Otherwise, tell other side to register.
1970             */
1971            reg.current_index = current_index;
1972            if (block->is_ram_block) {
1973                reg.key.current_addr = current_addr;
1974            } else {
1975                reg.key.chunk = chunk;
1976            }
1977            reg.chunks = chunks;
1978
1979            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1980                                              current_addr);
1981
1982            register_to_network(rdma, &reg);
1983            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1984                                    &resp, &reg_result_idx, NULL);
1985            if (ret < 0) {
1986                return ret;
1987            }
1988
1989            /* try to overlap this single registration with the one we sent. */
1990            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1991                                                &sge.lkey, NULL, chunk,
1992                                                chunk_start, chunk_end)) {
1993                error_report("cannot get lkey");
1994                return -EINVAL;
1995            }
1996
1997            reg_result = (RDMARegisterResult *)
1998                    rdma->wr_data[reg_result_idx].control_curr;
1999
2000            network_to_result(reg_result);
2001
2002            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2003                                                 reg_result->rkey, chunk);
2004
2005            block->remote_keys[chunk] = reg_result->rkey;
2006            block->remote_host_addr = reg_result->host_addr;
2007        } else {
2008            /* already registered before */
2009            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2010                                                &sge.lkey, NULL, chunk,
2011                                                chunk_start, chunk_end)) {
2012                error_report("cannot get lkey!");
2013                return -EINVAL;
2014            }
2015        }
2016
2017        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2018    } else {
2019        send_wr.wr.rdma.rkey = block->remote_rkey;
2020
2021        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2022                                                     &sge.lkey, NULL, chunk,
2023                                                     chunk_start, chunk_end)) {
2024            error_report("cannot get lkey!");
2025            return -EINVAL;
2026        }
2027    }
2028
2029    /*
2030     * Encode the ram block index and chunk within this wrid.
2031     * We will use this information at the time of completion
2032     * to figure out which bitmap to check against and then which
2033     * chunk in the bitmap to look for.
2034     */
2035    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2036                                        current_index, chunk);
2037
2038    send_wr.opcode = IBV_WR_RDMA_WRITE;
2039    send_wr.send_flags = IBV_SEND_SIGNALED;
2040    send_wr.sg_list = &sge;
2041    send_wr.num_sge = 1;
2042    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2043                                (current_addr - block->offset);
2044
2045    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2046                                   sge.length);
2047
2048    /*
2049     * ibv_post_send() does not return negative error numbers;
2050     * per the specification they are positive - no idea why.
2051     */
2052    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2053
2054    if (ret == ENOMEM) {
2055        trace_qemu_rdma_write_one_queue_full();
2056        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2057        if (ret < 0) {
2058            error_report("rdma migration: failed to make "
2059                         "room in full send queue! %d", ret);
2060            return ret;
2061        }
2062
2063        goto retry;
2064
2065    } else if (ret > 0) {
2066        perror("rdma migration: post rdma write failed");
2067        return -ret;
2068    }
2069
2070    set_bit(chunk, block->transit_bitmap);
2071    acct_update_position(f, sge.length, false);
2072    rdma->total_writes++;
2073
2074    return 0;
2075}
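/*
 * Note on the zero-chunk shortcut above: when buffer_is_zero() reports an
 * all-zero range, the source sends a single RDMA_CONTROL_COMPRESS message,
 * e.g. (field values hypothetical):
 *
 *     RDMACompress comp = { .offset = current_addr, .value = 0,
 *                           .block_idx = current_index, .length = length };
 *
 * and the destination reproduces the zeroes locally (see the
 * RDMA_CONTROL_COMPRESS case in qemu_rdma_registration_handle()) instead of
 * receiving an RDMA write.
 */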
2076
2077/*
2078 * Push out any unwritten RDMA operations.
2079 *
2080 * We support sending out multiple chunks at the same time.
2081 * Not all of them need to get signaled in the completion queue.
2082 */
2083static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2084{
2085    int ret;
2086
2087    if (!rdma->current_length) {
2088        return 0;
2089    }
2090
2091    ret = qemu_rdma_write_one(f, rdma,
2092            rdma->current_index, rdma->current_addr, rdma->current_length);
2093
2094    if (ret < 0) {
2095        return ret;
2096    }
2097
2098    if (ret == 0) {
2099        rdma->nb_sent++;
2100        trace_qemu_rdma_write_flush(rdma->nb_sent);
2101    }
2102
2103    rdma->current_length = 0;
2104    rdma->current_addr = 0;
2105
2106    return 0;
2107}
2108
2109static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2110                    uint64_t offset, uint64_t len)
2111{
2112    RDMALocalBlock *block;
2113    uint8_t *host_addr;
2114    uint8_t *chunk_end;
2115
2116    if (rdma->current_index < 0) {
2117        return 0;
2118    }
2119
2120    if (rdma->current_chunk < 0) {
2121        return 0;
2122    }
2123
2124    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2125    host_addr = block->local_host_addr + (offset - block->offset);
2126    chunk_end = ram_chunk_end(block, rdma->current_chunk);
2127
2128    if (rdma->current_length == 0) {
2129        return 0;
2130    }
2131
2132    /*
2133     * Only merge into chunk sequentially.
2134     */
2135    if (offset != (rdma->current_addr + rdma->current_length)) {
2136        return 0;
2137    }
2138
2139    if (offset < block->offset) {
2140        return 0;
2141    }
2142
2143    if ((offset + len) > (block->offset + block->length)) {
2144        return 0;
2145    }
2146
2147    if ((host_addr + len) > chunk_end) {
2148        return 0;
2149    }
2150
2151    return 1;
2152}
2153
2154/*
2155 * We're not actually writing here, but doing three things:
2156 *
2157 * 1. Identify the chunk the buffer belongs to.
2158 * 2. If the chunk is full or the buffer doesn't belong to the current
2159 *    chunk, then start a new chunk and flush() the old chunk.
2160 * 3. To keep the hardware busy, we also group chunks into batches
2161 *    and only require that a batch gets acknowledged in the completion
2162 *    queue instead of each individual chunk.
2163 */
2164static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2165                           uint64_t block_offset, uint64_t offset,
2166                           uint64_t len)
2167{
2168    uint64_t current_addr = block_offset + offset;
2169    uint64_t index = rdma->current_index;
2170    uint64_t chunk = rdma->current_chunk;
2171    int ret;
2172
2173    /* If we cannot merge it, we flush the current buffer first. */
2174    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2175        ret = qemu_rdma_write_flush(f, rdma);
2176        if (ret) {
2177            return ret;
2178        }
2179        rdma->current_length = 0;
2180        rdma->current_addr = current_addr;
2181
2182        ret = qemu_rdma_search_ram_block(rdma, block_offset,
2183                                         offset, len, &index, &chunk);
2184        if (ret) {
2185            error_report("ram block search failed");
2186            return ret;
2187        }
2188        rdma->current_index = index;
2189        rdma->current_chunk = chunk;
2190    }
2191
2192    /* merge it */
2193    rdma->current_length += len;
2194
2195    /* flush it if buffer is too large */
2196    if (rdma->current_length >= RDMA_MERGE_MAX) {
2197        return qemu_rdma_write_flush(f, rdma);
2198    }
2199
2200    return 0;
2201}
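/*
 * Illustrative (hypothetical) call sequence: sequential pages that land in
 * the same chunk are merged into one pending write, which is only pushed
 * out by qemu_rdma_write_flush() when the buffer stops being mergable or
 * grows to RDMA_MERGE_MAX:
 *
 *     qemu_rdma_write(f, rdma, block_offset, 0x0000, 4096);   // new buffer
 *     qemu_rdma_write(f, rdma, block_offset, 0x1000, 4096);   // merged
 *     qemu_rdma_write(f, rdma, block_offset, 0x9000, 4096);   // flush, new
 */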
2202
2203static void qemu_rdma_cleanup(RDMAContext *rdma)
2204{
2205    struct rdma_cm_event *cm_event;
2206    int ret, idx;
2207
2208    if (rdma->cm_id && rdma->connected) {
2209        if (rdma->error_state && !rdma->received_error) {
2210            RDMAControlHeader head = { .len = 0,
2211                                       .type = RDMA_CONTROL_ERROR,
2212                                       .repeat = 1,
2213                                     };
2214            error_report("Early error. Sending error.");
2215            qemu_rdma_post_send_control(rdma, NULL, &head);
2216        }
2217
2218        ret = rdma_disconnect(rdma->cm_id);
2219        if (!ret) {
2220            trace_qemu_rdma_cleanup_waiting_for_disconnect();
2221            ret = rdma_get_cm_event(rdma->channel, &cm_event);
2222            if (!ret) {
2223                rdma_ack_cm_event(cm_event);
2224            }
2225        }
2226        trace_qemu_rdma_cleanup_disconnect();
2227        rdma->connected = false;
2228    }
2229
2230    g_free(rdma->dest_blocks);
2231    rdma->dest_blocks = NULL;
2232
2233    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2234        if (rdma->wr_data[idx].control_mr) {
2235            rdma->total_registrations--;
2236            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2237        }
2238        rdma->wr_data[idx].control_mr = NULL;
2239    }
2240
2241    if (rdma->local_ram_blocks.block) {
2242        while (rdma->local_ram_blocks.nb_blocks) {
2243            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2244        }
2245    }
2246
2247    if (rdma->qp) {
2248        rdma_destroy_qp(rdma->cm_id);
2249        rdma->qp = NULL;
2250    }
2251    if (rdma->cq) {
2252        ibv_destroy_cq(rdma->cq);
2253        rdma->cq = NULL;
2254    }
2255    if (rdma->comp_channel) {
2256        ibv_destroy_comp_channel(rdma->comp_channel);
2257        rdma->comp_channel = NULL;
2258    }
2259    if (rdma->pd) {
2260        ibv_dealloc_pd(rdma->pd);
2261        rdma->pd = NULL;
2262    }
2263    if (rdma->cm_id) {
2264        rdma_destroy_id(rdma->cm_id);
2265        rdma->cm_id = NULL;
2266    }
2267    if (rdma->listen_id) {
2268        rdma_destroy_id(rdma->listen_id);
2269        rdma->listen_id = NULL;
2270    }
2271    if (rdma->channel) {
2272        rdma_destroy_event_channel(rdma->channel);
2273        rdma->channel = NULL;
2274    }
2275    g_free(rdma->host);
2276    rdma->host = NULL;
2277}
2278
2279
2280static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2281{
2282    int ret, idx;
2283    Error *local_err = NULL, **temp = &local_err;
2284
2285    /*
2286     * Will be validated against the destination's actual capabilities
2287     * after the connect() completes.
2288     */
2289    rdma->pin_all = pin_all;
2290
2291    ret = qemu_rdma_resolve_host(rdma, temp);
2292    if (ret) {
2293        goto err_rdma_source_init;
2294    }
2295
2296    ret = qemu_rdma_alloc_pd_cq(rdma);
2297    if (ret) {
2298        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2299                    " limits may be too low. Please check 'ulimit -a' and "
2300                    "look for the 'max locked memory' (ulimit -l) value");
2301        goto err_rdma_source_init;
2302    }
2303
2304    ret = qemu_rdma_alloc_qp(rdma);
2305    if (ret) {
2306        ERROR(temp, "rdma migration: error allocating qp!");
2307        goto err_rdma_source_init;
2308    }
2309
2310    ret = qemu_rdma_init_ram_blocks(rdma);
2311    if (ret) {
2312        ERROR(temp, "rdma migration: error initializing ram blocks!");
2313        goto err_rdma_source_init;
2314    }
2315
2316    /* Build the hash that maps from offset to RAMBlock */
2317    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2318    for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2319        g_hash_table_insert(rdma->blockmap,
2320                (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2321                &rdma->local_ram_blocks.block[idx]);
2322    }
2323
2324    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2325        ret = qemu_rdma_reg_control(rdma, idx);
2326        if (ret) {
2327            ERROR(temp, "rdma migration: error registering %d control!",
2328                                                            idx);
2329            goto err_rdma_source_init;
2330        }
2331    }
2332
2333    return 0;
2334
2335err_rdma_source_init:
2336    error_propagate(errp, local_err);
2337    qemu_rdma_cleanup(rdma);
2338    return -1;
2339}
2340
2341static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2342{
2343    RDMACapabilities cap = {
2344                                .version = RDMA_CONTROL_VERSION_CURRENT,
2345                                .flags = 0,
2346                           };
2347    struct rdma_conn_param conn_param = { .initiator_depth = 2,
2348                                          .retry_count = 5,
2349                                          .private_data = &cap,
2350                                          .private_data_len = sizeof(cap),
2351                                        };
2352    struct rdma_cm_event *cm_event;
2353    int ret;
2354
2355    /*
2356     * Only negotiate the capability with the destination if the user
2357     * on the source first requested the capability.
2358     */
2359    if (rdma->pin_all) {
2360        trace_qemu_rdma_connect_pin_all_requested();
2361        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2362    }
2363
2364    caps_to_network(&cap);
2365
2366    ret = rdma_connect(rdma->cm_id, &conn_param);
2367    if (ret) {
2368        perror("rdma_connect");
2369        ERROR(errp, "connecting to destination!");
2370        goto err_rdma_source_connect;
2371    }
2372
2373    ret = rdma_get_cm_event(rdma->channel, &cm_event);
2374    if (ret) {
2375        perror("rdma_get_cm_event after rdma_connect");
2376        ERROR(errp, "connecting to destination!");
2377        rdma_ack_cm_event(cm_event);
2378        goto err_rdma_source_connect;
2379    }
2380
2381    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2382        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2383        ERROR(errp, "connecting to destination!");
2384        rdma_ack_cm_event(cm_event);
2385        goto err_rdma_source_connect;
2386    }
2387    rdma->connected = true;
2388
2389    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2390    network_to_caps(&cap);
2391
2392    /*
2393     * Verify that the *requested* capabilities are supported by the destination
2394     * and disable them otherwise.
2395     */
2396    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2397        ERROR(errp, "Server cannot support pinning all memory. "
2398                        "Will register memory dynamically.");
2399        rdma->pin_all = false;
2400    }
2401
2402    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2403
2404    rdma_ack_cm_event(cm_event);
2405
2406    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2407    if (ret) {
2408        ERROR(errp, "posting second control recv!");
2409        goto err_rdma_source_connect;
2410    }
2411
2412    rdma->control_ready_expected = 1;
2413    rdma->nb_sent = 0;
2414    return 0;
2415
2416err_rdma_source_connect:
2417    qemu_rdma_cleanup(rdma);
2418    return -1;
2419}
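/*
 * Capability negotiation, in short: when pin-all was requested, the
 * RDMA_CAPABILITY_PIN_ALL bit travels to the destination inside the
 * private_data of rdma_connect(); if the ESTABLISHED event's private_data
 * does not carry that bit back, rdma->pin_all is cleared and the migration
 * falls back to dynamic chunk registration.
 */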
2420
2421static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2422{
2423    int ret, idx;
2424    struct rdma_cm_id *listen_id;
2425    char ip[40] = "unknown";
2426    struct rdma_addrinfo *res, *e;
2427    char port_str[16];
2428
2429    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2430        rdma->wr_data[idx].control_len = 0;
2431        rdma->wr_data[idx].control_curr = NULL;
2432    }
2433
2434    if (!rdma->host || !rdma->host[0]) {
2435        ERROR(errp, "RDMA host is not set!");
2436        rdma->error_state = -EINVAL;
2437        return -1;
2438    }
2439    /* create CM channel */
2440    rdma->channel = rdma_create_event_channel();
2441    if (!rdma->channel) {
2442        ERROR(errp, "could not create rdma event channel");
2443        rdma->error_state = -EINVAL;
2444        return -1;
2445    }
2446
2447    /* create CM id */
2448    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2449    if (ret) {
2450        ERROR(errp, "could not create cm_id!");
2451        goto err_dest_init_create_listen_id;
2452    }
2453
2454    snprintf(port_str, 16, "%d", rdma->port);
2455    port_str[15] = '\0';
2456
2457    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2458    if (ret < 0) {
2459        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2460        goto err_dest_init_bind_addr;
2461    }
2462
2463    for (e = res; e != NULL; e = e->ai_next) {
2464        inet_ntop(e->ai_family,
2465            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2466        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2467        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2468        if (ret) {
2469            continue;
2470        }
2471        if (e->ai_family == AF_INET6) {
2472            ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
2473            if (ret) {
2474                continue;
2475            }
2476        }
2477        break;
2478    }
2479
2480    if (!e) {
2481        ERROR(errp, "Error: could not rdma_bind_addr!");
2482        goto err_dest_init_bind_addr;
2483    }
2484
2485    rdma->listen_id = listen_id;
2486    qemu_rdma_dump_gid("dest_init", listen_id);
2487    return 0;
2488
2489err_dest_init_bind_addr:
2490    rdma_destroy_id(listen_id);
2491err_dest_init_create_listen_id:
2492    rdma_destroy_event_channel(rdma->channel);
2493    rdma->channel = NULL;
2494    rdma->error_state = ret;
2495    return ret;
2496
2497}
2498
2499static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2500{
2501    RDMAContext *rdma = NULL;
2502    InetSocketAddress *addr;
2503
2504    if (host_port) {
2505        rdma = g_new0(RDMAContext, 1);
2506        rdma->current_index = -1;
2507        rdma->current_chunk = -1;
2508
2509        addr = inet_parse(host_port, NULL);
2510        if (addr != NULL) {
2511            rdma->port = atoi(addr->port);
2512            rdma->host = g_strdup(addr->host);
2513        } else {
2514            ERROR(errp, "bad RDMA migration address '%s'", host_port);
2515            g_free(rdma);
2516            rdma = NULL;
2517        }
2518
2519        qapi_free_InetSocketAddress(addr);
2520    }
2521
2522    return rdma;
2523}
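/*
 * Example (hypothetical address): a host_port string of "192.168.1.10:4444"
 * is split by inet_parse() so that rdma->host becomes "192.168.1.10" and
 * rdma->port becomes 4444.
 */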
2524
2525/*
2526 * QEMUFile interface to the control channel.
2527 * SEND messages for control only.
2528 * VM's ram is handled with regular RDMA messages.
2529 */
2530static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2531                                       const struct iovec *iov,
2532                                       size_t niov,
2533                                       int *fds,
2534                                       size_t nfds,
2535                                       Error **errp)
2536{
2537    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2538    QEMUFile *f = rioc->file;
2539    RDMAContext *rdma = rioc->rdma;
2540    int ret;
2541    ssize_t done = 0;
2542    size_t i;
2543
2544    CHECK_ERROR_STATE();
2545
2546    /*
2547     * Push out any writes that
2548     * we've queued up for the VM's ram.
2549     */
2550    ret = qemu_rdma_write_flush(f, rdma);
2551    if (ret < 0) {
2552        rdma->error_state = ret;
2553        return ret;
2554    }
2555
2556    for (i = 0; i < niov; i++) {
2557        size_t remaining = iov[i].iov_len;
2558        uint8_t * data = (void *)iov[i].iov_base;
2559        while (remaining) {
2560            RDMAControlHeader head;
2561
2562            rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
2563            remaining -= rioc->len;
2564
2565            head.len = rioc->len;
2566            head.type = RDMA_CONTROL_QEMU_FILE;
2567
2568            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2569
2570            if (ret < 0) {
2571                rdma->error_state = ret;
2572                return ret;
2573            }
2574
2575            data += rioc->len;
2576            done += rioc->len;
2577        }
2578    }
2579
2580    return done;
2581}
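/*
 * In other words, each iovec is streamed out as a series of
 * RDMA_CONTROL_QEMU_FILE messages of at most RDMA_SEND_INCREMENT bytes each
 * (e.g. a hypothetical 80 KiB iovec goes out in several such pieces), and
 * 'done' accumulates the total number of bytes reported back to QEMUFile.
 */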
2582
2583static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2584                             size_t size, int idx)
2585{
2586    size_t len = 0;
2587
2588    if (rdma->wr_data[idx].control_len) {
2589        trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2590
2591        len = MIN(size, rdma->wr_data[idx].control_len);
2592        memcpy(buf, rdma->wr_data[idx].control_curr, len);
2593        rdma->wr_data[idx].control_curr += len;
2594        rdma->wr_data[idx].control_len -= len;
2595    }
2596
2597    return len;
2598}
2599
2600/*
2601 * QEMUFile interface to the control channel.
2602 * RDMA links don't use bytestreams, so we have to
2603 * return bytes to QEMUFile opportunistically.
2604 */
2605static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2606                                      const struct iovec *iov,
2607                                      size_t niov,
2608                                      int **fds,
2609                                      size_t *nfds,
2610                                      Error **errp)
2611{
2612    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2613    RDMAContext *rdma = rioc->rdma;
2614    RDMAControlHeader head;
2615    int ret = 0;
2616    ssize_t i;
2617    size_t done = 0;
2618
2619    CHECK_ERROR_STATE();
2620
2621    for (i = 0; i < niov; i++) {
2622        size_t want = iov[i].iov_len;
2623        uint8_t *data = (void *)iov[i].iov_base;
2624
2625        /*
2626         * First, we hold on to the last SEND message we
2627         * were given and dish out the bytes until we run
2628         * out of bytes.
2629         */
2630        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2631        done += ret;
2632        want -= ret;
2633        /* Got what we needed, so go to next iovec */
2634        if (want == 0) {
2635            continue;
2636        }
2637
2638        /* If we got any data so far, then don't wait
2639         * for more, just return what we have */
2640        if (done > 0) {
2641            break;
2642        }
2643
2644
2645        /* We've got nothing at all, so let's wait for
2646         * more to arrive
2647         */
2648        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2649
2650        if (ret < 0) {
2651            rdma->error_state = ret;
2652            return ret;
2653        }
2654
2655        /*
2656         * SEND was received with new bytes, now try again.
2657         */
2658        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2659        done += ret;
2660        want -= ret;
2661
2662        /* Still didn't get enough, so let's just return */
2663        if (want) {
2664            if (done == 0) {
2665                return QIO_CHANNEL_ERR_BLOCK;
2666            } else {
2667                break;
2668            }
2669        }
2670    }
2671    rioc->len = done;
2672    return rioc->len;
2673}
2674
2675/*
2676 * Block until all the outstanding chunks have been delivered by the hardware.
2677 */
2678static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2679{
2680    int ret;
2681
2682    if (qemu_rdma_write_flush(f, rdma) < 0) {
2683        return -EIO;
2684    }
2685
2686    while (rdma->nb_sent) {
2687        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2688        if (ret < 0) {
2689            error_report("rdma migration: complete polling error!");
2690            return -EIO;
2691        }
2692    }
2693
2694    qemu_rdma_unregister_waiting(rdma);
2695
2696    return 0;
2697}
2698
2699
2700static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2701                                         bool blocking,
2702                                         Error **errp)
2703{
2704    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2705    /* XXX we should make readv/writev actually honour this :-) */
2706    rioc->blocking = blocking;
2707    return 0;
2708}
2709
2710
2711typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2712struct QIOChannelRDMASource {
2713    GSource parent;
2714    QIOChannelRDMA *rioc;
2715    GIOCondition condition;
2716};
2717
2718static gboolean
2719qio_channel_rdma_source_prepare(GSource *source,
2720                                gint *timeout)
2721{
2722    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2723    RDMAContext *rdma = rsource->rioc->rdma;
2724    GIOCondition cond = 0;
2725    *timeout = -1;
2726
2727    if (rdma->wr_data[0].control_len) {
2728        cond |= G_IO_IN;
2729    }
2730    cond |= G_IO_OUT;
2731
2732    return cond & rsource->condition;
2733}
2734
2735static gboolean
2736qio_channel_rdma_source_check(GSource *source)
2737{
2738    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2739    RDMAContext *rdma = rsource->rioc->rdma;
2740    GIOCondition cond = 0;
2741
2742    if (rdma->wr_data[0].control_len) {
2743        cond |= G_IO_IN;
2744    }
2745    cond |= G_IO_OUT;
2746
2747    return cond & rsource->condition;
2748}
2749
2750static gboolean
2751qio_channel_rdma_source_dispatch(GSource *source,
2752                                 GSourceFunc callback,
2753                                 gpointer user_data)
2754{
2755    QIOChannelFunc func = (QIOChannelFunc)callback;
2756    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2757    RDMAContext *rdma = rsource->rioc->rdma;
2758    GIOCondition cond = 0;
2759
2760    if (rdma->wr_data[0].control_len) {
2761        cond |= G_IO_IN;
2762    }
2763    cond |= G_IO_OUT;
2764
2765    return (*func)(QIO_CHANNEL(rsource->rioc),
2766                   (cond & rsource->condition),
2767                   user_data);
2768}
2769
2770static void
2771qio_channel_rdma_source_finalize(GSource *source)
2772{
2773    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2774
2775    object_unref(OBJECT(ssource->rioc));
2776}
2777
2778GSourceFuncs qio_channel_rdma_source_funcs = {
2779    qio_channel_rdma_source_prepare,
2780    qio_channel_rdma_source_check,
2781    qio_channel_rdma_source_dispatch,
2782    qio_channel_rdma_source_finalize
2783};
2784
2785static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2786                                              GIOCondition condition)
2787{
2788    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2789    QIOChannelRDMASource *ssource;
2790    GSource *source;
2791
2792    source = g_source_new(&qio_channel_rdma_source_funcs,
2793                          sizeof(QIOChannelRDMASource));
2794    ssource = (QIOChannelRDMASource *)source;
2795
2796    ssource->rioc = rioc;
2797    object_ref(OBJECT(rioc));
2798
2799    ssource->condition = condition;
2800
2801    return source;
2802}
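/*
 * Hypothetical usage sketch: the returned GSource is attached to the main
 * loop like any other QIOChannel watch, e.g.
 *
 *     GSource *src = qio_channel_rdma_create_watch(ioc, G_IO_IN);
 *     g_source_set_callback(src, (GSourceFunc)my_ready_cb, opaque, NULL);
 *     g_source_attach(src, NULL);
 *     g_source_unref(src);
 *
 * where my_ready_cb and opaque are placeholders for the caller's callback
 * and state.
 */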
2803
2804
2805static int qio_channel_rdma_close(QIOChannel *ioc,
2806                                  Error **errp)
2807{
2808    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2809    trace_qemu_rdma_close();
2810    if (rioc->rdma) {
2811        if (!rioc->rdma->error_state) {
2812            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2813        }
2814        qemu_rdma_cleanup(rioc->rdma);
2815        g_free(rioc->rdma);
2816        rioc->rdma = NULL;
2817    }
2818    return 0;
2819}
2820
2821/*
2822 * Parameters:
2823 *    @offset == 0 :
2824 *        This means that 'block_offset' is a full virtual address that does not
2825 *        belong to a RAMBlock of the virtual machine and instead
2826 *        represents a private malloc'd memory area that the caller wishes to
2827 *        transfer.
2828 *
2829 *    @offset != 0 :
2830 *        Offset is an offset to be added to block_offset and used
2831 *        to also lookup the corresponding RAMBlock.
2832 *
2833 *    @size > 0 :
2834 *        Initiate a transfer of this size.
2835 *
2836 *    @size == 0 :
2837 *        A 'hint' or 'advice' that means that we wish to speculatively
2838 *        and asynchronously unregister this memory. In this case, there is no
2839 *        guarantee that the unregister will actually happen, for example,
2840 *        if the memory is being actively transmitted. Additionally, the memory
2841 *        may be re-registered at any future time if a write within the same
2842 *        chunk was requested again, even if you attempted to unregister it
2843 *        here.
2844 *
2845 *    @size < 0 : TODO, not yet supported
2846 *        Unregister the memory NOW. This means that the caller does not
2847 *        expect there to be any future RDMA transfers and we just want to clean
2848 *        things up. This is used in case the upper layer owns the memory and
2849 *        cannot wait for qemu_fclose() to occur.
2850 *
2851 *    @bytes_sent : User-specified pointer to indicate how many bytes were
2852 *                  sent. Usually, this will not be more than a few bytes of
2853 *                  the protocol because most transfers are sent asynchronously.
2854 */
2855static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2856                                  ram_addr_t block_offset, ram_addr_t offset,
2857                                  size_t size, uint64_t *bytes_sent)
2858{
2859    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2860    RDMAContext *rdma = rioc->rdma;
2861    int ret;
2862
2863    CHECK_ERROR_STATE();
2864
2865    qemu_fflush(f);
2866
2867    if (size > 0) {
2868        /*
2869         * Add this page to the current 'chunk'. If the chunk
2870         * is full, or the page doesn't belong to the current chunk,
2871         * an actual RDMA write will occur and a new chunk will be formed.
2872         */
2873        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2874        if (ret < 0) {
2875            error_report("rdma migration: write error! %d", ret);
2876            goto err;
2877        }
2878
2879        /*
2880         * We always return 1 byte because the RDMA
2881         * protocol is completely asynchronous. We do not yet know
2882         * whether an identified chunk is zero or not because we're
2883         * waiting for other pages to potentially be merged with
2884         * the current chunk. So, we have to call qemu_update_position()
2885         * later on when the actual write occurs.
2886         */
2887        if (bytes_sent) {
2888            *bytes_sent = 1;
2889        }
2890    } else {
2891        uint64_t index, chunk;
2892
2893        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2894        if (size < 0) {
2895            ret = qemu_rdma_drain_cq(f, rdma);
2896            if (ret < 0) {
2897                fprintf(stderr, "rdma: failed to synchronously drain"
2898                                " completion queue before unregistration.\n");
2899                goto err;
2900            }
2901        }
2902        */
2903
2904        ret = qemu_rdma_search_ram_block(rdma, block_offset,
2905                                         offset, size, &index, &chunk);
2906
2907        if (ret) {
2908            error_report("ram block search failed");
2909            goto err;
2910        }
2911
2912        qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2913
2914        /*
2915         * TODO: Synchronous, guaranteed unregistration (should not occur during
2916         * fast-path). Otherwise, unregisters will process on the next call to
2917         * qemu_rdma_drain_cq()
2918        if (size < 0) {
2919            qemu_rdma_unregister_waiting(rdma);
2920        }
2921        */
2922    }
2923
2924    /*
2925     * Drain the Completion Queue if possible, but do not block,
2926     * just poll.
2927     *
2928     * If nothing to poll, the end of the iteration will do this
2929     * again to make sure we don't overflow the request queue.
2930     */
2931    while (1) {
2932        uint64_t wr_id, wr_id_in;
2933        int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2934        if (ret < 0) {
2935            error_report("rdma migration: polling error! %d", ret);
2936            goto err;
2937        }
2938
2939        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2940
2941        if (wr_id == RDMA_WRID_NONE) {
2942            break;
2943        }
2944    }
2945
2946    return RAM_SAVE_CONTROL_DELAYED;
2947err:
2948    rdma->error_state = ret;
2949    return ret;
2950}
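/*
 * Hypothetical caller sketch, modelled on the save_page hook: the return
 * value RAM_SAVE_CONTROL_DELAYED tells the RAM migration code that the page
 * was queued rather than written immediately:
 *
 *     uint64_t bytes_sent = 0;
 *     size_t r = qemu_rdma_save_page(f, rioc, block_offset, offset,
 *                                    TARGET_PAGE_SIZE, &bytes_sent);
 *     if (r == RAM_SAVE_CONTROL_DELAYED) {
 *         // accounting for the data happens when the chunk is flushed
 *     }
 *
 * TARGET_PAGE_SIZE here merely stands in for whatever size the caller
 * actually transfers.
 */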
2951
2952static int qemu_rdma_accept(RDMAContext *rdma)
2953{
2954    RDMACapabilities cap;
2955    struct rdma_conn_param conn_param = {
2956                                            .responder_resources = 2,
2957                                            .private_data = &cap,
2958                                            .private_data_len = sizeof(cap),
2959                                         };
2960    struct rdma_cm_event *cm_event;
2961    struct ibv_context *verbs;
2962    int ret = -EINVAL;
2963    int idx;
2964
2965    ret = rdma_get_cm_event(rdma->channel, &cm_event);
2966    if (ret) {
2967        goto err_rdma_dest_wait;
2968    }
2969
2970    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2971        rdma_ack_cm_event(cm_event);
2972        goto err_rdma_dest_wait;
2973    }
2974
2975    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2976
2977    network_to_caps(&cap);
2978
2979    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2980            error_report("Unknown source RDMA version: %d, bailing...",
2981                            cap.version);
2982            rdma_ack_cm_event(cm_event);
2983            goto err_rdma_dest_wait;
2984    }
2985
2986    /*
2987     * Respond with only the capabilities this version of QEMU knows about.
2988     */
2989    cap.flags &= known_capabilities;
2990
2991    /*
2992     * Enable the ones that we do know about.
2993     * Add other checks here as new ones are introduced.
2994     */
2995    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2996        rdma->pin_all = true;
2997    }
2998
2999    rdma->cm_id = cm_event->id;
3000    verbs = cm_event->id->verbs;
3001
3002    rdma_ack_cm_event(cm_event);
3003
3004    trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3005
3006    caps_to_network(&cap);
3007
3008    trace_qemu_rdma_accept_pin_verbsc(verbs);
3009
3010    if (!rdma->verbs) {
3011        rdma->verbs = verbs;
3012    } else if (rdma->verbs != verbs) {
3013            error_report("ibv context not matching %p, %p!", rdma->verbs,
3014                         verbs);
3015            goto err_rdma_dest_wait;
3016    }
3017
3018    qemu_rdma_dump_id("dest_init", verbs);
3019
3020    ret = qemu_rdma_alloc_pd_cq(rdma);
3021    if (ret) {
3022        error_report("rdma migration: error allocating pd and cq!");
3023        goto err_rdma_dest_wait;
3024    }
3025
3026    ret = qemu_rdma_alloc_qp(rdma);
3027    if (ret) {
3028        error_report("rdma migration: error allocating qp!");
3029        goto err_rdma_dest_wait;
3030    }
3031
3032    ret = qemu_rdma_init_ram_blocks(rdma);
3033    if (ret) {
3034        error_report("rdma migration: error initializing ram blocks!");
3035        goto err_rdma_dest_wait;
3036    }
3037
3038    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3039        ret = qemu_rdma_reg_control(rdma, idx);
3040        if (ret) {
3041            error_report("rdma: error registering %d control", idx);
3042            goto err_rdma_dest_wait;
3043        }
3044    }
3045
3046    qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3047
3048    ret = rdma_accept(rdma->cm_id, &conn_param);
3049    if (ret) {
3050        error_report("rdma_accept returns %d", ret);
3051        goto err_rdma_dest_wait;
3052    }
3053
3054    ret = rdma_get_cm_event(rdma->channel, &cm_event);
3055    if (ret) {
3056        error_report("rdma_accept get_cm_event failed %d", ret);
3057        goto err_rdma_dest_wait;
3058    }
3059
3060    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3061        error_report("rdma_accept: RDMA_CM_EVENT_ESTABLISHED not received");
3062        rdma_ack_cm_event(cm_event);
3063        goto err_rdma_dest_wait;
3064    }
3065
3066    rdma_ack_cm_event(cm_event);
3067    rdma->connected = true;
3068
3069    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3070    if (ret) {
3071        error_report("rdma migration: error posting second control recv");
3072        goto err_rdma_dest_wait;
3073    }
3074
3075    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3076
3077    return 0;
3078
3079err_rdma_dest_wait:
3080    rdma->error_state = ret;
3081    qemu_rdma_cleanup(rdma);
3082    return ret;
3083}
3084
3085static int dest_ram_sort_func(const void *a, const void *b)
3086{
3087    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3088    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3089
3090    return (a_index < b_index) ? -1 : (a_index != b_index);
3091}
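/*
 * Example: with (hypothetical) src_index values of { 2, 0, 1 }, the qsort()
 * call below reorders the local block list into src_index order { 0, 1, 2 },
 * matching the order in which the source sent its RAMBlock list.
 */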
3092
3093/*
3094 * During each iteration of the migration, we listen for instructions
3095 * from the source VM to perform dynamic page registrations before it
3096 * can perform RDMA operations.
3097 *
3098 * We respond with the 'rkey'.
3099 *
3100 * Keep doing this until the source tells us to stop.
3101 */
3102static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3103{
3104    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3105                               .type = RDMA_CONTROL_REGISTER_RESULT,
3106                               .repeat = 0,
3107                             };
3108    RDMAControlHeader unreg_resp = { .len = 0,
3109                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3110                               .repeat = 0,
3111                             };
3112    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3113                                 .repeat = 1 };
3114    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3115    RDMAContext *rdma = rioc->rdma;
3116    RDMALocalBlocks *local = &rdma->local_ram_blocks;
3117    RDMAControlHeader head;
3118    RDMARegister *reg, *registers;
3119    RDMACompress *comp;
3120    RDMARegisterResult *reg_result;
3121    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3122    RDMALocalBlock *block;
3123    void *host_addr;
3124    int ret = 0;
3125    int idx = 0;
3126    int count = 0;
3127    int i = 0;
3128
3129    CHECK_ERROR_STATE();
3130
3131    do {
3132        trace_qemu_rdma_registration_handle_wait();
3133
3134        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3135
3136        if (ret < 0) {
3137            break;
3138        }
3139
3140        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3141            error_report("rdma: Too many requests in this message (%d). "
3142                            "Bailing.", head.repeat);
3143            ret = -EIO;
3144            break;
3145        }
3146
3147        switch (head.type) {
3148        case RDMA_CONTROL_COMPRESS:
3149            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3150            network_to_compress(comp);
3151
3152            trace_qemu_rdma_registration_handle_compress(comp->length,
3153                                                         comp->block_idx,
3154                                                         comp->offset);
3155            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3156                error_report("rdma: 'compress' bad block index %u (vs %d)",
3157                             (unsigned int)comp->block_idx,
3158                             rdma->local_ram_blocks.nb_blocks);
3159                ret = -EIO;
3160                goto out;
3161            }
3162            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3163
3164            host_addr = block->local_host_addr +
3165                            (comp->offset - block->offset);
3166
3167            ram_handle_compressed(host_addr, comp->value, comp->length);
3168            break;
3169
3170        case RDMA_CONTROL_REGISTER_FINISHED:
3171            trace_qemu_rdma_registration_handle_finished();
3172            goto out;
3173
3174        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3175            trace_qemu_rdma_registration_handle_ram_blocks();
3176
3177            /* Sort our local RAM Block list so it's the same as the source's;
3178             * we can do this since we've filled in a src_index in the list
3179             * as we received the RAMBlock list earlier.
3180             */
3181            qsort(rdma->local_ram_blocks.block,
3182                  rdma->local_ram_blocks.nb_blocks,
3183                  sizeof(RDMALocalBlock), dest_ram_sort_func);
3184            if (rdma->pin_all) {
3185                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3186                if (ret) {
3187                    error_report("rdma migration: error dest "
3188                                    "registering ram blocks");
3189                    goto out;
3190                }
3191            }
3192
3193            /*
3194             * Dest uses this to prepare to transmit the RAMBlock descriptions
3195             * to the source VM after connection setup.
3196             * Both sides use the "remote" structure to communicate and update
3197             * their "local" descriptions with what was sent.
3198             */
3199            for (i = 0; i < local->nb_blocks; i++) {
3200                rdma->dest_blocks[i].remote_host_addr =
3201                    (uintptr_t)(local->block[i].local_host_addr);
3202
3203                if (rdma->pin_all) {
3204                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3205                }
3206
3207                rdma->dest_blocks[i].offset = local->block[i].offset;
3208                rdma->dest_blocks[i].length = local->block[i].length;
3209
3210                dest_block_to_network(&rdma->dest_blocks[i]);
3211                trace_qemu_rdma_registration_handle_ram_blocks_loop(
3212                    local->block[i].block_name,
3213                    local->block[i].offset,
3214                    local->block[i].length,
3215                    local->block[i].local_host_addr,
3216                    local->block[i].src_index);
3217            }
3218
3219            blocks.len = rdma->local_ram_blocks.nb_blocks
3220                                                * sizeof(RDMADestBlock);
3221
3222
3223            ret = qemu_rdma_post_send_control(rdma,
3224                                        (uint8_t *) rdma->dest_blocks, &blocks);
3225
3226            if (ret < 0) {
3227                error_report("rdma migration: error sending remote info");
3228                goto out;
3229            }
3230
3231            break;
3232        case RDMA_CONTROL_REGISTER_REQUEST:
3233            trace_qemu_rdma_registration_handle_register(head.repeat);
3234
3235            reg_resp.repeat = head.repeat;
3236            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3237
3238            for (count = 0; count < head.repeat; count++) {
3239                uint64_t chunk;
3240                uint8_t *chunk_start, *chunk_end;
3241
3242                reg = &registers[count];
3243                network_to_register(reg);
3244
3245                reg_result = &results[count];
3246
3247                trace_qemu_rdma_registration_handle_register_loop(count,
3248                         reg->current_index, reg->key.current_addr, reg->chunks);
3249
3250                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3251                    error_report("rdma: 'register' bad block index %u (vs %d)",
3252                                 (unsigned int)reg->current_index,
3253                                 rdma->local_ram_blocks.nb_blocks);
3254                    ret = -ENOENT;
3255                    goto out;
3256                }
3257                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3258                if (block->is_ram_block) {
3259                    if (block->offset > reg->key.current_addr) {
3260                        error_report("rdma: bad register address for block %s"
3261                            " offset: %" PRIx64 " current_addr: %" PRIx64,
3262                            block->block_name, block->offset,
3263                            reg->key.current_addr);
3264                        ret = -ERANGE;
3265                        goto out;
3266                    }
3267                    host_addr = (block->local_host_addr +
3268                                (reg->key.current_addr - block->offset));
3269                    chunk = ram_chunk_index(block->local_host_addr,
3270                                            (uint8_t *) host_addr);
3271                } else {
3272                    chunk = reg->key.chunk;
3273                    host_addr = block->local_host_addr +
3274                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3275                    /* Check for particularly bad chunk value */
3276                    if (host_addr < (void *)block->local_host_addr) {
3277                        error_report("rdma: bad chunk for block %s"
3278                            " chunk: %" PRIx64,
3279                            block->block_name, reg->key.chunk);
3280                        ret = -ERANGE;
3281                        goto out;
3282                    }
3283                }
3284                chunk_start = ram_chunk_start(block, chunk);
3285                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3286                if (qemu_rdma_register_and_get_keys(rdma, block,
3287                            (uintptr_t)host_addr, NULL, &reg_result->rkey,
3288                            chunk, chunk_start, chunk_end)) {
3289                    error_report("cannot get rkey");
3290                    ret = -EINVAL;
3291                    goto out;
3292                }
3293
3294                reg_result->host_addr = (uintptr_t)block->local_host_addr;
3295
3296                trace_qemu_rdma_registration_handle_register_rkey(
3297                                                           reg_result->rkey);
3298
3299                result_to_network(reg_result);
3300            }
3301
3302            ret = qemu_rdma_post_send_control(rdma,
3303                            (uint8_t *) results, &reg_resp);
3304
3305            if (ret < 0) {
3306                error_report("Failed to send control buffer");
3307                goto out;
3308            }
3309            break;
3310        case RDMA_CONTROL_UNREGISTER_REQUEST:
3311            trace_qemu_rdma_registration_handle_unregister(head.repeat);
3312            unreg_resp.repeat = head.repeat;
3313            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3314
3315            for (count = 0; count < head.repeat; count++) {
3316                reg = &registers[count];
3317                network_to_register(reg);
3318
3319                trace_qemu_rdma_registration_handle_unregister_loop(count,
3320                           reg->current_index, reg->key.chunk);
3321
3322                block = &(rdma->local_ram_blocks.block[reg->current_index]);
3323
3324                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3325                block->pmr[reg->key.chunk] = NULL;
3326
3327                if (ret != 0) {
3328                    perror("rdma unregistration chunk failed");
3329                    ret = -ret;
3330                    goto out;
3331                }
3332
3333                rdma->total_registrations--;
3334
3335                trace_qemu_rdma_registration_handle_unregister_success(
3336                                                       reg->key.chunk);
3337            }
3338
3339            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3340
3341            if (ret < 0) {
3342                error_report("Failed to send control buffer");
3343                goto out;
3344            }
3345            break;
3346        case RDMA_CONTROL_REGISTER_RESULT:
3347            error_report("Invalid RESULT message at dest.");
3348            ret = -EIO;
3349            goto out;
3350        default:
3351            error_report("Unknown control message %s", control_desc[head.type]);
3352            ret = -EIO;
3353            goto out;
3354        }
3355    } while (1);
3356out:
3357    if (ret < 0) {
3358        rdma->error_state = ret;
3359    }
3360    return ret;
3361}
3362
3363/* Destination:
3364 * Called via a ram_control_load_hook during the initial RAM load section which
3365 * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3366 * on the source.
3367 * We've already built our local RAMBlock list, but not yet sent the list to
3368 * the source.
3369 */
3370static int
3371rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3372{
3373    RDMAContext *rdma = rioc->rdma;
3374    int curr;
3375    int found = -1;
3376
3377    /* Find the matching RAMBlock in our local list */
3378    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3379        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3380            found = curr;
3381            break;
3382        }
3383    }
3384
3385    if (found == -1) {
3386        error_report("RAMBlock '%s' not found on destination", name);
3387        return -ENOENT;
3388    }
3389
3390    rdma->local_ram_blocks.block[found].src_index = rdma->next_src_index;
3391    trace_rdma_block_notification_handle(name, rdma->next_src_index);
3392    rdma->next_src_index++;
3393
3394    return 0;
3395}
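/*
 * Illustrative sketch, not part of the implementation: once every block
 * name has been announced and the list has been sorted with
 * dest_ram_sort_func() (see RDMA_CONTROL_RAM_BLOCKS_REQUEST above), and
 * assuming both sides expose the same set of RAMBlocks, the invariant the
 * destination relies on is simply
 *
 *     for (i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
 *         assert(rdma->local_ram_blocks.block[i].src_index == i);
 *     }
 *
 * i.e. local block i is the RAMBlock the source will reference by index i
 * in later 'register' and 'compress' messages.
 */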
3396
3397static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3398{
3399    switch (flags) {
3400    case RAM_CONTROL_BLOCK_REG:
3401        return rdma_block_notification_handle(opaque, data);
3402
3403    case RAM_CONTROL_HOOK:
3404        return qemu_rdma_registration_handle(f, opaque);
3405
3406    default:
3407        /* Shouldn't be called with any other values */
3408        abort();
3409    }
3410}
3411
3412static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3413                                        uint64_t flags, void *data)
3414{
3415    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3416    RDMAContext *rdma = rioc->rdma;
3417
3418    CHECK_ERROR_STATE();
3419
3420    trace_qemu_rdma_registration_start(flags);
3421    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3422    qemu_fflush(f);
3423
3424    return 0;
3425}
3426
3427/*
3428 * Inform dest that dynamic registrations are done for now.
3429 * First, flush writes, if any.
3430 */
3431static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3432                                       uint64_t flags, void *data)
3433{
3434    Error *local_err = NULL, **errp = &local_err;
3435    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3436    RDMAContext *rdma = rioc->rdma;
3437    RDMAControlHeader head = { .len = 0, .repeat = 1 };
3438    int ret = 0;
3439
3440    CHECK_ERROR_STATE();
3441
3442    qemu_fflush(f);
3443    ret = qemu_rdma_drain_cq(f, rdma);
3444
3445    if (ret < 0) {
3446        goto err;
3447    }
3448
3449    if (flags == RAM_CONTROL_SETUP) {
3450        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3451        RDMALocalBlocks *local = &rdma->local_ram_blocks;
3452        int reg_result_idx, i, nb_dest_blocks;
3453
3454        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3455        trace_qemu_rdma_registration_stop_ram();
3456
3457        /*
3458         * Make sure that we parallelize the pinning on both sides.
3459         * For very large guests, doing this serially takes a really
3460         * long time, so we 'interleave' the local pinning with the
3461         * control messages: this side performs its pinning before
3462         * waiting for the control response telling us that the other
3463         * side has finished pinning as well.
3464         */
3465        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3466                    &reg_result_idx, rdma->pin_all ?
3467                    qemu_rdma_reg_whole_ram_blocks : NULL);
3468        if (ret < 0) {
3469            ERROR(errp, "receiving remote info!");
3470            return ret;
3471        }
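        /*
         * Rough sketch of the ordering that makes the interleaving work,
         * assuming the behaviour of qemu_rdma_exchange_send() (defined
         * earlier in this file):
         *
         *     post RAM_BLOCKS_REQUEST to the destination;
         *     if (callback) {
         *         callback(rdma);          // source pins its blocks here...
         *     }
         *     wait for RAM_BLOCKS_RESULT;  // ...while the dest (with pin-all)
         *                                  // registers its own blocks
         *
         * so both sides spend the expensive registration time concurrently
         * rather than back to back.
         */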
3472
3473        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3474
3475        /*
3476         * The protocol uses two different sets of rkeys (mutually exclusive):
3477         * 1. One key to represent the virtual address of the entire ram block.
3478         *    (dynamic chunk registration disabled - pin everything with one rkey.)
3479         * 2. One to represent individual chunks within a ram block.
3480         *    (dynamic chunk registration enabled - pin individual chunks.)
3481         *
3482         * Once the capability is successfully negotiated, the destination transmits
3483         * the keys to use (or sends them later), including the virtual addresses,
3484         * and the source merges those remote descriptions into its local copy.
3485         */
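        /*
         * Condensed sketch of how the source consumes the two modes later
         * when posting RDMA writes (the full logic lives in
         * qemu_rdma_write_one(), earlier in this file; rkey_for_chunk() is
         * a made-up name for the per-chunk lookup):
         *
         *     if (rdma->pin_all) {
         *         rkey = block->remote_rkey;        // one rkey per RAMBlock
         *     } else {
         *         rkey = rkey_for_chunk(block, chunk);
         *         // per-chunk rkey, obtained on demand with
         *         // RDMA_CONTROL_REGISTER_REQUEST / REGISTER_RESULT
         *     }
         */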
3486
3487        if (local->nb_blocks != nb_dest_blocks) {
3488            ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
3489                        "Your QEMU command line parameters are probably "
3490                        "not identical on both the source and destination.",
3491                        local->nb_blocks, nb_dest_blocks);
3492            rdma->error_state = -EINVAL;
3493            return -EINVAL;
3494        }
3495
3496        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3497        memcpy(rdma->dest_blocks,
3498            rdma->wr_data[reg_result_idx].control_curr, resp.len);
3499        for (i = 0; i < nb_dest_blocks; i++) {
3500            network_to_dest_block(&rdma->dest_blocks[i]);
3501
3502            /* We require that the blocks are in the same order */
3503            if (rdma->dest_blocks[i].length != local->block[i].length) {
3504                ERROR(errp, "Block %s/%d has a different length %" PRIu64
3505                            " vs %" PRIu64, local->block[i].block_name, i,
3506                            local->block[i].length,
3507                            rdma->dest_blocks[i].length);
3508                rdma->error_state = -EINVAL;
3509                return -EINVAL;
3510            }
3511            local->block[i].remote_host_addr =
3512                    rdma->dest_blocks[i].remote_host_addr;
3513            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3514        }
3515    }
3516
3517    trace_qemu_rdma_registration_stop(flags);
3518
3519    head.type = RDMA_CONTROL_REGISTER_FINISHED;
3520    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3521
3522    if (ret < 0) {
3523        goto err;
3524    }
3525
3526    return 0;
3527err:
3528    rdma->error_state = ret;
3529    return ret;
3530}
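/*
 * Putting the two hooks above together, the control traffic for a single
 * RAM section looks roughly like this (source on the left, handled on the
 * destination by qemu_rdma_registration_handle()); this is a summary of
 * the code above, not additional protocol:
 *
 *     RAM_SAVE_FLAG_HOOK                    -> dest enters the handler loop
 *     [setup only] RAM_BLOCKS_REQUEST       -> dest sorts its blocks and
 *                                              replies with RAM_BLOCKS_RESULT
 *     REGISTER / UNREGISTER / COMPRESS      -> answered as pages are written
 *     REGISTER_FINISHED                     -> dest leaves the handler
 */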
3531
3532static const QEMUFileHooks rdma_read_hooks = {
3533    .hook_ram_load = rdma_load_hook,
3534};
3535
3536static const QEMUFileHooks rdma_write_hooks = {
3537    .before_ram_iterate = qemu_rdma_registration_start,
3538    .after_ram_iterate  = qemu_rdma_registration_stop,
3539    .save_page          = qemu_rdma_save_page,
3540};
3541
3542
3543static void qio_channel_rdma_finalize(Object *obj)
3544{
3545    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3546    if (rioc->rdma) {
3547        qemu_rdma_cleanup(rioc->rdma);
3548        g_free(rioc->rdma);
3549        rioc->rdma = NULL;
3550    }
3551}
3552
3553static void qio_channel_rdma_class_init(ObjectClass *klass,
3554                                        void *class_data G_GNUC_UNUSED)
3555{
3556    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3557
3558    ioc_klass->io_writev = qio_channel_rdma_writev;
3559    ioc_klass->io_readv = qio_channel_rdma_readv;
3560    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3561    ioc_klass->io_close = qio_channel_rdma_close;
3562    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3563}
3564
3565static const TypeInfo qio_channel_rdma_info = {
3566    .parent = TYPE_QIO_CHANNEL,
3567    .name = TYPE_QIO_CHANNEL_RDMA,
3568    .instance_size = sizeof(QIOChannelRDMA),
3569    .instance_finalize = qio_channel_rdma_finalize,
3570    .class_init = qio_channel_rdma_class_init,
3571};
3572
3573static void qio_channel_rdma_register_types(void)
3574{
3575    type_register_static(&qio_channel_rdma_info);
3576}
3577
3578type_init(qio_channel_rdma_register_types);
3579
3580static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3581{
3582    QIOChannelRDMA *rioc;
3583
3584    if (qemu_file_mode_is_not_valid(mode)) {
3585        return NULL;
3586    }
3587
3588    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3589    rioc->rdma = rdma;
3590
3591    if (mode[0] == 'w') {
3592        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3593        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3594    } else {
3595        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3596        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3597    }
3598
3599    return rioc->file;
3600}
3601
3602static void rdma_accept_incoming_migration(void *opaque)
3603{
3604    RDMAContext *rdma = opaque;
3605    int ret;
3606    QEMUFile *f;
3607    Error *local_err = NULL, **errp = &local_err;
3608
3609    trace_qemu_rdma_accept_incoming_migration();
3610    ret = qemu_rdma_accept(rdma);
3611
3612    if (ret) {
3613        ERROR(errp, "RDMA Migration initialization failed!");
3614        return;
3615    }
3616
3617    trace_qemu_rdma_accept_incoming_migration_accepted();
3618
3619    f = qemu_fopen_rdma(rdma, "rb");
3620    if (f == NULL) {
3621        ERROR(errp, "qemu_fopen_rdma failed!");
3622        qemu_rdma_cleanup(rdma);
3623        return;
3624    }
3625
3626    rdma->migration_started_on_destination = 1;
3627    migration_fd_process_incoming(f);
3628}
3629
3630void rdma_start_incoming_migration(const char *host_port, Error **errp)
3631{
3632    int ret;
3633    RDMAContext *rdma;
3634    Error *local_err = NULL;
3635
3636    trace_rdma_start_incoming_migration();
3637    rdma = qemu_rdma_data_init(host_port, &local_err);
3638
3639    if (rdma == NULL) {
3640        goto err;
3641    }
3642
3643    ret = qemu_rdma_dest_init(rdma, &local_err);
3644
3645    if (ret) {
3646        goto err;
3647    }
3648
3649    trace_rdma_start_incoming_migration_after_dest_init();
3650
3651    ret = rdma_listen(rdma->listen_id, 5);
3652
3653    if (ret) {
3654        ERROR(errp, "listening on socket!");
3655        goto err;
3656    }
3657
3658    trace_rdma_start_incoming_migration_after_rdma_listen();
3659
3660    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3661                        NULL, (void *)(intptr_t)rdma);
3662    return;
3663err:
3664    error_propagate(errp, local_err);
3665    g_free(rdma);
3666}
3667
3668void rdma_start_outgoing_migration(void *opaque,
3669                            const char *host_port, Error **errp)
3670{
3671    MigrationState *s = opaque;
3672    RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3673    int ret = 0;
3674
3675    if (rdma == NULL) {
3676        goto err;
3677    }
3678
3679    ret = qemu_rdma_source_init(rdma, errp,
3680        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
3681
3682    if (ret) {
3683        goto err;
3684    }
3685
3686    trace_rdma_start_outgoing_migration_after_rdma_source_init();
3687    ret = qemu_rdma_connect(rdma, errp);
3688
3689    if (ret) {
3690        goto err;
3691    }
3692
3693    trace_rdma_start_outgoing_migration_after_rdma_connect();
3694
3695    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3696    migrate_fd_connect(s);
3697    return;
3698err:
3699    g_free(rdma);
3700}
3701