qemu/posix-aio-compat.c
<<
>>
Prefs
   1/*
   2 * QEMU posix-aio emulation
   3 *
   4 * Copyright IBM, Corp. 2008
   5 *
   6 * Authors:
   7 *  Anthony Liguori   <aliguori@us.ibm.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2.  See
  10 * the COPYING file in the top-level directory.
  11 *
  12 */
  13
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "osdep.h"

#include "posix-aio-compat.h"
  24
  25static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  26static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  27static pthread_t thread_id;
  28static pthread_attr_t attr;
  29static int max_threads = 64;
  30static int cur_threads = 0;
  31static int idle_threads = 0;
  32static TAILQ_HEAD(, qemu_paiocb) request_list;
  33
  34static void die2(int err, const char *what)
  35{
  36    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
  37    abort();
  38}
  39
  40static void die(const char *what)
  41{
  42    die2(errno, what);
  43}
  44
  45static void mutex_lock(pthread_mutex_t *mutex)
  46{
  47    int ret = pthread_mutex_lock(mutex);
  48    if (ret) die2(ret, "pthread_mutex_lock");
  49}
  50
  51static void mutex_unlock(pthread_mutex_t *mutex)
  52{
  53    int ret = pthread_mutex_unlock(mutex);
  54    if (ret) die2(ret, "pthread_mutex_unlock");
  55}
  56
  57static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
  58                           struct timespec *ts)
  59{
  60    int ret = pthread_cond_timedwait(cond, mutex, ts);
  61    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
  62    return ret;
  63}
  64
  65static void cond_signal(pthread_cond_t *cond)
  66{
  67    int ret = pthread_cond_signal(cond);
  68    if (ret) die2(ret, "pthread_cond_signal");
  69}
  70
  71static void thread_create(pthread_t *thread, pthread_attr_t *attr,
  72                          void *(*start_routine)(void*), void *arg)
  73{
  74    int ret = pthread_create(thread, attr, start_routine, arg);
  75    if (ret) die2(ret, "pthread_create");
  76}
  77
  78static void *aio_thread(void *unused)
  79{
  80    pid_t pid;
  81    sigset_t set;
  82
  83    pid = getpid();
  84
  85    /* block all signals */
  86    if (sigfillset(&set)) die("sigfillset");
  87    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
  88
  89    while (1) {
  90        struct qemu_paiocb *aiocb;
  91        size_t offset;
  92        int ret = 0;
  93        qemu_timeval tv;
  94        struct timespec ts;
  95
  96        qemu_gettimeofday(&tv);
  97        ts.tv_sec = tv.tv_sec + 10;
  98        ts.tv_nsec = 0;
  99
 100        mutex_lock(&lock);
 101
 102        while (TAILQ_EMPTY(&request_list) &&
 103               !(ret == ETIMEDOUT)) {
 104            ret = cond_timedwait(&cond, &lock, &ts);
 105        }
 106
 107        if (TAILQ_EMPTY(&request_list))
 108            break;
 109
 110        aiocb = TAILQ_FIRST(&request_list);
 111        TAILQ_REMOVE(&request_list, aiocb, node);
 112
 113        offset = 0;
 114        aiocb->active = 1;
 115
 116        idle_threads--;
 117        mutex_unlock(&lock);
 118
 119        while (offset < aiocb->aio_nbytes) {
 120            ssize_t len;
 121
 122            if (aiocb->is_write)
 123                len = pwrite(aiocb->aio_fildes,
 124                             (const char *)aiocb->aio_buf + offset,
 125                             aiocb->aio_nbytes - offset,
 126                             aiocb->aio_offset + offset);
 127            else
 128                len = pread(aiocb->aio_fildes,
 129                            (char *)aiocb->aio_buf + offset,
 130                            aiocb->aio_nbytes - offset,
 131                            aiocb->aio_offset + offset);
 132
 133            if (len == -1 && errno == EINTR)
 134                continue;
 135            else if (len == -1) {
 136                offset = -errno;
 137                break;
 138            } else if (len == 0)
 139                break;
 140
 141            offset += len;
 142        }
 143
 144        mutex_lock(&lock);
 145        aiocb->ret = offset;
 146        idle_threads++;
 147        mutex_unlock(&lock);
 148
 149        if (kill(pid, aiocb->ev_signo)) die("kill failed");
 150    }
 151
 152    idle_threads--;
 153    cur_threads--;
 154    mutex_unlock(&lock);
 155
 156    return NULL;
 157}
 158
 159static void spawn_thread(void)
 160{
 161    cur_threads++;
 162    idle_threads++;
 163    thread_create(&thread_id, &attr, aio_thread, NULL);
 164}
 165
 166int qemu_paio_init(struct qemu_paioinit *aioinit)
 167{
 168    int ret;
 169
 170    ret = pthread_attr_init(&attr);
 171    if (ret) die2(ret, "pthread_attr_init");
 172
 173    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 174    if (ret) die2(ret, "pthread_attr_setdetachstate");
 175
 176    TAILQ_INIT(&request_list);
 177
 178    return 0;
 179}
 180
 181static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
 182{
 183    aiocb->is_write = is_write;
 184    aiocb->ret = -EINPROGRESS;
 185    aiocb->active = 0;
 186    mutex_lock(&lock);
 187    if (idle_threads == 0 && cur_threads < max_threads)
 188        spawn_thread();
 189    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
 190    mutex_unlock(&lock);
 191    cond_signal(&cond);
 192
 193    return 0;
 194}
 195
 196int qemu_paio_read(struct qemu_paiocb *aiocb)
 197{
 198    return qemu_paio_submit(aiocb, 0);
 199}
 200
 201int qemu_paio_write(struct qemu_paiocb *aiocb)
 202{
 203    return qemu_paio_submit(aiocb, 1);
 204}
 205
 206ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 207{
 208    ssize_t ret;
 209
 210    mutex_lock(&lock);
 211    ret = aiocb->ret;
 212    mutex_unlock(&lock);
 213
 214    return ret;
 215}
 216
 217int qemu_paio_error(struct qemu_paiocb *aiocb)
 218{
 219    ssize_t ret = qemu_paio_return(aiocb);
 220
 221    if (ret < 0)
 222        ret = -ret;
 223    else
 224        ret = 0;
 225
 226    return ret;
 227}
 228
 229int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
 230{
 231    int ret;
 232
 233    mutex_lock(&lock);
 234    if (!aiocb->active) {
 235        TAILQ_REMOVE(&request_list, aiocb, node);
 236        aiocb->ret = -ECANCELED;
 237        ret = QEMU_PAIO_CANCELED;
 238    } else if (aiocb->ret == -EINPROGRESS)
 239        ret = QEMU_PAIO_NOTCANCELED;
 240    else
 241        ret = QEMU_PAIO_ALLDONE;
 242    mutex_unlock(&lock);
 243
 244    return ret;
 245}
 246