/*
 * Linux io_uring file descriptor monitoring
 *
 * Copyright Red Hat, Inc. 2020
 *
 * Authors:
 *   Stefan Hajnoczi <stefanha@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
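
/*
 * Fd monitoring implemented with Linux io_uring.  Each monitored fd has a
 * one-shot IORING_OP_POLL_ADD request in flight that process_cqe() re-arms
 * after every completion.  AioHandler changes are staged on ctx->submit_list
 * (atomically, so any thread may add them) and turned into sqes by
 * fill_sq_ring() in the AioContext thread.  Because the kernel may still
 * complete a poll request for an AioHandler that is being removed, deleted
 * handlers are freed lazily via ctx->deleted_aio_handlers; see
 * fdmon_io_uring_update() and process_cqe().
 */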

#include "qemu/osdep.h"
#include <poll.h>
#include "qemu/rcu_queue.h"
#include "aio-posix.h"

enum {
    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING  = (1 << 0),
    FDMON_IO_URING_ADD      = (1 << 1),
    FDMON_IO_URING_REMOVE   = (1 << 2),
};

static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}
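
/*
 * Take the next available sqe, submitting pending sqes first if the sq ring
 * is full.  All callers run in the AioContext thread, via
 * fdmon_io_uring_wait().
 */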
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}
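
/*
 * Atomically mark an AioHandler as pending and put it on the submit list.
 * Only the caller that sets FDMON_IO_URING_PENDING inserts the node, so a
 * node appears on the list at most once.  Safe to call from any thread.
 */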
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}
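
/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */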
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() owns the local list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it serves two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                              FDMON_IO_URING_ADD));
    return node;
}
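
/*
 * FDMonOps->update() callback: stage addition of new_node and/or removal of
 * old_node on ctx->submit_list for the next fill_sq_ring() call.
 */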
static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky: the kernel may still deliver a cqe for this
         * AioHandler after it has been removed, so the node cannot be freed
         * right away.  It is queued with the sticky FDMON_IO_URING_REMOVE
         * flag and process_cqe() moves it onto ctx->deleted_aio_handlers
         * once the kernel is done with it.
         *
         * Point node_deleted.le_prev at itself so QLIST_IS_INSERTED()
         * returns true and the caller treats the node as already scheduled
         * for deferred deletion instead of freeing it now.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    io_uring_sqe_set_data(sqe, node);
}

static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

    io_uring_prep_poll_remove(sqe, node);
}
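
/* Add a timeout that self-cancels when another cqe becomes ready */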
static void add_timeout_sqe(AioContext *ctx, int64_t ns)
{
    struct io_uring_sqe *sqe;
    struct __kernel_timespec ts = {
        .tv_sec = ns / NANOSECONDS_PER_SECOND,
        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
    };

    sqe = get_sqe(ctx);
    io_uring_prep_timeout(sqe, &ts, 1, 0);
}
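
/* Add sqes from ctx->submit_list for submission */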
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
    }
}
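
/* Returns true if a handler became ready */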
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    AioHandler *node = io_uring_cqe_get_data(cqe);
    unsigned flags;

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!node) {
        return false;
    }

    /*
     * FDMON_IO_URING_REMOVE is sticky (see dequeue()).  Clear it and, if it
     * was set, finish the deferred deletion instead of re-arming the request:
     * the IORING_OP_POLL_ADD has completed, so the kernel no longer holds a
     * reference to this node.
     */
    flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}

static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    /* Fall back while external clients are disabled */
    if (atomic_read(&ctx->external_disable_cnt)) {
        return fdmon_poll_ops.wait(ctx, ready_list, timeout);
    }

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        add_timeout_sqe(ctx, timeout);
    }

    fill_sq_ring(ctx);

    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR);

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    /* Are we falling back to fdmon-poll? */
    return atomic_read(&ctx->external_disable_cnt);
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
};
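
/*
 * Switch ctx to io_uring-based fd monitoring.  Returns false if the io_uring
 * context cannot be created (e.g. the kernel lacks io_uring support), in
 * which case the caller keeps the existing fdmon implementation.
 */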
bool fdmon_io_uring_setup(AioContext *ctx)
{
    int ret;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    return true;
}
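
/*
 * Tear down io_uring fd monitoring and revert ctx to fdmon-poll.  Handlers
 * still queued for removal are moved onto the deleted list so that
 * aio_free_deleted_handlers() can free them.
 */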
void fdmon_io_uring_destroy(AioContext *ctx)
{
    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
        AioHandler *node;

        io_uring_queue_exit(&ctx->fdmon_io_uring);

        /* Move handlers due to be removed onto the deleted list */
        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
            unsigned flags = atomic_fetch_and(&node->flags,
                                              ~(FDMON_IO_URING_PENDING |
                                                FDMON_IO_URING_ADD |
                                                FDMON_IO_URING_REMOVE));

            if (flags & FDMON_IO_URING_REMOVE) {
                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                      node, node_deleted);
            }

            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
        }

        ctx->fdmon_ops = &fdmon_poll_ops;
    }
}