1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#include "qemu/osdep.h"
30#include "qemu/coroutine.h"
31#include "qemu/coroutine_int.h"
32#include "qemu/processor.h"
33#include "qemu/queue.h"
34#include "block/aio.h"
35#include "trace.h"
36
37void qemu_co_queue_init(CoQueue *queue)
38{
39 QSIMPLEQ_INIT(&queue->entries);
40}
41
42void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,
43 CoQueueWaitFlags flags)
44{
45 Coroutine *self = qemu_coroutine_self();
46 if (flags & CO_QUEUE_WAIT_FRONT) {
47 QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
48 } else {
49 QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
50 }
51
52 if (lock) {
53 qemu_lockable_unlock(lock);
54 }
55
56
57
58
59
60
61 qemu_coroutine_yield();
62 assert(qemu_in_coroutine());
63
64
65
66
67
68
69
70 if (lock) {
71 qemu_lockable_lock(lock);
72 }
73}
74
75bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
76{
77 Coroutine *next;
78
79 next = QSIMPLEQ_FIRST(&queue->entries);
80 if (!next) {
81 return false;
82 }
83
84 QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
85 if (lock) {
86 qemu_lockable_unlock(lock);
87 }
88 aio_co_wake(next);
89 if (lock) {
90 qemu_lockable_lock(lock);
91 }
92 return true;
93}
94
95bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
96{
97
98 return qemu_co_enter_next_impl(queue, NULL);
99}
100
101void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)
102{
103 while (qemu_co_enter_next_impl(queue, lock)) {
104
105 }
106}
107
108void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
109{
110
111 qemu_co_enter_all_impl(queue, NULL);
112}
113
114bool qemu_co_queue_empty(CoQueue *queue)
115{
116 return QSIMPLEQ_FIRST(&queue->entries) == NULL;
117}
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138typedef struct CoWaitRecord {
139 Coroutine *co;
140 QSLIST_ENTRY(CoWaitRecord) next;
141} CoWaitRecord;
142
143static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
144{
145 w->co = qemu_coroutine_self();
146 QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
147}
148
149static void move_waiters(CoMutex *mutex)
150{
151 QSLIST_HEAD(, CoWaitRecord) reversed;
152 QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
153 while (!QSLIST_EMPTY(&reversed)) {
154 CoWaitRecord *w = QSLIST_FIRST(&reversed);
155 QSLIST_REMOVE_HEAD(&reversed, next);
156 QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
157 }
158}
159
160static CoWaitRecord *pop_waiter(CoMutex *mutex)
161{
162 CoWaitRecord *w;
163
164 if (QSLIST_EMPTY(&mutex->to_pop)) {
165 move_waiters(mutex);
166 if (QSLIST_EMPTY(&mutex->to_pop)) {
167 return NULL;
168 }
169 }
170 w = QSLIST_FIRST(&mutex->to_pop);
171 QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
172 return w;
173}
174
175static bool has_waiters(CoMutex *mutex)
176{
177 return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
178}
179
180void qemu_co_mutex_init(CoMutex *mutex)
181{
182 memset(mutex, 0, sizeof(*mutex));
183}
184
185static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
186{
187
188
189
190 smp_read_barrier_depends();
191 mutex->ctx = co->ctx;
192 aio_co_wake(co);
193}
194
195static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
196 CoMutex *mutex)
197{
198 Coroutine *self = qemu_coroutine_self();
199 CoWaitRecord w;
200 unsigned old_handoff;
201
202 trace_qemu_co_mutex_lock_entry(mutex, self);
203 push_waiter(mutex, &w);
204
205
206
207
208 old_handoff = qatomic_mb_read(&mutex->handoff);
209 if (old_handoff &&
210 has_waiters(mutex) &&
211 qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
212
213
214
215 CoWaitRecord *to_wake = pop_waiter(mutex);
216 Coroutine *co = to_wake->co;
217 if (co == self) {
218
219 assert(to_wake == &w);
220 mutex->ctx = ctx;
221 return;
222 }
223
224 qemu_co_mutex_wake(mutex, co);
225 }
226
227 qemu_coroutine_yield();
228 trace_qemu_co_mutex_lock_return(mutex, self);
229}
230
231void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
232{
233 AioContext *ctx = qemu_get_current_aio_context();
234 Coroutine *self = qemu_coroutine_self();
235 int waiters, i;
236
237
238
239
240
241
242
243
244 i = 0;
245retry_fast_path:
246 waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
247 if (waiters != 0) {
248 while (waiters == 1 && ++i < 1000) {
249 if (qatomic_read(&mutex->ctx) == ctx) {
250 break;
251 }
252 if (qatomic_read(&mutex->locked) == 0) {
253 goto retry_fast_path;
254 }
255 cpu_relax();
256 }
257 waiters = qatomic_fetch_inc(&mutex->locked);
258 }
259
260 if (waiters == 0) {
261
262 trace_qemu_co_mutex_lock_uncontended(mutex, self);
263 mutex->ctx = ctx;
264 } else {
265 qemu_co_mutex_lock_slowpath(ctx, mutex);
266 }
267 mutex->holder = self;
268 self->locks_held++;
269}
270
271void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
272{
273 Coroutine *self = qemu_coroutine_self();
274
275 trace_qemu_co_mutex_unlock_entry(mutex, self);
276
277 assert(mutex->locked);
278 assert(mutex->holder == self);
279 assert(qemu_in_coroutine());
280
281 mutex->ctx = NULL;
282 mutex->holder = NULL;
283 self->locks_held--;
284 if (qatomic_fetch_dec(&mutex->locked) == 1) {
285
286 return;
287 }
288
289 for (;;) {
290 CoWaitRecord *to_wake = pop_waiter(mutex);
291 unsigned our_handoff;
292
293 if (to_wake) {
294 qemu_co_mutex_wake(mutex, to_wake->co);
295 break;
296 }
297
298
299
300
301
302 if (++mutex->sequence == 0) {
303 mutex->sequence = 1;
304 }
305
306 our_handoff = mutex->sequence;
307 qatomic_mb_set(&mutex->handoff, our_handoff);
308 if (!has_waiters(mutex)) {
309
310
311
312 break;
313 }
314
315
316
317
318 if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
319 break;
320 }
321 }
322
323 trace_qemu_co_mutex_unlock_return(mutex, self);
324}
325
326struct CoRwTicket {
327 bool read;
328 Coroutine *co;
329 QSIMPLEQ_ENTRY(CoRwTicket) next;
330};
331
332void qemu_co_rwlock_init(CoRwlock *lock)
333{
334 qemu_co_mutex_init(&lock->mutex);
335 lock->owners = 0;
336 QSIMPLEQ_INIT(&lock->tickets);
337}
338
339
340static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
341{
342 CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
343 Coroutine *co = NULL;
344
345
346
347
348
349
350 if (tkt) {
351 if (tkt->read) {
352 if (lock->owners >= 0) {
353 lock->owners++;
354 co = tkt->co;
355 }
356 } else {
357 if (lock->owners == 0) {
358 lock->owners = -1;
359 co = tkt->co;
360 }
361 }
362 }
363
364 if (co) {
365 QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
366 qemu_co_mutex_unlock(&lock->mutex);
367 aio_co_wake(co);
368 } else {
369 qemu_co_mutex_unlock(&lock->mutex);
370 }
371}
372
373void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
374{
375 Coroutine *self = qemu_coroutine_self();
376
377 qemu_co_mutex_lock(&lock->mutex);
378
379 if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
380 lock->owners++;
381 qemu_co_mutex_unlock(&lock->mutex);
382 } else {
383 CoRwTicket my_ticket = { true, self };
384
385 QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
386 qemu_co_mutex_unlock(&lock->mutex);
387 qemu_coroutine_yield();
388 assert(lock->owners >= 1);
389
390
391 qemu_co_mutex_lock(&lock->mutex);
392 qemu_co_rwlock_maybe_wake_one(lock);
393 }
394
395 self->locks_held++;
396}
397
398void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
399{
400 Coroutine *self = qemu_coroutine_self();
401
402 assert(qemu_in_coroutine());
403 self->locks_held--;
404
405 qemu_co_mutex_lock(&lock->mutex);
406 if (lock->owners > 0) {
407 lock->owners--;
408 } else {
409 assert(lock->owners == -1);
410 lock->owners = 0;
411 }
412
413 qemu_co_rwlock_maybe_wake_one(lock);
414}
415
416void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
417{
418 qemu_co_mutex_lock(&lock->mutex);
419 assert(lock->owners == -1);
420 lock->owners = 1;
421
422
423 qemu_co_rwlock_maybe_wake_one(lock);
424}
425
426void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
427{
428 Coroutine *self = qemu_coroutine_self();
429
430 qemu_co_mutex_lock(&lock->mutex);
431 if (lock->owners == 0) {
432 lock->owners = -1;
433 qemu_co_mutex_unlock(&lock->mutex);
434 } else {
435 CoRwTicket my_ticket = { false, qemu_coroutine_self() };
436
437 QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
438 qemu_co_mutex_unlock(&lock->mutex);
439 qemu_coroutine_yield();
440 assert(lock->owners == -1);
441 }
442
443 self->locks_held++;
444}
445
446void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
447{
448 qemu_co_mutex_lock(&lock->mutex);
449 assert(lock->owners > 0);
450
451 if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
452 lock->owners = -1;
453 qemu_co_mutex_unlock(&lock->mutex);
454 } else {
455 CoRwTicket my_ticket = { false, qemu_coroutine_self() };
456
457 lock->owners--;
458 QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
459 qemu_co_rwlock_maybe_wake_one(lock);
460 qemu_coroutine_yield();
461 assert(lock->owners == -1);
462 }
463}
464