qemu/iothread.c
<<
>>
Prefs
   1/*
   2 * Event loop thread
   3 *
   4 * Copyright Red Hat Inc., 2013
   5 *
   6 * Authors:
   7 *  Stefan Hajnoczi   <stefanha@redhat.com>
   8 *
   9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10 * See the COPYING file in the top-level directory.
  11 *
  12 */
  13
  14#include "qemu/osdep.h"
  15#include "qom/object.h"
  16#include "qom/object_interfaces.h"
  17#include "qemu/module.h"
  18#include "block/aio.h"
  19#include "block/block.h"
  20#include "sysemu/iothread.h"
  21#include "qmp-commands.h"
  22#include "qemu/error-report.h"
  23#include "qemu/rcu.h"
  24#include "qemu/main-loop.h"
  25
  26typedef ObjectClass IOThreadClass;
  27
  28#define IOTHREAD_GET_CLASS(obj) \
  29   OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
  30#define IOTHREAD_CLASS(klass) \
  31   OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
  32
  33/* Benchmark results from 2016 on NVMe SSD drives show max polling times around
  34 * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
  35 * workloads.
  36 */
  37#define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
  38
  39static __thread IOThread *my_iothread;
  40
  41AioContext *qemu_get_current_aio_context(void)
  42{
  43    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
  44}
  45
  46static void *iothread_run(void *opaque)
  47{
  48    IOThread *iothread = opaque;
  49
  50    rcu_register_thread();
  51
  52    my_iothread = iothread;
  53    qemu_mutex_lock(&iothread->init_done_lock);
  54    iothread->thread_id = qemu_get_thread_id();
  55    qemu_cond_signal(&iothread->init_done_cond);
  56    qemu_mutex_unlock(&iothread->init_done_lock);
  57
  58    while (!atomic_read(&iothread->stopping)) {
  59        aio_poll(iothread->ctx, true);
  60
  61        if (atomic_read(&iothread->worker_context)) {
  62            GMainLoop *loop;
  63
  64            g_main_context_push_thread_default(iothread->worker_context);
  65            iothread->main_loop =
  66                g_main_loop_new(iothread->worker_context, TRUE);
  67            loop = iothread->main_loop;
  68
  69            g_main_loop_run(iothread->main_loop);
  70            iothread->main_loop = NULL;
  71            g_main_loop_unref(loop);
  72
  73            g_main_context_pop_thread_default(iothread->worker_context);
  74        }
  75    }
  76
  77    rcu_unregister_thread();
  78    return NULL;
  79}
  80
  81void iothread_stop(IOThread *iothread)
  82{
  83    if (!iothread->ctx || iothread->stopping) {
  84        return;
  85    }
  86    iothread->stopping = true;
  87    aio_notify(iothread->ctx);
  88    if (atomic_read(&iothread->main_loop)) {
  89        g_main_loop_quit(iothread->main_loop);
  90    }
  91    qemu_thread_join(&iothread->thread);
  92}
  93
  94static int iothread_stop_iter(Object *object, void *opaque)
  95{
  96    IOThread *iothread;
  97
  98    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
  99    if (!iothread) {
 100        return 0;
 101    }
 102    iothread_stop(iothread);
 103    return 0;
 104}
 105
 106static void iothread_instance_init(Object *obj)
 107{
 108    IOThread *iothread = IOTHREAD(obj);
 109
 110    iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
 111}
 112
 113static void iothread_instance_finalize(Object *obj)
 114{
 115    IOThread *iothread = IOTHREAD(obj);
 116
 117    iothread_stop(iothread);
 118    if (iothread->worker_context) {
 119        g_main_context_unref(iothread->worker_context);
 120        iothread->worker_context = NULL;
 121    }
 122    qemu_cond_destroy(&iothread->init_done_cond);
 123    qemu_mutex_destroy(&iothread->init_done_lock);
 124    if (!iothread->ctx) {
 125        return;
 126    }
 127    aio_context_unref(iothread->ctx);
 128}
 129
 130static void iothread_complete(UserCreatable *obj, Error **errp)
 131{
 132    Error *local_error = NULL;
 133    IOThread *iothread = IOTHREAD(obj);
 134    char *name, *thread_name;
 135
 136    iothread->stopping = false;
 137    iothread->thread_id = -1;
 138    iothread->ctx = aio_context_new(&local_error);
 139    if (!iothread->ctx) {
 140        error_propagate(errp, local_error);
 141        return;
 142    }
 143
 144    aio_context_set_poll_params(iothread->ctx,
 145                                iothread->poll_max_ns,
 146                                iothread->poll_grow,
 147                                iothread->poll_shrink,
 148                                &local_error);
 149    if (local_error) {
 150        error_propagate(errp, local_error);
 151        aio_context_unref(iothread->ctx);
 152        iothread->ctx = NULL;
 153        return;
 154    }
 155
 156    qemu_mutex_init(&iothread->init_done_lock);
 157    qemu_cond_init(&iothread->init_done_cond);
 158    iothread->once = (GOnce) G_ONCE_INIT;
 159
 160    /* This assumes we are called from a thread with useful CPU affinity for us
 161     * to inherit.
 162     */
 163    name = object_get_canonical_path_component(OBJECT(obj));
 164    thread_name = g_strdup_printf("IO %s", name);
 165    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
 166                       iothread, QEMU_THREAD_JOINABLE);
 167    g_free(thread_name);
 168    g_free(name);
 169
 170    /* Wait for initialization to complete */
 171    qemu_mutex_lock(&iothread->init_done_lock);
 172    while (iothread->thread_id == -1) {
 173        qemu_cond_wait(&iothread->init_done_cond,
 174                       &iothread->init_done_lock);
 175    }
 176    qemu_mutex_unlock(&iothread->init_done_lock);
 177}
 178
 179typedef struct {
 180    const char *name;
 181    ptrdiff_t offset; /* field's byte offset in IOThread struct */
 182} PollParamInfo;
 183
 184static PollParamInfo poll_max_ns_info = {
 185    "poll-max-ns", offsetof(IOThread, poll_max_ns),
 186};
 187static PollParamInfo poll_grow_info = {
 188    "poll-grow", offsetof(IOThread, poll_grow),
 189};
 190static PollParamInfo poll_shrink_info = {
 191    "poll-shrink", offsetof(IOThread, poll_shrink),
 192};
 193
 194static void iothread_get_poll_param(Object *obj, Visitor *v,
 195        const char *name, void *opaque, Error **errp)
 196{
 197    IOThread *iothread = IOTHREAD(obj);
 198    PollParamInfo *info = opaque;
 199    int64_t *field = (void *)iothread + info->offset;
 200
 201    visit_type_int64(v, name, field, errp);
 202}
 203
 204static void iothread_set_poll_param(Object *obj, Visitor *v,
 205        const char *name, void *opaque, Error **errp)
 206{
 207    IOThread *iothread = IOTHREAD(obj);
 208    PollParamInfo *info = opaque;
 209    int64_t *field = (void *)iothread + info->offset;
 210    Error *local_err = NULL;
 211    int64_t value;
 212
 213    visit_type_int64(v, name, &value, &local_err);
 214    if (local_err) {
 215        goto out;
 216    }
 217
 218    if (value < 0) {
 219        error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
 220                   info->name, INT64_MAX);
 221        goto out;
 222    }
 223
 224    *field = value;
 225
 226    if (iothread->ctx) {
 227        aio_context_set_poll_params(iothread->ctx,
 228                                    iothread->poll_max_ns,
 229                                    iothread->poll_grow,
 230                                    iothread->poll_shrink,
 231                                    &local_err);
 232    }
 233
 234out:
 235    error_propagate(errp, local_err);
 236}
 237
 238static void iothread_class_init(ObjectClass *klass, void *class_data)
 239{
 240    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
 241    ucc->complete = iothread_complete;
 242
 243    object_class_property_add(klass, "poll-max-ns", "int",
 244                              iothread_get_poll_param,
 245                              iothread_set_poll_param,
 246                              NULL, &poll_max_ns_info, &error_abort);
 247    object_class_property_add(klass, "poll-grow", "int",
 248                              iothread_get_poll_param,
 249                              iothread_set_poll_param,
 250                              NULL, &poll_grow_info, &error_abort);
 251    object_class_property_add(klass, "poll-shrink", "int",
 252                              iothread_get_poll_param,
 253                              iothread_set_poll_param,
 254                              NULL, &poll_shrink_info, &error_abort);
 255}
 256
 257static const TypeInfo iothread_info = {
 258    .name = TYPE_IOTHREAD,
 259    .parent = TYPE_OBJECT,
 260    .class_init = iothread_class_init,
 261    .instance_size = sizeof(IOThread),
 262    .instance_init = iothread_instance_init,
 263    .instance_finalize = iothread_instance_finalize,
 264    .interfaces = (InterfaceInfo[]) {
 265        {TYPE_USER_CREATABLE},
 266        {}
 267    },
 268};
 269
 270static void iothread_register_types(void)
 271{
 272    type_register_static(&iothread_info);
 273}
 274
 275type_init(iothread_register_types)
 276
 277char *iothread_get_id(IOThread *iothread)
 278{
 279    return object_get_canonical_path_component(OBJECT(iothread));
 280}
 281
 282AioContext *iothread_get_aio_context(IOThread *iothread)
 283{
 284    return iothread->ctx;
 285}
 286
 287static int query_one_iothread(Object *object, void *opaque)
 288{
 289    IOThreadInfoList ***prev = opaque;
 290    IOThreadInfoList *elem;
 291    IOThreadInfo *info;
 292    IOThread *iothread;
 293
 294    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
 295    if (!iothread) {
 296        return 0;
 297    }
 298
 299    info = g_new0(IOThreadInfo, 1);
 300    info->id = iothread_get_id(iothread);
 301    info->thread_id = iothread->thread_id;
 302    info->poll_max_ns = iothread->poll_max_ns;
 303    info->poll_grow = iothread->poll_grow;
 304    info->poll_shrink = iothread->poll_shrink;
 305
 306    elem = g_new0(IOThreadInfoList, 1);
 307    elem->value = info;
 308    elem->next = NULL;
 309
 310    **prev = elem;
 311    *prev = &elem->next;
 312    return 0;
 313}
 314
 315IOThreadInfoList *qmp_query_iothreads(Error **errp)
 316{
 317    IOThreadInfoList *head = NULL;
 318    IOThreadInfoList **prev = &head;
 319    Object *container = object_get_objects_root();
 320
 321    object_child_foreach(container, query_one_iothread, &prev);
 322    return head;
 323}
 324
 325void iothread_stop_all(void)
 326{
 327    Object *container = object_get_objects_root();
 328    BlockDriverState *bs;
 329    BdrvNextIterator it;
 330
 331    for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
 332        AioContext *ctx = bdrv_get_aio_context(bs);
 333        if (ctx == qemu_get_aio_context()) {
 334            continue;
 335        }
 336        aio_context_acquire(ctx);
 337        bdrv_set_aio_context(bs, qemu_get_aio_context());
 338        aio_context_release(ctx);
 339    }
 340
 341    object_child_foreach(container, iothread_stop_iter, NULL);
 342}
 343
 344static gpointer iothread_g_main_context_init(gpointer opaque)
 345{
 346    AioContext *ctx;
 347    IOThread *iothread = opaque;
 348    GSource *source;
 349
 350    iothread->worker_context = g_main_context_new();
 351
 352    ctx = iothread_get_aio_context(iothread);
 353    source = aio_get_g_source(ctx);
 354    g_source_attach(source, iothread->worker_context);
 355    g_source_unref(source);
 356
 357    aio_notify(iothread->ctx);
 358    return NULL;
 359}
 360
 361GMainContext *iothread_get_g_main_context(IOThread *iothread)
 362{
 363    g_once(&iothread->once, iothread_g_main_context_init, iothread);
 364
 365    return iothread->worker_context;
 366}
 367
 368IOThread *iothread_create(const char *id, Error **errp)
 369{
 370    Object *obj;
 371
 372    obj = object_new_with_props(TYPE_IOTHREAD,
 373                                object_get_internal_root(),
 374                                id, errp, NULL);
 375
 376    return IOTHREAD(obj);
 377}
 378
 379void iothread_destroy(IOThread *iothread)
 380{
 381    object_unparent(OBJECT(iothread));
 382}
 383