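/*
 * Read-copy-update (RCU) support for QEMU, implemented as a userspace RCU
 * flavour with explicit memory barriers: reader threads register a
 * per-thread rcu_reader_data structure, and writers wait for every
 * registered reader to reach a quiescent state (synchronize_rcu) or defer
 * work until a grace period has elapsed (call_rcu1).
 */
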
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/rcu.h"
#include "qemu/atomic.h"
#include "qemu/thread.h"
#include "qemu/main-loop.h"
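
/* Global grace-period counter.  Bit 0 (RCU_GP_LOCKED) is always set, so a
 * reader that is outside any critical section (ctr == 0) can be told apart
 * from one that copied rcu_gp_ctr when it entered.  The remaining bits
 * identify the current grace period: synchronize_rcu() toggles RCU_GP_CTR
 * on 32-bit hosts and adds RCU_GP_CTR on 64-bit hosts.
 */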
#define RCU_GP_LOCKED (1UL << 0)
#define RCU_GP_CTR (1UL << 1)

unsigned long rcu_gp_ctr = RCU_GP_LOCKED;

QemuEvent rcu_gp_event;
static QemuMutex rcu_registry_lock;
static QemuMutex rcu_sync_lock;
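
/* Return whether the reader whose counter is *ctr is still inside a
 * read-side critical section that began before the current grace period.
 */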
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
    unsigned long v;

    v = atomic_read(ctr);
    return v && (v != rcu_gp_ctr);
}
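
/* Per-thread reader state; written only by the owning thread, read by
 * writers while they wait for readers.
 */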
__thread struct rcu_reader_data rcu_reader;
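
/* List of registered reader threads, protected by rcu_registry_lock.  */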
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
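
/* Wait until every registered reader has passed through a quiescent state
 * for the current grace period.  Quiescent readers are moved to a private
 * list while waiting and put back into the registry before returning.
 */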
static void wait_for_readers(void)
{
    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
    struct rcu_reader_data *index, *tmp;

    for (;;) {
        /* Reset the event first, so that a reader which reports a quiescent
         * state after this point is guaranteed to wake us up below.
         */
        qemu_event_reset(&rcu_gp_event);

        /* Tell each registered reader that we are waiting for it.  Plain
         * stores plus explicit barriers are used instead of atomic_mb_set,
         * because the writes to the individual readers are independent.
         */
        smp_wmb();
        QLIST_FOREACH(index, &registry, node) {
            atomic_set(&index->waiting, true);
        }

        /* Order the stores to index->waiting above before the loads of
         * index->ctr below.
         */
        smp_mb();

        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
            if (!rcu_gp_ongoing(&index->ctr)) {
                QLIST_REMOVE(index, node);
                QLIST_INSERT_HEAD(&qsreaders, index, node);

                /* A plain store is enough here; at worst the reader issues
                 * a spurious wakeup of rcu_gp_event.
                 */
                atomic_set(&index->waiting, false);
            }
        }

        smp_rmb();

        if (QLIST_EMPTY(&registry)) {
            break;
        }

        /* Some readers are still inside a critical section.  Drop
         * rcu_registry_lock so that rcu_(un)register_thread() does not
         * block for the whole grace period, and sleep until a reader
         * reports a quiescent state.
         */
        qemu_mutex_unlock(&rcu_registry_lock);
        qemu_event_wait(&rcu_gp_event);
        qemu_mutex_lock(&rcu_registry_lock);
    }

    /* Move the quiescent readers back into the registry.  */
    QLIST_SWAP(&registry, &qsreaders, node);
}
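
/* Wait for a grace period to elapse: when this returns, every reader that
 * was inside an RCU read-side critical section at the time of the call has
 * left it.
 */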
void synchronize_rcu(void)
{
    qemu_mutex_lock(&rcu_sync_lock);
    qemu_mutex_lock(&rcu_registry_lock);

    if (!QLIST_EMPTY(&registry)) {
        /* In either branch, the atomic_mb_set keeps later stores (such as
         * frees of old RCU-protected data) from being reordered before the
         * grace-period update.
         */
        if (sizeof(rcu_gp_ctr) < 8) {
            /* A 32-bit counter could wrap and be mistaken for an old value,
             * so toggle the parity bit and run two shorter grace periods.
             */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
            wait_for_readers();
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
        } else {
            /* A 64-bit counter will not wrap; simply start a new grace
             * period by incrementing it.
             */
            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
        }

        wait_for_readers();
    }

    qemu_mutex_unlock(&rcu_registry_lock);
    qemu_mutex_unlock(&rcu_sync_lock);
}
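
/* Try to batch at least this many callbacks per grace period before the
 * call_rcu thread starts processing them.
 */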
#define RCU_CALL_MIN_SIZE 30
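
/* Lock-free multi-producer, single-consumer queue of callbacks.  The
 * permanent dummy node keeps the queue non-empty, so that producers and
 * the consumer never race on an empty list.
 */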
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
static int rcu_call_count;
static QemuEvent rcu_call_ready_event;
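
/* Append a node to the queue.  Safe from any thread: the tail slot is
 * claimed with an atomic exchange, then the link from the previous tail
 * is published.
 */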
static void enqueue(struct rcu_head *node)
{
    struct rcu_head **old_tail;

    node->next = NULL;
    old_tail = atomic_xchg(&tail, &node->next);
    atomic_mb_set(old_tail, node);
}
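
/* Dequeue one callback, or return NULL if the queue currently appears
 * empty (for instance because a concurrent enqueue has not yet published
 * its link).  Only the call_rcu thread dequeues.
 */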
static struct rcu_head *try_dequeue(void)
{
    struct rcu_head *node, *next;

retry:
    /* The dummy node is always present, so a queue that is completely
     * empty (head and tail both pointing at the dummy, with no enqueue in
     * progress) indicates corruption.
     */
    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
        abort();
    }

    /* If the head node has no visible successor, a concurrent enqueue may
     * still be publishing its link; report the queue as empty for now.
     */
    node = head;
    next = atomic_mb_read(&head->next);
    if (!next) {
        return NULL;
    }

    /* Pop the head node.  Only this (consumer) thread ever updates head,
     * so a plain store is sufficient.
     */
    head = next;

    /* If we popped the dummy node, put it back at the tail and retry. */
    if (node == &dummy) {
        enqueue(node);
        goto retry;
    }

    return node;
}
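
/* Worker thread: batch up pending callbacks, wait for a grace period,
 * then invoke them under the iothread lock.
 */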
static void *call_rcu_thread(void *opaque)
{
    struct rcu_head *node;

    rcu_register_thread();

    for (;;) {
        int tries = 0;
        int n = atomic_read(&rcu_call_count);

        /* Heuristically wait for a decent number of callbacks to pile up,
         * but do not retry forever.  Fetch rcu_call_count before calling
         * synchronize_rcu(): only callbacks enqueued before the grace
         * period starts must be processed in this batch.
         */
        while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) {
            g_usleep(10000);
            if (n == 0) {
                qemu_event_reset(&rcu_call_ready_event);
                n = atomic_read(&rcu_call_count);
                if (n == 0) {
                    qemu_event_wait(&rcu_call_ready_event);
                }
            }
            n = atomic_read(&rcu_call_count);
        }

        atomic_sub(&rcu_call_count, n);
        synchronize_rcu();
        qemu_mutex_lock_iothread();
        while (n > 0) {
            node = try_dequeue();
            while (!node) {
                qemu_mutex_unlock_iothread();
                qemu_event_reset(&rcu_call_ready_event);
                node = try_dequeue();
                if (!node) {
                    qemu_event_wait(&rcu_call_ready_event);
                    node = try_dequeue();
                }
                qemu_mutex_lock_iothread();
            }

            n--;
            node->func(node);
        }
        qemu_mutex_unlock_iothread();
    }
    abort();
}
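
/* Schedule func(node) to run in the call_rcu thread after a full grace
 * period has elapsed.
 */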
void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
{
    node->func = func;
    enqueue(node);
    atomic_inc(&rcu_call_count);
    qemu_event_set(&rcu_call_ready_event);
}
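
/* Add or remove the calling thread's rcu_reader entry in the registry
 * that writers scan during grace periods.
 */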
void rcu_register_thread(void)
{
    assert(rcu_reader.ctr == 0);
    qemu_mutex_lock(&rcu_registry_lock);
    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
    qemu_mutex_unlock(&rcu_registry_lock);
}

void rcu_unregister_thread(void)
{
    qemu_mutex_lock(&rcu_registry_lock);
    QLIST_REMOVE(&rcu_reader, node);
    qemu_mutex_unlock(&rcu_registry_lock);
}
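
/* Initialize the locks and events, start the call_rcu thread and register
 * the calling thread as a reader.  Used both at startup and after fork().
 */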
static void rcu_init_complete(void)
{
    QemuThread thread;

    qemu_mutex_init(&rcu_registry_lock);
    qemu_mutex_init(&rcu_sync_lock);
    qemu_event_init(&rcu_gp_event, true);

    qemu_event_init(&rcu_call_ready_event, false);

    /* Start the detached call_rcu thread.  After a fork() only the forking
     * thread survives in the child, so the thread must be created anew
     * each time this function runs.
     */
    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
                       NULL, QEMU_THREAD_DETACHED);

    rcu_register_thread();
}
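
/* fork() handlers registered via pthread_atfork(): hold both RCU locks
 * across the fork so that the child's copies are not left locked by a
 * thread that does not exist in the child.
 */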
#ifdef CONFIG_POSIX
static void rcu_init_lock(void)
{
    qemu_mutex_lock(&rcu_sync_lock);
    qemu_mutex_lock(&rcu_registry_lock);
}

static void rcu_init_unlock(void)
{
    qemu_mutex_unlock(&rcu_registry_lock);
    qemu_mutex_unlock(&rcu_sync_lock);
}
#endif
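
/* Reset RCU after a fork(): drop the reader registry inherited from the
 * parent (those threads do not exist in the child) and reinitialize.
 */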
void rcu_after_fork(void)
{
    memset(&registry, 0, sizeof(registry));
    rcu_init_complete();
}
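
/* Constructor, run before main(): install the fork() handlers and perform
 * the initial RCU setup for this process.
 */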
static void __attribute__((__constructor__)) rcu_init(void)
{
#ifdef CONFIG_POSIX
    pthread_atfork(rcu_init_lock, rcu_init_unlock, rcu_init_unlock);
#endif
    rcu_init_complete();
}