qemu/util/thread-pool.c
/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

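/*
 * Worker thread body.  Runs requests from request_list until it has been
 * idle for 10 seconds with no queued work and the pool is still above
 * min_threads, or until cur_threads exceeds max_threads.
 */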
static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit.  */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

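/*
 * Create one detached worker thread if there is a backlog in new_threads.
 * Called with pool->lock held.
 */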
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

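/*
 * Account for a new worker thread and schedule its creation.  Called with
 * pool->lock held.
 */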
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If no thread is being created yet, ask the main thread to create one,
     * so we inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

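/*
 * Bottom half that runs in the pool's AioContext: invoke the completion
 * callback of every element that has reached THREAD_DONE, then release it.
 */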
static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of whether
             * someone else has scheduled it meanwhile, because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

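/*
 * Cancel a request.  Only requests still in THREAD_QUEUED state can be
 * cancelled; they complete with -ECANCELED.  Requests that a worker has
 * already picked up run to completion normally.
 */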
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

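/*
 * Submit func(arg) to the pool.  cb(opaque, ret), if non-NULL, is invoked
 * in the pool's AioContext once the request has completed.  Returns a
 * BlockAIOCB that can be used to attempt cancellation.
 */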
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

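/*
 * Coroutine wrapper around thread_pool_submit_aio(): submit func(arg),
 * yield until the worker has finished, then return the worker's result.
 *
 * An illustrative (hypothetical) caller might look like this; the names
 * my_blocking_op, do_something_blocking and data are placeholders, not
 * part of this API:
 *
 *     static int my_blocking_op(void *opaque)   // runs in a worker thread
 *     {
 *         return do_something_blocking(opaque);
 *     }
 *
 *     // ... inside a coroutine:
 *     ret = thread_pool_submit_co(pool, my_blocking_op, &data);
 */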
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

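/*
 * Fire-and-forget variant: submit func(arg) with no completion callback.
 */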
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

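/*
 * Pick up thread_pool_min/thread_pool_max from the AioContext and grow or
 * shrink the pool so that cur_threads lands inside the new bounds.
 */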
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until it reaches the
     *    min_threads threshold.
     *  - Wake up worker threads so that they exit, until we are under the
     *    max_threads threshold.
     *  - Do nothing. The current number of threads falls between the min and
     *    max thresholds. We'll let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}

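/*
 * Initialize a zeroed pool bound to ctx, defaulting to the main loop's
 * AioContext when ctx is NULL.
 */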
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

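/*
 * Destroy a pool created with thread_pool_new().  All requests must have
 * completed already; waits for the remaining worker threads to exit.
 */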
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}