1
2
3
4
5
6
7
8
9
10
11
12
13#include "qemu/osdep.h"
14#include "block/aio.h"
15#include "qemu/coroutine.h"
16#include "qemu/thread.h"
17#include "qemu/error-report.h"
18#include "iothread.h"
19
20
21
22#define NUM_CONTEXTS 5
23
24static IOThread *threads[NUM_CONTEXTS];
25static AioContext *ctx[NUM_CONTEXTS];
26static __thread int id = -1;
27
28static QemuEvent done_event;
29
30
31
32typedef struct CtxRunData {
33 QEMUBHFunc *cb;
34 void *arg;
35} CtxRunData;
36
37static void ctx_run_bh_cb(void *opaque)
38{
39 CtxRunData *data = opaque;
40
41 data->cb(data->arg);
42 qemu_event_set(&done_event);
43}
44
45static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
46{
47 CtxRunData data = {
48 .cb = cb,
49 .arg = opaque
50 };
51
52 qemu_event_reset(&done_event);
53 aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
54 qemu_event_wait(&done_event);
55}
56
57
58
59static void set_id_cb(void *opaque)
60{
61 int *i = opaque;
62
63 id = *i;
64}
65
66static void create_aio_contexts(void)
67{
68 int i;
69
70 for (i = 0; i < NUM_CONTEXTS; i++) {
71 threads[i] = iothread_new();
72 ctx[i] = iothread_get_aio_context(threads[i]);
73 }
74
75 qemu_event_init(&done_event, false);
76 for (i = 0; i < NUM_CONTEXTS; i++) {
77 ctx_run(i, set_id_cb, &i);
78 }
79}
80
81
82
83static void join_aio_contexts(void)
84{
85 int i;
86
87 for (i = 0; i < NUM_CONTEXTS; i++) {
88 aio_context_ref(ctx[i]);
89 }
90 for (i = 0; i < NUM_CONTEXTS; i++) {
91 iothread_join(threads[i]);
92 }
93 for (i = 0; i < NUM_CONTEXTS; i++) {
94 aio_context_unref(ctx[i]);
95 }
96 qemu_event_destroy(&done_event);
97}
98
99
100
/* Create all the contexts and immediately join them again, with no work in between. */
static void test_lifecycle(void)
{
    create_aio_contexts();
    join_aio_contexts();
}
106
107
108
109static Coroutine *to_schedule[NUM_CONTEXTS];
110static bool stop[NUM_CONTEXTS];
111
112static int count_retry;
113static int count_here;
114static int count_other;
115
116static bool schedule_next(int n)
117{
118 Coroutine *co;
119
120 co = qatomic_xchg(&to_schedule[n], NULL);
121 if (!co) {
122 qatomic_inc(&count_retry);
123 return false;
124 }
125
126 if (n == id) {
127 qatomic_inc(&count_here);
128 } else {
129 qatomic_inc(&count_other);
130 }
131
132 aio_co_schedule(ctx[n], co);
133 return true;
134}
135
136static void finish_cb(void *opaque)
137{
138 stop[id] = true;
139 schedule_next(id);
140}
141
142static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
143{
144 g_assert(to_schedule[id] == NULL);
145
146
147
148
149
150
151 while (!stop[id]) {
152 int n;
153
154 n = g_test_rand_int_range(0, NUM_CONTEXTS);
155 schedule_next(n);
156
157 qatomic_set_mb(&to_schedule[id], qemu_coroutine_self());
158
159 qemu_coroutine_yield();
160 g_assert(to_schedule[id] == NULL);
161 }
162}
163
164
165static void test_multi_co_schedule(int seconds)
166{
167 int i;
168
169 count_here = count_other = count_retry = 0;
170
171 create_aio_contexts();
172 for (i = 0; i < NUM_CONTEXTS; i++) {
173 Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
174 aio_co_schedule(ctx[i], co1);
175 }
176
177 g_usleep(seconds * 1000000);
178
179
180 for (i = 0; i < NUM_CONTEXTS; i++) {
181 ctx_run(i, finish_cb, NULL);
182 g_assert(to_schedule[i] == NULL);
183 }
184
185 join_aio_contexts();
186 g_test_message("scheduled %d, queued %d, retry %d, total %d",
187 count_other, count_here, count_retry,
188 count_here + count_other + count_retry);
189}
190
/* Scheduling stress test, 1 second (the variant registered in quick mode). */
static void test_multi_co_schedule_1(void)
{
    test_multi_co_schedule(1);
}
195
/* Scheduling stress test, 10 seconds (the variant registered outside quick mode). */
static void test_multi_co_schedule_10(void)
{
    test_multi_co_schedule(10);
}
200
201
202
203static uint32_t atomic_counter;
204static uint32_t running;
205static uint32_t counter;
206static CoMutex comutex;
207static bool now_stopping;
208
209static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
210{
211 while (!qatomic_read(&now_stopping)) {
212 qemu_co_mutex_lock(&comutex);
213 counter++;
214 qemu_co_mutex_unlock(&comutex);
215
216
217
218
219
220
221 qatomic_inc(&atomic_counter);
222 }
223 qatomic_dec(&running);
224}
225
226static void test_multi_co_mutex(int threads, int seconds)
227{
228 int i;
229
230 qemu_co_mutex_init(&comutex);
231 counter = 0;
232 atomic_counter = 0;
233 now_stopping = false;
234
235 create_aio_contexts();
236 assert(threads <= NUM_CONTEXTS);
237 running = threads;
238 for (i = 0; i < threads; i++) {
239 Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
240 aio_co_schedule(ctx[i], co1);
241 }
242
243 g_usleep(seconds * 1000000);
244
245 qatomic_set(&now_stopping, true);
246 while (running > 0) {
247 g_usleep(100000);
248 }
249
250 join_aio_contexts();
251 g_test_message("%d iterations/second", counter / seconds);
252 g_assert_cmpint(counter, ==, atomic_counter);
253}
254
255
256
257
258
259static void test_multi_co_mutex_1(void)
260{
261 test_multi_co_mutex(NUM_CONTEXTS, 1);
262}
263
264static void test_multi_co_mutex_10(void)
265{
266 test_multi_co_mutex(NUM_CONTEXTS, 10);
267}
268
269
270
271
272
273
/*
 * CoMutex test with only two workers for 3 seconds; registered as the
 * "handoff" case in quick mode.
 */
static void test_multi_co_mutex_2_3(void)
{
    test_multi_co_mutex(2, 3);
}
278
/*
 * CoMutex test with only two workers for 30 seconds; registered as the
 * "handoff" case outside quick mode.
 */
static void test_multi_co_mutex_2_30(void)
{
    test_multi_co_mutex(2, 30);
}
283
284
285
286#ifdef CONFIG_LINUX
287#include "qemu/futex.h"
288
289
290
291
/*
 * Queue nodes for the futex-based queue lock below, one per context and
 * indexed by the thread-local 'id'.
 *
 * The padding brings each entry to 16 ints and the array is 64-byte aligned.
 * NOTE(review): presumably so that each node sits in its own cache line
 * (assumes 4-byte int) -- confirm.
 */
static struct {
    int next;           /* index of the node queued behind us, or -1 */
    int locked;         /* 1 while this node's owner must wait, 0 once released */
    int padding[14];
} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));

/* Index of the most recently queued node, or -1 when the lock is free. */
static int mutex_head = -1;
298
299static void mcs_mutex_lock(void)
300{
301 int prev;
302
303 nodes[id].next = -1;
304 nodes[id].locked = 1;
305 prev = qatomic_xchg(&mutex_head, id);
306 if (prev != -1) {
307 qatomic_set(&nodes[prev].next, id);
308 while (qatomic_read(&nodes[id].locked) == 1) {
309 qemu_futex_wait(&nodes[id].locked, 1);
310 }
311 }
312}
313
314static void mcs_mutex_unlock(void)
315{
316 int next;
317 if (qatomic_read(&nodes[id].next) == -1) {
318 if (qatomic_read(&mutex_head) == id &&
319 qatomic_cmpxchg(&mutex_head, id, -1) == id) {
320
321 return;
322 }
323 while (qatomic_read(&nodes[id].next) == -1) {
324
325
326
327 }
328 }
329
330
331 next = qatomic_read(&nodes[id].next);
332 nodes[next].locked = 0;
333 qemu_futex_wake_single(&nodes[next].locked);
334}
335
336static void test_multi_fair_mutex_entry(void *opaque)
337{
338 while (!qatomic_read(&now_stopping)) {
339 mcs_mutex_lock();
340 counter++;
341 mcs_mutex_unlock();
342 qatomic_inc(&atomic_counter);
343 }
344 qatomic_dec(&running);
345}
346
347static void test_multi_fair_mutex(int threads, int seconds)
348{
349 int i;
350
351 assert(mutex_head == -1);
352 counter = 0;
353 atomic_counter = 0;
354 now_stopping = false;
355
356 create_aio_contexts();
357 assert(threads <= NUM_CONTEXTS);
358 running = threads;
359 for (i = 0; i < threads; i++) {
360 Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
361 aio_co_schedule(ctx[i], co1);
362 }
363
364 g_usleep(seconds * 1000000);
365
366 qatomic_set(&now_stopping, true);
367 while (running > 0) {
368 g_usleep(100000);
369 }
370
371 join_aio_contexts();
372 g_test_message("%d iterations/second", counter / seconds);
373 g_assert_cmpint(counter, ==, atomic_counter);
374}
375
376static void test_multi_fair_mutex_1(void)
377{
378 test_multi_fair_mutex(NUM_CONTEXTS, 1);
379}
380
381static void test_multi_fair_mutex_10(void)
382{
383 test_multi_fair_mutex(NUM_CONTEXTS, 10);
384}
385#endif
386
387
388
389
390static QemuMutex mutex;
391
392static void test_multi_mutex_entry(void *opaque)
393{
394 while (!qatomic_read(&now_stopping)) {
395 qemu_mutex_lock(&mutex);
396 counter++;
397 qemu_mutex_unlock(&mutex);
398 qatomic_inc(&atomic_counter);
399 }
400 qatomic_dec(&running);
401}
402
403static void test_multi_mutex(int threads, int seconds)
404{
405 int i;
406
407 qemu_mutex_init(&mutex);
408 counter = 0;
409 atomic_counter = 0;
410 now_stopping = false;
411
412 create_aio_contexts();
413 assert(threads <= NUM_CONTEXTS);
414 running = threads;
415 for (i = 0; i < threads; i++) {
416 Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
417 aio_co_schedule(ctx[i], co1);
418 }
419
420 g_usleep(seconds * 1000000);
421
422 qatomic_set(&now_stopping, true);
423 while (running > 0) {
424 g_usleep(100000);
425 }
426
427 join_aio_contexts();
428 g_test_message("%d iterations/second", counter / seconds);
429 g_assert_cmpint(counter, ==, atomic_counter);
430}
431
432static void test_multi_mutex_1(void)
433{
434 test_multi_mutex(NUM_CONTEXTS, 1);
435}
436
437static void test_multi_mutex_10(void)
438{
439 test_multi_mutex(NUM_CONTEXTS, 10);
440}
441
442
443
444int main(int argc, char **argv)
445{
446 init_clocks(NULL);
447
448 g_test_init(&argc, &argv, NULL);
449 g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
450 if (g_test_quick()) {
451 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
452 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
453 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
454#ifdef CONFIG_LINUX
455 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
456#endif
457 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
458 } else {
459 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
460 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
461 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
462#ifdef CONFIG_LINUX
463 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
464#endif
465 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
466 }
467 return g_test_run();
468}
469