qemu/migration/multifd-zlib.c
<<
>>
Prefs
   1/*
   2 * Multifd zlib compression implementation
   3 *
   4 * Copyright (c) 2020 Red Hat Inc
   5 *
   6 * Authors:
   7 *  Juan Quintela <quintela@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 */
  12
  13#include "qemu/osdep.h"
  14#include <zlib.h>
  15#include "qemu/rcu.h"
  16#include "exec/ramblock.h"
  17#include "exec/target_page.h"
  18#include "qapi/error.h"
  19#include "migration.h"
  20#include "trace.h"
  21#include "multifd.h"
  22
  23struct zlib_data {
  24    /* stream for compression */
  25    z_stream zs;
  26    /* compressed buffer */
  27    uint8_t *zbuff;
  28    /* size of compressed buffer */
  29    uint32_t zbuff_len;
  30};
  31
  32/* Multifd zlib compression */
  33
  34/**
  35 * zlib_send_setup: setup send side
  36 *
  37 * Setup each channel with zlib compression.
  38 *
  39 * Returns 0 for success or -1 for error
  40 *
  41 * @p: Params for the channel that we are using
  42 * @errp: pointer to an error
  43 */
  44static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
  45{
  46    struct zlib_data *z = g_new0(struct zlib_data, 1);
  47    z_stream *zs = &z->zs;
  48
  49    zs->zalloc = Z_NULL;
  50    zs->zfree = Z_NULL;
  51    zs->opaque = Z_NULL;
  52    if (deflateInit(zs, migrate_multifd_zlib_level()) != Z_OK) {
  53        g_free(z);
  54        error_setg(errp, "multifd %u: deflate init failed", p->id);
  55        return -1;
  56    }
  57    /* This is the maxium size of the compressed buffer */
  58    z->zbuff_len = compressBound(MULTIFD_PACKET_SIZE);
  59    z->zbuff = g_try_malloc(z->zbuff_len);
  60    if (!z->zbuff) {
  61        deflateEnd(&z->zs);
  62        g_free(z);
  63        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
  64        return -1;
  65    }
  66    p->data = z;
  67    return 0;
  68}
  69
  70/**
  71 * zlib_send_cleanup: cleanup send side
  72 *
  73 * Close the channel and return memory.
  74 *
  75 * @p: Params for the channel that we are using
  76 * @errp: pointer to an error
  77 */
  78static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
  79{
  80    struct zlib_data *z = p->data;
  81
  82    deflateEnd(&z->zs);
  83    g_free(z->zbuff);
  84    z->zbuff = NULL;
  85    g_free(p->data);
  86    p->data = NULL;
  87}
  88
  89/**
  90 * zlib_send_prepare: prepare date to be able to send
  91 *
  92 * Create a compressed buffer with all the pages that we are going to
  93 * send.
  94 *
  95 * Returns 0 for success or -1 for error
  96 *
  97 * @p: Params for the channel that we are using
  98 * @errp: pointer to an error
  99 */
 100static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
 101{
 102    struct zlib_data *z = p->data;
 103    size_t page_size = qemu_target_page_size();
 104    z_stream *zs = &z->zs;
 105    uint32_t out_size = 0;
 106    int ret;
 107    uint32_t i;
 108
 109    for (i = 0; i < p->normal_num; i++) {
 110        uint32_t available = z->zbuff_len - out_size;
 111        int flush = Z_NO_FLUSH;
 112
 113        if (i == p->normal_num - 1) {
 114            flush = Z_SYNC_FLUSH;
 115        }
 116
 117        zs->avail_in = page_size;
 118        zs->next_in = p->pages->block->host + p->normal[i];
 119
 120        zs->avail_out = available;
 121        zs->next_out = z->zbuff + out_size;
 122
 123        /*
 124         * Welcome to deflate semantics
 125         *
 126         * We need to loop while:
 127         * - return is Z_OK
 128         * - there are stuff to be compressed
 129         * - there are output space free
 130         */
 131        do {
 132            ret = deflate(zs, flush);
 133        } while (ret == Z_OK && zs->avail_in && zs->avail_out);
 134        if (ret == Z_OK && zs->avail_in) {
 135            error_setg(errp, "multifd %u: deflate failed to compress all input",
 136                       p->id);
 137            return -1;
 138        }
 139        if (ret != Z_OK) {
 140            error_setg(errp, "multifd %u: deflate returned %d instead of Z_OK",
 141                       p->id, ret);
 142            return -1;
 143        }
 144        out_size += available - zs->avail_out;
 145    }
 146    p->iov[p->iovs_num].iov_base = z->zbuff;
 147    p->iov[p->iovs_num].iov_len = out_size;
 148    p->iovs_num++;
 149    p->next_packet_size = out_size;
 150    p->flags |= MULTIFD_FLAG_ZLIB;
 151
 152    return 0;
 153}
 154
 155/**
 156 * zlib_recv_setup: setup receive side
 157 *
 158 * Create the compressed channel and buffer.
 159 *
 160 * Returns 0 for success or -1 for error
 161 *
 162 * @p: Params for the channel that we are using
 163 * @errp: pointer to an error
 164 */
 165static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
 166{
 167    struct zlib_data *z = g_new0(struct zlib_data, 1);
 168    z_stream *zs = &z->zs;
 169
 170    p->data = z;
 171    zs->zalloc = Z_NULL;
 172    zs->zfree = Z_NULL;
 173    zs->opaque = Z_NULL;
 174    zs->avail_in = 0;
 175    zs->next_in = Z_NULL;
 176    if (inflateInit(zs) != Z_OK) {
 177        error_setg(errp, "multifd %u: inflate init failed", p->id);
 178        return -1;
 179    }
 180    /* To be safe, we reserve twice the size of the packet */
 181    z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
 182    z->zbuff = g_try_malloc(z->zbuff_len);
 183    if (!z->zbuff) {
 184        inflateEnd(zs);
 185        error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
 186        return -1;
 187    }
 188    return 0;
 189}
 190
 191/**
 192 * zlib_recv_cleanup: setup receive side
 193 *
 194 * For no compression this function does nothing.
 195 *
 196 * @p: Params for the channel that we are using
 197 */
 198static void zlib_recv_cleanup(MultiFDRecvParams *p)
 199{
 200    struct zlib_data *z = p->data;
 201
 202    inflateEnd(&z->zs);
 203    g_free(z->zbuff);
 204    z->zbuff = NULL;
 205    g_free(p->data);
 206    p->data = NULL;
 207}
 208
 209/**
 210 * zlib_recv_pages: read the data from the channel into actual pages
 211 *
 212 * Read the compressed buffer, and uncompress it into the actual
 213 * pages.
 214 *
 215 * Returns 0 for success or -1 for error
 216 *
 217 * @p: Params for the channel that we are using
 218 * @errp: pointer to an error
 219 */
 220static int zlib_recv_pages(MultiFDRecvParams *p, Error **errp)
 221{
 222    struct zlib_data *z = p->data;
 223    size_t page_size = qemu_target_page_size();
 224    z_stream *zs = &z->zs;
 225    uint32_t in_size = p->next_packet_size;
 226    /* we measure the change of total_out */
 227    uint32_t out_size = zs->total_out;
 228    uint32_t expected_size = p->normal_num * page_size;
 229    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
 230    int ret;
 231    int i;
 232
 233    if (flags != MULTIFD_FLAG_ZLIB) {
 234        error_setg(errp, "multifd %u: flags received %x flags expected %x",
 235                   p->id, flags, MULTIFD_FLAG_ZLIB);
 236        return -1;
 237    }
 238    ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
 239
 240    if (ret != 0) {
 241        return ret;
 242    }
 243
 244    zs->avail_in = in_size;
 245    zs->next_in = z->zbuff;
 246
 247    for (i = 0; i < p->normal_num; i++) {
 248        int flush = Z_NO_FLUSH;
 249        unsigned long start = zs->total_out;
 250
 251        if (i == p->normal_num - 1) {
 252            flush = Z_SYNC_FLUSH;
 253        }
 254
 255        zs->avail_out = page_size;
 256        zs->next_out = p->host + p->normal[i];
 257
 258        /*
 259         * Welcome to inflate semantics
 260         *
 261         * We need to loop while:
 262         * - return is Z_OK
 263         * - there are input available
 264         * - we haven't completed a full page
 265         */
 266        do {
 267            ret = inflate(zs, flush);
 268        } while (ret == Z_OK && zs->avail_in
 269                             && (zs->total_out - start) < page_size);
 270        if (ret == Z_OK && (zs->total_out - start) < page_size) {
 271            error_setg(errp, "multifd %u: inflate generated too few output",
 272                       p->id);
 273            return -1;
 274        }
 275        if (ret != Z_OK) {
 276            error_setg(errp, "multifd %u: inflate returned %d instead of Z_OK",
 277                       p->id, ret);
 278            return -1;
 279        }
 280    }
 281    out_size = zs->total_out - out_size;
 282    if (out_size != expected_size) {
 283        error_setg(errp, "multifd %u: packet size received %u size expected %u",
 284                   p->id, out_size, expected_size);
 285        return -1;
 286    }
 287    return 0;
 288}
 289
 290static MultiFDMethods multifd_zlib_ops = {
 291    .send_setup = zlib_send_setup,
 292    .send_cleanup = zlib_send_cleanup,
 293    .send_prepare = zlib_send_prepare,
 294    .recv_setup = zlib_recv_setup,
 295    .recv_cleanup = zlib_recv_cleanup,
 296    .recv_pages = zlib_recv_pages
 297};
 298
 299static void multifd_zlib_register(void)
 300{
 301    multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 302}
 303
 304migration_init(multifd_zlib_register);
 305