qemu/migration/multifd-zstd.c
<<
>>
Prefs
   1/*
   2 * Multifd zstd compression implementation
   3 *
   4 * Copyright (c) 2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include <zstd.h>
  15#include "qemu/rcu.h"
  16#include "exec/target_page.h"
  17#include "qapi/error.h"
  18#include "migration.h"
  19#include "trace.h"
  20#include "multifd.h"
  21
  22struct zstd_data {
  23    /* stream for compression */
  24    ZSTD_CStream *zcs;
  25    /* stream for decompression */
  26    ZSTD_DStream *zds;
  27    /* buffers */
  28    ZSTD_inBuffer in;
  29    ZSTD_outBuffer out;
  30    /* compressed buffer */
  31    uint8_t *zbuff;
  32    /* size of compressed buffer */
  33    uint32_t zbuff_len;
  34};
  35
  36/* Multifd zstd compression */
  37
  38/**
  39 * zstd_send_setup: setup send side
  40 *
  41 * Setup each channel with zstd compression.
  42 *
  43 * Returns 0 for success or -1 for error
  44 *
  45 * @p: Params for the channel that we are using
  46 * @errp: pointer to an error
  47 */
  48static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
  49{
  50    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  51    struct zstd_data *z = g_new0(struct zstd_data, 1);
  52    int res;
  53
  54    p->data = z;
  55    z->zcs = ZSTD_createCStream();
  56    if (!z->zcs) {
  57        g_free(z);
  58        error_setg(errp, "multifd %d: zstd createCStream failed", p->id);
  59        return -1;
  60    }
  61
  62    res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
  63    if (ZSTD_isError(res)) {
  64        ZSTD_freeCStream(z->zcs);
  65        g_free(z);
  66        error_setg(errp, "multifd %d: initCStream failed with error %s",
  67                   p->id, ZSTD_getErrorName(res));
  68        return -1;
  69    }
  70    /* We will never have more than page_count pages */
  71    z->zbuff_len = page_count * qemu_target_page_size();
  72    z->zbuff_len *= 2;
  73    z->zbuff = g_try_malloc(z->zbuff_len);
  74    if (!z->zbuff) {
  75        ZSTD_freeCStream(z->zcs);
  76        g_free(z);
  77        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
  78        return -1;
  79    }
  80    return 0;
  81}
  82
  83/**
  84 * zstd_send_cleanup: cleanup send side
  85 *
  86 * Close the channel and return memory.
  87 *
  88 * @p: Params for the channel that we are using
  89 */
  90static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
  91{
  92    struct zstd_data *z = p->data;
  93
  94    ZSTD_freeCStream(z->zcs);
  95    z->zcs = NULL;
  96    g_free(z->zbuff);
  97    z->zbuff = NULL;
  98    g_free(p->data);
  99    p->data = NULL;
 100}
 101
 102/**
 103 * zstd_send_prepare: prepare date to be able to send
 104 *
 105 * Create a compressed buffer with all the pages that we are going to
 106 * send.
 107 *
 108 * Returns 0 for success or -1 for error
 109 *
 110 * @p: Params for the channel that we are using
 111 * @used: number of pages used
 112 */
 113static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
 114{
 115    struct iovec *iov = p->pages->iov;
 116    struct zstd_data *z = p->data;
 117    int ret;
 118    uint32_t i;
 119
 120    z->out.dst = z->zbuff;
 121    z->out.size = z->zbuff_len;
 122    z->out.pos = 0;
 123
 124    for (i = 0; i < used; i++) {
 125        ZSTD_EndDirective flush = ZSTD_e_continue;
 126
 127        if (i == used - 1) {
 128            flush = ZSTD_e_flush;
 129        }
 130        z->in.src = iov[i].iov_base;
 131        z->in.size = iov[i].iov_len;
 132        z->in.pos = 0;
 133
 134        /*
 135         * Welcome to compressStream2 semantics
 136         *
 137         * We need to loop while:
 138         * - return is > 0
 139         * - there is input available
 140         * - there is output space free
 141         */
 142        do {
 143            ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
 144        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 145                         && (z->out.size - z->out.pos > 0));
 146        if (ret > 0 && (z->in.size - z->in.pos > 0)) {
 147            error_setg(errp, "multifd %d: compressStream buffer too small",
 148                       p->id);
 149            return -1;
 150        }
 151        if (ZSTD_isError(ret)) {
 152            error_setg(errp, "multifd %d: compressStream error %s",
 153                       p->id, ZSTD_getErrorName(ret));
 154            return -1;
 155        }
 156    }
 157    p->next_packet_size = z->out.pos;
 158    p->flags |= MULTIFD_FLAG_ZSTD;
 159
 160    return 0;
 161}
 162
 163/**
 164 * zstd_send_write: do the actual write of the data
 165 *
 166 * Do the actual write of the comprresed buffer.
 167 *
 168 * Returns 0 for success or -1 for error
 169 *
 170 * @p: Params for the channel that we are using
 171 * @used: number of pages used
 172 * @errp: pointer to an error
 173 */
 174static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
 175{
 176    struct zstd_data *z = p->data;
 177
 178    return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
 179                                 errp);
 180}
 181
 182/**
 183 * zstd_recv_setup: setup receive side
 184 *
 185 * Create the compressed channel and buffer.
 186 *
 187 * Returns 0 for success or -1 for error
 188 *
 189 * @p: Params for the channel that we are using
 190 * @errp: pointer to an error
 191 */
 192static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
 193{
 194    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
 195    struct zstd_data *z = g_new0(struct zstd_data, 1);
 196    int ret;
 197
 198    p->data = z;
 199    z->zds = ZSTD_createDStream();
 200    if (!z->zds) {
 201        g_free(z);
 202        error_setg(errp, "multifd %d: zstd createDStream failed", p->id);
 203        return -1;
 204    }
 205
 206    ret = ZSTD_initDStream(z->zds);
 207    if (ZSTD_isError(ret)) {
 208        ZSTD_freeDStream(z->zds);
 209        g_free(z);
 210        error_setg(errp, "multifd %d: initDStream failed with error %s",
 211                   p->id, ZSTD_getErrorName(ret));
 212        return -1;
 213    }
 214
 215    /* We will never have more than page_count pages */
 216    z->zbuff_len = page_count * qemu_target_page_size();
 217    /* We know compression "could" use more space */
 218    z->zbuff_len *= 2;
 219    z->zbuff = g_try_malloc(z->zbuff_len);
 220    if (!z->zbuff) {
 221        ZSTD_freeDStream(z->zds);
 222        g_free(z);
 223        error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
 224        return -1;
 225    }
 226    return 0;
 227}
 228
 229/**
 230 * zstd_recv_cleanup: setup receive side
 231 *
 232 * For no compression this function does nothing.
 233 *
 234 * @p: Params for the channel that we are using
 235 */
 236static void zstd_recv_cleanup(MultiFDRecvParams *p)
 237{
 238    struct zstd_data *z = p->data;
 239
 240    ZSTD_freeDStream(z->zds);
 241    z->zds = NULL;
 242    g_free(z->zbuff);
 243    z->zbuff = NULL;
 244    g_free(p->data);
 245    p->data = NULL;
 246}
 247
 248/**
 249 * zstd_recv_pages: read the data from the channel into actual pages
 250 *
 251 * Read the compressed buffer, and uncompress it into the actual
 252 * pages.
 253 *
 254 * Returns 0 for success or -1 for error
 255 *
 256 * @p: Params for the channel that we are using
 257 * @used: number of pages used
 258 * @errp: pointer to an error
 259 */
 260static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
 261{
 262    uint32_t in_size = p->next_packet_size;
 263    uint32_t out_size = 0;
 264    uint32_t expected_size = used * qemu_target_page_size();
 265    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 266    struct zstd_data *z = p->data;
 267    int ret;
 268    int i;
 269
 270    if (flags != MULTIFD_FLAG_ZSTD) {
 271        error_setg(errp, "multifd %d: flags received %x flags expected %x",
 272                   p->id, flags, MULTIFD_FLAG_ZSTD);
 273        return -1;
 274    }
 275    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
 276
 277    if (ret != 0) {
 278        return ret;
 279    }
 280
 281    z->in.src = z->zbuff;
 282    z->in.size = in_size;
 283    z->in.pos = 0;
 284
 285    for (i = 0; i < used; i++) {
 286        struct iovec *iov = &p->pages->iov[i];
 287
 288        z->out.dst = iov->iov_base;
 289        z->out.size = iov->iov_len;
 290        z->out.pos = 0;
 291
 292        /*
 293         * Welcome to decompressStream semantics
 294         *
 295         * We need to loop while:
 296         * - return is > 0
 297         * - there is input available
 298         * - we haven't put out a full page
 299         */
 300        do {
 301            ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
 302        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 303                         && (z->out.pos < iov->iov_len));
 304        if (ret > 0 && (z->out.pos < iov->iov_len)) {
 305            error_setg(errp, "multifd %d: decompressStream buffer too small",
 306                       p->id);
 307            return -1;
 308        }
 309        if (ZSTD_isError(ret)) {
 310            error_setg(errp, "multifd %d: decompressStream returned %s",
 311                       p->id, ZSTD_getErrorName(ret));
 312            return ret;
 313        }
 314        out_size += z->out.pos;
 315    }
 316    if (out_size != expected_size) {
 317        error_setg(errp, "multifd %d: packet size received %d size expected %d",
 318                   p->id, out_size, expected_size);
 319        return -1;
 320    }
 321    return 0;
 322}
 323
 324static MultiFDMethods multifd_zstd_ops = {
 325    .send_setup = zstd_send_setup,
 326    .send_cleanup = zstd_send_cleanup,
 327    .send_prepare = zstd_send_prepare,
 328    .send_write = zstd_send_write,
 329    .recv_setup = zstd_recv_setup,
 330    .recv_cleanup = zstd_recv_cleanup,
 331    .recv_pages = zstd_recv_pages
 332};
 333
 334static void multifd_zstd_register(void)
 335{
 336    multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
 337}
 338
 339migration_init(multifd_zstd_register);
 340