qemu/tests/test-thread-pool.c
<<
>>
Prefs
   1#include <glib.h>
   2#include "qemu-common.h"
   3#include "block/aio.h"
   4#include "block/thread-pool.h"
   5#include "block/block.h"
   6#include "qemu/timer.h"
   7
/* AioContext polled by every test; created in main(). */
static AioContext *ctx;
/* Thread pool under test; obtained from ctx in main(). */
static ThreadPool *pool;
/*
 * Count of outstanding requests.  Each test sets it after submitting;
 * done_cb, co_test_cb and test_cancel decrement it.
 */
static int active;
  11
/* Per-request bookkeeping shared between a test and its callbacks. */
typedef struct {
    /* Handle returned by thread_pool_submit_aio(); done_cb resets it to NULL */
    BlockDriverAIOCB *aiocb;
    /* Progress counter incremented atomically by worker_cb and long_cb */
    int n;
    /* Tests seed this with -EINPROGRESS; done_cb stores the result here */
    int ret;
} WorkerTestData;
  17
  18static int worker_cb(void *opaque)
  19{
  20    WorkerTestData *data = opaque;
  21    return atomic_fetch_inc(&data->n);
  22}
  23
  24static int long_cb(void *opaque)
  25{
  26    WorkerTestData *data = opaque;
  27    atomic_inc(&data->n);
  28    g_usleep(2000000);
  29    atomic_inc(&data->n);
  30    return 0;
  31}
  32
  33static void done_cb(void *opaque, int ret)
  34{
  35    WorkerTestData *data = opaque;
  36    g_assert_cmpint(data->ret, ==, -EINPROGRESS);
  37    data->ret = ret;
  38    data->aiocb = NULL;
  39
  40    /* Callbacks are serialized, so no need to use atomic ops.  */
  41    active--;
  42}
  43
  44static void test_submit(void)
  45{
  46    WorkerTestData data = { .n = 0 };
  47    thread_pool_submit(pool, worker_cb, &data);
  48    while (data.n == 0) {
  49        aio_poll(ctx, true);
  50    }
  51    g_assert_cmpint(data.n, ==, 1);
  52}
  53
  54static void test_submit_aio(void)
  55{
  56    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
  57    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
  58                                        done_cb, &data);
  59
  60    /* The callbacks are not called until after the first wait.  */
  61    active = 1;
  62    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
  63    while (data.ret == -EINPROGRESS) {
  64        aio_poll(ctx, true);
  65    }
  66    g_assert_cmpint(active, ==, 0);
  67    g_assert_cmpint(data.n, ==, 1);
  68    g_assert_cmpint(data.ret, ==, 0);
  69}
  70
  71static void co_test_cb(void *opaque)
  72{
  73    WorkerTestData *data = opaque;
  74
  75    active = 1;
  76    data->n = 0;
  77    data->ret = -EINPROGRESS;
  78    thread_pool_submit_co(pool, worker_cb, data);
  79
  80    /* The test continues in test_submit_co, after qemu_coroutine_enter... */
  81
  82    g_assert_cmpint(data->n, ==, 1);
  83    data->ret = 0;
  84    active--;
  85
  86    /* The test continues in test_submit_co, after qemu_aio_wait_all... */
  87}
  88
  89static void test_submit_co(void)
  90{
  91    WorkerTestData data;
  92    Coroutine *co = qemu_coroutine_create(co_test_cb);
  93
  94    qemu_coroutine_enter(co, &data);
  95
  96    /* Back here once the worker has started.  */
  97
  98    g_assert_cmpint(active, ==, 1);
  99    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
 100
 101    /* qemu_aio_wait_all will execute the rest of the coroutine.  */
 102
 103    while (data.ret == -EINPROGRESS) {
 104        aio_poll(ctx, true);
 105    }
 106
 107    /* Back here after the coroutine has finished.  */
 108
 109    g_assert_cmpint(active, ==, 0);
 110    g_assert_cmpint(data.ret, ==, 0);
 111}
 112
 113static void test_submit_many(void)
 114{
 115    WorkerTestData data[100];
 116    int i;
 117
 118    /* Start more work items than there will be threads.  */
 119    for (i = 0; i < 100; i++) {
 120        data[i].n = 0;
 121        data[i].ret = -EINPROGRESS;
 122        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
 123    }
 124
 125    active = 100;
 126    while (active > 0) {
 127        aio_poll(ctx, true);
 128    }
 129    for (i = 0; i < 100; i++) {
 130        g_assert_cmpint(data[i].n, ==, 1);
 131        g_assert_cmpint(data[i].ret, ==, 0);
 132    }
 133}
 134
 135static void test_cancel(void)
 136{
 137    WorkerTestData data[100];
 138    int num_canceled;
 139    int i;
 140
 141    /* Start more work items than there will be threads, to ensure
 142     * the pool is full.
 143     */
 144    test_submit_many();
 145
 146    /* Start long running jobs, to ensure we can cancel some.  */
 147    for (i = 0; i < 100; i++) {
 148        data[i].n = 0;
 149        data[i].ret = -EINPROGRESS;
 150        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
 151                                               done_cb, &data[i]);
 152    }
 153
 154    /* Starting the threads may be left to a bottom half.  Let it
 155     * run, but do not waste too much time...
 156     */
 157    active = 100;
 158    aio_notify(ctx);
 159    aio_poll(ctx, false);
 160
 161    /* Wait some time for the threads to start, with some sanity
 162     * testing on the behavior of the scheduler...
 163     */
 164    g_assert_cmpint(active, ==, 100);
 165    g_usleep(1000000);
 166    g_assert_cmpint(active, >, 50);
 167
 168    /* Cancel the jobs that haven't been started yet.  */
 169    num_canceled = 0;
 170    for (i = 0; i < 100; i++) {
 171        if (atomic_cmpxchg(&data[i].n, 0, 3) == 0) {
 172            data[i].ret = -ECANCELED;
 173            bdrv_aio_cancel(data[i].aiocb);
 174            active--;
 175            num_canceled++;
 176        }
 177    }
 178    g_assert_cmpint(active, >, 0);
 179    g_assert_cmpint(num_canceled, <, 100);
 180
 181    /* Canceling the others will be a blocking operation.  */
 182    for (i = 0; i < 100; i++) {
 183        if (data[i].n != 3) {
 184            bdrv_aio_cancel(data[i].aiocb);
 185        }
 186    }
 187
 188    /* Finish execution and execute any remaining callbacks.  */
 189    while (active > 0) {
 190        aio_poll(ctx, true);
 191    }
 192    g_assert_cmpint(active, ==, 0);
 193    for (i = 0; i < 100; i++) {
 194        if (data[i].n == 3) {
 195            g_assert_cmpint(data[i].ret, ==, -ECANCELED);
 196            g_assert(data[i].aiocb != NULL);
 197        } else {
 198            g_assert_cmpint(data[i].n, ==, 2);
 199            g_assert_cmpint(data[i].ret, ==, 0);
 200            g_assert(data[i].aiocb == NULL);
 201        }
 202    }
 203}
 204
 205int main(int argc, char **argv)
 206{
 207    int ret;
 208
 209    init_clocks();
 210
 211    ctx = aio_context_new();
 212    pool = aio_get_thread_pool(ctx);
 213
 214    g_test_init(&argc, &argv, NULL);
 215    g_test_add_func("/thread-pool/submit", test_submit);
 216    g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
 217    g_test_add_func("/thread-pool/submit-co", test_submit_co);
 218    g_test_add_func("/thread-pool/submit-many", test_submit_many);
 219    g_test_add_func("/thread-pool/cancel", test_cancel);
 220
 221    ret = g_test_run();
 222
 223    aio_context_unref(ctx);
 224    return ret;
 225}
 226