qemu/io/task.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O task
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 *
  19 */
  20
  21#include "qemu/osdep.h"
  22#include "io/task.h"
  23#include "qapi/error.h"
  24#include "qemu/thread.h"
  25#include "trace.h"
  26
  27struct QIOTask {
  28    Object *source;
  29    QIOTaskFunc func;
  30    gpointer opaque;
  31    GDestroyNotify destroy;
  32    Error *err;
  33    gpointer result;
  34    GDestroyNotify destroyResult;
  35};
  36
  37
  38QIOTask *qio_task_new(Object *source,
  39                      QIOTaskFunc func,
  40                      gpointer opaque,
  41                      GDestroyNotify destroy)
  42{
  43    QIOTask *task;
  44
  45    task = g_new0(QIOTask, 1);
  46
  47    task->source = source;
  48    object_ref(source);
  49    task->func = func;
  50    task->opaque = opaque;
  51    task->destroy = destroy;
  52
  53    trace_qio_task_new(task, source, func, opaque);
  54
  55    return task;
  56}
  57
  58static void qio_task_free(QIOTask *task)
  59{
  60    if (task->destroy) {
  61        task->destroy(task->opaque);
  62    }
  63    if (task->destroyResult) {
  64        task->destroyResult(task->result);
  65    }
  66    if (task->err) {
  67        error_free(task->err);
  68    }
  69    object_unref(task->source);
  70
  71    g_free(task);
  72}
  73
  74
  75struct QIOTaskThreadData {
  76    QIOTask *task;
  77    QIOTaskWorker worker;
  78    gpointer opaque;
  79    GDestroyNotify destroy;
  80    GMainContext *context;
  81};
  82
  83
  84static gboolean qio_task_thread_result(gpointer opaque)
  85{
  86    struct QIOTaskThreadData *data = opaque;
  87
  88    trace_qio_task_thread_result(data->task);
  89    qio_task_complete(data->task);
  90
  91    if (data->destroy) {
  92        data->destroy(data->opaque);
  93    }
  94
  95    if (data->context) {
  96        g_main_context_unref(data->context);
  97    }
  98
  99    g_free(data);
 100
 101    return FALSE;
 102}
 103
 104
 105static gpointer qio_task_thread_worker(gpointer opaque)
 106{
 107    struct QIOTaskThreadData *data = opaque;
 108    GSource *idle;
 109
 110    trace_qio_task_thread_run(data->task);
 111    data->worker(data->task, data->opaque);
 112
 113    /* We're running in the background thread, and must only
 114     * ever report the task results in the main event loop
 115     * thread. So we schedule an idle callback to report
 116     * the worker results
 117     */
 118    trace_qio_task_thread_exit(data->task);
 119
 120    idle = g_idle_source_new();
 121    g_source_set_callback(idle, qio_task_thread_result, data, NULL);
 122    g_source_attach(idle, data->context);
 123
 124    return NULL;
 125}
 126
 127
 128void qio_task_run_in_thread(QIOTask *task,
 129                            QIOTaskWorker worker,
 130                            gpointer opaque,
 131                            GDestroyNotify destroy,
 132                            GMainContext *context)
 133{
 134    struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
 135    QemuThread thread;
 136
 137    if (context) {
 138        g_main_context_ref(context);
 139    }
 140
 141    data->task = task;
 142    data->worker = worker;
 143    data->opaque = opaque;
 144    data->destroy = destroy;
 145    data->context = context;
 146
 147    trace_qio_task_thread_start(task, worker, opaque);
 148    qemu_thread_create(&thread,
 149                       "io-task-worker",
 150                       qio_task_thread_worker,
 151                       data,
 152                       QEMU_THREAD_DETACHED);
 153}
 154
 155
 156void qio_task_complete(QIOTask *task)
 157{
 158    task->func(task, task->opaque);
 159    trace_qio_task_complete(task);
 160    qio_task_free(task);
 161}
 162
 163
 164void qio_task_set_error(QIOTask *task,
 165                        Error *err)
 166{
 167    error_propagate(&task->err, err);
 168}
 169
 170
 171bool qio_task_propagate_error(QIOTask *task,
 172                              Error **errp)
 173{
 174    if (task->err) {
 175        error_propagate(errp, task->err);
 176        task->err = NULL;
 177        return true;
 178    }
 179
 180    return false;
 181}
 182
 183
 184void qio_task_set_result_pointer(QIOTask *task,
 185                                 gpointer result,
 186                                 GDestroyNotify destroy)
 187{
 188    task->result = result;
 189    task->destroyResult = destroy;
 190}
 191
 192
 193gpointer qio_task_get_result_pointer(QIOTask *task)
 194{
 195    return task->result;
 196}
 197
 198
 199Object *qio_task_get_source(QIOTask *task)
 200{
 201    return task->source;
 202}
 203