qemu/tests/iothread.c
<<
>>
Prefs
   1/*
   2 * Event loop thread implementation for unit tests
   3 *
   4 * Copyright Red Hat Inc., 2013, 2016
   5 *
   6 * Authors:
   7 *  Stefan Hajnoczi   <stefanha@redhat.com>
   8 *  Paolo Bonzini     <pbonzini@redhat.com>
   9 *
  10 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  11 * See the COPYING file in the top-level directory.
  12 *
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi/error.h"
  17#include "block/aio.h"
  18#include "qemu/main-loop.h"
  19#include "qemu/rcu.h"
  20#include "iothread.h"
  21
  22struct IOThread {
  23    AioContext *ctx;
  24    GMainContext *worker_context;
  25    GMainLoop *main_loop;
  26
  27    QemuThread thread;
  28    QemuMutex init_done_lock;
  29    QemuCond init_done_cond;    /* is thread initialization done? */
  30    bool stopping;
  31};
  32
  33static __thread IOThread *my_iothread;
  34
  35AioContext *qemu_get_current_aio_context(void)
  36{
  37    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
  38}
  39
  40static void iothread_init_gcontext(IOThread *iothread)
  41{
  42    GSource *source;
  43
  44    iothread->worker_context = g_main_context_new();
  45    source = aio_get_g_source(iothread_get_aio_context(iothread));
  46    g_source_attach(source, iothread->worker_context);
  47    g_source_unref(source);
  48    iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
  49}
  50
  51static void *iothread_run(void *opaque)
  52{
  53    IOThread *iothread = opaque;
  54
  55    rcu_register_thread();
  56
  57    my_iothread = iothread;
  58    qemu_mutex_lock(&iothread->init_done_lock);
  59    iothread->ctx = aio_context_new(&error_abort);
  60
  61    /*
  62     * We must connect the ctx to a GMainContext, because in older versions
  63     * of glib the g_source_ref()/unref() functions are not threadsafe
  64     * on sources without a context.
  65     */
  66    iothread_init_gcontext(iothread);
  67
  68    /*
  69     * g_main_context_push_thread_default() must be called before anything
  70     * in this new thread uses glib.
  71     */
  72    g_main_context_push_thread_default(iothread->worker_context);
  73
  74    qemu_cond_signal(&iothread->init_done_cond);
  75    qemu_mutex_unlock(&iothread->init_done_lock);
  76
  77    while (!atomic_read(&iothread->stopping)) {
  78        aio_poll(iothread->ctx, true);
  79    }
  80
  81    g_main_context_pop_thread_default(iothread->worker_context);
  82    rcu_unregister_thread();
  83    return NULL;
  84}
  85
  86static void iothread_stop_bh(void *opaque)
  87{
  88    IOThread *iothread = opaque;
  89
  90    iothread->stopping = true;
  91}
  92
  93void iothread_join(IOThread *iothread)
  94{
  95    aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
  96    qemu_thread_join(&iothread->thread);
  97    g_main_context_unref(iothread->worker_context);
  98    g_main_loop_unref(iothread->main_loop);
  99    qemu_cond_destroy(&iothread->init_done_cond);
 100    qemu_mutex_destroy(&iothread->init_done_lock);
 101    aio_context_unref(iothread->ctx);
 102    g_free(iothread);
 103}
 104
 105IOThread *iothread_new(void)
 106{
 107    IOThread *iothread = g_new0(IOThread, 1);
 108
 109    qemu_mutex_init(&iothread->init_done_lock);
 110    qemu_cond_init(&iothread->init_done_cond);
 111    qemu_thread_create(&iothread->thread, NULL, iothread_run,
 112                       iothread, QEMU_THREAD_JOINABLE);
 113
 114    /* Wait for initialization to complete */
 115    qemu_mutex_lock(&iothread->init_done_lock);
 116    while (iothread->ctx == NULL) {
 117        qemu_cond_wait(&iothread->init_done_cond,
 118                       &iothread->init_done_lock);
 119    }
 120    qemu_mutex_unlock(&iothread->init_done_lock);
 121    return iothread;
 122}
 123
 124AioContext *iothread_get_aio_context(IOThread *iothread)
 125{
 126    return iothread->ctx;
 127}
 128