qemu/io/task.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O task
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 *
  19 */
  20
  21#include "qemu/osdep.h"
  22#include "io/task.h"
  23#include "qapi/error.h"
  24#include "qemu/thread.h"
  25#include "trace.h"
  26
  27struct QIOTaskThreadData {
  28    QIOTaskWorker worker;
  29    gpointer opaque;
  30    GDestroyNotify destroy;
  31    GMainContext *context;
  32    GSource *completion;
  33};
  34
  35
  36struct QIOTask {
  37    Object *source;
  38    QIOTaskFunc func;
  39    gpointer opaque;
  40    GDestroyNotify destroy;
  41    Error *err;
  42    gpointer result;
  43    GDestroyNotify destroyResult;
  44    QemuMutex thread_lock;
  45    QemuCond thread_cond;
  46    struct QIOTaskThreadData *thread;
  47};
  48
  49
  50QIOTask *qio_task_new(Object *source,
  51                      QIOTaskFunc func,
  52                      gpointer opaque,
  53                      GDestroyNotify destroy)
  54{
  55    QIOTask *task;
  56
  57    task = g_new0(QIOTask, 1);
  58
  59    task->source = source;
  60    object_ref(source);
  61    task->func = func;
  62    task->opaque = opaque;
  63    task->destroy = destroy;
  64    qemu_mutex_init(&task->thread_lock);
  65    qemu_cond_init(&task->thread_cond);
  66
  67    trace_qio_task_new(task, source, func, opaque);
  68
  69    return task;
  70}
  71
  72static void qio_task_free(QIOTask *task)
  73{
  74    qemu_mutex_lock(&task->thread_lock);
  75    if (task->thread) {
  76        if (task->thread->destroy) {
  77            task->thread->destroy(task->thread->opaque);
  78        }
  79
  80        if (task->thread->context) {
  81            g_main_context_unref(task->thread->context);
  82        }
  83
  84        g_free(task->thread);
  85    }
  86
  87    if (task->destroy) {
  88        task->destroy(task->opaque);
  89    }
  90    if (task->destroyResult) {
  91        task->destroyResult(task->result);
  92    }
  93    if (task->err) {
  94        error_free(task->err);
  95    }
  96    object_unref(task->source);
  97
  98    qemu_mutex_unlock(&task->thread_lock);
  99    qemu_mutex_destroy(&task->thread_lock);
 100    qemu_cond_destroy(&task->thread_cond);
 101
 102    g_free(task);
 103}
 104
 105
 106static gboolean qio_task_thread_result(gpointer opaque)
 107{
 108    QIOTask *task = opaque;
 109
 110    trace_qio_task_thread_result(task);
 111    qio_task_complete(task);
 112
 113    return FALSE;
 114}
 115
 116
 117static gpointer qio_task_thread_worker(gpointer opaque)
 118{
 119    QIOTask *task = opaque;
 120
 121    trace_qio_task_thread_run(task);
 122
 123    task->thread->worker(task, task->thread->opaque);
 124
 125    /* We're running in the background thread, and must only
 126     * ever report the task results in the main event loop
 127     * thread. So we schedule an idle callback to report
 128     * the worker results
 129     */
 130    trace_qio_task_thread_exit(task);
 131
 132    qemu_mutex_lock(&task->thread_lock);
 133
 134    task->thread->completion = g_idle_source_new();
 135    g_source_set_callback(task->thread->completion,
 136                          qio_task_thread_result, task, NULL);
 137    g_source_attach(task->thread->completion,
 138                    task->thread->context);
 139    trace_qio_task_thread_source_attach(task, task->thread->completion);
 140
 141    qemu_cond_signal(&task->thread_cond);
 142    qemu_mutex_unlock(&task->thread_lock);
 143
 144    return NULL;
 145}
 146
 147
 148void qio_task_run_in_thread(QIOTask *task,
 149                            QIOTaskWorker worker,
 150                            gpointer opaque,
 151                            GDestroyNotify destroy,
 152                            GMainContext *context)
 153{
 154    struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
 155    QemuThread thread;
 156
 157    if (context) {
 158        g_main_context_ref(context);
 159    }
 160
 161    data->worker = worker;
 162    data->opaque = opaque;
 163    data->destroy = destroy;
 164    data->context = context;
 165
 166    task->thread = data;
 167
 168    trace_qio_task_thread_start(task, worker, opaque);
 169    qemu_thread_create(&thread,
 170                       "io-task-worker",
 171                       qio_task_thread_worker,
 172                       task,
 173                       QEMU_THREAD_DETACHED);
 174}
 175
 176
 177void qio_task_wait_thread(QIOTask *task)
 178{
 179    qemu_mutex_lock(&task->thread_lock);
 180    g_assert(task->thread != NULL);
 181    while (task->thread->completion == NULL) {
 182        qemu_cond_wait(&task->thread_cond, &task->thread_lock);
 183    }
 184
 185    trace_qio_task_thread_source_cancel(task, task->thread->completion);
 186    g_source_destroy(task->thread->completion);
 187    qemu_mutex_unlock(&task->thread_lock);
 188
 189    qio_task_thread_result(task);
 190}
 191
 192
 193void qio_task_complete(QIOTask *task)
 194{
 195    task->func(task, task->opaque);
 196    trace_qio_task_complete(task);
 197    qio_task_free(task);
 198}
 199
 200
 201void qio_task_set_error(QIOTask *task,
 202                        Error *err)
 203{
 204    error_propagate(&task->err, err);
 205}
 206
 207
 208bool qio_task_propagate_error(QIOTask *task,
 209                              Error **errp)
 210{
 211    if (task->err) {
 212        error_propagate(errp, task->err);
 213        task->err = NULL;
 214        return true;
 215    }
 216
 217    return false;
 218}
 219
 220
 221void qio_task_set_result_pointer(QIOTask *task,
 222                                 gpointer result,
 223                                 GDestroyNotify destroy)
 224{
 225    task->result = result;
 226    task->destroyResult = destroy;
 227}
 228
 229
 230gpointer qio_task_get_result_pointer(QIOTask *task)
 231{
 232    return task->result;
 233}
 234
 235
 236Object *qio_task_get_source(QIOTask *task)
 237{
 238    return task->source;
 239}
 240