/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */
#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
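
/*
 * Example: a minimal sketch (not part of QEMU) of the wait/wake pattern
 * this API supports.  The MyFifo type and fifo_* functions are
 * hypothetical; the sketch assumes the CoQueue is only touched with the
 * CoMutex held, as qemu_co_queue_wait() requires.
 *
 *     typedef struct {
 *         CoMutex lock;
 *         CoQueue wait;   // consumers sleep here while the FIFO is empty
 *         int len;
 *     } MyFifo;
 *
 *     static void coroutine_fn fifo_get(MyFifo *f)
 *     {
 *         qemu_co_mutex_lock(&f->lock);
 *         while (f->len == 0) {
 *             // Drops f->lock across the yield, re-takes it on wakeup.
 *             qemu_co_queue_wait(&f->wait, &f->lock);
 *         }
 *         f->len--;
 *         qemu_co_mutex_unlock(&f->lock);
 *     }
 *
 *     static void coroutine_fn fifo_put(MyFifo *f)
 *     {
 *         qemu_co_mutex_lock(&f->lock);
 *         f->len++;
 *         qemu_co_queue_next(&f->wait);   // wake at most one waiting getter
 *         qemu_co_mutex_unlock(&f->lock);
 *     }
 */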

static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
    Coroutine *next;

    if (QSIMPLEQ_EMPTY(&queue->entries)) {
        return false;
    }

    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
        aio_co_wake(next);
        if (single) {
            break;
        }
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    return qemu_co_queue_do_restart(queue, true);
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    qemu_co_queue_do_restart(queue, false);
}

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}
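
/*
 * Example: unlike qemu_co_queue_next(), qemu_co_enter_next_impl() may be
 * called from outside coroutine context, e.g. from a bottom half that
 * kicks one waiter.  A hedged sketch; "MyDev", "free_req_queue" and
 * "my_kick_bh" are hypothetical:
 *
 *     static void my_kick_bh(void *opaque)
 *     {
 *         MyDev *dev = opaque;   // dev->free_req_queue is a CoQueue
 *
 *         // Wake the coroutine at the head of the queue, if any.
 *         qemu_co_enter_next_impl(&dev->free_req_queue, NULL);
 *     }
 */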

bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.  In
 *   this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and a
 *   concurrent qemu_co_mutex_unlock will fail the cmpxchg and continue
 *   to the next iteration.
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

/* Move the from_push list over to to_pop, reversing it back into the
 * FIFO order in which the waiters arrived.
 */
static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

/* Returns true if a wait record is queued on either list.  */
static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}
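
/*
 * Example: a basic critical section.  A minimal sketch, assuming the
 * caller runs in coroutine context; "MyState" and "my_counter_inc" are
 * hypothetical names.
 *
 *     typedef struct {
 *         CoMutex lock;
 *         uint64_t count;
 *     } MyState;
 *
 *     static void coroutine_fn my_counter_inc(MyState *s)
 *     {
 *         qemu_co_mutex_lock(&s->lock);   // yields instead of blocking
 *         s->count++;
 *         qemu_co_mutex_unlock(&s->lock);
 *     }
 *
 * Unlike QemuMutex, taking a contended CoMutex yields back to the event
 * loop, so other coroutines in the same AioContext keep making progress.
 */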

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because only one coroutine
         * can win the cmpxchg on mutex->handoff.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves!  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * of the nonzero count in mutex->locked) but it hasn't yet put
         * itself on the wait queue.  Pick a sequence number for the
         * handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}

void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}
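
/*
 * Example: reader/writer usage.  A minimal sketch with hypothetical names
 * ("MyTable", "my_lookup", "my_update"); both functions must run in
 * coroutine context because the rwlock may yield while waiting.
 *
 *     typedef struct {
 *         CoRwlock lock;
 *         GHashTable *map;
 *     } MyTable;
 *
 *     static void *coroutine_fn my_lookup(MyTable *t, const char *key)
 *     {
 *         void *val;
 *
 *         qemu_co_rwlock_rdlock(&t->lock);   // many readers may enter
 *         val = g_hash_table_lookup(t->map, key);
 *         qemu_co_rwlock_unlock(&t->lock);
 *         return val;
 *     }
 *
 *     static void coroutine_fn my_update(MyTable *t, char *key, void *val)
 *     {
 *         qemu_co_rwlock_wrlock(&t->lock);   // excludes readers and writers
 *         g_hash_table_insert(t->map, key, val);
 *         qemu_co_rwlock_unlock(&t->lock);
 *     }
 */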

void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line.  */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex.  */
    self->locks_held++;
}

void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* The critical section started in qemu_co_rwlock_wrlock.  */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Wakeup only one waiting writer */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}

void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
     * qemu_co_rwlock_upgrade.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex.  */
    self->locks_held++;
}

void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, so that lock->reader remains zero.
     * There is no need to update self->locks_held.
     */
}

void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
     * not account for the lock twice in self->locks_held.
     */
    self->locks_held--;
}
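
/*
 * Example: read-check-then-write with an upgrade.  A hedged sketch that
 * reuses the hypothetical MyTable from the sketch above, extended with a
 * bool dirty flag; "my_flush_if_dirty" and "my_do_flush" are made-up
 * names.  Note that qemu_co_rwlock_upgrade() may yield while waiting for
 * readers to drain, and another writer can run first, so the state must
 * be re-checked after it returns.
 *
 *     static void coroutine_fn my_flush_if_dirty(MyTable *t)
 *     {
 *         qemu_co_rwlock_rdlock(&t->lock);
 *         if (!t->dirty) {
 *             qemu_co_rwlock_unlock(&t->lock);
 *             return;
 *         }
 *         qemu_co_rwlock_upgrade(&t->lock);
 *         if (t->dirty) {            // re-check: a writer may have run
 *             my_do_flush(t);        // hypothetical helper
 *             t->dirty = false;
 *         }
 *         qemu_co_rwlock_unlock(&t->lock);
 *     }
 */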