1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#include "qemu/osdep.h"
30#include "qemu-common.h"
31#include "qemu/coroutine.h"
32#include "qemu/coroutine_int.h"
33#include "qemu/processor.h"
34#include "qemu/queue.h"
35#include "block/aio.h"
36#include "trace.h"
37
/* Initialize @queue so that it contains no waiting coroutines. */
void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}
42
/*
 * Add the calling coroutine to the tail of @queue and yield until another
 * coroutine restarts it (qemu_co_queue_next, qemu_co_queue_restart_all or
 * qemu_co_enter_next).
 *
 * If @lock is non-NULL, it is released after the coroutine has been queued
 * and re-acquired before returning, condition-variable style: the caller
 * must hold @lock on entry and still holds it on return.  Queueing before
 * unlocking is what makes the sleep race-free with respect to wakers that
 * take the same lock.
 */
void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* Suspend until a waker removes us from queue->entries and calls
     * aio_co_wake() on us.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* Re-take the lock before returning so the caller's critical
     * section resumes seamlessly.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
70
71static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
72{
73 Coroutine *next;
74
75 if (QSIMPLEQ_EMPTY(&queue->entries)) {
76 return false;
77 }
78
79 while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
80 QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
81 aio_co_wake(next);
82 if (single) {
83 break;
84 }
85 }
86 return true;
87}
88
89bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
90{
91 assert(qemu_in_coroutine());
92 return qemu_co_queue_do_restart(queue, true);
93}
94
95void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
96{
97 assert(qemu_in_coroutine());
98 qemu_co_queue_do_restart(queue, false);
99}
100
101bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
102{
103 Coroutine *next;
104
105 next = QSIMPLEQ_FIRST(&queue->entries);
106 if (!next) {
107 return false;
108 }
109
110 QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
111 if (lock) {
112 qemu_lockable_unlock(lock);
113 }
114 aio_co_wake(next);
115 if (lock) {
116 qemu_lockable_lock(lock);
117 }
118 return true;
119}
120
121bool qemu_co_queue_empty(CoQueue *queue)
122{
123 return QSIMPLEQ_FIRST(&queue->entries) == NULL;
124}
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/* Bookkeeping for one coroutine blocked on a CoMutex.  The record lives
 * on the waiter's own stack (see qemu_co_mutex_lock_slowpath) and is
 * linked into the mutex's from_push / to_pop lists.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;
149
150static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
151{
152 w->co = qemu_coroutine_self();
153 QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
154}
155
156static void move_waiters(CoMutex *mutex)
157{
158 QSLIST_HEAD(, CoWaitRecord) reversed;
159 QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
160 while (!QSLIST_EMPTY(&reversed)) {
161 CoWaitRecord *w = QSLIST_FIRST(&reversed);
162 QSLIST_REMOVE_HEAD(&reversed, next);
163 QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
164 }
165}
166
167static CoWaitRecord *pop_waiter(CoMutex *mutex)
168{
169 CoWaitRecord *w;
170
171 if (QSLIST_EMPTY(&mutex->to_pop)) {
172 move_waiters(mutex);
173 if (QSLIST_EMPTY(&mutex->to_pop)) {
174 return NULL;
175 }
176 }
177 w = QSLIST_FIRST(&mutex->to_pop);
178 QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
179 return w;
180}
181
182static bool has_waiters(CoMutex *mutex)
183{
184 return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
185}
186
/* Initialize @mutex to the unlocked state with no waiters; all counters,
 * lists and pointers inside CoMutex start valid as all-zeroes.
 */
void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}
191
/* Hand @mutex over to @co and schedule it in its home AioContext. */
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Order the read of co before the read of co->ctx; presumably this
     * pairs with a write barrier on the side that published the wait
     * record -- NOTE(review): confirm the pairing store barrier.
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}
201
/*
 * Slow path of qemu_co_mutex_lock(): the caller already bumped
 * mutex->locked and must now queue itself as a waiter, then normally
 * yields until the unlocker wakes it with ownership of the mutex.
 *
 * @ctx is the caller's AioContext; it is stored into mutex->ctx when the
 * lock is acquired here without yielding.
 */
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* "Responsibility hand-off": an unlocker that found no wait record
     * publishes a nonzero token in mutex->handoff.  If we can claim the
     * token with cmpxchg (and a waiter is actually queued), the unlock
     * did not wake anybody and the wakeup duty falls on us.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {

        /* Only one handoff token is live at a time, so after claiming
         * it we are the sole popper here.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We popped our own record: the mutex is ours, no yield. */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        /* Wake an older waiter; we still sleep until our own turn. */
        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}
238
/*
 * Acquire @mutex, yielding if it is contended.
 *
 * mutex->locked is 0 when free, 1 when held with no waiters, and larger
 * values count additional (possibly still-arriving) waiters.
 */
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Fast path: grab the free mutex with a single cmpxchg.  If it is
     * held with no other waiters, spin briefly hoping the holder drops
     * it soon; stop spinning if the holder runs in our own AioContext
     * (spinning in this thread would prevent it from progressing --
     * NOTE(review): inferred from the ctx comparison, confirm against
     * the CoMutex design notes).
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        /* Announce ourselves as one more waiter; a nonzero old value
         * means the mutex is still held and we take the slow path.
         */
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended: the cmpxchg (or fetch_inc from 0) made us owner. */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}
278
/*
 * Release @mutex and, if it is contended, pass ownership to one waiter
 * (or delegate that duty via the handoff token when the waiter has not
 * finished queueing itself yet).
 */
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* locked was 1: nobody is waiting, nothing more to do. */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some locker bumped mutex->locked but has not pushed its wait
         * record yet.  Publish a handoff token so that locker takes
         * over the wakeup duty.  sequence skips 0 so a live token is
         * always distinguishable from "no handoff pending".
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The incoming locker will see the token after it finishes
             * queueing and do the wakeup itself; we are done.
             */
            break;
        }

        /* A wait record did show up.  If the token is still ours,
         * reclaim it and retry the pop; if another coroutine already
         * claimed it, that coroutine performs the wakeup.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
333
334void qemu_co_rwlock_init(CoRwlock *lock)
335{
336 memset(lock, 0, sizeof(*lock));
337 qemu_co_queue_init(&lock->queue);
338 qemu_co_mutex_init(&lock->mutex);
339}
340
/*
 * Take @lock for shared (reader) access, yielding while a writer is
 * pending.  Readers only hold lock->mutex long enough to bump
 * lock->reader; the read-side critical section itself runs without it.
 */
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, defer to writers already in line. */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* Count the read lock itself; the mutex taken above was already
     * released and its locks_held contribution undone.
     */
    self->locks_held++;
}
356
/*
 * Release @lock, whether held for reading or writing.  lock->reader == 0
 * means the caller is the writer and therefore still holds lock->mutex
 * (taken in qemu_co_rwlock_wrlock/upgrade); otherwise the caller is one
 * of lock->reader readers and must take the mutex first.
 */
void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* Writer path: the mutex critical section started in
         * qemu_co_rwlock_wrlock or qemu_co_rwlock_upgrade; wake all
         * queued readers and writers before dropping it below.
         */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Last reader out wakes exactly one waiter (a pending writer). */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}
378
/*
 * Convert a held write lock on @lock into a read lock without ever
 * releasing it.  The caller must hold the lock for writing, i.e. it
 * holds lock->mutex and lock->reader is 0.
 */
void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* The lock->mutex critical section started in qemu_co_rwlock_wrlock
     * or qemu_co_rwlock_upgrade; we end it here after registering as a
     * reader.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section runs without the
     * mutex; account for the read lock in locks_held.
     */
    self->locks_held++;
}
393
/*
 * Take @lock for exclusive (writer) access, yielding until all readers
 * have left.  On return lock->mutex is deliberately still held: keeping
 * it taken for the whole write-side critical section is what keeps
 * lock->reader at zero.  It is released by qemu_co_rwlock_unlock() or
 * qemu_co_rwlock_downgrade().
 */
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* No locks_held update needed here: qemu_co_mutex_lock() above
     * already incremented it for the mutex we keep holding.
     */
}
408
/*
 * Convert a held read lock on @lock into a write lock.  The upgrade is
 * not atomic: the caller drops its reader slot and may yield while the
 * remaining readers drain, and other writers can be queued ahead of it.
 * On return lock->mutex is held, as after qemu_co_rwlock_wrlock().
 */
void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The write-side critical section continues with the mutex taken.
     * Drop the reader's locks_held contribution: qemu_co_mutex_lock()
     * above already counted the mutex we now hold instead.
     */
    self->locks_held--;
}
428