qemu/buffered_file.c
<<
>>
Prefs
   1/*
   2 * QEMU buffered QEMUFile
   3 *
   4 * Copyright IBM, Corp. 2008
   5 *
   6 * Authors:
   7 *  Anthony Liguori   <aliguori@us.ibm.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2.  See
  10 * the COPYING file in the top-level directory.
  11 *
  12 */
  13
  14#include "qemu-common.h"
  15#include "hw/hw.h"
  16#include "qemu-timer.h"
  17#include "qemu-char.h"
  18#include "buffered_file.h"
  19
  20//#define DEBUG_BUFFERED_FILE
  21
  22typedef struct QEMUFileBuffered
  23{
  24    BufferedPutFunc *put_buffer;
  25    BufferedPutReadyFunc *put_ready;
  26    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
  27    BufferedCloseFunc *close;
  28    void *opaque;
  29    QEMUFile *file;
  30    int has_error;
  31    int freeze_output;
  32    size_t bytes_xfer;
  33    size_t xfer_limit;
  34    uint8_t *buffer;
  35    size_t buffer_size;
  36    size_t buffer_capacity;
  37    QEMUTimer *timer;
  38} QEMUFileBuffered;
  39
  40#ifdef DEBUG_BUFFERED_FILE
  41#define DPRINTF(fmt, ...) \
  42    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
  43#else
  44#define DPRINTF(fmt, ...) \
  45    do { } while (0)
  46#endif
  47
  48static void buffered_append(QEMUFileBuffered *s,
  49                            const uint8_t *buf, size_t size)
  50{
  51    if (size > (s->buffer_capacity - s->buffer_size)) {
  52        void *tmp;
  53
  54        DPRINTF("increasing buffer capacity from %zu by %zu\n",
  55                s->buffer_capacity, size + 1024);
  56
  57        s->buffer_capacity += size + 1024;
  58
  59        tmp = qemu_realloc(s->buffer, s->buffer_capacity);
  60        if (tmp == NULL) {
  61            fprintf(stderr, "qemu file buffer expansion failed\n");
  62            exit(1);
  63        }
  64
  65        s->buffer = tmp;
  66    }
  67
  68    memcpy(s->buffer + s->buffer_size, buf, size);
  69    s->buffer_size += size;
  70}
  71
  72static void buffered_flush(QEMUFileBuffered *s)
  73{
  74    size_t offset = 0;
  75
  76    if (s->has_error) {
  77        DPRINTF("flush when error, bailing\n");
  78        return;
  79    }
  80
  81    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
  82
  83    while (offset < s->buffer_size) {
  84        ssize_t ret;
  85
  86        ret = s->put_buffer(s->opaque, s->buffer + offset,
  87                            s->buffer_size - offset);
  88        if (ret == -EAGAIN) {
  89            DPRINTF("backend not ready, freezing\n");
  90            s->freeze_output = 1;
  91            break;
  92        }
  93
  94        if (ret <= 0) {
  95            DPRINTF("error flushing data, %zd\n", ret);
  96            s->has_error = 1;
  97            break;
  98        } else {
  99            DPRINTF("flushed %zd byte(s)\n", ret);
 100            offset += ret;
 101        }
 102    }
 103
 104    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
 105    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
 106    s->buffer_size -= offset;
 107}
 108
 109static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
 110{
 111    QEMUFileBuffered *s = opaque;
 112    int offset = 0;
 113    ssize_t ret;
 114
 115    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
 116
 117    if (s->has_error) {
 118        DPRINTF("flush when error, bailing\n");
 119        return -EINVAL;
 120    }
 121
 122    DPRINTF("unfreezing output\n");
 123    s->freeze_output = 0;
 124
 125    buffered_flush(s);
 126
 127    while (!s->freeze_output && offset < size) {
 128        if (s->bytes_xfer > s->xfer_limit) {
 129            DPRINTF("transfer limit exceeded when putting\n");
 130            break;
 131        }
 132
 133        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
 134        if (ret == -EAGAIN) {
 135            DPRINTF("backend not ready, freezing\n");
 136            s->freeze_output = 1;
 137            break;
 138        }
 139
 140        if (ret <= 0) {
 141            DPRINTF("error putting\n");
 142            s->has_error = 1;
 143            offset = -EINVAL;
 144            break;
 145        }
 146
 147        DPRINTF("put %zd byte(s)\n", ret);
 148        offset += ret;
 149        s->bytes_xfer += ret;
 150    }
 151
 152    if (offset >= 0) {
 153        DPRINTF("buffering %d bytes\n", size - offset);
 154        buffered_append(s, buf + offset, size - offset);
 155        offset = size;
 156    }
 157
 158    if (pos == 0 && size == 0) {
 159        DPRINTF("file is ready\n");
 160        if (s->bytes_xfer <= s->xfer_limit) {
 161            DPRINTF("notifying client\n");
 162            s->put_ready(s->opaque);
 163        }
 164    }
 165
 166    return offset;
 167}
 168
 169static int buffered_close(void *opaque)
 170{
 171    QEMUFileBuffered *s = opaque;
 172    int ret;
 173
 174    DPRINTF("closing\n");
 175
 176    while (!s->has_error && s->buffer_size) {
 177        buffered_flush(s);
 178        if (s->freeze_output)
 179            s->wait_for_unfreeze(s);
 180    }
 181
 182    ret = s->close(s->opaque);
 183
 184    qemu_del_timer(s->timer);
 185    qemu_free_timer(s->timer);
 186    qemu_free(s->buffer);
 187    qemu_free(s);
 188
 189    return ret;
 190}
 191
 192static int buffered_rate_limit(void *opaque)
 193{
 194    QEMUFileBuffered *s = opaque;
 195
 196    if (s->has_error)
 197        return 0;
 198
 199    if (s->freeze_output)
 200        return 1;
 201
 202    if (s->bytes_xfer > s->xfer_limit)
 203        return 1;
 204
 205    return 0;
 206}
 207
 208static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
 209{
 210    QEMUFileBuffered *s = opaque;
 211    if (s->has_error)
 212        goto out;
 213
 214    if (new_rate > SIZE_MAX) {
 215        new_rate = SIZE_MAX;
 216    }
 217
 218    s->xfer_limit = new_rate / 10;
 219    
 220out:
 221    return s->xfer_limit;
 222}
 223
 224static int64_t buffered_get_rate_limit(void *opaque)
 225{
 226    QEMUFileBuffered *s = opaque;
 227  
 228    return s->xfer_limit;
 229}
 230
 231static void buffered_rate_tick(void *opaque)
 232{
 233    QEMUFileBuffered *s = opaque;
 234
 235    if (s->has_error) {
 236        buffered_close(s);
 237        return;
 238    }
 239
 240    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
 241
 242    if (s->freeze_output)
 243        return;
 244
 245    s->bytes_xfer = 0;
 246
 247    buffered_flush(s);
 248
 249    /* Add some checks around this */
 250    s->put_ready(s->opaque);
 251}
 252
 253QEMUFile *qemu_fopen_ops_buffered(void *opaque,
 254                                  size_t bytes_per_sec,
 255                                  BufferedPutFunc *put_buffer,
 256                                  BufferedPutReadyFunc *put_ready,
 257                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
 258                                  BufferedCloseFunc *close)
 259{
 260    QEMUFileBuffered *s;
 261
 262    s = qemu_mallocz(sizeof(*s));
 263
 264    s->opaque = opaque;
 265    s->xfer_limit = bytes_per_sec / 10;
 266    s->put_buffer = put_buffer;
 267    s->put_ready = put_ready;
 268    s->wait_for_unfreeze = wait_for_unfreeze;
 269    s->close = close;
 270
 271    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
 272                             buffered_close, buffered_rate_limit,
 273                             buffered_set_rate_limit,
 274                             buffered_get_rate_limit);
 275
 276    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
 277
 278    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
 279
 280    return s->file;
 281}
 282