qemu/posix-aio-compat.c
/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

static void do_spawn_thread(void);

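/*
 * One in-flight AIO request.  A request sits on two lists at once: the
 * worker-side request_list (via 'node') while it waits to be picked up,
 * and the completion-side PosixAioState->first_aio chain (via 'next')
 * until posix_aio_read() reaps it.  'ret' holds -EINPROGRESS until a
 * worker stores the final result.
 */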
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;
};

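/*
 * Completion state shared with the main loop: a notification pipe
 * (worker threads write to 'wfd', the fd handler reads from 'rfd') and
 * the chain of submitted requests still awaiting completion processing.
 */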
typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;


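/*
 * Worker thread pool state.  'lock' and 'cond' protect request_list and
 * the thread counters below; a worker that idles on the condition
 * variable for ten seconds exits on its own.
 */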
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static int new_threads = 0;     /* backlog of threads we need to create */
static int pending_threads = 0; /* threads created but not running yet */
static QEMUBH *new_thread_bh;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy..
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

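/*
 * Issue a single vectored read or write via preadv/pwritev, retrying on
 * EINTR.  Returns the byte count on success or -errno on failure.
 */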
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

/*
 * Reads/writes the data to/from a given linear buffer.
 *
 * Returns the number of bytes handled or -errno in case of an error. Short
 * reads are only returned if the end of the file is reached.
 */
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

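/*
 * Top-level read/write dispatcher.  Aligned single-segment requests go
 * straight to pread/pwrite, aligned multi-segment requests try
 * preadv/pwritev first, and everything else (or a missing preadv) is
 * bounced through a single aligned buffer from qemu_blockalign().
 */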
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

static void posix_aio_notify_event(void);

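/*
 * Worker thread: pull requests off request_list, execute them, store the
 * result in aiocb->ret and wake the main loop through the notification
 * pipe.  A thread that finds no work for ten seconds exits on its own.
 */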
static void *aio_thread(void *unused)
{
    mutex_lock(&lock);
    pending_threads--;
    mutex_unlock(&lock);
    do_spawn_thread();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            idle_threads++;
            ret = cond_timedwait(&cond, &lock, &ts);
            idle_threads--;
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
            ret = handle_aiocb_rw(aiocb);
            if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
                /* A short read means that we have reached EOF. Pad the buffer
                 * with zeros for bytes after EOF. */
                QEMUIOVector qiov;

                qemu_iovec_init_external(&qiov, aiocb->aio_iov,
                                         aiocb->aio_niov);
                qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);

                ret = aiocb->aio_nbytes;
            }
            break;
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        mutex_unlock(&lock);

        posix_aio_notify_event();
    }

    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

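/*
 * Create one worker thread from the backlog, if any.  All signals are
 * blocked around pthread_create() so the new worker starts with an
 * all-blocked signal mask; the caller's mask is restored afterwards.
 */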
static void do_spawn_thread(void)
{
    sigset_t set, oldset;

    mutex_lock(&lock);
    if (!new_threads) {
        mutex_unlock(&lock);
        return;
    }

    new_threads--;
    pending_threads++;

    mutex_unlock(&lock);

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

static void spawn_thread_bh_fn(void *opaque)
{
    do_spawn_thread();
}

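/*
 * Record that another worker is needed.  Called with 'lock' held; the
 * actual pthread_create() is deferred to the main thread's bottom half
 * or to an already-starting worker, see the comment below.
 */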
static void spawn_thread(void)
{
    cur_threads++;
    new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pending_threads) {
        qemu_bh_schedule(new_thread_bh);
    }
}

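/*
 * Queue a prepared request for the worker pool, spawning a new worker if
 * none are idle and the pool has not yet reached max_threads.
 */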
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

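/*
 * Main-loop fd handler: drain the notification pipe, then walk the
 * first_aio list and complete (or drop, if canceled) every request whose
 * worker has finished.
 */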
static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return;

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }

                trace_paio_complete(acb, acb->common.opaque, ret);

                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

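/* Wake the main loop by writing one byte to the notification pipe. */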
static void posix_aio_notify_event(void)
{
    char byte = 0;
    ssize_t ret;

    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
    if (ret < 0 && errno != EAGAIN)
        die("write()");
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

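/*
 * Cancel a request: if no worker has picked it up yet it is unlinked from
 * request_list and marked ECANCELED; otherwise we busy-wait for the
 * worker to finish before removing it from the completion list.
 */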
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

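/*
 * Public entry point for read/write/flush requests: fill in a qemu_paiocb,
 * link it onto the completion list and hand it to the worker pool.
 */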
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    acb->aio_type = type;
    acb->aio_fildes = fd;

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

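/* Same as paio_submit(), but queues an asynchronous ioctl on the device. */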
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

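/*
 * One-time initialization: create the non-blocking notification pipe,
 * register it with the main loop, and set up the detached worker thread
 * attributes and the thread-spawning bottom half.
 */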
int paio_init(void)
{
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = g_malloc(sizeof(PosixAioState));

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        g_free(s);
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);
    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);

    posix_aio_state = s;
    return 0;
}