1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#include "qemu/osdep.h"
30#include "qemu/coroutine.h"
31#include "qemu/coroutine_int.h"
32#include "qemu/processor.h"
33#include "qemu/queue.h"
34#include "block/aio.h"
35#include "trace.h"
36
/* Initialize @queue so that it contains no waiting coroutines. */
void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}
41
/*
 * Add the current coroutine to @queue and yield until another coroutine
 * restarts it with qemu_co_queue_next() or qemu_co_queue_restart_all().
 *
 * If @lock is non-NULL it is dropped while the coroutine sleeps and
 * re-acquired before returning, so the caller's critical section is
 * preserved across the wait.
 */
void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /*
     * Note the ordering: we enqueue ourselves *before* releasing @lock
     * and yielding.  A waker that sees us on the list can only actually
     * re-enter this coroutine after the yield below, so there is no
     * lost-wakeup window here.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /*
     * Re-take @lock after being restarted.  The wakeup side does not
     * transfer lock ownership to us ("wait morphing"); every restarted
     * coroutine contends for the lock again here.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
69
70static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
71{
72 Coroutine *next;
73
74 if (QSIMPLEQ_EMPTY(&queue->entries)) {
75 return false;
76 }
77
78 while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
79 QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
80 aio_co_wake(next);
81 if (single) {
82 break;
83 }
84 }
85 return true;
86}
87
/*
 * Wake the first coroutine waiting on @queue, if any.  Returns true if
 * a coroutine was woken.  The caller must hold whatever lock protects
 * the queue.
 */
bool qemu_co_queue_next(CoQueue *queue)
{
    return qemu_co_queue_do_restart(queue, true);
}
92
/*
 * Wake every coroutine waiting on @queue.  The caller must hold
 * whatever lock protects the queue.
 */
void qemu_co_queue_restart_all(CoQueue *queue)
{
    qemu_co_queue_do_restart(queue, false);
}
97
98bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
99{
100 Coroutine *next;
101
102 next = QSIMPLEQ_FIRST(&queue->entries);
103 if (!next) {
104 return false;
105 }
106
107 QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
108 if (lock) {
109 qemu_lockable_unlock(lock);
110 }
111 aio_co_wake(next);
112 if (lock) {
113 qemu_lockable_lock(lock);
114 }
115 return true;
116}
117
118bool qemu_co_queue_empty(CoQueue *queue)
119{
120 return QSIMPLEQ_FIRST(&queue->entries) == NULL;
121}
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
 * One entry in a CoMutex's waiter list.  Lives on the waiting
 * coroutine's stack (see qemu_co_mutex_lock_slowpath), so it is only
 * valid while that coroutine is parked on the mutex.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;
146
/*
 * Enqueue the current coroutine as a waiter on @mutex.  The atomic
 * insertion into from_push makes this safe against concurrent pushers
 * and against a concurrent pop_waiter() draining the list.
 */
static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}
152
/*
 * Drain the lock-free from_push stack into the popper-private to_pop
 * list.  from_push is LIFO (newest first); re-pushing each element onto
 * to_pop reverses it, so pop_waiter() hands out waiters in FIFO order.
 */
static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}
163
/*
 * Remove and return the oldest waiter on @mutex, or NULL if there are
 * none.  Refills to_pop from from_push on demand.  Only one popper may
 * run at a time (guaranteed by the handoff protocol in lock/unlock),
 * so to_pop needs no atomics.
 */
static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}
178
179static bool has_waiters(CoMutex *mutex)
180{
181 return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
182}
183
/* Initialize @mutex to the unlocked state with no waiters. */
void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}
188
/*
 * Hand @mutex to @co and schedule it.  mutex->ctx is published before
 * the wakeup so that spinners in qemu_co_mutex_lock() can see which
 * AioContext now owns the lock.
 */
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /*
     * Read co before co->ctx; this dependency barrier pairs with the
     * write side that published the waiter record (see push_waiter and
     * the atomic list insertion it relies on).
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}
198
/*
 * Contended-path acquisition of @mutex: enqueue ourselves as a waiter
 * and yield until an unlocker wakes us, participating in the
 * "responsibility hand-off" protocol to close the race with a
 * concurrent unlock that saw no waiters.
 */
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /*
     * Responsibility hand-off: if a concurrent unlock published a
     * handoff token, it could not find a waiter and left us the job of
     * waking somebody up.  Claim the token with cmpxchg; only one
     * locker can win, so there is at most one active popper.
     */
    old_handoff = qatomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /*
         * We own the handoff; pop a waiter.  It may be ourselves, in
         * which case we take the mutex directly without yielding.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves -- w must be our own record.  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    /* Sleep until an unlocker pops our record and wakes us.  */
    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}
235
/*
 * Lock @mutex.  Must be called from coroutine context; yields if the
 * mutex is contended.  mutex->locked counts owner plus queued waiters:
 * 0 = free, 1 = taken uncontended, >1 = taken with waiters.
 */
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /*
     * Fast path: try to grab the free mutex with a single cmpxchg.  If
     * it is held by a coroutine in another AioContext, spin briefly
     * instead of sleeping -- short critical sections usually release
     * the lock before the sleep/wakeup round-trip would complete.
     * Spinning is pointless if the holder runs in *our* AioContext
     * (it cannot make progress while we spin), hence the ctx check.
     */
    i = 0;
retry_fast_path:
    waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        /* Spin only while uncontended (locked == 1), up to a bound.  */
        while (waiters == 1 && ++i < 1000) {
            if (qatomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (qatomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        /* Register ourselves as a waiter/owner-candidate.  */
        waiters = qatomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended: we own the mutex outright.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}
275
/*
 * Unlock @mutex, waking one waiter if any are queued.  Must be called
 * by the coroutine that holds the mutex.
 */
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (qatomic_fetch_dec(&mutex->locked) == 1) {
        /* locked dropped 1 -> 0: nobody was waiting, we are done.  */
        return;
    }

    /*
     * locked was > 1, so at least one waiter has incremented it --
     * though it may not have pushed its CoWaitRecord yet.  Loop until
     * we either wake a waiter or successfully hand off that duty.
     */
    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /*
         * No waiter visible yet: publish a handoff token so the
         * incoming locker wakes itself (or someone else) up.  The
         * sequence number keeps tokens from distinct unlocks distinct;
         * 0 is reserved to mean "no handoff pending".
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        qatomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /*
             * The concurrent lock has not added itself yet, so it will
             * be able to pick up our handoff.
             */
            break;
        }

        /*
         * A waiter appeared after we set the handoff.  Try to retract
         * the token; if someone else already consumed it, they now own
         * the wakeup duty and we are done.
         */
        if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
330
/* Initialize @lock: no readers, no pending writers, empty wait queue. */
void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}
337
/*
 * Take @lock for shared (read) access.  Multiple readers may hold the
 * lock concurrently; a queued writer blocks new readers for fairness.
 */
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line.  */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section runs without the mutex.  */
    self->locks_held++;
}
353
/*
 * Release @lock, held either for reading or for writing.  The two
 * cases are distinguished by lock->reader: a writer holds the lock
 * with reader == 0 and still owns lock->mutex (taken in wrlock).
 */
void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /*
         * Write side: lock->mutex is still held from
         * qemu_co_rwlock_wrlock; wake everybody (readers can all
         * proceed, and any writer re-checks its condition).
         */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* When the last reader leaves, wake only one waiting writer.  */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}
375
/*
 * Atomically convert a held write lock into a read lock.  No other
 * writer can intervene because lock->mutex is still held from
 * qemu_co_rwlock_wrlock (or qemu_co_rwlock_upgrade).
 */
void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in wrlock/upgrade.  */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section runs without the mutex.  */
    self->locks_held++;
}
390
/*
 * Take @lock for exclusive (write) access.  pending_writer blocks new
 * readers while we wait for existing readers to drain.
 */
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /*
     * The write-side critical section runs with lock->mutex still
     * taken, which keeps lock->reader at zero until unlock/downgrade.
     * self->locks_held is accounted via the CoMutex, so no extra
     * increment is needed here.
     */
}
405
/*
 * Convert a held read lock into a write lock.  Not atomic: we drop our
 * read claim before waiting, so another writer may run in between --
 * callers must re-validate state after upgrading.
 */
void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /*
     * As in wrlock, the write-side critical section keeps lock->mutex
     * taken.  Drop the locks_held count that rdlock added for the read
     * side; the mutex now accounts for this coroutine's hold.
     */
    self->locks_held--;
}
425