qemu/buffered_file.c
<<
>>
Prefs
   1/*
   2 * QEMU buffered QEMUFile
   3 *
   4 * Copyright IBM, Corp. 2008
   5 *
   6 * Authors:
   7 *  Anthony Liguori   <aliguori@us.ibm.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2.  See
  10 * the COPYING file in the top-level directory.
  11 *
  12 */
  13
  14#include "qemu-common.h"
  15#include "hw/hw.h"
  16#include "qemu-timer.h"
  17#include "qemu-char.h"
  18#include "buffered_file.h"
  19
  20//#define DEBUG_BUFFERED_FILE
  21
  22typedef struct QEMUFileBuffered
  23{
  24    BufferedPutFunc *put_buffer;
  25    BufferedPutReadyFunc *put_ready;
  26    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
  27    BufferedCloseFunc *close;
  28    void *opaque;
  29    QEMUFile *file;
  30    int freeze_output;
  31    size_t bytes_xfer;
  32    size_t xfer_limit;
  33    uint8_t *buffer;
  34    size_t buffer_size;
  35    size_t buffer_capacity;
  36    QEMUTimer *timer;
  37} QEMUFileBuffered;
  38
  39#ifdef DEBUG_BUFFERED_FILE
  40#define DPRINTF(fmt, ...) \
  41    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
  42#else
  43#define DPRINTF(fmt, ...) \
  44    do { } while (0)
  45#endif
  46
  47static void buffered_append(QEMUFileBuffered *s,
  48                            const uint8_t *buf, size_t size)
  49{
  50    if (size > (s->buffer_capacity - s->buffer_size)) {
  51        void *tmp;
  52
  53        DPRINTF("increasing buffer capacity from %zu by %zu\n",
  54                s->buffer_capacity, size + 1024);
  55
  56        s->buffer_capacity += size + 1024;
  57
  58        tmp = g_realloc(s->buffer, s->buffer_capacity);
  59        if (tmp == NULL) {
  60            fprintf(stderr, "qemu file buffer expansion failed\n");
  61            exit(1);
  62        }
  63
  64        s->buffer = tmp;
  65    }
  66
  67    memcpy(s->buffer + s->buffer_size, buf, size);
  68    s->buffer_size += size;
  69}
  70
  71static void buffered_flush(QEMUFileBuffered *s)
  72{
  73    size_t offset = 0;
  74    int error;
  75
  76    error = qemu_file_get_error(s->file);
  77    if (error != 0) {
  78        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
  79        return;
  80    }
  81
  82    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
  83
  84    while (offset < s->buffer_size) {
  85        ssize_t ret;
  86
  87        ret = s->put_buffer(s->opaque, s->buffer + offset,
  88                            s->buffer_size - offset);
  89        if (ret == -EAGAIN) {
  90            DPRINTF("backend not ready, freezing\n");
  91            s->freeze_output = 1;
  92            break;
  93        }
  94
  95        if (ret <= 0) {
  96            DPRINTF("error flushing data, %zd\n", ret);
  97            qemu_file_set_error(s->file, ret);
  98            break;
  99        } else {
 100            DPRINTF("flushed %zd byte(s)\n", ret);
 101            offset += ret;
 102        }
 103    }
 104
 105    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
 106    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
 107    s->buffer_size -= offset;
 108}
 109
 110static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
 111{
 112    QEMUFileBuffered *s = opaque;
 113    int offset = 0, error;
 114    ssize_t ret;
 115
 116    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
 117
 118    error = qemu_file_get_error(s->file);
 119    if (error) {
 120        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
 121        return error;
 122    }
 123
 124    DPRINTF("unfreezing output\n");
 125    s->freeze_output = 0;
 126
 127    buffered_flush(s);
 128
 129    while (!s->freeze_output && offset < size) {
 130        if (s->bytes_xfer > s->xfer_limit) {
 131            DPRINTF("transfer limit exceeded when putting\n");
 132            break;
 133        }
 134
 135        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
 136        if (ret == -EAGAIN) {
 137            DPRINTF("backend not ready, freezing\n");
 138            s->freeze_output = 1;
 139            break;
 140        }
 141
 142        if (ret <= 0) {
 143            DPRINTF("error putting\n");
 144            qemu_file_set_error(s->file, ret);
 145            offset = -EINVAL;
 146            break;
 147        }
 148
 149        DPRINTF("put %zd byte(s)\n", ret);
 150        offset += ret;
 151        s->bytes_xfer += ret;
 152    }
 153
 154    if (offset >= 0) {
 155        DPRINTF("buffering %d bytes\n", size - offset);
 156        buffered_append(s, buf + offset, size - offset);
 157        offset = size;
 158    }
 159
 160    if (pos == 0 && size == 0) {
 161        DPRINTF("file is ready\n");
 162        if (s->bytes_xfer <= s->xfer_limit) {
 163            DPRINTF("notifying client\n");
 164            s->put_ready(s->opaque);
 165        }
 166    }
 167
 168    return offset;
 169}
 170
 171static int buffered_close(void *opaque)
 172{
 173    QEMUFileBuffered *s = opaque;
 174    int ret;
 175
 176    DPRINTF("closing\n");
 177
 178    while (!qemu_file_get_error(s->file) && s->buffer_size) {
 179        buffered_flush(s);
 180        if (s->freeze_output)
 181            s->wait_for_unfreeze(s->opaque);
 182    }
 183
 184    ret = s->close(s->opaque);
 185
 186    qemu_del_timer(s->timer);
 187    qemu_free_timer(s->timer);
 188    g_free(s->buffer);
 189    g_free(s);
 190
 191    return ret;
 192}
 193
 194/*
 195 * The meaning of the return values is:
 196 *   0: We can continue sending
 197 *   1: Time to stop
 198 *   negative: There has been an error
 199 */
 200static int buffered_rate_limit(void *opaque)
 201{
 202    QEMUFileBuffered *s = opaque;
 203    int ret;
 204
 205    ret = qemu_file_get_error(s->file);
 206    if (ret) {
 207        return ret;
 208    }
 209    if (s->freeze_output)
 210        return 1;
 211
 212    if (s->bytes_xfer > s->xfer_limit)
 213        return 1;
 214
 215    return 0;
 216}
 217
 218static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
 219{
 220    QEMUFileBuffered *s = opaque;
 221    if (qemu_file_get_error(s->file)) {
 222        goto out;
 223    }
 224    if (new_rate > SIZE_MAX) {
 225        new_rate = SIZE_MAX;
 226    }
 227
 228    s->xfer_limit = new_rate / 10;
 229    
 230out:
 231    return s->xfer_limit;
 232}
 233
 234static int64_t buffered_get_rate_limit(void *opaque)
 235{
 236    QEMUFileBuffered *s = opaque;
 237  
 238    return s->xfer_limit;
 239}
 240
 241static void buffered_rate_tick(void *opaque)
 242{
 243    QEMUFileBuffered *s = opaque;
 244
 245    if (qemu_file_get_error(s->file)) {
 246        buffered_close(s);
 247        return;
 248    }
 249
 250    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
 251
 252    if (s->freeze_output)
 253        return;
 254
 255    s->bytes_xfer = 0;
 256
 257    buffered_flush(s);
 258
 259    /* Add some checks around this */
 260    s->put_ready(s->opaque);
 261}
 262
 263QEMUFile *qemu_fopen_ops_buffered(void *opaque,
 264                                  size_t bytes_per_sec,
 265                                  BufferedPutFunc *put_buffer,
 266                                  BufferedPutReadyFunc *put_ready,
 267                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
 268                                  BufferedCloseFunc *close)
 269{
 270    QEMUFileBuffered *s;
 271
 272    s = g_malloc0(sizeof(*s));
 273
 274    s->opaque = opaque;
 275    s->xfer_limit = bytes_per_sec / 10;
 276    s->put_buffer = put_buffer;
 277    s->put_ready = put_ready;
 278    s->wait_for_unfreeze = wait_for_unfreeze;
 279    s->close = close;
 280
 281    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
 282                             buffered_close, buffered_rate_limit,
 283                             buffered_set_rate_limit,
 284                             buffered_get_rate_limit);
 285
 286    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
 287
 288    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
 289
 290    return s->file;
 291}
 292