qemu/posix-aio-compat.c
/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;

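/*
 * Worker pool state: the fields below are protected by 'lock'.
 * Submitted requests wait on 'request_list' until an idle worker picks
 * them up; workers are spawned on demand up to 'max_threads' and exit
 * after ten seconds without work (see aio_thread() below).
 */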
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy.
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

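/*
 * Transfer 'aio_nbytes' bytes through a single linear buffer using plain
 * pread()/pwrite(), retrying on EINTR and continuing after short
 * transfers.  Returns the number of bytes transferred, or -errno.
 */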
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

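/*
 * Top-level read/write dispatcher: an aligned single-segment request goes
 * straight to handle_aiocb_rw_linear(), aligned multi-segment requests try
 * preadv()/pwritev() first, and everything else is bounced through one
 * aligned buffer allocated with qemu_blockalign().
 */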
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * OK, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

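/*
 * Worker thread body: pop requests off request_list, dispatch them by
 * type, store the result in aiocb->ret and notify the I/O thread by
 * raising ev_signo (SIGUSR2) against our own process.  A worker that
 * sees no work for ten seconds unwinds and exits.
 */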
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

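/*
 * Start one more worker.  All signals are blocked around pthread_create()
 * so that the new thread inherits a fully blocked signal mask and the
 * completion signal is only ever delivered to the main (I/O) thread.
 */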
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;
    idle_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

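/*
 * Walk the singly linked completion list (PosixAioState.first_aio),
 * skipping requests that belong to another async context.  Finished
 * requests are unlinked, their completion callback invoked and the aiocb
 * released; canceled requests are simply dropped.  Returns non-zero if
 * any request was completed or canceled.
 */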
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }
                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

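/*
 * fd handler for the read end of the notification pipe: drain whatever the
 * signal handler wrote, then scan the queue for finished requests.
 */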
static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

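/*
 * SIGUSR2 handler: wake up the I/O thread with the self-pipe trick.  The
 * write end is non-blocking, so a full pipe (EAGAIN) is harmless; a wakeup
 * is already pending in that case.
 */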
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

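/*
 * Cancel a request: if no worker has picked it up yet it is unlinked from
 * request_list and marked -ECANCELED; if a worker is already processing it
 * we busy-wait for completion before removing it.
 */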
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

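/*
 * One-time setup: install the SIGUSR2 completion handler, create the
 * non-blocking notification pipe and register its read end with the main
 * loop, and configure the detached worker thread attributes.  Calling it
 * again after successful initialization is a no-op.
 */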
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}