qemu/posix-aio-compat.c
<<
>>
Prefs
   1/*
   2 * QEMU posix-aio emulation
   3 *
   4 * Copyright IBM, Corp. 2008
   5 *
   6 * Authors:
   7 *  Anthony Liguori   <aliguori@us.ibm.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2.  See
  10 * the COPYING file in the top-level directory.
  11 *
  12 * Contributions after 2012-01-13 are licensed under the terms of the
  13 * GNU GPL, version 2 or (at your option) any later version.
  14 */
  15
#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
#include "iov.h"

#include "block/raw-posix-aio.h"
  35
/* Defined further down; aio_thread() calls it before its definition. */
static void do_spawn_thread(void);

/*
 * One request handled by the worker-thread pool.
 *
 * `common` must stay the first member: paio_cancel() casts the
 * BlockDriverAIOCB pointer handed out by paio_submit()/paio_ioctl()
 * straight back to a struct qemu_paiocb.
 */
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;             /* file descriptor the request operates on */
    union {
        struct iovec *aio_iov;  /* data segments for read/write requests */
        void *aio_ioctl_buf;    /* argument pointer for QEMU_AIO_IOCTL */
    };
    int aio_niov;               /* number of entries in aio_iov */
    size_t aio_nbytes;          /* transfer length in bytes */
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    off_t aio_offset;           /* byte offset into the file */

    QTAILQ_ENTRY(qemu_paiocb) node; /* link in request_list while queued */
    int aio_type;               /* QEMU_AIO_* type and flag bits */
    ssize_t ret;                /* result; -EINPROGRESS until a worker stores it
                                   (read/written under `lock` once queued) */
    int active;                 /* set by the worker that dequeued it */
    struct qemu_paiocb *next;   /* link in PosixAioState.first_aio */
};
  56
/* Completion-side state created once by paio_init(). */
typedef struct PosixAioState {
    int rfd, wfd;                   /* read/write ends of the notification pipe */
    struct qemu_paiocb *first_aio;  /* singly linked list of outstanding requests */
} PosixAioState;
  61
  62
/* Protects request_list, the thread counters and each queued request's ret. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by qemu_paio_submit(); idle workers wait on it. */
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Receives each created worker's id; it is never read in this file. */
static pthread_t thread_id;
/* Worker thread attributes (made detached in paio_init()). */
static pthread_attr_t attr;
static int max_threads = 64;    /* upper bound on cur_threads */
static int cur_threads = 0;     /* workers running or already requested */
static int idle_threads = 0;    /* workers currently waiting in cond_timedwait() */
static int new_threads = 0;     /* backlog of threads we need to create */
static int pending_threads = 0; /* threads created but not running yet */
static QEMUBH *new_thread_bh;   /* bottom half running spawn_thread_bh_fn() */
static QTAILQ_HEAD(, qemu_paiocb) request_list; /* queued, not yet dequeued */

/*
 * Whether preadv/pwritev should be attempted; handle_aiocb_rw() may clear it
 * at runtime (from worker threads, without taking `lock`).
 */
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
  80
  81static void die2(int err, const char *what)
  82{
  83    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
  84    abort();
  85}
  86
  87static void die(const char *what)
  88{
  89    die2(errno, what);
  90}
  91
  92static void mutex_lock(pthread_mutex_t *mutex)
  93{
  94    int ret = pthread_mutex_lock(mutex);
  95    if (ret) die2(ret, "pthread_mutex_lock");
  96}
  97
  98static void mutex_unlock(pthread_mutex_t *mutex)
  99{
 100    int ret = pthread_mutex_unlock(mutex);
 101    if (ret) die2(ret, "pthread_mutex_unlock");
 102}
 103
 104static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
 105                           struct timespec *ts)
 106{
 107    int ret = pthread_cond_timedwait(cond, mutex, ts);
 108    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
 109    return ret;
 110}
 111
 112static void cond_signal(pthread_cond_t *cond)
 113{
 114    int ret = pthread_cond_signal(cond);
 115    if (ret) die2(ret, "pthread_cond_signal");
 116}
 117
 118static void thread_create(pthread_t *thread, pthread_attr_t *attr,
 119                          void *(*start_routine)(void*), void *arg)
 120{
 121    int ret = pthread_create(thread, attr, start_routine, arg);
 122    if (ret) die2(ret, "pthread_create");
 123}
 124
 125static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 126{
 127    int ret;
 128
 129    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
 130    if (ret == -1)
 131        return -errno;
 132
 133    /*
 134     * This looks weird, but the aio code only considers a request
 135     * successful if it has written the full number of bytes.
 136     *
 137     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
 138     * so in fact we return the ioctl command here to make posix_aio_read()
 139     * happy..
 140     */
 141    return aiocb->aio_nbytes;
 142}
 143
 144static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
 145{
 146    int ret;
 147
 148    ret = qemu_fdatasync(aiocb->aio_fildes);
 149    if (ret == -1)
 150        return -errno;
 151    return 0;
 152}
 153
 154#ifdef CONFIG_PREADV
 155
 156static ssize_t
 157qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 158{
 159    return preadv(fd, iov, nr_iov, offset);
 160}
 161
 162static ssize_t
 163qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 164{
 165    return pwritev(fd, iov, nr_iov, offset);
 166}
 167
 168#else
 169
 170static ssize_t
 171qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 172{
 173    return -ENOSYS;
 174}
 175
 176static ssize_t
 177qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 178{
 179    return -ENOSYS;
 180}
 181
 182#endif
 183
 184static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
 185{
 186    ssize_t len;
 187
 188    do {
 189        if (aiocb->aio_type & QEMU_AIO_WRITE)
 190            len = qemu_pwritev(aiocb->aio_fildes,
 191                               aiocb->aio_iov,
 192                               aiocb->aio_niov,
 193                               aiocb->aio_offset);
 194         else
 195            len = qemu_preadv(aiocb->aio_fildes,
 196                              aiocb->aio_iov,
 197                              aiocb->aio_niov,
 198                              aiocb->aio_offset);
 199    } while (len == -1 && errno == EINTR);
 200
 201    if (len == -1)
 202        return -errno;
 203    return len;
 204}
 205
 206/*
 207 * Read/writes the data to/from a given linear buffer.
 208 *
 209 * Returns the number of bytes handles or -errno in case of an error. Short
 210 * reads are only returned if the end of the file is reached.
 211 */
 212static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
 213{
 214    ssize_t offset = 0;
 215    ssize_t len;
 216
 217    while (offset < aiocb->aio_nbytes) {
 218         if (aiocb->aio_type & QEMU_AIO_WRITE)
 219             len = pwrite(aiocb->aio_fildes,
 220                          (const char *)buf + offset,
 221                          aiocb->aio_nbytes - offset,
 222                          aiocb->aio_offset + offset);
 223         else
 224             len = pread(aiocb->aio_fildes,
 225                         buf + offset,
 226                         aiocb->aio_nbytes - offset,
 227                         aiocb->aio_offset + offset);
 228
 229         if (len == -1 && errno == EINTR)
 230             continue;
 231         else if (len == -1) {
 232             offset = -errno;
 233             break;
 234         } else if (len == 0)
 235             break;
 236
 237         offset += len;
 238    }
 239
 240    return offset;
 241}
 242
 243static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
 244{
 245    ssize_t nbytes;
 246    char *buf;
 247
 248    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
 249        /*
 250         * If there is just a single buffer, and it is properly aligned
 251         * we can just use plain pread/pwrite without any problems.
 252         */
 253        if (aiocb->aio_niov == 1)
 254             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
 255
 256        /*
 257         * We have more than one iovec, and all are properly aligned.
 258         *
 259         * Try preadv/pwritev first and fall back to linearizing the
 260         * buffer if it's not supported.
 261         */
 262        if (preadv_present) {
 263            nbytes = handle_aiocb_rw_vector(aiocb);
 264            if (nbytes == aiocb->aio_nbytes)
 265                return nbytes;
 266            if (nbytes < 0 && nbytes != -ENOSYS)
 267                return nbytes;
 268            preadv_present = 0;
 269        }
 270
 271        /*
 272         * XXX(hch): short read/write.  no easy way to handle the reminder
 273         * using these interfaces.  For now retry using plain
 274         * pread/pwrite?
 275         */
 276    }
 277
 278    /*
 279     * Ok, we have to do it the hard way, copy all segments into
 280     * a single aligned buffer.
 281     */
 282    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
 283    if (aiocb->aio_type & QEMU_AIO_WRITE) {
 284        char *p = buf;
 285        int i;
 286
 287        for (i = 0; i < aiocb->aio_niov; ++i) {
 288            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
 289            p += aiocb->aio_iov[i].iov_len;
 290        }
 291    }
 292
 293    nbytes = handle_aiocb_rw_linear(aiocb, buf);
 294    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
 295        char *p = buf;
 296        size_t count = aiocb->aio_nbytes, copy;
 297        int i;
 298
 299        for (i = 0; i < aiocb->aio_niov && count; ++i) {
 300            copy = count;
 301            if (copy > aiocb->aio_iov[i].iov_len)
 302                copy = aiocb->aio_iov[i].iov_len;
 303            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
 304            p     += copy;
 305            count -= copy;
 306        }
 307    }
 308    qemu_vfree(buf);
 309
 310    return nbytes;
 311}
 312
/* Defined further down; aio_thread() calls it after each finished request. */
static void posix_aio_notify_event(void);

/*
 * Worker thread main loop.
 *
 * At start-up it decrements pending_threads and calls do_spawn_thread(),
 * which creates the next backlogged worker (if any).  Thread creation is
 * therefore chained from one new worker to the next.
 *
 * It then repeatedly dequeues the head of request_list and executes it
 * without holding `lock`.  It stores the result in aiocb->ret under `lock`
 * and pokes the notification pipe via posix_aio_notify_event().
 *
 * When the queue is empty it waits on `cond` with a deadline roughly 10
 * seconds ahead.  If that deadline passes while the queue is still empty,
 * the thread decrements cur_threads and exits.
 */
static void *aio_thread(void *unused)
{
    mutex_lock(&lock);
    pending_threads--;
    mutex_unlock(&lock);
    do_spawn_thread();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* Absolute deadline for the idle wait; the sub-second part of the
         * current time is dropped, so it lies 9-10 seconds ahead. */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* Wait for work, giving up once the deadline has passed.  The loop
         * also re-checks the queue after spurious or stolen wakeups. */
        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            idle_threads++;
            ret = cond_timedwait(&cond, &lock, &ts);
            idle_threads--;
        }

        /* Timed out with nothing to do: leave the loop with `lock` still
         * held; it is released after cur_threads is updated below. */
        if (QTAILQ_EMPTY(&request_list))
            break;

        /* Once `active` is set, paio_cancel() no longer unqueues the request
         * and instead waits for its result. */
        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
            ret = handle_aiocb_rw(aiocb);
            if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
                /* A short read means that we have reached EOF. Pad the buffer
                 * with zeros for bytes after EOF. */
                iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
                           0, aiocb->aio_nbytes - ret);

                ret = aiocb->aio_nbytes;
            }
            break;
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        /* Publish the result under the lock; qemu_paio_return() reads it
         * under the same lock. */
        mutex_lock(&lock);
        aiocb->ret = ret;
        mutex_unlock(&lock);

        posix_aio_notify_event();
    }

    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
 388
 389static void do_spawn_thread(void)
 390{
 391    sigset_t set, oldset;
 392
 393    mutex_lock(&lock);
 394    if (!new_threads) {
 395        mutex_unlock(&lock);
 396        return;
 397    }
 398
 399    new_threads--;
 400    pending_threads++;
 401
 402    mutex_unlock(&lock);
 403
 404    /* block all signals */
 405    if (sigfillset(&set)) die("sigfillset");
 406    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
 407
 408    thread_create(&thread_id, &attr, aio_thread, NULL);
 409
 410    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
 411}
 412
 413static void spawn_thread_bh_fn(void *opaque)
 414{
 415    do_spawn_thread();
 416}
 417
 418static void spawn_thread(void)
 419{
 420    cur_threads++;
 421    new_threads++;
 422    /* If there are threads being created, they will spawn new workers, so
 423     * we don't spend time creating many threads in a loop holding a mutex or
 424     * starving the current vcpu.
 425     *
 426     * If there are no idle threads, ask the main thread to create one, so we
 427     * inherit the correct affinity instead of the vcpu affinity.
 428     */
 429    if (!pending_threads) {
 430        qemu_bh_schedule(new_thread_bh);
 431    }
 432}
 433
 434static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 435{
 436    aiocb->ret = -EINPROGRESS;
 437    aiocb->active = 0;
 438    mutex_lock(&lock);
 439    if (idle_threads == 0 && cur_threads < max_threads)
 440        spawn_thread();
 441    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
 442    mutex_unlock(&lock);
 443    cond_signal(&cond);
 444}
 445
 446static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 447{
 448    ssize_t ret;
 449
 450    mutex_lock(&lock);
 451    ret = aiocb->ret;
 452    mutex_unlock(&lock);
 453
 454    return ret;
 455}
 456
 457static int qemu_paio_error(struct qemu_paiocb *aiocb)
 458{
 459    ssize_t ret = qemu_paio_return(aiocb);
 460
 461    if (ret < 0)
 462        ret = -ret;
 463    else
 464        ret = 0;
 465
 466    return ret;
 467}
 468
 469static void posix_aio_read(void *opaque)
 470{
 471    PosixAioState *s = opaque;
 472    struct qemu_paiocb *acb, **pacb;
 473    int ret;
 474    ssize_t len;
 475
 476    /* read all bytes from signal pipe */
 477    for (;;) {
 478        char bytes[16];
 479
 480        len = read(s->rfd, bytes, sizeof(bytes));
 481        if (len == -1 && errno == EINTR)
 482            continue; /* try again */
 483        if (len == sizeof(bytes))
 484            continue; /* more to read */
 485        break;
 486    }
 487
 488    for(;;) {
 489        pacb = &s->first_aio;
 490        for(;;) {
 491            acb = *pacb;
 492            if (!acb)
 493                return;
 494
 495            ret = qemu_paio_error(acb);
 496            if (ret == ECANCELED) {
 497                /* remove the request */
 498                *pacb = acb->next;
 499                qemu_aio_release(acb);
 500            } else if (ret != EINPROGRESS) {
 501                /* end of aio */
 502                if (ret == 0) {
 503                    ret = qemu_paio_return(acb);
 504                    if (ret == acb->aio_nbytes)
 505                        ret = 0;
 506                    else
 507                        ret = -EINVAL;
 508                } else {
 509                    ret = -ret;
 510                }
 511
 512                trace_paio_complete(acb, acb->common.opaque, ret);
 513
 514                /* remove the request */
 515                *pacb = acb->next;
 516                /* call the callback */
 517                acb->common.cb(acb->common.opaque, ret);
 518                qemu_aio_release(acb);
 519                break;
 520            } else {
 521                pacb = &acb->next;
 522            }
 523        }
 524    }
 525}
 526
 527static int posix_aio_flush(void *opaque)
 528{
 529    PosixAioState *s = opaque;
 530    return !!s->first_aio;
 531}
 532
 533static PosixAioState *posix_aio_state;
 534
 535static void posix_aio_notify_event(void)
 536{
 537    char byte = 0;
 538    ssize_t ret;
 539
 540    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
 541    if (ret < 0 && errno != EAGAIN)
 542        die("write()");
 543}
 544
 545static void paio_remove(struct qemu_paiocb *acb)
 546{
 547    struct qemu_paiocb **pacb;
 548
 549    /* remove the callback from the queue */
 550    pacb = &posix_aio_state->first_aio;
 551    for(;;) {
 552        if (*pacb == NULL) {
 553            fprintf(stderr, "paio_remove: aio request not found!\n");
 554            break;
 555        } else if (*pacb == acb) {
 556            *pacb = acb->next;
 557            qemu_aio_release(acb);
 558            break;
 559        }
 560        pacb = &(*pacb)->next;
 561    }
 562}
 563
 564static void paio_cancel(BlockDriverAIOCB *blockacb)
 565{
 566    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
 567    int active = 0;
 568
 569    trace_paio_cancel(acb, acb->common.opaque);
 570
 571    mutex_lock(&lock);
 572    if (!acb->active) {
 573        QTAILQ_REMOVE(&request_list, acb, node);
 574        acb->ret = -ECANCELED;
 575    } else if (acb->ret == -EINPROGRESS) {
 576        active = 1;
 577    }
 578    mutex_unlock(&lock);
 579
 580    if (active) {
 581        /* fail safe: if the aio could not be canceled, we wait for
 582           it */
 583        while (qemu_paio_error(acb) == EINPROGRESS)
 584            ;
 585    }
 586
 587    paio_remove(acb);
 588}
 589
/*
 * Pool from which paio_submit()/paio_ioctl() obtain struct qemu_paiocb
 * objects; paio_cancel() is its cancel callback.
 */
static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};
 594
 595BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
 596        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 597        BlockDriverCompletionFunc *cb, void *opaque, int type)
 598{
 599    struct qemu_paiocb *acb;
 600
 601    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
 602    acb->aio_type = type;
 603    acb->aio_fildes = fd;
 604
 605    if (qiov) {
 606        acb->aio_iov = qiov->iov;
 607        acb->aio_niov = qiov->niov;
 608    }
 609    acb->aio_nbytes = nb_sectors * 512;
 610    acb->aio_offset = sector_num * 512;
 611
 612    acb->next = posix_aio_state->first_aio;
 613    posix_aio_state->first_aio = acb;
 614
 615    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
 616    qemu_paio_submit(acb);
 617    return &acb->common;
 618}
 619
 620BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 621        unsigned long int req, void *buf,
 622        BlockDriverCompletionFunc *cb, void *opaque)
 623{
 624    struct qemu_paiocb *acb;
 625
 626    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
 627    acb->aio_type = QEMU_AIO_IOCTL;
 628    acb->aio_fildes = fd;
 629    acb->aio_offset = 0;
 630    acb->aio_ioctl_buf = buf;
 631    acb->aio_ioctl_cmd = req;
 632
 633    acb->next = posix_aio_state->first_aio;
 634    posix_aio_state->first_aio = acb;
 635
 636    qemu_paio_submit(acb);
 637    return &acb->common;
 638}
 639
 640int paio_init(void)
 641{
 642    PosixAioState *s;
 643    int fds[2];
 644    int ret;
 645
 646    if (posix_aio_state)
 647        return 0;
 648
 649    s = g_malloc(sizeof(PosixAioState));
 650
 651    s->first_aio = NULL;
 652    if (qemu_pipe(fds) == -1) {
 653        fprintf(stderr, "failed to create pipe\n");
 654        g_free(s);
 655        return -1;
 656    }
 657
 658    s->rfd = fds[0];
 659    s->wfd = fds[1];
 660
 661    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
 662    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
 663
 664    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 665
 666    ret = pthread_attr_init(&attr);
 667    if (ret)
 668        die2(ret, "pthread_attr_init");
 669
 670    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 671    if (ret)
 672        die2(ret, "pthread_attr_setdetachstate");
 673
 674    QTAILQ_INIT(&request_list);
 675    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
 676
 677    posix_aio_state = s;
 678    return 0;
 679}
 680