qemu/migration/multifd-zstd.c
<<
>>
Prefs
   1/*
   2 * Multifd zlib compression implementation
   3 *
   4 * Copyright (c) 2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include <zstd.h>
  15#include "qemu/rcu.h"
  16#include "system/ramblock.h"
  17#include "exec/target_page.h"
  18#include "qapi/error.h"
  19#include "migration.h"
  20#include "trace.h"
  21#include "options.h"
  22#include "multifd.h"
  23
  24struct zstd_data {
  25    /* stream for compression */
  26    ZSTD_CStream *zcs;
  27    /* stream for decompression */
  28    ZSTD_DStream *zds;
  29    /* buffers */
  30    ZSTD_inBuffer in;
  31    ZSTD_outBuffer out;
  32    /* compressed buffer */
  33    uint8_t *zbuff;
  34    /* size of compressed buffer */
  35    uint32_t zbuff_len;
  36};
  37
  38/* Multifd zstd compression */
  39
  40static int multifd_zstd_send_setup(MultiFDSendParams *p, Error **errp)
  41{
  42    struct zstd_data *z = g_new0(struct zstd_data, 1);
  43    int res;
  44
  45    z->zcs = ZSTD_createCStream();
  46    if (!z->zcs) {
  47        g_free(z);
  48        error_setg(errp, "multifd %u: zstd createCStream failed", p->id);
  49        return -1;
  50    }
  51
  52    res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
  53    if (ZSTD_isError(res)) {
  54        ZSTD_freeCStream(z->zcs);
  55        g_free(z);
  56        error_setg(errp, "multifd %u: initCStream failed with error %s",
  57                   p->id, ZSTD_getErrorName(res));
  58        return -1;
  59    }
  60    /* This is the maximum size of the compressed buffer */
  61    z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
  62    z->zbuff = g_try_malloc(z->zbuff_len);
  63    if (!z->zbuff) {
  64        ZSTD_freeCStream(z->zcs);
  65        g_free(z);
  66        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
  67        return -1;
  68    }
  69    p->compress_data = z;
  70
  71    /* Needs 2 IOVs, one for packet header and one for compressed data */
  72    p->iov = g_new0(struct iovec, 2);
  73    return 0;
  74}
  75
  76static void multifd_zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
  77{
  78    struct zstd_data *z = p->compress_data;
  79
  80    ZSTD_freeCStream(z->zcs);
  81    z->zcs = NULL;
  82    g_free(z->zbuff);
  83    z->zbuff = NULL;
  84    g_free(p->compress_data);
  85    p->compress_data = NULL;
  86
  87    g_free(p->iov);
  88    p->iov = NULL;
  89}
  90
  91static int multifd_zstd_send_prepare(MultiFDSendParams *p, Error **errp)
  92{
  93    MultiFDPages_t *pages = &p->data->u.ram;
  94    struct zstd_data *z = p->compress_data;
  95    int ret;
  96    uint32_t i;
  97
  98    if (!multifd_send_prepare_common(p)) {
  99        goto out;
 100    }
 101
 102    z->out.dst = z->zbuff;
 103    z->out.size = z->zbuff_len;
 104    z->out.pos = 0;
 105
 106    for (i = 0; i < pages->normal_num; i++) {
 107        ZSTD_EndDirective flush = ZSTD_e_continue;
 108
 109        if (i == pages->normal_num - 1) {
 110            flush = ZSTD_e_flush;
 111        }
 112        z->in.src = pages->block->host + pages->offset[i];
 113        z->in.size = multifd_ram_page_size();
 114        z->in.pos = 0;
 115
 116        /*
 117         * Welcome to compressStream2 semantics
 118         *
 119         * We need to loop while:
 120         * - return is > 0
 121         * - there is input available
 122         * - there is output space free
 123         */
 124        do {
 125            ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
 126        } while (ret > 0 && (z->in.size > z->in.pos)
 127                         && (z->out.size > z->out.pos));
 128        if (ret > 0 && (z->in.size > z->in.pos)) {
 129            error_setg(errp, "multifd %u: compressStream buffer too small",
 130                       p->id);
 131            return -1;
 132        }
 133        if (ZSTD_isError(ret)) {
 134            error_setg(errp, "multifd %u: compressStream error %s",
 135                       p->id, ZSTD_getErrorName(ret));
 136            return -1;
 137        }
 138    }
 139    p->iov[p->iovs_num].iov_base = z->zbuff;
 140    p->iov[p->iovs_num].iov_len = z->out.pos;
 141    p->iovs_num++;
 142    p->next_packet_size = z->out.pos;
 143
 144out:
 145    p->flags |= MULTIFD_FLAG_ZSTD;
 146    multifd_send_fill_packet(p);
 147    return 0;
 148}
 149
 150static int multifd_zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
 151{
 152    struct zstd_data *z = g_new0(struct zstd_data, 1);
 153    int ret;
 154
 155    p->compress_data = z;
 156    z->zds = ZSTD_createDStream();
 157    if (!z->zds) {
 158        g_free(z);
 159        error_setg(errp, "multifd %u: zstd createDStream failed", p->id);
 160        return -1;
 161    }
 162
 163    ret = ZSTD_initDStream(z->zds);
 164    if (ZSTD_isError(ret)) {
 165        ZSTD_freeDStream(z->zds);
 166        g_free(z);
 167        error_setg(errp, "multifd %u: initDStream failed with error %s",
 168                   p->id, ZSTD_getErrorName(ret));
 169        return -1;
 170    }
 171
 172    /* To be safe, we reserve twice the size of the packet */
 173    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
 174    z->zbuff = g_try_malloc(z->zbuff_len);
 175    if (!z->zbuff) {
 176        ZSTD_freeDStream(z->zds);
 177        g_free(z);
 178        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 179        return -1;
 180    }
 181    return 0;
 182}
 183
 184static void multifd_zstd_recv_cleanup(MultiFDRecvParams *p)
 185{
 186    struct zstd_data *z = p->compress_data;
 187
 188    ZSTD_freeDStream(z->zds);
 189    z->zds = NULL;
 190    g_free(z->zbuff);
 191    z->zbuff = NULL;
 192    g_free(p->compress_data);
 193    p->compress_data = NULL;
 194}
 195
 196static int multifd_zstd_recv(MultiFDRecvParams *p, Error **errp)
 197{
 198    uint32_t in_size = p->next_packet_size;
 199    uint32_t out_size = 0;
 200    uint32_t page_size = multifd_ram_page_size();
 201    uint32_t expected_size = p->normal_num * page_size;
 202    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 203    struct zstd_data *z = p->compress_data;
 204    int ret;
 205    int i;
 206
 207    if (flags != MULTIFD_FLAG_ZSTD) {
 208        error_setg(errp, "multifd %u: flags received %x flags expected %x",
 209                   p->id, flags, MULTIFD_FLAG_ZSTD);
 210        return -1;
 211    }
 212
 213    multifd_recv_zero_page_process(p);
 214
 215    if (!p->normal_num) {
 216        assert(in_size == 0);
 217        return 0;
 218    }
 219
 220    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
 221
 222    if (ret != 0) {
 223        return ret;
 224    }
 225
 226    z->in.src = z->zbuff;
 227    z->in.size = in_size;
 228    z->in.pos = 0;
 229
 230    for (i = 0; i < p->normal_num; i++) {
 231        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
 232        z->out.dst = p->host + p->normal[i];
 233        z->out.size = page_size;
 234        z->out.pos = 0;
 235
 236        /*
 237         * Welcome to decompressStream semantics
 238         *
 239         * We need to loop while:
 240         * - return is > 0
 241         * - there is input available
 242         * - we haven't put out a full page
 243         */
 244        do {
 245            ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
 246        } while (ret > 0 && (z->in.size > z->in.pos)
 247                         && (z->out.pos < page_size));
 248        if (ret > 0 && (z->out.pos < page_size)) {
 249            error_setg(errp, "multifd %u: decompressStream buffer too small",
 250                       p->id);
 251            return -1;
 252        }
 253        if (ZSTD_isError(ret)) {
 254            error_setg(errp, "multifd %u: decompressStream returned %s",
 255                       p->id, ZSTD_getErrorName(ret));
 256            return ret;
 257        }
 258        out_size += z->out.pos;
 259    }
 260    if (out_size != expected_size) {
 261        error_setg(errp, "multifd %u: packet size received %u size expected %u",
 262                   p->id, out_size, expected_size);
 263        return -1;
 264    }
 265    return 0;
 266}
 267
 268static const MultiFDMethods multifd_zstd_ops = {
 269    .send_setup = multifd_zstd_send_setup,
 270    .send_cleanup = multifd_zstd_send_cleanup,
 271    .send_prepare = multifd_zstd_send_prepare,
 272    .recv_setup = multifd_zstd_recv_setup,
 273    .recv_cleanup = multifd_zstd_recv_cleanup,
 274    .recv = multifd_zstd_recv
 275};
 276
 277static void multifd_zstd_register(void)
 278{
 279    multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
 280}
 281
 282migration_init(multifd_zstd_register);
 283