qemu/migration/multifd-zstd.c
<<
>>
Prefs
   1/*
   2 * Multifd zstd compression implementation
   3 *
   4 * Copyright (c) 2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include <zstd.h>
  15#include "qemu/rcu.h"
  16#include "exec/ramblock.h"
  17#include "exec/target_page.h"
  18#include "qapi/error.h"
  19#include "migration.h"
  20#include "trace.h"
  21#include "multifd.h"
  22
  23struct zstd_data {
  24    /* stream for compression */
  25    ZSTD_CStream *zcs;
  26    /* stream for decompression */
  27    ZSTD_DStream *zds;
  28    /* buffers */
  29    ZSTD_inBuffer in;
  30    ZSTD_outBuffer out;
  31    /* compressed buffer */
  32    uint8_t *zbuff;
  33    /* size of compressed buffer */
  34    uint32_t zbuff_len;
  35};
  36
  37/* Multifd zstd compression */
  38
  39/**
  40 * zstd_send_setup: setup send side
  41 *
  42 * Setup each channel with zstd compression.
  43 *
  44 * Returns 0 for success or -1 for error
  45 *
  46 * @p: Params for the channel that we are using
  47 * @errp: pointer to an error
  48 */
  49static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
  50{
  51    struct zstd_data *z = g_new0(struct zstd_data, 1);
  52    int res;
  53
  54    p->data = z;
  55    z->zcs = ZSTD_createCStream();
  56    if (!z->zcs) {
  57        g_free(z);
  58        error_setg(errp, "multifd %u: zstd createCStream failed", p->id);
  59        return -1;
  60    }
  61
  62    res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
  63    if (ZSTD_isError(res)) {
  64        ZSTD_freeCStream(z->zcs);
  65        g_free(z);
  66        error_setg(errp, "multifd %u: initCStream failed with error %s",
  67                   p->id, ZSTD_getErrorName(res));
  68        return -1;
  69    }
  70    /* This is the maxium size of the compressed buffer */
  71    z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
  72    z->zbuff = g_try_malloc(z->zbuff_len);
  73    if (!z->zbuff) {
  74        ZSTD_freeCStream(z->zcs);
  75        g_free(z);
  76        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
  77        return -1;
  78    }
  79    return 0;
  80}
  81
  82/**
  83 * zstd_send_cleanup: cleanup send side
  84 *
  85 * Close the channel and return memory.
  86 *
  87 * @p: Params for the channel that we are using
  88 * @errp: pointer to an error
  89 */
  90static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
  91{
  92    struct zstd_data *z = p->data;
  93
  94    ZSTD_freeCStream(z->zcs);
  95    z->zcs = NULL;
  96    g_free(z->zbuff);
  97    z->zbuff = NULL;
  98    g_free(p->data);
  99    p->data = NULL;
 100}
 101
 102/**
 103 * zstd_send_prepare: prepare date to be able to send
 104 *
 105 * Create a compressed buffer with all the pages that we are going to
 106 * send.
 107 *
 108 * Returns 0 for success or -1 for error
 109 *
 110 * @p: Params for the channel that we are using
 111 * @errp: pointer to an error
 112 */
 113static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
 114{
 115    struct zstd_data *z = p->data;
 116    size_t page_size = qemu_target_page_size();
 117    int ret;
 118    uint32_t i;
 119
 120    z->out.dst = z->zbuff;
 121    z->out.size = z->zbuff_len;
 122    z->out.pos = 0;
 123
 124    for (i = 0; i < p->normal_num; i++) {
 125        ZSTD_EndDirective flush = ZSTD_e_continue;
 126
 127        if (i == p->normal_num - 1) {
 128            flush = ZSTD_e_flush;
 129        }
 130        z->in.src = p->pages->block->host + p->normal[i];
 131        z->in.size = page_size;
 132        z->in.pos = 0;
 133
 134        /*
 135         * Welcome to compressStream2 semantics
 136         *
 137         * We need to loop while:
 138         * - return is > 0
 139         * - there is input available
 140         * - there is output space free
 141         */
 142        do {
 143            ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
 144        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 145                         && (z->out.size - z->out.pos > 0));
 146        if (ret > 0 && (z->in.size - z->in.pos > 0)) {
 147            error_setg(errp, "multifd %u: compressStream buffer too small",
 148                       p->id);
 149            return -1;
 150        }
 151        if (ZSTD_isError(ret)) {
 152            error_setg(errp, "multifd %u: compressStream error %s",
 153                       p->id, ZSTD_getErrorName(ret));
 154            return -1;
 155        }
 156    }
 157    p->iov[p->iovs_num].iov_base = z->zbuff;
 158    p->iov[p->iovs_num].iov_len = z->out.pos;
 159    p->iovs_num++;
 160    p->next_packet_size = z->out.pos;
 161    p->flags |= MULTIFD_FLAG_ZSTD;
 162
 163    return 0;
 164}
 165
 166/**
 167 * zstd_recv_setup: setup receive side
 168 *
 169 * Create the compressed channel and buffer.
 170 *
 171 * Returns 0 for success or -1 for error
 172 *
 173 * @p: Params for the channel that we are using
 174 * @errp: pointer to an error
 175 */
 176static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
 177{
 178    struct zstd_data *z = g_new0(struct zstd_data, 1);
 179    int ret;
 180
 181    p->data = z;
 182    z->zds = ZSTD_createDStream();
 183    if (!z->zds) {
 184        g_free(z);
 185        error_setg(errp, "multifd %u: zstd createDStream failed", p->id);
 186        return -1;
 187    }
 188
 189    ret = ZSTD_initDStream(z->zds);
 190    if (ZSTD_isError(ret)) {
 191        ZSTD_freeDStream(z->zds);
 192        g_free(z);
 193        error_setg(errp, "multifd %u: initDStream failed with error %s",
 194                   p->id, ZSTD_getErrorName(ret));
 195        return -1;
 196    }
 197
 198    /* To be safe, we reserve twice the size of the packet */
 199    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
 200    z->zbuff = g_try_malloc(z->zbuff_len);
 201    if (!z->zbuff) {
 202        ZSTD_freeDStream(z->zds);
 203        g_free(z);
 204        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 205        return -1;
 206    }
 207    return 0;
 208}
 209
 210/**
 211 * zstd_recv_cleanup: setup receive side
 212 *
 213 * For no compression this function does nothing.
 214 *
 215 * @p: Params for the channel that we are using
 216 */
 217static void zstd_recv_cleanup(MultiFDRecvParams *p)
 218{
 219    struct zstd_data *z = p->data;
 220
 221    ZSTD_freeDStream(z->zds);
 222    z->zds = NULL;
 223    g_free(z->zbuff);
 224    z->zbuff = NULL;
 225    g_free(p->data);
 226    p->data = NULL;
 227}
 228
 229/**
 230 * zstd_recv_pages: read the data from the channel into actual pages
 231 *
 232 * Read the compressed buffer, and uncompress it into the actual
 233 * pages.
 234 *
 235 * Returns 0 for success or -1 for error
 236 *
 237 * @p: Params for the channel that we are using
 238 * @errp: pointer to an error
 239 */
 240static int zstd_recv_pages(MultiFDRecvParams *p, Error **errp)
 241{
 242    uint32_t in_size = p->next_packet_size;
 243    uint32_t out_size = 0;
 244    size_t page_size = qemu_target_page_size();
 245    uint32_t expected_size = p->normal_num * page_size;
 246    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 247    struct zstd_data *z = p->data;
 248    int ret;
 249    int i;
 250
 251    if (flags != MULTIFD_FLAG_ZSTD) {
 252        error_setg(errp, "multifd %u: flags received %x flags expected %x",
 253                   p->id, flags, MULTIFD_FLAG_ZSTD);
 254        return -1;
 255    }
 256    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
 257
 258    if (ret != 0) {
 259        return ret;
 260    }
 261
 262    z->in.src = z->zbuff;
 263    z->in.size = in_size;
 264    z->in.pos = 0;
 265
 266    for (i = 0; i < p->normal_num; i++) {
 267        z->out.dst = p->host + p->normal[i];
 268        z->out.size = page_size;
 269        z->out.pos = 0;
 270
 271        /*
 272         * Welcome to decompressStream semantics
 273         *
 274         * We need to loop while:
 275         * - return is > 0
 276         * - there is input available
 277         * - we haven't put out a full page
 278         */
 279        do {
 280            ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
 281        } while (ret > 0 && (z->in.size - z->in.pos > 0)
 282                         && (z->out.pos < page_size));
 283        if (ret > 0 && (z->out.pos < page_size)) {
 284            error_setg(errp, "multifd %u: decompressStream buffer too small",
 285                       p->id);
 286            return -1;
 287        }
 288        if (ZSTD_isError(ret)) {
 289            error_setg(errp, "multifd %u: decompressStream returned %s",
 290                       p->id, ZSTD_getErrorName(ret));
 291            return ret;
 292        }
 293        out_size += z->out.pos;
 294    }
 295    if (out_size != expected_size) {
 296        error_setg(errp, "multifd %u: packet size received %u size expected %u",
 297                   p->id, out_size, expected_size);
 298        return -1;
 299    }
 300    return 0;
 301}
 302
 303static MultiFDMethods multifd_zstd_ops = {
 304    .send_setup = zstd_send_setup,
 305    .send_cleanup = zstd_send_cleanup,
 306    .send_prepare = zstd_send_prepare,
 307    .recv_setup = zstd_recv_setup,
 308    .recv_cleanup = zstd_recv_cleanup,
 309    .recv_pages = zstd_recv_pages
 310};
 311
 312static void multifd_zstd_register(void)
 313{
 314    multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
 315}
 316
 317migration_init(multifd_zstd_register);
 318