qemu/migration/multifd-zstd.c
<<
>>
Prefs
   1/*
 * Multifd zstd compression implementation
   3 *
   4 * Copyright (c) 2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include <zstd.h>
  15#include "qemu/rcu.h"
  16#include "exec/ramblock.h"
  17#include "exec/target_page.h"
  18#include "qapi/error.h"
  19#include "migration.h"
  20#include "trace.h"
  21#include "multifd.h"
  22
  23struct zstd_data {
  24    /* stream for compression */
  25    ZSTD_CStream *zcs;
  26    /* stream for decompression */
  27    ZSTD_DStream *zds;
  28    /* buffers */
  29    ZSTD_inBuffer in;
  30    ZSTD_outBuffer out;
  31    /* compressed buffer */
  32    uint8_t *zbuff;
  33    /* size of compressed buffer */
  34    uint32_t zbuff_len;
  35};
  36
  37/* Multifd zstd compression */
  38
  39/**
  40 * zstd_send_setup: setup send side
  41 *
  42 * Setup each channel with zstd compression.
  43 *
  44 * Returns 0 for success or -1 for error
  45 *
  46 * @p: Params for the channel that we are using
  47 * @errp: pointer to an error
  48 */
  49static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
  50{
  51    struct zstd_data *z = g_new0(struct zstd_data, 1);
  52    int res;
  53
  54    p->data = z;
  55    z->zcs = ZSTD_createCStream();
  56    if (!z->zcs) {
  57        g_free(z);
  58        error_setg(errp, "multifd %u: zstd createCStream failed", p->id);
  59        return -1;
  60    }
  61
  62    res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
  63    if (ZSTD_isError(res)) {
  64        ZSTD_freeCStream(z->zcs);
  65        g_free(z);
  66        error_setg(errp, "multifd %u: initCStream failed with error %s",
  67                   p->id, ZSTD_getErrorName(res));
  68        return -1;
  69    }
  70    /* This is the maxium size of the compressed buffer */
  71    z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
  72    z->zbuff = g_try_malloc(z->zbuff_len);
  73    if (!z->zbuff) {
  74        ZSTD_freeCStream(z->zcs);
  75        g_free(z);
  76        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
  77        return -1;
  78    }
  79    return 0;
  80}
  81
  82/**
  83 * zstd_send_cleanup: cleanup send side
  84 *
  85 * Close the channel and return memory.
  86 *
  87 * @p: Params for the channel that we are using
  88 * @errp: pointer to an error
  89 */
  90static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
  91{
  92    struct zstd_data *z = p->data;
  93
  94    ZSTD_freeCStream(z->zcs);
  95    z->zcs = NULL;
  96    g_free(z->zbuff);
  97    z->zbuff = NULL;
  98    g_free(p->data);
  99    p->data = NULL;
 100}
 101
 102/**
 103 * zstd_send_prepare: prepare date to be able to send
 104 *
 105 * Create a compressed buffer with all the pages that we are going to
 106 * send.
 107 *
 108 * Returns 0 for success or -1 for error
 109 *
 110 * @p: Params for the channel that we are using
 111 * @errp: pointer to an error
 112 */
 113static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
 114{
 115    struct zstd_data *z = p->data;
 116    int ret;
 117    uint32_t i;
 118
 119    z->out.dst = z->zbuff;
 120    z->out.size = z->zbuff_len;
 121    z->out.pos = 0;
 122
 123    for (i = 0; i < p->normal_num; i++) {
 124        ZSTD_EndDirective flush = ZSTD_e_continue;
 125
 126        if (i == p->normal_num - 1) {
 127            flush = ZSTD_e_flush;
 128        }
 129        z->in.src = p->pages->block->host + p->normal[i];
 130        z->in.size = p->page_size;
 131        z->in.pos = 0;
 132
 133        /*
 134         * Welcome to compressStream2 semantics
 135         *
 136         * We need to loop while:
 137         * - return is > 0
 138         * - there is input available
 139         * - there is output space free
 140         */
 141        do {
 142            ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
 143        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 144                         && (z->out.size - z->out.pos > 0));
 145        if (ret > 0 && (z->in.size - z->in.pos > 0)) {
 146            error_setg(errp, "multifd %u: compressStream buffer too small",
 147                       p->id);
 148            return -1;
 149        }
 150        if (ZSTD_isError(ret)) {
 151            error_setg(errp, "multifd %u: compressStream error %s",
 152                       p->id, ZSTD_getErrorName(ret));
 153            return -1;
 154        }
 155    }
 156    p->iov[p->iovs_num].iov_base = z->zbuff;
 157    p->iov[p->iovs_num].iov_len = z->out.pos;
 158    p->iovs_num++;
 159    p->next_packet_size = z->out.pos;
 160    p->flags |= MULTIFD_FLAG_ZSTD;
 161
 162    return 0;
 163}
 164
 165/**
 166 * zstd_recv_setup: setup receive side
 167 *
 168 * Create the compressed channel and buffer.
 169 *
 170 * Returns 0 for success or -1 for error
 171 *
 172 * @p: Params for the channel that we are using
 173 * @errp: pointer to an error
 174 */
 175static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
 176{
 177    struct zstd_data *z = g_new0(struct zstd_data, 1);
 178    int ret;
 179
 180    p->data = z;
 181    z->zds = ZSTD_createDStream();
 182    if (!z->zds) {
 183        g_free(z);
 184        error_setg(errp, "multifd %u: zstd createDStream failed", p->id);
 185        return -1;
 186    }
 187
 188    ret = ZSTD_initDStream(z->zds);
 189    if (ZSTD_isError(ret)) {
 190        ZSTD_freeDStream(z->zds);
 191        g_free(z);
 192        error_setg(errp, "multifd %u: initDStream failed with error %s",
 193                   p->id, ZSTD_getErrorName(ret));
 194        return -1;
 195    }
 196
 197    /* To be safe, we reserve twice the size of the packet */
 198    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
 199    z->zbuff = g_try_malloc(z->zbuff_len);
 200    if (!z->zbuff) {
 201        ZSTD_freeDStream(z->zds);
 202        g_free(z);
 203        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 204        return -1;
 205    }
 206    return 0;
 207}
 208
 209/**
 210 * zstd_recv_cleanup: setup receive side
 211 *
 212 * For no compression this function does nothing.
 213 *
 214 * @p: Params for the channel that we are using
 215 */
 216static void zstd_recv_cleanup(MultiFDRecvParams *p)
 217{
 218    struct zstd_data *z = p->data;
 219
 220    ZSTD_freeDStream(z->zds);
 221    z->zds = NULL;
 222    g_free(z->zbuff);
 223    z->zbuff = NULL;
 224    g_free(p->data);
 225    p->data = NULL;
 226}
 227
 228/**
 229 * zstd_recv_pages: read the data from the channel into actual pages
 230 *
 231 * Read the compressed buffer, and uncompress it into the actual
 232 * pages.
 233 *
 234 * Returns 0 for success or -1 for error
 235 *
 236 * @p: Params for the channel that we are using
 237 * @errp: pointer to an error
 238 */
 239static int zstd_recv_pages(MultiFDRecvParams *p, Error **errp)
 240{
 241    uint32_t in_size = p->next_packet_size;
 242    uint32_t out_size = 0;
 243    uint32_t expected_size = p->normal_num * p->page_size;
 244    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 245    struct zstd_data *z = p->data;
 246    int ret;
 247    int i;
 248
 249    if (flags != MULTIFD_FLAG_ZSTD) {
 250        error_setg(errp, "multifd %u: flags received %x flags expected %x",
 251                   p->id, flags, MULTIFD_FLAG_ZSTD);
 252        return -1;
 253    }
 254    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
 255
 256    if (ret != 0) {
 257        return ret;
 258    }
 259
 260    z->in.src = z->zbuff;
 261    z->in.size = in_size;
 262    z->in.pos = 0;
 263
 264    for (i = 0; i < p->normal_num; i++) {
 265        z->out.dst = p->host + p->normal[i];
 266        z->out.size = p->page_size;
 267        z->out.pos = 0;
 268
 269        /*
 270         * Welcome to decompressStream semantics
 271         *
 272         * We need to loop while:
 273         * - return is > 0
 274         * - there is input available
 275         * - we haven't put out a full page
 276         */
 277        do {
 278            ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
 279        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 280                         && (z->out.pos < p->page_size));
 281        if (ret > 0 && (z->out.pos < p->page_size)) {
 282            error_setg(errp, "multifd %u: decompressStream buffer too small",
 283                       p->id);
 284            return -1;
 285        }
 286        if (ZSTD_isError(ret)) {
 287            error_setg(errp, "multifd %u: decompressStream returned %s",
 288                       p->id, ZSTD_getErrorName(ret));
 289            return ret;
 290        }
 291        out_size += z->out.pos;
 292    }
 293    if (out_size != expected_size) {
 294        error_setg(errp, "multifd %u: packet size received %u size expected %u",
 295                   p->id, out_size, expected_size);
 296        return -1;
 297    }
 298    return 0;
 299}
 300
 301static MultiFDMethods multifd_zstd_ops = {
 302    .send_setup = zstd_send_setup,
 303    .send_cleanup = zstd_send_cleanup,
 304    .send_prepare = zstd_send_prepare,
 305    .recv_setup = zstd_recv_setup,
 306    .recv_cleanup = zstd_recv_cleanup,
 307    .recv_pages = zstd_recv_pages
 308};
 309
 310static void multifd_zstd_register(void)
 311{
 312    multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
 313}
 314
 315migration_init(multifd_zstd_register);
 316