qemu/iothread.c
<<
>>
Prefs
   1/*
   2 * Event loop thread
   3 *
   4 * Copyright Red Hat Inc., 2013
   5 *
   6 * Authors:
   7 *  Stefan Hajnoczi   <stefanha@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 *
  12 */
  13
  14#include "qemu/osdep.h"
  15#include "qom/object.h"
  16#include "qom/object_interfaces.h"
  17#include "qemu/module.h"
  18#include "block/aio.h"
  19#include "sysemu/iothread.h"
  20#include "qmp-commands.h"
  21#include "qemu/error-report.h"
  22#include "qemu/rcu.h"
  23
  24typedef ObjectClass IOThreadClass;
  25
  26#define IOTHREAD_GET_CLASS(obj) \
  27   OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
  28#define IOTHREAD_CLASS(klass) \
  29   OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
  30
  31static void *iothread_run(void *opaque)
  32{
  33    IOThread *iothread = opaque;
  34    bool blocking;
  35
  36    rcu_register_thread();
  37
  38    qemu_mutex_lock(&iothread->init_done_lock);
  39    iothread->thread_id = qemu_get_thread_id();
  40    qemu_cond_signal(&iothread->init_done_cond);
  41    qemu_mutex_unlock(&iothread->init_done_lock);
  42
  43    while (!iothread->stopping) {
  44        aio_context_acquire(iothread->ctx);
  45        blocking = true;
  46        while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
  47            /* Progress was made, keep going */
  48            blocking = false;
  49        }
  50        aio_context_release(iothread->ctx);
  51    }
  52
  53    rcu_unregister_thread();
  54    return NULL;
  55}
  56
  57static int iothread_stop(Object *object, void *opaque)
  58{
  59    IOThread *iothread;
  60
  61    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
  62    if (!iothread || !iothread->ctx) {
  63        return 0;
  64    }
  65    iothread->stopping = true;
  66    aio_notify(iothread->ctx);
  67    qemu_thread_join(&iothread->thread);
  68    return 0;
  69}
  70
  71static void iothread_instance_finalize(Object *obj)
  72{
  73    IOThread *iothread = IOTHREAD(obj);
  74
  75    iothread_stop(obj, NULL);
  76    qemu_cond_destroy(&iothread->init_done_cond);
  77    qemu_mutex_destroy(&iothread->init_done_lock);
  78    aio_context_unref(iothread->ctx);
  79}
  80
  81static void iothread_complete(UserCreatable *obj, Error **errp)
  82{
  83    Error *local_error = NULL;
  84    IOThread *iothread = IOTHREAD(obj);
  85    char *name, *thread_name;
  86
  87    iothread->stopping = false;
  88    iothread->thread_id = -1;
  89    iothread->ctx = aio_context_new(&local_error);
  90    if (!iothread->ctx) {
  91        error_propagate(errp, local_error);
  92        return;
  93    }
  94
  95    qemu_mutex_init(&iothread->init_done_lock);
  96    qemu_cond_init(&iothread->init_done_cond);
  97
  98    /* This assumes we are called from a thread with useful CPU affinity for us
  99     * to inherit.
 100     */
 101    name = object_get_canonical_path_component(OBJECT(obj));
 102    thread_name = g_strdup_printf("IO %s", name);
 103    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
 104                       iothread, QEMU_THREAD_JOINABLE);
 105    g_free(thread_name);
 106    g_free(name);
 107
 108    /* Wait for initialization to complete */
 109    qemu_mutex_lock(&iothread->init_done_lock);
 110    while (iothread->thread_id == -1) {
 111        qemu_cond_wait(&iothread->init_done_cond,
 112                       &iothread->init_done_lock);
 113    }
 114    qemu_mutex_unlock(&iothread->init_done_lock);
 115}
 116
 117static void iothread_class_init(ObjectClass *klass, void *class_data)
 118{
 119    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
 120    ucc->complete = iothread_complete;
 121}
 122
 123static const TypeInfo iothread_info = {
 124    .name = TYPE_IOTHREAD,
 125    .parent = TYPE_OBJECT,
 126    .class_init = iothread_class_init,
 127    .instance_size = sizeof(IOThread),
 128    .instance_finalize = iothread_instance_finalize,
 129    .interfaces = (InterfaceInfo[]) {
 130        {TYPE_USER_CREATABLE},
 131        {}
 132    },
 133};
 134
 135static void iothread_register_types(void)
 136{
 137    type_register_static(&iothread_info);
 138}
 139
 140type_init(iothread_register_types)
 141
 142char *iothread_get_id(IOThread *iothread)
 143{
 144    return object_get_canonical_path_component(OBJECT(iothread));
 145}
 146
 147AioContext *iothread_get_aio_context(IOThread *iothread)
 148{
 149    return iothread->ctx;
 150}
 151
 152static int query_one_iothread(Object *object, void *opaque)
 153{
 154    IOThreadInfoList ***prev = opaque;
 155    IOThreadInfoList *elem;
 156    IOThreadInfo *info;
 157    IOThread *iothread;
 158
 159    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
 160    if (!iothread) {
 161        return 0;
 162    }
 163
 164    info = g_new0(IOThreadInfo, 1);
 165    info->id = iothread_get_id(iothread);
 166    info->thread_id = iothread->thread_id;
 167
 168    elem = g_new0(IOThreadInfoList, 1);
 169    elem->value = info;
 170    elem->next = NULL;
 171
 172    **prev = elem;
 173    *prev = &elem->next;
 174    return 0;
 175}
 176
 177IOThreadInfoList *qmp_query_iothreads(Error **errp)
 178{
 179    IOThreadInfoList *head = NULL;
 180    IOThreadInfoList **prev = &head;
 181    Object *container = object_get_objects_root();
 182
 183    object_child_foreach(container, query_one_iothread, &prev);
 184    return head;
 185}
 186
 187void iothread_stop_all(void)
 188{
 189    Object *container = object_get_objects_root();
 190
 191    object_child_foreach(container, iothread_stop, NULL);
 192}
 193